Merge "Add basic plotter for WebGL"
diff --git a/WORKSPACE b/WORKSPACE
index cbc390c..d43693f 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -549,3 +549,10 @@
     name = "com_github_google_flatbuffers",
     path = "third_party/flatbuffers",
 )
+
+http_file(
+    name = "sample_logfile",
+    downloaded_file_path = "log.fbs",
+    sha256 = "91c98edee0c90a19992792c711dde4a6743af2d6d7e45b5079ec228fdf51ff11",
+    urls = ["http://www.frc971.org/Build-Dependencies/small_sample_logfile.fbs"],
+)
diff --git a/aos/BUILD b/aos/BUILD
index 6b3c5bc..e964066 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -1,5 +1,5 @@
 load("//tools:environments.bzl", "mcu_cpus")
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_python_library")
 
 filegroup(
     name = "prime_binaries",
@@ -295,6 +295,19 @@
     visibility = ["//visibility:public"],
 )
 
+flatbuffer_python_library(
+    name = "configuration_fbs_python",
+    srcs = ["configuration.fbs"],
+    namespace = "aos",
+    tables = [
+        "Configuration",
+        "Channel",
+        "Map",
+        "Node",
+    ],
+    visibility = ["//visibility:public"],
+)
+
 cc_library(
     name = "configuration",
     srcs = [
diff --git a/aos/configuration.cc b/aos/configuration.cc
index b37c324..65c218e 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -370,12 +370,34 @@
           << " has an unknown \"source_node\"";
 
       if (c->has_destination_nodes()) {
-        for (const flatbuffers::String *n : *c->destination_nodes()) {
-          CHECK(GetNode(&result.message(), n->string_view()) != nullptr)
+        for (const Connection *connection : *c->destination_nodes()) {
+          CHECK(connection->has_name());
+          CHECK(GetNode(&result.message(), connection->name()->string_view()) !=
+                nullptr)
               << ": Channel " << FlatbufferToJson(c)
-              << " has an unknown \"destination_nodes\" " << n->string_view();
+              << " has an unknown \"destination_nodes\" "
+              << connection->name()->string_view();
 
-          CHECK_NE(n->string_view(), c->source_node()->string_view())
+          switch (connection->timestamp_logger()) {
+            case LoggerConfig::LOCAL_LOGGER:
+            case LoggerConfig::NOT_LOGGED:
+              CHECK(!connection->has_timestamp_logger_node());
+              break;
+            case LoggerConfig::REMOTE_LOGGER:
+            case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+              CHECK(connection->has_timestamp_logger_node());
+              // Name the logger node that failed to resolve in the message.
+              CHECK(GetNode(&result.message(),
+                            connection->timestamp_logger_node()
+                                ->string_view()) != nullptr)
+                  << ": Channel " << FlatbufferToJson(c)
+                  << " has an unknown \"timestamp_logger_node\" "
+                  << connection->timestamp_logger_node()->string_view();
+              break;
+          }
+
+          CHECK_NE(connection->name()->string_view(),
+                   c->source_node()->string_view())
               << ": Channel " << FlatbufferToJson(c)
               << " is forwarding data to itself";
         }
@@ -609,8 +631,9 @@
     return false;
   }
 
-  for (const flatbuffers::String *s : *channel->destination_nodes()) {
-    if (s->string_view() == node->name()->string_view()) {
+  for (const Connection *connection : *channel->destination_nodes()) {
+    CHECK(connection->has_name());
+    if (connection->name()->string_view() == node->name()->string_view()) {
       return true;
     }
   }
@@ -618,5 +641,91 @@
   return false;
 }
 
+bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node) {
+  // Decide from the channel's logger setting whether |node| logs its messages.
+  // A null |node| is only tolerated for LOCAL_LOGGER and NOT_LOGGED channels.
+  const LoggerConfig where_logged = channel->logger();
+  switch (where_logged) {
+    case LoggerConfig::NOT_LOGGED:
+      // Nobody logs this channel.
+      return false;
+
+    case LoggerConfig::LOCAL_LOGGER:
+      // With no node we are in a single node world, so the local logger is
+      // always the one to use.  Otherwise, only the sending node logs.
+      return node == nullptr || channel->source_node()->string_view() ==
+                                    node->name()->string_view();
+
+    case LoggerConfig::REMOTE_LOGGER:
+      // Only the configured logger node logs the data.
+      CHECK(channel->has_logger_node());
+      return channel->logger_node()->string_view() ==
+             CHECK_NOTNULL(node)->name()->string_view();
+
+    case LoggerConfig::LOCAL_AND_REMOTE_LOGGER: {
+      // Both the sending node and the configured logger node log the data.
+      CHECK(channel->has_logger_node());
+      const auto this_node = CHECK_NOTNULL(node)->name()->string_view();
+      return channel->source_node()->string_view() == this_node ||
+             channel->logger_node()->string_view() == this_node;
+    }
+  }
+
+  // Only reachable if the enum holds a value outside of the cases above.
+  LOG(FATAL) << "Unknown logger config " << static_cast<int>(where_logged);
+}
+
+// Returns the connection forwarding |channel| to |node|, or nullptr if none.
+const Connection *ConnectionToNode(const Channel *channel, const Node *node) {
+  if (!channel->has_destination_nodes()) return nullptr;
+  for (const Connection *connection : *channel->destination_nodes()) {
+    CHECK(connection->has_name());  // Avoid dereferencing a null name.
+    if (connection->name()->string_view() == node->name()->string_view()) {
+      return connection;
+    }
+  }
+  return nullptr;
+}
+
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
+                                          const Node *node,
+                                          const Node *logger_node) {
+  // Find how |channel| is forwarded to |node|.  When it is not forwarded
+  // there, there are no delivery times for anybody to log.
+  const Connection *const link = ConnectionToNode(channel, node);
+  return link != nullptr &&
+         ConnectionDeliveryTimeIsLoggedOnNode(link, logger_node);
+}
+
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
+                                          const Node *node) {
+  // Dispatch on where this connection's delivery timestamps get logged.
+  const LoggerConfig where_logged = connection->timestamp_logger();
+  switch (where_logged) {
+    case LoggerConfig::NOT_LOGGED:
+      return false;
+
+    case LoggerConfig::LOCAL_LOGGER:
+      // Logged only by the node the connection delivers to.
+      return connection->name()->string_view() == node->name()->string_view();
+
+    case LoggerConfig::REMOTE_LOGGER:
+      // Logged only by the configured timestamp logger node.
+      CHECK(connection->has_timestamp_logger_node());
+      return connection->timestamp_logger_node()->string_view() ==
+             node->name()->string_view();
+
+    case LoggerConfig::LOCAL_AND_REMOTE_LOGGER: {
+      // Logged by either of the two nodes above.
+      CHECK(connection->has_timestamp_logger_node());
+      const auto this_node = node->name()->string_view();
+      return connection->name()->string_view() == this_node ||
+             connection->timestamp_logger_node()->string_view() == this_node;
+    }
+  }
+
+  LOG(FATAL) << "Unknown logger config " << static_cast<int>(where_logged);
+}
+
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index cc1f950..42c404e 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -2,6 +2,46 @@
 
 namespace aos;
 
+enum LoggerConfig : ubyte {
+  // This data should be logged on this node.
+  LOCAL_LOGGER,
+  // This data should be logged on a remote node.
+  REMOTE_LOGGER,
+  // This data should not be logged.
+  NOT_LOGGED,
+  // This data should be logged both on this node and on the remote node.
+  // This is useful where you want to log a message on both the sender and
+  // receiver to create self-contained log files.
+  LOCAL_AND_REMOTE_LOGGER
+}
+
+table Connection {
+  // Node name to forward to.
+  name:string;
+
+  // How the delivery timestamps for this connection should be logged.  Do we
+  // log them with the local logger (i.e. the logger running on the node that
+  // this message is delivered to)?  Do we log them on another node (a central
+  // logging node)?  Do we log them in both places redundantly?
+  timestamp_logger:LoggerConfig = LOCAL_LOGGER;
+
+  // If the corresponding delivery timestamps for this channel are logged
+  // remotely, which node should be responsible for logging the data.  Note:
+  // for now, this can only be the source node.  Empty implies the node this
+  // connection is connecting to (i.e. name).
+  timestamp_logger_node:string;
+
+  // Priority to forward data with.
+  priority:ushort = 100;
+
+  // Time to live in nanoseconds before the message is dropped.
+  // A value of 0 means no timeout, i.e. reliable.  When a client connects, the
+  // latest message from this channel will be sent regardless.
+  // TODO(austin): We can retry more than just the last message on reconnect
+  //   if we want.  This is an unlikely scenario though.
+  time_to_live:uint = 0;
+}
+
 // Table representing a channel.  Channels are where data is published and
 // subscribed from.  The tuple of name, type is the identifying information.
 table Channel {
@@ -11,8 +51,8 @@
   type:string;
   // Max frequency in messages/sec of the data published on this channel.
   frequency:int = 100;
-  // Max size of the data being published.  (This will be automatically
-  // computed in the future.)
+  // Max size of the data being published.  (This will hopefully be
+  // automatically computed in the future.)
   max_size:int = 1000;
 
   // Sets the maximum number of senders on a channel.
@@ -27,9 +67,17 @@
   // If nodes is populated below, this needs to also be populated.
   source_node:string;
 
-  // The destination node names for data sent on this channel.
+  // The destination nodes for data sent on this channel.
   // This only needs to be populated if this message is getting forwarded.
-  destination_nodes:[string];
+  destination_nodes:[Connection];
+
+  // What service is responsible for logging this channel:
+  logger:LoggerConfig = LOCAL_LOGGER;
+  // If the channel is logged remotely, which node should be responsible for
+  // logging the data.  Note: this requires that the data is forwarded to the
+  // node responsible for logging it.  This must be populated when logger is
+  // REMOTE_LOGGER or LOCAL_AND_REMOTE_LOGGER.
+  logger_node:string;
 }
 
 // Table to support renaming channel names.
diff --git a/aos/configuration.h b/aos/configuration.h
index ab235a3..7bf8203 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -65,6 +65,18 @@
 // provided node.
 bool ChannelIsReadableOnNode(const Channel *channel, const Node *node);
 
+// Returns true if the message is supposed to be logged on this node.
+bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node);
+
+const Connection *ConnectionToNode(const Channel *channel, const Node *node);
+// Returns true if the delivery timestamps are supposed to be logged on this
+// node.
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
+                                          const Node *node,
+                                          const Node *logger_node);
+bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
+                                          const Node *node);
+
 // Prints a channel to json, but without the schema.
 std::string CleanedChannelToString(const Channel *channel);
 
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index ff0b570..fab6745 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -200,7 +200,7 @@
 // Tests that our node writeable helpers work as intended.
 TEST_F(ConfigurationTest, ChannelIsSendableOnNode) {
   FlatbufferDetachedBuffer<Channel> good_channel(JsonToFlatbuffer(
-R"channel({
+      R"channel({
   "name": "/test",
   "type": "aos.examples.Ping",
   "source_node": "foo"
@@ -208,7 +208,7 @@
       Channel::MiniReflectTypeTable()));
 
   FlatbufferDetachedBuffer<Channel> bad_channel(JsonToFlatbuffer(
-R"channel({
+      R"channel({
   "name": "/test",
   "type": "aos.examples.Ping",
   "source_node": "bar"
@@ -216,7 +216,7 @@
       Channel::MiniReflectTypeTable()));
 
   FlatbufferDetachedBuffer<Node> node(JsonToFlatbuffer(
-R"node({
+      R"node({
   "name": "foo"
 })node",
       Node::MiniReflectTypeTable()));
