make log_displayer following faster on large files

It can now skip to close to the end of a log file without missing any
struct types. This means following old log files with a new
log_displayer won't work, but that's not very useful anyways...

Change-Id: I8ed78ce0bd0e1d7f28b6e98284490ea9b5a43f1e
diff --git a/aos/linux_code/logging/binary_log_file.cc b/aos/linux_code/logging/binary_log_file.cc
index 1f9815e..b063fae 100644
--- a/aos/linux_code/logging/binary_log_file.cc
+++ b/aos/linux_code/logging/binary_log_file.cc
@@ -35,6 +35,30 @@
   msync(current_, kPageSize, MS_ASYNC | MS_INVALIDATE);
 }
 
+void LogFileAccessor::SkipToLastSeekablePage() {
+  CHECK(definitely_use_mmap());
+
+  struct stat info;
+  if (fstat(fd_, &info) == -1) {
+    PLOG(FATAL, "fstat(%d, %p) failed", fd_, &info);
+  }
+
+  CHECK((info.st_size % kPageSize) == 0);
+  const auto last_readable_page_number = (info.st_size / kPageSize) - 1;
+  const auto last_seekable_page_number =
+      last_readable_page_number / kSeekPages * kSeekPages;
+  const off_t new_offset = last_seekable_page_number * kPageSize;
+  // We don't want to go backwards...
+  if (new_offset > offset_) {
+    Unmap(current_);
+    offset_ = new_offset;
+    MapNextPage();
+  }
+}
+
+// The only way to tell is using fstat, but we don't really want to be making a
+// syscall every single time somebody wants to know the answer, so it gets
+// cached in is_last_page_.
 bool LogFileAccessor::IsLastPage() {
   if (is_last_page_ != Maybe::kUnknown) {
     return is_last_page_ == Maybe::kYes;
@@ -116,6 +140,7 @@
     r = static_cast<LogFileMessageHeader *>(
         static_cast<void *>(&current()[position()]));
     if (wait) {
+      CHECK(definitely_use_mmap());
       if (futex_wait(&r->marker) != 0) continue;
     }
     if (r->marker == 2) {
@@ -194,23 +219,45 @@
 }
 
 LogFileMessageHeader *LogFileWriter::GetWritePosition(size_t message_size) {
-  if (position() + message_size + (kAlignment - (message_size % kAlignment)) +
-      sizeof(aos_futex) > kPageSize) {
-    char *const temp = current();
-    MapNextPage();
-    if (futex_set_value(static_cast<aos_futex *>(static_cast<void *>(
-                    &temp[position()])), 2) == -1) {
-      PLOG(WARNING, "readers will hang because futex_set_value(%p, 2) failed",
-           &temp[position()]);
-    }
-    Unmap(temp);
-  }
+  if (NeedNewPageFor(message_size)) ForceNewPage();
   LogFileMessageHeader *const r = static_cast<LogFileMessageHeader *>(
       static_cast<void *>(&current()[position()]));
   IncrementPosition(message_size);
   return r;
 }
 
+// A number of seekable pages, not the actual file offset, is stored in *cookie.
+bool LogFileWriter::ShouldClearSeekableData(off_t *cookie,
+                                            size_t next_message_size) const {
+  off_t next_message_page = (offset() / kPageSize) - 1;
+  if (NeedNewPageFor(next_message_size)) {
+    ++next_message_page;
+  }
+  const off_t current_seekable_page = next_message_page / kSeekPages;
+  CHECK_LE(*cookie, current_seekable_page);
+  const bool r = *cookie != current_seekable_page;
+  *cookie = current_seekable_page;
+  return r;
+}
+
+bool LogFileWriter::NeedNewPageFor(size_t bytes) const {
+  return position() + bytes + (kAlignment - (bytes % kAlignment)) +
+             sizeof(aos_futex) >
+         kPageSize;
+}
+
+void LogFileWriter::ForceNewPage() {
+  char *const temp = current();
+  MapNextPage();
+  if (futex_set_value(
+          static_cast<aos_futex *>(static_cast<void *>(&temp[position()])),
+          2) == -1) {
+    PLOG(WARNING, "readers will hang because futex_set_value(%p, 2) failed",
+         &temp[position()]);
+  }
+  Unmap(temp);
+}
+
 }  // namespace linux_code
 }  // namespace logging
 }  // namespace aos
diff --git a/aos/linux_code/logging/binary_log_file.h b/aos/linux_code/logging/binary_log_file.h
index 84f8398..bcf9da1 100644
--- a/aos/linux_code/logging/binary_log_file.h
+++ b/aos/linux_code/logging/binary_log_file.h
@@ -94,8 +94,17 @@
   // Asynchronously syncs all open mappings.
   void Sync() const;
 
+  // Returns true iff we currently have the last page in the file mapped.
+  // This is fundamentally a racy question, so the return value may not be
+  // accurate by the time this method returns.
   bool IsLastPage();
 
+  // Skips to the last page which is an even multiple of kSeekPages.
+  // This is fundamentally racy, so it may not actually be on the very last
+  // possible multiple of kSeekPages when it returns, but it should be close.
+  // This will never move backwards.
+  void SkipToLastSeekablePage();
+
   size_t file_offset(const void *msg) {
     return offset() + (static_cast<const char *>(msg) - current());
   }
@@ -109,6 +118,10 @@
   // What to align messages to, copied into an actual constant.
   static const size_t kAlignment = MESSAGE_ALIGNMENT;
 #undef MESSAGE_ALIGNMENT
