Add multi-node local logging to the logger
This is not yet able to forward messages, but is able to log messages
that have been forwarded. Create a log file and test that the
timestamps are getting recorded correctly.
Change-Id: Ica891dbc560543546f6ee594438cebb03672190e
diff --git a/aos/aos_dump.cc b/aos/aos_dump.cc
index e7d9d7c..8d6c2b1 100644
--- a/aos/aos_dump.cc
+++ b/aos/aos_dump.cc
@@ -8,6 +8,7 @@
#include "gflags/gflags.h"
DEFINE_string(config, "./config.json", "File path of aos configuration");
+
int main(int argc, char **argv) {
aos::InitGoogle(&argc, &argv);
@@ -43,13 +44,28 @@
if (channel->name()->c_str() == channel_name &&
channel->type()->str().find(message_type) != std::string::npos) {
event_loop.MakeRawWatcher(
- channel,
- [channel](const aos::Context /* &context*/, const void *message) {
- LOG(INFO) << '(' << channel->type()->c_str() << ") "
- << aos::FlatbufferToJson(
- channel->schema(),
- static_cast<const uint8_t *>(message))
- << '\n';
+ channel, [channel](const aos::Context &context, const void *message) {
+ // Print the flatbuffer out to stdout, both to remove the
+ // unnecessary cruft from glog and to allow the user to readily
+ // redirect just the logged output independent of any debugging
+ // information on stderr.
+ if (context.monotonic_remote_time != context.monotonic_event_time) {
+ std::cout << context.realtime_remote_time << " ("
+ << context.monotonic_remote_time << ") delivered "
+ << context.realtime_event_time << " ("
+ << context.monotonic_event_time << "): "
+ << aos::FlatbufferToJson(
+ channel->schema(),
+ static_cast<const uint8_t *>(message))
+ << '\n';
+ } else {
+ std::cout << context.realtime_event_time << " ("
+ << context.monotonic_event_time << "): "
+ << aos::FlatbufferToJson(
+ channel->schema(),
+ static_cast<const uint8_t *>(message))
+ << '\n';
+ }
});
found_channels++;
}
diff --git a/aos/events/BUILD b/aos/events/BUILD
index dfad4ff..e40ab99 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -115,6 +115,16 @@
deps = [":config"],
)
+aos_config(
+ name = "multinode_pingpong_config",
+ src = "multinode_pingpong.json",
+ flatbuffers = [
+ ":ping_fbs",
+ ":pong_fbs",
+ ],
+ deps = [":config"],
+)
+
cc_library(
name = "pong_lib",
srcs = [
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 94e25c2..80d8798 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -65,7 +65,10 @@
cc_test(
name = "logger_test",
srcs = ["logger_test.cc"],
- data = ["//aos/events:pingpong_config.json"],
+ data = [
+ "//aos/events:multinode_pingpong_config.json",
+ "//aos/events:pingpong_config.json",
+ ],
deps = [
":logger",
"//aos/events:ping_lib",
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index a53d337..45f0811 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -28,7 +28,8 @@
aos::InitGoogle(&argc, &argv);
aos::logger::LogReader reader(FLAGS_logfile);
- aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration(),
+ reader.node());
reader.Register(&log_reader_factory);
std::unique_ptr<aos::EventLoop> printer_event_loop =
@@ -44,6 +45,10 @@
const flatbuffers::string_view type = channel->type()->string_view();
if (name.find(FLAGS_name) != std::string::npos &&
type.find(FLAGS_type) != std::string::npos) {
+ if (!aos::configuration::ChannelIsReadableOnNode(
+ channel, printer_event_loop->node())) {
+ continue;
+ }
LOG(INFO) << "Listening on " << name << " " << type;
CHECK_NOTNULL(channel->schema());
@@ -53,14 +58,27 @@
// unnecessary cruft from glog and to allow the user to readily
// redirect just the logged output independent of any debugging
// information on stderr.
- std::cout << context.realtime_event_time << " ("
- << context.monotonic_event_time << ") "
- << channel->name()->c_str() << ' '
- << channel->type()->c_str() << ": "
- << aos::FlatbufferToJson(
- channel->schema(),
- static_cast<const uint8_t *>(message))
- << '\n';
+ if (context.monotonic_remote_time != context.monotonic_event_time) {
+ std::cout << context.realtime_remote_time << " ("
+ << context.monotonic_remote_time << ") delivered "
+ << context.realtime_event_time << " ("
+ << context.monotonic_event_time << ") "
+ << channel->name()->c_str() << ' '
+ << channel->type()->c_str() << ": "
+ << aos::FlatbufferToJson(
+ channel->schema(),
+ static_cast<const uint8_t *>(message))
+ << '\n';
+ } else {
+ std::cout << context.realtime_event_time << " ("
+ << context.monotonic_event_time << ") "
+ << channel->name()->c_str() << ' '
+ << channel->type()->c_str() << ": "
+ << aos::FlatbufferToJson(
+ channel->schema(),
+ static_cast<const uint8_t *>(message))
+ << '\n';
+ }
});
found_channel = true;
}
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e35fe5b..6eae9e9 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -18,6 +18,9 @@
DEFINE_int32(flush_size, 1000000,
"Number of outstanding bytes to allow before flushing to disk.");
+DEFINE_bool(skip_missing_forwarding_entries, false,
+ "If true, drop any forwarding entries with missing data. If "
+ "false, CHECK.");
namespace aos {
namespace logger {
@@ -86,7 +89,44 @@
polling_period_(polling_period) {
for (const Channel *channel : *event_loop_->configuration()->channels()) {
FetcherStruct fs;
- fs.fetcher = event_loop->MakeRawFetcher(channel);
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+ const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
+ channel, event_loop_->node()) &&
+ is_readable;
+
+ const bool log_delivery_times =
+ (event_loop_->node() == nullptr)
+ ? false
+ : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, event_loop_->node(), event_loop_->node());
+
+ if (log_message || log_delivery_times) {
+ fs.fetcher = event_loop->MakeRawFetcher(channel);
+ VLOG(1) << "Logging channel "
+ << configuration::CleanedChannelToString(channel);
+
+ if (log_delivery_times) {
+ if (log_message) {
+ VLOG(1) << " Logging message and delivery times";
+ fs.log_type = LogType::kLogMessageAndDeliveryTime;
+ } else {
+ VLOG(1) << " Logging delivery times only";
+ fs.log_type = LogType::kLogDeliveryTimeOnly;
+ }
+ } else {
+ // We don't have a particularly great use case right now for logging a
+ // forwarded message, but either not logging the delivery times, or
+ // logging them on another node. Fail rather than produce bad results.
+ CHECK(configuration::ChannelIsSendableOnNode(channel,
+ event_loop_->node()))
+ << ": Logger only knows how to log remote messages with "
+ "forwarding timestamps.";
+ VLOG(1) << " Logging message only";
+ fs.log_type = LogType::kLogMessage;
+ }
+ }
+
fs.written = false;
fetchers_.emplace_back(std::move(fs));
}
@@ -99,7 +139,9 @@
// so we can capture the latest message on each channel. This lets us have
// non periodic messages with configuration that now get logged.
for (FetcherStruct &f : fetchers_) {
- f.written = !f.fetcher->Fetch();
+ if (f.fetcher.get() != nullptr) {
+ f.written = !f.fetcher->Fetch();
+ }
}
// We need to pick a point in time to declare the log file "started". This
@@ -122,10 +164,16 @@
flatbuffers::Offset<flatbuffers::String> string_offset =
fbb.CreateString(network::GetHostname());
+ flatbuffers::Offset<Node> node_offset =
+ CopyFlatBuffer(event_loop_->node(), &fbb);
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
+
aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
log_file_header_builder.add_name(string_offset);
+ log_file_header_builder.add_node(node_offset);
+
log_file_header_builder.add_configuration(configuration_offset);
// The worst case theoretical out of order is the polling period times 2.
// One message could get logged right after the boundary, but be for right
@@ -157,20 +205,46 @@
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
- int channel_index) {
- flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
- fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+ int channel_index, LogType log_type) {
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+
+ switch(log_type) {
+ case LogType::kLogMessage:
+ case LogType::kLogMessageAndDeliveryTime:
+ data_offset =
+ fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+ break;
+
+ case LogType::kLogDeliveryTimeOnly:
+ break;
+ }
MessageHeader::Builder message_header_builder(*fbb);
message_header_builder.add_channel_index(channel_index);
+ message_header_builder.add_queue_index(context.queue_index);
message_header_builder.add_monotonic_sent_time(
context.monotonic_event_time.time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
context.realtime_event_time.time_since_epoch().count());
- message_header_builder.add_queue_index(context.queue_index);
+ switch (log_type) {
+ case LogType::kLogMessage:
+ message_header_builder.add_data(data_offset);
+ break;
- message_header_builder.add_data(data_offset);
+ case LogType::kLogMessageAndDeliveryTime:
+ message_header_builder.add_data(data_offset);
+ [[fallthrough]];
+
+ case LogType::kLogDeliveryTimeOnly:
+ message_header_builder.add_monotonic_remote_time(
+ context.monotonic_remote_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ context.realtime_remote_time.time_since_epoch().count());
+ message_header_builder.add_remote_queue_index(context.remote_queue_index);
+ break;
+ }
+
return message_header_builder.Finish();
}
@@ -188,51 +262,46 @@
size_t channel_index = 0;
// Write each channel to disk, one at a time.
for (FetcherStruct &f : fetchers_) {
- while (true) {
- if (f.fetcher.get() == nullptr) {
- if (!f.fetcher->FetchNext()) {
- VLOG(1) << "No new data on "
- << FlatbufferToJson(f.fetcher->channel());
- break;
- } else {
- f.written = false;
+ // Skip any channels which we aren't supposed to log.
+ if (f.fetcher.get() != nullptr) {
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
+ }
}
- }
- if (f.written) {
- if (!f.fetcher->FetchNext()) {
- VLOG(1) << "No new data on "
- << FlatbufferToJson(f.fetcher->channel());
- break;
+ CHECK(!f.written);
+
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time <
+ last_synchronized_time_) {
+ // Write!
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(1);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ channel_index, f.log_type));
+
+ VLOG(2) << "Writing data for channel "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+
+ max_header_size_ = std::max(
+ max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+ writer_->QueueSizedFlatbuffer(&fbb);
+
+ f.written = true;
} else {
- f.written = false;
+ break;
}
}
-
- CHECK(!f.written);
-
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time <
- last_synchronized_time_) {
- // Write!
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(1);
-
- fbb.FinishSizePrefixed(
- PackMessage(&fbb, f.fetcher->context(), channel_index));
-
- VLOG(1) << "Writing data for channel "
- << FlatbufferToJson(f.fetcher->channel());
-
- max_header_size_ = std::max(
- max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- writer_->QueueSizedFlatbuffer(&fbb);
-
- f.written = true;
- } else {
- break;
- }
}
++channel_index;
@@ -373,11 +442,20 @@
queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
}
-const Configuration *LogReader::configuration() {
+const Configuration *LogReader::configuration() const {
return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
->configuration();
}
+const Node *LogReader::node() const {
+ return configuration::GetNode(
+ configuration(),
+ flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+ ->node()
+ ->name()
+ ->string_view());
+}
+
monotonic_clock::time_point LogReader::monotonic_start_time() {
return monotonic_clock::time_point(std::chrono::nanoseconds(
flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
@@ -434,20 +512,32 @@
FlatbufferVector<MessageHeader> front = std::move(channel.front());
- CHECK(front.message().data() != nullptr);
+ if (oldest_channel_index.first > monotonic_start_time() ||
+ event_loop_factory_ != nullptr) {
+ if (!FLAGS_skip_missing_forwarding_entries ||
+ front.message().data() != nullptr) {
+ CHECK(front.message().data() != nullptr)
+ << ": Got a message without data. Forwarding entry which was not "
+ "matched? Use --skip_missing_forwarding_entries to ignore "
+ "this.";
- if (oldest_channel_index.first > monotonic_start_time()) {
- // If we have access to the factory, use it to fix the realtime time.
- if (event_loop_factory_ != nullptr) {
- event_loop_factory_->SetRealtimeOffset(
+ // If we have access to the factory, use it to fix the realtime time.
+ if (event_loop_factory_ != nullptr) {
+ event_loop_factory_->SetRealtimeOffset(
+ monotonic_clock::time_point(
+ chrono::nanoseconds(front.message().monotonic_sent_time())),
+ realtime_clock::time_point(
+ chrono::nanoseconds(front.message().realtime_sent_time())));
+ }
+
+ channel.raw_sender->Send(
+ front.message().data()->Data(), front.message().data()->size(),
monotonic_clock::time_point(
- chrono::nanoseconds(front.message().monotonic_sent_time())),
+ chrono::nanoseconds(front.message().monotonic_remote_time())),
realtime_clock::time_point(
- chrono::nanoseconds(front.message().realtime_sent_time())));
+ chrono::nanoseconds(front.message().realtime_remote_time())),
+ front.message().remote_queue_index());
}
-
- channel.raw_sender->Send(front.message().data()->Data(),
- front.message().data()->size());
} else {
LOG(WARNING) << "Not sending data from before the start of the log file. "
<< oldest_channel_index.first.time_since_epoch().count()
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 3e89ff3..f1af17e 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -31,7 +31,8 @@
// Name of the device which this log file is for.
name:string;
- // TODO(austin): Node!
+ // The current node, if known and running in a multi-node configuration.
+ node:Node;
}
// Table holding a message.
@@ -39,18 +40,28 @@
// Index into the channel datastructure in the log file header. This
// provides the data type.
channel_index:uint;
- // Time this message was sent on the monotonic clock in nanoseconds.
+ // Time this message was sent on the monotonic clock in nanoseconds on this
+ // node.
monotonic_sent_time:long;
- // Time this message was sent on the realtime clock in nanoseconds.
+ // Time this message was sent on the realtime clock in nanoseconds on this
+ // node.
realtime_sent_time:long;
// Index into the ipc queue of this message. This should start with 0 and
// always monotonically increment if no messages were ever lost. It will
// wrap at a multiple of the queue size.
queue_index:uint;
- // TODO(austin): Node.
// TODO(austin): Format? Compressed?
// The nested flatbuffer.
data:[ubyte];
+
+ // Time this message was sent on the monotonic clock of the remote node in
+ // nanoseconds.
+ monotonic_remote_time:long = -9223372036854775808;
+ // Time this message was sent on the realtime clock of the remote node in
+ // nanoseconds.
+ realtime_remote_time:long = -9223372036854775808;
+ // Queue index of this message on the remote node.
+ remote_queue_index:uint = 4294967295;
}
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 497dabb..1e9a218 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -46,10 +46,17 @@
std::vector<struct iovec> iovec_;
};
-// Packes a message pointed to by the context into a MessageHeader.
-flatbuffers::Offset<MessageHeader> PackMessage(
- flatbuffers::FlatBufferBuilder *fbb, const Context &context,
- int channel_index);
+enum class LogType : uint8_t {
+ // The message originated on this node and should be logged here.
+ kLogMessage,
+ // The message originated on another node, but only the delivery times are
+ // logged here.
+ kLogDeliveryTimeOnly,
+ // The message originated on another node. Log it and the delivery times
+ // together. The message_gateway is responsible for logging any messages
+ // which didn't get delivered.
+ kLogMessageAndDeliveryTime
+};
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
@@ -73,6 +80,8 @@
struct FetcherStruct {
std::unique_ptr<RawFetcher> fetcher;
bool written = false;
+
+ LogType log_type;
};
std::vector<FetcherStruct> fetchers_;
@@ -89,6 +98,11 @@
size_t max_header_size_ = 0;
};
+// Packes a message pointed to by the context into a MessageHeader.
+flatbuffers::Offset<MessageHeader> PackMessage(
+ flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+ int channel_index, LogType log_type);
+
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
@@ -106,7 +120,10 @@
// TODO(austin): Remap channels?
// Returns the configuration from the log file.
- const Configuration *configuration();
+ const Configuration *configuration() const;
+
+ // Returns the node that this log file was created on.
+ const Node *node() const;
// Returns the starting timestamp for the log file.
monotonic_clock::time_point monotonic_start_time();
@@ -125,8 +142,8 @@
// will have to read more data from disk.
bool MessageAvailable();
- // Returns a span with the data for a message from the log file, excluding the
- // size.
+ // Returns a span with the data for a message from the log file, excluding
+ // the size.
absl::Span<const uint8_t> ReadMessage();
// Queues at least max_out_of_order_duration_ messages into channels_.
@@ -144,16 +161,16 @@
// buffer, then into sender), but none of it is all that expensive. We can
// optimize if it is slow later.
//
- // As we place the elements in the sorted list of times, keep doing this until
- // we read a message that is newer than the threshold.
+ // As we place the elements in the sorted list of times, keep doing this
+ // until we read a message that is newer than the threshold.
//
// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
// small state machine so we can resume), and keep pulling messages back out
// and sending.
//
- // For sorting, we want to use the fact that each channel is sorted, and then
- // merge sort the channels. Have a vector of deques, and then hold a sorted
- // list of pointers to those.
+ // For sorting, we want to use the fact that each channel is sorted, and
+ // then merge sort the channels. Have a vector of deques, and then hold a
+ // sorted list of pointers to those.
//
// TODO(austin): Multithreaded read at some point. Gotta go faster!
// Especially if we start compressing.
@@ -183,8 +200,8 @@
}
};
- // Minimum amount of data to queue up for sorting before we are guarenteed to
- // not see data out of order.
+ // Minimum amount of data to queue up for sorting before we are guarenteed
+ // to not see data out of order.
std::chrono::nanoseconds max_out_of_order_duration_;
// File descriptor for the log file.
@@ -195,8 +212,8 @@
EventLoop *event_loop_ = nullptr;
TimerHandler *timer_handler_;
- // Vector to read into. This uses an allocator which doesn't zero initialize
- // the memory.
+ // Vector to read into. This uses an allocator which doesn't zero
+ // initialize the memory.
std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
// Amount of data consumed already in data_.
@@ -223,8 +240,8 @@
// timestamp.
std::pair<monotonic_clock::time_point, int> PopOldestChannel();
- // Datastructure to hold the list of messages, cached timestamp for the oldest
- // message, and sender to send with.
+ // Datastructure to hold the list of messages, cached timestamp for the
+ // oldest message, and sender to send with.
struct ChannelData {
monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
std::deque<FlatbufferVector<MessageHeader>> data;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 9aae936..02a9a14 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -65,7 +65,8 @@
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
- // This sends out the fetched messages and advances time to the start of the log file.
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
reader.Register(&log_reader_factory);
std::unique_ptr<EventLoop> test_event_loop =
@@ -136,6 +137,123 @@
}
}
+class MultinodeLoggerTest : public ::testing::Test {
+ public:
+ MultinodeLoggerTest()
+ : config_(aos::configuration::ReadConfig(
+ "aos/events/multinode_pingpong_config.json")),
+ event_loop_factory_(&config_.message(), "pi1"),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+ ping_(ping_event_loop_.get()) {}
+
+ // Config and factory.
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+ SimulatedEventLoopFactory event_loop_factory_;
+
+ // Event loop and app for Ping
+ std::unique_ptr<EventLoop> ping_event_loop_;
+ Ping ping_;
+};
+
+// Tests that we can startup at all in a multinode configuration.
+TEST_F(MultinodeLoggerTest, MultiNode) {
+ constexpr chrono::seconds kTimeOffset = chrono::seconds(10000);
+ constexpr uint32_t kQueueIndexOffset = 1024;
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string logfile = tmpdir + "/multi_logfile.bfbs";
+ // Remove it.
+ unlink(logfile.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile;
+
+ {
+ std::unique_ptr<EventLoop> pong_event_loop =
+ event_loop_factory_.MakeEventLoop("pong");
+
+ std::unique_ptr<aos::RawSender> pong_sender(
+ pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
+ pong_event_loop->configuration(), "/test", "aos.examples.Pong",
+ pong_event_loop->name(), pong_event_loop->node())));
+
+ // Ok, let's fake a remote node. We use the fancy raw sender Send
+ // method that message_gateway will use to do that.
+ int pong_count = 0;
+ pong_event_loop->MakeWatcher(
+ "/test", [&pong_event_loop, &pong_count, &pong_sender,
+ kTimeOffset](const examples::Ping &ping) {
+ flatbuffers::FlatBufferBuilder fbb;
+ examples::Pong::Builder pong_builder(fbb);
+ pong_builder.add_value(ping.value());
+ pong_builder.add_initial_send_time(ping.send_time());
+ fbb.Finish(pong_builder.Finish());
+
+ pong_sender->Send(fbb.GetBufferPointer(), fbb.GetSize(),
+ pong_event_loop->monotonic_now() + kTimeOffset,
+ pong_event_loop->realtime_now() + kTimeOffset,
+ kQueueIndexOffset + pong_count);
+ ++pong_count;
+ });
+
+ DetachedBufferWriter writer(logfile);
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger");
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ Logger logger(&writer, logger_event_loop.get(),
+ std::chrono::milliseconds(100));
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(logfile);
+
+ // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
+ // messages. This won't work today yet until the log reading code gets
+ // significantly better.
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration(),
+ reader.node());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ std::unique_ptr<EventLoop> test_event_loop =
+ log_reader_factory.MakeEventLoop("test");
+
+ int ping_count = 10;
+ int pong_count = 10;
+
+ // Confirm that the ping value matches.
+ test_event_loop->MakeWatcher("/test",
+ [&ping_count](const examples::Ping &ping) {
+ EXPECT_EQ(ping.value(), ping_count + 1);
+ ++ping_count;
+ });
+ // Confirm that the ping and pong counts both match, and the value also
+ // matches.
+ test_event_loop->MakeWatcher(
+ "/test", [&test_event_loop, &ping_count, &pong_count,
+ kTimeOffset](const examples::Pong &pong) {
+ EXPECT_EQ(test_event_loop->context().remote_queue_index,
+ pong_count + kQueueIndexOffset);
+ EXPECT_EQ(test_event_loop->context().monotonic_remote_time,
+ test_event_loop->monotonic_now() + kTimeOffset);
+ EXPECT_EQ(test_event_loop->context().realtime_remote_time,
+ test_event_loop->realtime_now() + kTimeOffset);
+
+ EXPECT_EQ(pong.value(), pong_count + 1);
+ ++pong_count;
+ EXPECT_EQ(ping_count, pong_count);
+ });
+
+ log_reader_factory.RunFor(std::chrono::seconds(100));
+ EXPECT_EQ(ping_count, 2010);
+ EXPECT_EQ(pong_count, 2010);
+
+ reader.Deregister();
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
new file mode 100644
index 0000000..f0e532e
--- /dev/null
+++ b/aos/events/multinode_pingpong.json
@@ -0,0 +1,161 @@
+{
+ "channels": [
+ {
+ "name": "/aos/pi1",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/aos/pi3",
+ "type": "aos.timing.Report",
+ "source_node": "pi3",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi3",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Pong",
+ "source_node": "pi3",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/aos/pi3"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ },
+ {
+ "name": "pi3",
+ "hostname": "raspberrypi3",
+ "port": 9971
+ }
+ ],
+ "applications": [
+ {
+ "name": "ping2",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ },
+ {
+ "name": "pong2",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ }
+ ]
+}
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index a8e227e..cde538a 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -155,7 +155,11 @@
ShmEventLoop::ShmEventLoop(const Configuration *configuration)
: EventLoop(configuration),
name_(Filename(program_invocation_name)),
- node_(MaybeMyNode(configuration)) {}
+ node_(MaybeMyNode(configuration)) {
+ if (configuration->has_nodes()) {
+ CHECK(node_ != nullptr) << ": Couldn't find node in config.";
+ }
+}
namespace internal {
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a2740e4..af0c02b 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -748,6 +748,22 @@
<< "\" in the configuration.";
}
+SimulatedEventLoopFactory::SimulatedEventLoopFactory(
+ const Configuration *configuration, const Node *node)
+ : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
+ CHECK(configuration_->has_nodes())
+ << ": Got a configuration with no nodes and node \""
+ << node->name()->string_view() << "\" was selected.";
+ bool found = false;
+ for (const Node *node : *configuration_->nodes()) {
+ if (node == node_) {
+ found = true;
+ break;
+ }
+ }
+ CHECK(found) << ": node must be a pointer in the configuration.";
+}
+
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index fee3ef0..db0b840 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -54,6 +54,8 @@
SimulatedEventLoopFactory(const Configuration *configuration);
SimulatedEventLoopFactory(const Configuration *configuration,
std::string_view node_name);
+ SimulatedEventLoopFactory(const Configuration *configuration,
+ const Node *node);
~SimulatedEventLoopFactory();
::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);