Handle a large number of messages being published

writev has a limit for the number of iovec's that can be written at
once.  If this is exceeded, it fails to write data.  This is an
independent check from us wanting to flush if there has been too big a
quantity of data written.  A burst of small messages can trigger this.

Check for this and flush when we get to the limit.

Change-Id: Ie8096f5ae734de9f6716d2ba1846c42f6c80ff3b
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e2da491..9968b80 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1,6 +1,7 @@
 #include "aos/events/logging/logger.h"
 
 #include <fcntl.h>
+#include <limits.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <sys/uio.h>
@@ -44,7 +45,10 @@
   queued_size_ += buffer.size();
   queue_.emplace_back(std::move(buffer));
 
-  if (queued_size_ > static_cast<size_t>(FLAGS_flush_size)) {
+  // Flush if we are at the max number of iovs per writev, or have written
+  // enough data.  Otherwise writev will fail with an invalid argument.
+  if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
+      queue_.size() == IOV_MAX) {
     Flush();
   }
 }
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 1137e66..c627101 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -106,6 +106,47 @@
   reader.Deregister();
 }
 
+// Tests that a large number of messages per second doesn't overwhelm writev.
+TEST_F(LoggerTest, ManyMessages) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile = tmpdir + "/logfile.bfbs";
+  // Remove the log file.
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    DetachedBufferWriter writer(logfile);
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    std::unique_ptr<EventLoop> ping_spammer_event_loop =
+        event_loop_factory_.MakeEventLoop("ping_spammer");
+    aos::Sender<examples::Ping> ping_sender =
+        ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
+
+    aos::TimerHandler *timer_handler =
+        ping_spammer_event_loop->AddTimer([&ping_sender]() {
+          aos::Sender<examples::Ping>::Builder builder =
+              ping_sender.MakeBuilder();
+          examples::Ping::Builder ping_builder =
+              builder.MakeBuilder<examples::Ping>();
+          CHECK(builder.Send(ping_builder.Finish()));
+        });
+
+    // 100 ms / 0.05 ms -> 2000 messages.  Should be enough to crash it.
+    ping_spammer_event_loop->OnRun([&ping_spammer_event_loop, timer_handler]() {
+      timer_handler->Setup(ping_spammer_event_loop->monotonic_now(),
+                           chrono::microseconds(50));
+    });
+
+    Logger logger(&writer, logger_event_loop.get(),
+                  std::chrono::milliseconds(100));
+
+    event_loop_factory_.RunFor(chrono::milliseconds(1000));
+  }
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos