Switch message_bridge over to RemoteData from MessageHeader
Just like timestamps, we want to add more data here. Instead of having
to add that to MessageHeader, switch over to a new message.
Change-Id: I171fde6db0212683dde5bfc9ba7a3700e858dbf0
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 789b093..42e115b 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -37,6 +37,13 @@
)
flatbuffer_cc_library(
+ name = "remote_data_fbs",
+ srcs = ["remote_data.fbs"],
+ gen_reflections = 1,
+ target_compatible_with = ["@platforms//os:linux"],
+)
+
+flatbuffer_cc_library(
name = "timestamp_fbs",
srcs = ["timestamp.fbs"],
gen_reflections = 1,
@@ -206,6 +213,7 @@
":message_bridge_protocol",
":message_bridge_server_fbs",
":message_bridge_server_status",
+ ":remote_data_fbs",
":remote_message_fbs",
":sctp_lib",
":sctp_server",
@@ -287,6 +295,7 @@
":message_bridge_client_status",
":message_bridge_protocol",
":message_bridge_server_fbs",
+ ":remote_data_fbs",
":remote_message_fbs",
":sctp_client",
":timestamp_fbs",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 97058f1..bde8480 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -8,6 +8,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/remote_data_generated.h"
#include "aos/network/sctp_client.h"
#include "aos/network/timestamp_generated.h"
#include "aos/unique_malloc_ptr.h"
@@ -218,8 +219,8 @@
}
void SctpClientConnection::HandleData(const Message *message) {
- const logger::MessageHeader *message_header =
- flatbuffers::GetSizePrefixedRoot<logger::MessageHeader>(message->data());
+ const RemoteData *remote_data =
+ flatbuffers::GetSizePrefixedRoot<RemoteData>(message->data());
VLOG(1) << "Got a message of size " << message->size;
CHECK_EQ(message->size, flatbuffers::GetPrefixedSize(message->data()) +
@@ -228,9 +229,9 @@
const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
SctpClientChannelState *channel_state = &((*channels_)[stream_to_channel_[stream]]);
- if (message_header->queue_index() == channel_state->last_queue_index &&
- aos::monotonic_clock::time_point(
- chrono::nanoseconds(message_header->monotonic_sent_time())) ==
+ if (remote_data->queue_index() == channel_state->last_queue_index &&
+ monotonic_clock::time_point(
+ chrono::nanoseconds(remote_data->monotonic_sent_time())) ==
channel_state->last_timestamp) {
LOG(INFO) << "Duplicate message from " << message->PeerAddress();
connection_->mutate_duplicate_packets(connection_->duplicate_packets() + 1);
@@ -238,23 +239,23 @@
} else {
connection_->mutate_received_packets(connection_->received_packets() + 1);
- channel_state->last_queue_index = message_header->queue_index();
- channel_state->last_timestamp = aos::monotonic_clock::time_point(
- chrono::nanoseconds(message_header->monotonic_sent_time()));
+ channel_state->last_queue_index = remote_data->queue_index();
+ channel_state->last_timestamp = monotonic_clock::time_point(
+ chrono::nanoseconds(remote_data->monotonic_sent_time()));
// Publish the message.
RawSender *sender = channel_state->sender.get();
- sender->Send(message_header->data()->data(), message_header->data()->size(),
- aos::monotonic_clock::time_point(chrono::nanoseconds(
- message_header->monotonic_sent_time())),
- aos::realtime_clock::time_point(
- chrono::nanoseconds(message_header->realtime_sent_time())),
- message_header->queue_index());
+ sender->Send(remote_data->data()->data(), remote_data->data()->size(),
+ monotonic_clock::time_point(
+ chrono::nanoseconds(remote_data->monotonic_sent_time())),
+ realtime_clock::time_point(
+ chrono::nanoseconds(remote_data->realtime_sent_time())),
+ remote_data->queue_index());
client_status_->SampleFilter(
client_index_,
- aos::monotonic_clock::time_point(
- chrono::nanoseconds(message_header->monotonic_sent_time())),
+ monotonic_clock::time_point(
+ chrono::nanoseconds(remote_data->monotonic_sent_time())),
sender->monotonic_sent_time());
if (stream_reply_with_timestamp_[stream]) {
@@ -264,13 +265,13 @@
// Now fill out the message received reply. This uses a MessageHeader
// container so it can be directly logged.
message_reception_reply_.mutable_message()->mutate_channel_index(
- message_header->channel_index());
+ remote_data->channel_index());
message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
- message_header->monotonic_sent_time());
+ remote_data->monotonic_sent_time());
message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
- message_header->realtime_sent_time());
+ remote_data->realtime_sent_time());
message_reception_reply_.mutable_message()->mutate_queue_index(
- message_header->queue_index());
+ remote_data->queue_index());
// And capture the relevant data needed to generate the forwarding
// MessageHeader.
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 7328f6f..7347607 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -8,6 +8,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/message_bridge_protocol.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_data_generated.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "aos/network/timestamp_channel.h"
@@ -34,10 +35,22 @@
<< channel_->name()->string_view() << " "
<< channel_->type()->string_view() << " size " << context.size;
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+ fbb.CreateVector(static_cast<const uint8_t *>(context.data),
+ context.size);
+
+ RemoteData::Builder remote_data_builder(fbb);
+ remote_data_builder.add_channel_index(channel_index_);
+ remote_data_builder.add_queue_index(context.queue_index);
+ remote_data_builder.add_monotonic_sent_time(
+ context.monotonic_event_time.time_since_epoch().count());
+ remote_data_builder.add_realtime_sent_time(
+ context.realtime_event_time.time_since_epoch().count());
+ remote_data_builder.add_data(data_offset);
+
// TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
// Only useful when not logging.
- fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
- logger::LogType::kLogMessage));
+ fbb.FinishSizePrefixed(remote_data_builder.Finish());
return fbb;
}
@@ -115,30 +128,30 @@
flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
server_status.BootUUID(peer.node_index).PackString(builder.fbb());
- RemoteMessage::Builder message_header_builder =
+ RemoteMessage::Builder remote_message_builder =
builder.MakeBuilder<RemoteMessage>();
- message_header_builder.add_channel_index(
+ remote_message_builder.add_channel_index(
message_header->channel_index());
- message_header_builder.add_queue_index(
+ remote_message_builder.add_queue_index(
message_header->remote_queue_index());
- message_header_builder.add_monotonic_sent_time(
+ remote_message_builder.add_monotonic_sent_time(
message_header->monotonic_remote_time());
- message_header_builder.add_realtime_sent_time(
+ remote_message_builder.add_realtime_sent_time(
message_header->realtime_remote_time());
// Swap the remote and sent metrics. They are from the sender's
// perspective, not the receiver's perspective.
- message_header_builder.add_monotonic_remote_time(
+ remote_message_builder.add_monotonic_remote_time(
message_header->monotonic_sent_time());
- message_header_builder.add_realtime_remote_time(
+ remote_message_builder.add_realtime_remote_time(
message_header->realtime_sent_time());
- message_header_builder.add_remote_queue_index(
+ remote_message_builder.add_remote_queue_index(
message_header->queue_index());
- message_header_builder.add_boot_uuid(boot_uuid_offset);
+ remote_message_builder.add_boot_uuid(boot_uuid_offset);
- builder.Send(message_header_builder.Finish());
+ builder.Send(remote_message_builder.Finish());
}
break;
}
@@ -166,7 +179,7 @@
}
void ChannelState::HandleFailure(
- SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message) {
+ SizePrefixedFlatbufferDetachedBuffer<RemoteData> &&message) {
// TODO(austin): Put it in the log queue.
if (VLOG_IS_ON(2)) {
LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 88433d1..be1801c 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -11,6 +11,7 @@
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_data_generated.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "aos/network/timestamp_channel.h"
@@ -88,7 +89,7 @@
// Handles (by consuming) failure to deliver a message.
void HandleFailure(
- SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message);
+ SizePrefixedFlatbufferDetachedBuffer<RemoteData> &&message);
private:
const int channel_index_;
@@ -96,8 +97,7 @@
std::vector<Peer> peers_;
- std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
- sent_messages_;
+ std::deque<SizePrefixedFlatbufferDetachedBuffer<RemoteData>> sent_messages_;
// A fetcher to use to send the last message when a node connects and is
// reliable.
diff --git a/aos/network/remote_data.fbs b/aos/network/remote_data.fbs
new file mode 100644
index 0000000..0813e57
--- /dev/null
+++ b/aos/network/remote_data.fbs
@@ -0,0 +1,25 @@
+namespace aos.message_bridge;
+
+table RemoteData {
+ // Index into the channel datastructure in the log file header. This
+ // provides the data type.
+ channel_index:uint (id: 0);
+ // Time this message was sent on the monotonic clock in nanoseconds on this
+ // node.
+ monotonic_sent_time:long (id: 1);
+ // Time this message was sent on the realtime clock in nanoseconds on this
+ // node.
+ realtime_sent_time:long (id: 2);
+ // Index into the ipc queue of this message. This should start with 0 and
+ // always monotonically increment if no messages were ever lost. It will
+ // wrap at a multiple of the queue size.
+ queue_index:uint (id: 3);
+
+ // The nested flatbuffer.
+ data:[ubyte] (id: 4);
+
+ // UUID for this boot. This is 16 bytes long.
+ boot_uuid:[uint8] (id: 5);
+}
+
+root_type RemoteData;