+  // Pages which are multiples of this from the beginning of a file start with
+  // no saved state (ie struct types). This allows seeking immediately to the
+  // largest currently written interval of this number when following.
+  static const size_t kSeekPages = 256;
 
   char *current() const { return current_; }
   size_t position() const { return position_; }
@@ -169,6 +182,22 @@
 
   // message_size should be the total number of bytes needed for the message.
   LogFileMessageHeader *GetWritePosition(size_t message_size);
+
+  // Returns true exactly once for each unique cookie on each page where cached
+  // data should be cleared.
+  // Call with a non-zero next_message_size to determine if cached data should
+  // be forgotten before writing a next_message_size-sized message.
+  // cookie should be initialized to 0.
+  bool ShouldClearSeekableData(off_t *cookie, size_t next_message_size) const;
+
+  // Forces a move to a new page for the next message.
+  // This is important when there is cacheable data that needs to be re-written
+  // before a message which will spill over onto the next page but the cacheable
+  // message being refreshed is smaller and won't get to a new page by itself.
+  void ForceNewPage();
+
+ private:
+  bool NeedNewPageFor(size_t bytes) const;
 };
 
 }  // namespace linux_code
diff --git a/aos/linux_code/logging/binary_log_writer.cc b/aos/linux_code/logging/binary_log_writer.cc
index cc3cf66..9943e7b 100644
--- a/aos/linux_code/logging/binary_log_writer.cc
+++ b/aos/linux_code/logging/binary_log_writer.cc
@@ -26,14 +26,14 @@
 namespace linux_code {
 namespace {
 
-void CheckTypeWritten(uint32_t type_id, LogFileWriter &writer) {
-  static ::std::unordered_set<uint32_t> written_type_ids;
-  if (written_type_ids.count(type_id) > 0) return;
+void CheckTypeWritten(uint32_t type_id, LogFileWriter *writer,
+                      ::std::unordered_set<uint32_t> *written_type_ids) {
+  if (written_type_ids->count(type_id) > 0) return;
   if (MessageType::IsPrimitive(type_id)) return;
 
   const MessageType &type = type_cache::Get(type_id);
   for (int i = 0; i < type.number_fields; ++i) {
-    CheckTypeWritten(type.fields[i]->type, writer);
+    CheckTypeWritten(type.fields[i]->type, writer, written_type_ids);
   }
 
   char buffer[1024];
@@ -44,7 +44,7 @@
     return;
   }
   LogFileMessageHeader *const output =
-      writer.GetWritePosition(sizeof(LogFileMessageHeader) + size);
+      writer->GetWritePosition(sizeof(LogFileMessageHeader) + size);
 
   output->time_sec = output->time_nsec = 0;
   output->source = getpid();
@@ -58,7 +58,7 @@
   output->type = LogFileMessageHeader::MessageType::kStructType;
   futex_set(&output->marker);
 
-  written_type_ids.insert(type_id);
+  written_type_ids->insert(type_id);
 }
 
 void AllocateLogName(char **filename, const char *directory) {
@@ -198,6 +198,9 @@
   }
   LogFileWriter writer(fd);
 
+  ::std::unordered_set<uint32_t> written_type_ids;
+  off_t clear_type_ids_cookie = 0;
+
   while (true) {
     const LogMessage *const msg = ReadNext();
     if (msg == NULL) continue;
@@ -208,7 +211,12 @@
     if (msg->type == LogMessage::Type::kStruct) {
       output_length += sizeof(msg->structure.type_id) + sizeof(uint32_t) +
                        msg->structure.string_length;
-      CheckTypeWritten(msg->structure.type_id, writer);
+      if (writer.ShouldClearSeekableData(&clear_type_ids_cookie,
+                                         output_length)) {
+        writer.ForceNewPage();
+        written_type_ids.clear();
+      }
+      CheckTypeWritten(msg->structure.type_id, &writer, &written_type_ids);
     } else if (msg->type == LogMessage::Type::kMatrix) {
       output_length +=
           sizeof(msg->matrix.type) + sizeof(uint32_t) + sizeof(uint16_t) +
diff --git a/aos/linux_code/logging/log_displayer.cc b/aos/linux_code/logging/log_displayer.cc
index 22ea0bd..cdc8f46 100644
--- a/aos/linux_code/logging/log_displayer.cc
+++ b/aos/linux_code/logging/log_displayer.cc
@@ -233,16 +233,20 @@
     }
   }
 
-  fprintf(stderr, "displaying down to level %s from file '%s'\n",
-      ::aos::logging::log_str(filter_level), filename);
-
   int fd;
   if (strcmp(filename, "-") == 0) {
+    if (skip_to_end) {
+      fputs("Can't skip to end of stdin!\n", stderr);
+      return EXIT_FAILURE;
+    }
     fd = STDIN_FILENO;
   } else {
     fd = open(filename, O_RDONLY);
   }
 
+  fprintf(stderr, "displaying down to level %s from file '%s'\n",
+      ::aos::logging::log_str(filter_level), filename);
+
   if (fd == -1) {
     PLOG(FATAL, "couldn't open file '%s' for reading", filename);
   }
@@ -250,6 +254,7 @@
 
   if (skip_to_end) {
     fputs("skipping old logs...\n", stderr);
+    reader.SkipToLastSeekablePage();
   }
 
   const LogFileMessageHeader *msg;