Merge "Respect WasReset() indicator in the drivetrain"
diff --git a/aos/BUILD b/aos/BUILD
index 04c0548..04d77f3 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -274,6 +274,7 @@
     ],
     visibility = ["//visibility:public"],
     deps = [
+        ":thread_local",
         "@com_github_google_glog//:glog",
     ],
 )
@@ -543,3 +544,15 @@
     ],
     visibility = ["//visibility:public"],
 )
+
+cc_test(
+    name = "realtime_test",
+    srcs = [
+        "realtime_test.cc",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":realtime",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/config.bzl b/aos/config.bzl
index b87fff2..a11efd5 100644
--- a/aos/config.bzl
+++ b/aos/config.bzl
@@ -18,6 +18,7 @@
 def _aos_config_impl(ctx):
     config = ctx.actions.declare_file(ctx.label.name + ".json")
     stripped_config = ctx.actions.declare_file(ctx.label.name + ".stripped.json")
+    binary_config = ctx.actions.declare_file(ctx.label.name + ".bfbs")
 
     flatbuffers_depset = depset(
         ctx.files.flatbuffers,
@@ -31,21 +32,22 @@
 
     all_files = flatbuffers_depset.to_list() + src_depset.to_list()
     ctx.actions.run(
-        outputs = [config, stripped_config],
+        outputs = [config, stripped_config, binary_config],
         inputs = all_files,
         arguments = [
             config.path,
             stripped_config.path,
+            binary_config.path,
             (ctx.label.workspace_root or ".") + "/" + ctx.files.src[0].short_path,
             ctx.bin_dir.path,
         ] + [f.path for f in flatbuffers_depset.to_list()],
         progress_message = "Flattening config",
         executable = ctx.executable._config_flattener,
     )
-    runfiles = ctx.runfiles(files = [config, stripped_config])
+    runfiles = ctx.runfiles(files = [config, stripped_config, binary_config])
     return [
         DefaultInfo(
-            files = depset([config, stripped_config]),
+            files = depset([config, stripped_config, binary_config]),
             runfiles = runfiles,
         ),
         AosConfigInfo(
diff --git a/aos/config_flattener.cc b/aos/config_flattener.cc
index 83959a1..e4bd192 100644
--- a/aos/config_flattener.cc
+++ b/aos/config_flattener.cc
@@ -11,15 +11,16 @@
 namespace aos {
 
 int Main(int argc, char **argv) {
-  CHECK_GE(argc, 5) << ": Too few arguments";
+  CHECK_GE(argc, 6) << ": Too few arguments";
 
   const char *full_output = argv[1];
   const char *stripped_output = argv[2];
-  const char *config_path = argv[3];
+  const char *binary_output = argv[3];
+  const char *config_path = argv[4];
   // In order to support not only importing things by absolute path, but also
   // importing the outputs of genrules (rather than just manually written
   // files), we need to tell ReadConfig where the generated files directory is.
-  const char *bazel_outs_directory = argv[4];
+  const char *bazel_outs_directory = argv[5];
 
   VLOG(1) << "Reading " << config_path;
   FlatbufferDetachedBuffer<Configuration> config =
@@ -36,21 +37,24 @@
 
   std::vector<aos::FlatbufferString<reflection::Schema>> schemas;
 
-  for (int i = 5; i < argc; ++i) {
+  for (int i = 6; i < argc; ++i) {
     schemas.emplace_back(util::ReadFileToStringOrDie(argv[i]));
   }
 
-  const std::string merged_config = FlatbufferToJson(
-      &configuration::MergeConfiguration(config, schemas).message(),
-      {.multi_line = true});
+  aos::FlatbufferDetachedBuffer<Configuration> merged_config =
+      configuration::MergeConfiguration(config, schemas);
+
+  const std::string merged_config_json =
+      FlatbufferToJson(&merged_config.message(), {.multi_line = true});
 
   // TODO(austin): Figure out how to squash the schemas onto 1 line so it is
   // easier to read?
-  VLOG(1) << "Flattened config is " << merged_config;
-  util::WriteStringToFileOrDie(full_output, merged_config);
+  VLOG(1) << "Flattened config is " << merged_config_json;
+  util::WriteStringToFileOrDie(full_output, merged_config_json);
   util::WriteStringToFileOrDie(
       stripped_output,
       FlatbufferToJson(&config.message(), {.multi_line = true}));
+  aos::WriteFlatbufferToFile(binary_output, merged_config);
   return 0;
 }
 
diff --git a/aos/configuration.cc b/aos/configuration.cc
index f68c543..adba7dc 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -23,6 +23,44 @@
 #include "glog/logging.h"
 
 namespace aos {
+namespace {
+bool EndsWith(std::string_view str, std::string_view end) {
+  if (str.size() < end.size()) {
+    return false;
+  }
+  if (str.substr(str.size() - end.size(), end.size()) != end) {
+    return false;
+  }
+  return true;
+}
+
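+// Replaces extension at the end of filename with replacement, if present.
+// e.g. (hypothetical inputs) MaybeReplaceExtension("aos_config.json",
+// ".json", ".bfbs") returns "aos_config.bfbs"; filenames without the
+// extension are returned unchanged.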
+std::string MaybeReplaceExtension(std::string_view filename,
+                                  std::string_view extension,
+                                  std::string_view replacement) {
+  if (!EndsWith(filename, extension)) {
+    return std::string(filename);
+  }
+  filename.remove_suffix(extension.size());
+  return absl::StrCat(filename, replacement);
+}
+
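+// Reads the file at path as a Configuration, parsing it as a binary
+// flatbuffer when binary is true and as JSON otherwise.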
+FlatbufferDetachedBuffer<Configuration> ReadConfigFile(std::string_view path,
+                                                       bool binary) {
+  if (binary) {
+    FlatbufferVector<Configuration> config =
+        FileToFlatbuffer<Configuration>(path);
+    return CopySpanAsDetachedBuffer(config.span());
+  }
+
+  flatbuffers::DetachedBuffer buffer = JsonToFlatbuffer(
+      util::ReadFileToStringOrDie(path), ConfigurationTypeTable());
+
+  CHECK_GT(buffer.size(), 0u) << ": Failed to parse JSON file";
+
+  return FlatbufferDetachedBuffer<Configuration>(std::move(buffer));
+}
+
+}  // namespace
 
 // Define the compare and equal operators for Channel and Application so we can
 // insert them in the btree below.
@@ -95,19 +133,30 @@
 FlatbufferDetachedBuffer<Configuration> ReadConfig(
     const std::string_view path, absl::btree_set<std::string> *visited_paths,
     const std::vector<std::string_view> &extra_import_paths) {
+  std::string binary_path = MaybeReplaceExtension(path, ".json", ".bfbs");
+  bool binary_path_exists = util::PathExists(binary_path);
   std::string raw_path(path);
-  if (!util::PathExists(path)) {
-    const bool path_is_absolute = path.size() > 0 && path[0] == '/';
+  // For each .json file, check whether a .bfbs file with the same base name
+  // exists next to it.  If it does, assume it holds the same config and load
+  // it instead; .bfbs files are much faster to load than .json files.
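+  // e.g. (hypothetical path) a request for "aos_config.json" will load
+  // "aos_config.bfbs" instead when it exists.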
+  if (!binary_path_exists && !util::PathExists(raw_path)) {
+    const bool path_is_absolute = raw_path.size() > 0 && raw_path[0] == '/';
     if (path_is_absolute) {
       CHECK(extra_import_paths.empty())
           << "Can't specify extra import paths if attempting to read a config "
              "file from an absolute path (path is "
-          << path << ").";
+          << raw_path << ").";
     }
 
     bool found_path = false;
     for (const auto &import_path : extra_import_paths) {
       raw_path = std::string(import_path) + "/" + std::string(path);
+      binary_path = MaybeReplaceExtension(raw_path, ".json", ".bfbs");
+      binary_path_exists = util::PathExists(binary_path);
+      if (binary_path_exists) {
+        found_path = true;
+        break;
+      }
       if (util::PathExists(raw_path)) {
         found_path = true;
         break;
@@ -115,12 +164,9 @@
     }
     CHECK(found_path) << ": Failed to find file " << path << ".";
   }
-  flatbuffers::DetachedBuffer buffer = JsonToFlatbuffer(
-      util::ReadFileToStringOrDie(raw_path), ConfigurationTypeTable());
 
-  CHECK_GT(buffer.size(), 0u) << ": Failed to parse JSON file";
-
-  FlatbufferDetachedBuffer<Configuration> config(std::move(buffer));
+  FlatbufferDetachedBuffer<Configuration> config = ReadConfigFile(
+      binary_path_exists ? binary_path : raw_path, binary_path_exists);
 
   // Depth first.  Take the following example:
   //
@@ -153,8 +199,10 @@
   // config.  That means that it needs to be merged into the imported configs,
   // not the other way around.
 
-  const std::string absolute_path = AbsolutePath(raw_path);
-  // Track that we have seen this file before recursing.
+  const std::string absolute_path =
+      AbsolutePath(binary_path_exists ? binary_path : raw_path);
+  // Track that we have seen this file before recursing.  Track the path we
+  // actually loaded (which should be consistent if imported twice).
   if (!visited_paths->insert(absolute_path).second) {
     for (const auto &visited_path : *visited_paths) {
       LOG(INFO) << "Already visited: " << visited_path;
@@ -272,6 +320,102 @@
   }
 }
 
+void ValidateConfiguration(const Flatbuffer<Configuration> &config) {
+  // No imports should be left.
+  CHECK(!config.message().has_imports());
+
+  // Check that if there is a node list, all the source nodes are filled out and
+  // valid, and all the destination nodes are valid (and not the source).  This
+  // is a basic consistency check.
+  if (config.message().has_channels()) {
+    const Channel *last_channel = nullptr;
+    for (const Channel *c : *config.message().channels()) {
+      CHECK(c->has_name());
+      CHECK(c->has_type());
+      if (c->name()->string_view().back() == '/') {
+        LOG(FATAL) << "Channel names can't end with '/'";
+      }
+      if (c->name()->string_view().find("//") != std::string_view::npos) {
+        LOG(FATAL) << ": Invalid channel name " << c->name()->string_view()
+                   << ", can't use //.";
+      }
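+      // Only the characters in [-a-zA-Z0-9_/] are allowed in channel names;
+      // e.g. "/test/channel_1" is valid while "/test/channel!" is not.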
+      for (const char data : c->name()->string_view()) {
+        if (data >= '0' && data <= '9') {
+          continue;
+        }
+        if (data >= 'a' && data <= 'z') {
+          continue;
+        }
+        if (data >= 'A' && data <= 'Z') {
+          continue;
+        }
+        if (data == '-' || data == '_' || data == '/') {
+          continue;
+        }
+        LOG(FATAL) << "Invalid channel name " << c->name()->string_view()
+                   << ", can only use [-a-zA-Z0-9_/]";
+      }
+
+      // Make sure everything is sorted while we are here...  If this fails,
+      // there will be a bunch of weird errors.
+      if (last_channel != nullptr) {
+        CHECK(CompareChannels(
+            last_channel,
+            std::make_pair(c->name()->string_view(), c->type()->string_view())))
+            << ": Channels not sorted!";
+      }
+      last_channel = c;
+    }
+  }
+
+  if (config.message().has_nodes() && config.message().has_channels()) {
+    for (const Channel *c : *config.message().channels()) {
+      CHECK(c->has_source_node()) << ": Channel " << FlatbufferToJson(c)
+                                  << " is missing \"source_node\"";
+      CHECK(GetNode(&config.message(), c->source_node()->string_view()) !=
+            nullptr)
+          << ": Channel " << FlatbufferToJson(c)
+          << " has an unknown \"source_node\"";
+
+      if (c->has_destination_nodes()) {
+        for (const Connection *connection : *c->destination_nodes()) {
+          CHECK(connection->has_name());
+          CHECK(GetNode(&config.message(), connection->name()->string_view()) !=
+                nullptr)
+              << ": Channel " << FlatbufferToJson(c)
+              << " has an unknown \"destination_nodes\" "
+              << connection->name()->string_view();
+
+          switch (connection->timestamp_logger()) {
+            case LoggerConfig::LOCAL_LOGGER:
+            case LoggerConfig::NOT_LOGGED:
+              CHECK(!connection->has_timestamp_logger_nodes());
+              break;
+            case LoggerConfig::REMOTE_LOGGER:
+            case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
+              CHECK(connection->has_timestamp_logger_nodes());
+              CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u);
+              for (const flatbuffers::String *timestamp_logger_node :
+                   *connection->timestamp_logger_nodes()) {
+                CHECK(GetNode(&config.message(),
+                              timestamp_logger_node->string_view()) != nullptr)
+                    << ": Channel " << FlatbufferToJson(c)
+                    << " has an unknown \"timestamp_logger_node\""
+                    << connection->name()->string_view();
+              }
+              break;
+          }
+
+          CHECK_NE(connection->name()->string_view(),
+                   c->source_node()->string_view())
+              << ": Channel " << FlatbufferToJson(c)
+              << " is forwarding data to itself";
+        }
+      }
+    }
+  }
+}
+
 }  // namespace
 
 FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
@@ -403,83 +547,7 @@
   FlatbufferDetachedBuffer<Configuration> result =
       MergeFlatBuffers(modified_config, auto_merge_config);
 
-  // Check that if there is a node list, all the source nodes are filled out and
-  // valid, and all the destination nodes are valid (and not the source).  This
-  // is a basic consistency check.
-  if (result.message().has_channels()) {
-    for (const Channel *c : *result.message().channels()) {
-      if (c->name()->string_view().back() == '/') {
-        LOG(FATAL) << "Channel names can't end with '/'";
-      }
-      if(c->name()->string_view().find("//")!= std::string_view::npos) {
-        LOG(FATAL) << ": Invalid channel name " << c->name()->string_view()
-                   << ", can't use //.";
-      }
-      for (const char data : c->name()->string_view()) {
-        if (data >= '0' && data <= '9') {
-          continue;
-        }
-        if (data >= 'a' && data <= 'z') {
-          continue;
-        }
-        if (data >= 'A' && data <= 'Z') {
-          continue;
-        }
-        if (data == '-' || data == '_' || data == '/') {
-          continue;
-        }
-        LOG(FATAL) << "Invalid channel name " << c->name()->string_view()
-                   << ", can only use [-a-zA-Z0-9_/]";
-      }
-    }
-  }
-
-  if (result.message().has_nodes() && result.message().has_channels()) {
-    for (const Channel *c : *result.message().channels()) {
-      CHECK(c->has_source_node()) << ": Channel " << FlatbufferToJson(c)
-                                  << " is missing \"source_node\"";
-      CHECK(GetNode(&result.message(), c->source_node()->string_view()) !=
-            nullptr)
-          << ": Channel " << FlatbufferToJson(c)
-          << " has an unknown \"source_node\"";
-
-      if (c->has_destination_nodes()) {
-        for (const Connection *connection : *c->destination_nodes()) {
-          CHECK(connection->has_name());
-          CHECK(GetNode(&result.message(), connection->name()->string_view()) !=
-                nullptr)
-              << ": Channel " << FlatbufferToJson(c)
-              << " has an unknown \"destination_nodes\" "
-              << connection->name()->string_view();
-
-          switch (connection->timestamp_logger()) {
-            case LoggerConfig::LOCAL_LOGGER:
-            case LoggerConfig::NOT_LOGGED:
-              CHECK(!connection->has_timestamp_logger_nodes());
-              break;
-            case LoggerConfig::REMOTE_LOGGER:
-            case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
-              CHECK(connection->has_timestamp_logger_nodes());
-              CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u);
-              for (const flatbuffers::String *timestamp_logger_node :
-                   *connection->timestamp_logger_nodes()) {
-                CHECK(GetNode(&result.message(),
-                              timestamp_logger_node->string_view()) != nullptr)
-                    << ": Channel " << FlatbufferToJson(c)
-                    << " has an unknown \"timestamp_logger_node\""
-                    << connection->name()->string_view();
-              }
-              break;
-          }
-
-          CHECK_NE(connection->name()->string_view(),
-                   c->source_node()->string_view())
-              << ": Channel " << FlatbufferToJson(c)
-              << " is forwarding data to itself";
-        }
-      }
-    }
-  }
+  ValidateConfiguration(result);
 
   return result;
 }
@@ -489,7 +557,17 @@
     const std::vector<std::string_view> &import_paths) {
   // We only want to read a file once.  So track the visited files in a set.
   absl::btree_set<std::string> visited_paths;
-  return MergeConfiguration(ReadConfig(path, &visited_paths, import_paths));
+  FlatbufferDetachedBuffer<Configuration> read_config =
+      ReadConfig(path, &visited_paths, import_paths);
+
+  // If we only read one file, and it had a .bfbs extension, it has to be a
+  // fully flattened config already.  Do a quick validation and return it.
+  if (visited_paths.size() == 1 && EndsWith(*visited_paths.begin(), ".bfbs")) {
+    ValidateConfiguration(read_config);
+    return read_config;
+  }
+
+  return MergeConfiguration(read_config);
 }
 
 FlatbufferDetachedBuffer<Configuration> MergeWithConfig(
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index e8a8050..cdf21b8 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -153,6 +153,10 @@
   //
   // Don't specify a hostname in multiple nodes in the same configuration.
   hostnames:[string] (id: 3);
+
+  // An arbitrary list of strings representing properties of each node.  These
+  // can be used to store information about roles.
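+  // For example (hypothetical values): tags: ["logger", "camera"].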
+  tags:[string] (id: 4);
 }
 
 // Overall configuration datastructure for the pubsub.
diff --git a/aos/configuration.h b/aos/configuration.h
index f1bcece..5579334 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -8,7 +8,7 @@
 
 #include <string_view>
 
-#include "aos/configuration_generated.h"
+#include "aos/configuration_generated.h"  // IWYU pragma: export
 #include "aos/flatbuffers.h"
 
 namespace aos {
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 59968c3..7c1abd0 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -260,6 +260,7 @@
     deps = [
         ":event_loop",
         ":test_message_fbs",
+        "//aos:realtime",
         "//aos/testing:googletest",
     ],
 )
@@ -308,6 +309,7 @@
         ":aos_logging",
         ":event_loop",
         ":simple_channel",
+        "//aos:realtime",
         "//aos/events/logging:logger_fbs",
         "//aos/ipc_lib:index",
         "//aos/network:message_bridge_client_status",
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 6ffb0fe..dc458e7 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -753,6 +753,6 @@
 
 }  // namespace aos
 
-#include "aos/events/event_loop_tmpl.h"
+#include "aos/events/event_loop_tmpl.h"  // IWYU pragma: export
 
 #endif  // AOS_EVENTS_EVENT_LOOP_H
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 248dcd3..d92927b 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -5,6 +5,7 @@
 #include <unordered_set>
 
 #include "aos/flatbuffer_merge.h"
+#include "aos/realtime.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
@@ -1949,6 +1950,120 @@
   aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
 }
 
+// Tests that a non-realtime event loop timer is marked non-realtime.
+TEST_P(AbstractEventLoopTest, NonRealtimeEventLoopTimer) {
+  auto loop1 = MakePrimary();
+
+  // Add a timer to actually quit.
+  auto test_timer = loop1->AddTimer([this]() {
+    CheckNotRealtime();
+    this->Exit();
+  });
+
+  loop1->OnRun([&test_timer, &loop1]() {
+    CheckNotRealtime();
+    test_timer->Setup(loop1->monotonic_now(), ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+}
+
+// Tests that a realtime event loop timer is marked realtime.
+TEST_P(AbstractEventLoopTest, RealtimeEventLoopTimer) {
+  auto loop1 = MakePrimary();
+
+  loop1->SetRuntimeRealtimePriority(1);
+
+  // Add a timer to actually quit.
+  auto test_timer = loop1->AddTimer([this]() {
+    CheckRealtime();
+    this->Exit();
+  });
+
+  loop1->OnRun([&test_timer, &loop1]() {
+    CheckRealtime();
+    test_timer->Setup(loop1->monotonic_now(), ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+}
+
+// Tests that a non-realtime event loop phased loop is marked non-realtime.
+TEST_P(AbstractEventLoopTest, NonRealtimeEventLoopPhasedLoop) {
+  auto loop1 = MakePrimary();
+
+  // Add a phased loop to actually quit.
+  loop1->AddPhasedLoop(
+      [this](int) {
+        CheckNotRealtime();
+        this->Exit();
+      },
+      chrono::seconds(1), chrono::seconds(0));
+
+  Run();
+}
+
+// Tests that a realtime event loop phased loop is marked realtime.
+TEST_P(AbstractEventLoopTest, RealtimeEventLoopPhasedLoop) {
+  auto loop1 = MakePrimary();
+
+  loop1->SetRuntimeRealtimePriority(1);
+
+  // Add a phased loop to actually quit.
+  loop1->AddPhasedLoop(
+      [this](int) {
+        CheckRealtime();
+        this->Exit();
+      },
+      chrono::seconds(1), chrono::seconds(0));
+
+  Run();
+}
+
+// Tests that a non-realtime event loop watcher is marked non-realtime.
+TEST_P(AbstractEventLoopTest, NonRealtimeEventLoopWatcher) {
+  auto loop1 = MakePrimary();
+  auto loop2 = Make();
+
+  aos::Sender<TestMessage> sender = loop2->MakeSender<TestMessage>("/test");
+
+  loop1->OnRun([&]() {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  });
+
+  loop1->MakeWatcher("/test", [&](const TestMessage &) {
+    CheckNotRealtime();
+    this->Exit();
+  });
+
+  Run();
+}
+
+// Tests that a realtime event loop watcher is marked realtime.
+TEST_P(AbstractEventLoopTest, RealtimeEventLoopWatcher) {
+  auto loop1 = MakePrimary();
+  auto loop2 = Make();
+
+  loop1->SetRuntimeRealtimePriority(1);
+
+  aos::Sender<TestMessage> sender = loop2->MakeSender<TestMessage>("/test");
+
+  loop1->OnRun([&]() {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  });
+
+  loop1->MakeWatcher("/test", [&](const TestMessage &) {
+    CheckRealtime();
+    this->Exit();
+  });
+
+  Run();
+}
+
 // Tests that watchers fail when created on the wrong node.
 TEST_P(AbstractEventLoopDeathTest, NodeWatcher) {
   EnableNodes("them");
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index d829edd..b7f3b5a 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -14,14 +14,17 @@
 cc_library(
     name = "logfile_utils",
     srcs = [
+        "logfile_sorting.cc",
         "logfile_utils.cc",
     ],
     hdrs = [
+        "logfile_sorting.h",
         "logfile_utils.h",
     ],
     visibility = ["//visibility:public"],
     deps = [
         ":buffer_encoder",
+        ":uuid",
         ":logger_fbs",
         "//aos:configuration",
         "//aos:flatbuffer_merge",
@@ -300,6 +303,7 @@
         "//aos/events:pong_lib",
         "//aos/events:simulated_event_loop",
         "//aos/testing:googletest",
+        "//aos/testing:tmpdir",
     ],
 )
 
@@ -317,3 +321,20 @@
         "//aos/testing:googletest",
     ],
 )
+
+cc_test(
+    name = "logfile_utils_test",
+    srcs = ["logfile_utils_test.cc"],
+    deps = [
+        ":logfile_utils",
+        ":test_message_fbs",
+        "//aos/testing:googletest",
+        "//aos/testing:tmpdir",
+    ],
+)
+
+flatbuffer_cc_library(
+    name = "test_message_fbs",
+    srcs = ["test_message.fbs"],
+    gen_reflections = 1,
+)
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 3067aea..411e666 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -36,7 +36,8 @@
 }
 
 DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
-  CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
+  CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
+      << ": " << configuration::CleanedChannelToString(channel);
   return data_writer_.get();
 }
 
@@ -244,7 +245,7 @@
 void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
                                                      DataWriter *data_writer) {
   std::string filename =
-      absl::StrCat("_timestamps", channel->name()->string_view(), "/",
+      absl::StrCat("timestamps", channel->name()->string_view(), "/",
                    channel->type()->string_view(), ".part",
                    data_writer->part_number, ".bfbs", extension_);
   CreateBufferWriter(filename, &data_writer->writer);
@@ -253,7 +254,7 @@
 void MultiNodeLogNamer::OpenWriter(const Channel *channel,
                                    DataWriter *data_writer) {
   const std::string filename = absl::StrCat(
-      "_", CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
+      CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
       channel->name()->string_view(), "/", channel->type()->string_view(),
       ".part", data_writer->part_number, ".bfbs", extension_);
   CreateBufferWriter(filename, &data_writer->writer);
@@ -262,9 +263,9 @@
 void MultiNodeLogNamer::OpenDataWriter() {
   std::string name;
   if (node() != nullptr) {
-    name = absl::StrCat(name, "_", node()->name()->string_view());
+    name = absl::StrCat(name, node()->name()->string_view(), "_");
   }
-  absl::StrAppend(&name, "_data.part", data_writer_.part_number, ".bfbs",
+  absl::StrAppend(&name, "data.part", data_writer_.part_number, ".bfbs",
                   extension_);
   CreateBufferWriter(name, &data_writer_.writer);
 }
@@ -277,7 +278,9 @@
     // means they're probably going to run out of space and get stuck too.
     return;
   }
-  const std::string filename = absl::StrCat(base_name_, path, temp_suffix_);
+  const std::string_view separator = base_name_.back() == '/' ? "" : "_";
+  const std::string filename =
+      absl::StrCat(base_name_, separator, path, temp_suffix_);
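+  // e.g. (hypothetical names, ignoring temp_suffix_) a base_name_ of
+  // "/tmp/fbs_log/" yields "/tmp/fbs_log/pi1_data.part0.bfbs", while
+  // "/tmp/fbs_log" yields "/tmp/fbs_log_pi1_data.part0.bfbs".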
   if (!destination->get()) {
     if (ran_out_of_space_) {
       *destination = std::make_unique<DetachedBufferWriter>(
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
new file mode 100644
index 0000000..d3f7f38
--- /dev/null
+++ b/aos/events/logging/logfile_sorting.cc
@@ -0,0 +1,281 @@
+#include "aos/events/logging/logfile_sorting.h"
+
+#include <algorithm>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/flatbuffers.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace logger {
+namespace chrono = std::chrono;
+
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
+  // Start by grouping all parts by UUID, and extracting the part index.
+  // Datastructure to hold all the info extracted from a set of parts which go
+  // together so we can sort them afterwards.
+  struct UnsortedLogParts {
+    // Start times.
+    aos::monotonic_clock::time_point monotonic_start_time;
+    aos::realtime_clock::time_point realtime_start_time;
+
+    // Node to save.
+    std::string node;
+
+    // Pairs of the filename and the part index for sorting.
+    std::vector<std::pair<std::string, int>> parts;
+  };
+
+  // Struct to hold both the node, and the parts associated with it.
+  struct UnsortedLogPartsMap {
+    std::string logger_node;
+    aos::monotonic_clock::time_point monotonic_start_time =
+        aos::monotonic_clock::min_time;
+    aos::realtime_clock::time_point realtime_start_time =
+        aos::realtime_clock::min_time;
+
+    std::map<std::string, UnsortedLogParts> unsorted_parts;
+  };
+
+  // Map holding the log_event_uuid -> second map.  The second map holds the
+  // parts_uuid -> list of parts for sorting.
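+  // i.e. parts_list[log_event_uuid].unsorted_parts[parts_uuid] collects the
+  // (filename, parts_index) pairs for one contiguous set of parts.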
+  std::map<std::string, UnsortedLogPartsMap> parts_list;
+
+  // Sort part files without UUIDs and part indexes as well.  Extract everything
+  // useful from the log in the first pass, then sort later.
+  struct UnsortedOldParts {
+    // Part information with everything but the list of parts.
+    LogParts parts;
+
+    // Tuple of time for the data and filename needed for sorting after
+    // extracting.
+    std::vector<std::pair<monotonic_clock::time_point, std::string>>
+        unsorted_parts;
+  };
+
+  // A list of all the old parts which we don't know how to sort using uuids.
+  // There are enough of these in the wild that this is worth supporting.
+  std::vector<UnsortedOldParts> old_parts;
+
+  // Now extract everything into our datastructures above for sorting.
+  for (const std::string &part : parts) {
+    FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
+
+    const monotonic_clock::time_point monotonic_start_time(
+        chrono::nanoseconds(log_header.message().monotonic_start_time()));
+    const realtime_clock::time_point realtime_start_time(
+        chrono::nanoseconds(log_header.message().realtime_start_time()));
+
+    const std::string_view node =
+        log_header.message().has_node()
+            ? log_header.message().node()->name()->string_view()
+            : "";
+
+    const std::string_view logger_node =
+        log_header.message().has_logger_node()
+            ? log_header.message().logger_node()->name()->string_view()
+            : "";
+
+    // Looks like an old log.  No UUID, no index, and also single node.  There
+    // are few, if any, multi-node log files in the wild without part UUIDs
+    // and indexes that we care about.
+    if (!log_header.message().has_parts_uuid() &&
+        !log_header.message().has_parts_index() &&
+        !log_header.message().has_node()) {
+      FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
+      const monotonic_clock::time_point first_message_time(
+          chrono::nanoseconds(first_message.message().monotonic_sent_time()));
+
+      // Find anything with a matching start time.  They all go together.
+      auto result = std::find_if(
+          old_parts.begin(), old_parts.end(),
+          [&](const UnsortedOldParts &parts) {
+            return parts.parts.monotonic_start_time == monotonic_start_time &&
+                   parts.parts.realtime_start_time == realtime_start_time;
+          });
+
+      if (result == old_parts.end()) {
+        old_parts.emplace_back();
+        old_parts.back().parts.monotonic_start_time = monotonic_start_time;
+        old_parts.back().parts.realtime_start_time = realtime_start_time;
+        old_parts.back().unsorted_parts.emplace_back(
+            std::make_pair(first_message_time, part));
+      } else {
+        result->unsorted_parts.emplace_back(
+            std::make_pair(first_message_time, part));
+      }
+      continue;
+    }
+
+    CHECK(log_header.message().has_log_event_uuid());
+    CHECK(log_header.message().has_parts_uuid());
+    CHECK(log_header.message().has_parts_index());
+
+    CHECK_EQ(log_header.message().has_logger_node(),
+             log_header.message().has_node());
+
+    const std::string log_event_uuid =
+        log_header.message().log_event_uuid()->str();
+    const std::string parts_uuid = log_header.message().parts_uuid()->str();
+    int32_t parts_index = log_header.message().parts_index();
+
+    auto log_it = parts_list.find(log_event_uuid);
+    if (log_it == parts_list.end()) {
+      log_it =
+          parts_list
+              .insert(std::make_pair(log_event_uuid, UnsortedLogPartsMap()))
+              .first;
+      log_it->second.logger_node = logger_node;
+    } else {
+      CHECK_EQ(log_it->second.logger_node, logger_node);
+    }
+
+    if (node == log_it->second.logger_node) {
+      if (log_it->second.monotonic_start_time ==
+          aos::monotonic_clock::min_time) {
+        log_it->second.monotonic_start_time = monotonic_start_time;
+        log_it->second.realtime_start_time = realtime_start_time;
+      } else {
+        CHECK_EQ(log_it->second.monotonic_start_time, monotonic_start_time);
+        CHECK_EQ(log_it->second.realtime_start_time, realtime_start_time);
+      }
+    }
+
+    auto it = log_it->second.unsorted_parts.find(parts_uuid);
+    if (it == log_it->second.unsorted_parts.end()) {
+      it = log_it->second.unsorted_parts
+               .insert(std::make_pair(parts_uuid, UnsortedLogParts()))
+               .first;
+      it->second.monotonic_start_time = monotonic_start_time;
+      it->second.realtime_start_time = realtime_start_time;
+      it->second.node = std::string(node);
+    }
+
+    // First part might be min_time.  If it is, try to put a better time on it.
+    if (it->second.monotonic_start_time == monotonic_clock::min_time) {
+      it->second.monotonic_start_time = monotonic_start_time;
+    } else if (monotonic_start_time != monotonic_clock::min_time) {
+      CHECK_EQ(it->second.monotonic_start_time, monotonic_start_time);
+    }
+    if (it->second.realtime_start_time == realtime_clock::min_time) {
+      it->second.realtime_start_time = realtime_start_time;
+    } else if (realtime_start_time != realtime_clock::min_time) {
+      CHECK_EQ(it->second.realtime_start_time, realtime_start_time);
+    }
+
+    it->second.parts.emplace_back(std::make_pair(part, parts_index));
+  }
+
+  CHECK_NE(old_parts.empty(), parts_list.empty())
+      << ": Can't have a mix of old and new parts.";
+
+  // Now reformat old_parts to be in the right datastructure to report.
+  if (!old_parts.empty()) {
+    std::vector<LogFile> result;
+    for (UnsortedOldParts &p : old_parts) {
+      // Sort by the oldest message in each file.
+      std::sort(
+          p.unsorted_parts.begin(), p.unsorted_parts.end(),
+          [](const std::pair<monotonic_clock::time_point, std::string> &a,
+             const std::pair<monotonic_clock::time_point, std::string> &b) {
+            return a.first < b.first;
+          });
+      LogFile log_file;
+      for (std::pair<monotonic_clock::time_point, std::string> &f :
+           p.unsorted_parts) {
+        p.parts.parts.emplace_back(std::move(f.second));
+      }
+      log_file.parts.emplace_back(std::move(p.parts));
+      log_file.monotonic_start_time = log_file.parts[0].monotonic_start_time;
+      log_file.realtime_start_time = log_file.parts[0].realtime_start_time;
+      result.emplace_back(std::move(log_file));
+    }
+
+    return result;
+  }
+
+  // Now, sort them and produce the final vector form.
+  std::vector<LogFile> result;
+  result.reserve(parts_list.size());
+  for (std::pair<const std::string, UnsortedLogPartsMap> &logs : parts_list) {
+    LogFile new_file;
+    new_file.log_event_uuid = logs.first;
+    new_file.logger_node = logs.second.logger_node;
+    new_file.monotonic_start_time = logs.second.monotonic_start_time;
+    new_file.realtime_start_time = logs.second.realtime_start_time;
+    for (std::pair<const std::string, UnsortedLogParts> &parts :
+         logs.second.unsorted_parts) {
+      LogParts new_parts;
+      new_parts.monotonic_start_time = parts.second.monotonic_start_time;
+      new_parts.realtime_start_time = parts.second.realtime_start_time;
+      new_parts.log_event_uuid = logs.first;
+      new_parts.parts_uuid = parts.first;
+      new_parts.node = std::move(parts.second.node);
+
+      std::sort(parts.second.parts.begin(), parts.second.parts.end(),
+                [](const std::pair<std::string, int> &a,
+                   const std::pair<std::string, int> &b) {
+                  return a.second < b.second;
+                });
+      new_parts.parts.reserve(parts.second.parts.size());
+      for (std::pair<std::string, int> &p : parts.second.parts) {
+        new_parts.parts.emplace_back(std::move(p.first));
+      }
+      new_file.parts.emplace_back(std::move(new_parts));
+    }
+    result.emplace_back(std::move(new_file));
+  }
+  return result;
+}
+
+std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
+  stream << "{";
+  if (!file.log_event_uuid.empty()) {
+    stream << "\"log_event_uuid\": \"" << file.log_event_uuid << "\", ";
+  }
+  if (!file.logger_node.empty()) {
+    stream << "\"logger_node\": \"" << file.logger_node << "\", ";
+  }
+  stream << "\"monotonic_start_time\": " << file.monotonic_start_time
+         << ", \"realtime_start_time\": " << file.realtime_start_time << ", [";
+  stream << "\"parts\": [";
+  for (size_t i = 0; i < file.parts.size(); ++i) {
+    if (i != 0u) {
+      stream << ", ";
+    }
+    stream << file.parts[i];
+  }
+  stream << "]}";
+  return stream;
+}
+std::ostream &operator<<(std::ostream &stream, const LogParts &parts) {
+  stream << "{";
+  if (!parts.log_event_uuid.empty()) {
+    stream << "\"log_event_uuid\": \"" << parts.log_event_uuid << "\", ";
+  }
+  if (!parts.parts_uuid.empty()) {
+    stream << "\"parts_uuid\": \"" << parts.parts_uuid << "\", ";
+  }
+  if (!parts.node.empty()) {
+    stream << "\"node\": \"" << parts.node << "\", ";
+  }
+  stream << "\"monotonic_start_time\": " << parts.monotonic_start_time
+         << ", \"realtime_start_time\": " << parts.realtime_start_time << ", [";
+
+  for (size_t i = 0; i < parts.parts.size(); ++i) {
+    if (i != 0u) {
+      stream << ", ";
+    }
+    stream << parts.parts[i];
+  }
+
+  stream << "]}";
+  return stream;
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
new file mode 100644
index 0000000..50bfbd7
--- /dev/null
+++ b/aos/events/logging/logfile_sorting.h
@@ -0,0 +1,59 @@
+#ifndef AOS_EVENTS_LOGGING_LOGFILE_SORTING_H_
+#define AOS_EVENTS_LOGGING_LOGFILE_SORTING_H_
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "aos/events/logging/uuid.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace logger {
+
+// Datastructure to hold ordered parts.
+struct LogParts {
+  // Monotonic and realtime start times for this set of log files.  For log
+  // files which started out unknown and then became known, this is the known
+  // start time.
+  aos::monotonic_clock::time_point monotonic_start_time;
+  aos::realtime_clock::time_point realtime_start_time;
+
+  // UUIDs if available.
+  std::string log_event_uuid;
+  std::string parts_uuid;
+
+  // The node this represents, or empty if we are in a single node world.
+  std::string node;
+
+  // Pre-sorted list of parts.
+  std::vector<std::string> parts;
+};
+
+// Datastructure to hold parts from the same run of the logger which have no
+// ordering constraints relative to each other.
+struct LogFile {
+  // The UUID tying them all together (if available)
+  std::string log_event_uuid;
+
+  // The node the logger was running on (if available)
+  std::string logger_node;
+
+  // The start time on the logger node.
+  aos::monotonic_clock::time_point monotonic_start_time;
+  aos::realtime_clock::time_point realtime_start_time;
+
+  // All the parts, unsorted.
+  std::vector<LogParts> parts;
+};
+
+std::ostream &operator<<(std::ostream &stream, const LogFile &file);
+std::ostream &operator<<(std::ostream &stream, const LogParts &parts);
+
+// Takes a bunch of parts and sorts them based on part_uuid and part_index.
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
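+//
+// A minimal usage sketch (filenames are hypothetical):
+//   std::vector<LogFile> logs =
+//       SortParts({"pi1_data.part0.bfbs", "pi1_data.part1.bfbs"});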
+
+}  // namespace logger
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGING_LOGFILE_SORTING_H_
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 8eec23e..908e5e1 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -320,23 +320,6 @@
   return absl::Span<const uint8_t>(data_ptr, data_size);
 }
 
-bool SpanReader::MessageAvailable() {
-  // Are we big enough to read the size?
-  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
-    return false;
-  }
-
-  // Then, are we big enough to read the full message?
-  const size_t data_size =
-      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
-      sizeof(flatbuffers::uoffset_t);
-  if (data_.size() < consumed_data_ + data_size) {
-    return false;
-  }
-
-  return true;
-}
-
 bool SpanReader::ReadBlock() {
   // This is the amount of data we grab at a time. Doing larger chunks minimizes
   // syscalls and helps decompressors batch things more efficiently.
@@ -630,6 +613,7 @@
     // but we then want to find the next message to read.  The conservative
     // answer is to immediately trigger a second requeue to get things moving.
     time_to_queue_ = monotonic_start_time();
+    CHECK_NE(time_to_queue_, monotonic_clock::min_time);
     QueueMessages(time_to_queue_);
   }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index fddda72..60bd39c 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -195,10 +195,6 @@
   // the size.
   absl::Span<const uint8_t> ReadMessage();
 
-  // Returns true if there is a full message available in the buffer, or if we
-  // will have to read more data from disk.
-  bool MessageAvailable();
-
  private:
   // TODO(austin): Optimization:
   //   Allocate the 256k blocks like we do today.  But, refcount them with
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
new file mode 100644
index 0000000..f6412c3
--- /dev/null
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -0,0 +1,95 @@
+#include "aos/events/logging/logfile_utils.h"
+
+#include <chrono>
+#include <string>
+
+#include "aos/events/logging/test_message_generated.h"
+#include "aos/flatbuffers.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/testing/tmpdir.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace logger {
+namespace testing {
+namespace chrono = std::chrono;
+
+// Creates a size prefixed flatbuffer from json.
+template <typename T>
+SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
+    const std::string_view data) {
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+  fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
+  return fbb.Release();
+}
+
+// Tests that we can write and read 2 flatbuffers to file.
+TEST(SpanReaderTest, ReadWrite) {
+  const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
+  unlink(logfile.c_str());
+
+  const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
+      JsonToSizedFlatbuffer<TestMessage>(
+          R"({ "value": 1 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
+      JsonToSizedFlatbuffer<TestMessage>(
+          R"({ "value": 2 })");
+
+  {
+    DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(m1.full_span());
+    writer.QueueSpan(m2.full_span());
+  }
+
+  SpanReader reader(logfile);
+
+  EXPECT_EQ(reader.filename(), logfile);
+  EXPECT_EQ(reader.ReadMessage(), m1.full_span());
+  EXPECT_EQ(reader.ReadMessage(), m2.full_span());
+  EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
+}
+
+// Tests that we can actually parse the resulting messages at a basic level
+// through MessageReader.
+TEST(MessageReaderTest, ReadWrite) {
+  const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
+  unlink(logfile.c_str());
+
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
+      JsonToSizedFlatbuffer<LogFileHeader>(
+          R"({ "max_out_of_order_duration": 100000000 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
+
+  {
+    DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config.full_span());
+    writer.QueueSpan(m1.full_span());
+    writer.QueueSpan(m2.full_span());
+  }
+
+  MessageReader reader(logfile);
+
+  EXPECT_EQ(reader.filename(), logfile);
+
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
+  EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+  EXPECT_TRUE(reader.ReadMessage());
+  EXPECT_EQ(reader.newest_timestamp(),
+            monotonic_clock::time_point(chrono::nanoseconds(1)));
+  EXPECT_TRUE(reader.ReadMessage());
+  EXPECT_EQ(reader.newest_timestamp(),
+            monotonic_clock::time_point(chrono::nanoseconds(2)));
+  EXPECT_FALSE(reader.ReadMessage());
+}
+
+}  // namespace testing
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index d206d5d..134c202 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -11,6 +11,7 @@
 #include "absl/strings/escaping.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/logging/uuid.h"
 #include "aos/flatbuffer_merge.h"
@@ -34,7 +35,16 @@
 
 namespace aos {
 namespace logger {
+namespace {
+// Helper to read the header of the first log file, or CHECK-fail if the
+// filenames list is empty.
+FlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
+    const std::vector<std::vector<std::string>> &filenames) {
+  CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
+  CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
+  return ReadHeader(filenames[0][0]);
+}
 namespace chrono = std::chrono;
+}  // namespace
 
 Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
                std::function<bool(const Channel *)> should_log)
@@ -681,226 +691,6 @@
   }
 }
 
-std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
-  // Start by grouping all parts by UUID, and extracting the part index.
-  // Datastructure to hold all the info extracted from a set of parts which go
-  // together so we can sort them afterwords.
-  struct UnsortedLogParts {
-    // Start times.
-    aos::monotonic_clock::time_point monotonic_start_time;
-    aos::realtime_clock::time_point realtime_start_time;
-
-    // Node to save.
-    std::string node;
-
-    // Pairs of the filename and the part index for sorting.
-    std::vector<std::pair<std::string, int>> parts;
-  };
-
-  // Map holding the log_event_uuid -> second map.  The second map holds the
-  // parts_uuid -> list of parts for sorting.
-  std::map<std::string, std::map<std::string, UnsortedLogParts>> parts_list;
-
-  // Sort part files without UUIDs and part indexes as well.  Extract everything
-  // useful from the log in the first pass, then sort later.
-  struct UnsortedOldParts {
-    // Part information with everything but the list of parts.
-    LogParts parts;
-
-    // Tuple of time for the data and filename needed for sorting after
-    // extracting.
-    std::vector<std::pair<monotonic_clock::time_point, std::string>>
-        unsorted_parts;
-  };
-
-  // A list of all the old parts which we don't know how to sort using uuids.
-  // There are enough of these in the wild that this is worth supporting.
-  std::vector<UnsortedOldParts> old_parts;
-
-  // Now extract everything into our datastructures above for sorting.
-  for (const std::string &part : parts) {
-    FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
-
-    const monotonic_clock::time_point monotonic_start_time(
-        chrono::nanoseconds(log_header.message().monotonic_start_time()));
-    const realtime_clock::time_point realtime_start_time(
-        chrono::nanoseconds(log_header.message().realtime_start_time()));
-
-    const std::string_view node =
-        log_header.message().has_node()
-            ? log_header.message().node()->name()->string_view()
-            : "";
-
-    // Looks like an old log.  No UUID, index, and also single node.  We have
-    // little to no multi-node log files in the wild without part UUIDs and
-    // indexes which we care much about.
-    if (!log_header.message().has_parts_uuid() &&
-        !log_header.message().has_parts_index() &&
-        !log_header.message().has_node()) {
-      FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
-      const monotonic_clock::time_point first_message_time(
-          chrono::nanoseconds(first_message.message().monotonic_sent_time()));
-
-      // Find anything with a matching start time.  They all go together.
-      auto result = std::find_if(
-          old_parts.begin(), old_parts.end(),
-          [&](const UnsortedOldParts &parts) {
-            return parts.parts.monotonic_start_time == monotonic_start_time &&
-                   parts.parts.realtime_start_time == realtime_start_time;
-          });
-
-      if (result == old_parts.end()) {
-        old_parts.emplace_back();
-        old_parts.back().parts.monotonic_start_time = monotonic_start_time;
-        old_parts.back().parts.realtime_start_time = realtime_start_time;
-        old_parts.back().unsorted_parts.emplace_back(
-            std::make_pair(first_message_time, part));
-      } else {
-        result->unsorted_parts.emplace_back(
-            std::make_pair(first_message_time, part));
-      }
-      continue;
-    }
-
-    CHECK(log_header.message().has_log_event_uuid());
-    CHECK(log_header.message().has_parts_uuid());
-    CHECK(log_header.message().has_parts_index());
-
-    const std::string log_event_uuid =
-        log_header.message().log_event_uuid()->str();
-    const std::string parts_uuid = log_header.message().parts_uuid()->str();
-    int32_t parts_index = log_header.message().parts_index();
-
-    auto log_it = parts_list.find(log_event_uuid);
-    if (log_it == parts_list.end()) {
-      log_it =
-          parts_list
-              .insert(std::make_pair(log_event_uuid,
-                                     std::map<std::string, UnsortedLogParts>()))
-              .first;
-    }
-
-    auto it = log_it->second.find(parts_uuid);
-    if (it == log_it->second.end()) {
-      it = log_it->second.insert(std::make_pair(parts_uuid, UnsortedLogParts()))
-               .first;
-      it->second.monotonic_start_time = monotonic_start_time;
-      it->second.realtime_start_time = realtime_start_time;
-      it->second.node = std::string(node);
-    }
-
-    // First part might be min_time.  If it is, try to put a better time on it.
-    if (it->second.monotonic_start_time == monotonic_clock::min_time) {
-      it->second.monotonic_start_time = monotonic_start_time;
-    } else if (monotonic_start_time != monotonic_clock::min_time) {
-      CHECK_EQ(it->second.monotonic_start_time, monotonic_start_time);
-    }
-    if (it->second.realtime_start_time == realtime_clock::min_time) {
-      it->second.realtime_start_time = realtime_start_time;
-    } else if (realtime_start_time != realtime_clock::min_time) {
-      CHECK_EQ(it->second.realtime_start_time, realtime_start_time);
-    }
-
-    it->second.parts.emplace_back(std::make_pair(part, parts_index));
-  }
-
-  CHECK_NE(old_parts.empty(), parts_list.empty())
-      << ": Can't have a mix of old and new parts.";
-
-  // Now reformat old_parts to be in the right datastructure to report.
-  if (!old_parts.empty()) {
-    std::vector<LogFile> result;
-    for (UnsortedOldParts &p : old_parts) {
-      // Sort by the oldest message in each file.
-      std::sort(
-          p.unsorted_parts.begin(), p.unsorted_parts.end(),
-          [](const std::pair<monotonic_clock::time_point, std::string> &a,
-             const std::pair<monotonic_clock::time_point, std::string> &b) {
-            return a.first < b.first;
-          });
-      LogFile log_file;
-      for (std::pair<monotonic_clock::time_point, std::string> &f :
-           p.unsorted_parts) {
-        p.parts.parts.emplace_back(std::move(f.second));
-      }
-      log_file.parts.emplace_back(std::move(p.parts));
-      result.emplace_back(std::move(log_file));
-    }
-
-    return result;
-  }
-
-  // Now, sort them and produce the final vector form.
-  std::vector<LogFile> result;
-  result.reserve(parts_list.size());
-  for (std::pair<const std::string, std::map<std::string, UnsortedLogParts>>
-           &logs : parts_list) {
-    LogFile new_file;
-    new_file.log_event_uuid = logs.first;
-    for (std::pair<const std::string, UnsortedLogParts> &parts : logs.second) {
-      LogParts new_parts;
-      new_parts.monotonic_start_time = parts.second.monotonic_start_time;
-      new_parts.realtime_start_time = parts.second.realtime_start_time;
-      new_parts.log_event_uuid = logs.first;
-      new_parts.parts_uuid = parts.first;
-      new_parts.node = std::move(parts.second.node);
-
-      std::sort(parts.second.parts.begin(), parts.second.parts.end(),
-                [](const std::pair<std::string, int> &a,
-                   const std::pair<std::string, int> &b) {
-                  return a.second < b.second;
-                });
-      new_parts.parts.reserve(parts.second.parts.size());
-      for (std::pair<std::string, int> &p : parts.second.parts) {
-        new_parts.parts.emplace_back(std::move(p.first));
-      }
-      new_file.parts.emplace_back(std::move(new_parts));
-    }
-    result.emplace_back(std::move(new_file));
-  }
-  return result;
-}
-
-std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
-  stream << "{";
-  if (!file.log_event_uuid.empty()) {
-    stream << "\"log_event_uuid\": \"" << file.log_event_uuid << "\", ";
-  }
-  stream << "\"parts\": [";
-  for (size_t i = 0; i < file.parts.size(); ++i) {
-    if (i != 0u) {
-      stream << ", ";
-    }
-    stream << file.parts[i];
-  }
-  stream << "]}";
-  return stream;
-}
-std::ostream &operator<<(std::ostream &stream, const LogParts &parts) {
-  stream << "{";
-  if (!parts.log_event_uuid.empty()) {
-    stream << "\"log_event_uuid\": \"" << parts.log_event_uuid << "\", ";
-  }
-  if (!parts.parts_uuid.empty()) {
-    stream << "\"parts_uuid\": \"" << parts.parts_uuid << "\", ";
-  }
-  if (!parts.node.empty()) {
-    stream << "\"node\": \"" << parts.node << "\", ";
-  }
-  stream << "\"monotonic_start_time\": " << parts.monotonic_start_time
-         << ", \"realtime_start_time\": " << parts.realtime_start_time << ", [";
-
-  for (size_t i = 0; i < parts.parts.size(); ++i) {
-    if (i != 0u) {
-      stream << ", ";
-    }
-    stream << parts.parts[i];
-  }
-
-  stream << "]}";
-  return stream;
-}
-
 std::vector<std::vector<std::string>> ToLogReaderVector(
     const std::vector<LogFile> &log_files) {
   std::vector<std::vector<std::string>> result;
@@ -935,7 +725,7 @@
 LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                      const Configuration *replay_configuration)
     : filenames_(filenames),
-      log_file_header_(ReadHeader(filenames[0][0])),
+      log_file_header_(MaybeReadHeaderOrDie(filenames)),
       replay_configuration_(replay_configuration) {
   MakeRemappedConfig();
 
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index ca4630f..b37fea2 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -13,6 +13,7 @@
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/eigen_mpq.h"
 #include "aos/events/logging/log_namer.h"
+#include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/logging/uuid.h"
@@ -279,41 +280,6 @@
   std::vector<NodeState> node_state_;
 };
 
-// Datastructure to hold ordered parts.
-struct LogParts {
-  // Monotonic and realtime start times for this set of log files.  For log
-  // files which started out unknown and then became known, this is the known
-  // start time.
-  aos::monotonic_clock::time_point monotonic_start_time;
-  aos::realtime_clock::time_point realtime_start_time;
-
-  // UUIDs if available.
-  std::string log_event_uuid;
-  std::string parts_uuid;
-
-  // The node this represents, or empty if we are in a single node world.
-  std::string node;
-
-  // Pre-sorted list of parts.
-  std::vector<std::string> parts;
-};
-
-// Datastructure to hold parts from the same run of the logger which have no
-// ordering constraints relative to each other.
-struct LogFile {
-  // The UUID tying them all together (if available)
-  std::string log_event_uuid;
-
-  // All the parts, unsorted.
-  std::vector<LogParts> parts;
-};
-
-std::ostream &operator<<(std::ostream &stream, const LogFile &file);
-std::ostream &operator<<(std::ostream &stream, const LogParts &parts);
-
-// Takes a bunch of parts and sorts them based on part_uuid and part_index.
-std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
-
 std::vector<std::vector<std::string>> ToLogReaderVector(
     const std::vector<LogFile> &log_files);
 
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 1d0c55e..f6ec4d6 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -1,3 +1,6 @@
+#include <sys/resource.h>
+#include <sys/time.h>
+
 #include "aos/configuration.h"
 #include "aos/events/logging/logger.h"
 #include "aos/events/shm_event_loop.h"
@@ -28,13 +31,17 @@
         aos::logging::GetLogName("fbs_log"), event_loop.node());
   } else {
     log_namer = std::make_unique<aos::logger::MultiNodeLogNamer>(
-        aos::logging::GetLogName("fbs_log"), event_loop.configuration(),
-        event_loop.node());
+        absl::StrCat(aos::logging::GetLogName("fbs_log"), "/"),
+        event_loop.configuration(), event_loop.node());
   }
 
   aos::logger::Logger logger(&event_loop);
-  event_loop.OnRun(
-      [&log_namer, &logger]() { logger.StartLogging(std::move(log_namer)); });
+  event_loop.OnRun([&log_namer, &logger]() {
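+    // Renice the logger to -20 (the highest-priority niceness) so writing log
+    // data keeps up with the realtime processes being logged.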
+    errno = 0;
+    setpriority(PRIO_PROCESS, 0, -20);
+    PCHECK(errno == 0) << ": Renicing to -20 failed";
+    logger.StartLogging(std::move(log_namer));
+  });
 
   event_loop.Run();
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 51994e3..57d274b 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -6,6 +6,7 @@
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/timestamp_generated.h"
+#include "aos/testing/tmpdir.h"
 #include "aos/util/file.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
@@ -47,7 +48,7 @@
 // Tests that we can startup at all.  This confirms that the channels are all in
 // the config.
 TEST_F(LoggerTest, Starts) {
-  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string tmpdir = aos::testing::TestTmpDir();
   const ::std::string base_name = tmpdir + "/logfile";
   const ::std::string logfile = base_name + ".part0.bfbs";
   // Remove it.
@@ -1126,9 +1127,11 @@
 
   size_t missing_rt_count = 0;
 
+  std::vector<std::string> logger_nodes;
   for (const LogFile &log_file : sorted_parts) {
     EXPECT_FALSE(log_file.log_event_uuid.empty());
     log_event_uuids.insert(log_file.log_event_uuid);
+    logger_nodes.emplace_back(log_file.logger_node);
     both_uuids.insert(log_file.log_event_uuid);
 
     for (const LogParts &part : log_file.parts) {
@@ -1159,6 +1162,16 @@
   // (inner vectors all need to be in order, but outer one doesn't matter).
   EXPECT_THAT(ToLogReaderVector(sorted_parts),
               ::testing::UnorderedElementsAreArray(structured_logfiles_));
+
+  EXPECT_THAT(logger_nodes, ::testing::UnorderedElementsAre("pi1", "pi2"));
+
+  EXPECT_NE(sorted_parts[0].realtime_start_time, aos::realtime_clock::min_time);
+  EXPECT_NE(sorted_parts[1].realtime_start_time, aos::realtime_clock::min_time);
+
+  EXPECT_NE(sorted_parts[0].monotonic_start_time,
+            aos::monotonic_clock::min_time);
+  EXPECT_NE(sorted_parts[1].monotonic_start_time,
+            aos::monotonic_clock::min_time);
 }
 
 // Tests that if we remap a remapped channel, it shows up correctly.
diff --git a/aos/events/logging/test_message.fbs b/aos/events/logging/test_message.fbs
new file mode 100644
index 0000000..ef876aa
--- /dev/null
+++ b/aos/events/logging/test_message.fbs
@@ -0,0 +1,7 @@
+namespace aos.logger.testing;
+
+table TestMessage {
+  value:int;
+}
+
+root_type TestMessage;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 2eaffa0..d86d86c 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -9,6 +9,7 @@
 #include "aos/events/aos_logging.h"
 #include "aos/events/simulated_network_bridge.h"
 #include "aos/json_to_flatbuffer.h"
+#include "aos/realtime.h"
 #include "aos/util/phased_loop.h"
 
 namespace aos {
@@ -19,6 +20,16 @@
 
 namespace {
 
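+// RAII helper which marks this thread as (not) realtime while in scope and
+// restores the prior state on destruction, CHECKing that nothing else changed
+// it in the meantime.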
+class ScopedMarkRealtimeRestorer {
+ public:
+  ScopedMarkRealtimeRestorer(bool rt) : rt_(rt), prior_(MarkRealtime(rt)) {}
+  ~ScopedMarkRealtimeRestorer() { CHECK_EQ(rt_, MarkRealtime(prior_)); }
+
+ private:
+  const bool rt_;
+  const bool prior_;
+};
+
 // Container for both a message, and the context for it for simulation.  This
 // makes tracking the timestamps associated with the data easy.
 struct SimulatedMessage final {
@@ -546,7 +557,10 @@
   }
 
   void OnRun(::std::function<void()> on_run) override {
-    scheduler_->ScheduleOnRun(on_run);
+    scheduler_->ScheduleOnRun([this, on_run = std::move(on_run)]() {
+      ScopedMarkRealtimeRestorer rt(priority() > 0);
+      on_run();
+    });
   }
 
   const Node *node() const override { return node_; }
@@ -739,7 +753,10 @@
     context.realtime_remote_time = context.realtime_event_time;
   }
 
-  DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
+  {
+    ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
+    DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
+  }
 
   msgs_.pop_front();
   if (token_ != scheduler_->InvalidToken()) {
@@ -857,7 +874,10 @@
     simulated_event_loop_->AddEvent(&event_);
   }
 
-  Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
+  {
+    ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
+    Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
+  }
 }
 
 void SimulatedTimerHandler::Disable() {
@@ -893,9 +913,14 @@
   if (simulated_event_loop_->log_impl_) {
     prev_logger.Swap(simulated_event_loop_->log_impl_);
   }
-  Call(
-      [monotonic_now]() { return monotonic_now; },
-      [this](monotonic_clock::time_point sleep_time) { Schedule(sleep_time); });
+
+  {
+    ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
+    Call([monotonic_now]() { return monotonic_now; },
+         [this](monotonic_clock::time_point sleep_time) {
+           Schedule(sleep_time);
+         });
+  }
 }
 
 void SimulatedPhasedLoopHandler::Schedule(
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 5da7466..b619452 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -7,7 +7,7 @@
 #include "absl/types/span.h"
 #include "aos/containers/resizeable_buffer.h"
 #include "aos/macros.h"
-#include "flatbuffers/flatbuffers.h"
+#include "flatbuffers/flatbuffers.h"  // IWYU pragma: export
 #include "glog/logging.h"
 
 namespace aos {
@@ -264,9 +264,12 @@
   }
 
   void Reset() {
-    CHECK(!allocator_.is_allocated()) << ": May not reset while building";
+    CHECK(!allocator_.is_allocated() || data_ != nullptr)
+        << ": May not reset while building";
     fbb_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
     fbb_.ForceDefaults(true);
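+    // Forget any previously finished buffer so the builder can be reused.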
+    data_ = nullptr;
+    size_ = 0;
   }
 
   flatbuffers::FlatBufferBuilder *fbb() {
diff --git a/aos/logging/log_namer.cc b/aos/logging/log_namer.cc
index f025054..c1dd30a 100644
--- a/aos/logging/log_namer.cc
+++ b/aos/logging/log_namer.cc
@@ -146,11 +146,11 @@
   AllocateLogName(&tmp, folder, basename);
 
   std::string log_base_name = tmp;
-  std::string log_roborio_name = log_base_name + "_roborio_data.bfbs";
+  std::string log_roborio_name = log_base_name + "/";
   free(tmp);
 
   char *tmp2;
-  if (asprintf(&tmp2, "%s/%s-current.bfbs", folder, basename) == -1) {
+  if (asprintf(&tmp2, "%s/%s-current", folder, basename) == -1) {
     PLOG(WARNING) << "couldn't create current symlink name";
   } else {
     if (unlink(tmp2) == -1 && (errno != EROFS && errno != ENOENT)) {
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index b272c29..b80547d 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -248,6 +248,12 @@
   const Channel *const timestamp_channel = configuration::GetChannel(
       event_loop_->configuration(), "/aos", Timestamp::GetFullyQualifiedName(),
       event_loop_->name(), event_loop_->node());
+  CHECK(timestamp_channel != nullptr)
+      << ": Failed to find timestamp channel {\"name\": \"/aos\", \"type\": \""
+      << Timestamp::GetFullyQualifiedName() << "\"}";
+  CHECK(configuration::ChannelIsSendableOnNode(timestamp_channel,
+                                               event_loop_->node()))
+      << ": Timestamp channel is not sendable on this node.";
 
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     CHECK(channel->has_source_node());
@@ -302,11 +308,17 @@
         timestamp_state_ = state.get();
       }
       channels_.emplace_back(std::move(state));
+    } else if (channel == timestamp_channel) {
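+      // Make sure the timestamp channel always gets a ChannelState, even if
+      // it isn't forwarded to any other node.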
+      std::unique_ptr<ChannelState> state(
+          new ChannelState{channel, channel_index});
+      timestamp_state_ = state.get();
+      channels_.emplace_back(std::move(state));
     } else {
       channels_.emplace_back(nullptr);
     }
     ++channel_index;
   }
+  CHECK(timestamp_state_ != nullptr);
 
   // Buffer up the max size a bit so everything fits nicely.
   LOG(INFO) << "Max message size for all clients is " << max_size;
diff --git a/aos/realtime.cc b/aos/realtime.cc
index a41eb0d..9df7aca 100644
--- a/aos/realtime.cc
+++ b/aos/realtime.cc
@@ -1,18 +1,19 @@
 #include "aos/realtime.h"
 
+#include <errno.h>
+#include <malloc.h>
+#include <sched.h>
+#include <stdint.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
 #include <sys/mman.h>
-#include <errno.h>
-#include <sched.h>
+#include <sys/prctl.h>
 #include <sys/resource.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include <stdlib.h>
-#include <stdint.h>
-#include <sys/prctl.h>
-#include <malloc.h>
 
+#include "aos/thread_local.h"
 #include "glog/logging.h"
 
 namespace FLAG__namespace_do_not_use_directly_use_DECLARE_double_instead {
@@ -68,6 +69,7 @@
 }  // namespace
 
 void LockAllMemory() {
+  CheckNotRealtime();
   // Allow locking as much as we want into RAM.
   SetSoftRLimit(RLIMIT_MEMLOCK, RLIM_INFINITY, SetLimitForRoot::kNo);
 
@@ -101,6 +103,7 @@
 }
 
 void InitRT() {
+  CheckNotRealtime();
   LockAllMemory();
 
   // Only let rt processes run for 3 seconds straight.
@@ -114,6 +117,7 @@
   struct sched_param param;
   param.sched_priority = 0;
   PCHECK(sched_setscheduler(0, SCHED_OTHER, &param) == 0);
+  MarkRealtime(false);
 }
 
 void SetCurrentThreadAffinity(const cpu_set_t &cpuset) {
@@ -141,6 +145,7 @@
 
   struct sched_param param;
   param.sched_priority = priority;
+  MarkRealtime(true);
   PCHECK(sched_setscheduler(0, SCHED_FIFO, &param) == 0)
       << ": changing to SCHED_FIFO with " << priority;
 }
@@ -155,4 +160,20 @@
                 AllowSoftLimitDecrease::kNo);
 }
 
+namespace {
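+// Tracks whether this thread is currently marked realtime, for
+// CheckRealtime()/CheckNotRealtime().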
+AOS_THREAD_LOCAL bool is_realtime = false;
+}
+
+bool MarkRealtime(bool realtime) {
+  const bool prior = is_realtime;
+  is_realtime = realtime;
+  return prior;
+}
+
+void CheckRealtime() { CHECK(is_realtime); }
+
+void CheckNotRealtime() { CHECK(!is_realtime); }
+
+ScopedRealtimeRestorer::ScopedRealtimeRestorer() : prior_(is_realtime) {}
+
 }  // namespace aos
diff --git a/aos/realtime.h b/aos/realtime.h
index 6e0a472..52db4a8 100644
--- a/aos/realtime.h
+++ b/aos/realtime.h
@@ -4,6 +4,8 @@
 #include <sched.h>
 #include <string_view>
 
+#include "glog/logging.h"
+
 namespace aos {
 
 // Locks everything into memory and sets the limits.  This plus InitNRT are
@@ -34,6 +36,55 @@
 
 void ExpandStackSize();
 
+// CHECKs that we are (or are not) running on the RT scheduler.  Useful for
+// enforcing that operations with unbounded (or bounded) runtime only run in
+// the appropriate context.  This works both in simulation and when running
+// against the real target.
+void CheckRealtime();
+void CheckNotRealtime();
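+//
+// For example (illustrative only):
+//   void FreeMemory() {
+//     CheckNotRealtime();  // free() is unbounded, so forbid it in RT code.
+//     ...
+//   }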
+
+// Marks that we are or are not running on the realtime scheduler.  Returns the
+// previous state.
+//
+// Note: this shouldn't be used directly.  The event loop primitives should be
+// used instead.
+bool MarkRealtime(bool realtime);
+
+// Class which captures the current RT state on construction and restores it
+// when destructed.
+class ScopedRealtimeRestorer {
+ public:
+  ScopedRealtimeRestorer();
+  ~ScopedRealtimeRestorer() { MarkRealtime(prior_); }
+
+ private:
+  const bool prior_;
+};
+
+// Class which marks us as on the RT scheduler until it goes out of scope.
+// Note: this shouldn't be needed for most applications.
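+//
+// Example (illustrative):
+//   ScopedRealtime rt;
+//   CheckRealtime();  // Passes until rt goes out of scope.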
+class ScopedRealtime {
+ public:
+  ScopedRealtime() : prior_(MarkRealtime(true)) {}
+  ~ScopedRealtime() {
+    CHECK(MarkRealtime(prior_)) << ": Priority was modified";
+  }
+
+ private:
+  const bool prior_;
+};
+
+// Class which marks us as not on the RT scheduler until it goes out of scope.
+// Note: this shouldn't be needed for most applications.
+class ScopedNotRealtime {
+ public:
+  ScopedNotRealtime() : prior_(MarkRealtime(false)) {}
+  ~ScopedNotRealtime() {
+    CHECK(!MarkRealtime(prior_)) << ": Priority was modified";
+  }
+
+ private:
+  const bool prior_;
+};
+
 }  // namespace aos
 
 #endif  // AOS_REALTIME_H_
diff --git a/aos/realtime_test.cc b/aos/realtime_test.cc
new file mode 100644
index 0000000..e77f140
--- /dev/null
+++ b/aos/realtime_test.cc
@@ -0,0 +1,76 @@
+#include "aos/realtime.h"
+
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace testing {
+
+// Tests that ScopedRealtime handles the simple case.
+TEST(RealtimeTest, ScopedRealtime) {
+  CheckNotRealtime();
+  {
+    ScopedRealtime rt;
+    CheckRealtime();
+  }
+  CheckNotRealtime();
+}
+
+// Tests that ScopedRealtime handles nesting.
+TEST(RealtimeTest, DoubleScopedRealtime) {
+  CheckNotRealtime();
+  {
+    ScopedRealtime rt;
+    CheckRealtime();
+    {
+      ScopedRealtime rt2;
+      CheckRealtime();
+    }
+    CheckRealtime();
+  }
+  CheckNotRealtime();
+}
+
+// Tests that ScopedRealtime handles nesting with ScopedNotRealtime.
+TEST(RealtimeTest, ScopedNotRealtime) {
+  CheckNotRealtime();
+  {
+    ScopedRealtime rt;
+    CheckRealtime();
+    {
+      ScopedNotRealtime nrt;
+      CheckNotRealtime();
+    }
+    CheckRealtime();
+  }
+  CheckNotRealtime();
+}
+
+// Tests that ScopedRealtimeRestorer works whether it starts realtime or not.
+TEST(RealtimeTest, ScopedRealtimeRestorer) {
+  CheckNotRealtime();
+  {
+    ScopedRealtime rt;
+    CheckRealtime();
+    {
+      ScopedRealtimeRestorer restore;
+      CheckRealtime();
+
+      MarkRealtime(false);
+      CheckNotRealtime();
+    }
+    CheckRealtime();
+  }
+  CheckNotRealtime();
+
+  {
+    ScopedRealtimeRestorer restore;
+    CheckNotRealtime();
+
+    MarkRealtime(true);
+    CheckRealtime();
+  }
+  CheckNotRealtime();
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/starter/starter.sh b/aos/starter/starter.sh
index e140351..87f58a4 100755
--- a/aos/starter/starter.sh
+++ b/aos/starter/starter.sh
@@ -4,6 +4,11 @@
 if [[ "$(hostname)" == "roboRIO"* ]]; then
   ROBOT_CODE="/home/admin/robot_code"
 
+  # Configure throttling so we reserve 5% of the CPU for non-rt work.
+  # This makes things significantly more stable.
+  echo 950000 > /proc/sys/kernel/sched_rt_runtime_us
+  echo 1000000 > /proc/sys/kernel/sched_rt_period_us
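+  # (RT tasks may use at most 950ms of every 1s period.)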
+
   ln -s /var/local/natinst/log/FRC_UserProgram.log /tmp/FRC_UserProgram.log
   ln -s /var/local/natinst/log/FRC_UserProgram.log "${ROBOT_CODE}/FRC_UserProgram.log"
 elif [[ "$(hostname)" == "pi-"* ]]; then
diff --git a/aos/testing/BUILD b/aos/testing/BUILD
index ce67b84..e70b5a4 100644
--- a/aos/testing/BUILD
+++ b/aos/testing/BUILD
@@ -86,3 +86,11 @@
     ],
     visibility = ["//visibility:public"],
 )
+
+cc_library(
+    name = "tmpdir",
+    testonly = True,
+    srcs = ["tmpdir.cc"],
+    hdrs = ["tmpdir.h"],
+    visibility = ["//visibility:public"],
+)
diff --git a/aos/testing/tmpdir.cc b/aos/testing/tmpdir.cc
new file mode 100644
index 0000000..15e3c13
--- /dev/null
+++ b/aos/testing/tmpdir.cc
@@ -0,0 +1,18 @@
+#include "aos/testing/tmpdir.h"
+
+#include <cstdlib>
+#include <string>
+
+namespace aos {
+namespace testing {
+
+std::string TestTmpDir() {
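+  // Bazel sets TEST_TMPDIR for tests; fall back to /tmp when run standalone.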
+  const char *tmp_dir = std::getenv("TEST_TMPDIR");
+  if (tmp_dir != nullptr) {
+    return tmp_dir;
+  }
+  return "/tmp";
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/testing/tmpdir.h b/aos/testing/tmpdir.h
new file mode 100644
index 0000000..7e64342
--- /dev/null
+++ b/aos/testing/tmpdir.h
@@ -0,0 +1,15 @@
+#ifndef AOS_TESTING_TMPDIR_H_
+#define AOS_TESTING_TMPDIR_H_
+
+#include <string>
+
+namespace aos {
+namespace testing {
+
+// Returns a usable temporary directory.
+std::string TestTmpDir();
+
+}  // namespace testing
+}  // namespace aos
+
+#endif  // AOS_TESTING_TMPDIR_H_
diff --git a/third_party/gmp/BUILD b/third_party/gmp/BUILD
index b60c349..0ac8edc 100644
--- a/third_party/gmp/BUILD
+++ b/third_party/gmp/BUILD
@@ -47,6 +47,9 @@
     ],
 }
 
+# gmp's tools leak memory on purpose. Just skip asan for them.
+tool_features = ["-asan"]
+
 genrule(
     name = "gmp_h_copy",
     srcs = file_from_architecture(architecture_paths, "gmp.h"),
@@ -112,6 +115,7 @@
     name = "gen-fac",
     srcs = ["gen-fac.c"],
     copts = copts,
+    features = tool_features,
     deps = [":bootstrap"],
 )
 
@@ -133,6 +137,7 @@
     name = "gen-fib",
     srcs = ["gen-fib.c"],
     copts = copts,
+    features = tool_features,
     deps = [":bootstrap"],
 )
 
@@ -154,6 +159,7 @@
     name = "gen-bases",
     srcs = ["gen-bases.c"],
     copts = copts,
+    features = tool_features,
     deps = [":bootstrap"],
 )
 
@@ -175,6 +181,7 @@
     name = "gen-trialdivtab",
     srcs = ["gen-trialdivtab.c"],
     copts = copts,
+    features = tool_features,
     deps = [":bootstrap"],
 )