Split local data and timestamps into separate files
This sets us up to read all the timestamps before the data, which will
enable sorting. This change doesn't do that reading yet; it only gets the
files and formats ready for it.
Change-Id: I1213a70c90867a082ca37b21eafd99f9b95b600d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 40a331a..679d364 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -704,16 +704,10 @@
}
void MultiNodeLogNamer::Rotate(const Node *node) {
- if (node == this->node()) {
- if (data_writer_) {
- data_writer_->Rotate();
- }
- } else {
- for (std::pair<const Channel *const, NewDataWriter> &data_writer :
- data_writers_) {
- if (node == data_writer.second.node()) {
- data_writer.second.Rotate();
- }
+ for (auto &data_map : {&node_data_writers_, &node_timestamp_writers_}) {
+ auto it = data_map->find(node);
+ if (it != data_map->end()) {
+ it->second.Rotate();
}
}
}
@@ -757,25 +751,16 @@
return nullptr;
}
- // Now, sort out if this is data generated on this node, or not. It is
- // generated if it is sendable on this node.
- if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
- if (!data_writer_) {
- MakeDataWriter();
- }
- data_writer_->UpdateMaxMessageSize(
- PackMessageSize(LogType::kLogMessage, channel->max_size()));
- return data_writer_.get();
- }
-
// Ok, we have data that is being forwarded to us that we are supposed to
// log. It needs to be logged with send timestamps, but be sorted enough
// to be able to be processed.
- CHECK(data_writers_.find(channel) == data_writers_.end());
// Track that this node is being logged.
- const Node *source_node = configuration::GetNode(
- configuration_, channel->source_node()->string_view());
+ const Node *source_node =
+ configuration::MultiNode(configuration_)
+ ? configuration::GetNode(configuration_,
+ channel->source_node()->string_view())
+ : nullptr;
if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
nodes_.emplace_back(source_node);
@@ -783,26 +768,26 @@
// If we already have a data writer for the node, then use the same writer for
// all channels of that node.
- if (node_data_writers_.find(source_node) != node_data_writers_.end()) {
- node_data_writers_.find(source_node)
- ->second->UpdateMaxMessageSize(
- PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
- return node_data_writers_.find(source_node)->second;
+ auto it = node_data_writers_.find(source_node);
+ if (it != node_data_writers_.end()) {
+ it->second.UpdateMaxMessageSize(
+ PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
+ return &(it->second);
}
// If we don't have a data writer for the node, create one.
NewDataWriter data_writer(
this, source_node, node_,
- [this, channel](NewDataWriter *data_writer) {
- OpenWriter(channel, data_writer);
+ [this, source_node](NewDataWriter *data_writer) {
+ OpenDataWriter(source_node, data_writer);
},
[this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
{StoredDataType::DATA});
- data_writers_.emplace(channel, std::move(data_writer));
- node_data_writers_.emplace(source_node, &data_writers_.find(channel)->second);
- return &(data_writers_.find(channel)->second);
+ auto result = node_data_writers_.emplace(source_node, std::move(data_writer));
+ CHECK(result.second);
+ return &(result.first->second);
}
NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
@@ -812,30 +797,31 @@
configuration::ChannelIsReadableOnNode(channel, this->node());
CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
- CHECK(data_writers_.find(channel) == data_writers_.end());
-
if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
nodes_.emplace_back(node);
}
+ CHECK_NE(node, this->node());
+
// If we have a remote timestamp writer for a particular node, use the same
// writer for all remote timestamp channels of that node.
- if (node_timestamp_writers_.find(node) != node_timestamp_writers_.end()) {
- return node_timestamp_writers_.find(node)->second;
+ auto it = node_timestamp_writers_.find(node);
+ if (it != node_timestamp_writers_.end()) {
+ return &(it->second);
}
// If there are no remote timestamp writers for the node, create one.
NewDataWriter data_writer(
this, configuration::GetNode(configuration_, node), node_,
- [this, channel](NewDataWriter *data_writer) {
- OpenForwardedTimestampWriter(channel, data_writer);
+ [this](NewDataWriter *data_writer) {
+ OpenForwardedTimestampWriter(node_, data_writer);
},
[this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
PackRemoteMessageSize(), {StoredDataType::REMOTE_TIMESTAMPS});
- data_writers_.emplace(channel, std::move(data_writer));
- node_timestamp_writers_.emplace(node, &data_writers_.find(channel)->second);
- return &(data_writers_.find(channel)->second);
+ auto result = node_timestamp_writers_.emplace(node, std::move(data_writer));
+ CHECK(result.second);
+ return &(result.first->second);
}
NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
@@ -848,19 +834,29 @@
return nullptr;
}
- if (!data_writer_) {
- MakeDataWriter();
+ // There is only one of these.
+ auto it = node_timestamp_writers_.find(this->node());
+ if (it != node_timestamp_writers_.end()) {
+ it->second.UpdateMaxMessageSize(
+ PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
+ return &(it->second);
}
- data_writer_->UpdateMaxMessageSize(
- PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
- return data_writer_.get();
+
+ NewDataWriter data_writer(
+ this, node_, node_,
+ [this](NewDataWriter *data_writer) { OpenTimestampWriter(data_writer); },
+ [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+ PackMessageSize(LogType::kLogDeliveryTimeOnly, 0),
+ {StoredDataType::TIMESTAMPS});
+
+ auto result = node_timestamp_writers_.emplace(node_, std::move(data_writer));
+ CHECK(result.second);
+ return &(result.first->second);
}
WriteCode MultiNodeLogNamer::Close() {
- data_writers_.clear();
node_data_writers_.clear();
node_timestamp_writers_.clear();
- data_writer_.reset();
if (ran_out_of_space_) {
return WriteCode::kOutOfSpace;
}
@@ -868,13 +864,15 @@
}
void MultiNodeLogNamer::ResetStatistics() {
- for (std::pair<const Channel *const, NewDataWriter> &data_writer :
- data_writers_) {
+ for (std::pair<const Node *const, NewDataWriter> &data_writer :
+ node_data_writers_) {
if (!data_writer.second.writer) continue;
data_writer.second.writer->WriteStatistics()->ResetStats();
}
- if (data_writer_ != nullptr && data_writer_->writer != nullptr) {
- data_writer_->writer->WriteStatistics()->ResetStats();
+ for (std::pair<const Node *const, NewDataWriter> &data_writer :
+ node_timestamp_writers_) {
+ if (!data_writer.second.writer) continue;
+ data_writer.second.writer->WriteStatistics()->ResetStats();
}
max_write_time_ = std::chrono::nanoseconds::zero();
max_write_time_bytes_ = -1;
@@ -886,48 +884,38 @@
}
void MultiNodeLogNamer::OpenForwardedTimestampWriter(
- const Channel *channel, NewDataWriter *data_writer) {
- std::string filename =
- absl::StrCat("timestamps/", data_writer->node()->name()->string_view(),
- "/source_", channel->source_node()->string_view(),
- "_timestamp_", data_writer->node()->name()->string_view(),
- ".part", data_writer->parts_index(), ".bfbs", extension_);
- CreateBufferWriter(filename, data_writer->max_message_size(),
- &data_writer->writer);
-}
-
-void MultiNodeLogNamer::OpenWriter(const Channel *channel,
- NewDataWriter *data_writer) {
+ const Node * /*source_node*/, NewDataWriter *data_writer) {
const std::string filename = absl::StrCat(
- CHECK_NOTNULL(channel->source_node())->string_view(), "_data/",
- channel->source_node()->string_view(), "_data.part",
+ "timestamps/remote_", data_writer->node()->name()->string_view(), ".part",
data_writer->parts_index(), ".bfbs", extension_);
CreateBufferWriter(filename, data_writer->max_message_size(),
&data_writer->writer);
}
-void MultiNodeLogNamer::MakeDataWriter() {
- if (!data_writer_) {
- data_writer_ = std::make_unique<NewDataWriter>(
- this, node_, node_,
- [this](NewDataWriter *writer) {
- std::string name;
- if (node() != nullptr) {
- name = absl::StrCat(name, node()->name()->string_view(), "_");
- }
- absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
- extension_);
- CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
- },
- [this](NewDataWriter *data_writer) {
- CloseWriter(&data_writer->writer);
- },
- // Default size is 0 so it will be obvious if something doesn't fix it
- // afterwards.
- 0,
- std::initializer_list<StoredDataType>{StoredDataType::DATA,
- StoredDataType::TIMESTAMPS});
+void MultiNodeLogNamer::OpenDataWriter(const Node *source_node,
+ NewDataWriter *data_writer) {
+ std::string filename;
+
+ if (source_node != nullptr) {
+ if (source_node == node_) {
+ filename = absl::StrCat(source_node->name()->string_view(), "_");
+ } else {
+ filename = absl::StrCat("data/", source_node->name()->string_view(), "_");
+ }
}
+
+ absl::StrAppend(&filename, "data.part", data_writer->parts_index(), ".bfbs",
+ extension_);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
+}
+
+void MultiNodeLogNamer::OpenTimestampWriter(NewDataWriter *data_writer) {
+ std::string filename =
+ absl::StrCat(node()->name()->string_view(), "_timestamps.part",
+ data_writer->parts_index(), ".bfbs", extension_);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
}
void MultiNodeLogNamer::CreateBufferWriter(
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 6a18997..ccdfe5a 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -506,14 +506,12 @@
private:
// Opens up a writer for timestamps forwarded back.
- void OpenForwardedTimestampWriter(const Channel *channel,
+ void OpenForwardedTimestampWriter(const Node *source_node,
NewDataWriter *data_writer);
// Opens up a writer for remote data.
- void OpenWriter(const Channel *channel, NewDataWriter *data_writer);
-
- // Opens the main data writer file for this node responsible for data_writer_.
- void MakeDataWriter();
+ void OpenDataWriter(const Node *source_node, NewDataWriter *data_writer);
+ void OpenTimestampWriter(NewDataWriter *data_writer);
void CreateBufferWriter(std::string_view path, size_t max_message_size,
std::unique_ptr<DetachedBufferWriter> *destination);
@@ -523,14 +521,17 @@
// A version of std::accumulate which operates over all of our DataWriters.
template <typename T, typename BinaryOperation>
T accumulate_data_writers(T t, BinaryOperation op) const {
- for (const std::pair<const Channel *const, NewDataWriter> &data_writer :
- data_writers_) {
+ for (const std::pair<const Node *const, NewDataWriter> &data_writer :
+ node_data_writers_) {
if (data_writer.second.writer != nullptr) {
t = op(std::move(t), data_writer.second);
}
}
- if (data_writer_ != nullptr && data_writer_->writer != nullptr) {
- t = op(std::move(t), *data_writer_);
+ for (const std::pair<const Node *const, NewDataWriter> &data_writer :
+ node_timestamp_writers_) {
+ if (data_writer.second.writer != nullptr) {
+ t = op(std::move(t), data_writer.second);
+ }
}
return t;
}
@@ -552,15 +553,10 @@
int total_write_messages_ = 0;
int total_write_bytes_ = 0;
- // File to write both delivery timestamps and local data to.
- std::unique_ptr<NewDataWriter> data_writer_;
-
- std::map<const Channel *, NewDataWriter> data_writers_;
-
// Data writer per remote node.
- std::map<const Node *, NewDataWriter *> node_data_writers_;
+ std::map<const Node *, NewDataWriter> node_data_writers_;
// Remote timestamp writers per node.
- std::map<const Node *, NewDataWriter *> node_timestamp_writers_;
+ std::map<const Node *, NewDataWriter> node_timestamp_writers_;
};
// This is specialized log namer that deals with directory centric log events.
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index f6e5447..648f430 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -79,7 +79,7 @@
}
EXPECT_EQ(logfile_uuids.size(), 2u);
- EXPECT_EQ(parts_uuids.size(), 6u);
+ EXPECT_EQ(parts_uuids.size(), 8u);
// And confirm everything is on the correct node.
EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
@@ -105,12 +105,14 @@
// And the parts index matches.
EXPECT_EQ(log_header[2].message().parts_index(), 0);
- EXPECT_EQ(log_header[3].message().parts_index(), 1);
- EXPECT_EQ(log_header[4].message().parts_index(), 2);
+
+ EXPECT_EQ(log_header[3].message().parts_index(), 0);
+ EXPECT_EQ(log_header[4].message().parts_index(), 1);
EXPECT_EQ(log_header[5].message().parts_index(), 0);
- EXPECT_EQ(log_header[6].message().parts_index(), 1);
- EXPECT_EQ(log_header[7].message().parts_index(), 2);
+
+ EXPECT_EQ(log_header[6].message().parts_index(), 0);
+ EXPECT_EQ(log_header[7].message().parts_index(), 1);
EXPECT_EQ(log_header[8].message().parts_index(), 0);
EXPECT_EQ(log_header[9].message().parts_index(), 1);
@@ -127,24 +129,18 @@
// And that the data_stored field is right.
EXPECT_THAT(*log_header[2].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::DATA));
EXPECT_THAT(*log_header[3].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[4].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[5].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::DATA));
EXPECT_THAT(*log_header[6].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[7].message().data_stored(),
- ::testing::ElementsAre(StoredDataType::DATA,
- StoredDataType::TIMESTAMPS));
+ ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
EXPECT_THAT(*log_header[8].message().data_stored(),
::testing::ElementsAre(StoredDataType::DATA));
@@ -176,49 +172,44 @@
std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
// Timing reports, pings
- EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos",
- "aos.message_bridge.ServerStatistics", 1),
- std::make_tuple("/test", "aos.examples.Ping", 1),
- std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
- << " : " << logfiles_[2];
- {
- std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
- std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
- 1)};
- if (!std::get<0>(GetParam()).shared) {
- channel_counts.push_back(
- std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
- "aos-message_bridge-Timestamp",
- "aos.message_bridge.RemoteMessage", 1));
- }
- EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
- ::testing::UnorderedElementsAreArray(channel_counts))
- << " : " << logfiles_[3];
+ if (shared()) {
+ EXPECT_THAT(
+ CountChannelsData(config, logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+ 200),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+ 21),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+ std::make_tuple("/test", "aos.examples.Ping", 2001)))
+ << " : " << logfiles_[2];
+ } else {
+ EXPECT_THAT(
+ CountChannelsData(config, logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+ 200),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+ 21),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+ std::make_tuple("/test", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp",
+ "aos.message_bridge.RemoteMessage", 200)))
+ << " : " << logfiles_[2];
}
- {
- std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
- std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
- 20),
- std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
- 199),
- std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
- std::make_tuple("/test", "aos.examples.Ping", 2000)};
- if (!std::get<0>(GetParam()).shared) {
- channel_counts.push_back(
- std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
- "aos-message_bridge-Timestamp",
- "aos.message_bridge.RemoteMessage", 199));
- }
- EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
- ::testing::UnorderedElementsAreArray(channel_counts))
- << " : " << logfiles_[4];
- }
+
+ EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
+ ::testing::UnorderedElementsAre())
+ << " : " << logfiles_[3];
+ EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
+ ::testing::UnorderedElementsAre())
+ << " : " << logfiles_[4];
+
// Timestamps for pong
EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
UnorderedElementsAre())
@@ -235,27 +226,21 @@
<< " : " << logfiles_[4];
// Timing reports and pongs.
- EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
- UnorderedElementsAre(
- std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
- std::make_tuple("/pi2/aos",
- "aos.message_bridge.ServerStatistics", 1)))
- << " : " << logfiles_[5];
EXPECT_THAT(
- CountChannelsData(config, logfiles_[6]),
- UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
- << " : " << logfiles_[6];
- EXPECT_THAT(
- CountChannelsData(config, logfiles_[7]),
+ CountChannelsData(config, logfiles_[5]),
UnorderedElementsAre(
- std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
- std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
- std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
- 20),
+ std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
200),
+ std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
+ 21),
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
- std::make_tuple("/test", "aos.examples.Pong", 2000)))
+ std::make_tuple("/test", "aos.examples.Pong", 2001)))
+ << " : " << logfiles_[5];
+ EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
+ << " : " << logfiles_[6];
+ EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
<< " : " << logfiles_[7];
// And ping timestamps.
EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
@@ -977,7 +962,7 @@
}
const std::vector<LogFile> sorted_parts =
- SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
+ SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 1, 1, 2, 2));
EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts);
@@ -1155,10 +1140,10 @@
// For snappy, needs to have enough data to be >1 chunk of compressed data so
// that we don't corrupt the entire log part.
::std::string compressed_contents =
- aos::util::ReadFileToStringOrDie(logfiles_[4]);
+ aos::util::ReadFileToStringOrDie(logfiles_[2]);
aos::util::WriteStringToFileOrDie(
- logfiles_[4],
+ logfiles_[2],
compressed_contents.substr(0, compressed_contents.size() - 100));
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
@@ -1954,7 +1939,7 @@
// hitting any fatal errors.
{
LogReader relogged_reader(SortParts(MakeLogFiles(
- tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
+ tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 1, 1, 2, 2, true)));
relogged_reader.Register();
relogged_reader.event_loop_factory()->Run();
@@ -1964,7 +1949,7 @@
{
LogReader relogged_reader(
SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
- 3, 3, true)),
+ 1, 1, 2, 2, true)),
reader.configuration());
relogged_reader.Register();
@@ -2150,7 +2135,7 @@
//
// TODO(austin): I'm not the most thrilled with this test pattern... It
// feels brittle in a different way.
- if (file.find("source_pi1_timestamp_pi2") == std::string::npos) {
+ if (file.find("timestamps/remote_pi2") == std::string::npos) {
switch (log_header->message().parts_index()) {
case 0:
EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
@@ -2287,93 +2272,107 @@
monotonic_clock::max_time);
EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
monotonic_clock::max_time);
- switch (log_header->message().parts_index()) {
- case 0:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- break;
- case 1:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)));
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)));
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::max_time);
- break;
- case 2:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)))
- << file;
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)))
- << file;
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90200)))
- << file;
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(90350)))
- << file;
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(100000)))
- << file;
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(100150)))
- << file;
- break;
- case 3:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::max_time)
- << file;
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::max_time)
- << file;
- break;
- case 4:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::milliseconds(1323) +
- chrono::microseconds(200)));
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100350)));
- EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(1423000)))
- << file;
- EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10200150)))
- << file;
- break;
- default:
- FAIL();
- break;
+ if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
+ switch (log_header->message().parts_index()) {
+ case 0:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ break;
+ default:
+ FAIL();
+ break;
+ }
+ } else if (log_header->message().data_stored()->Get(0) ==
+ StoredDataType::TIMESTAMPS) {
+ switch (log_header->message().parts_index()) {
+ case 0:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)));
+ EXPECT_EQ(oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)));
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)));
+ EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)));
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::max_time);
+ break;
+ case 1:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)))
+ << file;
+ EXPECT_EQ(oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)))
+ << file;
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90200)))
+ << file;
+ EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(90350)))
+ << file;
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(100000)))
+ << file;
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(100150)))
+ << file;
+ break;
+ case 2:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::max_time)
+ << file;
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::max_time)
+ << file;
+ break;
+ case 3:
+ ASSERT_EQ(oldest_remote_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
+ EXPECT_EQ(
+ oldest_local_unreliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
+ EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(1423000)))
+ << file;
+ EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+ monotonic_clock::time_point(chrono::microseconds(10200150)))
+ << file;
+ break;
+ default:
+ FAIL();
+ break;
+ }
}
}
@@ -2513,7 +2512,7 @@
2u);
if (log_header->message().node()->name()->string_view() != "pi1") {
- ASSERT_TRUE(file.find("source_pi1_timestamp_pi2") != std::string::npos);
+ ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
ReadNthMessage(file, 0);
@@ -2915,8 +2914,10 @@
std::vector<std::string> missing_parts;
- missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
- missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
+ missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
+ Extension());
+ missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
+ Extension());
missing_parts.emplace_back(absl::StrCat(
logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index d15a277..f383778 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -109,8 +109,9 @@
unlink((file + ".xz").c_str());
}
- for (const auto &file : MakeLogFiles(tmp_dir_ + "/relogged1",
- tmp_dir_ + "/relogged2", 3, 3, true)) {
+ for (const auto &file :
+ MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, 3,
+ 3, true)) {
unlink(file.c_str());
}
@@ -137,7 +138,8 @@
std::vector<std::string> MultinodeLoggerTest::MakeLogFiles(
std::string logfile_base1, std::string logfile_base2, size_t pi1_data_count,
- size_t pi2_data_count, bool relogged_config) {
+ size_t pi2_data_count, size_t pi1_timestamps_count,
+ size_t pi2_timestamps_count, bool relogged_config) {
std::string_view sha256 = relogged_config
? std::get<0>(GetParam()).relogged_sha256
: std::get<0>(GetParam()).sha256;
@@ -148,30 +150,33 @@
result.emplace_back(
absl::StrCat(logfile_base1, "_pi1_data.part", i, Extension()));
}
+ for (size_t i = 0; i < pi1_timestamps_count; ++i) {
+ result.emplace_back(
+ absl::StrCat(logfile_base1, "_pi1_timestamps.part", i, Extension()));
+ }
for (size_t i = 0; i < pi2_data_count; ++i) {
result.emplace_back(
absl::StrCat(logfile_base2, "_pi2_data.part", i, Extension()));
}
- result.emplace_back(logfile_base2 + "_pi1_data/pi1_data.part0" + Extension());
- result.emplace_back(logfile_base2 + "_pi1_data/pi1_data.part1" + Extension());
- result.emplace_back(logfile_base1 + "_pi2_data/pi2_data.part0" + Extension());
- result.emplace_back(logfile_base1 + "_pi2_data/pi2_data.part1" + Extension());
+ for (size_t i = 0; i < pi2_timestamps_count; ++i) {
+ result.emplace_back(
+ absl::StrCat(logfile_base2, "_pi2_timestamps.part", i, Extension()));
+ }
+ result.emplace_back(logfile_base2 + "_data/pi1_data.part0" + Extension());
+ result.emplace_back(logfile_base2 + "_data/pi1_data.part1" + Extension());
+ result.emplace_back(logfile_base1 + "_data/pi2_data.part0" + Extension());
+ result.emplace_back(logfile_base1 + "_data/pi2_data.part1" + Extension());
// shared and not shared config types will have the same output since the data
// writers are consolidated to per node instead of per channel.
- result.emplace_back(logfile_base1 +
- "_timestamps/pi2/source_pi1_timestamp_pi2.part0" +
+ result.emplace_back(logfile_base1 + "_timestamps/remote_pi2.part0" +
Extension());
- result.emplace_back(logfile_base1 +
- "_timestamps/pi2/source_pi1_timestamp_pi2.part1" +
+ result.emplace_back(logfile_base1 + "_timestamps/remote_pi2.part1" +
Extension());
- result.emplace_back(logfile_base1 +
- "_timestamps/pi2/source_pi1_timestamp_pi2.part2" +
+ result.emplace_back(logfile_base1 + "_timestamps/remote_pi2.part2" +
Extension());
- result.emplace_back(logfile_base2 +
- "_timestamps/pi1/source_pi2_timestamp_pi1.part0" +
+ result.emplace_back(logfile_base2 + "_timestamps/remote_pi1.part0" +
Extension());
- result.emplace_back(logfile_base2 +
- "_timestamps/pi1/source_pi2_timestamp_pi1.part1" +
+ result.emplace_back(logfile_base2 + "_timestamps/remote_pi1.part1" +
Extension());
return result;
@@ -180,24 +185,20 @@
std::vector<std::string> MultinodeLoggerTest::MakePi1RebootLogfiles() {
std::vector<std::string> result;
result.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
- result.emplace_back(logfile_base1_ + "_pi1_data.part1" + Extension());
- result.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
- result.emplace_back(logfile_base1_ + "_pi1_data.part3" + Extension());
- result.emplace_back(logfile_base1_ + "_pi1_data.part4" + Extension());
- result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part0" +
- Extension());
- result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part1" +
- Extension());
- result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part2" +
- Extension());
- result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part3" +
- Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" + Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_timestamps.part1" + Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" + Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_timestamps.part3" + Extension());
+
+ result.emplace_back(logfile_base1_ + "_data/pi2_data.part0" + Extension());
+ result.emplace_back(logfile_base1_ + "_data/pi2_data.part1" + Extension());
+ result.emplace_back(logfile_base1_ + "_data/pi2_data.part2" + Extension());
+ result.emplace_back(logfile_base1_ + "_data/pi2_data.part3" + Extension());
result.emplace_back(absl::StrCat(
logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
for (size_t i = 0; i < 6; ++i) {
result.emplace_back(absl::StrCat(
- logfile_base1_, "_timestamps/pi2/source_pi1_timestamp_pi2.part", i,
- Extension()));
+ logfile_base1_, "_timestamps/remote_pi2.part", i, Extension()));
}
return result;
}
@@ -205,7 +206,7 @@
std::vector<std::string> MultinodeLoggerTest::MakePi1SingleDirectionLogfiles() {
std::vector<std::string> result;
result.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
- result.emplace_back(logfile_base1_ + "_pi1_data.part1" + Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" + Extension());
result.emplace_back(absl::StrCat(
logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
return result;
@@ -221,9 +222,10 @@
std::vector<std::vector<std::string>> MultinodeLoggerTest::StructureLogFiles() {
std::vector<std::vector<std::string>> result{
- std::vector<std::string>{logfiles_[2], logfiles_[3], logfiles_[4]},
- // std::vector<std::string>{logfiles_[5], logfiles_[6]},
- std::vector<std::string>{logfiles_[5], logfiles_[6], logfiles_[7]},
+ std::vector<std::string>{logfiles_[2]},
+ std::vector<std::string>{logfiles_[3], logfiles_[4]},
+ std::vector<std::string>{logfiles_[5]},
+ std::vector<std::string>{logfiles_[6], logfiles_[7]},
std::vector<std::string>{logfiles_[8], logfiles_[9]},
std::vector<std::string>{logfiles_[10], logfiles_[11]}};
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index cf05727..1bea704 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -115,8 +115,10 @@
std::vector<std::string> MakeLogFiles(std::string logfile_base1,
std::string logfile_base2,
- size_t pi1_data_count = 3,
- size_t pi2_data_count = 3,
+ size_t pi1_data_count = 1,
+ size_t pi2_data_count = 1,
+ size_t pi1_timestamps_count = 2,
+ size_t pi2_timestamps_count = 2,
bool relogged_config = false);
std::vector<std::string> MakePi1RebootLogfiles();