@@ -230,19 +230,23 @@
 // Tests that our node readable and writeable helpers work as intended.
 TEST_F(ConfigurationTest, ChannelIsReadableOnNode) {
   FlatbufferDetachedBuffer<Channel> good_channel(JsonToFlatbuffer(
-R"channel({
+      R"channel({
   "name": "/test",
   "type": "aos.examples.Ping",
   "source_node": "bar",
   "destination_nodes": [
-    "baz",
-    "foo",
+    {
+      "name": "baz"
+    },
+    {
+      "name": "foo"
+    }
   ]
 })channel",
       Channel::MiniReflectTypeTable()));
 
   FlatbufferDetachedBuffer<Channel> bad_channel1(JsonToFlatbuffer(
-R"channel({
+      R"channel({
   "name": "/test",
   "type": "aos.examples.Ping",
   "source_node": "bar"
@@ -250,18 +254,20 @@
       Channel::MiniReflectTypeTable()));
 
   FlatbufferDetachedBuffer<Channel> bad_channel2(JsonToFlatbuffer(
-R"channel({
+      R"channel({
   "name": "/test",
   "type": "aos.examples.Ping",
   "source_node": "bar",
   "destination_nodes": [
-    "baz"
+    {
+      "name": "baz"
+    }
   ]
 })channel",
       Channel::MiniReflectTypeTable()));
 
   FlatbufferDetachedBuffer<Node> node(JsonToFlatbuffer(
-R"node({
+      R"node({
   "name": "foo"
 })node",
       Node::MiniReflectTypeTable()));
