Add monotonic_offset to the ServerStatistics message
Send a base level of traffic between all nodes which should be
forwarding messages between each other. This helps replay, and also
gives us enough information to estimate the monotonic clock offset
between nodes. That lets us sort out latencies between nodes.
Change-Id: I9b10243aca2444e201d0d8c0a551e29560d0e147
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 1f7a3d7..a905728 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -141,6 +141,7 @@
":ping_fbs",
":pong_fbs",
"//aos/network:message_bridge_client_fbs",
+ "//aos/network:timestamp_fbs",
"//aos/network:message_bridge_server_fbs",
],
deps = [":config"],
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 27df016..6047cbc 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -306,6 +306,16 @@
return sender_ ? true : false;
}
+ // Returns the time_points that the last message was sent at.
+ aos::monotonic_clock::time_point monotonic_sent_time() const {
+ return sender_->monotonic_sent_time();
+ }
+ aos::realtime_clock::time_point realtime_sent_time() const {
+ return sender_->realtime_sent_time();
+ }
+ // Returns the queue index that this was sent with.
+ uint32_t sent_queue_index() const { return sender_->sent_queue_index(); }
+
private:
friend class EventLoop;
Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index c9bc53e..54cc658 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -26,6 +26,56 @@
},
{
"name": "/aos/pi1",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi1",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "time_to_live": 5000000
+ },
+ {
+ "name": "pi3",
+ "priority": 1,
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi2",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/aos/pi3",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi3",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/aos/pi1",
"type": "aos.message_bridge.ServerStatistics",
"source_node": "pi1",
"frequency": 2
@@ -141,7 +191,6 @@
{
"match": {
"name": "/aos",
- "type": "aos.logging.LogMessageFbs",
"source_node": "pi1"
},
"rename": {
@@ -151,7 +200,6 @@
{
"match": {
"name": "/aos",
- "type": "aos.logging.LogMessageFbs",
"source_node": "pi2"
},
"rename": {
@@ -161,97 +209,6 @@
{
"match": {
"name": "/aos",
- "type": "aos.logging.LogMessageFbs",
- "source_node": "pi3"
- },
- "rename": {
- "name": "/aos/pi3"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.timing.Report",
- "source_node": "pi1"
- },
- "rename": {
- "name": "/aos/pi1"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.timing.Report",
- "source_node": "pi2"
- },
- "rename": {
- "name": "/aos/pi2"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.timing.Report",
- "source_node": "pi3"
- },
- "rename": {
- "name": "/aos/pi3"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.message_bridge.ServerStatistics",
- "source_node": "pi1"
- },
- "rename": {
- "name": "/aos/pi1"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.message_bridge.ServerStatistics",
- "source_node": "pi2"
- },
- "rename": {
- "name": "/aos/pi2"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.message_bridge.ServerStatistics",
- "source_node": "pi3"
- },
- "rename": {
- "name": "/aos/pi3"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.message_bridge.ClientStatistics",
- "source_node": "pi1"
- },
- "rename": {
- "name": "/aos/pi1"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.message_bridge.ClientStatistics",
- "source_node": "pi2"
- },
- "rename": {
- "name": "/aos/pi2"
- }
- },
- {
- "match": {
- "name": "/aos",
- "type": "aos.message_bridge.ClientStatistics",
"source_node": "pi3"
},
"rename": {
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 11eebdb..f921a04 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -14,6 +14,15 @@
)
flatbuffer_cc_library(
+ name = "timestamp_fbs",
+ srcs = ["timestamp.fbs"],
+ gen_reflections = 1,
+ includes = [
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+flatbuffer_cc_library(
name = "message_bridge_client_fbs",
srcs = ["message_bridge_client.fbs"],
gen_reflections = 1,
@@ -96,9 +105,19 @@
cc_library(
name = "message_bridge_protocol",
+ srcs = [
+ "message_bridge_protocol.cc",
+ ],
hdrs = [
"message_bridge_protocol.h",
],
+ deps = [
+ ":connect_fbs",
+ "//aos:configuration",
+ "//aos:flatbuffer_merge",
+ "//aos:flatbuffers",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ ],
)
cc_library(
@@ -114,10 +133,12 @@
],
deps = [
":connect_fbs",
+ ":message_bridge_client_fbs",
":message_bridge_protocol",
":message_bridge_server_fbs",
":sctp_lib",
":sctp_server",
+ ":timestamp_fbs",
"//aos:unique_malloc_ptr",
"//aos/events:shm_event_loop",
"//aos/events/logging:logger",
@@ -172,6 +193,7 @@
":message_bridge_protocol",
":message_bridge_server_fbs",
":sctp_client",
+ ":timestamp_fbs",
"//aos/events:shm_event_loop",
"//aos/events/logging:logger",
],
@@ -201,6 +223,7 @@
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
"//aos/network:message_bridge_server_fbs",
+ "//aos/network:timestamp_fbs",
],
deps = ["//aos/events:config"],
)
@@ -240,11 +263,11 @@
flatbuffer_cc_library(
name = "web_proxy_fbs",
srcs = ["web_proxy.fbs"],
+ gen_reflections = True,
includes = [
":connect_fbs_includes",
"//aos:configuration_fbs_includes",
],
- gen_reflections = True,
)
flatbuffer_ts_library(
@@ -258,8 +281,8 @@
cc_library(
name = "web_proxy_utils",
- hdrs = ["web_proxy_utils.h"],
srcs = ["web_proxy_utils.cc"],
+ hdrs = ["web_proxy_utils.h"],
deps = [
":connect_fbs",
":web_proxy_fbs",
@@ -300,25 +323,25 @@
cc_binary(
name = "web_proxy_main",
srcs = ["web_proxy_main.cc"],
- deps = [
- ":gen_embedded",
- ":web_proxy",
- "//aos/events:shm_event_loop",
- "//aos:init",
- "//aos/seasocks:seasocks_logger",
- "//third_party/seasocks",
- "@com_github_google_flatbuffers//:flatbuffers"
- ],
copts = [
"-DWEBRTC_POSIX",
"-Wno-unused-parameter",
],
data = [
- "//aos/network/www:files",
- "//aos/network/www:main_bundle",
- "//aos/network/www:flatbuffers",
- "@com_github_google_flatbuffers//:flatjs",
"//aos/events:pingpong_config.json",
+ "//aos/network/www:files",
+ "//aos/network/www:flatbuffers",
+ "//aos/network/www:main_bundle",
+ "@com_github_google_flatbuffers//:flatjs",
+ ],
+ deps = [
+ ":gen_embedded",
+ ":web_proxy",
+ "//aos:init",
+ "//aos/events:shm_event_loop",
+ "//aos/seasocks:seasocks_logger",
+ "//third_party/seasocks",
+ "@com_github_google_flatbuffers//:flatbuffers",
],
)
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
index 58b653a..df3f02f 100644
--- a/aos/network/message_bridge_client.fbs
+++ b/aos/network/message_bridge_client.fbs
@@ -13,6 +13,11 @@
// Number of packets received on all channels.
received_packets:uint;
+ // This is the measured monotonic offset for just the server -> client
+ // direction measured in nanoseconds. Subtract this from our monotonic time
+ // to get their monotonic time.
+ monotonic_offset:int64;
+
// TODO(austin): Per channel counts?
}
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 93eccc9..263a757 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -9,6 +9,7 @@
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_protocol.h"
#include "aos/network/sctp_client.h"
+#include "aos/network/timestamp_generated.h"
#include "aos/unique_malloc_ptr.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -26,40 +27,6 @@
namespace {
namespace chrono = std::chrono;
-aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
- const Configuration *config, const Node *my_node,
- std::string_view remote_name) {
- CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
-
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(1);
-
- flatbuffers::Offset<Node> node_offset = CopyFlatBuffer<Node>(my_node, &fbb);
- const std::string_view node_name = my_node->name()->string_view();
-
- std::vector<flatbuffers::Offset<Channel>> channel_offsets;
- for (const Channel *channel : *config->channels()) {
- if (channel->has_destination_nodes()) {
- for (const Connection *connection : *channel->destination_nodes()) {
- if (connection->name()->string_view() == node_name &&
- channel->source_node()->string_view() == remote_name) {
- channel_offsets.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
- }
- }
- }
- }
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
- channels_offset = fbb.CreateVector(channel_offsets);
-
- Connect::Builder connect_builder(fbb);
- connect_builder.add_channels_to_transfer(channels_offset);
- connect_builder.add_node(node_offset);
- fbb.Finish(connect_builder.Finish());
-
- return fbb.Release();
-}
-
std::vector<int> StreamToChannel(const Configuration *config,
const Node *my_node, const Node *other_node) {
std::vector<int> stream_to_channel;
@@ -120,20 +87,25 @@
}
FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
- const std::vector<std::string_view> &source_node_names,
- const Configuration *configuration) {
+ const std::vector<std::string_view> &source_node_names) {
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
for (const std::string_view node_name : source_node_names) {
- flatbuffers::Offset<Node> node_offset =
- CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ fbb.CreateString(node_name);
+
+ Node::Builder node_builder(fbb);
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
ClientConnection::Builder connection_builder(fbb);
connection_builder.add_node(node_offset);
connection_builder.add_state(State::DISCONNECTED);
// TODO(austin): Track dropped packets.
connection_builder.add_received_packets(0);
+ connection_builder.add_monotonic_offset(0);
connection_offsets.emplace_back(connection_builder.Finish());
}
flatbuffers::Offset<
@@ -266,6 +238,26 @@
chrono::nanoseconds(message_header->realtime_sent_time())),
message_header->queue_index());
+ const std::chrono::nanoseconds offset =
+ sender->monotonic_sent_time() -
+ aos::monotonic_clock::time_point(
+ chrono::nanoseconds(message_header->monotonic_sent_time()));
+
+ // If this is our first observation, use that to seed the base offset. That
+ // gets us in the ballpark.
+ if (!filter_.has_sample()) {
+ filter_.set_base_offset(offset);
+ }
+
+ // We can now measure the latency!
+ filter_.Sample(sender->monotonic_sent_time(), offset);
+
+ connection_->mutate_monotonic_offset(
+ (chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(filter_.offset())) +
+ filter_.base_offset())
+ .count());
+
if (stream_reply_with_timestamp_[stream]) {
// TODO(austin): Send back less if we are only acking. Maybe only a
// stream id? Nothing if we are only forwarding?
@@ -316,8 +308,9 @@
sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
source_node_names_(configuration::SourceNodeNames(
event_loop->configuration(), event_loop->node())),
- statistics_(MakeClientStatistics(source_node_names_,
- event_loop->configuration())) {
+ statistics_(MakeClientStatistics(source_node_names_)) {
+ client_connection_offsets_.reserve(
+ statistics_.message().connections()->size());
std::string_view node_name = event_loop->node()->name()->string_view();
// Find all the channels which are supposed to be delivered to us.
@@ -355,10 +348,54 @@
// And kick it all off.
statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
event_loop_->OnRun([this]() {
- statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
- chrono::seconds(1));
+ statistics_timer_->Setup(
+ event_loop_->monotonic_now() + chrono::milliseconds(100),
+ chrono::milliseconds(100));
});
}
+void MessageBridgeClient::SendStatistics() {
+ // Copy from statistics_ and drop monotonic_offset if it isn't populated yet.
+ // There doesn't exist a good way to drop fields otherwise.
+ aos::Sender<ClientStatistics>::Builder builder = sender_.MakeBuilder();
+ client_connection_offsets_.clear();
+
+ for (const ClientConnection *connection :
+ *statistics_.message().connections()) {
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ builder.fbb()->CreateString(connection->node()->name()->string_view());
+ Node::Builder node_builder = builder.MakeBuilder<Node>();
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+ ClientConnection::Builder client_connection_builder =
+ builder.MakeBuilder<ClientConnection>();
+
+ client_connection_builder.add_node(node_offset);
+ client_connection_builder.add_state(connection->state());
+ client_connection_builder.add_received_packets(
+ connection->received_packets());
+
+ // Strip out the monotonic offset if it isn't populated.
+ if (connection->monotonic_offset() != 0) {
+ client_connection_builder.add_monotonic_offset(
+ connection->monotonic_offset());
+ }
+
+ client_connection_offsets_.emplace_back(client_connection_builder.Finish());
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+ client_connections_offset =
+ builder.fbb()->CreateVector(client_connection_offsets_);
+
+ ClientStatistics::Builder client_statistics_builder =
+ builder.MakeBuilder<ClientStatistics>();
+ client_statistics_builder.add_connections(client_connections_offset);
+
+ builder.Send(client_statistics_builder.Finish());
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 77bf2b7..2811553 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -7,6 +7,7 @@
#include "aos/events/logging/logger_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/network/connect_generated.h"
+#include "aos/network/timestamp_filter.h"
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/sctp_client.h"
#include "aos/network/sctp_lib.h"
@@ -76,6 +77,9 @@
// id of the server once known. This is only valid if connection_ says
// connected.
sctp_assoc_t remote_assoc_id_ = 0;
+
+ // Filter for the timestamp offset for this connection.
+ TimestampFilter filter_;
};
// This encapsulates the state required to talk to *all* the servers from this
@@ -89,7 +93,7 @@
private:
// Sends out the statistics that are continually updated by the
// SctpClientConnections.
- void SendStatistics() { sender_.Send(statistics_); }
+ void SendStatistics();
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
@@ -102,6 +106,9 @@
// Data to publish.
FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+ // Reserved memory for the client connection offsets to reduce heap
+ // allocations.
+ std::vector<flatbuffers::Offset<ClientConnection>> client_connection_offsets_;
// Channels to send data over.
std::vector<std::unique_ptr<aos::RawSender>> channels_;
diff --git a/aos/network/message_bridge_protocol.cc b/aos/network/message_bridge_protocol.cc
new file mode 100644
index 0000000..9663edf
--- /dev/null
+++ b/aos/network/message_bridge_protocol.cc
@@ -0,0 +1,49 @@
+#include "aos/network/message_bridge_protocol.h"
+
+#include <string_view>
+
+#include "aos/configuration.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/flatbuffers.h"
+#include "aos/network/connect_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace message_bridge {
+
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+ const Configuration *config, const Node *my_node,
+ std::string_view remote_name) {
+ CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ flatbuffers::Offset<Node> node_offset = CopyFlatBuffer<Node>(my_node, &fbb);
+ const std::string_view node_name = my_node->name()->string_view();
+
+ std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+ for (const Channel *channel : *config->channels()) {
+ if (channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() == node_name &&
+ channel->source_node()->string_view() == remote_name) {
+ channel_offsets.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
+ }
+ }
+ }
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset = fbb.CreateVector(channel_offsets);
+
+ Connect::Builder connect_builder(fbb);
+ connect_builder.add_channels_to_transfer(channels_offset);
+ connect_builder.add_node(node_offset);
+ fbb.Finish(connect_builder.Finish());
+
+ return fbb.Release();
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_protocol.h b/aos/network/message_bridge_protocol.h
index 1136188..7b6a248 100644
--- a/aos/network/message_bridge_protocol.h
+++ b/aos/network/message_bridge_protocol.h
@@ -1,6 +1,11 @@
#ifndef AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
#define AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+#include <string_view>
+
+#include "aos/configuration.h"
+#include "aos/network/connect_generated.h"
+
namespace aos {
namespace message_bridge {
@@ -25,6 +30,11 @@
// The stream on which timestamp replies are sent.
constexpr size_t kTimestampStream() { return 1; }
+// Builds up a subscription request for my_node to remote_name.
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+ const Configuration *config, const Node *my_node,
+ std::string_view remote_name);
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
index 75a6a15..8f06fdb 100644
--- a/aos/network/message_bridge_server.fbs
+++ b/aos/network/message_bridge_server.fbs
@@ -22,6 +22,11 @@
// Number of packets received on all channels.
sent_packets:uint;
+ // This is the measured monotonic offset for the connected node in
+ // nanoseconds. Add this to our monotonic time to get their
+ // monotonic time.
+ monotonic_offset:int64;
+
// TODO(austin): Per channel counts?
}
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index c32829c..f54e2bf 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -33,6 +33,7 @@
connection_builder.add_state(State::DISCONNECTED);
connection_builder.add_dropped_packets(0);
connection_builder.add_sent_packets(0);
+ connection_builder.add_monotonic_offset(0);
connection_offsets.emplace_back(connection_builder.Finish());
}
flatbuffers::Offset<
@@ -211,11 +212,52 @@
configuration::DestinationNodeNames(event_loop->configuration(),
event_loop->node()),
event_loop->configuration())),
+ timestamp_sender_(event_loop_->MakeSender<Timestamp>("/aos")),
+ client_statistics_fetcher_(
+ event_loop_->MakeFetcher<ClientStatistics>("/aos")),
server_("::", event_loop->node()->port()) {
CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
- // TODO(austin): Time sync. sctp gives us filtered round trip time, not
- // target time.
+ server_connection_offsets_.reserve(
+ statistics_.message().connections()->size());
+
+ int32_t max_size = 0;
+ timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
+ filters_.resize(event_loop->configuration()->nodes()->size());
+ server_connection_.resize(event_loop->configuration()->nodes()->size());
+
+ // Seed up all the per-node connection state.
+ for (std::string_view destination_node_name :
+ configuration::DestinationNodeNames(event_loop->configuration(),
+ event_loop->node())) {
+ // Find the largest connection message so we can size our buffers big enough
+ // to receive a connection message.
+ max_size = std::max(
+ max_size,
+ static_cast<int32_t>(MakeConnectMessage(event_loop->configuration(),
+ event_loop->node(),
+ destination_node_name)
+ .size()));
+ const Node *destination_node = configuration::GetNode(
+ event_loop->configuration(), destination_node_name);
+
+ const int node_index = configuration::GetNodeIndex(
+ event_loop->configuration(), destination_node);
+
+ // Now find the timestamp channel forwarded from the other node.
+ const Channel *const other_timestamp_channel =
+ configuration::GetChannel(event_loop_->configuration(), "/aos",
+ Timestamp::GetFullyQualifiedName(),
+ event_loop_->name(), destination_node);
+
+ timestamp_fetchers_[node_index] = event_loop_->MakeFetcher<Timestamp>(
+ other_timestamp_channel->name()->string_view());
+
+ // And then find the server connection that we should be populating
+ // statistics into.
+ server_connection_[node_index] = FindServerConnection(
+ statistics_.mutable_message(), destination_node->name()->string_view());
+ }
// TODO(austin): Logging synchronization.
//
@@ -228,11 +270,16 @@
LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
int channel_index = 0;
+ const Channel *const timestamp_channel = configuration::GetChannel(
+ event_loop_->configuration(), "/aos", Timestamp::GetFullyQualifiedName(),
+ event_loop_->name(), event_loop_->node());
+
for (const Channel *channel : *event_loop_->configuration()->channels()) {
CHECK(channel->has_source_node());
- if (channel->source_node()->string_view() ==
- event_loop_->node()->name()->string_view() &&
+
+ if (configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) &&
channel->has_destination_nodes()) {
+ max_size = std::max(channel->max_size(), max_size);
std::unique_ptr<ChannelState> state(
new ChannelState{channel, channel_index});
@@ -246,13 +293,20 @@
configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
}
- // Call SendData for every message.
- ChannelState *state_ptr = state.get();
- event_loop_->MakeRawWatcher(
- channel,
- [this, state_ptr](const Context &context, const void * /*message*/) {
- state_ptr->SendData(&server_, context);
- });
+ // Don't subscribe to timestamps on the timestamp channel. Those get
+ // handled specially.
+ if (channel != timestamp_channel) {
+ // Call SendData for every message.
+ ChannelState *state_ptr = state.get();
+ event_loop_->MakeRawWatcher(
+ channel, [this, state_ptr](const Context &context,
+ const void * /*message*/) {
+ state_ptr->SendData(&server_, context);
+ });
+ } else {
+ CHECK(timestamp_state_ == nullptr);
+ timestamp_state_ = state.get();
+ }
channels_.emplace_back(std::move(state));
} else {
channels_.emplace_back(nullptr);
@@ -260,10 +314,13 @@
++channel_index;
}
- statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+ // Buffer up the max size a bit so everything fits nicely.
+ server_.SetMaxSize(max_size + 100);
+
+ statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
event_loop_->OnRun([this]() {
- statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
- chrono::seconds(1));
+ statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
+ kPingPeriod);
});
}
@@ -365,5 +422,173 @@
}
}
+void MessageBridgeServer::SendStatistics() {
+ aos::Sender<ServerStatistics>::Builder builder = sender_.MakeBuilder();
+
+ server_connection_offsets_.clear();
+
+ // Copy the statistics over, but only add monotonic_offset if it is valid.
+ for (const ServerConnection *connection :
+ *statistics_.message().connections()) {
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ builder.fbb()->CreateString(connection->node()->name()->string_view());
+ Node::Builder node_builder = builder.MakeBuilder<Node>();
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+ ServerConnection::Builder server_connection_builder =
+ builder.MakeBuilder<ServerConnection>();
+ server_connection_builder.add_node(node_offset);
+ server_connection_builder.add_state(connection->state());
+ server_connection_builder.add_dropped_packets(
+ connection->dropped_packets());
+ server_connection_builder.add_sent_packets(connection->sent_packets());
+
+ // TODO(austin): If it gets stale, drop it too.
+ if (connection->monotonic_offset() != 0) {
+ server_connection_builder.add_monotonic_offset(
+ connection->monotonic_offset());
+ }
+
+ server_connection_offsets_.emplace_back(server_connection_builder.Finish());
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+ server_connections_offset =
+ builder.fbb()->CreateVector(server_connection_offsets_);
+
+ ServerStatistics::Builder server_statistics_builder =
+ builder.MakeBuilder<ServerStatistics>();
+ server_statistics_builder.add_connections(server_connections_offset);
+ builder.Send(server_statistics_builder.Finish());
+}
+
+void MessageBridgeServer::Tick() {
+ // Send statistics every kStatisticsPeriod. Use the context so we don't get
+ // caught up with the wakeup delay and jitter.
+ if (event_loop_->context().monotonic_event_time >=
+ last_statistics_send_time_ + kStatisticsPeriod) {
+ SendStatistics();
+ last_statistics_send_time_ = event_loop_->context().monotonic_event_time;
+ }
+
+ // The message_bridge_client application measures and filters the offsets from
+ // all messages it receives. It then sends this on in the ClientStatistics
+ // message. Collect that up and forward it back over the Timestamp message so
+ // we have guarenteed traffic on the other node for timestamping. This also
+ // moves the offsets back across the network so both directions can be
+ // observed.
+ client_statistics_fetcher_.Fetch();
+
+ // Build up the timestamp message. Do it here so that we don't have invalid
+ // data in it.
+ FlatbufferFixedAllocatorArray<Timestamp, 1000> timestamp_copy;
+ flatbuffers::FlatBufferBuilder *fbb = timestamp_copy.Builder();
+
+ if (client_statistics_fetcher_.get()) {
+ // Build up the list of client offsets.
+ std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
+
+ // Iterate through the connections this node has made.
+ for (const ClientConnection *connection :
+ *client_statistics_fetcher_->connections()) {
+ // Filter out the ones which aren't connected.
+ if (connection->state() != State::CONNECTED) continue;
+ // And the ones without monotonic offsets.
+ if (!connection->has_monotonic_offset()) continue;
+
+ const int node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(),
+ connection->node()->name()->string_view());
+
+ timestamp_fetchers_[node_index].Fetch();
+
+ // Find the offset computed on their node for this client connection
+ // using their timestamp message.
+ bool has_their_offset = false;
+ std::chrono::nanoseconds their_offset = std::chrono::nanoseconds(0);
+ if (timestamp_fetchers_[node_index].get() != nullptr) {
+ for (const ClientOffset *client_offset :
+ *timestamp_fetchers_[node_index]->offsets()) {
+ if (client_offset->node()->name()->string_view() ==
+ event_loop_->node()->name()->string_view()) {
+ if (client_offset->has_monotonic_offset()) {
+ their_offset =
+ std::chrono::nanoseconds(client_offset->monotonic_offset());
+ has_their_offset = true;
+ }
+ break;
+ }
+ }
+ }
+
+ if (has_their_offset) {
+ // Update the filters.
+ if (filters_[node_index].MissingSamples()) {
+ // Update the offset the first time. This should be representative.
+ filters_[node_index].set_base_offset(
+ std::chrono::nanoseconds(connection->monotonic_offset()));
+ }
+ // The message_bridge_clients are the ones running the first filter. So
+ // set the values from that and let the averaging filter run from there.
+ filters_[node_index].FwdSet(
+ timestamp_fetchers_[node_index].context().monotonic_remote_time,
+ std::chrono::nanoseconds(connection->monotonic_offset()));
+ filters_[node_index].RevSet(
+ client_statistics_fetcher_.context().monotonic_event_time,
+ their_offset);
+
+ // Publish!
+ server_connection_[node_index]->mutate_monotonic_offset(
+ -filters_[node_index].offset().count());
+ }
+
+ // Now fill out the Timestamp message with the offset from the client.
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ fbb->CreateString(connection->node()->name()->string_view());
+
+ Node::Builder node_builder(*fbb);
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+ ClientOffset::Builder client_offset_builder(*fbb);
+ client_offset_builder.add_node(node_offset);
+ client_offset_builder.add_monotonic_offset(
+ connection->monotonic_offset());
+ client_offsets.emplace_back(client_offset_builder.Finish());
+ }
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+ offsets_offset = fbb->CreateVector(client_offsets);
+
+ Timestamp::Builder builder(*fbb);
+ builder.add_offsets(offsets_offset);
+ timestamp_copy.Finish(builder.Finish());
+ } else {
+ // Publish an empty timestamp if we have nothing.
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+ offsets_offset =
+ fbb->CreateVector(std::vector<flatbuffers::Offset<ClientOffset>>{});
+ Timestamp::Builder builder(*fbb);
+ builder.add_offsets(offsets_offset);
+ timestamp_copy.Finish(builder.Finish());
+ }
+
+ // Send it out over shm, and using that timestamp, then send it out over sctp.
+ // This avoid some context switches.
+ timestamp_sender_.Send(timestamp_copy);
+
+ Context context;
+ context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
+ context.realtime_event_time = timestamp_sender_.realtime_sent_time();
+ context.queue_index = timestamp_sender_.sent_queue_index();
+ context.size = timestamp_copy.size();
+ context.data = timestamp_copy.data();
+
+ // Since we are building up the timestamp to send here, we need to trigger the
+ // SendData call ourselves.
+ timestamp_state_->SendData(&server_, context);
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index f202fc9..d271c8c 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -8,8 +8,10 @@
#include "aos/events/logging/logger_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/sctp_server.h"
+#include "aos/network/timestamp_generated.h"
#include "glog/logging.h"
namespace aos {
@@ -105,9 +107,12 @@
// received.
void HandleData(const Message *message);
+ // Handle timestamps and statistics.
+ void Tick();
+
// Sends out the statistics that are continually updated by the
// ChannelState's.
- void SendStatistics() { sender_.Send(statistics_); }
+ void SendStatistics();
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
@@ -116,9 +121,33 @@
aos::Sender<ServerStatistics> sender_;
aos::TimerHandler *statistics_timer_;
FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+ std::vector<flatbuffers::Offset<ServerConnection>> server_connection_offsets_;
+
+ // Sender for the timestamps that we are forwarding over the network.
+ aos::Sender<Timestamp> timestamp_sender_;
+ // ChannelState to send timestamps over the network with.
+ ChannelState *timestamp_state_ = nullptr;
+
+ // Fetcher to grab the measured offsets in the client.
+ aos::Fetcher<ClientStatistics> client_statistics_fetcher_;
+ // All of these are indexed by the other node index.
+ // Fetcher to grab timestamps and therefore offsets from the other nodes.
+ std::vector<aos::Fetcher<Timestamp>> timestamp_fetchers_;
+ // Bidirectional filters for each connection.
+ std::vector<ClippedAverageFilter> filters_;
+ // ServerConnection to fill out the offsets for from each node.
+ std::vector<ServerConnection *> server_connection_;
SctpServer server_;
+ static constexpr std::chrono::nanoseconds kStatisticsPeriod =
+ std::chrono::seconds(1);
+ static constexpr std::chrono::nanoseconds kPingPeriod =
+ std::chrono::milliseconds(100);
+
+ aos::monotonic_clock::time_point last_statistics_send_time_ =
+ aos::monotonic_clock::min_time;
+
// List of channels. The entries that aren't sent from this node are left
// null.
std::vector<std::unique_ptr<ChannelState>> channels_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 383c1c4..c1dd1ca 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -49,8 +49,12 @@
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
- aos::ShmEventLoop server_event_loop(&server_config.message());
- MessageBridgeServer message_bridge_server(&server_event_loop);
+ aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ FLAGS_application_name = "pi1_message_bridge_client";
+ aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+ MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
// And build the app which sends the pings.
FLAGS_application_name = "ping";
@@ -61,42 +65,41 @@
// Now do it for "raspberrypi2", the client.
FLAGS_application_name = "pi2_message_bridge_client";
FLAGS_override_hostname = "raspberrypi2";
- aos::ShmEventLoop client_event_loop(&client_config.message());
- MessageBridgeClient message_bridge_client(&client_event_loop);
+ aos::ShmEventLoop pi2_client_event_loop(&client_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&client_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
// And build the app which sends the pongs.
FLAGS_application_name = "pong";
aos::ShmEventLoop pong_event_loop(&client_config.message());
+ // And build the app for testing.
+ FLAGS_application_name = "test";
+ aos::ShmEventLoop test_event_loop(&client_config.message());
+
+ aos::Fetcher<ClientStatistics> client_statistics_fetcher =
+ test_event_loop.MakeFetcher<ClientStatistics>("/aos");
+
// Count the pongs.
int pong_count = 0;
pong_event_loop.MakeWatcher(
- "/test2", [&pong_count, &ping_event_loop](const examples::Ping &ping) {
+ "/test2", [&pong_count](const examples::Ping &ping) {
++pong_count;
LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
- if (pong_count >= 2) {
- LOG(INFO) << "That's enough bailing early.";
- // And Exit is async safe, so thread safe is easy.
- ping_event_loop.Exit();
- }
});
FLAGS_override_hostname = "";
- // Start everything up. Pong is the only thing we don't know how to wait on,
- // so start it first.
- std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
-
- std::thread server_thread(
- [&server_event_loop]() { server_event_loop.Run(); });
- std::thread client_thread(
- [&client_event_loop]() { client_event_loop.Run(); });
-
// Wait until we are connected, then send.
int ping_count = 0;
+ int pi1_server_statistics_count = 0;
ping_event_loop.MakeWatcher(
- "/aos/pi1", [&ping_count, &client_event_loop,
- &ping_sender](const ServerStatistics &stats) {
+ "/aos/pi1",
+ [&ping_count, &pi2_client_event_loop, &ping_sender,
+ &pi1_server_statistics_count](const ServerStatistics &stats) {
LOG(INFO) << FlatbufferToJson(&stats);
ASSERT_TRUE(stats.has_connections());
@@ -104,12 +107,21 @@
bool connected = false;
for (const ServerConnection *connection : *stats.connections()) {
+ // Confirm that we are estimating the server time offset correctly. It
+ // should be about 0 since we are on the same machine here.
+ if (connection->has_monotonic_offset()) {
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ ++pi1_server_statistics_count;
+ }
+
if (connection->node()->name()->string_view() ==
- client_event_loop.node()->name()->string_view()) {
+ pi2_client_event_loop.node()->name()->string_view()) {
if (connection->state() == State::CONNECTED) {
connected = true;
}
- break;
}
}
@@ -124,29 +136,136 @@
}
});
- // Time ourselves out after a while if Pong doesn't do it for us.
+ // Confirm both client and server statistics messages have decent offsets in
+ // them.
+ int pi2_server_statistics_count = 0;
+ pong_event_loop.MakeWatcher("/aos/pi2", [&pi2_server_statistics_count](
+ const ServerStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+ for (const ServerConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi2_server_statistics_count;
+ // Confirm that we are estimating the server time offset correctly. It
+ // should be about 0 since we are on the same machine here.
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ }
+ }
+ });
+
+ int pi1_client_statistics_count = 0;
+ ping_event_loop.MakeWatcher(
+ "/aos/pi1", [&pi1_client_statistics_count](const ClientStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+
+ for (const ClientConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi1_client_statistics_count;
+ // It takes at least 10 microseconds to send a message between the
+ // client and server. The min (filtered) time shouldn't be over 10
+ // milliseconds on localhost. This might have to bump up if this is
+ // proving flaky.
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(10));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::microseconds(10));
+ }
+ }
+ });
+
+ int pi2_client_statistics_count = 0;
+ pong_event_loop.MakeWatcher("/aos/pi2", [&pi2_client_statistics_count](
+ const ClientStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+
+ for (const ClientConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi2_client_statistics_count;
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(10));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::microseconds(10));
+ }
+ }
+ });
+
+ ping_event_loop.MakeWatcher("/aos/pi1", [](const Timestamp ×tamp) {
+ EXPECT_TRUE(timestamp.has_offsets());
+ LOG(INFO) << FlatbufferToJson(×tamp);
+ });
+ pong_event_loop.MakeWatcher("/aos/pi2", [](const Timestamp ×tamp) {
+ EXPECT_TRUE(timestamp.has_offsets());
+ LOG(INFO) << FlatbufferToJson(×tamp);
+ });
+
+ // Run for 5 seconds to make sure we have time to estimate the offset.
aos::TimerHandler *quit = ping_event_loop.AddTimer(
[&ping_event_loop]() { ping_event_loop.Exit(); });
ping_event_loop.OnRun([quit, &ping_event_loop]() {
- quit->Setup(ping_event_loop.monotonic_now() + chrono::seconds(10));
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
});
+ // Start everything up. Pong is the only thing we don't know how to wait on,
+ // so start it first.
+ std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+
+ std::thread pi1_server_thread(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+ std::thread pi1_client_thread(
+ [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+ std::thread pi2_client_thread(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+ std::thread pi2_server_thread(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
// And go!
ping_event_loop.Run();
// Shut everyone else down
- server_event_loop.Exit();
- client_event_loop.Exit();
+ pi1_server_event_loop.Exit();
+ pi1_client_event_loop.Exit();
+ pi2_client_event_loop.Exit();
+ pi2_server_event_loop.Exit();
pong_event_loop.Exit();
- server_thread.join();
- client_thread.join();
+ pi1_server_thread.join();
+ pi1_client_thread.join();
+ pi2_client_thread.join();
+ pi2_server_thread.join();
pong_thread.join();
// Make sure we sent something.
EXPECT_GE(ping_count, 1);
// And got something back.
EXPECT_GE(pong_count, 1);
+
+ // Confirm that we are estimating a monotonic offset on the client.
+ ASSERT_TRUE(client_statistics_fetcher.Fetch());
+
+ EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
+ EXPECT_EQ(client_statistics_fetcher->connections()
+ ->Get(0)
+ ->node()
+ ->name()
+ ->string_view(),
+ "pi1");
+
+ // Make sure the offset in one direction is less than a second.
+ EXPECT_GT(
+ client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
+ EXPECT_LT(
+ client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
+ 1000000000);
+
+ EXPECT_GE(pi1_server_statistics_count, 2);
+ EXPECT_GE(pi2_server_statistics_count, 2);
+ EXPECT_GE(pi1_client_statistics_count, 2);
+ EXPECT_GE(pi2_client_statistics_count, 2);
+
+ // TODO(austin): Need 2 servers going so we can do the round trip offset
+ // estimation.
}
} // namespace testing
diff --git a/aos/network/message_bridge_test_client.json b/aos/network/message_bridge_test_client.json
index 65d28d2..d748225 100644
--- a/aos/network/message_bridge_test_client.json
+++ b/aos/network/message_bridge_test_client.json
@@ -11,7 +11,7 @@
{
"name": "pi2",
"hostname": "raspberrypi2",
- "port": 9971
+ "port": 9972
}
]
}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a073869..1e5dfce 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -18,6 +18,46 @@
},
{
"name": "/aos/pi1",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi1",
+ "frequency": 10,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1
+ }
+ ]
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi2",
+ "frequency": 10,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1
+ }
+ ]
+ },
+ {
+ "name": "/aos/pi1_forwarded",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi2",
+ "frequency": 10,
+ "max_size": 200
+ },
+ {
+ "name": "/aos/pi2_forwarded",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi1",
+ "frequency": 10,
+ "max_size": 200
+ },
+ {
+ "name": "/aos/pi1",
"type": "aos.message_bridge.ServerStatistics",
"source_node": "pi1",
"frequency": 2
@@ -90,22 +130,6 @@
]
}
],
- "applications": [
- {
- "name": "pi2_message_bridge_client",
- "maps": [
- {
- "match": {
- "name": "/test",
- "type": "aos.examples.Ping"
- },
- "rename": {
- "name": "/test2"
- }
- }
- ]
- }
- ],
"maps": [
{
"match": {
@@ -124,6 +148,34 @@
"rename": {
"name": "/aos/pi2"
}
+ },
+ {
+ "match": {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos/pi1",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi1_forwarded"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos/pi2",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi2_forwarded"
+ }
}
]
}
diff --git a/aos/network/message_bridge_test_server.json b/aos/network/message_bridge_test_server.json
index eea92e7..35cedb6 100644
--- a/aos/network/message_bridge_test_server.json
+++ b/aos/network/message_bridge_test_server.json
@@ -11,7 +11,7 @@
{
"name": "pi2",
"hostname": "localhost",
- "port": 9971
+ "port": 9972
}
]
}
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index 926e59b..5b6df3b 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -43,6 +43,8 @@
void LogSctpStatus(sctp_assoc_t assoc_id);
+ void set_max_size(size_t max_size) { max_size_ = max_size; }
+
private:
struct sockaddr_storage sockaddr_remote_;
struct sockaddr_storage sockaddr_local_;
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 1ed816b..5af994e 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -175,9 +175,9 @@
memset(&inmessage, 0, sizeof(struct msghdr));
aos::unique_c_ptr<Message> result(
- reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size)));
+ reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size + 1)));
- iov.iov_len = max_size;
+ iov.iov_len = max_size + 1;
iov.iov_base = result->mutable_data();
inmessage.msg_iov = &iov;
@@ -193,6 +193,7 @@
PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
result->size = size;
+ CHECK_LE(size, max_size) << ": Message overflowed buffer.";
if ((MSG_NOTIFICATION & inmessage.msg_flags)) {
result->message_type = Message::kNotification;
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index 70d5b28..5fd9f53 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -63,8 +63,7 @@
PCHECK(listen(fd_, 100) == 0);
- PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size_,
- sizeof(max_size_)) == 0);
+ SetMaxSize(1000);
}
aos::unique_c_ptr<Message> SctpServer::Read() {
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index a3086d9..b702aa8 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -46,11 +46,19 @@
void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
uint16_t priority);
+ void SetMaxSize(size_t max_size) {
+ max_size_ = max_size;
+ // Have the kernel give us a factor of 10 more. This lets us have more than
+ // one full sized packet in flight.
+ max_size = max_size * 10;
+ PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
+ sizeof(max_size)) == 0);
+ }
+
private:
struct sockaddr_storage sockaddr_local_;
int fd_;
- // TODO(austin): Configure this.
size_t max_size_ = 1000;
int ppid_ = 1;
diff --git a/aos/network/timestamp.fbs b/aos/network/timestamp.fbs
new file mode 100644
index 0000000..299ecaf
--- /dev/null
+++ b/aos/network/timestamp.fbs
@@ -0,0 +1,15 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+table ClientOffset {
+ node:Node;
+
+ monotonic_offset:int64;
+}
+
+table Timestamp {
+ offsets:[ClientOffset];
+}
+
+root_type Timestamp;
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index b4280f6..6f2baca 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -24,6 +24,18 @@
// much about precision when solving for the global offset.
class TimestampFilter {
public:
+ // Forces the offset and time to the provided sample without filtering. Used
+ // for syncing with a remote filter calculation.
+ void Set(aos::monotonic_clock::time_point monotonic_now,
+ std::chrono::nanoseconds sample_ns) {
+ const double sample =
+ std::chrono::duration_cast<std::chrono::duration<double>>(sample_ns -
+ base_offset_)
+ .count();
+ offset_ = sample;
+ last_time_ = monotonic_now;
+ }
+
// Updates with a new sample. monotonic_now is the timestamp of the sample on
// the destination node, and sample_ns is destination_time - source_time.
void Sample(aos::monotonic_clock::time_point monotonic_now,
@@ -132,6 +144,13 @@
}
}
+ // Sets the forward sample without filtering. See FwdSample for more details.
+ void FwdSet(aos::monotonic_clock::time_point monotonic_now,
+ std::chrono::nanoseconds sample_ns) {
+ fwd_.Set(monotonic_now, sample_ns);
+ Update(monotonic_now, &last_fwd_time_);
+ }
+
// Adds a forward sample. sample_ns = destination - source; Forward samples
// are from A -> B.
void FwdSample(aos::monotonic_clock::time_point monotonic_now,
@@ -156,6 +175,13 @@
}
}
+ // Sets the forward sample without filtering. See FwdSample for more details.
+ void RevSet(aos::monotonic_clock::time_point monotonic_now,
+ std::chrono::nanoseconds sample_ns) {
+ rev_.Set(monotonic_now, sample_ns);
+ Update(monotonic_now, &last_rev_time_);
+ }
+
// Adds a reverse sample. sample_ns = destination - source; Reverse samples
// are B -> A.
void RevSample(aos::monotonic_clock::time_point monotonic_now,
@@ -214,6 +240,11 @@
last_rev_time_ = aos::monotonic_clock::min_time;
}
+ bool MissingSamples() {
+ return (last_fwd_time_ == aos::monotonic_clock::min_time) ||
+ (last_rev_time_ == aos::monotonic_clock::min_time);
+ }
+
private:
// Updates the offset estimate given the current time, and a pointer to the
// variable holding the last time.
@@ -226,7 +257,6 @@
const double hard_max = fwd_.offset();
const double hard_min = -rev_.offset();
const double average = (hard_max + hard_min) / 2.0;
- LOG(INFO) << "max " << hard_max << " min " << hard_min;
// We don't want to clip the offset to the hard min/max. We really want to
// keep it within a band around the middle. ratio of 0.5 means stay within
// +- 0.25 of the middle of the hard min and max.