Add TimestampMapper to match timestamps with data

This finishes the main sorting code for
https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#

This creates an object that buffers sorted data and matches it against
each timestamp we find.  There is currently no timing out of the
buffered data, but only data that *could* be forwarded gets buffered.

We also impose a bunch of restrictions here to simplify the logic.  The
plan is to relax them as we run into them rather than solving everything
up front.  That also makes it a lot easier to add tests incrementally.
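
Roughly, the intended consumption pattern looks like this (TimestampMapper,
AddPeer, Front, and PopFront are the new API added below; parts0, parts1,
and DoSomethingWith are hypothetical placeholders):

  // Hypothetical setup: parts0/parts1 hold each node's sorted log parts.
  TimestampMapper node0_mapper(std::move(parts0));
  TimestampMapper node1_mapper(std::move(parts1));
  // Register peers both ways so forwarded data gets buffered and matched.
  node0_mapper.AddPeer(&node1_mapper);
  node1_mapper.AddPeer(&node0_mapper);
  // Consume node 0's messages in time order, with timestamps matched to data.
  while (TimestampedMessage *m = node0_mapper.Front()) {
    DoSomethingWith(*m);
    node0_mapper.PopFront();
  }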

Change-Id: Idbc515e5594bc031139c7b994aaa71826ff68c0a
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 4474c7e..2886a8e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -490,11 +490,36 @@
 
 std::ostream &operator<<(std::ostream &os, const Message &m) {
   os << "{.channel_index=" << m.channel_index
-     << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp
-     << ", .data="
-     << aos::FlatbufferToJson(m.data,
-                              {.multi_line = false, .max_vector_size = 1})
-     << "}";
+     << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
+  if (m.data.Verify()) {
+    os << ", .data="
+       << aos::FlatbufferToJson(m.data,
+                                {.multi_line = false, .max_vector_size = 1});
+  }
+  os << "}";
+  return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
+  os << "{.channel_index=" << m.channel_index
+     << ", .queue_index=" << m.queue_index
+     << ", .monotonic_event_time=" << m.monotonic_event_time
+     << ", .realtime_event_time=" << m.realtime_event_time;
+  if (m.remote_queue_index != 0xffffffff) {
+    os << ", .remote_queue_index=" << m.remote_queue_index;
+  }
+  if (m.monotonic_remote_time != monotonic_clock::min_time) {
+    os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
+  }
+  if (m.realtime_remote_time != realtime_clock::min_time) {
+    os << ", .realtime_remote_time=" << m.realtime_remote_time;
+  }
+  if (m.data.Verify()) {
+    os << ", .data="
+       << aos::FlatbufferToJson(m.data,
+                                {.multi_line = false, .max_vector_size = 1});
+  }
+  os << "}";
   return os;
 }
 
@@ -559,8 +584,28 @@
   return ss.str();
 }
 
-NodeMerger::NodeMerger(std::vector<std::unique_ptr<LogPartsSorter>> parts)
-    : parts_sorters_(std::move(parts)) {}
+NodeMerger::NodeMerger(std::vector<LogParts> parts) {
+  CHECK_GE(parts.size(), 1u);
+  const std::string part0_node = parts[0].node;
+  for (size_t i = 1; i < parts.size(); ++i) {
+    CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
+  }
+  for (LogParts &part : parts) {
+    parts_sorters_.emplace_back(std::move(part));
+  }
+
+  node_ = configuration::GetNodeIndex(log_file_header()->configuration(),
+                                      part0_node);
+
+  monotonic_start_time_ = monotonic_clock::max_time;
+  realtime_start_time_ = realtime_clock::max_time;
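+  // Use the earliest monotonic start time across all the parts, and take the
+  // realtime start time from that same part so the pair stays consistent.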
+  for (const LogPartsSorter &parts_sorter : parts_sorters_) {
+    if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
+      monotonic_start_time_ = parts_sorter.monotonic_start_time();
+      realtime_start_time_ = parts_sorter.realtime_start_time();
+    }
+  }
+}
 
 Message *NodeMerger::Front() {
   // Return the current Front if we have one, otherwise go compute one.
@@ -572,23 +617,23 @@
   // duplicates.
   Message *oldest = nullptr;
   sorted_until_ = monotonic_clock::max_time;
-  for (std::unique_ptr<LogPartsSorter> &parts_sorter : parts_sorters_) {
-    Message *m = parts_sorter->Front();
+  for (LogPartsSorter &parts_sorter : parts_sorters_) {
+    Message *m = parts_sorter.Front();
     if (!m) {
-      sorted_until_ = std::min(sorted_until_, parts_sorter->sorted_until());
+      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
       continue;
     }
     if (oldest == nullptr || *m < *oldest) {
       oldest = m;
-      current_ = parts_sorter.get();
+      current_ = &parts_sorter;
     } else if (*m == *oldest) {
       // Found a duplicate.  It doesn't matter which one we return.  It is
       // easiest to just drop the new one.
-      parts_sorter->PopFront();
+      parts_sorter.PopFront();
     }
 
     // PopFront may change this, so compute it down here.
-    sorted_until_ = std::min(sorted_until_, parts_sorter->sorted_until());
+    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
   }
 
   // Return the oldest message found.  This will be nullptr if nothing was
@@ -602,6 +647,306 @@
   current_ = nullptr;
 }
 
+TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
+    : node_merger_(std::move(parts)),
+      node_(node_merger_.node()),
+      message_{.channel_index = 0xffffffff,
+               .queue_index = 0xffffffff,
+               .monotonic_event_time = monotonic_clock::min_time,
+               .realtime_event_time = realtime_clock::min_time,
+               .remote_queue_index = 0xffffffff,
+               .monotonic_remote_time = monotonic_clock::min_time,
+               .realtime_remote_time = realtime_clock::min_time,
+               .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()} {
+  const Configuration *config = log_file_header()->configuration();
+  // Only fill out nodes_data_ if this is a multi-node configuration.
+  // Otherwise everything stays pretty simple.
+  if (configuration::MultiNode(config)) {
+    nodes_data_.resize(config->nodes()->size());
+    const Node *my_node = config->nodes()->Get(node());
+    for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
+      const Node *node = config->nodes()->Get(node_index);
+      NodeData *node_data = &nodes_data_[node_index];
+      node_data->channels.resize(config->channels()->size());
+      // We should save the channel if it is delivered to the node represented
+      // by the NodeData, but not sent by that node.  That combo means it is
+      // forwarded.
+      size_t channel_index = 0;
+      node_data->any_delivered = false;
+      for (const Channel *channel : *config->channels()) {
+        node_data->channels[channel_index].delivered =
+            configuration::ChannelIsReadableOnNode(channel, node) &&
+            configuration::ChannelIsSendableOnNode(channel, my_node);
+        node_data->any_delivered = node_data->any_delivered ||
+                                   node_data->channels[channel_index].delivered;
+        ++channel_index;
+      }
+    }
+
+    for (const Channel *channel : *config->channels()) {
+      source_node_.emplace_back(configuration::GetNodeIndex(
+          config, channel->source_node()->string_view()));
+    }
+  }
+}
+
+void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
+  CHECK(configuration::MultiNode(log_file_header()->configuration()));
+  CHECK_NE(timestamp_mapper->node(), node());
+  CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
+
+  NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
+  // Only register with the peer if this node forwards data to it.  Otherwise
+  // we could end up needlessly buffering data.
+  if (node_data->any_delivered) {
+    LOG(INFO) << "Registering on node " << node() << " for peer node "
+              << timestamp_mapper->node();
+    CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
+
+    timestamp_mapper->nodes_data_[node()].peer = this;
+  }
+}
+
+void TimestampMapper::FillMessage(Message *m) {
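+  // Copies the event times through and marks all of the remote fields as
+  // invalid; used for messages which don't need timestamp matching.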
+  message_ = {
+      .channel_index = m->channel_index,
+      .queue_index = m->queue_index,
+      .monotonic_event_time = m->timestamp,
+      .realtime_event_time = aos::realtime_clock::time_point(
+          std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+      .remote_queue_index = 0xffffffff,
+      .monotonic_remote_time = monotonic_clock::min_time,
+      .realtime_remote_time = realtime_clock::min_time,
+      .data = std::move(m->data)};
+}
+
+TimestampedMessage *TimestampMapper::Front() {
+  // If the result of the previous Front() call is still valid, return it
+  // without fetching anything new.
+  switch (first_message_) {
+    case FirstMessage::kNeedsUpdate:
+      break;
+    case FirstMessage::kInMessage:
+      return &message_;
+    case FirstMessage::kNullptr:
+      return nullptr;
+  }
+
+  if (nodes_data_.empty()) {
+    // Simple path.  We are single node, so there are no timestamps to match!
+    CHECK_EQ(messages_.size(), 0u);
+    Message *m = node_merger_.Front();
+    if (!m) {
+      first_message_ = FirstMessage::kNullptr;
+      return nullptr;
+    }
+    // Fill in message_ so we have a place to associate remote timestamps, and
+    // return it.
+    FillMessage(m);
+
+    CHECK_GE(message_.monotonic_event_time, last_message_time_);
+    last_message_time_ = message_.monotonic_event_time;
+    first_message_ = FirstMessage::kInMessage;
+    return &message_;
+  }
+
+  // Messages only get buffered for the peers they are delivered to when they
+  // pass through Queue().  Reuse the flow below, which operates on messages_,
+  // by queueing the next message into messages_ and continuing.
+  if (messages_.empty()) {
+    if (!Queue()) {
+      // Found nothing to add, we are out of data!
+      first_message_ = FirstMessage::kNullptr;
+      return nullptr;
+    }
+
+    // Now that it has been added (and cannibalized), forget about it upstream.
+    node_merger_.PopFront();
+  }
+
+  Message *m = &(messages_.front());
+
+  if (source_node_[m->channel_index] == node()) {
+    // From us, just forward it on, filling the remote data in as invalid.
+    FillMessage(m);
+    CHECK_GE(message_.monotonic_event_time, last_message_time_);
+    last_message_time_ = message_.monotonic_event_time;
+    first_message_ = FirstMessage::kInMessage;
+    return &message_;
+  } else {
+    // Got a timestamp; find the matching remote data and return it.
+    Message data = MatchingMessageFor(*m);
+
+    // Return the data from the remote.  The local message only carries
+    // timestamp info, which is no longer needed once extracted.
+    message_ = {
+        .channel_index = m->channel_index,
+        .queue_index = m->queue_index,
+        .monotonic_event_time = m->timestamp,
+        .realtime_event_time = aos::realtime_clock::time_point(
+            std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+        .remote_queue_index = m->data.message().remote_queue_index(),
+        .monotonic_remote_time =
+            monotonic_clock::time_point(std::chrono::nanoseconds(
+                m->data.message().monotonic_remote_time())),
+        .realtime_remote_time = realtime_clock::time_point(
+            std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
+        .data = std::move(data.data)};
+    CHECK_GE(message_.monotonic_event_time, last_message_time_);
+    last_message_time_ = message_.monotonic_event_time;
+    first_message_ = FirstMessage::kInMessage;
+    return &message_;
+  }
+}
+
+void TimestampMapper::PopFront() {
+  CHECK(first_message_ != FirstMessage::kNeedsUpdate);
+  first_message_ = FirstMessage::kNeedsUpdate;
+
+  if (nodes_data_.empty()) {
+    // We are a thin wrapper around node_merger_.  Call it directly.
+    node_merger_.PopFront();
+  } else {
+    // Since messages_ holds the data, drop it.
+    messages_.pop_front();
+  }
+}
+
+Message TimestampMapper::MatchingMessageFor(const Message &message) {
+  TimestampMapper *peer =
+      CHECK_NOTNULL(nodes_data_[source_node_[message.channel_index]].peer);
+  // The queue which will have the matching data, if available.
+  std::deque<Message> *data_queue =
+      &peer->nodes_data_[node()].channels[message.channel_index].messages;
+
+  // Figure out what queue index we are looking for.
+  CHECK(message.data.message().has_remote_queue_index());
+  const uint32_t remote_queue_index =
+      message.data.message().remote_queue_index();
+
+  CHECK(message.data.message().has_monotonic_remote_time());
+  CHECK(message.data.message().has_realtime_remote_time());
+
+  const monotonic_clock::time_point monotonic_remote_time(
+      std::chrono::nanoseconds(message.data.message().monotonic_remote_time()));
+  const realtime_clock::time_point realtime_remote_time(
+      std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+
+  peer->QueueUntil(monotonic_remote_time);
+
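+  // Nothing got queued for this channel, so the matching data is missing
+  // from the log.  Return a placeholder with empty data so the timestamp can
+  // still be returned.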
+  if (data_queue->empty()) {
+    return Message{
+        .channel_index = message.channel_index,
+        .queue_index = remote_queue_index,
+        .timestamp = monotonic_remote_time,
+        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+  }
+
+  // The algorithm below is constant time with some assumptions.  We need there
+  // to be no missing messages in the data stream.  This also assumes a queue
+  // hasn't wrapped.  That is conservative, but should let us get started.
+  //
+  // TODO(austin): We can break these assumptions pretty easily once we have a
+  // need.
+  CHECK_EQ(
+      data_queue->back().queue_index - data_queue->front().queue_index + 1u,
+      data_queue->size());
+
+  if (remote_queue_index < data_queue->front().queue_index ||
+      remote_queue_index > data_queue->back().queue_index) {
+    return Message{
+        .channel_index = message.channel_index,
+        .queue_index = remote_queue_index,
+        .timestamp = monotonic_remote_time,
+        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+  }
+
+  // Pull the data out and confirm that the timestamps match as expected.
+  Message result = std::move(
+      (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+  CHECK_EQ(result.timestamp, monotonic_remote_time)
+      << ": Queue index matches, but timestamp doesn't.  Please investigate!";
+  CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
+               result.data.message().realtime_sent_time())),
+           realtime_remote_time)
+      << ": Queue index matches, but timestamp doesn't.  Please investigate!";
+  // Now drop the data off the front.  We have deduplicated timestamps, so we
+  // are done.  And all the data is in order.
+  data_queue->erase(data_queue->begin(),
+                    data_queue->begin() + (1 + remote_queue_index -
+                                           data_queue->front().queue_index));
+  return result;
+}
+
+void TimestampMapper::QueueUntil(monotonic_clock::time_point t) {
+  if (queued_until_ > t) {
+    return;
+  }
+  while (true) {
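+    // messages_ is sorted, so once the newest buffered message is past t, we
+    // know everything with a timestamp <= t has been queued.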
+    if (!messages_.empty() && messages_.back().timestamp > t) {
+      queued_until_ = std::max(queued_until_, messages_.back().timestamp);
+      return;
+    }
+
+    if (!Queue()) {
+      // Found nothing to add, we are out of data!
+      queued_until_ = monotonic_clock::max_time;
+      return;
+    }
+
+    // Now that it has been added (and cannibalized), forget about it upstream.
+    node_merger_.PopFront();
+  }
+}
+
+bool TimestampMapper::Queue() {
+  Message *m = node_merger_.Front();
+  if (m == nullptr) {
+    return false;
+  }
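+  // Save a copy of the message for every peer node the channel is forwarded
+  // to, so the peer's timestamps can be matched against it later.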
+  for (NodeData &node_data : nodes_data_) {
+    if (!node_data.any_delivered) continue;
+    if (node_data.channels[m->channel_index].delivered) {
+      // TODO(austin): This copies the data...  Probably not worth stressing
+      // about yet.
+      // TODO(austin): Bound how big this can get.  We tend not to send massive
+      // data, so we can probably ignore this for a bit.
+      node_data.channels[m->channel_index].messages.emplace_back(*m);
+    }
+  }
+
+  messages_.emplace_back(std::move(*m));
+  return true;
+}
+
+std::string TimestampMapper::DebugString() const {
+  std::stringstream ss;
+  ss << "node " << node() << " [\n";
+  for (const Message &message : messages_) {
+    ss << "  " << message << "\n";
+  }
+  ss << "] queued_until " << queued_until_;
+  for (const NodeData &ns : nodes_data_) {
+    if (ns.peer == nullptr) continue;
+    ss << "\nnode " << ns.peer->node() << " remote_data [\n";
+    size_t channel_index = 0;
+    for (const NodeData::ChannelData &channel_data :
+         ns.peer->nodes_data_[node()].channels) {
+      if (channel_data.messages.empty()) {
+        ++channel_index;
+        continue;
+      }
+
+      ss << "  channel " << channel_index << " [\n";
+      for (const Message &m : channel_data.messages) {
+        ss << "    " << m << "\n";
+      }
+      ss << "  ]\n";
+      ++channel_index;
+    }
+    ss << "] queued_until " << ns.peer->queued_until_;
+  }
+  return ss.str();
+}
+
 SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),