Move log reader and writer over to split timestamp channels

Reuse TimestampChannel (and split it up further) to implement naming and
finding channels for log reader and writer.  Test both the old and new
timestamp configuration.

Once the config changes go in for y2020, this should fix reading the log
file James broke.

Change-Id: I6b09eec69c064ded3b3c149e0fdf23162bd352cf
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 1f926c8..e2dddc4 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -391,46 +391,45 @@
   // Wait until we are connected, then send.
   int ping_count = 0;
   int pi1_server_statistics_count = 0;
-  ping_event_loop.MakeWatcher(
-      "/pi1/aos",
-      [this, &ping_count, &ping_sender,
-       &pi1_server_statistics_count](const ServerStatistics &stats) {
-        VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
+  ping_event_loop.MakeWatcher("/pi1/aos", [this, &ping_count, &ping_sender,
+                                           &pi1_server_statistics_count](
+                                              const ServerStatistics &stats) {
+    VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
 
-        ASSERT_TRUE(stats.has_connections());
-        EXPECT_EQ(stats.connections()->size(), 1);
+    ASSERT_TRUE(stats.has_connections());
+    EXPECT_EQ(stats.connections()->size(), 1);
 
-        bool connected = false;
-        for (const ServerConnection *connection : *stats.connections()) {
-          // Confirm that we are estimating the server time offset correctly. It
-          // should be about 0 since we are on the same machine here.
-          if (connection->has_monotonic_offset()) {
-            EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
-                      chrono::milliseconds(1));
-            EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
-                      chrono::milliseconds(-1));
-            ++pi1_server_statistics_count;
-          }
+    bool connected = false;
+    for (const ServerConnection *connection : *stats.connections()) {
+      // Confirm that we are estimating the server time offset correctly. It
+      // should be about 0 since we are on the same machine here.
+      if (connection->has_monotonic_offset()) {
+        EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::milliseconds(1));
+        EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+                  chrono::milliseconds(-1));
+        ++pi1_server_statistics_count;
+      }
 
-          if (connection->node()->name()->string_view() ==
-              pi2_client_event_loop->node()->name()->string_view()) {
-            if (connection->state() == State::CONNECTED) {
-              EXPECT_TRUE(connection->has_boot_uuid());
-              connected = true;
-            }
-          }
+      if (connection->node()->name()->string_view() ==
+          pi2_client_event_loop->node()->name()->string_view()) {
+        if (connection->state() == State::CONNECTED) {
+          EXPECT_TRUE(connection->has_boot_uuid());
+          connected = true;
         }
+      }
+    }
 
-        if (connected) {
-          VLOG(1) << "Connected!  Sent ping.";
-          auto builder = ping_sender.MakeBuilder();
-          examples::Ping::Builder ping_builder =
-              builder.MakeBuilder<examples::Ping>();
-          ping_builder.add_value(ping_count + 971);
-          builder.Send(ping_builder.Finish());
-          ++ping_count;
-        }
-      });
+    if (connected) {
+      VLOG(1) << "Connected!  Sent ping.";
+      auto builder = ping_sender.MakeBuilder();
+      examples::Ping::Builder ping_builder =
+          builder.MakeBuilder<examples::Ping>();
+      ping_builder.add_value(ping_count + 971);
+      builder.Send(ping_builder.Finish());
+      ++ping_count;
+    }
+  });
 
   // Confirm both client and server statistics messages have decent offsets in
   // them.