@@ -274,7 +280,293 @@
       ChannelIsReadableOnNode(&bad_channel2.message(), &node.message()));
 }
 
+// Tests that ChannelMessageIsLoggedOnNode works as intended.
+TEST_F(ConfigurationTest, ChannelMessageIsLoggedOnNode) {
+  FlatbufferDetachedBuffer<Channel> logged_on_self_channel(JsonToFlatbuffer(
+      R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "destination_nodes": [
+    {
+      "name": "baz"
+    }
+  ]
+})channel",
+      Channel::MiniReflectTypeTable()));
 
+  FlatbufferDetachedBuffer<Channel> not_logged_channel(JsonToFlatbuffer(
+      R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "logger": "NOT_LOGGED",
+  "destination_nodes": [
+    {
+      "name": "baz",
+      "timestamp_logger": "LOCAL_LOGGER"
+    }
+  ]
+})channel",
+      Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_remote_channel(JsonToFlatbuffer(
+      R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "logger": "REMOTE_LOGGER",
+  "logger_node": "baz",
+  "destination_nodes": [
+    {
+      "name": "baz"
+    }
+  ]
+})channel",
+      Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_separate_logger_node_channel(
+      JsonToFlatbuffer(
+          R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "logger": "REMOTE_LOGGER",
+  "logger_node": "foo",
+  "destination_nodes": [
+    {
+      "name": "baz"
+    }
+  ]
+})channel",
+          Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_both_channel (
+      JsonToFlatbuffer(
+          R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "logger": "LOCAL_AND_REMOTE_LOGGER",
+  "logger_node": "baz",
+  "destination_nodes": [
+    {
+      "name": "baz"
+    }
+  ]
+})channel",
+          Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Node> foo_node(JsonToFlatbuffer(
+      R"node({
+  "name": "foo"
+})node",
+      Node::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Node> bar_node(JsonToFlatbuffer(
+      R"node({
+  "name": "bar"
+})node",
+      Node::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Node> baz_node(JsonToFlatbuffer(
+      R"node({
+  "name": "baz"
+})node",
+      Node::MiniReflectTypeTable()));
+
+  // Local logger.
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+                                            &foo_node.message()));
+  EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+                                           &bar_node.message()));
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_self_channel.message(),
+                                            &baz_node.message()));
+
+  // No logger.
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&not_logged_channel.message(),
+                                            &foo_node.message()));
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&not_logged_channel.message(),
+                                           &bar_node.message()));
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&not_logged_channel.message(),
+                                            &baz_node.message()));
+
+  // Remote logger.
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+                                            &foo_node.message()));
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+                                            &bar_node.message()));
+  EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_remote_channel.message(),
+                                           &baz_node.message()));
+
+  // Separate logger.
+  EXPECT_TRUE(ChannelMessageIsLoggedOnNode(
+      &logged_on_separate_logger_node_channel.message(), &foo_node.message()));
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(
+      &logged_on_separate_logger_node_channel.message(), &bar_node.message()));
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(
+      &logged_on_separate_logger_node_channel.message(), &baz_node.message()));
+
+  // Logged in multiple places.
+  EXPECT_FALSE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+                                            &foo_node.message()));
+  EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+                                           &bar_node.message()));
+  EXPECT_TRUE(ChannelMessageIsLoggedOnNode(&logged_on_both_channel.message(),
+                                           &baz_node.message()));
+}
+
+// Tests that ConnectionDeliveryTimeIsLoggedOnNode works as intended.
+TEST_F(ConfigurationTest, ConnectionDeliveryTimeIsLoggedOnNode) {
+  FlatbufferDetachedBuffer<Channel> logged_on_self_channel(JsonToFlatbuffer(
+      R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "logger": "REMOTE_LOGGER",
+  "logger_node": "baz",
+  "destination_nodes": [
+    {
+      "name": "baz"
+    }
+  ]
+})channel",
+      Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> not_logged_channel(JsonToFlatbuffer(
+      R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "logger": "NOT_LOGGED",
+  "destination_nodes": [
+    {
+      "name": "baz",
+      "timestamp_logger": "NOT_LOGGED"
+    }
+  ]
+})channel",
+      Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_remote_channel(JsonToFlatbuffer(
+      R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "destination_nodes": [
+    {
+      "name": "baz",
+      "timestamp_logger": "REMOTE_LOGGER",
+      "timestamp_logger_node": "bar"
+    }
+  ]
+})channel",
+      Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_separate_logger_node_channel(
+      JsonToFlatbuffer(
+          R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "logger": "REMOTE_LOGGER",
+  "logger_node": "foo",
+  "destination_nodes": [
+    {
+      "name": "baz",
+      "timestamp_logger": "REMOTE_LOGGER",
+      "timestamp_logger_node": "foo"
+    }
+  ]
+})channel",
+          Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Channel> logged_on_both_channel (
+      JsonToFlatbuffer(
+          R"channel({
+  "name": "/test",
+  "type": "aos.examples.Ping",
+  "source_node": "bar",
+  "destination_nodes": [
+    {
+      "name": "baz",
+      "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+      "timestamp_logger_node": "bar"
+    }
+  ]
+})channel",
+          Channel::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Node> foo_node(JsonToFlatbuffer(
+      R"node({
+  "name": "foo"
+})node",
+      Node::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Node> bar_node(JsonToFlatbuffer(
+      R"node({
+  "name": "bar"
+})node",
+      Node::MiniReflectTypeTable()));
+
+  FlatbufferDetachedBuffer<Node> baz_node(JsonToFlatbuffer(
+      R"node({
+  "name": "baz"
+})node",
+      Node::MiniReflectTypeTable()));
+
+  // Local logger.
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_self_channel.message(), &baz_node.message(),
+      &foo_node.message()));
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_self_channel.message(), &baz_node.message(),
+      &bar_node.message()));
+  EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_self_channel.message(), &baz_node.message(),
+      &baz_node.message()));
+
+  // No logger.
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &not_logged_channel.message(), &baz_node.message(), &foo_node.message()));
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &not_logged_channel.message(), &baz_node.message(), &bar_node.message()));
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &not_logged_channel.message(), &baz_node.message(), &baz_node.message()));
+
+  // Remote logger.
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_remote_channel.message(), &baz_node.message(),
+      &foo_node.message()));
+  EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_remote_channel.message(), &baz_node.message(),
+      &bar_node.message()));
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_remote_channel.message(), &baz_node.message(),
+      &baz_node.message()));
+
+  // Separate logger.
+  EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+      &foo_node.message()));
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+      &bar_node.message()));
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_separate_logger_node_channel.message(), &baz_node.message(),
+      &baz_node.message()));
+
+  // Logged on both the node and a remote node.
+  EXPECT_FALSE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_both_channel.message(), &baz_node.message(),
+      &foo_node.message()));
+  EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_both_channel.message(), &baz_node.message(),
+      &bar_node.message()));
+  EXPECT_TRUE(ConnectionDeliveryTimeIsLoggedOnNode(
+      &logged_on_both_channel.message(), &baz_node.message(),
+      &baz_node.message()));
+}
 }  // namespace testing
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e35fe5b..1c2815d 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -269,6 +269,10 @@
   QueueMessages();
 }
 
