Split local data and timestamps into separate files
This sets us up to read all the timestamps before the data, which is what
enables sorting. This change does not do that yet; it only gets the files
and formats ready.
Change-Id: I1213a70c90867a082ca37b21eafd99f9b95b600d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index f6e5447..648f430 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -79,7 +79,7 @@
}
EXPECT_EQ(logfile_uuids.size(), 2u);
- EXPECT_EQ(parts_uuids.size(), 6u);
+ EXPECT_EQ(parts_uuids.size(), 8u);
// And confirm everything is on the correct node.
EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
@@ -105,12 +105,14 @@
// And the parts index matches.
EXPECT_EQ(log_header[2].message().parts_index(), 0);
- EXPECT_EQ(log_header[3].message().parts_index(), 1);
- EXPECT_EQ(log_header[4].message().parts_index(), 2);
+
+ EXPECT_EQ(log_header[3].message().parts_index(), 0);
+ EXPECT_EQ(log_header[4].message().parts_index(), 1);
EXPECT_EQ(log_header[5].message().parts_index(), 0);
- EXPECT_EQ(log_header[6].message().parts_index(), 1);
- EXPECT_EQ(log_header[7].message().parts_index(), 2);
+
+ EXPECT_EQ(log_header[6].message().parts_index(), 0);
+ EXPECT_EQ(log_header[7].message().parts_index(), 1);
EXPECT_EQ(log_header[8].message().parts_index(), 0);
EXPECT_EQ(log_header[9].message().parts_index(), 1);
@@ -127,24 +129,18 @@
// And that the data_stored field is right.
EXPECT_THAT(*log_header[2].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::DATA));
EXPECT_THAT(*log_header[3].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[4].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[5].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::DATA));
EXPECT_THAT(*log_header[6].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[7].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[8].message().data_stored(),
::testing::ElementsAre(StoredDataType::DATA));
@@ -176,49 +172,44 @@
std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
// Timing reports, pings
- EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos",
- "aos.message_bridge.ServerStatistics", 1),
- std::make_tuple("/test", "aos.examples.Ping", 1),
- std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
- << " : " << logfiles_[2];
- {
- std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
- std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
- 1)};
- if (!std::get<0>(GetParam()).shared) {
- channel_counts.push_back(
- std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
- "aos-message_bridge-Timestamp",
- "aos.message_bridge.RemoteMessage", 1));
- }
- EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
- ::testing::UnorderedElementsAreArray(channel_counts))
- << " : " << logfiles_[3];
+ if (shared()) {
+ EXPECT_THAT(
+ CountChannelsData(config, logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+ 200),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+ 21),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+ std::make_tuple("/test", "aos.examples.Ping", 2001)))
+ << " : " << logfiles_[2];
+ } else {
+ EXPECT_THAT(
+ CountChannelsData(config, logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+ 200),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+ 21),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+ std::make_tuple("/test", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp",
+ "aos.message_bridge.RemoteMessage", 200)))
+ << " : " << logfiles_[2];
}
- {
- std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
- std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
- 20),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
- 199),
- std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
- std::make_tuple("/test", "aos.examples.Ping", 2000)};
- if (!std::get<0>(GetParam()).shared) {
- channel_counts.push_back(
- std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
- "aos-message_bridge-Timestamp",
- "aos.message_bridge.RemoteMessage", 199));
- }
- EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
- ::testing::UnorderedElementsAreArray(channel_counts))
- << " : " << logfiles_[4];
- }
+
+ EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
+ ::testing::UnorderedElementsAre())
+ << " : " << logfiles_[3];
+ EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
+ ::testing::UnorderedElementsAre())
+ << " : " << logfiles_[4];
+
// Timestamps for pong
EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
UnorderedElementsAre())
@@ -235,27 +226,21 @@
<< " : " << logfiles_[4];
// Timing reports and pongs.
- EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
- UnorderedElementsAre(
- std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
- std::make_tuple("/pi2/aos",
- "aos.message_bridge.ServerStatistics", 1)))
- << " : " << logfiles_[5];
EXPECT_THAT(
- CountChannelsData(config, logfiles_[6]),
- UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
- << " : " << logfiles_[6];
- EXPECT_THAT(
- CountChannelsData(config, logfiles_[7]),
+ CountChannelsData(config, logfiles_[5]),
UnorderedElementsAre(
- std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
- std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
- std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
- 20),
+ std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
200),
+ std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
+ 21),
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
- std::make_tuple("/test", "aos.examples.Pong", 2000)))
+ std::make_tuple("/test", "aos.examples.Pong", 2001)))
+ << " : " << logfiles_[5];
+ EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
+ << " : " << logfiles_[6];
+ EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
<< " : " << logfiles_[7];
// And ping timestamps.
EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
@@ -977,7 +962,7 @@
}
const std::vector<LogFile> sorted_parts =
- SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
+ SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 1, 1, 2, 2));
EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts);
@@ -1155,10 +1140,10 @@
// For snappy, needs to have enough data to be >1 chunk of compressed data so
// that we don't corrupt the entire log part.
::std::string compressed_contents =
- aos::util::ReadFileToStringOrDie(logfiles_[4]);
+ aos::util::ReadFileToStringOrDie(logfiles_[2]);
aos::util::WriteStringToFileOrDie(
- logfiles_[4],
+ logfiles_[2],
compressed_contents.substr(0, compressed_contents.size() - 100));
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
@@ -1954,7 +1939,7 @@
// hitting any fatal errors.
{
LogReader relogged_reader(SortParts(MakeLogFiles(
- tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
+ tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 1, 1, 2, 2, true)));
relogged_reader.Register();
relogged_reader.event_loop_factory()->Run();
@@ -1964,7 +1949,7 @@
{
LogReader relogged_reader(
SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
- 3, 3, true)),
+ 1, 1, 2, 2, true)),
reader.configuration());
relogged_reader.Register();
@@ -2150,7 +2135,7 @@
//
// TODO(austin): I'm not the most thrilled with this test pattern... It
// feels brittle in a different way.
- if (file.find("source_pi1_timestamp_pi2") == std::string::npos) {
+ if (file.find("timestamps/remote_pi2") == std::string::npos) {
switch (log_header->message().parts_index()) {
case 0:
EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
@@ -2287,93 +2272,107 @@
monotonic_clock::max_time);
EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
monotonic_clock::max_time);
- switch (log_header->message().parts_index()) {
- case 0:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- break;
- case 1:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)));
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)));
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- break;
- case 2:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)))
- << file;
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)))
- << file;
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)))
- << file;
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)))
- << file;
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(100000)))
- << file;
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(100150)))
- << file;
- break;
- case 3:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::max_time)
- << file;
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::max_time)
- << file;
- break;
- case 4:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(1423000)))
- << file;
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10200150)))
- << file;
- break;
- default:
- FAIL();
- break;
+ if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
+ switch (log_header->message().parts_index()) {
+ case 0:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ break;
+ default:
+ FAIL();
+ break;
+ }
+ } else if (log_header->message().data_stored()->Get(0) ==
+ StoredDataType::TIMESTAMPS) {
+ switch (log_header->message().parts_index()) {
+ case 0:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)));
+ EXPECT_EQ(oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)));
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)));
+ EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)));
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ break;
+ case 1:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)))
+ << file;
+ EXPECT_EQ(oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)))
+ << file;
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)))
+ << file;
+ EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)))
+ << file;
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(100000)))
+ << file;
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(100150)))
+ << file;
+ break;
+ case 2:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::max_time)
+ << file;
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::max_time)
+ << file;
+ break;
+ case 3:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(1423000)))
+ << file;
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10200150)))
+ << file;
+ break;
+ default:
+ FAIL();
+ break;
+ }
}
}
@@ -2513,7 +2512,7 @@
2u);
if (log_header->message().node()->name()->string_view() != "pi1") {
- ASSERT_TRUE(file.find("source_pi1_timestamp_pi2") != std::string::npos);
+ ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
ReadNthMessage(file, 0);
@@ -2915,8 +2914,10 @@
std::vector<std::string> missing_parts;
- missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
- missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
+ missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
+ Extension());
+ missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
+ Extension());
missing_parts.emplace_back(absl::StrCat(
logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));