Add helpers to figure out what is logged where
The logic to figure out what logger and what service should be logging
what is a bit annoying and tricky. Do it in a function so we can test
it easily.
Change-Id: Ia493fcabd7529a7235941d49462172c9988ce07f
diff --git a/aos/configuration.cc b/aos/configuration.cc
index b37c324..65c218e 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -370,12 +370,34 @@
<< " has an unknown \"source_node\"";
if (c->has_destination_nodes()) {
- for (const flatbuffers::String *n : *c->destination_nodes()) {
- CHECK(GetNode(&result.message(), n->string_view()) != nullptr)
+ for (const Connection *connection : *c->destination_nodes()) {
+ CHECK(connection->has_name());
+ CHECK(GetNode(&result.message(), connection->name()->string_view()) !=
+ nullptr)
<< ": Channel " << FlatbufferToJson(c)
- << " has an unknown \"destination_nodes\" " << n->string_view();
+ << " has an unknown \"destination_nodes\" "
+ << connection->name()->string_view();
- CHECK_NE(n->string_view(), c->source_node()->string_view())
+ switch (connection->timestamp_logger()) {
+ case LoggerConfig::LOCAL_LOGGER:
+ case LoggerConfig::NOT_LOGGED:
+ CHECK(!connection->has_timestamp_logger_node());
+ break;
+ case LoggerConfig::REMOTE_LOGGER:
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ CHECK(connection->has_timestamp_logger_node());
+ CHECK(
+ GetNode(&result.message(),
+ connection->timestamp_logger_node()->string_view()) !=
+ nullptr)
+ << ": Channel " << FlatbufferToJson(c)
+ << " has an unknown \"timestamp_logger_node\""
+ << connection->name()->string_view();
+ break;
+ }
+
+ CHECK_NE(connection->name()->string_view(),
+ c->source_node()->string_view())
<< ": Channel " << FlatbufferToJson(c)
<< " is forwarding data to itself";
}
@@ -609,8 +631,9 @@
return false;
}
- for (const flatbuffers::String *s : *channel->destination_nodes()) {
- if (s->string_view() == node->name()->string_view()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ CHECK(connection->has_name());
+ if (connection->name()->string_view() == node->name()->string_view()) {
return true;
}
}
@@ -618,5 +641,91 @@
return false;
}
+bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node) {
+ switch(channel->logger()) {
+ case LoggerConfig::LOCAL_LOGGER:
+ if (node == nullptr) {
+ // Single node world. If there is a local logger, then we want to use
+ // it.
+ return true;
+ }
+ return channel->source_node()->string_view() ==
+ node->name()->string_view();
+ case LoggerConfig::REMOTE_LOGGER:
+ CHECK(channel->has_logger_node());
+
+ return channel->logger_node()->string_view() ==
+ CHECK_NOTNULL(node)->name()->string_view();
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ CHECK(channel->has_logger_node());
+
+ if (channel->source_node()->string_view() ==
+ CHECK_NOTNULL(node)->name()->string_view()) {
+ return true;
+ }
+ if (channel->logger_node()->string_view() == node->name()->string_view()) {
+ return true;
+ }
+
+ return false;
+ case LoggerConfig::NOT_LOGGED:
+ return false;
+ }
+
+ LOG(FATAL) << "Unknown logger config " << static_cast<int>(channel->logger());
+}
+
+const Connection *ConnectionToNode(const Channel *channel, const Node *node) {
+ if (!channel->has_destination_nodes()) {
+ return nullptr;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() == node->name()->string_view()) {
+ return connection;
+ }
+ }
+ return nullptr;
+}
+
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
+ const Node *node,
+ const Node *logger_node) {
+ const Connection *connection = ConnectionToNode(channel, node);
+ if (connection == nullptr) {
+ return false;
+ }
+ return ConnectionDeliveryTimeIsLoggedOnNode(connection, logger_node);
+}
+
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
+ const Node *node) {
+ switch (connection->timestamp_logger()) {
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ CHECK(connection->has_timestamp_logger_node());
+ if (connection->name()->string_view() == node->name()->string_view()) {
+ return true;
+ }
+
+ if (connection->timestamp_logger_node()->string_view() ==
+ node->name()->string_view()) {
+ return true;
+ }
+
+ return false;
+ case LoggerConfig::LOCAL_LOGGER:
+ return connection->name()->string_view() == node->name()->string_view();
+ case LoggerConfig::REMOTE_LOGGER:
+ CHECK(connection->has_timestamp_logger_node());
+
+ return connection->timestamp_logger_node()->string_view() ==
+ node->name()->string_view();
+ case LoggerConfig::NOT_LOGGED:
+ return false;
+ }
+
+ LOG(FATAL) << "Unknown logger config "
+ << static_cast<int>(connection->timestamp_logger());
+}
+
} // namespace configuration
} // namespace aos
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index cc1f950..42c404e 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -2,6 +2,46 @@
namespace aos;
+enum LoggerConfig : ubyte {
+ // This data should be logged on this node.
+ LOCAL_LOGGER,
+ // This data should be logged on a remote node.
+ REMOTE_LOGGER,
+ // This data should not be logged.
+ NOT_LOGGED,
+ // This data should be logged both on this node and on the remote node.
+ // This is useful where you want to log a message on both the sender and
+ // receiver to create self-contained log files.
+ LOCAL_AND_REMOTE_LOGGER
+}
+
+table Connection {
+ // Node name to forward to.
+ name:string;
+
+ // How the delivery timestamps for this connection should be logged. Do we
+ // log them with the local logger (i.e. the logger running on the node that
+ // this message is delivered to)? Do we log them on another node (a central
+ // logging node)? Do we log them in both places redundantly?
+ timestamp_logger:LoggerConfig = LOCAL_LOGGER;
+
+ // If the corresponding delivery timestamps for this channel are logged
+ // remotely, which node should be responsible for logging the data. Note:
+ // for now, this can only be the source node. Empty implies the node this
+ // connection is connecting to (i.e. name).
+ timestamp_logger_node:string;
+
+ // Priority to forward data with.
+ priority:ushort = 100;
+
+ // Time to live in nanoseconds before the message is dropped.
+ // A value of 0 means no timeout, i.e. reliable. When a client connects, the
+ // latest message from this channel will be sent regardless.
+ // TODO(austin): We can retry more than just the last message on reconnect
+ // if we want. This is an unlikely scenario though.
+ time_to_live:uint = 0;
+}
+
// Table representing a channel. Channels are where data is published and
// subscribed from. The tuple of name, type is the identifying information.
table Channel {
@@ -11,8 +51,8 @@
type:string;
// Max frequency in messages/sec of the data published on this channel.
frequency:int = 100;
- // Max size of the data being published. (This will be automatically
- // computed in the future.)
+ // Max size of the data being published. (This will hopefully be
+ // automatically computed in the future.)
max_size:int = 1000;
// Sets the maximum number of senders on a channel.
@@ -27,9 +67,17 @@
// If nodes is populated below, this needs to also be populated.
source_node:string;
- // The destination node names for data sent on this channel.
+ // The destination nodes for data sent on this channel.
// This only needs to be populated if this message is getting forwarded.
- destination_nodes:[string];
+ destination_nodes:[Connection];
+
+ // What service is responsible for logging this channel:
+ logger:LoggerConfig = LOCAL_LOGGER;
+ // If the channel is logged remotely, which node should be responsible for
+ // logging the data. Note: this requires that the data is forwarded to the
+ // node responsible for logging it. Empty implies the node this connection
+ // is connecting to (i.e. name).
+ logger_node:string;
}
// Table to support renaming channel names.
diff --git a/aos/configuration.h b/aos/configuration.h
index ab235a3..7bf8203 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -65,6 +65,18 @@
// provided node.
bool ChannelIsReadableOnNode(const Channel *channel, const Node *node);
+// Returns true if the message is supposed to be logged on this node.
+bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node);
+
+const Connection *ConnectionToNode(const Channel *channel, const Node *node);
+// Returns true if the delivery timestamps are supposed to be logged on this
+// node.
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
+ const Node *node,
+ const Node *logger_node);
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
+ const Node *node);
+
// Prints a channel to json, but without the schema.
std::string CleanedChannelToString(const Channel *channel);
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index ff0b570..fab6745 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -200,7 +200,7 @@
// Tests that our node writeable helpers work as intended.
TEST_F(ConfigurationTest, ChannelIsSendableOnNode) {
FlatbufferDetachedBuffer<Channel> good_channel(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "foo"
@@ -208,7 +208,7 @@
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Channel> bad_channel(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar"
@@ -216,7 +216,7 @@
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Node> node(JsonToFlatbuffer(
-R"node({
+ R"node({
"name": "foo"
})node",
Node::MiniReflectTypeTable()));
@@ -230,19 +230,23 @@
// Tests that our node readable and writeable helpers work as intended.
TEST_F(ConfigurationTest, ChannelIsReadableOnNode) {
FlatbufferDetachedBuffer<Channel> good_channel(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar",
"destination_nodes": [
- "baz",
- "foo",
+ {
+ "name": "baz"
+ },
+ {
+ "name": "foo"
+ }
]
})channel",
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Channel> bad_channel1(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar"
@@ -250,18 +254,20 @@
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Channel> bad_channel2(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar",
"destination_nodes": [
- "baz"
+ {
+ "name": "baz"
+ }
]
})channel",
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Node> node(JsonToFlatbuffer(
-R"node({
+ R"node({
"name": "foo"
})node",
Node::MiniReflectTypeTable()));
@@ -274,7 +280,293 @@
ChannelIsReadableOnNode(&bad_channel2.message(), &node.message()));
}
+// Tests that our node message is logged helpers work as intended.
+TEST_F(ConfigurationTest, ChannelMessageIsLoggedOnNode) {
+ FlatbufferDetachedBuffer<Channel> logged_on_self_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+ FlatbufferDetachedBuffer<Channel> not_logged_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "NOT_LOGGED",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "LOCAL_LOGGER"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_remote_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "baz",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_separate_logger_node_channel(
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "foo",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_both_channel (
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "baz",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> foo_node(JsonToFlatbuffer(
+ R"node({
+ "name": "foo"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> bar_node(JsonToFlatbuffer(
+ R"node({
+ "name": "bar"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> baz_node(JsonToFlatbuffer(
+ R"node({
+ "name": "baz"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ // Local logger.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+ &baz_node.message()));
+
+ // No logger.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(¬_logged_channel.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(¬_logged_channel.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(¬_logged_channel.message(),
+ &baz_node.message()));
+
+ // Remote logger.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+ &baz_node.message()));
+
+ // Separate logger.
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &foo_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &bar_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message()));
+
+ // Logged in multiple places.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+ &baz_node.message()));
+}
+
+// Tests that our forwarding timestamps are logged helpers work as intended.
+TEST_F(ConfigurationTest, ConnectionDeliveryTimeIsLoggedOnNode) {
+ FlatbufferDetachedBuffer<Channel> logged_on_self_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "baz",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> not_logged_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "NOT_LOGGED",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_remote_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "bar"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_separate_logger_node_channel(
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "foo",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "foo"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_both_channel (
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_node": "bar"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> foo_node(JsonToFlatbuffer(
+ R"node({
+ "name": "foo"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> bar_node(JsonToFlatbuffer(
+ R"node({
+ "name": "bar"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> baz_node(JsonToFlatbuffer(
+ R"node({
+ "name": "baz"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ // Local logger.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_self_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_self_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_self_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+
+ // No logger means.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ ¬_logged_channel.message(), &baz_node.message(), &foo_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ ¬_logged_channel.message(), &baz_node.message(), &bar_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ ¬_logged_channel.message(), &baz_node.message(), &baz_node.message()));
+
+ // Remote logger.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_remote_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_remote_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_remote_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+
+ // Separate logger.
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+
+ // Logged on both the node and a remote node.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_both_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_both_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_both_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+}
} // namespace testing
} // namespace configuration
} // namespace aos
diff --git a/aos/testdata/invalid_destination_node.json b/aos/testdata/invalid_destination_node.json
index 95dc6d3..1367d13 100644
--- a/aos/testdata/invalid_destination_node.json
+++ b/aos/testdata/invalid_destination_node.json
@@ -5,8 +5,12 @@
"type": ".aos.bar",
"source_node": "roborio",
"destination_nodes": [
- "dest",
- "trojan_horse"
+ {
+ "name": "dest"
+ },
+ {
+ "name": "trojan_horse"
+ }
]
}
],
diff --git a/aos/testdata/self_forward.json b/aos/testdata/self_forward.json
index 101b018..0e88cdf 100644
--- a/aos/testdata/self_forward.json
+++ b/aos/testdata/self_forward.json
@@ -5,7 +5,9 @@
"type": ".aos.bar",
"source_node": "roborio",
"destination_nodes": [
- "roborio"
+ {
+ "name": "roborio"
+ }
]
}
],