Move log reader and writer over to split timestamp channels

Reuse TimestampChannel (and split it up further) to implement naming and
finding channels for log reader and writer.  Test both the old and new
timestamp configuration.

Once the config changes go in for y2020, this should fix reading the log
file James broke.

Change-Id: I6b09eec69c064ded3b3c149e0fdf23162bd352cf
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
index 12c6713..d022b60 100644
--- a/aos/network/timestamp_channel.cc
+++ b/aos/network/timestamp_channel.cc
@@ -5,62 +5,99 @@
 namespace aos {
 namespace message_bridge {
 
+ChannelTimestampFinder::ChannelTimestampFinder(
+    const Configuration *configuration, const std::string_view name,
+    const Node *node)
+    : configuration_(configuration), name_(name), node_(node) {}
+
+std::string ChannelTimestampFinder::SplitChannelName(
+    const Channel *channel, const Connection *connection) {
+  std::string type(channel->type()->string_view());
+  std::replace(type.begin(), type.end(), '.', '-');
+  return absl::StrCat("/aos/remote_timestamps/",
+                      connection->name()->string_view(),
+                      channel->name()->string_view(), "/", type);
+}
+
+std::string ChannelTimestampFinder::CombinedChannelName(
+    std::string_view remote_node) {
+  return absl::StrCat("/aos/remote_timestamps/", remote_node);
+}
+
+const Channel *ChannelTimestampFinder::ForChannel(
+    const Channel *channel, const Connection *connection) {
+  const std::string split_timestamp_channel_name =
+      SplitChannelName(channel, connection);
+  const Channel *split_timestamp_channel = configuration::GetChannel(
+      configuration_, split_timestamp_channel_name,
+      RemoteMessage::GetFullyQualifiedName(), name_, node_, true);
+  if (split_timestamp_channel != nullptr) {
+    return split_timestamp_channel;
+  }
+
+  const std::string shared_timestamp_channel_name =
+      CombinedChannelName(connection->name()->string_view());
+  const Channel *shared_timestamp_channel = configuration::GetChannel(
+      configuration_, shared_timestamp_channel_name,
+      RemoteMessage::GetFullyQualifiedName(), name_, node_, true);
+  if (shared_timestamp_channel != nullptr) {
+    LOG(WARNING) << "Failed to find timestamp channel {\"name\": \""
+                 << split_timestamp_channel << "\", \"type\": \""
+                 << RemoteMessage::GetFullyQualifiedName()
+                 << "\"}, falling back to old version.";
+    return shared_timestamp_channel;
+  }
+
+  CHECK(shared_timestamp_channel != nullptr)
+      << ": Remote timestamp channel { \"name\": \""
+      << split_timestamp_channel_name << "\", \"type\": \""
+      << RemoteMessage::GetFullyQualifiedName()
+      << "\" } not found in config for " << name_
+      << (configuration::MultiNode(configuration_)
+              ? absl::StrCat(" on node ", node_->name()->string_view())
+              : ".");
+
+  return nullptr;
+}
+
 ChannelTimestampSender::ChannelTimestampSender(aos::EventLoop *event_loop)
     : event_loop_(event_loop) {
   CHECK(configuration::MultiNode(event_loop_->configuration()));
-  timestamp_loggers_.resize(event_loop_->configuration()->nodes()->size());
 }
 
 aos::Sender<RemoteMessage> *ChannelTimestampSender::SenderForChannel(
     const Channel *channel, const Connection *connection) {
+  ChannelTimestampFinder finder(event_loop_);
   // Look at any pre-created channel/connection pairs.
-  auto it =
-      channel_timestamp_loggers_.find(std::make_pair(channel, connection));
-  if (it != channel_timestamp_loggers_.end()) {
-    return it->second.get();
+  {
+    auto it =
+        channel_timestamp_loggers_.find(std::make_pair(channel, connection));
+    if (it != channel_timestamp_loggers_.end()) {
+      return it->second.get();
+    }
   }
 
-  const Node *other_node = configuration::GetNode(
-      event_loop_->configuration(), connection->name()->string_view());
-  const size_t other_node_index =
-      configuration::GetNodeIndex(event_loop_->configuration(), other_node);
+  const Channel *timestamp_channel = finder.ForChannel(channel, connection);
 
-  std::string type(channel->type()->string_view());
-  std::replace(type.begin(), type.end(), '.', '-');
-  const std::string single_timestamp_channel =
-      absl::StrCat("/aos/remote_timestamps/", connection->name()->string_view(),
-                   channel->name()->string_view(), "/", type);
-  if (event_loop_->HasChannel<RemoteMessage>(single_timestamp_channel)) {
-    LOG(INFO) << "Making RemoteMessage channel " << single_timestamp_channel;
-    auto result = channel_timestamp_loggers_.try_emplace(
-        std::make_pair(channel, connection),
-        std::make_unique<aos::Sender<RemoteMessage>>(
-            event_loop_->MakeSender<RemoteMessage>(single_timestamp_channel)));
-    return result.first->second.get();
-  } else {
-    // Then look for any per-remote-node channels.
-    if (timestamp_loggers_[other_node_index]) {
-      return &timestamp_loggers_[other_node_index];
+  {
+    auto it = timestamp_loggers_.find(timestamp_channel);
+    if (it != timestamp_loggers_.end()) {
+      CHECK(channel_timestamp_loggers_
+                .try_emplace(std::make_pair(channel, connection), it->second)
+                .second);
+      return it->second.get();
     }
-    const std::string shared_timestamp_channel = absl::StrCat(
-        "/aos/remote_timestamps/", connection->name()->string_view());
-    LOG(INFO) << "Looking for " << shared_timestamp_channel;
-    if (event_loop_->HasChannel<RemoteMessage>(shared_timestamp_channel)) {
-      LOG(WARNING) << "Failed to find timestamp channel {\"name\": \""
-                   << single_timestamp_channel << "\", \"type\": \""
-                   << RemoteMessage::GetFullyQualifiedName()
-                   << "\"}, falling back to old version.";
-      timestamp_loggers_[other_node_index] =
-          event_loop_->MakeSender<RemoteMessage>(shared_timestamp_channel);
-      return &timestamp_loggers_[other_node_index];
-    } else {
-      LOG(ERROR) << "Failed";
-    }
-
-    // Explode with an error about the new channel.
-    event_loop_->MakeSender<RemoteMessage>(single_timestamp_channel);
-    return nullptr;
   }
+
+  auto result = channel_timestamp_loggers_.try_emplace(
+      std::make_pair(channel, connection),
+      std::make_shared<aos::Sender<RemoteMessage>>(
+          event_loop_->MakeSender<RemoteMessage>(
+              timestamp_channel->name()->string_view())));
+
+  CHECK(timestamp_loggers_.try_emplace(timestamp_channel, result.first->second)
+            .second);
+  return result.first->second.get();
 }
 
 }  // namespace message_bridge