Switch magic timestamp channel to RemoteMessage
In order to track reboots and such, we really need to communicate more
information from the message_bridge to the logger. It is a shame to
have to modify the MessageHeader to do this, even though it would be
nice. Switch the remote timestamp channels over to a new RemoteMessage
channel instead, and add code to rename the channel when replaying.
There are no known log files with a MessageHeader actually logged, so
most of this should be pretty safe. I've tested this on an old log file
by hand.
Change-Id: If81b31869b95040151d833d20ec3eb8623ab1cd4
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 4818aa2..13ba42e 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,10 +1,23 @@
load("//aos/seasocks:gen_embedded.bzl", "gen_embedded")
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
package(default_visibility = ["//visibility:public"])
flatbuffer_cc_library(
+ name = "remote_message_fbs",
+ srcs = ["remote_message.fbs"],
+ gen_reflections = 1,
+)
+
+cc_static_flatbuffer(
+ name = "remote_message_schema",
+ function = "aos::message_bridge::RemoteMessageSchema",
+ target = ":remote_message_fbs_reflection_out",
+)
+
+flatbuffer_cc_library(
name = "connect_fbs",
srcs = ["connect.fbs"],
gen_reflections = 1,
@@ -178,6 +191,7 @@
":message_bridge_protocol",
":message_bridge_server_fbs",
":message_bridge_server_status",
+ ":remote_message_fbs",
":sctp_lib",
":sctp_server",
":timestamp_fbs",
@@ -257,6 +271,7 @@
":message_bridge_client_status",
":message_bridge_protocol",
":message_bridge_server_fbs",
+ ":remote_message_fbs",
":sctp_client",
":timestamp_fbs",
"//aos/events:shm_event_loop",
@@ -285,7 +300,7 @@
name = "message_bridge_test_common_config",
src = "message_bridge_test_common.json",
flatbuffers = [
- "//aos/events/logging:logger_fbs",
+ ":remote_message_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index b6c94e5..d8a632e 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -8,6 +8,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/message_bridge_protocol.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "glog/logging.h"
@@ -106,11 +107,11 @@
// This needs to be munged and cleaned up to match the timestamp
// standard.
- aos::Sender<logger::MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
peer.timestamp_logger->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(
message_header->channel_index());
@@ -173,10 +174,10 @@
// time out eventually. Need to sort that out.
}
-void ChannelState::AddPeer(
- const Connection *connection, int node_index,
- ServerConnection *server_connection_statistics, bool logged_remotely,
- aos::Sender<logger::MessageHeader> *timestamp_logger) {
+void ChannelState::AddPeer(const Connection *connection, int node_index,
+ ServerConnection *server_connection_statistics,
+ bool logged_remotely,
+ aos::Sender<RemoteMessage> *timestamp_logger) {
peers_.emplace_back(connection, node_index, server_connection_statistics,
logged_remotely, timestamp_logger);
}
@@ -330,7 +331,7 @@
// timestamps from it.
if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
timestamp_loggers_[other_node_index] =
- event_loop_->MakeSender<logger::MessageHeader>(
+ event_loop_->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
connection->name()->string_view()));
}
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index c0c1d56..04b0e16 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -11,6 +11,7 @@
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "aos/network/timestamp_generated.h"
#include "glog/logging.h"
@@ -36,7 +37,7 @@
Peer(const Connection *new_connection, int new_node_index,
ServerConnection *new_server_connection_statistics,
bool new_logged_remotely,
- aos::Sender<logger::MessageHeader> *new_timestamp_logger)
+ aos::Sender<RemoteMessage> *new_timestamp_logger)
: connection(new_connection),
node_index(new_node_index),
server_connection_statistics(new_server_connection_statistics),
@@ -50,7 +51,7 @@
const aos::Connection *connection;
const int node_index;
ServerConnection *server_connection_statistics;
- aos::Sender<logger::MessageHeader> *timestamp_logger = nullptr;
+ aos::Sender<RemoteMessage> *timestamp_logger = nullptr;
// If true, this message will be logged on a receiving node. We need to
// keep it around to log it locally if that fails.
@@ -67,7 +68,7 @@
void AddPeer(const Connection *connection, int node_index,
ServerConnection *server_connection_statistics,
bool logged_remotely,
- aos::Sender<logger::MessageHeader> *timestamp_logger);
+ aos::Sender<RemoteMessage> *timestamp_logger);
// Returns true if this channel has the same name and type as the other
// channel.
@@ -125,7 +126,7 @@
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
- std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers_;
+ std::vector<aos::Sender<RemoteMessage>> timestamp_loggers_;
SctpServer server_;
MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 36bdde9..ed192b7 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -34,18 +34,18 @@
}
class MessageBridgeTest : public ::testing::Test {
- public:
- MessageBridgeTest()
- : pi1_config(aos::configuration::ReadConfig(
- "aos/network/message_bridge_test_server_config.json")),
- pi2_config(aos::configuration::ReadConfig(
- "aos/network/message_bridge_test_client_config.json")) {
- util::UnlinkRecursive(ShmBase("pi1"));
- util::UnlinkRecursive(ShmBase("pi2"));
- }
+ public:
+ MessageBridgeTest()
+ : pi1_config(aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_server_config.json")),
+ pi2_config(aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_client_config.json")) {
+ util::UnlinkRecursive(ShmBase("pi1"));
+ util::UnlinkRecursive(ShmBase("pi2"));
+ }
- aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
- aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
};
// Test that we can send a ping message over sctp and receive it.
@@ -89,8 +89,8 @@
ping_event_loop.MakeSender<examples::Ping>("/test");
aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
- aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
- pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+ aos::Fetcher<RemoteMessage> message_header_fetcher1 =
+ pi1_test_event_loop.MakeFetcher<RemoteMessage>(
"/pi1/aos/remote_timestamps/pi2");
// Fetchers for confirming the remote timestamps made it.
@@ -121,8 +121,8 @@
aos::Fetcher<ClientStatistics> client_statistics_fetcher =
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
- aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
- test_event_loop.MakeFetcher<logger::MessageHeader>(
+ aos::Fetcher<RemoteMessage> message_header_fetcher2 =
+ test_event_loop.MakeFetcher<RemoteMessage>(
"/pi2/aos/remote_timestamps/pi1");
// Event loop for fetching data delivered to pi2 from pi1 to match up
@@ -282,8 +282,9 @@
"/pi1/aos/remote_timestamps/pi2",
[pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
&ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -879,9 +880,9 @@
std::atomic<int> ping_timestamp_count{0};
pi1_remote_timestamp_event_loop.MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index,
- &ping_timestamp_count](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
if (header.channel_index() == ping_channel_index) {
++ping_timestamp_count;
}
@@ -1034,9 +1035,9 @@
std::atomic<int> ping_timestamp_count{0};
pi1_remote_timestamp_event_loop.MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index,
- &ping_timestamp_count](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
if (header.channel_index() == ping_channel_index) {
++ping_timestamp_count;
}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a7c0d1b..2b89080 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -74,13 +74,13 @@
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
"frequency": 10
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "pi2",
"frequency": 10
},
diff --git a/aos/network/remote_message.fbs b/aos/network/remote_message.fbs
new file mode 100644
index 0000000..5659704
--- /dev/null
+++ b/aos/network/remote_message.fbs
@@ -0,0 +1,31 @@
+namespace aos.message_bridge;
+
+table RemoteMessage {
+ // Index into the channel datastructure in the log file header. This
+ // provides the data type.
+ channel_index:uint (id: 0);
+ // Time this message was sent on the monotonic clock in nanoseconds on this
+ // node.
+ monotonic_sent_time:long (id: 1);
+ // Time this message was sent on the realtime clock in nanoseconds on this
+ // node.
+ realtime_sent_time:long (id: 2);
+ // Index into the ipc queue of this message. This should start with 0 and
+ // always monotonically increment if no messages were ever lost. It will
+ // wrap at a multiple of the queue size.
+ queue_index:uint (id: 3);
+
+ // The nested flatbuffer.
+ data:[ubyte] (id: 4);
+
+ // Time this message was sent on the monotonic clock of the remote node in
+ // nanoseconds.
+ monotonic_remote_time:int64 = -9223372036854775808 (id: 5);
+ // Time this message was sent on the realtime clock of the remote node in
+ // nanoseconds.
+ realtime_remote_time:int64 = -9223372036854775808 (id: 6);
+ // Queue index of this message on the remote node.
+ remote_queue_index:uint32 = 4294967295 (id: 7);
+}
+
+root_type RemoteMessage;