@@ -1005,7 +1004,7 @@
   pi1_remote_timestamp_event_loop.MakeWatcher(
       channel_name, [this, channel_name, ping_channel_index,
                      &ping_timestamp_count](const RemoteMessage &header) {
-        VLOG(1) <<channel_name << " RemoteMessage "
+        VLOG(1) << channel_name << " RemoteMessage "
                 << aos::FlatbufferToJson(&header);
         EXPECT_TRUE(header.has_boot_uuid());
         if (shared() && header.channel_index() != ping_channel_index) {
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
index 12c6713..d022b60 100644
--- a/aos/network/timestamp_channel.cc
+++ b/aos/network/timestamp_channel.cc
@@ -5,62 +5,99 @@
 namespace aos {
 namespace message_bridge {
 
+ChannelTimestampFinder::ChannelTimestampFinder(
+    const Configuration *configuration, const std::string_view name,
+    const Node *node)
+    : configuration_(configuration), name_(name), node_(node) {}
+
+std::string ChannelTimestampFinder::SplitChannelName(
+    const Channel *channel, const Connection *connection) {
+  std::string type(channel->type()->string_view());
+  std::replace(type.begin(), type.end(), '.', '-');
+  return absl::StrCat("/aos/remote_timestamps/",
+                      connection->name()->string_view(),
+                      channel->name()->string_view(), "/", type);
+}
+
+std::string ChannelTimestampFinder::CombinedChannelName(
+    std::string_view remote_node) {
+  return absl::StrCat("/aos/remote_timestamps/", remote_node);
+}
+
+const Channel *ChannelTimestampFinder::ForChannel(
+    const Channel *channel, const Connection *connection) {
+  const std::string split_timestamp_channel_name =
+      SplitChannelName(channel, connection);
+  const Channel *split_timestamp_channel = configuration::GetChannel(
+      configuration_, split_timestamp_channel_name,
+      RemoteMessage::GetFullyQualifiedName(), name_, node_, true);
+  if (split_timestamp_channel != nullptr) {
+    return split_timestamp_channel;
+  }
+
+  const std::string shared_timestamp_channel_name =
+      CombinedChannelName(connection->name()->string_view());
+  const Channel *shared_timestamp_channel = configuration::GetChannel(
+      configuration_, shared_timestamp_channel_name,
+      RemoteMessage::GetFullyQualifiedName(), name_, node_, true);
+  if (shared_timestamp_channel != nullptr) {
+    LOG(WARNING) << "Failed to find timestamp channel {\"name\": \""
+                 << split_timestamp_channel << "\", \"type\": \""
+                 << RemoteMessage::GetFullyQualifiedName()
+                 << "\"}, falling back to old version.";
+    return shared_timestamp_channel;
+  }
+
+  CHECK(shared_timestamp_channel != nullptr)
+      << ": Remote timestamp channel { \"name\": \""
+      << split_timestamp_channel_name << "\", \"type\": \""
+      << RemoteMessage::GetFullyQualifiedName()
+      << "\" } not found in config for " << name_
+      << (configuration::MultiNode(configuration_)
+              ? absl::StrCat(" on node ", node_->name()->string_view())
+              : ".");
+
+  return nullptr;
+}
+
 ChannelTimestampSender::ChannelTimestampSender(aos::EventLoop *event_loop)
     : event_loop_(event_loop) {
   CHECK(configuration::MultiNode(event_loop_->configuration()));
-  timestamp_loggers_.resize(event_loop_->configuration()->nodes()->size());
 }
 
 aos::Sender<RemoteMessage> *ChannelTimestampSender::SenderForChannel(
     const Channel *channel, const Connection *connection) {
+  ChannelTimestampFinder finder(event_loop_);
   // Look at any pre-created channel/connection pairs.
-  auto it =
-      channel_timestamp_loggers_.find(std::make_pair(channel, connection));
-  if (it != channel_timestamp_loggers_.end()) {
-    return it->second.get();
+  {
+    auto it =
+        channel_timestamp_loggers_.find(std::make_pair(channel, connection));
+    if (it != channel_timestamp_loggers_.end()) {
+      return it->second.get();
+    }
   }
 
-  const Node *other_node = configuration::GetNode(
-      event_loop_->configuration(), connection->name()->string_view());
-  const size_t other_node_index =
-      configuration::GetNodeIndex(event_loop_->configuration(), other_node);
+  const Channel *timestamp_channel = finder.ForChannel(channel, connection);
 
-  std::string type(channel->type()->string_view());
-  std::replace(type.begin(), type.end(), '.', '-');
-  const std::string single_timestamp_channel =
-      absl::StrCat("/aos/remote_timestamps/", connection->name()->string_view(),
-                   channel->name()->string_view(), "/", type);
-  if (event_loop_->HasChannel<RemoteMessage>(single_timestamp_channel)) {
-    LOG(INFO) << "Making RemoteMessage channel " << single_timestamp_channel;
-    auto result = channel_timestamp_loggers_.try_emplace(
-        std::make_pair(channel, connection),
-        std::make_unique<aos::Sender<RemoteMessage>>(
-            event_loop_->MakeSender<RemoteMessage>(single_timestamp_channel)));
-    return result.first->second.get();
-  } else {
-    // Then look for any per-remote-node channels.
-    if (timestamp_loggers_[other_node_index]) {
-      return &timestamp_loggers_[other_node_index];
+  {
+    auto it = timestamp_loggers_.find(timestamp_channel);
+    if (it != timestamp_loggers_.end()) {
+      CHECK(channel_timestamp_loggers_
+                .try_emplace(std::make_pair(channel, connection), it->second)
+                .second);
+      return it->second.get();
     }
-    const std::string shared_timestamp_channel = absl::StrCat(
-        "/aos/remote_timestamps/", connection->name()->string_view());
-    LOG(INFO) << "Looking for " << shared_timestamp_channel;
-    if (event_loop_->HasChannel<RemoteMessage>(shared_timestamp_channel)) {
-      LOG(WARNING) << "Failed to find timestamp channel {\"name\": \""
-                   << single_timestamp_channel << "\", \"type\": \""
-                   << RemoteMessage::GetFullyQualifiedName()
-                   << "\"}, falling back to old version.";
-      timestamp_loggers_[other_node_index] =
-          event_loop_->MakeSender<RemoteMessage>(shared_timestamp_channel);
-      return &timestamp_loggers_[other_node_index];
-    } else {
-      LOG(ERROR) << "Failed";
-    }
-
-    // Explode with an error about the new channel.
-    event_loop_->MakeSender<RemoteMessage>(single_timestamp_channel);
-    return nullptr;
   }
+
+  auto result = channel_timestamp_loggers_.try_emplace(
+      std::make_pair(channel, connection),
+      std::make_shared<aos::Sender<RemoteMessage>>(
+          event_loop_->MakeSender<RemoteMessage>(
+              timestamp_channel->name()->string_view())));
+
+  CHECK(timestamp_loggers_.try_emplace(timestamp_channel, result.first->second)
+            .second);
+  return result.first->second.get();
 }
 
 }  // namespace message_bridge
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index b62c5e9..1702632 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -1,6 +1,7 @@
 #ifndef AOS_NETWORK_TIMESTAMP_CHANNEL_
 #define AOS_NETWORK_TIMESTAMP_CHANNEL_
 
+#include <string_view>
 #include <vector>
 
 #include "absl/container/btree_map.h"
@@ -12,6 +13,30 @@
 namespace aos {
 namespace message_bridge {
 
+// Class to find the corresponding channel where timestamps for a specified data
+// channel and connection will be logged.
+class ChannelTimestampFinder {
+ public:
+  ChannelTimestampFinder(aos::EventLoop *event_loop)
+      : ChannelTimestampFinder(event_loop->configuration(), event_loop->name(),
+                               event_loop->node()) {}
+  ChannelTimestampFinder(const Configuration *configuration,
+                         const std::string_view name, const Node *node);
+
+  // Finds the timestamp logging channel for the provided data channel and
+  // connection.
+  const Channel *ForChannel(const Channel *channel,
+                            const Connection *connection);
+  std::string SplitChannelName(const Channel *channel,
+                               const Connection *connection);
+  std::string CombinedChannelName(std::string_view remote_node);
+
+ private:
+  const Configuration *configuration_;
+  const std::string_view name_;
+  const Node *node_;
+};
+
 // Class to manage lifetime, and creating senders for channels and connections.
 class ChannelTimestampSender {
  public:
@@ -35,13 +60,15 @@
   // I'd prefer 3) to be an error, but don't have strong opinions.  We will
   // still be correct if it gets used, as long as everything is consistent.
 
-  // List of Senders per node.
-  std::vector<aos::Sender<RemoteMessage>> timestamp_loggers_;
-
   // Mapping from channel and connection to logger.
   absl::btree_map<std::pair<const Channel *, const Connection *>,
-                  std::unique_ptr<aos::Sender<RemoteMessage>>>
+                  std::shared_ptr<aos::Sender<RemoteMessage>>>
       channel_timestamp_loggers_;
+
+  // Mapping from channel to RemoteMessage sender.  This is the channel that
+  // timestamps are published to.
+  absl::btree_map<const Channel *, std::shared_ptr<aos::Sender<RemoteMessage>>>
+      timestamp_loggers_;
 };
 
 }  // namespace message_bridge