+LogReader::~LogReader() {
+  CHECK(!event_loop_unique_ptr_) << "Did you remember to call Deregister?";
+}
+
 bool LogReader::ReadBlock() {
   if (end_of_file_) {
     return false;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 497dabb..b1f6718 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -93,6 +93,7 @@
 class LogReader {
  public:
   LogReader(absl::string_view filename);
+  ~LogReader();
 
   // Registers the timer and senders used to resend the messages from the log
   // file.
diff --git a/aos/testdata/invalid_destination_node.json b/aos/testdata/invalid_destination_node.json
index 95dc6d3..1367d13 100644
--- a/aos/testdata/invalid_destination_node.json
+++ b/aos/testdata/invalid_destination_node.json
@@ -5,8 +5,12 @@
       "type": ".aos.bar",
       "source_node": "roborio",
       "destination_nodes": [
-        "dest",
-        "trojan_horse"
+        {
+          "name": "dest"
+        },
+        {
+          "name": "trojan_horse"
+        }
       ]
     }
   ],
diff --git a/aos/testdata/self_forward.json b/aos/testdata/self_forward.json
index 101b018..0e88cdf 100644
--- a/aos/testdata/self_forward.json
+++ b/aos/testdata/self_forward.json
@@ -5,7 +5,9 @@
       "type": ".aos.bar",
       "source_node": "roborio",
       "destination_nodes": [
-        "roborio"
+        {
+          "name": "roborio"
+        }
       ]
     }
   ],
