Add helper classes for log reading
The goal is to inject span reader factory deep into the log reading
and sorting. LogFilesContainer encapsulates log files and provides
a couple of useful validations and abstractions to simplify log sorting.
It is the first part of change. Follow one will push SelectedLogParts
even deeper into the log reader.
Change-Id: Ic5253bd2b7c87fbbf55ad8d39a480af2871ddb71
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 8c057e9..cd6da67 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -764,9 +764,10 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
- PartsMerger merger(FilterPartsForNode(parts, "pi1"));
+ PartsMerger merger("pi1", 0, log_files);
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -865,9 +866,10 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
- PartsMerger merger(FilterPartsForNode(parts, "pi1"));
+ PartsMerger merger("pi1", 0, log_files);
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -929,16 +931,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1071,20 +1074,21 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
// mapper0 will not provide any messages while mapper1 will provide all
// messages due to the channel filter callbacks used
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
mapper0.set_replay_channels_callback(
[&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
mapper1.set_replay_channels_callback(
@@ -1211,19 +1215,15 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
- for (const auto &p : parts) {
- LOG(INFO) << p;
- }
-
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1335,16 +1335,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1454,16 +1455,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1537,16 +1539,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1621,16 +1624,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1695,16 +1699,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1761,9 +1766,10 @@
const std::vector<LogFile> parts =
SortParts({logfile0_, logfile1_, logfile2_});
+ LogFilesContainer log_files(parts);
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
@@ -1799,12 +1805,13 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1872,16 +1879,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -2130,10 +2138,11 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
ASSERT_EQ(parts[0].parts.size(), 2u);
- BootMerger merger(FilterPartsForNode(parts, "pi2"));
+ BootMerger merger("pi2", log_files);
EXPECT_EQ(merger.node(), 1u);
@@ -2308,6 +2317,7 @@
const std::vector<LogFile> parts =
SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+ LogFilesContainer log_files(parts);
for (const auto &x : parts) {
LOG(INFO) << x;
@@ -2316,11 +2326,11 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -2526,6 +2536,7 @@
const std::vector<LogFile> parts =
SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+ LogFilesContainer log_files(parts);
for (const auto &x : parts) {
LOG(INFO) << x;
@@ -2534,11 +2545,11 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });