Split out MakeSharedSpan

This lets us test a Send method which uses it easier.

Change-Id: I1f0f5ca0f520f681df6ea071ba4f3559e86b01a0
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index c29d820..2d932cc 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -34,6 +34,25 @@
 }
 }  // namespace
 
+std::pair<SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(size_t size) {
+  AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
+      malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
+
+  absl::Span<uint8_t> mutable_span(
+      reinterpret_cast<uint8_t *>(RoundChannelData(span->data(), size)), size);
+  // Use the placement new operator to construct an actual absl::Span in place.
+  new (span) AlignedOwningSpan(mutable_span);
+
+  return std::make_pair(
+      SharedSpan(std::shared_ptr<AlignedOwningSpan>(span,
+                                                    [](AlignedOwningSpan *s) {
+                                                      s->~AlignedOwningSpan();
+                                                      free(s);
+                                                    }),
+                 &span->span),
+      mutable_span);
+}
+
 std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
   os << ErrorToString(err);
   return os;
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 23250e1..8825464 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -133,6 +133,25 @@
   Ftrace ftrace_;
 };
 
+using SharedSpan = std::shared_ptr<const absl::Span<const uint8_t>>;
+
+// Holds storage for a span object and the data referenced by that span for
+// compatibility with SharedSpan users. If constructed with MakeSharedSpan, span
+// points to only the aligned segment of the entire data.
+struct AlignedOwningSpan {
+  AlignedOwningSpan(absl::Span<const uint8_t> new_span) : span(new_span) {}
+
+  AlignedOwningSpan(const AlignedOwningSpan &) = delete;
+  AlignedOwningSpan &operator=(const AlignedOwningSpan &) = delete;
+  absl::Span<const uint8_t> span;
+  char *data() { return reinterpret_cast<char *>(this + 1); }
+};
+
+// Constructs a span which owns its data through a shared_ptr. The owning span
+// points to a const view of the data; also returns a temporary mutable span
+// which is only valid while the const shared span is kept alive.
+std::pair<SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(size_t size);
+
 // Raw version of sender.  Sends a block of data.  This is used for reflection
 // and as a building block to implement typed senders.
 class RawSender {
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 07bd6d4..d3d21fd 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -2389,6 +2389,68 @@
   }
 }
 
+// Tests that the RawSender::Send(SharedSpan) overload works.
+TEST_P(AbstractEventLoopTest, SharedSenderTimingReport) {
+  gflags::FlagSaver flag_saver;
+  FLAGS_timing_report_ms = 1000;
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  const FlatbufferDetachedBuffer<TestMessage> kMessage =
+      JsonToFlatbuffer<TestMessage>("{}");
+
+  std::unique_ptr<aos::RawSender> sender =
+      loop2->MakeRawSender(configuration::GetChannel(
+          loop2->configuration(), "/test", "aos.TestMessage", "", nullptr));
+
+  Fetcher<timing::Report> report_fetcher =
+      loop1->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
+
+  loop2->OnRun([&]() {
+    for (int ii = 0; ii < TestChannelQueueSize(loop2.get()); ++ii) {
+      auto shared_span = MakeSharedSpan(kMessage.span().size());
+      memcpy(shared_span.second.data(), kMessage.span().data(),
+             kMessage.span().size());
+      EXPECT_EQ(sender->Send(std::move(shared_span.first)),
+                RawSender::Error::kOk);
+    }
+    auto shared_span = MakeSharedSpan(kMessage.span().size());
+    memcpy(shared_span.second.data(), kMessage.span().data(),
+           kMessage.span().size());
+    EXPECT_EQ(sender->Send(std::move(shared_span.first)),
+              RawSender::Error::kMessagesSentTooFast);
+  });
+  // Quit after 1 timing report, mid way through the next cycle.
+  EndEventLoop(loop2.get(), chrono::milliseconds(1500));
+
+  Run();
+
+  if (do_timing_reports() == DoTimingReports::kYes) {
+    // Check that the sent too fast actually got recorded by the timing report.
+    FlatbufferDetachedBuffer<timing::Report> primary_report =
+        FlatbufferDetachedBuffer<timing::Report>::Empty();
+    while (report_fetcher.FetchNext()) {
+      if (report_fetcher->name()->string_view() == "primary") {
+        primary_report = CopyFlatBuffer(report_fetcher.get());
+      }
+    }
+
+    EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+    ASSERT_NE(primary_report.message().senders(), nullptr);
+    EXPECT_EQ(primary_report.message().senders()->size(), 3);
+    EXPECT_EQ(
+        primary_report.message()
+            .senders()
+            ->Get(0)
+            ->error_counts()
+            ->Get(static_cast<size_t>(timing::SendError::MESSAGE_SENT_TOO_FAST))
+            ->count(),
+        1);
+  }
+}
+
 // Tests that senders count correctly in the timing report.
 TEST_P(AbstractEventLoopTest, WatcherTimingReport) {
   FLAGS_timing_report_ms = 1000;
@@ -2619,9 +2681,10 @@
           loop3->configuration(), "/test", "aos.TestMessage", "", nullptr));
 
   loop2->OnRun([&]() {
-    EXPECT_EQ(sender->Send(std::make_shared<absl::Span<const uint8_t>>(
-                  kMessage.span().data(), kMessage.span().size())),
-              RawSender::Error::kOk);
+    auto shared_span = MakeSharedSpan(kMessage.span().size());
+    memcpy(shared_span.second.data(), kMessage.span().data(),
+           kMessage.span().size());
+    sender->CheckOk(sender->Send(std::move(shared_span.first)));
   });
 
   bool happened = false;
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 0af69c7..0cb96c6 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -1978,8 +1978,7 @@
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
   const auto err = sender->Send(
-      RawSender::SharedSpan(timestamped_message.data,
-                            &timestamped_message.data->span),
+      SharedSpan(timestamped_message.data, &timestamped_message.data->span),
       timestamped_message.monotonic_remote_time.time,
       timestamped_message.realtime_remote_time, remote_queue_index,
       (channel_source_state_[timestamped_message.channel_index] != nullptr
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index c679b21..c021a84 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -49,41 +49,6 @@
   const bool prior_;
 };
 
-// Holds storage for a span object and the data referenced by that span for
-// compatibility with RawSender::SharedSpan users. If constructed with
-// MakeSharedSpan, span points to only the aligned segment of the entire data.
-struct AlignedOwningSpan {
-  AlignedOwningSpan(const AlignedOwningSpan &) = delete;
-  AlignedOwningSpan &operator=(const AlignedOwningSpan &) = delete;
-  absl::Span<const uint8_t> span;
-  char data[];
-};
-
-// Constructs a span which owns its data through a shared_ptr. The owning span
-// points to a const view of the data; also returns a temporary mutable span
-// which is only valid while the const shared span is kept alive.
-std::pair<RawSender::SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(
-    size_t size) {
-  AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
-      malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
-
-  absl::Span<uint8_t> mutable_span(
-      reinterpret_cast<uint8_t *>(RoundChannelData(&span->data[0], size)),
-      size);
-  // Use the placement new operator to construct an actual absl::Span in place.
-  new (&span->span) absl::Span(mutable_span);
-
-  return std::make_pair(
-      RawSender::SharedSpan(
-          std::shared_ptr<AlignedOwningSpan>(span,
-                                             [](AlignedOwningSpan *s) {
-                                               s->~AlignedOwningSpan();
-                                               free(s);
-                                             }),
-          &span->span),
-      mutable_span);
-}
-
 // Container for both a message, and the context for it for simulation.  This
 // makes tracking the timestamps associated with the data easy.
 struct SimulatedMessage final {
@@ -93,8 +58,8 @@
 
   // Creates a SimulatedMessage with size bytes of storage.
   // This is a shared_ptr so we don't have to implement refcounting or copying.
-  static std::shared_ptr<SimulatedMessage> Make(
-      SimulatedChannel *channel, const RawSender::SharedSpan data);
+  static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel,
+                                                const SharedSpan data);
 
   // Context for the data.
   Context context;
@@ -103,7 +68,7 @@
 
   // Owning span to this message's data. Depending on the sender may either
   // represent the data of just the flatbuffer, or max channel size.
-  RawSender::SharedSpan data;
+  SharedSpan data;
 
   // Mutable view of above data. If empty, this message is not mutable.
   absl::Span<uint8_t> mutable_data;
@@ -336,7 +301,7 @@
 namespace {
 
 std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
-    SimulatedChannel *channel, RawSender::SharedSpan data) {
+    SimulatedChannel *channel, SharedSpan data) {
   // The allocations in here are due to infrastructure and don't count in the no
   // mallocs in RT code.
   ScopedNotRealtime nrt;
@@ -1165,8 +1130,7 @@
 }
 
 RawSender::Error SimulatedSender::DoSend(
-    const RawSender::SharedSpan data,
-    monotonic_clock::time_point monotonic_remote_time,
+    const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
     realtime_clock::time_point realtime_remote_time,
     uint32_t remote_queue_index, const UUID &source_boot_uuid) {
   CHECK_LE(data->size(), this->size())