diff --git a/debian/python.BUILD b/debian/python.BUILD
index 666f2d8..5e5d810 100644
--- a/debian/python.BUILD
+++ b/debian/python.BUILD
@@ -1,16 +1,20 @@
 package(default_visibility = ["@//debian:__pkg__"])
 
 cc_library(
-    name = "python3.4_lib",
-    hdrs = glob(["usr/include/python3.4m/**/*.h"]),
+    name = "python3.5_lib",
+    srcs = [
+        "usr/lib/x86_64-linux-gnu/libpython3.5m.so",
+    ],
+    hdrs = glob(["usr/include/**/*.h"]),
     includes = [
-        "usr/include/python3.4m/",
+        "usr/include/",
+        "usr/include/python3.5m/",
     ],
     visibility = ["//visibility:public"],
 )
 
 cc_library(
-    name = "python3.4_f2py",
+    name = "python3.5_f2py",
     srcs = [
         "usr/lib/python3/dist-packages/numpy/f2py/src/fortranobject.c",
     ],
@@ -26,7 +30,7 @@
     ],
     visibility = ["//visibility:public"],
     deps = [
-        ":python3.4_lib",
+        ":python3.5_lib",
     ],
 )
 
diff --git a/frc971/analysis/BUILD b/frc971/analysis/BUILD
index b45031f..b526712 100644
--- a/frc971/analysis/BUILD
+++ b/frc971/analysis/BUILD
@@ -21,3 +21,30 @@
     srcs = ["__init__.py"],
     deps = ["//frc971:python_init"],
 )
