Resend any reliable messages received before startup
We want to be able to publish (and receive) low frequency configuration
messages. This lets us make those *not* periodic when we move them
across nodes, simplifying the system.
This takes some special tracking at startup. We very much don't want to
re-send messages already sent. That would result in 2 receive
timestamps for a single node for 1 send packet, probably breaking log
sorting. I'm not interested in learning what would break...
Change-Id: I489460cd4919907516e504e6694d7cef544b0da6
diff --git a/aos/network/BUILD b/aos/network/BUILD
index b142967..8d49f20 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -299,10 +299,11 @@
":message_bridge_test_client_config",
":message_bridge_test_server_config",
],
- shard_count = 4,
+ shard_count = 5,
deps = [
":message_bridge_client_lib",
":message_bridge_server_lib",
+ "//aos:event",
"//aos:json_to_flatbuffer",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
index 9dddd16..7056506 100644
--- a/aos/network/message_bridge_client.fbs
+++ b/aos/network/message_bridge_client.fbs
@@ -18,6 +18,9 @@
// to get their monotonic time.
monotonic_offset:int64 (id: 3);
+ // Number of duplicate packets we received and dropped.
+ duplicate_packets:uint (id: 4);
+
// TODO(austin): Per channel counts?
}
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index e5c18b6..99b2ac1 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -94,7 +94,7 @@
SctpClientConnection::SctpClientConnection(
aos::ShmEventLoop *const event_loop, std::string_view remote_name,
const Node *my_node, std::string_view local_host,
- std::vector<std::unique_ptr<aos::RawSender>> *channels, int client_index,
+ std::vector<SctpClientChannelState> *channels, int client_index,
MessageBridgeClientStatus *client_status)
: event_loop_(event_loop),
connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
@@ -223,57 +223,71 @@
CHECK_EQ(message->size, flatbuffers::GetPrefixedSize(message->data()) +
sizeof(flatbuffers::uoffset_t));
- connection_->mutate_received_packets(connection_->received_packets() + 1);
-
const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
+ SctpClientChannelState *channel_state = &((*channels_)[stream_to_channel_[stream]]);
- // Publish the message.
- RawSender *sender = (*channels_)[stream_to_channel_[stream]].get();
- sender->Send(message_header->data()->data(), message_header->data()->size(),
- aos::monotonic_clock::time_point(
- chrono::nanoseconds(message_header->monotonic_sent_time())),
- aos::realtime_clock::time_point(
- chrono::nanoseconds(message_header->realtime_sent_time())),
- message_header->queue_index());
-
- client_status_->SampleFilter(
- client_index_,
+ if (message_header->queue_index() == channel_state->last_queue_index &&
aos::monotonic_clock::time_point(
- chrono::nanoseconds(message_header->monotonic_sent_time())),
- sender->monotonic_sent_time());
+ chrono::nanoseconds(message_header->monotonic_sent_time())) ==
+ channel_state->last_timestamp) {
+ LOG(INFO) << "Duplicate message from " << message->PeerAddress();
+ connection_->mutate_duplicate_packets(connection_->duplicate_packets() + 1);
+ // Duplicate message, ignore.
+ } else {
+ connection_->mutate_received_packets(connection_->received_packets() + 1);
- if (stream_reply_with_timestamp_[stream]) {
- // TODO(austin): Send back less if we are only acking. Maybe only a
- // stream id? Nothing if we are only forwarding?
+ channel_state->last_queue_index = message_header->queue_index();
+ channel_state->last_timestamp = aos::monotonic_clock::time_point(
+ chrono::nanoseconds(message_header->monotonic_sent_time()));
- // Now fill out the message received reply. This uses a MessageHeader
- // container so it can be directly logged.
- message_reception_reply_.mutable_message()->mutate_channel_index(
- message_header->channel_index());
- message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
- message_header->monotonic_sent_time());
- message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
- message_header->realtime_sent_time());
- message_reception_reply_.mutable_message()->mutate_queue_index(
- message_header->queue_index());
+ // Publish the message.
+ RawSender *sender = channel_state->sender.get();
+ sender->Send(message_header->data()->data(), message_header->data()->size(),
+ aos::monotonic_clock::time_point(chrono::nanoseconds(
+ message_header->monotonic_sent_time())),
+ aos::realtime_clock::time_point(
+ chrono::nanoseconds(message_header->realtime_sent_time())),
+ message_header->queue_index());
- // And capture the relevant data needed to generate the forwarding
- // MessageHeader.
- message_reception_reply_.mutable_message()->mutate_monotonic_remote_time(
- sender->monotonic_sent_time().time_since_epoch().count());
- message_reception_reply_.mutable_message()->mutate_realtime_remote_time(
- sender->realtime_sent_time().time_since_epoch().count());
- message_reception_reply_.mutable_message()->mutate_remote_queue_index(
- sender->sent_queue_index());
+ client_status_->SampleFilter(
+ client_index_,
+ aos::monotonic_clock::time_point(
+ chrono::nanoseconds(message_header->monotonic_sent_time())),
+ sender->monotonic_sent_time());
- // Unique ID is channel_index and monotonic clock.
- // TODO(austin): Depending on if we are the logger node or not, we need to
- // guarentee that this ack gets received too... Same path as the logger.
- client_.Send(kTimestampStream(),
- std::string_view(reinterpret_cast<const char *>(
- message_reception_reply_.span().data()),
- message_reception_reply_.span().size()),
- 0);
+ if (stream_reply_with_timestamp_[stream]) {
+ // TODO(austin): Send back less if we are only acking. Maybe only a
+ // stream id? Nothing if we are only forwarding?
+
+ // Now fill out the message received reply. This uses a MessageHeader
+ // container so it can be directly logged.
+ message_reception_reply_.mutable_message()->mutate_channel_index(
+ message_header->channel_index());
+ message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
+ message_header->monotonic_sent_time());
+ message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
+ message_header->realtime_sent_time());
+ message_reception_reply_.mutable_message()->mutate_queue_index(
+ message_header->queue_index());
+
+ // And capture the relevant data needed to generate the forwarding
+ // MessageHeader.
+ message_reception_reply_.mutable_message()->mutate_monotonic_remote_time(
+ sender->monotonic_sent_time().time_since_epoch().count());
+ message_reception_reply_.mutable_message()->mutate_realtime_remote_time(
+ sender->realtime_sent_time().time_since_epoch().count());
+ message_reception_reply_.mutable_message()->mutate_remote_queue_index(
+ sender->sent_queue_index());
+
+ // Unique ID is channel_index and monotonic clock.
+ // TODO(austin): Depending on if we are the logger node or not, we need to
+ // guarentee that this ack gets received too... Same path as the logger.
+ client_.Send(kTimestampStream(),
+ std::string_view(reinterpret_cast<const char *>(
+ message_reception_reply_.span().data()),
+ message_reception_reply_.span().size()),
+ 0);
+ }
}
VLOG(1) << "Received data of length " << message->size << " from "
@@ -308,7 +322,24 @@
event_loop_->configuration(), channel->name()->string_view(),
channel->type()->string_view(), event_loop_->name(),
event_loop_->node());
- channels_[channel_index] = event_loop_->MakeRawSender(mapped_channel);
+
+ channels_[channel_index].sender =
+ event_loop_->MakeRawSender(mapped_channel);
+
+ std::unique_ptr<aos::RawFetcher> raw_fetcher =
+ event_loop_->MakeRawFetcher(mapped_channel);
+ raw_fetcher->Fetch();
+
+ if (raw_fetcher->context().data != nullptr) {
+ VLOG(1) << "Found data on "
+ << configuration::CleanedChannelToString(channel)
+ << ", won't resend it.";
+ channels_[channel_index].last_queue_index =
+ raw_fetcher->context().queue_index;
+ channels_[channel_index].last_timestamp =
+ raw_fetcher->context().monotonic_remote_time;
+ }
+
break;
}
}
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 8134cbd..2b48906 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -15,6 +15,16 @@
namespace aos {
namespace message_bridge {
+// Structure to hold per channel state.
+struct SctpClientChannelState {
+ // The sender for a channel.
+ std::unique_ptr<aos::RawSender> sender;
+ // The last queue index of a message sent. Used for detecting duplicates.
+ uint32_t last_queue_index = 0xffffffff;
+ // The last timestamp of a message sent. Used for detecting duplicates.
+ monotonic_clock::time_point last_timestamp = monotonic_clock::min_time;
+};
+
// See message_bridge_protocol.h for more details about the protocol.
// This class encapsulates all the state required to connect to a server and
@@ -24,7 +34,7 @@
SctpClientConnection(aos::ShmEventLoop *const event_loop,
std::string_view remote_name, const Node *my_node,
std::string_view local_host,
- std::vector<std::unique_ptr<aos::RawSender>> *channels,
+ std::vector<SctpClientChannelState> *channels,
int client_index,
MessageBridgeClientStatus *client_status);
@@ -62,7 +72,7 @@
SctpClient client_;
// Channels to send received messages on.
- std::vector<std::unique_ptr<aos::RawSender>> *channels_;
+ std::vector<SctpClientChannelState> *channels_;
// Stream number -> channel lookup.
std::vector<int> stream_to_channel_;
// Bitmask signaling if we should be replying back with delivery times.
@@ -97,7 +107,7 @@
MessageBridgeClientStatus client_status_;
// Channels to send data over.
- std::vector<std::unique_ptr<aos::RawSender>> channels_;
+ std::vector<SctpClientChannelState> channels_;
// List of connections. These correspond to the nodes in source_node_names_
std::vector<std::unique_ptr<SctpClientConnection>> connections_;
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index e7107ea..9406d4b 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -31,6 +31,7 @@
connection_builder.add_state(State::DISCONNECTED);
// TODO(austin): Track dropped packets.
connection_builder.add_received_packets(0);
+ connection_builder.add_duplicate_packets(0);
connection_builder.add_monotonic_offset(0);
connection_offsets.emplace_back(connection_builder.Finish());
}
@@ -90,6 +91,10 @@
client_connection_builder.add_state(connection->state());
client_connection_builder.add_received_packets(
connection->received_packets());
+ if (connection->duplicate_packets() != 0) {
+ client_connection_builder.add_duplicate_packets(
+ connection->duplicate_packets());
+ }
// Strip out the monotonic offset if it isn't populated.
TimestampFilter *filter = &filters_[client_connection_offsets_.size()];
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 50ac97e..b6c94e5 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -24,9 +24,8 @@
channel_->max_size() == other_channel->max_size());
}
-void ChannelState::SendData(SctpServer *server, const Context &context) {
- // TODO(austin): I don't like allocating this buffer when we are just freeing
- // it at the end of the function.
+flatbuffers::FlatBufferBuilder ChannelState::PackContext(
+ const Context &context) {
flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
fbb.ForceDefaults(true);
VLOG(1) << "Found " << peers_.size() << " peers on channel "
@@ -38,6 +37,14 @@
fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
logger::LogType::kLogMessage));
+ return fbb;
+}
+
+void ChannelState::SendData(SctpServer *server, const Context &context) {
+ // TODO(austin): I don't like allocating this buffer when we are just freeing
+ // it at the end of the function.
+ flatbuffers::FlatBufferBuilder fbb = PackContext(context);
+
// TODO(austin): Track which connections need to be reliable and handle
// resending properly.
size_t sent_count = 0;
@@ -175,6 +182,7 @@
}
int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+ VLOG(1) << "Disconnected " << assoc_id;
for (ChannelState::Peer &peer : peers_) {
if (peer.sac_assoc_id == assoc_id) {
// TODO(austin): This will not handle multiple clients from
@@ -190,12 +198,47 @@
int ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
int stream, SctpServer *server) {
+ VLOG(1) << "Connected to assoc_id: " << assoc_id;
for (ChannelState::Peer &peer : peers_) {
if (peer.connection->name()->string_view() == node->name()->string_view()) {
+ // There's a peer already connected. Disconnect them and take over.
+ if (peer.sac_assoc_id != 0) {
+ LOG(WARNING) << "Peer " << peer.sac_assoc_id
+ << " already connected, aborting old connection.";
+ server->Abort(peer.sac_assoc_id);
+ }
+
peer.sac_assoc_id = assoc_id;
peer.stream = stream;
peer.server_connection_statistics->mutate_state(State::CONNECTED);
server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
+ if (last_message_fetcher_ && peer.connection->time_to_live() == 0) {
+ last_message_fetcher_->Fetch();
+ VLOG(1) << "Got a connection on a reliable channel "
+ << configuration::StrippedChannelToString(
+ last_message_fetcher_->channel())
+ << ", sending? "
+ << (last_message_fetcher_->context().data != nullptr);
+ if (last_message_fetcher_->context().data != nullptr) {
+ // SendData sends to all... Only send to the new one.
+ // TODO(austin): I don't like allocating this buffer when we are just
+ // freeing it at the end of the function.
+ flatbuffers::FlatBufferBuilder fbb =
+ PackContext(last_message_fetcher_->context());
+
+ if (server->Send(std::string_view(reinterpret_cast<const char *>(
+ fbb.GetBufferPointer()),
+ fbb.GetSize()),
+ peer.sac_assoc_id, peer.stream,
+ peer.connection->time_to_live() / 1000000)) {
+ peer.server_connection_statistics->mutate_sent_packets(
+ peer.server_connection_statistics->sent_packets() + 1);
+ } else {
+ peer.server_connection_statistics->mutate_dropped_packets(
+ peer.server_connection_statistics->dropped_packets() + 1);
+ }
+ }
+ }
return peer.node_index;
}
}
@@ -261,9 +304,17 @@
if (configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) &&
channel->has_destination_nodes()) {
+
+ bool any_reliable = false;
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->time_to_live() == 0) {
+ any_reliable = true;
+ }
+ }
max_size = std::max(channel->max_size(), max_size);
- std::unique_ptr<ChannelState> state(
- new ChannelState{channel, channel_index});
+ std::unique_ptr<ChannelState> state(new ChannelState{
+ channel, channel_index,
+ any_reliable ? event_loop_->MakeRawFetcher(channel) : nullptr});
for (const Connection *connection : *channel->destination_nodes()) {
const Node *other_node = configuration::GetNode(
@@ -305,13 +356,19 @@
state_ptr->SendData(&server_, context);
});
} else {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ CHECK_GE(connection->time_to_live(), 1000u);
+ }
CHECK(timestamp_state_ == nullptr);
timestamp_state_ = state.get();
}
channels_.emplace_back(std::move(state));
} else if (channel == timestamp_channel) {
std::unique_ptr<ChannelState> state(
- new ChannelState{channel, channel_index});
+ new ChannelState{channel, channel_index, nullptr});
+ for (const Connection *connection : *channel->destination_nodes()) {
+ CHECK_GE(connection->time_to_live(), 1000u);
+ }
timestamp_state_ = state.get();
channels_.emplace_back(std::move(state));
} else {
@@ -338,8 +395,10 @@
continue;
}
- node_index = channel_state->NodeDisconnected(assoc_id);
- CHECK_NE(node_index, -1);
+ int new_node_index = channel_state->NodeDisconnected(assoc_id);
+ if (new_node_index != -1) {
+ node_index = new_node_index;
+ }
}
if (node_index != -1) {
@@ -360,6 +419,10 @@
const union sctp_notification *snp =
(const union sctp_notification *)message->data();
+ if (VLOG_IS_ON(2)) {
+ PrintNotification(message.get());
+ }
+
switch (snp->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE: {
const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
@@ -380,10 +443,6 @@
}
} break;
}
-
- if (VLOG_IS_ON(2)) {
- PrintNotification(message.get());
- }
} else if (message->message_type == Message::kMessage) {
HandleData(message.get());
}
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index ad384f4..2a75ad6 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -24,8 +24,11 @@
// new message from the event loop.
class ChannelState {
public:
- ChannelState(const Channel *channel, int channel_index)
- : channel_index_(channel_index), channel_(channel) {}
+ ChannelState(const Channel *channel, int channel_index,
+ std::unique_ptr<aos::RawFetcher> last_message_fetcher)
+ : channel_index_(channel_index),
+ channel_(channel),
+ last_message_fetcher_(std::move(last_message_fetcher)) {}
// Class to encapsulate all the state per client on a channel. A client may
// be subscribed to multiple channels.
@@ -73,6 +76,9 @@
// Sends the data in context using the provided server.
void SendData(SctpServer *server, const Context &context);
+ // Packs a context into a size prefixed message header for transmission.
+ flatbuffers::FlatBufferBuilder PackContext(const Context &context);
+
// Handles reception of delivery times.
void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
absl::Span<const uint8_t> data);
@@ -85,10 +91,16 @@
const int channel_index_;
const Channel *const channel_;
+ std::unique_ptr<aos::RawFetcher> fetcher_;
+
std::vector<Peer> peers_;
std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
sent_messages_;
+
+ // A fetcher to use to send the last message when a node connects and is
+ // reliable.
+ std::unique_ptr<aos::RawFetcher> last_message_fetcher_;
};
// This encapsulates the state required to talk to *all* the clients from this
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index a3e7f63..36bdde9 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -4,6 +4,7 @@
#include <thread>
#include "absl/strings/str_cat.h"
+#include "aos/event.h"
#include "aos/events/ping_generated.h"
#include "aos/events/pong_generated.h"
#include "aos/network/message_bridge_client_lib.h"
@@ -817,11 +818,323 @@
pi2_test_thread.join();
}
-// TODO(austin): This test confirms that the external state does the right
+// TODO(austin): The above test confirms that the external state does the right
// thing, but doesn't confirm that the internal state does. We either need to
// expose a way to check the state in a thread-safe way, or need a way to jump
// time for one node to do that.
+void SendPing(aos::Sender<examples::Ping> *sender, int value) {
+ aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
+ examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
+ ping_builder.add_value(value);
+ builder.Send(ping_builder.Finish());
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it. Confirm this survives a client reset.
+TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
+ DoSetShmBase("pi1");
+ // Force ourselves to be "raspberrypi" and allocate everything.
+ FLAGS_override_hostname = "raspberrypi";
+
+ FLAGS_application_name = "sender";
+ aos::ShmEventLoop send_event_loop(&pi1_config.message());
+ aos::Sender<examples::Ping> ping_sender =
+ send_event_loop.MakeSender<examples::Ping>("/test");
+ SendPing(&ping_sender, 1);
+ aos::Sender<examples::Ping> unreliable_ping_sender =
+ send_event_loop.MakeSender<examples::Ping>("/unreliable");
+ SendPing(&unreliable_ping_sender, 1);
+
+ FLAGS_application_name = "pi1_message_bridge_server";
+ aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ FLAGS_application_name = "pi1_message_bridge_client";
+ aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+ MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+ FLAGS_application_name = "pi1_timestamp";
+ aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+ // Now do it for "raspberrypi2", the client.
+ DoSetShmBase("pi2");
+ FLAGS_override_hostname = "raspberrypi2";
+
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+ aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+ aos::Fetcher<examples::Ping> ping_fetcher =
+ receive_event_loop.MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+ receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+ aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+ receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+ const size_t ping_channel_index = configuration::ChannelIndex(
+ receive_event_loop.configuration(), ping_fetcher.channel());
+
+ std::atomic<int> ping_timestamp_count{0};
+ pi1_remote_timestamp_event_loop.MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [ping_channel_index,
+ &ping_timestamp_count](const logger::MessageHeader &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+ if (header.channel_index() == ping_channel_index) {
+ ++ping_timestamp_count;
+ }
+ });
+
+ // Before everything starts up, confirm there is no message.
+ EXPECT_FALSE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+ // Spin up the persistant pieces.
+ std::thread pi1_server_thread(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+ std::thread pi1_client_thread(
+ [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+ std::thread pi2_server_thread(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+ // Event used to wait for the timestamp counting thread to start.
+ aos::Event event;
+ std::thread pi1_remote_timestamp_thread(
+ [&pi1_remote_timestamp_event_loop, &event]() {
+ pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+ pi1_remote_timestamp_event_loop.Run();
+ });
+
+ event.Wait();
+
+ {
+ // Now, spin up a client for 2 seconds.
+ LOG(INFO) << "Starting first pi2 MessageBridgeClient";
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+ pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_client_event_loop.monotonic_now() +
+ chrono::milliseconds(2050));
+ });
+
+ // And go!
+ pi2_client_event_loop.Run();
+
+ // Confirm there is no detected duplicate packet.
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 0u);
+
+ EXPECT_TRUE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+ EXPECT_EQ(ping_timestamp_count, 1);
+ LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
+ }
+
+ {
+ // Now, spin up a second client for 2 seconds.
+ LOG(INFO) << "Starting second pi2 MessageBridgeClient";
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+ pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_client_event_loop.monotonic_now() +
+ chrono::milliseconds(5050));
+ });
+
+ // And go!
+ pi2_client_event_loop.Run();
+
+ // Confirm we detect the duplicate packet correctly.
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 1u);
+
+ EXPECT_EQ(ping_timestamp_count, 1);
+ EXPECT_FALSE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+ }
+
+ // Shut everyone else down
+ pi1_server_event_loop.Exit();
+ pi1_client_event_loop.Exit();
+ pi2_server_event_loop.Exit();
+ pi1_remote_timestamp_event_loop.Exit();
+ pi1_remote_timestamp_thread.join();
+ pi1_server_thread.join();
+ pi1_client_thread.join();
+ pi2_server_thread.join();
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it. Confirm this works across server
+// resets.
+TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
+ // Now do it for "raspberrypi2", the client.
+ DoSetShmBase("pi2");
+ FLAGS_override_hostname = "raspberrypi2";
+
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+ aos::Fetcher<examples::Ping> ping_fetcher =
+ receive_event_loop.MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+ receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+ aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+ receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+ DoSetShmBase("pi1");
+ // Force ourselves to be "raspberrypi" and allocate everything.
+ FLAGS_override_hostname = "raspberrypi";
+
+ FLAGS_application_name = "sender";
+ aos::ShmEventLoop send_event_loop(&pi1_config.message());
+ aos::Sender<examples::Ping> ping_sender =
+ send_event_loop.MakeSender<examples::Ping>("/test");
+ {
+ aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
+ examples::Ping::Builder ping_builder =
+ builder.MakeBuilder<examples::Ping>();
+ ping_builder.add_value(1);
+ builder.Send(ping_builder.Finish());
+ }
+
+ FLAGS_application_name = "pi1_message_bridge_client";
+ aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+ MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+ FLAGS_application_name = "pi1_timestamp";
+ aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+ const size_t ping_channel_index = configuration::ChannelIndex(
+ receive_event_loop.configuration(), ping_fetcher.channel());
+
+ std::atomic<int> ping_timestamp_count{0};
+ pi1_remote_timestamp_event_loop.MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [ping_channel_index,
+ &ping_timestamp_count](const logger::MessageHeader &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+ if (header.channel_index() == ping_channel_index) {
+ ++ping_timestamp_count;
+ }
+ });
+
+ // Before everything starts up, confirm there is no message.
+ EXPECT_FALSE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+ // Spin up the persistant pieces.
+ std::thread pi1_client_thread(
+ [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+ std::thread pi2_server_thread(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+ std::thread pi2_client_thread(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+ // Event used to wait for the timestamp counting thread to start.
+ aos::Event event;
+ std::thread pi1_remote_timestamp_thread(
+ [&pi1_remote_timestamp_event_loop, &event]() {
+ pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+ pi1_remote_timestamp_event_loop.Run();
+ });
+
+ event.Wait();
+
+ {
+ // Now, spin up a server for 2 seconds.
+ FLAGS_application_name = "pi1_message_bridge_server";
+ aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+ pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi1_server_event_loop.monotonic_now() +
+ chrono::milliseconds(2050));
+ });
+
+ // And go!
+ pi1_server_event_loop.Run();
+
+ // Confirm there is no detected duplicate packet.
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 0u);
+
+ EXPECT_TRUE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+ EXPECT_EQ(ping_timestamp_count, 1);
+ LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
+ }
+
+ {
+ // Now, spin up a second server for 2 seconds.
+ FLAGS_application_name = "pi1_message_bridge_server";
+ aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+ pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi1_server_event_loop.monotonic_now() +
+ chrono::milliseconds(2050));
+ });
+
+ // And go!
+ pi1_server_event_loop.Run();
+
+ // Confirm we detect the duplicate packet correctly.
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 1u);
+
+ EXPECT_EQ(ping_timestamp_count, 1);
+ EXPECT_FALSE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+ LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
+ }
+
+ // Shut everyone else down
+ pi1_client_event_loop.Exit();
+ pi2_server_event_loop.Exit();
+ pi2_client_event_loop.Exit();
+ pi1_remote_timestamp_event_loop.Exit();
+ pi1_remote_timestamp_thread.join();
+ pi1_client_thread.join();
+ pi2_server_thread.join();
+ pi2_client_thread.join();
+}
+
} // namespace testing
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index 3ac2796..a7c0d1b 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -27,7 +27,8 @@
"name": "pi2",
"priority": 1,
"timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
- "timestamp_logger_nodes": ["pi1"]
+ "timestamp_logger_nodes": ["pi1"],
+ "time_to_live": 5000000
}
]
},
@@ -42,7 +43,8 @@
"name": "pi1",
"priority": 1,
"timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
- "timestamp_logger_nodes": ["pi2"]
+ "timestamp_logger_nodes": ["pi2"],
+ "time_to_live": 5000000
}
]
},
@@ -125,6 +127,20 @@
"timestamp_logger_nodes": ["pi1"]
}
]
+ },
+ {
+ "name": "/unreliable",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"],
+ "time_to_live": 5000000
+ }
+ ]
}
],
"maps": [
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index c8563d5..e18d0d8 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -80,6 +80,44 @@
return ReadSctpMessage(fd_, max_size_);
}
+bool SctpServer::Abort(sctp_assoc_t snd_assoc_id) {
+ // Use the assoc_id for the destination instead of the msg_name.
+ struct msghdr outmsg;
+ outmsg.msg_namelen = 0;
+
+ outmsg.msg_iovlen = 0;
+
+ // Build up the sndinfo message.
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
+ outmsg.msg_flags = 0;
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+ struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = ++ppid_;
+ sinfo->sinfo_stream = 0;
+ sinfo->sinfo_flags = SCTP_ABORT;
+ sinfo->sinfo_assoc_id = snd_assoc_id;
+
+ // And send.
+ const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (size == -1) {
+ if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
+ return false;
+ }
+ return false;
+ } else {
+ CHECK_EQ(0, size);
+ return true;
+ }
+}
+
bool SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
int stream, int timetolive) {
struct iovec iov;
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index 8fa3d15..e18517a 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -36,6 +36,9 @@
bool Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
int timetolive);
+ // Aborts a connection. Returns true on success.
+ bool Abort(sctp_assoc_t snd_assoc_id);
+
int fd() { return fd_; }
// Enables the priority scheduler. This is a SCTP feature which lets us