Merge "Add basic plotter for WebGL"
diff --git a/WORKSPACE b/WORKSPACE
index cbc390c..d43693f 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -549,3 +549,10 @@
name = "com_github_google_flatbuffers",
path = "third_party/flatbuffers",
)
+
+http_file(
+ name = "sample_logfile",
+ downloaded_file_path = "log.fbs",
+ sha256 = "91c98edee0c90a19992792c711dde4a6743af2d6d7e45b5079ec228fdf51ff11",
+ urls = ["http://www.frc971.org/Build-Dependencies/small_sample_logfile.fbs"],
+)
diff --git a/aos/BUILD b/aos/BUILD
index 6b3c5bc..e964066 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -1,5 +1,5 @@
load("//tools:environments.bzl", "mcu_cpus")
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_python_library")
filegroup(
name = "prime_binaries",
@@ -295,6 +295,19 @@
visibility = ["//visibility:public"],
)
+flatbuffer_python_library(
+ name = "configuration_fbs_python",
+ srcs = ["configuration.fbs"],
+ namespace = "aos",
+ tables = [
+ "Configuration",
+ "Channel",
+ "Map",
+ "Node",
+ ],
+ visibility = ["//visibility:public"],
+)
+
cc_library(
name = "configuration",
srcs = [
diff --git a/aos/configuration.cc b/aos/configuration.cc
index b37c324..65c218e 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -370,12 +370,34 @@
<< " has an unknown \"source_node\"";
if (c->has_destination_nodes()) {
- for (const flatbuffers::String *n : *c->destination_nodes()) {
- CHECK(GetNode(&result.message(), n->string_view()) != nullptr)
+ for (const Connection *connection : *c->destination_nodes()) {
+ CHECK(connection->has_name());
+ CHECK(GetNode(&result.message(), connection->name()->string_view()) !=
+ nullptr)
<< ": Channel " << FlatbufferToJson(c)
- << " has an unknown \"destination_nodes\" " << n->string_view();
+ << " has an unknown \"destination_nodes\" "
+ << connection->name()->string_view();
- CHECK_NE(n->string_view(), c->source_node()->string_view())
+ switch (connection->timestamp_logger()) {
+ case LoggerConfig::LOCAL_LOGGER:
+ case LoggerConfig::NOT_LOGGED:
+ CHECK(!connection->has_timestamp_logger_node());
+ break;
+ case LoggerConfig::REMOTE_LOGGER:
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ CHECK(connection->has_timestamp_logger_node());
+ CHECK(
+ GetNode(&result.message(),
+ connection->timestamp_logger_node()->string_view()) !=
+ nullptr)
+ << ": Channel " << FlatbufferToJson(c)
+ << " has an unknown \"timestamp_logger_node\""
+ << connection->name()->string_view();
+ break;
+ }
+
+ CHECK_NE(connection->name()->string_view(),
+ c->source_node()->string_view())
<< ": Channel " << FlatbufferToJson(c)
<< " is forwarding data to itself";
}
@@ -609,8 +631,9 @@
return false;
}
- for (const flatbuffers::String *s : *channel->destination_nodes()) {
- if (s->string_view() == node->name()->string_view()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ CHECK(connection->has_name());
+ if (connection->name()->string_view() == node->name()->string_view()) {
return true;
}
}
@@ -618,5 +641,91 @@
return false;
}
+bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node) {
+  switch (channel->logger()) {
+ case LoggerConfig::LOCAL_LOGGER:
+ if (node == nullptr) {
+ // Single node world. If there is a local logger, then we want to use
+ // it.
+ return true;
+ }
+ return channel->source_node()->string_view() ==
+ node->name()->string_view();
+ case LoggerConfig::REMOTE_LOGGER:
+ CHECK(channel->has_logger_node());
+
+ return channel->logger_node()->string_view() ==
+ CHECK_NOTNULL(node)->name()->string_view();
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ CHECK(channel->has_logger_node());
+
+ if (channel->source_node()->string_view() ==
+ CHECK_NOTNULL(node)->name()->string_view()) {
+ return true;
+ }
+ if (channel->logger_node()->string_view() == node->name()->string_view()) {
+ return true;
+ }
+
+ return false;
+ case LoggerConfig::NOT_LOGGED:
+ return false;
+ }
+
+ LOG(FATAL) << "Unknown logger config " << static_cast<int>(channel->logger());
+}
+
+const Connection *ConnectionToNode(const Channel *channel, const Node *node) {
+ if (!channel->has_destination_nodes()) {
+ return nullptr;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() == node->name()->string_view()) {
+ return connection;
+ }
+ }
+ return nullptr;
+}
+
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
+ const Node *node,
+ const Node *logger_node) {
+ const Connection *connection = ConnectionToNode(channel, node);
+ if (connection == nullptr) {
+ return false;
+ }
+ return ConnectionDeliveryTimeIsLoggedOnNode(connection, logger_node);
+}
+
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
+ const Node *node) {
+ switch (connection->timestamp_logger()) {
+ case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+ CHECK(connection->has_timestamp_logger_node());
+ if (connection->name()->string_view() == node->name()->string_view()) {
+ return true;
+ }
+
+ if (connection->timestamp_logger_node()->string_view() ==
+ node->name()->string_view()) {
+ return true;
+ }
+
+ return false;
+ case LoggerConfig::LOCAL_LOGGER:
+ return connection->name()->string_view() == node->name()->string_view();
+ case LoggerConfig::REMOTE_LOGGER:
+ CHECK(connection->has_timestamp_logger_node());
+
+ return connection->timestamp_logger_node()->string_view() ==
+ node->name()->string_view();
+ case LoggerConfig::NOT_LOGGED:
+ return false;
+ }
+
+ LOG(FATAL) << "Unknown logger config "
+ << static_cast<int>(connection->timestamp_logger());
+}
+
} // namespace configuration
} // namespace aos
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index cc1f950..42c404e 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -2,6 +2,46 @@
namespace aos;
+enum LoggerConfig : ubyte {
+ // This data should be logged on this node.
+ LOCAL_LOGGER,
+ // This data should be logged on a remote node.
+ REMOTE_LOGGER,
+ // This data should not be logged.
+ NOT_LOGGED,
+ // This data should be logged both on this node and on the remote node.
+ // This is useful where you want to log a message on both the sender and
+ // receiver to create self-contained log files.
+ LOCAL_AND_REMOTE_LOGGER
+}
+
+table Connection {
+ // Node name to forward to.
+ name:string;
+
+ // How the delivery timestamps for this connection should be logged. Do we
+ // log them with the local logger (i.e. the logger running on the node that
+ // this message is delivered to)? Do we log them on another node (a central
+ // logging node)? Do we log them in both places redundantly?
+ timestamp_logger:LoggerConfig = LOCAL_LOGGER;
+
+ // If the corresponding delivery timestamps for this channel are logged
+ // remotely, which node should be responsible for logging the data. Note:
+ // for now, this can only be the source node. Empty implies the node this
+ // connection is connecting to (i.e. name).
+ timestamp_logger_node:string;
+
+ // Priority to forward data with.
+ priority:ushort = 100;
+
+ // Time to live in nanoseconds before the message is dropped.
+ // A value of 0 means no timeout, i.e. reliable. When a client connects, the
+ // latest message from this channel will be sent regardless.
+ // TODO(austin): We can retry more than just the last message on reconnect
+ // if we want. This is an unlikely scenario though.
+ time_to_live:uint = 0;
+}
+
// Table representing a channel. Channels are where data is published and
// subscribed from. The tuple of name, type is the identifying information.
table Channel {
@@ -11,8 +51,8 @@
type:string;
// Max frequency in messages/sec of the data published on this channel.
frequency:int = 100;
- // Max size of the data being published. (This will be automatically
- // computed in the future.)
+ // Max size of the data being published. (This will hopefully be
+ // automatically computed in the future.)
max_size:int = 1000;
// Sets the maximum number of senders on a channel.
@@ -27,9 +67,17 @@
// If nodes is populated below, this needs to also be populated.
source_node:string;
- // The destination node names for data sent on this channel.
+ // The destination nodes for data sent on this channel.
// This only needs to be populated if this message is getting forwarded.
- destination_nodes:[string];
+ destination_nodes:[Connection];
+
+ // What service is responsible for logging this channel:
+ logger:LoggerConfig = LOCAL_LOGGER;
+ // If the channel is logged remotely, which node should be responsible for
+ // logging the data. Note: this requires that the data is forwarded to the
+  // node responsible for logging it.  Empty implies the source node should
+  // do the logging.
+ logger_node:string;
}
// Table to support renaming channel names.
diff --git a/aos/configuration.h b/aos/configuration.h
index ab235a3..7bf8203 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -65,6 +65,18 @@
// provided node.
bool ChannelIsReadableOnNode(const Channel *channel, const Node *node);
+// Returns true if the message is supposed to be logged on this node.
+bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node);
+
+const Connection *ConnectionToNode(const Channel *channel, const Node *node);
+// Returns true if the delivery timestamps are supposed to be logged on this
+// node.
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
+ const Node *node,
+ const Node *logger_node);
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
+ const Node *node);
+
// Prints a channel to json, but without the schema.
std::string CleanedChannelToString(const Channel *channel);
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index ff0b570..fab6745 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -200,7 +200,7 @@
// Tests that our node writeable helpers work as intended.
TEST_F(ConfigurationTest, ChannelIsSendableOnNode) {
FlatbufferDetachedBuffer<Channel> good_channel(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "foo"
@@ -208,7 +208,7 @@
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Channel> bad_channel(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar"
@@ -216,7 +216,7 @@
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Node> node(JsonToFlatbuffer(
-R"node({
+ R"node({
"name": "foo"
})node",
Node::MiniReflectTypeTable()));
@@ -230,19 +230,23 @@
// Tests that our node readable and writeable helpers work as intended.
TEST_F(ConfigurationTest, ChannelIsReadableOnNode) {
FlatbufferDetachedBuffer<Channel> good_channel(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar",
"destination_nodes": [
- "baz",
- "foo",
+ {
+ "name": "baz"
+ },
+ {
+ "name": "foo"
+ }
]
})channel",
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Channel> bad_channel1(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar"
@@ -250,18 +254,20 @@
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Channel> bad_channel2(JsonToFlatbuffer(
-R"channel({
+ R"channel({
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "bar",
"destination_nodes": [
- "baz"
+ {
+ "name": "baz"
+ }
]
})channel",
Channel::MiniReflectTypeTable()));
FlatbufferDetachedBuffer<Node> node(JsonToFlatbuffer(
-R"node({
+ R"node({
"name": "foo"
})node",
Node::MiniReflectTypeTable()));
@@ -274,7 +280,293 @@
ChannelIsReadableOnNode(&bad_channel2.message(), &node.message()));
}
+// Tests that our ChannelMessageIsLoggedOnNode helper works as intended.
+TEST_F(ConfigurationTest, ChannelMessageIsLoggedOnNode) {
+ FlatbufferDetachedBuffer<Channel> logged_on_self_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+ FlatbufferDetachedBuffer<Channel> not_logged_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "NOT_LOGGED",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "LOCAL_LOGGER"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_remote_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "baz",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_separate_logger_node_channel(
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "foo",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_both_channel(
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "baz",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> foo_node(JsonToFlatbuffer(
+ R"node({
+ "name": "foo"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> bar_node(JsonToFlatbuffer(
+ R"node({
+ "name": "bar"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> baz_node(JsonToFlatbuffer(
+ R"node({
+ "name": "baz"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ // Local logger.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+ &baz_node.message()));
+
+ // No logger.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(¬_logged_channel.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(¬_logged_channel.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(¬_logged_channel.message(),
+ &baz_node.message()));
+
+ // Remote logger.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+ &baz_node.message()));
+
+ // Separate logger.
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &foo_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &bar_node.message()));
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message()));
+
+ // Logged in multiple places.
+ EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+ &baz_node.message()));
+}
+
+// Tests that our ConnectionDeliveryTimeIsLoggedOnNode helpers work as intended.
+TEST_F(ConfigurationTest, ConnectionDeliveryTimeIsLoggedOnNode) {
+ FlatbufferDetachedBuffer<Channel> logged_on_self_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "baz",
+ "destination_nodes": [
+ {
+ "name": "baz"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> not_logged_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "NOT_LOGGED",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_remote_channel(JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "bar"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Channel> logged_on_separate_logger_node_channel(
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "logger": "REMOTE_LOGGER",
+ "logger_node": "foo",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "foo"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_both_channel(
+ JsonToFlatbuffer(
+ R"channel({
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "bar",
+ "destination_nodes": [
+ {
+ "name": "baz",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_node": "bar"
+ }
+ ]
+})channel",
+ Channel::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> foo_node(JsonToFlatbuffer(
+ R"node({
+ "name": "foo"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> bar_node(JsonToFlatbuffer(
+ R"node({
+ "name": "bar"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ FlatbufferDetachedBuffer<Node> baz_node(JsonToFlatbuffer(
+ R"node({
+ "name": "baz"
+})node",
+ Node::MiniReflectTypeTable()));
+
+ // Local logger.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_self_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_self_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_self_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+
+  // No logger.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ ¬_logged_channel.message(), &baz_node.message(), &foo_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ ¬_logged_channel.message(), &baz_node.message(), &bar_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ ¬_logged_channel.message(), &baz_node.message(), &baz_node.message()));
+
+ // Remote logger.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_remote_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_remote_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_remote_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+
+ // Separate logger.
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+
+ // Logged on both the node and a remote node.
+ EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_both_channel.message(), &baz_node.message(),
+ &foo_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_both_channel.message(), &baz_node.message(),
+ &bar_node.message()));
+ EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+ &logged_on_both_channel.message(), &baz_node.message(),
+ &baz_node.message()));
+}
} // namespace testing
} // namespace configuration
} // namespace aos
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e35fe5b..1c2815d 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -269,6 +269,10 @@
QueueMessages();
}
+LogReader::~LogReader() {
+ CHECK(!event_loop_unique_ptr_) << "Did you remember to call Deregister?";
+}
+
bool LogReader::ReadBlock() {
if (end_of_file_) {
return false;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 497dabb..b1f6718 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -93,6 +93,7 @@
class LogReader {
public:
LogReader(absl::string_view filename);
+ ~LogReader();
// Registers the timer and senders used to resend the messages from the log
// file.
diff --git a/aos/testdata/invalid_destination_node.json b/aos/testdata/invalid_destination_node.json
index 95dc6d3..1367d13 100644
--- a/aos/testdata/invalid_destination_node.json
+++ b/aos/testdata/invalid_destination_node.json
@@ -5,8 +5,12 @@
"type": ".aos.bar",
"source_node": "roborio",
"destination_nodes": [
- "dest",
- "trojan_horse"
+ {
+ "name": "dest"
+ },
+ {
+ "name": "trojan_horse"
+ }
]
}
],
diff --git a/aos/testdata/self_forward.json b/aos/testdata/self_forward.json
index 101b018..0e88cdf 100644
--- a/aos/testdata/self_forward.json
+++ b/aos/testdata/self_forward.json
@@ -5,7 +5,9 @@
"type": ".aos.bar",
"source_node": "roborio",
"destination_nodes": [
- "roborio"
+ {
+ "name": "roborio"
+ }
]
}
],
diff --git a/debian/python.BUILD b/debian/python.BUILD
index 666f2d8..5e5d810 100644
--- a/debian/python.BUILD
+++ b/debian/python.BUILD
@@ -1,16 +1,20 @@
package(default_visibility = ["@//debian:__pkg__"])
cc_library(
- name = "python3.4_lib",
- hdrs = glob(["usr/include/python3.4m/**/*.h"]),
+ name = "python3.5_lib",
+ srcs = [
+ "usr/lib/x86_64-linux-gnu/libpython3.5m.so",
+ ],
+ hdrs = glob(["usr/include/**/*.h"]),
includes = [
- "usr/include/python3.4m/",
+ "usr/include/",
+ "usr/include/python3.5m/",
],
visibility = ["//visibility:public"],
)
cc_library(
- name = "python3.4_f2py",
+ name = "python3.5_f2py",
srcs = [
"usr/lib/python3/dist-packages/numpy/f2py/src/fortranobject.c",
],
@@ -26,7 +30,7 @@
],
visibility = ["//visibility:public"],
deps = [
- ":python3.4_lib",
+ ":python3.5_lib",
],
)
diff --git a/frc971/analysis/BUILD b/frc971/analysis/BUILD
index b45031f..b526712 100644
--- a/frc971/analysis/BUILD
+++ b/frc971/analysis/BUILD
@@ -21,3 +21,30 @@
srcs = ["__init__.py"],
deps = ["//frc971:python_init"],
)
+
+cc_binary(
+ name = "py_log_reader.so",
+ srcs = ["py_log_reader.cc"],
+ linkshared = True,
+ restricted_to = ["//tools:k8"],
+ deps = [
+ "//aos:configuration",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:shm_event_loop",
+ "//aos/events:simulated_event_loop",
+ "//aos/events/logging:logger",
+ "@com_github_google_glog//:glog",
+ "@python_repo//:python3.5_lib",
+ ],
+)
+
+py_test(
+ name = "log_reader_test",
+ srcs = ["log_reader_test.py"],
+ data = [
+ ":py_log_reader.so",
+ "@sample_logfile//file",
+ ],
+ restricted_to = ["//tools:k8"],
+ deps = ["//aos:configuration_fbs_python"],
+)
diff --git a/frc971/analysis/log_reader_test.py b/frc971/analysis/log_reader_test.py
new file mode 100644
index 0000000..3389d00
--- /dev/null
+++ b/frc971/analysis/log_reader_test.py
@@ -0,0 +1,117 @@
+#!/usr/bin/python3
+import json
+import unittest
+
+from aos.Configuration import Configuration
+from frc971.analysis.py_log_reader import LogReader
+
+
+class LogReaderTest(unittest.TestCase):
+ def setUp(self):
+ self.reader = LogReader("external/sample_logfile/file/log.fbs")
+ # A list of all the channels in the logfile--this is used to confirm that
+ # we did indeed read the config correctly.
+ self.all_channels = [
+ ("/aos", "aos.JoystickState"), ("/aos", "aos.RobotState"),
+ ("/aos", "aos.timing.Report"), ("/aos", "frc971.PDPValues"),
+ ("/aos",
+ "frc971.wpilib.PneumaticsToLog"), ("/autonomous",
+ "aos.common.actions.Status"),
+ ("/autonomous", "frc971.autonomous.AutonomousMode"),
+ ("/autonomous", "frc971.autonomous.Goal"), ("/camera",
+ "y2019.CameraLog"),
+ ("/camera", "y2019.control_loops.drivetrain.CameraFrame"),
+ ("/drivetrain",
+ "frc971.IMUValues"), ("/drivetrain",
+ "frc971.control_loops.drivetrain.Goal"),
+ ("/drivetrain",
+ "frc971.control_loops.drivetrain.LocalizerControl"),
+ ("/drivetrain", "frc971.control_loops.drivetrain.Output"),
+ ("/drivetrain", "frc971.control_loops.drivetrain.Position"),
+ ("/drivetrain", "frc971.control_loops.drivetrain.Status"),
+ ("/drivetrain", "frc971.sensors.GyroReading"),
+ ("/drivetrain",
+ "y2019.control_loops.drivetrain.TargetSelectorHint"),
+ ("/superstructure",
+ "y2019.StatusLight"), ("/superstructure",
+ "y2019.control_loops.superstructure.Goal"),
+ ("/superstructure", "y2019.control_loops.superstructure.Output"),
+ ("/superstructure", "y2019.control_loops.superstructure.Position"),
+ ("/superstructure", "y2019.control_loops.superstructure.Status")
+ ]
+ # A channel that is known to have data on it which we will use for testing.
+ self.test_channel = ("/aos", "aos.timing.Report")
+ # A non-existent channel
+ self.bad_channel = ("/aos", "aos.timing.FooBar")
+
+ def test_do_nothing(self):
+ """Tests that we sanely handle doing nothing.
+
+ A previous iteration of the log reader seg faulted when doing this."""
+ pass
+
+ def test_read_config(self):
+ """Tests that we can read the configuration from the logfile."""
+ config_bytes = self.reader.configuration()
+ config = Configuration.GetRootAsConfiguration(config_bytes, 0)
+
+ channel_set = set(self.all_channels)
+ for ii in range(config.ChannelsLength()):
+ channel = config.Channels(ii)
+ # Will raise KeyError if the channel does not exist
+ channel_set.remove((channel.Name().decode("utf-8"),
+ channel.Type().decode("utf-8")))
+
+ self.assertEqual(0, len(channel_set))
+
+ def test_empty_process(self):
+ """Tests running process() without subscribing to anything succeeds."""
+ self.reader.process()
+ for channel in self.all_channels:
+ with self.assertRaises(ValueError) as context:
+ self.reader.get_data_for_channel(channel[0], channel[1])
+
+ def test_subscribe(self):
+ """Tests that we can subscribe to a channel and get data out."""
+ name = self.test_channel[0]
+ message_type = self.test_channel[1]
+ self.assertTrue(self.reader.subscribe(name, message_type))
+ self.reader.process()
+ data = self.reader.get_data_for_channel(name, message_type)
+ self.assertLess(100, len(data))
+ last_monotonic_time = 0
+ for entry in data:
+ monotonic_time = entry[0]
+ realtime_time = entry[1]
+ json_data = entry[2].replace('nan', '\"nan\"')
+ self.assertLess(last_monotonic_time, monotonic_time)
+ # Sanity check that the realtime times are in the correct range.
+ self.assertLess(1500000000e9, realtime_time)
+ self.assertGreater(2000000000e9, realtime_time)
+ parsed_json = json.loads(json_data)
+ self.assertIn("name", parsed_json)
+
+ last_monotonic_time = monotonic_time
+
+ def test_bad_subscribe(self):
+ """Tests that we return false when subscribing to a non-existent channel."""
+ self.assertFalse(
+ self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]),
+ self.bad_channel)
+
+ def test_subscribe_after_process(self):
+ """Tests that an exception is thrown if we subscribe after calling process()."""
+ self.reader.process()
+ for channel in self.all_channels:
+ with self.assertRaises(RuntimeError) as context:
+ self.reader.subscribe(channel[0], channel[1])
+
+    def test_get_data_before_process(self):
+ """Tests that an exception is thrown if we retrieve data before calling process()."""
+ for channel in self.all_channels:
+ with self.assertRaises(RuntimeError) as context:
+ self.reader.get_data_for_channel(channel[0], channel[1])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/frc971/analysis/py_log_reader.cc b/frc971/analysis/py_log_reader.cc
new file mode 100644
index 0000000..5a6fce4
--- /dev/null
+++ b/frc971/analysis/py_log_reader.cc
@@ -0,0 +1,283 @@
+// This file provides a Python module for reading logfiles. See
+// log_reader_test.py for usage.
+//
+// This reader works by having the user specify exactly what channels they want
+// data for. We then process the logfile and store all the data on that channel
+// into a list of timestamps + JSON message data. The user can then use an
+// accessor method (get_data_for_channel) to retrieve the cached data.
+
+// Defining PY_SSIZE_T_CLEAN seems to be suggested by most of the Python
+// documentation.
+#define PY_SSIZE_T_CLEAN
+// Note that Python.h needs to be included before anything else.
+#include <Python.h>
+
+#include <memory>
+#include <new>
+
+#include "aos/configuration.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
+
+namespace frc971 {
+namespace analysis {
+namespace {
+
+// All the data corresponding to a single message: when it was sent (on both
+// clocks) and the payload serialized to JSON.
+struct MessageData {
+  aos::monotonic_clock::time_point monotonic_sent_time;
+  aos::realtime_clock::time_point realtime_sent_time;
+  // JSON representation of the message.
+  std::string json_data;
+};
+
+// Data corresponding to an entire channel, identified by name + type.
+struct ChannelData {
+  // Channel name, as passed to subscribe().
+  std::string name;
+  // Flatbuffer type name of the channel, as passed to subscribe().
+  std::string type;
+  // Each message published on the channel, in order by monotonic time.
+  std::vector<MessageData> messages;
+};
+
+// All the objects that we need for managing reading a logfile. Owned via a
+// raw pointer by the LogReaderType Python object and deleted in
+// LogReader_dealloc.
+struct LogReaderTools {
+  std::unique_ptr<aos::logger::LogReader> reader;
+  std::unique_ptr<aos::SimulatedEventLoopFactory> event_loop_factory;
+  // Event loop to use for subscribing to buses.
+  std::unique_ptr<aos::EventLoop> event_loop;
+  // One entry per successful subscribe() call; filled in during process().
+  std::vector<ChannelData> channel_data;
+  // Whether we have called process() on the reader yet.
+  bool processed = false;
+};
+
+// The C layout of a py_log_reader.LogReader instance. All C++ state lives
+// behind the tools pointer so this stays a plain Python object header.
+struct LogReaderType {
+  PyObject_HEAD;
+  LogReaderTools *tools = nullptr;
+};
+
+// Tears down a LogReader instance. If process() was never called, the
+// reader is still registered with the event loop factory and must be
+// Deregister()ed before destruction.
+void LogReader_dealloc(LogReaderType *self) {
+  LogReaderTools *tools = self->tools;
+  // tools can be null if tp_new failed part-way; reader can be null if
+  // tp_init never ran. Guard both instead of crashing on teardown.
+  if (tools != nullptr) {
+    if (tools->reader && !tools->processed) {
+      tools->reader->Deregister();
+    }
+    delete tools;
+  }
+  Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+// tp_new implementation: allocates the Python object and its backing
+// LogReaderTools. Heavy initialization (opening the logfile) happens in
+// LogReader_init.
+PyObject *LogReader_new(PyTypeObject *type, PyObject * /*args*/,
+                        PyObject * /*kwds*/) {
+  LogReaderType *self = (LogReaderType *)type->tp_alloc(type, 0);
+  if (self != nullptr) {
+    // A plain new would throw std::bad_alloc (so the old null check was
+    // dead code); use the non-throwing form since exceptions must not
+    // escape into the Python interpreter, and release the half-constructed
+    // object on failure instead of leaking it.
+    self->tools = new (std::nothrow) LogReaderTools();
+    if (self->tools == nullptr) {
+      type->tp_free((PyObject *)self);
+      return nullptr;
+    }
+  }
+  return (PyObject *)self;
+}
+
+// tp_init implementation: opens the logfile named by the "log_file_name"
+// argument and wires up a simulated event loop to replay it. Returns -1
+// with a Python exception set if argument parsing fails.
+int LogReader_init(LogReaderType *self, PyObject *args, PyObject *kwds) {
+  const char *kwlist[] = {"log_file_name", nullptr};
+
+  const char *log_file_name;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", const_cast<char **>(kwlist),
+                                   &log_file_name)) {
+    return -1;
+  }
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+  tools->reader = std::make_unique<aos::logger::LogReader>(log_file_name);
+  tools->event_loop_factory = std::make_unique<aos::SimulatedEventLoopFactory>(
+      tools->reader->configuration());
+  // NOTE(review): Register is called before MakeEventLoop -- assumed to be
+  // required by the aos LogReader lifecycle; confirm against its docs
+  // before reordering.
+  tools->reader->Register(tools->event_loop_factory.get());
+
+  tools->event_loop = tools->event_loop_factory->MakeEventLoop("data_fetcher");
+  // This event loop only fetches data; suppress its timing report.
+  tools->event_loop->SkipTimingReport();
+
+  return 0;
+}
+
+// Returns the cached data for the requested channel as a list of
+// (monotonic_nsec, realtime_nsec, json_string) tuples. Raises RuntimeError
+// if called before process(), and ValueError if the channel was never
+// subscribed to.
+PyObject *LogReader_get_data_for_channel(LogReaderType *self,
+                                         PyObject *args,
+                                         PyObject *kwds) {
+  const char *kwlist[] = {"name", "type", nullptr};
+
+  const char *name;
+  const char *type;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss",
+                                   const_cast<char **>(kwlist), &name, &type)) {
+    return nullptr;
+  }
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  if (!tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError,
+                    "Called get_data_for_bus before calling process().");
+    return nullptr;
+  }
+
+  for (const auto &channel : tools->channel_data) {
+    if (channel.name != name || channel.type != type) {
+      continue;
+    }
+    PyObject *list = PyList_New(channel.messages.size());
+    if (list == nullptr) {
+      return nullptr;
+    }
+    for (size_t ii = 0; ii < channel.messages.size(); ++ii) {
+      const auto &message = channel.messages[ii];
+      PyObject *monotonic_time = PyLong_FromLongLong(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              message.monotonic_sent_time.time_since_epoch())
+              .count());
+      PyObject *realtime_time = PyLong_FromLongLong(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              message.realtime_sent_time.time_since_epoch())
+              .count());
+      PyObject *json_data = PyUnicode_FromStringAndSize(
+          message.json_data.data(), message.json_data.size());
+      PyObject *entry = nullptr;
+      if (monotonic_time != nullptr && realtime_time != nullptr &&
+          json_data != nullptr) {
+        entry = PyTuple_Pack(3, monotonic_time, realtime_time, json_data);
+      }
+      // PyTuple_Pack does NOT steal references, so release our own refs to
+      // the packed objects (previously one reference per object leaked for
+      // every message).
+      Py_XDECREF(monotonic_time);
+      Py_XDECREF(realtime_time);
+      Py_XDECREF(json_data);
+      if (entry == nullptr) {
+        Py_DECREF(list);
+        return nullptr;
+      }
+      // PyList_SetItem steals the reference to entry, even on failure.
+      if (PyList_SetItem(list, ii, entry) != 0) {
+        Py_DECREF(list);
+        return nullptr;
+      }
+    }
+    return list;
+  }
+  PyErr_SetString(PyExc_ValueError,
+                  "The provided channel was never subscribed to.");
+  return nullptr;
+}
+
+// Subscribes to the given channel name + type. Returns True on success and
+// False if the channel does not exist in the logfile's configuration.
+// Raises RuntimeError if called after process().
+PyObject *LogReader_subscribe(LogReaderType *self, PyObject *args,
+                              PyObject *kwds) {
+  const char *kwlist[] = {"name", "type", nullptr};
+
+  const char *name;
+  const char *type;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss",
+                                   const_cast<char **>(kwlist), &name, &type)) {
+    return nullptr;
+  }
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  if (tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError,
+                    "Called subscribe after calling process().");
+    return nullptr;
+  }
+
+  const aos::Channel *const channel = aos::configuration::GetChannel(
+      tools->reader->configuration(), name, type, "", nullptr);
+  if (channel == nullptr) {
+    // Py_RETURN_FALSE increments the singleton's refcount; returning
+    // Py_False directly handed out a reference we did not own.
+    Py_RETURN_FALSE;
+  }
+  const int index = tools->channel_data.size();
+  tools->channel_data.push_back(
+      {.name = name, .type = type, .messages = {}});
+  // The watcher runs during process() and appends every message seen on the
+  // channel, serialized to JSON, to the ChannelData entry created above.
+  tools->event_loop->MakeRawWatcher(
+      channel, [channel, index, tools](const aos::Context &context,
+                                       const void *message) {
+        tools->channel_data[index].messages.push_back(
+            {.monotonic_sent_time = context.monotonic_event_time,
+             .realtime_sent_time = context.realtime_event_time,
+             .json_data = aos::FlatbufferToJson(
+                 channel->schema(), static_cast<const uint8_t *>(message))});
+      });
+  Py_RETURN_TRUE;
+}
+
+// Replays the entire logfile, filling in the data for every subscribed
+// channel, then deregisters the reader. May only be called once per
+// LogReader instance.
+static PyObject *LogReader_process(LogReaderType *self,
+                                   PyObject *Py_UNUSED(ignored)) {
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  if (tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError, "process() may only be called once.");
+    return nullptr;
+  }
+
+  // Set the flag before running so LogReader_dealloc knows Deregister has
+  // already been performed below.
+  tools->processed = true;
+
+  tools->event_loop_factory->Run();
+  tools->reader->Deregister();
+
+  Py_RETURN_NONE;
+}
+
+// Returns a copy of the logfile's Configuration flatbuffer as a Python
+// bytes object.
+static PyObject *LogReader_configuration(LogReaderType *self,
+                                         PyObject *Py_UNUSED(ignored)) {
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  // The Configuration held by the log reader is not known to live in one
+  // contiguous chunk of memory, so rather than digging out its underlying
+  // buffer + offset, copy the flatbuffer into a fresh detached buffer and
+  // hand Python a copy of that.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config_copy =
+      aos::CopyFlatBuffer(tools->reader->configuration());
+
+  return PyBytes_FromStringAndSize(
+      reinterpret_cast<const char *>(config_copy.data()), config_copy.size());
+}
+
+// Method table exposed on py_log_reader.LogReader instances.
+static PyMethodDef LogReader_methods[] = {
+    {"configuration", (PyCFunction)LogReader_configuration, METH_NOARGS,
+     "Return a bytes buffer for the Configuration of the logfile."},
+    {"process", (PyCFunction)LogReader_process, METH_NOARGS,
+     "Processes the logfile and all the subscribed to channels."},
+    {"subscribe", (PyCFunction)LogReader_subscribe,
+     METH_VARARGS | METH_KEYWORDS,
+     "Attempts to subscribe to the provided channel name + type. Returns True "
+     "if successful."},
+    {"get_data_for_channel", (PyCFunction)LogReader_get_data_for_channel,
+     METH_VARARGS | METH_KEYWORDS,
+     "Returns the logged data for a given channel. Raises an exception if you "
+     "did not subscribe to the provided channel. Returned data is a list of "
+     "tuples where each tuple is of the form (monotonic_nsec, realtime_nsec, "
+     "json_message_data)."},
+    // Sentinel entry; ml_meth is a function pointer, so use nullptr rather
+    // than 0.
+    {nullptr, nullptr, 0, nullptr}};
+
+// The Python type object for LogReader. Note that this variable shadows the
+// struct LogReaderType declared above, so a plain sizeof(LogReaderType)
+// inside this initializer would measure the PyTypeObject variable instead
+// of the instance struct; the elaborated specifier below selects the struct
+// so tp_basicsize reflects the actual instance size.
+static PyTypeObject LogReaderType = {
+    PyVarObject_HEAD_INIT(nullptr, 0).tp_name = "py_log_reader.LogReader",
+    .tp_doc = "LogReader objects",
+    .tp_basicsize = sizeof(struct LogReaderType),
+    .tp_itemsize = 0,
+    .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+    .tp_new = LogReader_new,
+    .tp_init = (initproc)LogReader_init,
+    .tp_dealloc = (destructor)LogReader_dealloc,
+    .tp_methods = LogReader_methods,
+};
+
+// Module definition for py_log_reader. The old m_doc was copy-pasted
+// boilerplate from the CPython extension tutorial.
+static PyModuleDef log_reader_module = {
+    PyModuleDef_HEAD_INIT,
+    .m_name = "py_log_reader",
+    .m_doc = "Reads AOS logfiles and exposes their data as JSON.",
+    .m_size = -1,
+};
+
+// Creates the py_log_reader module object and registers the LogReader type
+// on it. Returns nullptr (with a Python error set) on any failure.
+PyObject *InitModule() {
+  if (PyType_Ready(&LogReaderType) < 0) {
+    return nullptr;
+  }
+
+  PyObject *module = PyModule_Create(&log_reader_module);
+  if (module == nullptr) {
+    return nullptr;
+  }
+
+  // PyModule_AddObject steals the reference on success only, so take an
+  // extra reference first and drop it (plus the module) if adding fails.
+  Py_INCREF(&LogReaderType);
+  if (PyModule_AddObject(module, "LogReader", (PyObject *)&LogReaderType) <
+      0) {
+    Py_DECREF(&LogReaderType);
+    Py_DECREF(module);
+    return nullptr;
+  }
+
+  return module;
+}
+
+} // namespace
+} // namespace analysis
+} // namespace frc971
+
+// Module entry point invoked by the interpreter on "import py_log_reader".
+PyMODINIT_FUNC PyInit_py_log_reader(void) {
+  return frc971::analysis::InitModule();
+}