+
+cc_binary(
+    name = "py_log_reader.so",
+    srcs = ["py_log_reader.cc"],
+    linkshared = True,
+    restricted_to = ["//tools:k8"],
+    deps = [
+        "//aos:configuration",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:shm_event_loop",
+        "//aos/events:simulated_event_loop",
+        "//aos/events/logging:logger",
+        "@com_github_google_glog//:glog",
+        "@python_repo//:python3.5_lib",
+    ],
+)
+
+py_test(
+    name = "log_reader_test",
+    srcs = ["log_reader_test.py"],
+    data = [
+        ":py_log_reader.so",
+        "@sample_logfile//file",
+    ],
+    restricted_to = ["//tools:k8"],
+    deps = ["//aos:configuration_fbs_python"],
+)
diff --git a/frc971/analysis/log_reader_test.py b/frc971/analysis/log_reader_test.py
new file mode 100644
index 0000000..3389d00
--- /dev/null
+++ b/frc971/analysis/log_reader_test.py
@@ -0,0 +1,117 @@
+#!/usr/bin/python3
+import json
+import unittest
+
+from aos.Configuration import Configuration
+from frc971.analysis.py_log_reader import LogReader
+
+
+class LogReaderTest(unittest.TestCase):
+    def setUp(self):
+        self.reader = LogReader("external/sample_logfile/file/log.fbs")
+        # A list of all the channels in the logfile--this is used to confirm that
+        # we did indeed read the config correctly.
+        self.all_channels = [
+            ("/aos", "aos.JoystickState"), ("/aos", "aos.RobotState"),
+            ("/aos", "aos.timing.Report"), ("/aos", "frc971.PDPValues"),
+            ("/aos",
+             "frc971.wpilib.PneumaticsToLog"), ("/autonomous",
+                                                "aos.common.actions.Status"),
+            ("/autonomous", "frc971.autonomous.AutonomousMode"),
+            ("/autonomous", "frc971.autonomous.Goal"), ("/camera",
+                                                        "y2019.CameraLog"),
+            ("/camera", "y2019.control_loops.drivetrain.CameraFrame"),
+            ("/drivetrain",
+             "frc971.IMUValues"), ("/drivetrain",
+                                   "frc971.control_loops.drivetrain.Goal"),
+            ("/drivetrain",
+             "frc971.control_loops.drivetrain.LocalizerControl"),
+            ("/drivetrain", "frc971.control_loops.drivetrain.Output"),
+            ("/drivetrain", "frc971.control_loops.drivetrain.Position"),
+            ("/drivetrain", "frc971.control_loops.drivetrain.Status"),
+            ("/drivetrain", "frc971.sensors.GyroReading"),
+            ("/drivetrain",
+             "y2019.control_loops.drivetrain.TargetSelectorHint"),
+            ("/superstructure",
+             "y2019.StatusLight"), ("/superstructure",
+                                    "y2019.control_loops.superstructure.Goal"),
+            ("/superstructure", "y2019.control_loops.superstructure.Output"),
+            ("/superstructure", "y2019.control_loops.superstructure.Position"),
+            ("/superstructure", "y2019.control_loops.superstructure.Status")
+        ]
+        # A channel that is known to have data on it which we will use for testing.
+        self.test_channel = ("/aos", "aos.timing.Report")
+        # A non-existent channel
+        self.bad_channel = ("/aos", "aos.timing.FooBar")
+
+    def test_do_nothing(self):
+        """Tests that we sanely handle doing nothing.
+
+        A previous iteration of the log reader seg faulted when doing this."""
+        pass
+
+    def test_read_config(self):
+        """Tests that we can read the configuration from the logfile."""
+        config_bytes = self.reader.configuration()
+        config = Configuration.GetRootAsConfiguration(config_bytes, 0)
+
+        channel_set = set(self.all_channels)
+        for ii in range(config.ChannelsLength()):
+            channel = config.Channels(ii)
+            # Will raise KeyError if the channel does not exist
+            channel_set.remove((channel.Name().decode("utf-8"),
+                                channel.Type().decode("utf-8")))
+
+        self.assertEqual(0, len(channel_set))
+
+    def test_empty_process(self):
+        """Tests running process() without subscribing to anything succeeds."""
+        self.reader.process()
+        for channel in self.all_channels:
+            with self.assertRaises(ValueError) as context:
+                self.reader.get_data_for_channel(channel[0], channel[1])
+
+    def test_subscribe(self):
+        """Tests that we can subscribe to a channel and get data out."""
+        name = self.test_channel[0]
+        message_type = self.test_channel[1]
+        self.assertTrue(self.reader.subscribe(name, message_type))
+        self.reader.process()
+        data = self.reader.get_data_for_channel(name, message_type)
+        self.assertLess(100, len(data))
+        last_monotonic_time = 0
+        for entry in data:
+            monotonic_time = entry[0]
+            realtime_time = entry[1]
+            json_data = entry[2].replace('nan', '\"nan\"')
+            self.assertLess(last_monotonic_time, monotonic_time)
+            # Sanity check that the realtime times are in the correct range.
+            self.assertLess(1500000000e9, realtime_time)
+            self.assertGreater(2000000000e9, realtime_time)
+            parsed_json = json.loads(json_data)
+            self.assertIn("name", parsed_json)
+
+            last_monotonic_time = monotonic_time
+
+    def test_bad_subscribe(self):
+        """Tests that we return false when subscribing to a non-existent channel."""
+        self.assertFalse(
+            self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]),
+            self.bad_channel)
+
+    def test_subscribe_after_process(self):
+        """Tests that an exception is thrown if we subscribe after calling process()."""
+        self.reader.process()
+        for channel in self.all_channels:
+            with self.assertRaises(RuntimeError) as context:
+                self.reader.subscribe(channel[0], channel[1])
+
+    def test_get_data_before_processj(self):
+        """Tests that an exception is thrown if we retrieve data before calling process()."""
+        for channel in self.all_channels:
+            with self.assertRaises(RuntimeError) as context:
+                self.reader.get_data_for_channel(channel[0], channel[1])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/frc971/analysis/py_log_reader.cc b/frc971/analysis/py_log_reader.cc
new file mode 100644
index 0000000..5a6fce4
--- /dev/null
+++ b/frc971/analysis/py_log_reader.cc
@@ -0,0 +1,283 @@
+// This file provides a Python module for reading logfiles. See
+// log_reader_test.py for usage.
+//
+// This reader works by having the user specify exactly what channels they want
+// data for. We then process the logfile and store all the data on that channel
+// into a list of timestamps + JSON message data. The user can then use an
+// accessor method (get_data_for_channel) to retrieve the cached data.
+
+// Defining PY_SSIZE_T_CLEAN seems to be suggested by most of the Python
+// documentation.
+#define PY_SSIZE_T_CLEAN
+// Note that Python.h needs to be included before anything else.
+#include <Python.h>
+
+#include <memory>
+
+#include "aos/configuration.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
+
+namespace frc971 {
+namespace analysis {
+namespace {
+
+// A single logged message: its two send timestamps plus its JSON contents.
+struct MessageData {
+  aos::monotonic_clock::time_point monotonic_sent_time;
+  aos::realtime_clock::time_point realtime_sent_time;
+  // JSON representation of the message (built with aos::FlatbufferToJson).
+  std::string json_data;
+};
+
+// Everything recorded for one subscribed channel, keyed by name and type.
+struct ChannelData {
+  std::string name;
+  std::string type;
+  // Each message published on the channel, in order by monotonic time.
+  std::vector<MessageData> messages;
+};
+
+// Everything needed to replay a logfile; populated by LogReader_init.
+struct LogReaderTools {
+  std::unique_ptr<aos::logger::LogReader> reader;
+  std::unique_ptr<aos::SimulatedEventLoopFactory> event_loop_factory;
+  // Event loop used to create the watchers for subscribed channels.
+  std::unique_ptr<aos::EventLoop> event_loop;
+  std::vector<ChannelData> channel_data;  // One per successful subscribe().
+  // Whether we have called process() on the reader yet.
+  bool processed = false;
+};
+
+struct LogReaderType {
+  PyObject_HEAD  // Standard CPython object header; must stay the first member.
+  LogReaderTools *tools = nullptr;  // Owned: new in _new, delete in _dealloc.
+};
+
+// tp_dealloc. `reader` is null when __init__ failed or never ran, so check it.
+void LogReader_dealloc(LogReaderType *self) {
+  LogReaderTools *const tools = self->tools;
+  if (tools != nullptr && tools->reader && !tools->processed)
+    tools->reader->Deregister();
+  delete tools;
+  Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+// tp_new: allocates the Python object together with its (still empty)
+// LogReaderTools. The reader itself is only constructed in LogReader_init.
+PyObject *LogReader_new(PyTypeObject *type, PyObject * /*args*/,
+                        PyObject * /*kwds*/) {
+  LogReaderType *self =
+      reinterpret_cast<LogReaderType *>(type->tp_alloc(type, 0));
+  if (self != nullptr) {
+    // Plain `new` throws instead of returning nullptr; no check needed.
+    self->tools = new LogReaderTools();
+  }
+  return reinterpret_cast<PyObject *>(self);
+}
+
+// tp_init: opens the logfile named by `log_file_name` and sets up the
+// simulated event loop that subscribe() later attaches watchers to.
+// Returns 0 on success, or -1 when argument parsing fails.
+int LogReader_init(LogReaderType *self, PyObject *args, PyObject *kwds) {
+  const char *log_file_name;
+  const char *keywords[] = {"log_file_name", nullptr};
+  const bool parsed = PyArg_ParseTupleAndKeywords(
+      args, kwds, "s", const_cast<char **>(keywords), &log_file_name);
+  if (!parsed) return -1;
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+  tools->reader = std::make_unique<aos::logger::LogReader>(log_file_name);
+  // The factory is built from the configuration stored in the log itself.
+  tools->event_loop_factory = std::make_unique<aos::SimulatedEventLoopFactory>(
+      tools->reader->configuration());
+  tools->reader->Register(tools->event_loop_factory.get());
+  tools->event_loop = tools->event_loop_factory->MakeEventLoop("data_fetcher");
+  tools->event_loop->SkipTimingReport();
+  return 0;
+}
+
+// Python: get_data_for_channel(name, type) -> list of (monotonic_nsec,
+// realtime_nsec, json) tuples. Raises RuntimeError if process() has not
+// run yet and ValueError if the channel was never subscribed to.
+PyObject *LogReader_get_data_for_channel(LogReaderType *self, PyObject *args,
+                                         PyObject *kwds) {
+  const char *kwlist[] = {"name", "type", nullptr};
+  const char *name;
+  const char *type;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss",
+                                   const_cast<char **>(kwlist), &name, &type)) {
+    return nullptr;
+  }
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+  if (!tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError,
+                    "Called get_data_for_channel before calling process().");
+    return nullptr;
+  }
+  for (const auto &channel : tools->channel_data) {
+    if (channel.name != name || channel.type != type) continue;
+    PyObject *list = PyList_New(channel.messages.size());
+    if (list == nullptr) return nullptr;
+    for (size_t ii = 0; ii < channel.messages.size(); ++ii) {
+      const auto &message = channel.messages[ii];
+      const long long monotonic_nsec =
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              message.monotonic_sent_time.time_since_epoch())
+              .count();
+      const long long realtime_nsec =
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              message.realtime_sent_time.time_since_epoch())
+              .count();
+      // One new reference; nothing leaks (PyTuple_Pack never stole its args).
+      PyObject *entry = Py_BuildValue(
+          "(LLs#)", monotonic_nsec, realtime_nsec, message.json_data.data(),
+          static_cast<Py_ssize_t>(message.json_data.size()));
+      // PyList_SetItem steals `entry`, even when it fails.
+      if (entry == nullptr || PyList_SetItem(list, ii, entry) != 0) {
+        Py_DECREF(list);
+        return nullptr;
+      }
+    }
+    return list;
+  }
+  PyErr_SetString(PyExc_ValueError,
+                  "The provided channel was never subscribed to.");
+  return nullptr;
+}
+
+// Python: subscribe(name, type). Returns False for an unknown channel, True
+// otherwise (as new references); raises RuntimeError after process().
+PyObject *LogReader_subscribe(LogReaderType *self, PyObject *args,
+                              PyObject *kwds) {
+  const char *kwlist[] = {"name", "type", nullptr};
+  const char *name;
+  const char *type;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss",
+                                   const_cast<char **>(kwlist), &name, &type)) {
+    return nullptr;
+  }
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+  if (tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError,
+                    "Called subscribe after calling process().");
+    return nullptr;
+  }
+
+  const aos::Channel *const channel = aos::configuration::GetChannel(
+      tools->reader->configuration(), name, type, "", nullptr);
+  if (channel == nullptr) {
+    Py_RETURN_FALSE;
+  }
+  // An index stays valid even if channel_data reallocates on a later call.
+  const int index = tools->channel_data.size();
+  tools->channel_data.push_back(
+      {.name = name, .type = type, .messages = {}});
+  tools->event_loop->MakeRawWatcher(
+      channel, [channel, index, tools](const aos::Context &context,
+                                       const void *message) {
+        tools->channel_data[index].messages.push_back(
+            {.monotonic_sent_time = context.monotonic_event_time,
+             .realtime_sent_time = context.realtime_event_time,
+             .json_data = aos::FlatbufferToJson(
+                 channel->schema(), static_cast<const uint8_t *>(message))});
+      });
+  Py_RETURN_TRUE;
+}
+
+// Python: process(). Runs the simulated event loop factory and then
+// deregisters the reader. Raises RuntimeError on a second call.
+static PyObject *LogReader_process(LogReaderType *self,
+                                   PyObject *Py_UNUSED(ignored)) {
+  LogReaderTools *const tools = CHECK_NOTNULL(self->tools);
+  if (tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError, "process() may only be called once.");
+    return nullptr;
+  }
+
+  // `processed` also tells LogReader_dealloc to skip its own Deregister().
+  tools->processed = true;
+  tools->event_loop_factory->Run();
+  tools->reader->Deregister();
+  Py_RETURN_NONE;
+}
+
+// Python: configuration() -> bytes. The bytes are a copy of the Configuration
+// flatbuffer that the log reader reports for the logfile.
+static PyObject *LogReader_configuration(LogReaderType *self,
+                                         PyObject *Py_UNUSED(ignored)) {
+  LogReaderTools *const tools = CHECK_NOTNULL(self->tools);
+
+  // It is unclear whether the reader's Configuration sits in one contiguous
+  // buffer, so rather than digging out the backing buffer + offset we copy
+  // the flatbuffer and hand Python the bytes of that copy.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> copy =
+      aos::CopyFlatBuffer(tools->reader->configuration());
+  const char *const bytes = reinterpret_cast<const char *>(copy.data());
+  return PyBytes_FromStringAndSize(bytes, copy.size());
+}
+
+static PyMethodDef LogReader_methods[] = {  // Installed via tp_methods.
+    {"configuration", (PyCFunction)LogReader_configuration, METH_NOARGS,
+     "Return a bytes buffer for the Configuration of the logfile."},
+    {"process", (PyCFunction)LogReader_process, METH_NOARGS,
+     "Processes the logfile and all the subscribed to channels."},
+    {"subscribe", (PyCFunction)LogReader_subscribe,
+     METH_VARARGS | METH_KEYWORDS,
+     "Attempts to subscribe to the provided channel name + type. Returns True "
+     "if successful."},
+    {"get_data_for_channel", (PyCFunction)LogReader_get_data_for_channel,
+     METH_VARARGS | METH_KEYWORDS,
+     "Returns the logged data for a given channel. Raises an exception if you "
+     "did not subscribe to the provided channel. Returned data is a list of "
+     "tuples where each tuple is of the form (monotonic_nsec, realtime_nsec, "
+     "json_message_data)."},
+    {nullptr, 0, 0, nullptr} /* Sentinel: marks the end of the table. */
+};
+
+// Python type object. `struct` below: bare LogReaderType names this variable.
+static PyTypeObject LogReaderType = {
+    PyVarObject_HEAD_INIT(nullptr, 0).tp_name = "py_log_reader.LogReader",
+    .tp_basicsize = sizeof(struct LogReaderType),
+    .tp_dealloc = (destructor)LogReader_dealloc,
+    .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+    .tp_doc = "LogReader objects",
+    .tp_methods = LogReader_methods,
+    .tp_init = (initproc)LogReader_init,
+    .tp_new = LogReader_new,
+};
+
+static PyModuleDef log_reader_module = {
+    PyModuleDef_HEAD_INIT,
+    .m_name = "py_log_reader",
+    .m_doc = "Python module for reading logfiles; see the LogReader type.",
+    .m_size = -1,  // Module keeps global state; no sub-interpreter support.
+};
+
+// Creates the py_log_reader module object and registers the LogReader type
+// on it. Returns nullptr if any of the CPython calls fails.
+PyObject *InitModule() {
+  if (PyType_Ready(&LogReaderType) < 0) return nullptr;
+  PyObject *const mod = PyModule_Create(&log_reader_module);
+  if (mod == nullptr) return nullptr;
+
+  // PyModule_AddObject steals our reference only when it succeeds, so the
+  // failure path has to drop it by hand.
+  PyObject *const type_object = reinterpret_cast<PyObject *>(&LogReaderType);
+  Py_INCREF(type_object);
+  if (PyModule_AddObject(mod, "LogReader", type_object) >= 0) return mod;
+  Py_DECREF(type_object);
+  Py_DECREF(mod);
+  return nullptr;
+}
+
+}  // namespace
+}  // namespace analysis
+}  // namespace frc971
+
+PyMODINIT_FUNC PyInit_py_log_reader(void) {
+  return frc971::analysis::InitModule();  // Module init hook for CPython.
+}