Merge changes I05202eff,I26f5d5ee,I04534a09,Ia8978a12
* changes:
Reject duplicate logger_nodes nodes
Merge Connections as well as Channels
Disallow logging timestamps without data
Don't crash on channels with timestamps but no data
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 2b9ac6d..314f82f 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -91,6 +91,18 @@
rhs.message().type()->string_view();
}
+bool operator<(const FlatbufferDetachedBuffer<Connection> &lhs,
+ const FlatbufferDetachedBuffer<Connection> &rhs) {
+ return lhs.message().name()->string_view() <
+ rhs.message().name()->string_view();
+}
+
+bool operator==(const FlatbufferDetachedBuffer<Connection> &lhs,
+ const FlatbufferDetachedBuffer<Connection> &rhs) {
+ return lhs.message().name()->string_view() ==
+ rhs.message().name()->string_view();
+}
+
bool operator==(const FlatbufferDetachedBuffer<Application> &lhs,
const FlatbufferDetachedBuffer<Application> &rhs) {
return lhs.message().name()->string_view() ==
@@ -337,6 +349,46 @@
<< ", can only use [-a-zA-Z0-9_/]";
}
+ if (c->has_logger_nodes()) {
+ // Confirm that we don't have duplicate logger nodes.
+ absl::btree_set<std::string_view> logger_nodes;
+ for (const flatbuffers::String *s : *c->logger_nodes()) {
+ logger_nodes.insert(s->string_view());
+ }
+ CHECK_EQ(static_cast<size_t>(logger_nodes.size()),
+ c->logger_nodes()->size())
+ << ": Found duplicate logger_nodes in "
+ << CleanedChannelToString(c);
+ }
+
+ if (c->has_destination_nodes()) {
+ // Confirm that we don't have duplicate timestamp logger nodes.
+ for (const Connection *d : *c->destination_nodes()) {
+ if (d->has_timestamp_logger_nodes()) {
+ absl::btree_set<std::string_view> timestamp_logger_nodes;
+ for (const flatbuffers::String *s : *d->timestamp_logger_nodes()) {
+ timestamp_logger_nodes.insert(s->string_view());
+ }
+ CHECK_EQ(static_cast<size_t>(timestamp_logger_nodes.size()),
+ d->timestamp_logger_nodes()->size())
+ << ": Found duplicate timestamp_logger_nodes in "
+ << CleanedChannelToString(c);
+ }
+ }
+
+ // There is no good use case today for logging timestamps but not the
+ // corresponding data. Instead of plumbing through all of this on the
+ // reader side, let's just disallow it for now.
+ if (c->logger() == LoggerConfig::NOT_LOGGED) {
+ for (const Connection *d : *c->destination_nodes()) {
+ CHECK(d->timestamp_logger() == LoggerConfig::NOT_LOGGED)
+ << ": Logging timestamps without data is not supported. If "
+ "you have a good use case, let's talk. "
+ << CleanedChannelToString(c);
+ }
+ }
+ }
+
// Make sure everything is sorted while we are here... If this fails,
// there will be a bunch of weird errors.
if (last_channel != nullptr) {
@@ -482,8 +534,45 @@
auto result = channels.insert(RecursiveCopyFlatBuffer(c));
if (!result.second) {
// Already there, so merge the new table into the original.
- *result.first =
+ auto merged =
MergeFlatBuffers(*result.first, RecursiveCopyFlatBuffer(c));
+
+ if (merged.message().has_destination_nodes()) {
+ absl::btree_set<FlatbufferDetachedBuffer<Connection>> connections;
+ for (const Connection *connection :
+ *merged.message().destination_nodes()) {
+ auto connection_result =
+ connections.insert(RecursiveCopyFlatBuffer(connection));
+ if (!connection_result.second) {
+ *connection_result.first =
+ MergeFlatBuffers(*connection_result.first,
+ RecursiveCopyFlatBuffer(connection));
+ }
+ }
+ if (static_cast<size_t>(connections.size()) !=
+ merged.message().destination_nodes()->size()) {
+ merged.mutable_message()->clear_destination_nodes();
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+ std::vector<flatbuffers::Offset<Connection>> connection_offsets;
+ for (const FlatbufferDetachedBuffer<Connection> &connection :
+ connections) {
+ connection_offsets.push_back(
+ RecursiveCopyFlatBuffer(&connection.message(), &fbb));
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+ destination_nodes_offset = fbb.CreateVector(connection_offsets);
+ Channel::Builder channel_builder(fbb);
+ channel_builder.add_destination_nodes(destination_nodes_offset);
+ fbb.Finish(channel_builder.Finish());
+ FlatbufferDetachedBuffer<Channel> destinations_channel(
+ fbb.Release());
+ merged = MergeFlatBuffers(merged, destinations_channel);
+ }
+ }
+
+ *result.first = std::move(merged);
}
}
}
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index c650a5d..a73a34a 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -866,6 +866,37 @@
EXPECT_THAT(result, ::testing::ElementsAreArray({0, 1, 0, 0}));
}
+// Tests that we reject invalid logging configurations.
+TEST_F(ConfigurationDeathTest, InvalidLoggerConfig) {
+ EXPECT_DEATH(
+ {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig(ArtifactPath("aos/testdata/invalid_logging_configuration.json"));
+ },
+ "Logging timestamps without data");
+}
+
+// Tests that we reject duplicate timestamp_logger_nodes in a destination node.
+TEST_F(ConfigurationDeathTest, DuplicateTimestampDestinationNodes) {
+ EXPECT_DEATH(
+ {
+ FlatbufferDetachedBuffer<Configuration> config = ReadConfig(
+ ArtifactPath("aos/testdata/duplicate_destination_nodes.json"));
+ },
+ "Found duplicate timestamp_logger_nodes in");
+}
+
+// Tests that we reject duplicate logger node configurations for a channel's
+// data.
+TEST_F(ConfigurationDeathTest, DuplicateLoggerNodes) {
+ EXPECT_DEATH(
+ {
+ FlatbufferDetachedBuffer<Configuration> config = ReadConfig(
+ ArtifactPath("aos/testdata/duplicate_logger_nodes.json"));
+ },
+ "Found duplicate logger_nodes in");
+}
+
} // namespace testing
} // namespace configuration
} // namespace aos
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index f306492..169311e 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -586,9 +586,7 @@
event_loop, node,
logged_configuration()->channels()->Get(logged_channel_index));
- if (channel->logger() == LoggerConfig::NOT_LOGGED) {
- continue;
- }
+ const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
@@ -611,12 +609,13 @@
configuration::ChannelIsSendableOnNode(channel, node) &&
configuration::ConnectionCount(channel);
- state->SetChannel(logged_channel_index,
- configuration::ChannelIndex(configuration(), channel),
- event_loop ? event_loop->MakeRawSender(channel) : nullptr,
- filter, is_forwarded, source_state);
+ state->SetChannel(
+ logged_channel_index,
+ configuration::ChannelIndex(configuration(), channel),
+ event_loop && logged ? event_loop->MakeRawSender(channel) : nullptr,
+ filter, is_forwarded, source_state);
- if (is_forwarded) {
+ if (is_forwarded && logged) {
const Node *source_node = configuration::GetNode(
configuration(), channel->source_node()->string_view());
diff --git a/aos/testdata/BUILD b/aos/testdata/BUILD
index 2971664..82e67f9 100644
--- a/aos/testdata/BUILD
+++ b/aos/testdata/BUILD
@@ -10,6 +10,8 @@
"config2.json",
"config2_multinode.json",
"config3.json",
+ "duplicate_destination_nodes.json",
+ "duplicate_logger_nodes.json",
"expected.json",
"expected_merge_with.json",
"expected_multinode.json",
@@ -19,6 +21,7 @@
"invalid_channel_name2.json",
"invalid_channel_name3.json",
"invalid_destination_node.json",
+ "invalid_logging_configuration.json",
"invalid_nodes.json",
"invalid_source_node.json",
"self_forward.json",
diff --git a/aos/testdata/config1_multinode.json b/aos/testdata/config1_multinode.json
index f12d927..1110e64 100644
--- a/aos/testdata/config1_multinode.json
+++ b/aos/testdata/config1_multinode.json
@@ -7,7 +7,8 @@
"source_node": "pi2",
"destination_nodes": [
{
- "name": "pi1"
+ "name": "pi1",
+ "time_to_live": 5
}
]
},
diff --git a/aos/testdata/config2_multinode.json b/aos/testdata/config2_multinode.json
index d284ab2..6e8a667 100644
--- a/aos/testdata/config2_multinode.json
+++ b/aos/testdata/config2_multinode.json
@@ -4,7 +4,13 @@
"name": "/foo",
"type": ".aos.bar",
"max_size": 7,
- "source_node": "pi1"
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "time_to_live": 7
+ }
+ ]
},
{
"name": "/foo3",
diff --git a/aos/testdata/duplicate_destination_nodes.json b/aos/testdata/duplicate_destination_nodes.json
new file mode 100644
index 0000000..81814c3
--- /dev/null
+++ b/aos/testdata/duplicate_destination_nodes.json
@@ -0,0 +1,28 @@
+{
+ "channels": [
+ {
+ "name": "/foo",
+ "type": ".aos.bar",
+ "max_size": 5,
+ "source_node": "pi2",
+ "logger": "NOT_LOGGED",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2", "pi2"]
+ }
+ ]
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi1"
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2"
+ }
+ ]
+}
diff --git a/aos/testdata/duplicate_logger_nodes.json b/aos/testdata/duplicate_logger_nodes.json
new file mode 100644
index 0000000..4eed4ee
--- /dev/null
+++ b/aos/testdata/duplicate_logger_nodes.json
@@ -0,0 +1,34 @@
+{
+ "channels": [
+ {
+ "name": "/foo",
+ "type": ".aos.bar",
+ "max_size": 5,
+ "source_node": "pi2",
+ "logger": "REMOTE_LOGGER",
+ "logger_nodes": ["pi1", "pi2", "pi2"],
+ "destination_nodes": [
+ {
+ "name": "pi1"
+ },
+ {
+ "name": "pi3"
+ }
+ ]
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi1"
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2"
+ },
+ {
+ "name": "pi3",
+ "hostname": "raspberrypi3"
+ }
+ ]
+}
diff --git a/aos/testdata/expected_multinode.json b/aos/testdata/expected_multinode.json
index 0946007..c7e761a 100644
--- a/aos/testdata/expected_multinode.json
+++ b/aos/testdata/expected_multinode.json
@@ -7,7 +7,8 @@
"source_node": "pi2",
"destination_nodes": [
{
- "name": "pi1"
+ "name": "pi1",
+ "time_to_live": 5
}
]
},
diff --git a/aos/testdata/invalid_logging_configuration.json b/aos/testdata/invalid_logging_configuration.json
new file mode 100644
index 0000000..9213d8c
--- /dev/null
+++ b/aos/testdata/invalid_logging_configuration.json
@@ -0,0 +1,38 @@
+{
+ "channels": [
+ {
+ "name": "/foo",
+ "type": ".aos.bar",
+ "max_size": 5,
+ "source_node": "pi2",
+ "logger": "NOT_LOGGED",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ }
+ ]
+ },
+ {
+ "name": "/foo2",
+ "type": ".aos.bar",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ }
+ ]
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi1"
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2"
+ }
+ ]
+}