Merge "Use key field for looking up enum values in flatbuffer reflection."
diff --git a/README.md b/README.md
index af5ae6e..395a802 100644
--- a/README.md
+++ b/README.md
@@ -2,36 +2,66 @@
 This is FRC Team 971's main code repository. There are `README*` files throughout the source tree documenting specifics for their respective folders.
 
 ## Access to the code
-The main central location for our code is our [Gerrit](https://www.gerritcodereview.com/) server at https://software.frc971.org/gerrit. To get a copy of the code on your computer to work with, follow these steps:
-  1. Contact Michael Schuh to get an SVN account.
-  2. Go to Gerrit and create an account.
-  3. Contact Brian Silverman with your SVN username to get access to the code in Gerrit.
-  4. Go to [the 971-Robot-Code project in Gerrit](https://software.frc971.org/gerrit/#/admin/projects/971-Robot-Code) and run the command.
-     Running the `clone with commit-msg hook` command will save you trouble later.
+The main central location for our code is our [Gerrit](https://www.gerritcodereview.com/) server at https://software.frc971.org/gerrit. To download a copy of the 971 code on your computer, follow these steps:
+  1. Fill out an SVN account request form to get an SVN account.
+    1. Mention that you are part of the software team and need Gerrit access
+    2. It is recommended to use your firstname in all lowercase for your username
+  2. Go to our [Gerrit server](https://software.frc971.org/gerrit) and create an account.
+  3. Contact Brian Silverman or Stephan Massalt with your SVN username to get access to the code in Gerrit.
+  4. When you log into Gerrit the first time, please [add your Email Address](http://software.frc971.org/gerrit/settings/#EmailAddresses)
+  5. Add your SSH key to Gerrit in order to be able to check out code
+    1. If you don't already have an ssh key, you can create one using `ssh-keygen`.  This will create a public/private key pair-- the default location for your public key will be `~/.ssh/id_rsa.pub`
+    2. Log into Gerrit and go to `Settings->SSH Keys` and paste your public key into the `New SSH Key` text box and clicking on `ADD NEW SSH KEY`
+  6. Install `git`: `sudo apt install git`
+  7. Go to [the 971-Robot-Code project in Gerrit](https://software.frc971.org/gerrit/#/admin/projects/971-Robot-Code) and run the command to Download the 971-Robot-Code repository.
+    1. We recommend downloading the code via SSH using the `clone with commit-msg hook` command
+    2. NOTE: Running with the option to `clone with commit-msg hook` will save you trouble later.
 
-To learn more about git, see git(1) (`man git` or [git(1)](http://manpages.debian.net/cgi-bin/man.cgi?query=git>) (especially the NOTES section).
+To learn more about git, open a terminal and run `man git`, or see [git(1)](https://manpages.debian.org/buster/git-man/git.1.en.html) (especially the NOTES section).
 
-## Code reviews
-We want all code to at least have a second person look over it before it gets merged into the `master` branch. Gerrit has [extensive documentation on starting reviews](https://software.frc971.org/gerrit/Documentation/user-upload.html). TL;DR: `git push origin HEAD:refs/for/master` and then click on the link to add reviewers.
-If you just upload a change without adding any reviewers, it might sit around for a long time before anybody else notices it.
-[git-review](http://manpages.debian.org/cgi-bin/man.cgi?query=git-review) can make the upload process simpler.
+
+## Prerequisites
+The instructions below assume you have the following:
+  1. A host computer with an appropriate OS or VM to compile the 971 code using Bazel
+    1. The currently supported operating system for building the code is amd64 Debian Buster.
+    2. It is likely to work on any `x86\_64 GNU/Linux` system (e.g., Ubuntu 20.04), but that's not at all well-tested.
+  2. Your favorite text editor installed, e.g., `vim`, `emacs`
+  3. Access to the 971-Robot-Code repository and have downloaded the source code
+  4. The ability to `ssh` into target CPU's like the roborio and Raspberry Pi
+
 
 ## Building the code
-The currently supported operating system for building the code is amd64 Debian Buster. It is likely to work on any x86\_64 GNU/Linux system, but that's not at all well-tested.
+We use [Bazel](http://bazel.io) to build the code. Bazel has [extensive docs](https://docs.bazel.build/versions/master/build-ref.html), including a nice [build encyclopedia reference](https://docs.bazel.build/versions/master/be/overview.html), and does a great job with fast, correct incremental rebuilds.
 
-We use [Bazel](http://bazel.io) to build the code. Bazel has [extensive](https://docs.bazel.build/versions/master/build-ref.html) [docs](https://docs.bazel.build/versions/master/be/overview.html) and does a nice job with fast, correct increment rebuilds.
+There are a couple options for building code that are given here-- setting up either your own computer, or using the frc971 build server.
 
 ### Steps to set up a computer to build the code:
-  0. Install any Bazel version.  See [here](https://docs.bazel.build/versions/master/install-ubuntu.html)
-  1. Install the required packages:
-```console
-apt-get update
+  1. Install any Bazel version.
+    1. Check to see if the version of Linux you're running has an apt package for Bazel: `apt-cache search bazel` or just try `sudo apt install bazel`
+    2. More likely, you'll need to install manually-- see [here](https://docs.bazel.build/versions/master/install-ubuntu.html).  We recommend using `apt-key` instead of `gnupg` in setting up the key:
+      1. Step 1: Add Bazel distribution URI as a package source
+         ```
+         sudo apt install curl apt-key
+         curl -fsSL https://bazel.build/bazel-release.pub.gpg | apt-key add -
+         echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | sudo tee /etc/apt/sources.list.d/bazel.list
+         ```
+      2. Step 2: Install Bazel
+         ```
+         sudo apt update && sudo apt install bazel
+         ```
+
+  2. Install the required packages:
+```sh
+sudo apt-get update
 # TODO(james): Ideally, we shouldn't need to be installing libtinfo5...
-apt-get install bazel python libtinfo5
+sudo apt-get install python libtinfo5
 ```
-  2. Allow Bazel's sandboxing to work:
-     Follow the direction in `doc/frc971.conf`.
+  3. Change settings to allow Bazel's sandboxing to work-- follow the directions in `doc/frc971.conf`.  For example, the commands to do this would be:
+    1. `sudo cp doc/frc971.conf /etc/sysctl.d/`
+    2. `sudo sysctl --system`
+
 ### Setting up access to a workspace on the build server
+In order to use the build server, you'll first need to get ssh access set up.  (NOTE: you don't need to do any of the other setup steps done for your own computer, since things like `bazel`, `python`, etc. are already installed on the build server)
   1. Use ssh-keygen to create a public and private key.
 ```console
 # IMPORTANT These are the windows instructions.
@@ -51,106 +81,93 @@
 ```console
 ssh [user]@build.frc971.org -p 2222 -i ./ssh/id_971_rsa -L 971:127.0.0.1:3389
 ```
-  5. So at this point you run the Remote Desktop app in windows.
-Once you get there, all you need to do is put `127.0.0.1:9971` for the computer name, and use your SVN usrname.
-Once you get connected accept the server certificate, and then enter your password that you gave Stephan. (Its either something unique or your SVN pwd)
-Then select the Default panel config.
-You can exit the Remote Desktop if you are good w/ the raw cmd line interface.
-And for future logins all you have to do is tunnel and then login using the app.
-Now if you have a graphical application that you are developing (ie spline UI), then you have to run the build command in the Remote Desktop application.
- 
-# ONE VERY IMPORTANT LAST STEP
-In order for you to be able to commit, you need to run this command, replacing <YOUR EMAIL> w/ your email that is in gerrit.
+  5. So at this point you run the Remote Desktop app in Windows.  Once
+you get there, all you need to do is put `127.0.0.1:9971` for the
+computer name, and use your SVN username.  Once you get connected,
+accept the server certificate and then enter your password that you
+gave Stephan. (It's either something unique or your SVN pwd) Then
+select the Default panel config.  You can exit the Remote Desktop if
+you are good w/ the raw cmd line interface.  And for future logins all
+you have to do is tunnel and then login using the app.  Now if you
+have a graphical application that you are developing (e.g., spline
+UI), then you have to run the build command in the Remote Desktop
+application.
+  6. Very important: In order for you to be able to commit, you need
+  to configure your email address in `git`.  To do this, run the
+  following command, replacing `<YOUR EMAIL>` with the email that you are
+  using for Gerrit:
 ```console
 git config --global user.email "<YOUR EMAIL>"
 ```
-One thing that also needs to be said is that you DO NOT need to do any of the installation steps, or the step w/ `frc971.conf`.
-If there are any questions, use #coding so that other people who may reach the same issue can refer back to that.
-Some people that you can @ would be Het <--Insert your name here if you are pingable-->
+If there are any questions, post to the #coding Slack channel so that other people who may reach the same issue can refer back to that.
 
-### Some useful Bazel commands:
-  * Build and test everything (on the host system):
-```console
+
+### Bazel commands for building, testing, and deploying the code:
+  * Build and test everything (on the host system, for the roborio target-- note, this may take a while):
+```
 bazel test //...
-bazel build --cpu=roborio //...
+bazel build --cpu=roborio -c opt //...
 ```
   * Build the code for a specific robot:
 ```console
-bazel build --cpu=roborio -c opt //y2019/...
+# For the roborio:
+bazel build --cpu=roborio -c opt //y2020/...
 ```
-  * Download code to a robot:
-```console
-# First add a host entry in your ~/.ssh/known_hosts file for the roboRIO.
-# Do this by ssh'ing into the machine.  If you problems doing this, see
-# the notes below for more information on how to connect to the roboRIO.
-ssh admin@roboRIO-971-frc.local
-# If you see an error like:
-#    subprocess.CalledProcessError: Command '['rsync' ...
-#    ERROR: Non-zero return code '1' from command: Process exited with status 1
-# The above "ssh admin@roboRIO-971-frc.local" step has not been sucessfully completed.
-# If the roboRIO has been configued to use a static IP address like 10.9.71.2,
-# set the laptop to have an IP address on the 10.9.71.x subnet with a netmask
-# of 255.0.0.0.  The ".x" is different than the .2 for the roboRIO or any other
-# device on the network.  The driver station uses .5 or .6 so avoid those.
-# The radio uses .1 or .50 so avoid those too.  If you are at the school,
-# disconnect from the student wireless network or try setting your netmask to
-# 255.255.255.0 if you want to be on both networks.  The student wireless
-# network is on a 10.?.?.? subnet which can cause problems with connecting to
-# the robot.
-bazel run --cpu=roborio --compilation_mode=opt //y2018:download_stripped -- admin@roboRIO-971-frc.local
-# If this does not work, try
-bazel run --cpu=roborio --compilation_mode=opt //y2018:download_stripped -- admin@10.9.71.2
-# If this does not work, it probably means that the robot and laptop are on
-# different subnets.  They need to be on the same subnet for the laptop to
-# connect to the robot.  Connecting can be confirmed by using ping.
-ping roboRIO-971-frc.local
-# or
-ping 10.9.71.2
-# If this does not work, perhaps the roboRIO has not been configured to have
-# a static IP address.  Use a USB cable to connect from a Windows laptop to
-# the roboRIO and use Internet Explorer (IE) to configure the roboRIO
-# to have a static IP address of 10.9.71.2.  Inside IE, browse to
-# http://roborio-971-frc.local or http://172.22.11.2.  Click on the "Ethernet"
-# icon on the left, select "Static" for the "Configure IPv4 Address" option.
-# Set the "IPv4 Address" to 10.9.71.2. Set the "Subnet Mask" to "255.0.0.0".
-# Finally click on "Save" at the bottom of the screen.  If you have trouble
-# using an Ethernet cable, try using a USB cable.  USB cables are much
-# more reliable for connecting than using a Ethernet cabe.  USB cables work
-# for connecting to the robot on Windows and Linux computers.
-# Another option is to configure the laptop to have a link-local connection
-# by going using the "Network Settings" GUI.  The laptop will then be on the
-# same subnet in the address range of 169.254.0.0 to 169.254.255.255.
-# James thinks this will only work over Ethernet (i.e., not USB; he is not
-# sure what will happen if you attempt this over USB), and if the robot
-# does *not* have a static IP address set and there is no DHCP server
-# assigning an IP address to the roboRIO.  James says to also note that this
-# implies that the roboRIO will also have a 169.254.*.* IP addresss, and
-# that the only simple way to figure it out is to use mDNS.
-#
 ```
-  * To configure a freshly imaged roboRIO:
+# For the raspberry pi:
+bazel build --cpu=armhf-debian -c opt //y2020/...
+```
+
+  * Configuring a roborio: Freshly imaged roboRIOs need to be configured to run the 971 code
+at startup.  This is done by using the setup_roborio.sh script.
 ```console
-# Freshly imaged roboRIOs need to be configured to run the 971 code
-# at startup.  This is done by using the setup_roborio.sh script.
 bazel run -c opt //frc971/config:setup_roborio -- roboRIO-XXX-frc.local
 ```
 
-### Some other useful packages
+  * Download code to a robot:
+```console
+# For the roborio
+bazel run --cpu=roborio -c opt //y2020:download_stripped -- roboRIO-971-frc.local
+```
+This assumes the roborio is reachable at `roboRIO-971-frc.local`.  If that does not work, you can try with a static IP address like `10.9.71.2` (see troubleshooting below)
+```console
+# For the raspberry pi's
+bazel run --cpu=armhf-debian -c opt //y2020:pi_download_stripped -- 10.9.71.101
+```
+NOTE:
+  1. The raspberry pi's require that you have your ssh key installed on them in order to copy code over
+  2. They are configured to use the IP addresses 10.X.71.Y, where X is 9, 79, 89, or 99 depending on the robot number (971, 7971, 8971, or 9971, respectively), and Y is 101, 102, etc for pi #1, #2, etc.
+
+  * Downloading specific targets to the robot
+    1. Generally if you want to update what's running on the robot, you can use the `download_stripped` (or `pi_download_stripped`) targets.  These will rsync only the changed files, and so are pretty efficient.
+    2. If you have a need to compile a specific module, you can build stripped versions of the individual modules by adding "_stripped" to the module name.  For example, to build the calibration code (`//y2020/vision:calibration`) for the pi (`armhf-debian`), run:
+    ```console
+    bazel run --cpu=armhf-debian -c opt //y2020/vision:calibration_stripped
+    ```
+    You will then need to manually copy the resulting file over to the robot.
+
+## Code reviews
+We want all code to at least have a second person look it over before it gets merged into the `master` branch. Gerrit has [extensive documentation on starting reviews](https://software.frc971.org/gerrit/Documentation/user-upload.html).  There is also a good [intro User Guide](https://software.frc971.org/gerrit/Documentation/intro-user.html) and an [intro to working with Gerrit](https://gerrit-review.googlesource.com/Documentation/intro-gerrit-walkthrough.html) and [Gerrit workflows](https://docs.google.com/presentation/d/1C73UgQdzZDw0gzpaEqIC6SPujZJhqamyqO1XOHjH-uk)
+
+TL;DR: Make and commit your changes, do `git push origin HEAD:refs/for/master`, and then click on the provided link to add reviewers.  If you just upload a change without adding any reviewers, it might sit around for a long time before anybody else notices it.
+
+[git-review](http://manpages.debian.org/cgi-bin/man.cgi?query=git-review) can make the upload process simpler.
+
+
+
+### Some other useful packages <TODO: Need to review these>
 These aren't strictly necessary to build the code, but Michael found the
 additional tools provided by these packages useful to install when working with
 the code on May 13, 2018.
 
+
 ```console
 # Get some useful packages including git and subversion.
    apt-get update
-   apt-get install git subversion ruby python vim-gtk3 subversion-tools
-   apt-get install vim-doc git-doc git-gui git-svn exim4-doc-html ruby
-   apt-get install python python-scipy python-matplotlib libpython-dev
-   apt-get install bazel clang-format-3.8 clang-3.8  openjdk-8-jdk
-   apt-get install gfortran libblas-dev liblapack-dev avahi-daemon
-# fix a key problem with llvm.
-   wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key|sudo apt-key add -
-   apt-get update
+   apt-get install git git-gui vim-gtk3 
+   apt-get install vim-doc git-doc exim4-doc-html yapf
+   apt-get install bazel clang-format
+   apt-get install python avahi-daemon 
 # Install apt-file so that packages can be searched
    apt-get install apt-file
    apt-file update
@@ -164,6 +181,30 @@
 # understanding the revision history of the repository and viewing
 # log messages and changes.
    apt-get install gitg
+# Also may want to install `buildifier` for formatting BUILD files
+```
+
+### Creating ssh aliases
+
+It is also handy to alias logins to the raspberry pi's by adding lines like this to your ~/.ssh/config file:
+```console
+Host pi-7971-2
+    User pi
+    ForwardAgent yes
+    HostName 10.79.71.102
+    StrictHostKeyChecking no
+```
+or, for the roborio:
+```
+Host roborio-971
+    User admin
+    HostName 10.9.71.2
+    StrictHostKeyChecking no
+```
+This allows you to use the alias to `ping`, `ssh`, or run commands like:
+```
+# Download code to robot #7971's raspberry pi #2
+bazel run --cpu=armhf-debian -c opt //y2020:download_stripped -- pi-7971-2
 ```
 
 ### Roborio Kernel Traces
@@ -183,3 +224,63 @@
 ```
 You can then scp the `trace.dat` file to your computer and run `kernelshark
 trace.dat` (may require installing the `kernelshark` apt package).
+
+
+### Notes on troubleshooting network setup
+If the roboRIO has been configued to use a static IP address like
+10.9.71.2, set the laptop to have an IP address on the 10.9.71.x
+subnet with a netmask of 255.0.0.0.  The ".x" is different than the
+.2 for the roboRIO or any other device on the network.  The driver
+station uses .5 or .6 so avoid those.  The radio uses .1 or .50 so
+avoid those too.  A good choice might be in the 90-99 range.  If you
+are at the school, disconnect from the student wireless network or
+try setting your netmask to 255.255.255.0 if you want to be on both
+networks.  The student wireless network is on a 10.?.?.? subnet
+which can cause problems with connecting to the robot.
+
+If running Bazel on the `download_stripped` target does not work for
+the IP address you're using for the roborio or the raspberry pi, it
+probably means that the robot and laptop are on different subnets.
+They need to be on the same subnet for the laptop to connect to the
+robot.  Connecting can be confirmed by using ping.
+```
+ping roboRIO-971-frc.local
+```
+or
+```
+ping 10.9.71.2
+```
+
+If this does not work, perhaps the roboRIO has not been configured to
+have a static IP address.  Use a USB cable to connect from a Windows
+laptop to the roboRIO and use a web browser (Chrome is preferred,
+IE/Edge is not-- see [this technical
+note](https://docs.wpilib.org/en/stable/docs/software/roborio-info/roborio-web-dashboard.html))
+to configure the roboRIO to have a static IP address of 10.9.71.2.
+Browse to http://roborio-971-frc.local or http://172.22.11.2.  Click
+on the "Ethernet" icon on the left, select "Static" for the "Configure
+IPv4 Address" option.  Set the "IPv4 Address" to 10.9.71.2. Set the
+"Subnet Mask" to "255.0.0.0".  Finally click on "Save" at the bottom
+of the screen.  If you have trouble using an Ethernet cable, try using
+a USB cable (USB A->B).
+
+Another option is to configure the laptop to have a link-local connection
+by using the "Network Settings" GUI.  The laptop will then be on the
+same subnet in the address range of 169.254.0.0 to 169.254.255.255.
+James thinks this will only work over Ethernet (i.e., not USB; he is not
+sure what will happen if you attempt this over USB), and if the robot
+does *not* have a static IP address set and there is no DHCP server
+assigning an IP address to the roboRIO.  James says to also note that this
+implies that the roboRIO will also have a 169.254.*.* IP addresss, and
+that the only simple way to figure it out is to use mDNS.
+
+
+### Other resources
+  1. Intro to [AOS](./aos/README.md), our robot Operating System
+  2. Introductory example: [ping/pong](./aos/events/README.md)
+  3. Example of [logging](./aos/events/README.md)
+
+TODOs:
+  1. Add more on networking setup and troubleshooting
+  2. Move Roborio Kernel Traces out of here, maybe into documentation/
+  3. Currently requires `apt install libsigsegv2`, but this should be temporary
diff --git a/WORKSPACE b/WORKSPACE
index eb7ed28..9ddecff 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -70,6 +70,14 @@
     "//debian:m4.bzl",
     m4_debs = "files",
 )
+load(
+    "//debian:lzma_amd64.bzl",
+    lzma_amd64_debs = "files",
+)
+load(
+    "//debian:lzma_arm64.bzl",
+    lzma_arm64_debs = "files",
+)
 load("//debian:packages.bzl", "generate_repositories_for_debs")
 
 generate_repositories_for_debs(python_debs)
@@ -106,6 +114,10 @@
 
 generate_repositories_for_debs(m4_debs)
 
+generate_repositories_for_debs(lzma_amd64_debs)
+
+generate_repositories_for_debs(lzma_arm64_debs)
+
 http_archive(
     name = "python_repo",
     build_file = "@//debian:python.BUILD",
@@ -753,3 +765,45 @@
     sha256 = "ee8dfe664ac8c1d066bab64f71bd076a021875581b3cc47dac4a14a475f50b15",
     url = "http://www.frc971.org/Build-Dependencies/m4.tar.gz",
 )
+
+# //debian:lzma_amd64
+http_archive(
+    name = "lzma_amd64",
+    build_file_content = """
+cc_library(
+    name = "lib",
+    srcs = [
+        "usr/lib/x86_64-linux-gnu/liblzma.a",
+    ],
+    hdrs = glob([
+        "usr/include/lzma/*.h",
+        "usr/include/*.h",
+    ]),
+    strip_include_prefix = "usr/include",
+    visibility = ["//visibility:public"],
+)
+""",
+    sha256 = "e0ccaa7f793e44638e9f89570e00f146073a98a5928e0b547146c8184488bb19",
+    urls = ["http://www.frc971.org/Build-Dependencies/lzma_amd64.tar.gz"],
+)
+
+# //debian:lzma_arm64
+http_archive(
+    name = "lzma_arm64",
+    build_file_content = """
+cc_library(
+    name = "lib",
+    srcs = [
+        "usr/lib/aarch64-linux-gnu/liblzma.a",
+    ],
+    hdrs = glob([
+        "usr/include/lzma/*.h",
+        "usr/include/*.h",
+    ]),
+    strip_include_prefix = "usr/include",
+    visibility = ["//visibility:public"],
+)
+""",
+    sha256 = "18db35669ee49a5f8324a344071dd4ab553e716f385fb75747b909bd1de959f5",
+    urls = ["http://www.frc971.org/Build-Dependencies/lzma_arm64.tar.gz"],
+)
diff --git a/aos/BUILD b/aos/BUILD
index 286882f..a46911e 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -453,6 +453,7 @@
     visibility = ["//visibility:public"],
     deps = [
         "//aos:macros",
+        "//aos/containers:resizeable_buffer",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/strings",
diff --git a/aos/README.md b/aos/README.md
new file mode 100644
index 0000000..cbf8a56
--- /dev/null
+++ b/aos/README.md
@@ -0,0 +1,35 @@
+See [Top level README](../README.md) for overall information
+
+### `aos_dump`
+
+For examples on viewing events and logging, see the [README.md](./events/README.md) file in `aos/events`
+
+
+### NOTES
+
+Some functions need to be in separate translation units in order for them to be guaranteed to work. As the C standard says,
+
+> Alternatively, an implementation might perform various optimizations
+> within each translation unit, such that the actual semantics would
+> agree with the abstract semantics only when making function calls
+> across translation unit boundaries. In such an implementation, at the
+> time of each function entry and function return where the calling
+> function and the called function are in different translation units,
+> the values of all externally linked objects and of all objects
+> accessible via pointers therein would agree with the abstract
+> semantics. Furthermore, at the time of each such function entry the
+> values of the parameters of the called function and of all objects
+> accessible via pointers therein would agree with the abstract
+> semantics. In this type of implementation, objects referred to by
+> interrupt service routines activated by the signal function would
+> require explicit specification of volatile storage, as well as other
+> implementation-defined restrictions.
+
+### FILES  <TODO: I believe these are no longer correct>
+- `config/` has some configuration files
+  - `aos.conf` (currently in `aos` folder) has directions for setting up resource limits so you can run the code on any linux machine (the directions are there to keep them with the file)
+  - `setup_rc_caps.sh` (currently in `aos` folder) is a shell script (you have to run as root) that lets you run the realtime code without installing aos.conf for an individual file
+  - `starter` is an init.d file
+    - install it by putting it in /etc/init.d an running `update-rc.d starter defaults`
+    - restart it by running `invoke-rc.d starter restart` (doesn't always work very well...)
+  - the .config files are for building linux kernels
diff --git a/aos/README.txt b/aos/README.txt
deleted file mode 100644
index e0f9377..0000000
--- a/aos/README.txt
+++ /dev/null
@@ -1,34 +0,0 @@
-see ../README.txt for overall information
-
-[FILES]
-config/ has some configuration files
-	aos.conf has directions for setting up resource limits so you can run the code on any linux machine (the directions are there to keep them with the file)
-	setup_rc_caps.sh is a shell script (you have to run as root) that lets you run the realtime code without installing aos.conf for an individual file
-	starter is an init.d file
-		install it by putting it in /etc/init.d an running "update-rc.d starter defaults"
-		restart it by running "invoke-rc.d starter restart" (doesn't always work very well...)
-	the .config files are for building linux kernels
-
-linux/ has code that only runs on linux systems
-
-common/ is where stuff that runs on all platforms is
-common/input/ is where the framework code for reading stuff into the queues system is
-common/output/ is where the framework for writing things out of the queues system is
-common/messages is where the c++ wrappers for "queues" are
-
-[NOTES]
-Some functions need to be in separate translation units in order for them to be guaranteed to work. As the C standard says,
-	Alternatively, an implementation might perform various optimizations within each translation unit, such
-	that the actual semantics would agree with the abstract semantics only when making function calls across
-	translation unit boundaries. In such an implementation, at the time of each function entry and function
-	return where the calling function and the called function are in different translation units, the values of all
-	externally linked objects and of all objects accessible via pointers therein would agree with the abstract
-	semantics. Furthermore, at the time of each such function entry the values of the parameters of the called
-	function and of all objects accessible via pointers therein would agree with the abstract semantics. In this
-	type of implementation, objects referred to by interrupt service routines activated by the signal function
-	would require explicit specification of volatile storage, as well as other implementation-defined
-	restrictions.
-
-The C++ namespace aos is used for all of the aos code. The crio and linux_code namespaces are for APIs that only make sense on one platform or the other.
-
-Almost everything in aos/ is thread-safe. Files with a main() might not be, and some pieces of code explicitly say at their declaration that they aren't. Also, code which shares non-const pointers (including this) is not generally thread-safe. Other than those exceptions, everything is assumed to be thread-safe so the comments don't say so explicitly.
diff --git a/aos/aos_dump.cc b/aos/aos_dump.cc
index 055700f..a0cca5d 100644
--- a/aos/aos_dump.cc
+++ b/aos/aos_dump.cc
@@ -14,6 +14,9 @@
             "If true, fetch the current message on the channel first");
 DEFINE_bool(pretty, false,
             "If true, pretty print the messages on multiple lines");
+DEFINE_bool(all, false,
+            "If true, print out the channels for all nodes, not just the "
+            "channels which are visible on this node.");
 DEFINE_bool(print_timestamps, true, "If true, timestamps are printed.");
 DEFINE_uint64(count, 0,
               "If >0, aos_dump will exit after printing this many messages.");
@@ -75,8 +78,11 @@
   if (argc == 1) {
     std::cout << "Channels:\n";
     for (const aos::Channel *channel : *config_msg->channels()) {
-      std::cout << channel->name()->c_str() << ' ' << channel->type()->c_str()
-                << '\n';
+      if (FLAGS_all || aos::configuration::ChannelIsReadableOnNode(
+                           channel, event_loop.node())) {
+        std::cout << channel->name()->c_str() << ' ' << channel->type()->c_str()
+                  << '\n';
+      }
     }
     return 0;
   }
@@ -86,6 +92,10 @@
       config_msg->channels();
   bool found_exact = false;
   for (const aos::Channel *channel : *channels) {
+    if (!FLAGS_all && !aos::configuration::ChannelIsReadableOnNode(
+                          channel, event_loop.node())) {
+      continue;
+    }
     if (channel->name()->c_str() != channel_name) {
       continue;
     }
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index 9a24c8a..890bad0 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -131,6 +131,10 @@
   // The node that this application will be started on.
   // TODO(austin): Teach starter how to use this for starting applications.
   node:string;
+
+  // The user to run this application as. If this field is unset, run it as
+  // the current user of the application starter.
+  user:string;
 }
 
 // Per node data and connection information.
diff --git a/aos/containers/BUILD b/aos/containers/BUILD
index 3e24fab..8277212 100644
--- a/aos/containers/BUILD
+++ b/aos/containers/BUILD
@@ -56,3 +56,13 @@
         "//aos/testing:googletest",
     ],
 )
+
+cc_library(
+    name = "resizeable_buffer",
+    hdrs = [
+        "resizeable_buffer.h",
+    ],
+    deps = [
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/containers/resizeable_buffer.h b/aos/containers/resizeable_buffer.h
new file mode 100644
index 0000000..b94f8cc
--- /dev/null
+++ b/aos/containers/resizeable_buffer.h
@@ -0,0 +1,101 @@
+#ifndef AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+#define AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+
+#include <stdlib.h>
+
+#include <memory>
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// Kind of like a subset of vector<uint8_t>, but with less destructor calls.
+// When building unoptimized, especially with sanitizers, the vector<uint8_t>
+// version ends up being really slow in tests.
+class ResizeableBuffer {
+ public:
+  ResizeableBuffer() = default;
+
+  ResizeableBuffer(const ResizeableBuffer &other) { *this = other; }
+  ResizeableBuffer(ResizeableBuffer &&other) { *this = std::move(other); }
+  ResizeableBuffer &operator=(const ResizeableBuffer &other) {
+    resize(other.size());
+    memcpy(storage_.get(), other.storage_.get(), size());
+    return *this;
+  }
+  ResizeableBuffer &operator=(ResizeableBuffer &&other) {
+    std::swap(storage_, other.storage_);
+    std::swap(size_, other.size_);
+    std::swap(capacity_, other.capacity_);
+    return *this;
+  }
+
+  uint8_t *data() { return static_cast<uint8_t *>(storage_.get()); }
+  const uint8_t *data() const {
+    return static_cast<const uint8_t *>(storage_.get());
+  }
+
+  uint8_t *begin() { return data(); }
+  const uint8_t *begin() const { return data(); }
+  uint8_t *end() { return data() + size(); }
+  const uint8_t *end() const { return data() + size(); }
+
+  size_t size() const { return size_; }
+  size_t capacity() const { return capacity_; }
+
+  void resize(size_t new_size) {
+    if (new_size > capacity_) {
+      Allocate(new_size);
+    }
+    size_ = new_size;
+  }
+
+  void erase_front(size_t count) {
+    if (count == 0) {
+      return;
+    }
+    CHECK_LE(count, size_);
+    memmove(static_cast<void *>(data()), static_cast<void *>(data() + count),
+            size_ - count);
+    size_ -= count;
+  }
+
+  void push_back(uint8_t x) {
+    if (size_ == capacity_) {
+      Allocate(std::max<size_t>(16, size_ * 2));
+    }
+    *end() = x;
+    ++size_;
+    CHECK_LE(size_, capacity_);
+  }
+
+ private:
+  // We need this silly function because C++ is bad, and extensions to it which
+  // we work with make it a true nightmare.
+  //
+  // (a) You can't easily write out the signature of free because it depends on
+  // whether exceptions are enabled or not. You could use decltype, but see (b).
+  //
+  // (b) You can't easily write &free because CUDA overloads it with a
+  // __device__ version. You could cast to the appropriate version, but see (a).
+  //
+  // There's probably some kind of SFINAE thing which could find the matching
+  // signature from a set of choices, and then we could just
+  // static_cast<TheSignature>(&free). However, that sounds like a nightmare,
+  // especially because it has to conditionally enable the part mentioning CUDA
+  // identifiers in the preprocessor. This little function is way simpler.
+  static void DoFree(void *p) { free(p); }
+
+  void Allocate(size_t new_capacity) {
+    void *const old = storage_.release();
+    storage_.reset(CHECK_NOTNULL(realloc(old, new_capacity)));
+    capacity_ = new_capacity;
+  }
+
+  std::unique_ptr<void, decltype(&DoFree)> storage_{nullptr, &DoFree};
+  size_t size_ = 0, capacity_ = 0;
+};
+
+}  // namespace aos
+
+#endif  // AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 5912ac6..59968c3 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -271,6 +271,7 @@
     shard_count = 4,
     deps = [
         ":event_loop_param_test",
+        ":message_counter",
         ":ping_lib",
         ":pong_lib",
         ":simulated_event_loop",
@@ -341,3 +342,12 @@
         "//aos/logging:log_message_fbs",
     ],
 )
+
+cc_library(
+    name = "message_counter",
+    hdrs = ["message_counter.h"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":event_loop",
+    ],
+)
diff --git a/aos/events/README.md b/aos/events/README.md
new file mode 100644
index 0000000..875e1b7
--- /dev/null
+++ b/aos/events/README.md
@@ -0,0 +1,74 @@
+## How to run ping & pong
+
+Running ping<->pong is a nice way to test that you can run some basic code and shows how messaging can work between two nodes
+
+### Setup real-time niceties:
+  1. Add the following lines to `/etc/security/limits.d/rt.conf`, replacing "USERNAME" with the username you're running under.  You'll probably need to do this as root, e.g., `sudo nano /etc/security/limits.d/rt.conf`
+```
+USERNAME - nice -20
+USERNAME - rtprio 95
+USERNAME - memlock unlimited
+```
+
+  2. Reboot your machine to pick up the changes
+
+### Compile and run the code
+  1. Compile the code for ping and pong, as well as aos_dump for looking at the messages.  We'll assume throughout that you're running from the top level directory of the 971 code.
+  ```
+  bazel build -c opt //aos/events:ping //aos/events:pong //aos:aos_dump
+  ```
+
+  2. In 2 separate windows, run the ping and pong commands using the `pingpong_config.json` config file:
+    1. `bazel-bin/aos/events/ping --config bazel-bin/aos/events/pingpong_config.json`
+    2. `bazel-bin/aos/events/pong --config bazel-bin/aos/events/pingpong_config.json`
+
+  3. In a third window, explore the message stream using `aos_dump`.  Some things you can do:
+    1. List the channels:
+       `bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json`
+    2. Listen to a specific topic on a channel-- copy one of the channels listed in the first step and put it at the end of the aos_dump command (e.g., "/test aos.examples.Ping")
+       `bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json /test aos.examples.Ping`
+    3. Listen to multiple topics on a channel (e.g., all the topics published on "/test")
+       `bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json /test`
+
+
+NOTE: To make life easier, you can alias `aos_dump` to include the path and the config file (you may want to specify the full path to the aos_dump executable so it can be run from anywhere)
+```
+alias aos_dump='bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json'
+```
+
+## Logging
+
+In addition to running ping and pong, this is a good example to explore event logging.
+
+  1. Start by compiling the code:
+  ```
+  bazel build -c opt //aos/events/logging:logger_main
+  ```
+
+  2. Create a folder for the log files, e.g., 
+  ```
+  mkdir /tmp/log_folder
+  ```
+
+  3. Run the command to log the data:
+  ```
+  bazel-bin/aos/events/logging/logger_main --config bazel-bin/aos/events/pingpong_config.json --logging_folder /tmp/log_folder/
+  ```
+
+A log file should be created in /tmp/log_folder, with a name something like `fbs_log-001.bfbs`.
+
+If you're running ping and pong at the same time, you should be able to watch the log file grow in size as events are being logged.  For example, running `ls -lh /tmp/log_folder/*` will list the files and their sizes.  Doing this periodically (e.g., every 15-30 seconds) should show the new log file growing in size.
+
+   4. View the contents of the log file (using `log_cat` functionality found in the `logging` sub-directory):
+      1. Compile `log_cat`:
+      ```
+      bazel build -c opt //aos/events/logging:log_cat
+      ```
+
+      2. Run the binary pointed at the recorded event log (this assumes you're running from the base directory of the repository and that the file is named `fbs_log-001.bfbs`):
+      ```
+      bazel-bin/aos/events/logging/log_cat /tmp/log_folder/fbs_log-001.bfbs
+      ```
+
+## EXERCISE:
+   1. Modify code in ping and pong to see a difference in their behavior, e.g., increment the counter by 2's instead of 1
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 1e7d25b..6ffb0fe 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -79,9 +79,10 @@
   // underlying flatbuffer.
   template <typename T>
   FlatbufferVector<T> CopyFlatBuffer() const {
-    return FlatbufferVector<T>(
-        std::vector<uint8_t>(reinterpret_cast<const uint8_t *>(data),
-                             reinterpret_cast<const uint8_t *>(data) + size));
+    ResizeableBuffer buffer;
+    buffer.resize(size);
+    memcpy(buffer.data(), data, size);
+    return FlatbufferVector<T>(std::move(buffer));
   }
 };
 
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 56bef43..ad2f2d2 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -21,15 +21,153 @@
     ],
     visibility = ["//visibility:public"],
     deps = [
+        ":buffer_encoder",
         ":logger_fbs",
         "//aos:configuration",
         "//aos:flatbuffer_merge",
+        "//aos:flatbuffers",
+        "//aos/containers:resizeable_buffer",
         "//aos/events:event_loop",
         "//aos/util:file",
         "@com_github_gflags_gflags//:gflags",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/types:span",
+    ] + select({
+        "//tools:cpu_k8": [":lzma_encoder"],
+        "//tools:cpu_arm64": [":lzma_encoder"],
+        "//conditions:default": [],
+    }),
+)
+
+cc_binary(
+    name = "logfile_utils_out_of_space_test_runner",
+    testonly = True,
+    srcs = [
+        "logfile_utils_out_of_space_test_runner.cc",
+    ],
+    deps = [
+        ":logfile_utils",
+        "//aos:init",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+sh_test(
+    name = "logfile_utils_out_of_space_test",
+    srcs = [
+        "logfile_utils_out_of_space_test.sh",
+    ],
+    data = [
+        ":logfile_utils_out_of_space_test_runner",
+    ],
+    deps = [
+        "@bazel_tools//tools/bash/runfiles",
+    ],
+)
+
+cc_library(
+    name = "buffer_encoder",
+    srcs = [
+        "buffer_encoder.cc",
+    ],
+    hdrs = [
+        "buffer_encoder.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":logger_fbs",
+        "//aos:configuration_fbs",
+        "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_github_google_glog//:glog",
+        "@com_google_absl//absl/types:span",
+    ],
+)
+
+cc_test(
+    name = "buffer_encoder_test",
+    srcs = [
+        "buffer_encoder_test.cc",
+    ],
+    shard_count = 4,
+    deps = [
+        ":buffer_encoder",
+        ":buffer_encoder_param_test",
+        "//aos/testing:googletest",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+# TODO(Brian): Properly restrict this to specific platforms and
+# un-conditionalize the srcs and hdrs once we have support for that which
+# interacts with select properly.
+cc_library(
+    name = "lzma_encoder",
+    srcs = select({
+        "//tools:cpu_k8": [
+            "lzma_encoder.cc",
+        ],
+        "//tools:cpu_arm64": [
+            "lzma_encoder.cc",
+        ],
+        "//conditions:default": [],
+    }),
+    hdrs = select({
+        "//tools:cpu_k8": [
+            "lzma_encoder.h",
+        ],
+        "//tools:cpu_arm64": [
+            "lzma_encoder.h",
+        ],
+        "//conditions:default": [],
+    }),
+    visibility = ["//visibility:public"],
+    deps = [
+        ":buffer_encoder",
+        ":logger_fbs",
+        "//aos:configuration_fbs",
+        "//aos/containers:resizeable_buffer",
+        "//third_party:lzma",
+        "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_github_google_glog//:glog",
+        "@com_google_absl//absl/types:span",
+    ],
+)
+
+cc_test(
+    name = "lzma_encoder_test",
+    srcs = [
+        "lzma_encoder_test.cc",
+    ],
+    restricted_to = [
+        "//tools:k8",
+        "//tools:arm64",
+    ],
+    shard_count = 4,
+    deps = [
+        ":buffer_encoder_param_test",
+        ":lzma_encoder",
+        "//aos/testing:googletest",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_library(
+    name = "buffer_encoder_param_test",
+    testonly = True,
+    srcs = [
+        "buffer_encoder_param_test.cc",
+    ],
+    hdrs = [
+        "buffer_encoder_param_test.h",
+    ],
+    deps = [
+        ":logfile_utils",
+        ":logger_fbs",
+        "//aos/testing:googletest",
+        "//aos/testing:random_seed",
+        "@com_github_google_glog//:glog",
     ],
 )
 
@@ -153,8 +291,10 @@
         ":multinode_pingpong_config",
         "//aos/events:pingpong_config",
     ],
+    shard_count = 5,
     deps = [
         ":logger",
+        "//aos/events:message_counter",
         "//aos/events:ping_lib",
         "//aos/events:pong_lib",
         "//aos/events:simulated_event_loop",
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
new file mode 100644
index 0000000..10a7ed1
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.cc
@@ -0,0 +1,65 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+
+void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+  CHECK(in.data()) << ": Encode called with nullptr.";
+
+  total_bytes_ += in.size();
+  queue_.emplace_back(std::move(in));
+}
+
+void DummyEncoder::Clear(const int n) {
+  CHECK_GE(n, 0);
+  CHECK_LE(static_cast<size_t>(n), queue_size());
+  queue_.erase(queue_.begin(), queue_.begin() + n);
+}
+
+std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
+  std::vector<absl::Span<const uint8_t>> queue;
+  queue.reserve(queue_.size());
+  for (const auto &buffer : queue_) {
+    queue.emplace_back(buffer.data(), buffer.size());
+  }
+  return queue;
+}
+
+size_t DummyEncoder::queued_bytes() const {
+  size_t bytes = 0;
+  for (const auto &buffer : queue_) {
+    bytes += buffer.size();
+  }
+  return bytes;
+}
+
+DummyDecoder::DummyDecoder(std::string_view filename)
+    : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DummyDecoder::~DummyDecoder() {
+  int status = close(fd_);
+  if (status != 0) {
+    PLOG(ERROR) << "DummyDecoder: Failed to close file";
+  }
+}
+
+size_t DummyDecoder::Read(uint8_t *begin, uint8_t *end) {
+  if (end_of_file_) {
+    return 0;
+  }
+  const ssize_t count = read(fd_, begin, end - begin);
+  PCHECK(count >= 0) << ": Failed to read from file";
+  if (count == 0) {
+    end_of_file_ = true;
+  }
+  return static_cast<size_t>(count);
+}
+
+}  // namespace aos::logger
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
new file mode 100644
index 0000000..cc16fea
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.h
@@ -0,0 +1,108 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Interface to encode DetachedBuffers as they are written to a file.
+//
+// The interface to a file is represented as a series of buffers, appropriate to
+// pass to writev(2). The backing storage for these is managed internally so
+// subclasses can handle resizing/moving as desired. Implementations must
+// enqueue as many of these backing buffers as demanded, leaving it up to the
+// caller to write them to a file when desired.
+class DetachedBufferEncoder {
+ public:
+  virtual ~DetachedBufferEncoder() = default;
+
+  // Encodes and enqueues the given DetachedBuffer.
+  //
+  // Note that in may be moved-from, or it may be left unchanged, depending on
+  // what makes the most sense for a given implementation.
+  virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+
+  // If this returns true, the encoder may be bypassed by writing directly to
+  // the file.
+  virtual bool may_bypass() const { return false; }
+
+  // Finalizes the encoding process. After this, queue_size() represents the
+  // full extent of data which will be written to this file.
+  //
+  // Encode may not be called after this method.
+  virtual void Finish() = 0;
+
+  // Clears the first n encoded buffers from the queue.
+  virtual void Clear(int n) = 0;
+
+  // Returns a view of the queue of encoded buffers.
+  virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+
+  // Returns the total number of of bytes currently queued up.
+  virtual size_t queued_bytes() const = 0;
+
+  // Returns the cumulative number of bytes which have been queued. This
+  // includes data which has been removed via Clear.
+  virtual size_t total_bytes() const = 0;
+
+  // Returns the number of elements in the queue.
+  virtual size_t queue_size() const = 0;
+};
+
+// This class does not encode the data. It just claims ownership of the raw data
+// and queues it up as is.
+class DummyEncoder final : public DetachedBufferEncoder {
+ public:
+  ~DummyEncoder() override = default;
+
+  // No encoding happens, the raw data is queued up as is.
+  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  bool may_bypass() const final { return true; }
+  void Finish() final {}
+  void Clear(int n) final;
+  std::vector<absl::Span<const uint8_t>> queue() const final;
+  size_t queued_bytes() const final;
+  size_t total_bytes() const final { return total_bytes_; }
+  size_t queue_size() const final { return queue_.size(); }
+
+ private:
+  std::vector<flatbuffers::DetachedBuffer> queue_;
+  size_t total_bytes_ = 0;
+};
+
+// Interface to decode chunks of data. Implementations of this interface will
+// manage opening, reading, and closing the file stream.
+class DataDecoder {
+ public:
+  virtual ~DataDecoder() = default;
+
+  // Reads data into the given range. Returns the number of bytes read.
+  //
+  // Returns less than end-begin if all bytes have been read. Otherwise, this
+  // will always fill the whole range.
+  virtual size_t Read(uint8_t *begin, uint8_t *end) = 0;
+};
+
+// Simply reads the contents of the file into the target buffer.
+class DummyDecoder final : public DataDecoder {
+ public:
+  explicit DummyDecoder(std::string_view filename);
+  ~DummyDecoder() override;
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  // File descriptor for the log file.
+  int fd_;
+
+  // Cached bit for if we have reached the end of the file.  Otherwise we will
+  // hammer on the kernel asking for more data each time we send.
+  bool end_of_file_ = false;
+};
+
+}  // namespace aos::logger
+
+#endif  // AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
diff --git a/aos/events/logging/buffer_encoder_param_test.cc b/aos/events/logging/buffer_encoder_param_test.cc
new file mode 100644
index 0000000..825696e
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.cc
@@ -0,0 +1,92 @@
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+#include "gmock/gmock.h"
+
+namespace aos::logger::testing {
+
+// Verifies that Clear affects the sizes as expected.
+TEST_P(BufferEncoderTest, ClearAndSizes) {
+  const auto encoder = MakeEncoder();
+
+  std::uniform_int_distribution<int> quantity_distribution(500, 600);
+  const int first_quantity = quantity_distribution(*random_number_generator());
+  const int second_quantity = quantity_distribution(*random_number_generator());
+
+  const auto first_buffers = CreateAndEncode(first_quantity, encoder.get());
+  const auto first_size = encoder->queued_bytes();
+  // We want at least 2 buffers to ensure this test is actually validating the
+  // behavior.
+  ASSERT_GT(first_size, 2u) << ": Encoder chunk size is too big for this test";
+  ASSERT_EQ(encoder->total_bytes(), first_size);
+  ASSERT_EQ(TotalSize(encoder->queue()), first_size);
+  ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+
+  encoder->Clear(encoder->queue_size());
+  ASSERT_EQ(encoder->queued_bytes(), 0);
+  ASSERT_EQ(encoder->total_bytes(), first_size);
+  ASSERT_EQ(encoder->queue().size(), 0);
+  ASSERT_EQ(encoder->queue_size(), 0);
+
+  const auto second_buffers = CreateAndEncode(second_quantity, encoder.get());
+  const auto second_size = encoder->queued_bytes();
+  ASSERT_GT(second_size, 2u) << ": Encoder chunk size is too big for this test";
+  ASSERT_EQ(encoder->total_bytes(), first_size + second_size);
+  ASSERT_EQ(TotalSize(encoder->queue()), second_size);
+  ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+}
+
+// Runs data in randomly-chunked sizes through the encoder and decoder to verify
+// it comes back out the same.
+TEST_P(BufferEncoderTest, RoundTrip) {
+  std::uniform_int_distribution<int> quantity_distribution(20, 60);
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+
+  std::vector<std::vector<uint8_t>> encoded_buffers;
+  {
+    const int encode_chunks = quantity_distribution(*random_number_generator());
+    const auto encoder = MakeEncoder();
+    encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
+    encoder->Finish();
+
+    std::ofstream output_file(file_path, std::ios::binary);
+    for (auto span : encoder->queue()) {
+      output_file.write(reinterpret_cast<const char *>(span.data()),
+                        span.size());
+    }
+  }
+
+  const size_t total_encoded_size = TotalSize(encoded_buffers);
+
+  // Try decoding in multiple random chunkings.
+  for (int i = 0; i < 20; ++i) {
+    const auto decoder = MakeDecoder(file_path);
+    std::vector<std::vector<uint8_t>> decoded_buffers;
+    size_t total_decoded_size = 0;
+    while (true) {
+      const int chunk_size = quantity_distribution(*random_number_generator());
+      std::vector<uint8_t> chunk(chunk_size);
+      const size_t read_result =
+          decoder->Read(chunk.data(), chunk.data() + chunk_size);
+      if (read_result + total_decoded_size != total_encoded_size) {
+        // We didn't read anything, so we should've read the complete chunk.
+        ASSERT_EQ(read_result, chunk_size);
+      }
+      // Eventually we'll get here, once the decoder is really sure it's done.
+      if (read_result == 0) {
+        // Sanity check the math in the test code.
+        CHECK_EQ(total_decoded_size, TotalSize(decoded_buffers));
+        // Bail out because we're done.
+        break;
+      }
+      // If we're at the end, trim off the 0s so our comparison later works out.
+      chunk.resize(read_result);
+      total_decoded_size += read_result;
+      decoded_buffers.emplace_back(std::move(chunk));
+    }
+    ASSERT_THAT(Flatten(decoded_buffers),
+                ::testing::Eq(Flatten(encoded_buffers)));
+  }
+}
+
+}  // namespace aos::logger::testing
diff --git a/aos/events/logging/buffer_encoder_param_test.h b/aos/events/logging/buffer_encoder_param_test.h
new file mode 100644
index 0000000..fd1a00c
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.h
@@ -0,0 +1,114 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+
+#include <functional>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/testing/random_seed.h"
+
+namespace aos::logger::testing {
+
+// Contains some helpful infrastructure for testing DetachedBufferEncoder
+// implementations.
+class BufferEncoderBaseTest : public ::testing::Test {
+ public:
+  // Creates and encodes n random buffers, returning a copy of the encoded data.
+  std::vector<std::vector<uint8_t>> CreateAndEncode(
+      int n, DetachedBufferEncoder *encoder) {
+    std::vector<std::vector<uint8_t>> result;
+    for (int i = 0; i < n; i++) {
+      flatbuffers::DetachedBuffer buffer = CreateRandomBuffer();
+      result.emplace_back(buffer.data(), buffer.data() + buffer.size());
+      encoder->Encode(std::move(buffer));
+    }
+    return result;
+  }
+
+  // Returns the total size of a vector full of objects with a size() method.
+  template <typename T>
+  static size_t TotalSize(const std::vector<T> &buffers) {
+    size_t result = 0;
+    for (const auto &v : buffers) {
+      result += v.size();
+    }
+    return result;
+  }
+
+  // Returns a flattened copy of a vector of sequences.
+  template <typename T>
+  static std::vector<typename T::value_type> Flatten(
+      const std::vector<T> &buffers) {
+    std::vector<typename T::value_type> result;
+    for (const auto &buffer : buffers) {
+      result.insert(result.end(), buffer.begin(), buffer.end());
+    }
+    return result;
+  }
+
+  void Reseed(int seed) {
+    // This algorithm allows a reasonable --runs_per_test to cover new seeds.
+    random_number_generator_ =
+        std::mt19937(::aos::testing::RandomSeed() + 1000 * seed);
+  }
+
+  std::mt19937 *random_number_generator() { return &random_number_generator_; }
+
+ private:
+  flatbuffers::DetachedBuffer CreateRandomBuffer() {
+    std::uniform_int_distribution<int> length_distribution(2800, 3000);
+    const size_t length = length_distribution(random_number_generator_);
+
+    flatbuffers::FlatBufferBuilder fbb;
+    uint8_t *data;
+    const flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+        fbb.CreateUninitializedVector(length, &data);
+    {
+      std::independent_bits_engine<std::mt19937, CHAR_BIT, uint8_t> engine(
+          std::ref(random_number_generator_));
+      const uint8_t byte = engine();
+      std::fill(data, data + length, byte);
+    }
+
+    MessageHeader::Builder builder(fbb);
+    builder.add_data(data_offset);
+    fbb.FinishSizePrefixed(builder.Finish());
+
+    return fbb.Release();
+  }
+
+  std::mt19937 random_number_generator_{
+      std::mt19937(::aos::testing::RandomSeed())};
+};
+
+// Tests some generic properties for any DetachedBufferEncoder+DataDecoder
+// implementation pair.
+//
+// First and second test parameters are methods to create instances of the
+// encoder and decoder. Third test parameter is a random seed, to allow testing
+// many different data patterns.
+class BufferEncoderTest
+    : public BufferEncoderBaseTest,
+      public ::testing::WithParamInterface<std::tuple<
+          std::function<std::unique_ptr<DetachedBufferEncoder>()>,
+          std::function<std::unique_ptr<DataDecoder>(std::string_view)>, int>> {
+ public:
+  BufferEncoderTest() { Reseed(std::get<2>(GetParam())); }
+
+  std::unique_ptr<DetachedBufferEncoder> MakeEncoder() const {
+    return std::get<0>(GetParam())();
+  }
+  std::unique_ptr<DataDecoder> MakeDecoder(std::string_view filename) const {
+    return std::get<1>(GetParam())(filename);
+  }
+};
+
+}  // namespace aos::logger::testing
+
+#endif  // AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
diff --git a/aos/events/logging/buffer_encoder_test.cc b/aos/events/logging/buffer_encoder_test.cc
new file mode 100644
index 0000000..a9c2bbb
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_test.cc
@@ -0,0 +1,106 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <algorithm>
+#include <fstream>
+#include <string>
+
+#include "glog/logging.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+namespace aos::logger::testing {
+
+class DummyEncoderTest : public BufferEncoderBaseTest {};
+
+// Tests that buffers are enqueued without any changes.
+TEST_F(DummyEncoderTest, QueuesBuffersAsIs) {
+  DummyEncoder encoder;
+  const auto expected = CreateAndEncode(100, &encoder);
+
+  auto queue = encoder.queue();
+  EXPECT_THAT(queue, ::testing::ElementsAreArray(expected));
+}
+
+// Checks that DummyDecoder can read into a buffer.
+TEST(DummyDecoderTest, ReadsIntoExactBuffer) {
+  static const std::string kTestString{"Just some random words."};
+
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+  std::ofstream(file_path, std::ios::binary) << kTestString;
+
+  // Read the contents of the file into the buffer.
+  DummyDecoder dummy_decoder(file_path.c_str());
+  std::vector<uint8_t> buffer(kTestString.size());
+  const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+  ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+  for (int i = 0; i < 10; ++i) {
+    // Verify that there is no more data to read from the file.
+    ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+  }
+}
+
+// Checks that DummyDecoder can read into a buffer that can accommodate all the
+// data in the file.
+TEST(DummyDecoderTest, ReadsIntoLargerBuffer) {
+  static const std::string kTestString{"Just some random words."};
+
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+  std::ofstream(file_path, std::ios::binary) << kTestString;
+
+  DummyDecoder dummy_decoder(file_path.c_str());
+  std::vector<uint8_t> buffer(100);
+  const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+  buffer.resize(count);
+  ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+  // Verify that there is no more data to read from the file.
+  ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
+// Checks that DummyDecoder can repeatedly read the contents of the file into a
+// smaller buffer until there is no more to read.
+TEST(DummyDecoderTest, ReadsRepeatedlyIntoSmallerBuffer) {
+  static const std::string kTestString{"Just some random words."};
+
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+  std::ofstream(file_path, std::ios::binary) << kTestString;
+
+  DummyDecoder dummy_decoder(file_path.c_str());
+  std::vector<uint8_t> buffer((kTestString.size() + 1) / 2);
+
+  {
+    // Read into our buffer once, and verify the contents.
+    const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+    ASSERT_EQ(std::string(buffer.data(), buffer.data() + count),
+              kTestString.substr(0, buffer.size()));
+  }
+
+  {
+    // Read into the same buffer again, and verify the contents.
+    const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+    ASSERT_EQ(
+        std::string(buffer.data(), buffer.data() + count),
+        kTestString.substr(buffer.size(), kTestString.size() - buffer.size()));
+  }
+
+  // Verify that there is no more data to read from the file.
+  ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    Dummy, BufferEncoderTest,
+    ::testing::Combine(::testing::Values([]() {
+                         return std::make_unique<DummyEncoder>();
+                       }),
+                       ::testing::Values([](std::string_view filename) {
+                         return std::make_unique<DummyDecoder>(filename);
+                       }),
+                       ::testing::Range(0, 100)));
+
+}  // namespace aos::logger::testing
diff --git a/aos/events/logging/eigen_mpq.h b/aos/events/logging/eigen_mpq.h
index 1463648..370d77e 100644
--- a/aos/events/logging/eigen_mpq.h
+++ b/aos/events/logging/eigen_mpq.h
@@ -8,8 +8,7 @@
 
 // TypeTraits for mpq_class.  This is only really enough to use inverse().
 template <>
-struct NumTraits<mpq_class>
-    : GenericNumTraits<mpq_class> {
+struct NumTraits<mpq_class> : GenericNumTraits<mpq_class> {
   typedef mpq_class Real;
   typedef mpq_class Literal;
   typedef mpq_class NonInteger;
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index f406d24..0afdc03 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -127,7 +127,7 @@
     unsorted_logfiles.emplace_back(std::string(argv[i]));
   }
 
-  std::vector<std::vector<std::string>> logfiles =
+  const std::vector<aos::logger::LogFile> logfiles =
       aos::logger::SortParts(unsorted_logfiles);
 
   aos::logger::LogReader reader(logfiles);
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index 4078f9b..02a1402 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -15,46 +15,48 @@
     "If true, replace the header on the log file with the JSON header.");
 DEFINE_string(
     header, "",
-    "If provided, this is the path to the JSON with the log file header.");
+    "If provided, this is the path to the JSON with the log file header.  If "
+    "not provided, _header.json will be appended to --logfile.");
 
 int main(int argc, char **argv) {
   gflags::SetUsageMessage(R"(This tool lets us manipulate log files.)");
   aos::InitGoogle(&argc, &argv);
 
-  if (!FLAGS_header.empty()) {
-    if (FLAGS_replace) {
-      const ::std::string header_json =
-          aos::util::ReadFileToStringOrDie(FLAGS_header);
-      flatbuffers::FlatBufferBuilder fbb;
-      fbb.ForceDefaults(true);
-      flatbuffers::Offset<aos::logger::LogFileHeader> header =
-          aos::JsonToFlatbuffer<aos::logger::LogFileHeader>(header_json, &fbb);
+  std::string header =
+      FLAGS_header.empty() ? (FLAGS_logfile + "_header.json") : FLAGS_header;
 
-      fbb.FinishSizePrefixed(header);
+  if (FLAGS_replace) {
+    const ::std::string header_json = aos::util::ReadFileToStringOrDie(header);
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.ForceDefaults(true);
+    flatbuffers::Offset<aos::logger::LogFileHeader> header =
+        aos::JsonToFlatbuffer<aos::logger::LogFileHeader>(header_json, &fbb);
 
-      const std::string orig_path = FLAGS_logfile + ".orig";
-      PCHECK(rename(FLAGS_logfile.c_str(), orig_path.c_str()) == 0);
+    fbb.FinishSizePrefixed(header);
 
-      aos::logger::SpanReader span_reader(orig_path);
-      CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
+    const std::string orig_path = FLAGS_logfile + ".orig";
+    PCHECK(rename(FLAGS_logfile.c_str(), orig_path.c_str()) == 0);
 
-      aos::logger::DetachedBufferWriter buffer_writer(FLAGS_logfile);
-      buffer_writer.QueueSizedFlatbuffer(&fbb);
+    aos::logger::SpanReader span_reader(orig_path);
+    CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
 
-      while (true) {
-        absl::Span<const uint8_t> msg_data = span_reader.ReadMessage();
-        if (msg_data == absl::Span<const uint8_t>()) {
-          break;
-        }
+    aos::logger::DetachedBufferWriter buffer_writer(
+        FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
+    buffer_writer.QueueSizedFlatbuffer(&fbb);
 
-        buffer_writer.WriteSizedFlatbuffer(msg_data);
+    while (true) {
+      absl::Span<const uint8_t> msg_data = span_reader.ReadMessage();
+      if (msg_data == absl::Span<const uint8_t>()) {
+        break;
       }
-    } else {
-      aos::logger::MessageReader reader(FLAGS_logfile);
-      aos::util::WriteStringToFileOrDie(
-          FLAGS_header, aos::FlatbufferToJson(reader.log_file_header(),
-                                              {.multi_line = true}));
+
+      buffer_writer.QueueSpan(msg_data);
     }
+  } else {
+    aos::logger::MessageReader reader(FLAGS_logfile);
+    aos::util::WriteStringToFileOrDie(
+        header,
+        aos::FlatbufferToJson(reader.log_file_header(), {.multi_line = true}));
   }
 
   return 0;
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index def0842..8dd5061 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -32,7 +32,7 @@
     const Node *node) {
   CHECK_EQ(node, this->node());
   UpdateHeader(header, uuid_, part_number_);
-  data_writer_->WriteSizedFlatbuffer(header->full_span());
+  data_writer_->QueueSpan(header->full_span());
 }
 
 DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
@@ -47,7 +47,7 @@
   ++part_number_;
   *data_writer_ = std::move(*OpenDataWriter());
   UpdateHeader(header, uuid_, part_number_);
-  data_writer_->WriteSizedFlatbuffer(header->full_span());
+  data_writer_->QueueSpan(header->full_span());
 }
 
 DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
@@ -81,14 +81,14 @@
     const Node *node) {
   if (node == this->node()) {
     UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
+    data_writer_->QueueSpan(header->full_span());
   } else {
     for (std::pair<const Channel *const, DataWriter> &data_writer :
          data_writers_) {
       if (node == data_writer.second.node) {
         UpdateHeader(header, data_writer.second.uuid,
                      data_writer.second.part_number);
-        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+        data_writer.second.writer->QueueSpan(header->full_span());
       }
     }
   }
@@ -101,7 +101,7 @@
     ++part_number_;
     *data_writer_ = std::move(*OpenDataWriter());
     UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
+    data_writer_->QueueSpan(header->full_span());
   } else {
     for (std::pair<const Channel *const, DataWriter> &data_writer :
          data_writers_) {
@@ -110,7 +110,7 @@
         data_writer.second.rotate(data_writer.first, &data_writer.second);
         UpdateHeader(header, data_writer.second.uuid,
                      data_writer.second.part_number);
-        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+        data_writer.second.writer->QueueSpan(header->full_span());
       }
     }
   }
@@ -188,11 +188,11 @@
 
 DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
     const Channel *channel) {
-  const bool log_delivery_times =
-      (this->node() == nullptr)
-          ? false
-          : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                channel, this->node(), this->node());
+  bool log_delivery_times = false;
+  if (this->node() != nullptr) {
+    log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+        channel, this->node(), this->node());
+  }
   if (!log_delivery_times) {
     return nullptr;
   }
@@ -200,37 +200,76 @@
   return data_writer_.get();
 }
 
+void MultiNodeLogNamer::Close() {
+  for (std::pair<const Channel *const, DataWriter> &data_writer :
+       data_writers_) {
+    if (data_writer.second.writer) {
+      data_writer.second.writer->Close();
+      if (data_writer.second.writer->ran_out_of_space()) {
+        ran_out_of_space_ = true;
+        data_writer.second.writer->acknowledge_out_of_space();
+      }
+    }
+  }
+  if (data_writer_) {
+    data_writer_->Close();
+    if (data_writer_->ran_out_of_space()) {
+      ran_out_of_space_ = true;
+      data_writer_->acknowledge_out_of_space();
+    }
+  }
+}
+
 void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
                                                      DataWriter *data_writer) {
   std::string filename =
       absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
                    "/", channel->type()->string_view(), ".part",
                    data_writer->part_number, ".bfbs");
-
-  if (!data_writer->writer) {
-    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
-  } else {
-    *data_writer->writer = DetachedBufferWriter(filename);
-  }
+  CreateBufferWriter(filename, &data_writer->writer);
 }
 
 void MultiNodeLogNamer::OpenWriter(const Channel *channel,
                                    DataWriter *data_writer) {
   const std::string filename = absl::StrCat(
-      base_name_, "_", channel->source_node()->string_view(), "_data",
-      channel->name()->string_view(), "/", channel->type()->string_view(),
-      ".part", data_writer->part_number, ".bfbs");
-  if (!data_writer->writer) {
-    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
-  } else {
-    *data_writer->writer = DetachedBufferWriter(filename);
-  }
+      base_name_, "_", CHECK_NOTNULL(channel->source_node())->string_view(),
+      "_data", channel->name()->string_view(), "/",
+      channel->type()->string_view(), ".part", data_writer->part_number,
+      ".bfbs");
+  CreateBufferWriter(filename, &data_writer->writer);
 }
 
 std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
+  std::string name = base_name_;
+  if (node() != nullptr) {
+    name = absl::StrCat(name, "_", node()->name()->string_view());
+  }
   return std::make_unique<DetachedBufferWriter>(
-      absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
-                   part_number_, ".bfbs"));
+      absl::StrCat(name, "_data.part", part_number_, ".bfbs"),
+      std::make_unique<DummyEncoder>());
+}
+
+void MultiNodeLogNamer::CreateBufferWriter(
+    std::string_view filename,
+    std::unique_ptr<DetachedBufferWriter> *destination) {
+  if (ran_out_of_space_) {
+    // Refuse to open any new files, which might skip data. Any existing files
+    // are in the same folder, which means they're on the same filesystem, which
+    // means they're probably going to run out of space and get stuck too.
+    return;
+  }
+  if (!destination->get()) {
+    *destination = std::make_unique<DetachedBufferWriter>(
+        filename, std::make_unique<DummyEncoder>());
+    return;
+  }
+  destination->get()->Close();
+  if (destination->get()->ran_out_of_space()) {
+    ran_out_of_space_ = true;
+    return;
+  }
+  *destination->get() =
+      DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
 }
 
 }  // namespace logger
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 631016b..661f28d 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -33,17 +33,26 @@
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
       const Node *node) = 0;
 
-  // Returns a writer for writing data logged on this channel (on the node
-  // provided in the constructor).
+  // Returns a writer for writing data from messages on this channel (on the
+  // primary node).
+  //
+  // The returned pointer will stay valid across rotations, but the object it
+  // points to will be assigned to.
   virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
 
-  // Returns a writer for writing timestamps logged on this channel (on the node
-  // provided in the constructor).
+  // Returns a writer for writing timestamps from messages on this channel (on
+  // the primary node).
+  //
+  // The returned pointer will stay valid across rotations, but the object it
+  // points to will be assigned to.
   virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
 
   // Returns a writer for writing timestamps delivered over the special
   // /aos/remote_timestamps/* channels.  node is the node that the timestamps
-  // are forwarded back from.
+  // are forwarded back from (to the primary node).
+  //
+  // The returned pointer will stay valid across rotations, but the object it
+  // points to will be assigned to.
   virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
       const Channel *channel, const Node *node) = 0;
 
@@ -79,6 +88,7 @@
         base_name_(base_name),
         uuid_(UUID::Random()),
         data_writer_(OpenDataWriter()) {}
+  ~LocalLogNamer() override = default;
 
   void WriteHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -99,7 +109,8 @@
   // Creates a new data writer with the new part number.
   std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
     return std::make_unique<DetachedBufferWriter>(
-        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"),
+        std::make_unique<aos::logger::DummyEncoder>());
   }
 
   const std::string base_name_;
@@ -113,6 +124,9 @@
  public:
   MultiNodeLogNamer(std::string_view base_name,
                     const Configuration *configuration, const Node *node);
+  ~MultiNodeLogNamer() override = default;
+
+  std::string_view base_name() const { return base_name_; }
 
   void WriteHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -124,11 +138,41 @@
 
   DetachedBufferWriter *MakeWriter(const Channel *channel) override;
 
-  DetachedBufferWriter *MakeForwardedTimestampWriter(
-      const Channel *channel, const Node *node) override;
+  DetachedBufferWriter *MakeForwardedTimestampWriter(const Channel *channel,
+                                                     const Node *node) override;
 
   DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
 
+  // Indicates that at least one file ran out of space. Once this happens, we
+  // stop trying to open new files, to avoid writing any files with holes from
+  // previous parts.
+  //
+  // Besides this function, this object will silently stop logging data when
+  // this occurs. If you want to ensure log files are complete, you must call
+  // this method.
+  bool ran_out_of_space() const { return ran_out_of_space_; }
+
+  // Returns the maximum total_bytes() value for all existing
+  // DetachedBufferWriters.
+  //
+  // Returns 0 if no files are open.
+  size_t maximum_total_bytes() const {
+    size_t result = 0;
+    for (const std::pair<const Channel *const, DataWriter> &data_writer :
+         data_writers_) {
+      result = std::max(result, data_writer.second.writer->total_bytes());
+    }
+    if (data_writer_) {
+      result = std::max(result, data_writer_->total_bytes());
+    }
+    return result;
+  }
+
+  // Closes all existing log files. No more data may be written after this.
+  //
+  // This may set ran_out_of_space().
+  void Close();
+
  private:
   // Files to write remote data to.  We want one per channel.  Maps the channel
   // to the writer, Node, and part number.
@@ -150,12 +194,17 @@
   // Opens the main data writer file for this node responsible for data_writer_.
   std::unique_ptr<DetachedBufferWriter> OpenDataWriter();
 
+  void CreateBufferWriter(std::string_view filename,
+                          std::unique_ptr<DetachedBufferWriter> *destination);
+
   const std::string base_name_;
   const Configuration *const configuration_;
   const UUID uuid_;
 
   size_t part_number_ = 0;
 
+  bool ran_out_of_space_ = false;
+
   // File to write both delivery timestamps and local data to.
   std::unique_ptr<DetachedBufferWriter> data_writer_;
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7f53577..5226154 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1,32 +1,43 @@
 #include "aos/events/logging/logfile_utils.h"
 
 #include <fcntl.h>
-#include <limits.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <sys/uio.h>
 
-#include <vector>
+#include <algorithm>
+#include <climits>
 
 #include "absl/strings/escaping.h"
 #include "aos/configuration.h"
-#include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/util/file.h"
 #include "flatbuffers/flatbuffers.h"
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
-DEFINE_int32(flush_size, 1000000,
+#if defined(__x86_64__)
+#define ENABLE_LZMA 1
+#elif defined(__aarch64__)
+#define ENABLE_LZMA 1
+#else
+#define ENABLE_LZMA 0
+#endif
+
+#if ENABLE_LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
+
+DEFINE_int32(flush_size, 128000,
              "Number of outstanding bytes to allow before flushing to disk.");
 
-namespace aos {
-namespace logger {
+namespace aos::logger {
 
 namespace chrono = std::chrono;
 
-DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
-    : filename_(filename) {
+DetachedBufferWriter::DetachedBufferWriter(
+    std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+    : filename_(filename), encoder_(std::move(encoder)) {
   util::MkdirP(filename, 0777);
   fd_ = open(std::string(filename).c_str(),
              O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
@@ -35,91 +46,161 @@
 }
 
 DetachedBufferWriter::~DetachedBufferWriter() {
-  Flush();
-  PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
-  VLOG(1) << "Closed " << filename_;
+  Close();
+  if (ran_out_of_space_) {
+    CHECK(acknowledge_ran_out_of_space_)
+        << ": Unacknowledged out of disk space, log file was not completed";
+  }
 }
 
-DetachedBufferWriter::DetachedBufferWriter(
-    DetachedBufferWriter &&other) {
+DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
   *this = std::move(other);
 }
 
+// When other is destroyed "soon" (which it should be because we're getting an
+// rvalue reference to it), it will flush etc all the data we have queued up
+// (because that data will then be its data).
 DetachedBufferWriter &DetachedBufferWriter::operator=(
     DetachedBufferWriter &&other) {
-  Flush();
   std::swap(filename_, other.filename_);
+  std::swap(encoder_, other.encoder_);
   std::swap(fd_, other.fd_);
-  std::swap(queued_size_, other.queued_size_);
-  std::swap(written_size_, other.written_size_);
-  std::swap(queue_, other.queue_);
+  std::swap(ran_out_of_space_, other.ran_out_of_space_);
+  std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
   std::swap(iovec_, other.iovec_);
+  std::swap(max_write_time_, other.max_write_time_);
+  std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
+  std::swap(max_write_time_messages_, other.max_write_time_messages_);
+  std::swap(total_write_time_, other.total_write_time_);
+  std::swap(total_write_count_, other.total_write_count_);
+  std::swap(total_write_messages_, other.total_write_messages_);
+  std::swap(total_write_bytes_, other.total_write_bytes_);
   return *this;
 }
 
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::FlatBufferBuilder *fbb) {
-  QueueSizedFlatbuffer(fbb->Release());
+void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
+  if (encoder_->may_bypass() && span.size() > 4096u) {
+    // Over this threshold, we'll assume it's cheaper to add an extra
+    // syscall to write the data immediately instead of copying it to
+    // enqueue.
+
+    if (ran_out_of_space_) {
+      // We don't want any later data to be written after space becomes
+      // available, so refuse to write anything more once we've dropped data
+      // because we ran out of space.
+      VLOG(1) << "Ignoring span: " << span.size();
+      return;
+    }
+
+    // First, flush everything.
+    while (encoder_->queue_size() > 0u) {
+      Flush();
+    }
+
+    // Then, write it directly.
+    const auto start = aos::monotonic_clock::now();
+    const ssize_t written = write(fd_, span.data(), span.size());
+    const auto end = aos::monotonic_clock::now();
+    HandleWriteReturn(written, span.size());
+    UpdateStatsForWrite(end - start, written, 1);
+  } else {
+    encoder_->Encode(CopySpanAsDetachedBuffer(span));
+  }
+
+  FlushAtThreshold();
 }
 
-void DetachedBufferWriter::WriteSizedFlatbuffer(
-    absl::Span<const uint8_t> span) {
-  // Cheat aggressively...  Write out the queued up data, and then write this
-  // data once without buffering.  It is hard to make a DetachedBuffer out of
-  // this data, and we don't want to worry about lifetimes.
-  Flush();
-  iovec_.clear();
-  iovec_.reserve(1);
-
-  struct iovec n;
-  n.iov_base = const_cast<uint8_t *>(span.data());
-  n.iov_len = span.size();
-  iovec_.emplace_back(n);
-
-  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
-  PCHECK(written == static_cast<ssize_t>(n.iov_len))
-      << ": Wrote " << written << " expected " << n.iov_len;
-  written_size_ += written;
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::DetachedBuffer &&buffer) {
-  queued_size_ += buffer.size();
-  queue_.emplace_back(std::move(buffer));
-
-  // Flush if we are at the max number of iovs per writev, or have written
-  // enough data.  Otherwise writev will fail with an invalid argument.
-  if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
-      queue_.size() == IOV_MAX) {
+void DetachedBufferWriter::Close() {
+  if (fd_ == -1) {
+    return;
+  }
+  encoder_->Finish();
+  while (encoder_->queue_size() > 0) {
     Flush();
   }
+  if (close(fd_) == -1) {
+    if (errno == ENOSPC) {
+      ran_out_of_space_ = true;
+    } else {
+      PLOG(ERROR) << "Closing log file failed";
+    }
+  }
+  fd_ = -1;
+  VLOG(1) << "Closed " << filename_;
 }
 
 void DetachedBufferWriter::Flush() {
-  if (queue_.size() == 0u) {
+  const auto queue = encoder_->queue();
+  if (queue.empty()) {
     return;
   }
-  iovec_.clear();
-  iovec_.reserve(queue_.size());
-  size_t counted_size = 0;
-  for (size_t i = 0; i < queue_.size(); ++i) {
-    struct iovec n;
-    n.iov_base = queue_[i].data();
-    n.iov_len = queue_[i].size();
-    counted_size += n.iov_len;
-    iovec_.emplace_back(std::move(n));
+  if (ran_out_of_space_) {
+    // We don't want any later data to be written after space becomes available,
+    // so refuse to write anything more once we've dropped data because we ran
+    // out of space.
+    VLOG(1) << "Ignoring queue: " << queue.size();
+    encoder_->Clear(queue.size());
+    return;
   }
-  CHECK_EQ(counted_size, queued_size_);
+
+  iovec_.clear();
+  const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+  iovec_.resize(iovec_size);
+  size_t counted_size = 0;
+  for (size_t i = 0; i < iovec_size; ++i) {
+    iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
+    iovec_[i].iov_len = queue[i].size();
+    counted_size += iovec_[i].iov_len;
+  }
+
+  const auto start = aos::monotonic_clock::now();
   const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
+  const auto end = aos::monotonic_clock::now();
+  HandleWriteReturn(written, counted_size);
 
-  PCHECK(written == static_cast<ssize_t>(queued_size_))
-      << ": Wrote " << written << " expected " << queued_size_;
-  written_size_ += written;
+  encoder_->Clear(iovec_size);
 
-  queued_size_ = 0;
-  queue_.clear();
-  // TODO(austin): Handle partial writes in some way other than crashing...
+  UpdateStatsForWrite(end - start, written, iovec_size);
+}
+
+void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
+                                             size_t write_size) {
+  if (write_return == -1 && errno == ENOSPC) {
+    ran_out_of_space_ = true;
+    return;
+  }
+  PCHECK(write_return >= 0) << ": write failed";
+  if (write_return < static_cast<ssize_t>(write_size)) {
+    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
+    // never seems to happen in any other case. If we ever want to log to a
+    // socket, this will happen more often. However, until we get there, we'll
+    // just assume it means we ran out of space.
+    ran_out_of_space_ = true;
+    return;
+  }
+}
+
+void DetachedBufferWriter::UpdateStatsForWrite(
+    aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
+  if (duration > max_write_time_) {
+    max_write_time_ = duration;
+    max_write_time_bytes_ = written;
+    max_write_time_messages_ = iovec_size;
+  }
+  total_write_time_ += duration;
+  ++total_write_count_;
+  total_write_messages_ += iovec_size;
+  total_write_bytes_ += written;
+}
+
+void DetachedBufferWriter::FlushAtThreshold() {
+  // Flush if we are at the max number of iovs per writev, because there's no
+  // point queueing up any more data in memory. Also flush once we have enough
+  // data queued up.
+  while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
+         encoder_->queue_size() >= IOV_MAX) {
+    Flush();
+  }
 }
 
 flatbuffers::Offset<MessageHeader> PackMessage(
@@ -184,10 +265,17 @@
   return message_header_builder.Finish();
 }
 
-SpanReader::SpanReader(std::string_view filename)
-    : filename_(filename),
-      fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
-  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
+  static const std::string_view kXz = ".xz";
+  if (filename.substr(filename.size() - kXz.size()) == kXz) {
+#if ENABLE_LZMA
+    decoder_ = std::make_unique<LzmaDecoder>(filename);
+#else
+    LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
+#endif
+  } else {
+    decoder_ = std::make_unique<DummyDecoder>(filename);
+  }
 }
 
 absl::Span<const uint8_t> SpanReader::ReadMessage() {
@@ -243,18 +331,13 @@
 }
 
 bool SpanReader::ReadBlock() {
-  if (end_of_file_) {
-    return false;
-  }
-
-  // Appends 256k.  This is enough that the read call is efficient.  We don't
-  // want to spend too much time reading small chunks because the syscalls for
-  // that will be expensive.
+  // This is the amount of data we grab at a time. Doing larger chunks minimizes
+  // syscalls and helps decompressors batch things more efficiently.
   constexpr size_t kReadSize = 256 * 1024;
 
   // Strip off any unused data at the front.
   if (consumed_data_ != 0) {
-    data_.erase(data_.begin(), data_.begin() + consumed_data_);
+    data_.erase_front(consumed_data_);
     consumed_data_ = 0;
   }
 
@@ -263,15 +346,14 @@
   // This should automatically grow the backing store.  It won't shrink if we
   // get a small chunk later.  This reduces allocations when we want to append
   // more data.
-  data_.resize(data_.size() + kReadSize);
+  data_.resize(starting_size + kReadSize);
 
-  ssize_t count = read(fd_, &data_[starting_size], kReadSize);
-  data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+  const size_t count =
+      decoder_->Read(data_.begin() + starting_size, data_.end());
+  data_.resize(starting_size + count);
   if (count == 0) {
-    end_of_file_ = true;
     return false;
   }
-  PCHECK(count > 0);
 
   return true;
 }
@@ -285,8 +367,10 @@
       << ": Failed to read header from: " << filename;
 
   // And copy the config so we have it forever, removing the size prefix.
-  std::vector<uint8_t> data(
-      config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
+  ResizeableBuffer data;
+  data.resize(config_data.size() - sizeof(flatbuffers::uoffset_t));
+  memcpy(data.data(), config_data.begin() + sizeof(flatbuffers::uoffset_t),
+         data.size());
   return FlatbufferVector<LogFileHeader>(std::move(data));
 }
 
@@ -302,9 +386,11 @@
         << ": Failed to read data from: " << filename;
   }
 
-  // And copy the data so we have it forever.
-  std::vector<uint8_t> data(data_span.begin() + sizeof(flatbuffers::uoffset_t),
-                            data_span.end());
+  // And copy the config so we have it forever, removing the size prefix.
+  ResizeableBuffer data;
+  data.resize(data_span.size() - sizeof(flatbuffers::uoffset_t));
+  memcpy(data.data(), data_span.begin() + sizeof(flatbuffers::uoffset_t),
+         data.size());
   return FlatbufferVector<MessageHeader>(std::move(data));
 }
 
@@ -319,8 +405,11 @@
       << ": Failed to read header from: " << filename;
 
   // And copy the header data so we have it forever.
-  std::vector<uint8_t> header_data_copy(
-      header_data.begin() + sizeof(flatbuffers::uoffset_t), header_data.end());
+  ResizeableBuffer header_data_copy;
+  header_data_copy.resize(header_data.size() - sizeof(flatbuffers::uoffset_t));
+  memcpy(header_data_copy.data(),
+         header_data.begin() + sizeof(flatbuffers::uoffset_t),
+         header_data_copy.size());
   raw_log_file_header_ =
       FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
 
@@ -337,8 +426,12 @@
     return std::nullopt;
   }
 
-  FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
-      msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end())};
+  ResizeableBuffer result_buffer;
+  result_buffer.resize(msg_data.size() - sizeof(flatbuffers::uoffset_t));
+  memcpy(result_buffer.data(),
+         msg_data.begin() + sizeof(flatbuffers::uoffset_t),
+         result_buffer.size());
+  FlatbufferVector<MessageHeader> result(std::move(result_buffer));
 
   const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
       chrono::nanoseconds(result.message().monotonic_sent_time()));
@@ -378,7 +471,7 @@
     if (monotonic_start_time() == monotonic_clock::min_time) {
       CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
       // We should only be missing the monotonic start time when logging data
-      // for remote nodes.  We don't have a good way to deteremine the remote
+      // for remote nodes.  We don't have a good way to determine the remote
       // realtime offset, so it shouldn't be filled out.
       // TODO(austin): If we have a good way, feel free to fill it out.  It
       // probably won't be better than we could do in post though with the same
@@ -564,8 +657,8 @@
           chrono::nanoseconds(header.monotonic_sent_time()));
 
       if (VLOG_IS_ON(2)) {
-        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this
-                  << " " << filename() << " ttq: " << time_to_queue_ << " now "
+        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
+                  << filename() << " ttq: " << time_to_queue_ << " now "
                   << newest_timestamp() << " start time "
                   << monotonic_start_time() << " " << FlatbufferToJson(&header);
       } else if (VLOG_IS_ON(1)) {
@@ -1228,9 +1321,8 @@
           (reader->node() != nullptr) && (target_node != nullptr) &&
           (reader->node()->has_name() && target_node->has_name());
       const bool node_names_identical =
-          both_have_name &&
-          (reader->node()->name()->string_view() ==
-           target_node->name()->string_view());
+          both_have_name && (reader->node()->name()->string_view() ==
+                             target_node->name()->string_view());
       if (both_null || node_names_identical) {
         if (!found_node) {
           found_node = true;
@@ -1520,5 +1612,4 @@
   return "";
 }
 
-}  // namespace logger
-}  // namespace aos
+}  // namespace aos::logger
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9534860..a3d16f7 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -15,12 +15,14 @@
 #include <vector>
 
 #include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/buffer_encoder.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffers.h"
 #include "flatbuffers/flatbuffers.h"
 
-namespace aos {
-namespace logger {
+namespace aos::logger {
 
 enum class LogType : uint8_t {
   // The message originated on this node and should be logged here.
@@ -37,10 +39,11 @@
 };
 
 // This class manages efficiently writing a sequence of detached buffers to a
-// file.  It queues them up and batches the write operation.
+// file.  It encodes them, queues them up, and batches the write operation.
 class DetachedBufferWriter {
  public:
-  DetachedBufferWriter(std::string_view filename);
+  DetachedBufferWriter(std::string_view filename,
+                       std::unique_ptr<DetachedBufferEncoder> encoder);
   DetachedBufferWriter(DetachedBufferWriter &&other);
   DetachedBufferWriter(const DetachedBufferWriter &) = delete;
 
@@ -51,43 +54,112 @@
 
   std::string_view filename() const { return filename_; }
 
-  // Rewrites a location in a file (relative to the start) to have new data in
-  // it.  The main use case is updating start times after a log file starts.
-  void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
+  // Queues up a finished FlatBufferBuilder to be encoded and written.
+  //
+  // Triggers a flush if there's enough data queued up.
+  //
+  // Steals the detached buffer from it.
+  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
+    QueueSizedFlatbuffer(fbb->Release());
+  }
+  // May steal the backing storage of buffer, or may leave it alone.
+  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
+    encoder_->Encode(std::move(buffer));
+    FlushAtThreshold();
+  }
 
-  // TODO(austin): Snappy compress the log file if it ends with .snappy!
+  // Queues up data in span. May copy or may write it to disk immediately.
+  void QueueSpan(absl::Span<const uint8_t> span);
 
-  // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
-  // buffer from it.
-  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
-  // Queues up a detached buffer directly.
-  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
-  // Writes a Span.  This is not terribly optimized right now.
-  void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
+  // Indicates we got ENOSPC when trying to write. After this returns true, no
+  // further data is written.
+  bool ran_out_of_space() const { return ran_out_of_space_; }
 
-  // Triggers data to be provided to the kernel and written.
-  void Flush();
+  // To avoid silently failing to write logfiles, you must call this before
+  // destruction if ran_out_of_space() is true and the situation has been
+  // handled.
+  void acknowledge_out_of_space() {
+    CHECK(ran_out_of_space_);
+    acknowledge_ran_out_of_space_ = true;
+  }
 
-  // Returns the number of bytes written.
-  size_t written_size() const { return written_size_; }
+  // Fully flushes and closes the underlying file now. No additional data may be
+  // enqueued after calling this.
+  //
+  // This will be performed in the destructor automatically.
+  //
+  // Note that this may set ran_out_of_space().
+  void Close();
 
-  // Returns the number of bytes written or currently queued.
-  size_t total_size() const { return written_size_ + queued_size_; }
+  // Returns the total number of bytes written and currently queued.
+  size_t total_bytes() const { return encoder_->total_bytes(); }
+
+  // The maximum time for a single write call, or 0 if none have been performed.
+  std::chrono::nanoseconds max_write_time() const { return max_write_time_; }
+  // The number of bytes in the longest write call, or -1 if none have been
+  // performed.
+  int max_write_time_bytes() const { return max_write_time_bytes_; }
+  // The number of buffers in the longest write call, or -1 if none have been
+  // performed.
+  int max_write_time_messages() const { return max_write_time_messages_; }
+  // The total time spent in write calls.
+  std::chrono::nanoseconds total_write_time() const {
+    return total_write_time_;
+  }
+  // The total number of writes which have been performed.
+  int total_write_count() const { return total_write_count_; }
+  // The total number of messages which have been written.
+  int total_write_messages() const { return total_write_messages_; }
+  // The total number of bytes which have been written.
+  int total_write_bytes() const { return total_write_bytes_; }
+  void ResetStatistics() {
+    max_write_time_ = std::chrono::nanoseconds::zero();
+    max_write_time_bytes_ = -1;
+    max_write_time_messages_ = -1;
+    total_write_time_ = std::chrono::nanoseconds::zero();
+    total_write_count_ = 0;
+    total_write_messages_ = 0;
+    total_write_bytes_ = 0;
+  }
 
  private:
+  // Performs a single writev call with as much of the data we have queued up as
+  // possible.
+  //
+  // This will normally take all of the data we have queued up, unless an
+  // encoder has spit out a big enough chunk all at once that we can't manage
+  // all of it.
+  void Flush();
+
+  // write_return is what write(2) or writev(2) returned. write_size is the
+  // number of bytes we expected it to write.
+  void HandleWriteReturn(ssize_t write_return, size_t write_size);
+
+  void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
+                           ssize_t written, int iovec_size);
+
+  // Flushes data if we've reached the threshold to do that as part of normal
+  // operation.
+  void FlushAtThreshold();
+
   std::string filename_;
+  std::unique_ptr<DetachedBufferEncoder> encoder_;
 
   int fd_ = -1;
+  bool ran_out_of_space_ = false;
+  bool acknowledge_ran_out_of_space_ = false;
 
-  // Size of all the data in the queue.
-  size_t queued_size_ = 0;
-  size_t written_size_ = 0;
-
-  // List of buffers to flush.
-  std::vector<flatbuffers::DetachedBuffer> queue_;
   // List of iovecs to use with writev.  This is a member variable to avoid
   // churn.
   std::vector<struct iovec> iovec_;
+
+  std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
+  int max_write_time_bytes_ = -1;
+  int max_write_time_messages_ = -1;
+  std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
+  int total_write_count_ = 0;
+  int total_write_messages_ = 0;
+  int total_write_bytes_ = 0;
 };
 
 // Packes a message pointed to by the context into a MessageHeader.
@@ -104,8 +176,6 @@
  public:
   SpanReader(std::string_view filename);
 
-  ~SpanReader() { close(fd_); }
-
   std::string_view filename() const { return filename_; }
 
   // Returns a span with the data for a message from the log file, excluding
@@ -121,50 +191,22 @@
   //   Allocate the 256k blocks like we do today.  But, refcount them with
   //   shared_ptr pointed to by the messageheader that is returned.  This avoids
   //   the copy.  Need to do more benchmarking.
+  //   And (Brian): Consider just mmapping the file and handing out refcounted
+  //   pointers into that too.
 
   // Reads a chunk of data into data_.  Returns false if no data was read.
   bool ReadBlock();
 
   const std::string filename_;
 
-  // File descriptor for the log file.
-  int fd_ = -1;
+  // File reader and data decoder.
+  std::unique_ptr<DataDecoder> decoder_;
 
-  // Allocator which doesn't zero initialize memory.
-  template <typename T>
-  struct DefaultInitAllocator {
-    typedef T value_type;
-
-    template <typename U>
-    void construct(U *p) {
-      ::new (static_cast<void *>(p)) U;
-    }
-
-    template <typename U, typename... Args>
-    void construct(U *p, Args &&... args) {
-      ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
-    }
-
-    T *allocate(std::size_t n) {
-      return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
-    }
-
-    template <typename U>
-    void deallocate(U *p, std::size_t /*n*/) {
-      ::operator delete(static_cast<void *>(p));
-    }
-  };
-
-  // Vector to read into.  This uses an allocator which doesn't zero
-  // initialize the memory.
-  std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+  // Vector to read into.
+  ResizeableBuffer data_;
 
   // Amount of data consumed already in data_.
   size_t consumed_data_ = 0;
-
-  // Cached bit for if we have reached the end of the file.  Otherwise we will
-  // hammer on the kernel asking for more data each time we send.
-  bool end_of_file_ = false;
 };
 
 // Class which handles reading the header and messages from the log file.  This
@@ -668,7 +710,6 @@
 // a single node.
 std::string MaybeNodeName(const Node *);
 
-}  // namespace logger
-}  // namespace aos
+}  // namespace aos::logger
 
 #endif  // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
diff --git a/aos/events/logging/logfile_utils_out_of_space_test.sh b/aos/events/logging/logfile_utils_out_of_space_test.sh
new file mode 100755
index 0000000..f412e0d
--- /dev/null
+++ b/aos/events/logging/logfile_utils_out_of_space_test.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+# Tests DetachedBufferWriter's behavior when running out of disk space. We do
+# this by creating a small tmpfs in a new mount namespace, and then running a
+# test binary in that namespace. There's no easy way for a process with user
+# code to run itself in a new mount+user namespace, so we do that outside the
+# test process itself.
+
+# --- begin runfiles.bash initialization v2 ---
+# Copy-pasted from the Bazel Bash runfiles library v2.
+set -uo pipefail; f=bazel_tools/tools/bash/runfiles/runfiles.bash
+source "${RUNFILES_DIR:-/dev/null}/$f" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "${RUNFILES_MANIFEST_FILE:-/dev/null}" | cut -f2- -d' ')" 2>/dev/null || \
+source "$0.runfiles/$f" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "$0.runfiles_manifest" | cut -f2- -d' ')" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "$0.exe.runfiles_manifest" | cut -f2- -d' ')" 2>/dev/null || \
+  { echo>&2 "ERROR: cannot find $f"; exit 1; }; f=; set -e
+# --- end runfiles.bash initialization v2 ---
+
+set -euo pipefail
+
+TMPFS="${TEST_TMPDIR}/small_tmpfs"
+rm -rf "${TMPFS}"
+mkdir "${TMPFS}"
+
+function test {
+  SIZE="$1"
+  echo "Running test with ${SIZE}..." >&2
+  unshare --mount --map-root-user bash <<END
+set -euo pipefail
+
+mount -t tmpfs tmpfs -o size=${SIZE} "${TMPFS}"
+
+exec "$(rlocation org_frc971/aos/events/logging/logfile_utils_out_of_space_test_runner)" -tmpfs "${TMPFS}"
+END
+  echo "Finished test with ${SIZE}" >&2
+}
+
+# Run out of space exactly at the beginning of a block.
+test 81920
+
+# Run out of space 1 byte into a block.
+test 81921
+
+# Run out of space in the middle of a block.
+test 87040
diff --git a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
new file mode 100644
index 0000000..3010a6f
--- /dev/null
+++ b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
@@ -0,0 +1,30 @@
+// See :logfile_utils_out_of_space_test for usage and details.
+
+#include <array>
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/init.h"
+
+DECLARE_int32(flush_size);
+DEFINE_string(tmpfs, "", "tmpfs with the desired size");
+
+int main(int argc, char **argv) {
+  aos::InitGoogle(&argc, &argv);
+  FLAGS_flush_size = 1;
+  CHECK(!FLAGS_tmpfs.empty()) << ": Must specify a tmpfs location";
+
+  aos::logger::DetachedBufferWriter writer(
+      FLAGS_tmpfs + "/file", std::make_unique<aos::logger::DummyEncoder>());
+  std::array<uint8_t, 10240> data;
+  data.fill(0);
+  for (int i = 0; i < 8; ++i) {
+    writer.QueueSpan(data);
+    CHECK(!writer.ran_out_of_space()) << ": " << i;
+  }
+  writer.QueueSpan(data);
+  CHECK(writer.ran_out_of_space());
+  writer.acknowledge_out_of_space();
+}
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index caccf03..859e24c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,44 +35,30 @@
 namespace logger {
 namespace chrono = std::chrono;
 
-
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
-               std::chrono::milliseconds polling_period)
-    : Logger(base_name, event_loop, event_loop->configuration(),
-             polling_period) {}
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
-               const Configuration *configuration,
-               std::chrono::milliseconds polling_period)
-    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
-             event_loop, configuration, polling_period) {}
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-               std::chrono::milliseconds polling_period)
-    : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
-             polling_period) {}
-
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-               const Configuration *configuration,
-               std::chrono::milliseconds polling_period)
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+               std::function<bool(const Channel *)> should_log)
     : event_loop_(event_loop),
-      uuid_(UUID::Random()),
-      log_namer_(std::move(log_namer)),
       configuration_(configuration),
       name_(network::GetHostname()),
-      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
-      polling_period_(polling_period),
+      timer_handler_(event_loop_->AddTimer(
+          [this]() { DoLogData(event_loop_->monotonic_now()); })),
       server_statistics_fetcher_(
           configuration::MultiNode(event_loop_->configuration())
               ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                     "/aos")
               : aos::Fetcher<message_bridge::ServerStatistics>()) {
-  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
-  int channel_index = 0;
+  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
 
   // Find all the nodes which are logging timestamps on our node.
   std::set<const Node *> timestamp_logger_nodes;
   for (const Channel *channel : *configuration_->channels()) {
-    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
-        !channel->has_destination_nodes()) {
+    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
+      continue;
+    }
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+    if (!should_log(channel)) {
       continue;
     }
     for (const Connection *connection : *channel->destination_nodes()) {
@@ -104,44 +90,53 @@
         << event_loop_->node()->name()->string_view()
         << " but can't find channel /aos/remote_timestamps/"
         << node->name()->string_view();
+    if (!should_log(channel)) {
+      continue;
+    }
     timestamp_logger_channels.insert(std::make_pair(channel, node));
   }
 
-  const size_t our_node_index = configuration::GetNodeIndex(
-      configuration_, event_loop_->node());
+  const size_t our_node_index =
+      configuration::GetNodeIndex(configuration_, event_loop_->node());
 
-  for (const Channel *config_channel : *configuration_->channels()) {
+  for (size_t channel_index = 0;
+       channel_index < configuration_->channels()->size(); ++channel_index) {
+    const Channel *const config_channel =
+        configuration_->channels()->Get(channel_index);
     // The MakeRawFetcher method needs a channel which is in the event loop
     // configuration() object, not the configuration_ object.  Go look that up
     // from the config.
     const Channel *channel = aos::configuration::GetChannel(
         event_loop_->configuration(), config_channel->name()->string_view(),
         config_channel->type()->string_view(), "", event_loop_->node());
+    if (!should_log(channel)) {
+      continue;
+    }
 
     FetcherStruct fs;
     fs.node_index = our_node_index;
+    fs.channel_index = channel_index;
+    fs.channel = channel;
+
     const bool is_local =
         configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
 
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
-    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
-                                 channel, event_loop_->node()) &&
-                             is_readable;
+    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+        channel, event_loop_->node());
+    const bool log_message = is_logged && is_readable;
 
-    const bool log_delivery_times =
-        (event_loop_->node() == nullptr)
-            ? false
-            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                  channel, event_loop_->node(), event_loop_->node());
+    bool log_delivery_times = false;
+    if (event_loop_->node() != nullptr) {
+      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+          channel, event_loop_->node(), event_loop_->node());
+    }
 
     // Now, detect a MessageHeader timestamp logger where we should just log the
     // contents to a file directly.
     const bool log_contents = timestamp_logger_channels.find(channel) !=
                               timestamp_logger_channels.end();
-    const Node *timestamp_node =
-        log_contents ? timestamp_logger_channels.find(channel)->second
-                     : nullptr;
 
     if (log_message || log_delivery_times || log_contents) {
       fs.fetcher = event_loop->MakeRawFetcher(channel);
@@ -150,11 +145,11 @@
 
       if (log_delivery_times) {
         VLOG(1) << "  Delivery times";
-        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+        fs.wants_timestamp_writer = true;
       }
       if (log_message) {
         VLOG(1) << "  Data";
-        fs.writer = log_namer_->MakeWriter(channel);
+        fs.wants_writer = true;
         if (!is_local) {
           fs.log_type = LogType::kLogRemoteMessage;
         }
@@ -162,44 +157,60 @@
       if (log_contents) {
         VLOG(1) << "Timestamp logger channel "
                 << configuration::CleanedChannelToString(channel);
-        fs.contents_writer =
-            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+        fs.wants_contents_writer = true;
         fs.node_index =
-            configuration::GetNodeIndex(configuration_, timestamp_node);
+            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
       }
-      fs.channel_index = channel_index;
-      fs.written = false;
       fetchers_.emplace_back(std::move(fs));
     }
-    ++channel_index;
+  }
+}
+
+Logger::~Logger() {
+  if (log_namer_) {
+    // If we are replaying a log file, or in simulation, we want to force the
+    // last bit of data to be logged.  The easiest way to deal with this is to
+    // poll everything as we go to destroy the class, ie, shut down the logger,
+    // and write it to disk.
+    StopLogging(event_loop_->monotonic_now());
+  }
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer) {
+  CHECK(!log_namer_) << ": Already logging";
+  log_namer_ = std::move(log_namer);
+  uuid_ = UUID::Random();
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+  // We want to do as much work as possible before the initial Fetch. Time
+  // between that and actually starting to log opens up the possibility of
+  // falling off the end of the queue during that time.
+
+  for (FetcherStruct &f : fetchers_) {
+    if (f.wants_writer) {
+      f.writer = log_namer_->MakeWriter(f.channel);
+    }
+    if (f.wants_timestamp_writer) {
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+    }
+    if (f.wants_contents_writer) {
+      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+          f.channel, CHECK_NOTNULL(f.timestamp_node));
+    }
   }
 
+  CHECK(node_state_.empty());
   node_state_.resize(configuration::MultiNode(configuration_)
                          ? configuration_->nodes()->size()
                          : 1u);
 
   for (const Node *node : log_namer_->nodes()) {
-    const int node_index =
-        configuration::GetNodeIndex(configuration_, node);
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
 
     node_state_[node_index].log_file_header = MakeHeader(node);
   }
 
-  // When things start, we want to log the header, then the most recent
-  // messages available on each fetcher to capture the previous state, then
-  // start polling.
-  event_loop_->OnRun([this]() { StartLogging(); });
-}
-
-Logger::~Logger() {
-  // If we are replaying a log file, or in simulation, we want to force the last
-  // bit of data to be logged.  The easiest way to deal with this is to poll
-  // everything as we go to destroy the class, ie, shut down the logger, and
-  // write it to disk.
-  DoLogData();
-}
-
-void Logger::StartLogging() {
   // Grab data from each channel right before we declare the log file started
   // so we can capture the latest message on each channel.  This lets us have
   // non periodic messages with configuration that now get logged.
@@ -221,6 +232,25 @@
                         polling_period_);
 }
 
+std::unique_ptr<LogNamer> Logger::StopLogging(
+    aos::monotonic_clock::time_point end_time) {
+  CHECK(log_namer_) << ": Not logging right now";
+
+  if (end_time != aos::monotonic_clock::min_time) {
+    LogUntil(end_time);
+  }
+  timer_handler_->Disable();
+
+  for (FetcherStruct &f : fetchers_) {
+    f.writer = nullptr;
+    f.timestamp_writer = nullptr;
+    f.contents_writer = nullptr;
+  }
+  node_state_.clear();
+
+  return std::move(log_namer_);
+}
+
 void Logger::WriteHeader() {
   if (configuration::MultiNode(configuration_)) {
     server_statistics_fetcher_.Fetch();
@@ -238,8 +268,7 @@
   last_synchronized_time_ = monotonic_start_time;
 
   for (const Node *node : log_namer_->nodes()) {
-    const int node_index =
-        configuration::GetNodeIndex(configuration_, node);
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
     MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                          realtime_start_time);
     log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
@@ -258,8 +287,7 @@
   }
 
   for (const Node *node : log_namer_->nodes()) {
-    const int node_index =
-        configuration::GetNodeIndex(configuration_, node);
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
     if (MaybeUpdateTimestamp(
             node, node_index,
             server_statistics_fetcher_.context().monotonic_event_time,
@@ -296,7 +324,7 @@
     const Node *node, int node_index,
     aos::monotonic_clock::time_point monotonic_start_time,
     aos::realtime_clock::time_point realtime_start_time) {
-  // Bail early if there the start times are already set.
+  // Bail early if the start times are already set.
   if (node_state_[node_index].monotonic_start_time !=
       monotonic_clock::min_time) {
     return false;
@@ -360,6 +388,7 @@
   flatbuffers::Offset<flatbuffers::String> name_offset =
       fbb.CreateString(name_);
 
+  CHECK(uuid_ != UUID::Zero());
   flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
       fbb.CreateString(uuid_.string_view());
 
@@ -388,8 +417,7 @@
   // message.  Report back 3x to be extra safe, and because the cost isn't
   // huge on the read side.
   log_file_header_builder.add_max_out_of_order_duration(
-      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
-          .count());
+      std::chrono::nanoseconds(3 * polling_period_).count());
 
   log_file_header_builder.add_monotonic_start_time(
       std::chrono::duration_cast<std::chrono::nanoseconds>(
@@ -413,8 +441,7 @@
 
 void Logger::Rotate() {
   for (const Node *node : log_namer_->nodes()) {
-    const int node_index =
-        configuration::GetNodeIndex(configuration_, node);
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
     log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
   }
 }
@@ -527,113 +554,252 @@
   last_synchronized_time_ = t;
 }
 
-void Logger::DoLogData() {
-  // We want to guarentee that messages aren't out of order by more than
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+  // We want to guarantee that messages aren't out of order by more than
   // max_out_of_order_duration.  To do this, we need sync points.  Every write
   // cycle should be a sync point.
-  const monotonic_clock::time_point monotonic_now =
-      event_loop_->monotonic_now();
 
   do {
     // Move the sync point up by at most polling_period.  This forces one sync
     // per iteration, even if it is small.
-    LogUntil(
-        std::min(last_synchronized_time_ + polling_period_, monotonic_now));
+    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+    on_logged_period_();
 
     // If we missed cycles, we could be pretty far behind.  Spin until we are
     // caught up.
-  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+  } while (last_synchronized_time_ + polling_period_ < end_time);
 }
 
-std::vector<std::vector<std::string>> SortParts(
-    const std::vector<std::string> &parts) {
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
   // Start by grouping all parts by UUID, and extracting the part index.
-  std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
+  // Datastructure to hold all the info extracted from a set of parts which go
+  // together so we can sort them afterwords.
+  struct UnsortedLogParts {
+    // Start times.
+    aos::monotonic_clock::time_point monotonic_start_time;
+    aos::realtime_clock::time_point realtime_start_time;
+
+    // Node to save.
+    std::string node;
+
+    // Pairs of the filename and the part index for sorting.
+    std::vector<std::pair<std::string, int>> parts;
+  };
+
+  // Map holding the logger_uuid -> second map.  The second map holds the
+  // parts_uuid -> list of parts for sorting.
+  std::map<std::string, std::map<std::string, UnsortedLogParts>> parts_list;
 
   // Sort part files without UUIDs and part indexes as well.  Extract everything
   // useful from the log in the first pass, then sort later.
-  struct LogPart {
-    std::string filename;
-    monotonic_clock::time_point start_time;
-    monotonic_clock::time_point first_message_time;
+  struct UnsortedOldParts {
+    // Part information with everything but the list of parts.
+    LogParts parts;
+
+    // Tuple of time for the data and filename needed for sorting after
+    // extracting.
+    std::vector<std::pair<monotonic_clock::time_point, std::string>>
+        unsorted_parts;
   };
 
-  std::vector<LogPart> old_parts;
+  // A list of all the old parts which we don't know how to sort using uuids.
+  // There are enough of these in the wild that this is worth supporting.
+  std::vector<UnsortedOldParts> old_parts;
 
+  // Now extract everything into our datastructures above for sorting.
   for (const std::string &part : parts) {
     FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
 
+    const monotonic_clock::time_point monotonic_start_time(
+        chrono::nanoseconds(log_header.message().monotonic_start_time()));
+    const realtime_clock::time_point realtime_start_time(
+        chrono::nanoseconds(log_header.message().realtime_start_time()));
+
+    const std::string_view node =
+        log_header.message().has_node()
+            ? log_header.message().node()->name()->string_view()
+            : "";
+
     // Looks like an old log.  No UUID, index, and also single node.  We have
     // little to no multi-node log files in the wild without part UUIDs and
     // indexes which we care much about.
     if (!log_header.message().has_parts_uuid() &&
         !log_header.message().has_parts_index() &&
         !log_header.message().has_node()) {
-      LogPart log_part;
-      log_part.filename = part;
-      log_part.start_time = monotonic_clock::time_point(
-          chrono::nanoseconds(log_header.message().monotonic_start_time()));
       FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
-      log_part.first_message_time = monotonic_clock::time_point(
+      const monotonic_clock::time_point first_message_time(
           chrono::nanoseconds(first_message.message().monotonic_sent_time()));
-      old_parts.emplace_back(std::move(log_part));
+
+      // Find anything with a matching start time.  They all go together.
+      auto result = std::find_if(
+          old_parts.begin(), old_parts.end(),
+          [&](const UnsortedOldParts &parts) {
+            return parts.parts.monotonic_start_time == monotonic_start_time &&
+                   parts.parts.realtime_start_time == realtime_start_time;
+          });
+
+      if (result == old_parts.end()) {
+        old_parts.emplace_back();
+        old_parts.back().parts.monotonic_start_time = monotonic_start_time;
+        old_parts.back().parts.realtime_start_time = realtime_start_time;
+        old_parts.back().unsorted_parts.emplace_back(
+            std::make_pair(first_message_time, part));
+      } else {
+        result->unsorted_parts.emplace_back(
+            std::make_pair(first_message_time, part));
+      }
       continue;
     }
 
+    CHECK(log_header.message().has_logger_uuid());
     CHECK(log_header.message().has_parts_uuid());
     CHECK(log_header.message().has_parts_index());
 
+    const std::string logger_uuid = log_header.message().logger_uuid()->str();
     const std::string parts_uuid = log_header.message().parts_uuid()->str();
-    auto it = parts_list.find(parts_uuid);
-    if (it == parts_list.end()) {
-      it = parts_list
-               .insert(std::make_pair(
-                   parts_uuid, std::vector<std::pair<std::string, int>>{}))
-               .first;
+    int32_t parts_index = log_header.message().parts_index();
+
+    auto log_it = parts_list.find(logger_uuid);
+    if (log_it == parts_list.end()) {
+      log_it = parts_list
+                   .insert(std::make_pair(
+                       logger_uuid, std::map<std::string, UnsortedLogParts>()))
+                   .first;
     }
-    it->second.emplace_back(
-        std::make_pair(part, log_header.message().parts_index()));
+
+    auto it = log_it->second.find(parts_uuid);
+    if (it == log_it->second.end()) {
+      it = log_it->second.insert(std::make_pair(parts_uuid, UnsortedLogParts()))
+               .first;
+      it->second.monotonic_start_time = monotonic_start_time;
+      it->second.realtime_start_time = realtime_start_time;
+      it->second.node = std::string(node);
+    }
+
+    // First part might be min_time.  If it is, try to put a better time on it.
+    if (it->second.monotonic_start_time == monotonic_clock::min_time) {
+      it->second.monotonic_start_time = monotonic_start_time;
+    } else if (monotonic_start_time != monotonic_clock::min_time) {
+      CHECK_EQ(it->second.monotonic_start_time, monotonic_start_time);
+    }
+    if (it->second.realtime_start_time == realtime_clock::min_time) {
+      it->second.realtime_start_time = realtime_start_time;
+    } else if (realtime_start_time != realtime_clock::min_time) {
+      CHECK_EQ(it->second.realtime_start_time, realtime_start_time);
+    }
+
+    it->second.parts.emplace_back(std::make_pair(part, parts_index));
   }
 
   CHECK_NE(old_parts.empty(), parts_list.empty())
       << ": Can't have a mix of old and new parts.";
 
+  // Now reformat old_parts to be in the right datastructure to report.
   if (!old_parts.empty()) {
-    // Confirm they all have the same start time.  Old loggers always used the
-    // same start time.
-    for (const LogPart &p : old_parts) {
-      CHECK_EQ(old_parts[0].start_time, p.start_time);
+    std::vector<LogFile> result;
+    for (UnsortedOldParts &p : old_parts) {
+      // Sort by the oldest message in each file.
+      std::sort(
+          p.unsorted_parts.begin(), p.unsorted_parts.end(),
+          [](const std::pair<monotonic_clock::time_point, std::string> &a,
+             const std::pair<monotonic_clock::time_point, std::string> &b) {
+            return a.first < b.first;
+          });
+      LogFile log_file;
+      for (std::pair<monotonic_clock::time_point, std::string> &f :
+           p.unsorted_parts) {
+        p.parts.parts.emplace_back(std::move(f.second));
+      }
+      log_file.parts.emplace_back(std::move(p.parts));
+      result.emplace_back(std::move(log_file));
     }
-    // Sort by the oldest message in each file.
-    std::sort(old_parts.begin(), old_parts.end(),
-              [](const LogPart &a, const LogPart &b) {
-                return a.first_message_time < b.first_message_time;
-              });
 
-    // Produce the final form.
-    std::vector<std::string> sorted_old_parts;
-    sorted_old_parts.reserve(old_parts.size());
-    for (LogPart &p : old_parts) {
-      sorted_old_parts.emplace_back(std::move(p.filename));
-    }
-    return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
+    return result;
   }
 
   // Now, sort them and produce the final vector form.
-  std::vector<std::vector<std::string>> result;
+  std::vector<LogFile> result;
   result.reserve(parts_list.size());
-  for (auto &part : parts_list) {
-    std::sort(part.second.begin(), part.second.end(),
-              [](const std::pair<std::string, int> &a,
-                 const std::pair<std::string, int> &b) {
-                return a.second < b.second;
-              });
-    std::vector<std::string> result_line;
-    result_line.reserve(part.second.size());
-    for (std::pair<std::string, int> &p : part.second) {
-      result_line.emplace_back(std::move(p.first));
+  for (std::pair<const std::string, std::map<std::string, UnsortedLogParts>>
+           &logs : parts_list) {
+    LogFile new_file;
+    new_file.logger_uuid = logs.first;
+    for (std::pair<const std::string, UnsortedLogParts> &parts : logs.second) {
+      LogParts new_parts;
+      new_parts.monotonic_start_time = parts.second.monotonic_start_time;
+      new_parts.realtime_start_time = parts.second.realtime_start_time;
+      new_parts.logger_uuid = logs.first;
+      new_parts.parts_uuid = parts.first;
+      new_parts.node = std::move(parts.second.node);
+
+      std::sort(parts.second.parts.begin(), parts.second.parts.end(),
+                [](const std::pair<std::string, int> &a,
+                   const std::pair<std::string, int> &b) {
+                  return a.second < b.second;
+                });
+      new_parts.parts.reserve(parts.second.parts.size());
+      for (std::pair<std::string, int> &p : parts.second.parts) {
+        new_parts.parts.emplace_back(std::move(p.first));
+      }
+      new_file.parts.emplace_back(std::move(new_parts));
     }
-    result.emplace_back(std::move(result_line));
+    result.emplace_back(std::move(new_file));
+  }
+  return result;
+}
+
+std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
+  stream << "{";
+  if (!file.logger_uuid.empty()) {
+    stream << "\"logger_uuid\": \"" << file.logger_uuid << "\", ";
+  }
+  stream << "\"parts\": [";
+  for (size_t i = 0; i < file.parts.size(); ++i) {
+    if (i != 0u) {
+      stream << ", ";
+    }
+    stream << file.parts[i];
+  }
+  stream << "]}";
+  return stream;
+}
+std::ostream &operator<<(std::ostream &stream, const LogParts &parts) {
+  stream << "{";
+  if (!parts.logger_uuid.empty()) {
+    stream << "\"logger_uuid\": \"" << parts.logger_uuid << "\", ";
+  }
+  if (!parts.parts_uuid.empty()) {
+    stream << "\"parts_uuid\": \"" << parts.parts_uuid << "\", ";
+  }
+  if (!parts.node.empty()) {
+    stream << "\"node\": \"" << parts.node << "\", ";
+  }
+  stream << "\"monotonic_start_time\": " << parts.monotonic_start_time
+         << ", \"realtime_start_time\": " << parts.realtime_start_time << ", [";
+
+  for (size_t i = 0; i < parts.parts.size(); ++i) {
+    if (i != 0u) {
+      stream << ", ";
+    }
+    stream << parts.parts[i];
+  }
+
+  stream << "]}";
+  return stream;
+}
+
+std::vector<std::vector<std::string>> ToLogReaderVector(
+    const std::vector<LogFile> &log_files) {
+  std::vector<std::vector<std::string>> result;
+  for (const LogFile &log_file : log_files) {
+    for (const LogParts &log_parts : log_file.parts) {
+      std::vector<std::string> parts;
+      for (const std::string &part : log_parts.parts) {
+        parts.emplace_back(part);
+      }
+      result.emplace_back(std::move(parts));
+    }
   }
   return result;
 }
@@ -648,6 +814,12 @@
     : LogReader(std::vector<std::vector<std::string>>{filenames},
                 replay_configuration) {}
 
+// TODO(austin): Make this the base and kill the others.  This has much better
+// context for sorting.
+LogReader::LogReader(const std::vector<LogFile> &log_files,
+                     const Configuration *replay_configuration)
+    : LogReader(ToLogReaderVector(log_files), replay_configuration) {}
+
 LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                      const Configuration *replay_configuration)
     : filenames_(filenames),
@@ -721,7 +893,8 @@
   return configuration::GetNodes(remapped_configuration_);
 }
 
-monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
+monotonic_clock::time_point LogReader::monotonic_start_time(
+    const Node *node) const {
   State *state =
       states_[configuration::GetNodeIndex(configuration(), node)].get();
   CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
@@ -729,7 +902,8 @@
   return state->monotonic_start_time();
 }
 
-realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
+realtime_clock::time_point LogReader::realtime_start_time(
+    const Node *node) const {
   State *state =
       states_[configuration::GetNodeIndex(configuration(), node)].get();
   CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
@@ -745,6 +919,7 @@
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
+  remapped_configuration_ = event_loop_factory_->configuration();
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
@@ -1094,7 +1269,6 @@
   }
 }
 
-
 void LogReader::Register(EventLoop *event_loop) {
   State *state =
       states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
@@ -1194,8 +1368,7 @@
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
                "not matched?  Use --skip_missing_forwarding_entries to "
-               "ignore "
-               "this.";
+               "ignore this.";
 
         if (update_time) {
           // Confirm that the message was sent on the sending node before the
@@ -1319,9 +1492,8 @@
                      });
 
       for (size_t i = 0; i < states_.size(); ++i) {
-        VLOG(1) << MaybeNodeName(
-                       states_[i]->event_loop()->node())
-                << "before " << states_[i]->monotonic_now();
+        VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "before "
+                << states_[i]->monotonic_now();
       }
 
       UpdateOffsets();
@@ -1329,9 +1501,8 @@
               << state->monotonic_now();
 
       for (size_t i = 0; i < states_.size(); ++i) {
-        VLOG(1) << MaybeNodeName(
-                       states_[i]->event_loop()->node())
-                << "after " << states_[i]->monotonic_now();
+        VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "after "
+                << states_[i]->monotonic_now();
       }
 
       // TODO(austin): We should be perfect.
@@ -1422,6 +1593,50 @@
              << type;
 }
 
+void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
+                                   const Node *node,
+                                   std::string_view add_prefix) {
+  VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
+  const Channel *remapped_channel =
+      configuration::GetChannel(logged_configuration(), name, type, "", node);
+  CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
+                                     << "\", \"type\": \"" << type << "\"}";
+  VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
+          << "\"}";
+  VLOG(1) << "Remapped "
+          << aos::configuration::StrippedChannelToString(remapped_channel);
+
+  // We want to make /spray on node 0 go to /0/spray by snooping the maps.  And
+  // we want it to degrade if the heuristics fail to just work.
+  //
+  // The easiest way to do this is going to be incredibly specific and verbose.
+  // Look up /spray, to /0/spray.  Then, prefix the result with /original to get
+  // /original/0/spray.  Then, create a map from /original/spray to
+  // /original/0/spray for just the type we were asked for.
+  if (name != remapped_channel->name()->string_view()) {
+    MapT new_map;
+    new_map.match = std::make_unique<ChannelT>();
+    new_map.match->name = absl::StrCat(add_prefix, name);
+    new_map.match->type = type;
+    if (node != nullptr) {
+      new_map.match->source_node = node->name()->str();
+    }
+    new_map.rename = std::make_unique<ChannelT>();
+    new_map.rename->name =
+        absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+    maps_.emplace_back(std::move(new_map));
+  }
+
+  const size_t channel_index =
+      configuration::ChannelIndex(logged_configuration(), remapped_channel);
+  CHECK_EQ(0u, remapped_channels_.count(channel_index))
+      << "Already remapped channel "
+      << configuration::CleanedChannelToString(remapped_channel);
+  remapped_channels_[channel_index] =
+      absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+  MakeRemappedConfig();
+}
+
 void LogReader::MakeRemappedConfig() {
   for (std::unique_ptr<State> &state : states_) {
     if (state) {
@@ -1490,10 +1705,46 @@
         &new_config_fbb));
   }
   // Create the Configuration containing the new channels that we want to add.
-  const auto new_name_vector_offsets =
+  const auto new_channel_vector_offsets =
       new_config_fbb.CreateVector(channel_offsets);
+
+  // Now create the new maps.
+  std::vector<flatbuffers::Offset<Map>> map_offsets;
+  for (const MapT &map : maps_) {
+    const flatbuffers::Offset<flatbuffers::String> match_name_offset =
+        new_config_fbb.CreateString(map.match->name);
+    const flatbuffers::Offset<flatbuffers::String> match_type_offset =
+        new_config_fbb.CreateString(map.match->type);
+    const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
+        new_config_fbb.CreateString(map.rename->name);
+    flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
+    if (!map.match->source_node.empty()) {
+      match_source_node_offset =
+          new_config_fbb.CreateString(map.match->source_node);
+    }
+    Channel::Builder match_builder(new_config_fbb);
+    match_builder.add_name(match_name_offset);
+    match_builder.add_type(match_type_offset);
+    if (!map.match->source_node.empty()) {
+      match_builder.add_source_node(match_source_node_offset);
+    }
+    const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
+
+    Channel::Builder rename_builder(new_config_fbb);
+    rename_builder.add_name(rename_name_offset);
+    const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
+
+    Map::Builder map_builder(new_config_fbb);
+    map_builder.add_match(match_offset);
+    map_builder.add_rename(rename_offset);
+    map_offsets.emplace_back(map_builder.Finish());
+  }
+
+  const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+
   ConfigurationBuilder new_config_builder(new_config_fbb);
-  new_config_builder.add_channels(new_name_vector_offsets);
+  new_config_builder.add_channels(new_channel_vector_offsets);
+  new_config_builder.add_maps(new_maps_offsets);
   new_config_fbb.Finish(new_config_builder.Finish());
   const FlatbufferDetachedBuffer<Configuration> new_name_config =
       new_config_fbb.Release();
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 2905dfa..c651c35 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -17,24 +17,24 @@
   // Time this log file started on the monotonic clock in nanoseconds.
   // If this isn't known (the log file is being recorded from another node
   // where we don't know the time offset), both timestamps will be min_time.
-  monotonic_start_time:int64 = -9223372036854775808;
+  monotonic_start_time:int64 = -9223372036854775808 (id: 0);
   // Time this log file started on the realtime clock in nanoseconds.
-  realtime_start_time:int64 = -9223372036854775808;
+  realtime_start_time:int64 = -9223372036854775808 (id: 1);
 
   // Messages are not written in order to disk.  They will be out of order by
   // at most this duration (in nanoseconds).  If the log reader buffers until
   // it finds messages this much newer than it's simulation time, it will never
   // find a message out of order.
-  max_out_of_order_duration:long;
+  max_out_of_order_duration:long (id: 2);
 
   // The configuration of the channels.
-  configuration:aos.Configuration;
+  configuration:aos.Configuration (id: 3);
 
   // Name of the device which this log file is for.
-  name:string;
+  name:string (id: 4);
 
   // The current node, if known and running in a multi-node configuration.
-  node:Node;
+  node:Node (id: 5);
 
   // All UUIDs are uuid4.
 
@@ -45,10 +45,10 @@
   // ---- basename_timestamps/pi1/aos/remote_timestamps/pi2/aos.logger.MessageHeader.part0.bfbs, etc.
   //  \-- basename_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs, etc.
 
-  // All log files and parts from a single run of a logger executable will have
+  // All log files and parts from a single logging event will have
   // the same uuid.  This should be all the files generated on a single node.
   // Used to correlate files recorded together.
-  logger_uuid:string;
+  logger_uuid:string (id: 6);
 
   // Part files which go together all have headers.  When creating a log file
   // with multiple parts, the logger should stop writing to part n-1 as soon
@@ -61,41 +61,41 @@
   // will be created when the start time is known.
 
   // All the parts which go together have the same uuid.
-  parts_uuid:string;
+  parts_uuid:string (id: 7);
   // And the parts_index corresponds to which part this is in the sequence.  The
   // index should start at 0.
-  parts_index:int32;
+  parts_index:int32 (id: 8);
 }
 
 // Table holding a message.
 table MessageHeader {
   // Index into the channel datastructure in the log file header.  This
   // provides the data type.
-  channel_index:uint;
+  channel_index:uint (id: 0);
   // Time this message was sent on the monotonic clock in nanoseconds on this
   // node.
-  monotonic_sent_time:long;
+  monotonic_sent_time:long (id: 1);
   // Time this message was sent on the realtime clock in nanoseconds on this
   // node.
-  realtime_sent_time:long;
+  realtime_sent_time:long (id: 2);
   // Index into the ipc queue of this message.  This should start with 0 and
   // always monotonically increment if no messages were ever lost.  It will
   // wrap at a multiple of the queue size.
-  queue_index:uint;
+  queue_index:uint (id: 3);
 
   // TODO(austin): Format?  Compressed?
 
   // The nested flatbuffer.
-  data:[ubyte];
+  data:[ubyte] (id: 4);
 
   // Time this message was sent on the monotonic clock of the remote node in
   // nanoseconds.
-  monotonic_remote_time:int64 = -9223372036854775808;
+  monotonic_remote_time:int64 = -9223372036854775808 (id: 5);
   // Time this message was sent on the realtime clock of the remote node in
   // nanoseconds.
-  realtime_remote_time:int64 = -9223372036854775808;
+  realtime_remote_time:int64 = -9223372036854775808 (id: 6);
   // Queue index of this message on the remote node.
-  remote_queue_index:uint32 = 4294967295;
+  remote_queue_index:uint32 = 4294967295 (id: 7);
 }
 
 root_type MessageHeader;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 32c2022..e8856f3 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -32,66 +32,69 @@
 class Logger {
  public:
   // Constructs a logger.
-  //   base_name/log_namer: Object used to write data to disk in one or more log
-  //     files.  If a base_name is passed in, a LocalLogNamer is wrapped
-  //     around it.
   //   event_loop: The event loop used to read the messages.
-  //   polling_period: The period used to poll the data.
   //   configuration: When provided, this is the configuration to log, and the
   //     configuration to use for the channel list to log.  If not provided,
   //     this becomes the configuration from the event loop.
-  Logger(std::string_view base_name, EventLoop *event_loop,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
-  Logger(std::string_view base_name, EventLoop *event_loop,
-         const Configuration *configuration,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
-  Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
-  Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-         const Configuration *configuration,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
+  //   should_log: When provided, a filter for channels to log. If not provided,
+  //     all available channels are logged.
+  Logger(EventLoop *event_loop)
+      : Logger(event_loop, event_loop->configuration()) {}
+  Logger(EventLoop *event_loop, const Configuration *configuration)
+      : Logger(event_loop, configuration,
+               [](const Channel *) { return true; }) {}
+  Logger(EventLoop *event_loop, const Configuration *configuration,
+         std::function<bool(const Channel *)> should_log);
   ~Logger();
 
   // Overrides the name in the log file header.
   void set_name(std::string_view name) { name_ = name; }
 
+  // Sets the callback to run after each period of data is logged. Defaults to
+  // doing nothing.
+  //
+  // This callback may safely do things like call Rotate().
+  void set_on_logged_period(std::function<void()> on_logged_period) {
+    on_logged_period_ = std::move(on_logged_period);
+  }
+
+  // Sets the period between polling the data. Defaults to 100ms.
+  //
+  // Changing this while a set of files is being written may result in
+  // unreadable files.
+  void set_polling_period(std::chrono::nanoseconds polling_period) {
+    polling_period_ = polling_period;
+  }
+
   // Rotates the log file(s), triggering new part files to be written for each
   // log file.
   void Rotate();
 
+  // Starts logging to files with the given naming scheme.
+  void StartLogging(std::unique_ptr<LogNamer> log_namer);
+
+  // Stops logging. Ensures any messages through end_time make it into the log.
+  //
+  // If you want to stop ASAP, pass min_time to avoid reading any more messages.
+  //
+  // Returns the LogNamer in case the caller wants to do anything else with it
+  // before destroying it.
+  std::unique_ptr<LogNamer> StopLogging(
+      aos::monotonic_clock::time_point end_time);
+
+  // Returns whether a log is currently being written.
+  bool is_started() const { return static_cast<bool>(log_namer_); }
+
+  // Shortcut to call StartLogging with a LocalLogNamer when event processing
+  // starts.
+  void StartLoggingLocalNamerOnRun(std::string base_name) {
+    event_loop_->OnRun([this, base_name]() {
+      StartLogging(
+          std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
+    });
+  }
+
  private:
-  void WriteHeader();
-  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
-      const Node *node);
-
-  bool MaybeUpdateTimestamp(
-      const Node *node, int node_index,
-      aos::monotonic_clock::time_point monotonic_start_time,
-      aos::realtime_clock::time_point realtime_start_time);
-
-  void DoLogData();
-
-  void WriteMissingTimestamps();
-
-  void StartLogging();
-
-  // Fetches from each channel until all the data is logged.
-  void LogUntil(monotonic_clock::time_point t);
-
-  EventLoop *event_loop_;
-  const UUID uuid_;
-  std::unique_ptr<LogNamer> log_namer_;
-
-  // The configuration to place at the top of the log file.
-  const Configuration *configuration_;
-
-  // Name to save in the log file.  Defaults to hostname.
-  std::string name_;
-
   // Structure to track both a fetcher, and if the data fetched has been
   // written.  We may want to delay writing data to disk so that we don't let
   // data get too far out of order when written to disk so we can avoid making
@@ -101,41 +104,25 @@
     bool written = false;
 
     int channel_index = -1;
+    const Channel *channel = nullptr;
+    const Node *timestamp_node = nullptr;
 
     LogType log_type = LogType::kLogMessage;
 
+    // We fill out the metadata at construction, but the actual writers have to
+    // be updated each time we start logging. To avoid duplicating the complex
+    // logic determining whether each writer should be initialized, we just
+    // stash the answer in separate member variables.
+    bool wants_writer = false;
     DetachedBufferWriter *writer = nullptr;
+    bool wants_timestamp_writer = false;
     DetachedBufferWriter *timestamp_writer = nullptr;
+    bool wants_contents_writer = false;
     DetachedBufferWriter *contents_writer = nullptr;
-    const Node *writer_node = nullptr;
-    const Node *timestamp_node = nullptr;
+
     int node_index = 0;
   };
 
-  std::vector<FetcherStruct> fetchers_;
-  TimerHandler *timer_handler_;
-
-  // Period to poll the channels.
-  const std::chrono::milliseconds polling_period_;
-
-  // Last time that data was written for all channels to disk.
-  monotonic_clock::time_point last_synchronized_time_;
-
-  monotonic_clock::time_point monotonic_start_time_;
-  realtime_clock::time_point realtime_start_time_;
-
-  // Max size that the header has consumed.  This much extra data will be
-  // reserved in the builder to avoid reallocating.
-  size_t max_header_size_ = 0;
-
-  // Fetcher for all the statistics from all the nodes.
-  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
-
-  // Sets the start time for a specific node.
-  void SetStartTime(size_t node_index,
-                    aos::monotonic_clock::time_point monotonic_start_time,
-                    aos::realtime_clock::time_point realtime_start_time);
-
   struct NodeState {
     aos::monotonic_clock::time_point monotonic_start_time =
         aos::monotonic_clock::min_time;
@@ -145,12 +132,96 @@
     aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
         aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
   };
+
+  void WriteHeader();
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+      const Node *node);
+
+  bool MaybeUpdateTimestamp(
+      const Node *node, int node_index,
+      aos::monotonic_clock::time_point monotonic_start_time,
+      aos::realtime_clock::time_point realtime_start_time);
+
+  void DoLogData(const monotonic_clock::time_point end_time);
+
+  void WriteMissingTimestamps();
+
+  // Fetches from each channel until all the data is logged.
+  void LogUntil(monotonic_clock::time_point t);
+
+  // Sets the start time for a specific node.
+  void SetStartTime(size_t node_index,
+                    aos::monotonic_clock::time_point monotonic_start_time,
+                    aos::realtime_clock::time_point realtime_start_time);
+
+  EventLoop *event_loop_;
+  UUID uuid_ = UUID::Zero();
+  std::unique_ptr<LogNamer> log_namer_;
+
+  // The configuration to place at the top of the log file.
+  const Configuration *const configuration_;
+
+  // Name to save in the log file.  Defaults to hostname.
+  std::string name_;
+
+  std::function<void()> on_logged_period_ = []() {};
+
+  std::vector<FetcherStruct> fetchers_;
+  TimerHandler *timer_handler_;
+
+  // Period to poll the channels.
+  std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
+
+  // Last time that data was written for all channels to disk.
+  monotonic_clock::time_point last_synchronized_time_;
+
+  // Max size that the header has consumed.  This much extra data will be
+  // reserved in the builder to avoid reallocating.
+  size_t max_header_size_ = 0;
+
+  // Fetcher for all the statistics from all the nodes.
+  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
   std::vector<NodeState> node_state_;
 };
 
+// Datastructure to hold ordered parts.
+struct LogParts {
+  // Monotonic and realtime start times for this set of log files.  For log
+  // files which started out unknown and then became known, this is the known
+  // start time.
+  aos::monotonic_clock::time_point monotonic_start_time;
+  aos::realtime_clock::time_point realtime_start_time;
+
+  // UUIDs if available.
+  std::string logger_uuid;
+  std::string parts_uuid;
+
+  // The node this represents, or empty if we are in a single node world.
+  std::string node;
+
+  // Pre-sorted list of parts.
+  std::vector<std::string> parts;
+};
+
+// Datastructure to hold parts from the same run of the logger which have no
+// ordering constraints relative to each other.
+struct LogFile {
+  // The UUID tying them all together (if available)
+  std::string logger_uuid;
+
+  // All the parts, unsorted.
+  std::vector<LogParts> parts;
+};
+
+std::ostream &operator<<(std::ostream &stream, const LogFile &file);
+std::ostream &operator<<(std::ostream &stream, const LogParts &parts);
+
 // Takes a bunch of parts and sorts them based on part_uuid and part_index.
-std::vector<std::vector<std::string>> SortParts(
-    const std::vector<std::string> &parts);
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
+
+std::vector<std::vector<std::string>> ToLogReaderVector(
+    const std::vector<LogFile> &log_files);
 
 // We end up with one of the following 3 log file types.
 //
@@ -205,6 +276,8 @@
             const Configuration *replay_configuration = nullptr);
   LogReader(const std::vector<std::vector<std::string>> &filenames,
             const Configuration *replay_configuration = nullptr);
+  LogReader(const std::vector<LogFile> &log_files,
+            const Configuration *replay_configuration = nullptr);
   ~LogReader();
 
   // Registers all the callbacks to send the log file data out on an event loop
@@ -233,7 +306,11 @@
   // SimulatedEventLoopFactory, and then get the configuration from there for
   // everything else.
   const Configuration *logged_configuration() const;
-  // Returns the configuration being used for replay.
+  // Returns the configuration being used for replay from the log file.
+  // Note that this may be different from the configuration actually used for
+  // handling events. You should generally only use this to create a
+  // SimulatedEventLoopFactory, and then get the configuration from there for
+  // everything else.
   // The pointer is invalidated whenever RemapLoggedChannel is called.
   const Configuration *configuration() const;
 
@@ -243,8 +320,10 @@
   std::vector<const Node *> Nodes() const;
 
   // Returns the starting timestamp for the log file.
-  monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
-  realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
+  monotonic_clock::time_point monotonic_start_time(
+      const Node *node = nullptr) const;
+  realtime_clock::time_point realtime_start_time(
+      const Node *node = nullptr) const;
 
   // Causes the logger to publish the provided channel on a different name so
   // that replayed applications can publish on the proper channel name without
@@ -258,11 +337,28 @@
     RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
   }
 
+  // Remaps the provided channel, though this respects node mappings, and
+  // preserves them too.  This makes it so if /aos -> /pi1/aos on one node,
+  // /original/aos -> /original/pi1/aos on the same node after renaming, just
+  // like you would hope.
+  //
+  // TODO(austin): If you have 2 nodes remapping something to the same channel,
+  // this doesn't handle that.  No use cases exist yet for that, so it isn't
+  // being done yet.
+  void RemapLoggedChannel(std::string_view name, std::string_view type,
+                          const Node *node,
+                          std::string_view add_prefix = "/original");
   template <typename T>
-  bool HasChannel(std::string_view name) {
+  void RemapLoggedChannel(std::string_view name, const Node *node,
+                          std::string_view add_prefix = "/original") {
+    RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
+  }
+
+  template <typename T>
+  bool HasChannel(std::string_view name, const Node *node = nullptr) {
     return configuration::GetChannel(log_file_header()->configuration(), name,
                                      T::GetFullyQualifiedName(), "",
-                                     nullptr) != nullptr;
+                                     node) != nullptr;
   }
 
   SimulatedEventLoopFactory *event_loop_factory() {
@@ -524,7 +620,7 @@
   // constrain our time function.
   //
   // One is simple.  The distributed clock is the average of all the clocks.
-  //   (ta + tb + tc + td) / num_nodex = t_distributed
+  //   (ta + tb + tc + td) / num_nodes = t_distributed
   //
   // The second is a bit more complicated.  Our basic time conversion function
   // is:
@@ -540,7 +636,7 @@
   // per-node times at two set distributed clock times, we will be able to
   // recreate the linear function (we know it is linear).  We can do a similar
   // thing by breaking our equation up into:
-  // 
+  //
   // [1/3  1/3  1/3  ] [ta]   [t_distributed]
   // [ 1  -1-m1  0   ] [tb] = [oab]
   // [ 1    0  -1-m2 ] [tc]   [oac]
@@ -583,6 +679,7 @@
   // logged_configuration(), and the string key will be the name of the channel
   // to send on instead of the logged channel name.
   std::map<size_t, std::string> remapped_channels_;
+  std::vector<MapT> maps_;
 
   // Number of nodes which still have data to send.  This is used to figure out
   // when to exit.
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index f95fa17..b1dce96 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -1,8 +1,8 @@
 #include "aos/configuration.h"
 #include "aos/events/logging/logger.h"
 #include "aos/events/shm_event_loop.h"
-#include "aos/logging/log_namer.h"
 #include "aos/init.h"
+#include "aos/logging/log_namer.h"
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
@@ -33,8 +33,9 @@
         event_loop.node());
   }
 
-  aos::logger::Logger logger(std::move(log_namer), &event_loop,
-                             std::chrono::milliseconds(100));
+  aos::logger::Logger logger(&event_loop);
+  event_loop.OnRun(
+      [&log_namer, &logger]() { logger.StartLogging(std::move(log_namer)); });
 
   event_loop.Run();
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 47fe9e2..2095529 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,6 +1,7 @@
 #include "aos/events/logging/logger.h"
 
 #include "aos/events/event_loop.h"
+#include "aos/events/message_counter.h"
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
@@ -14,6 +15,7 @@
 namespace testing {
 
 namespace chrono = std::chrono;
+using aos::testing::MessageCounter;
 
 class LoggerTest : public ::testing::Test {
  public:
@@ -39,6 +41,8 @@
   Pong pong_;
 };
 
+using LoggerDeathTest = LoggerTest;
+
 // Tests that we can startup at all.  This confirms that the channels are all in
 // the config.
 TEST_F(LoggerTest, Starts) {
@@ -56,8 +60,9 @@
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-    Logger logger(base_name, logger_event_loop.get(),
-                  std::chrono::milliseconds(100));
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
@@ -99,6 +104,130 @@
   EXPECT_EQ(ping_count, 2010);
 }
 
+// Tests calling StartLogging twice.
+TEST_F(LoggerDeathTest, ExtraStart) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string base_name1 = tmpdir + "/logfile1";
+  const ::std::string logfile1 = base_name1 + ".part0.bfbs";
+  const ::std::string base_name2 = tmpdir + "/logfile2";
+  const ::std::string logfile2 = base_name2 + ".part0.bfbs";
+  unlink(logfile1.c_str());
+  unlink(logfile2.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger_event_loop->OnRun(
+        [base_name1, base_name2, &logger_event_loop, &logger]() {
+          logger.StartLogging(std::make_unique<LocalLogNamer>(
+              base_name1, logger_event_loop->node()));
+          EXPECT_DEATH(logger.StartLogging(std::make_unique<LocalLogNamer>(
+                           base_name2, logger_event_loop->node())),
+                       "Already logging");
+        });
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+}
+
+// Tests calling StopLogging twice.
+TEST_F(LoggerDeathTest, ExtraStop) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string logfile = base_name + ".part0.bfbs";
+  // Remove it.
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger_event_loop->OnRun([base_name, &logger_event_loop, &logger]() {
+      logger.StartLogging(std::make_unique<LocalLogNamer>(
+          base_name, logger_event_loop->node()));
+      logger.StopLogging(aos::monotonic_clock::min_time);
+      EXPECT_DEATH(logger.StopLogging(aos::monotonic_clock::min_time),
+                   "Not logging right now");
+    });
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+}
+
+// Tests that we can startup twice.
+TEST_F(LoggerTest, StartsTwice) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string base_name1 = tmpdir + "/logfile1";
+  const ::std::string logfile1 = base_name1 + ".part0.bfbs";
+  const ::std::string base_name2 = tmpdir + "/logfile2";
+  const ::std::string logfile2 = base_name2 + ".part0.bfbs";
+  unlink(logfile1.c_str());
+  unlink(logfile2.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLogging(
+        std::make_unique<LocalLogNamer>(base_name1, logger_event_loop->node()));
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+    logger.StopLogging(logger_event_loop->monotonic_now());
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+    logger.StartLogging(
+        std::make_unique<LocalLogNamer>(base_name2, logger_event_loop->node()));
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+  }
+
+  for (const auto &logfile :
+       {std::make_tuple(logfile1, 10), std::make_tuple(logfile2, 2010)}) {
+    SCOPED_TRACE(std::get<0>(logfile));
+    LogReader reader(std::get<0>(logfile));
+    reader.Register();
+
+    EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(nullptr));
+
+    std::unique_ptr<EventLoop> test_event_loop =
+        reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+    int ping_count = std::get<1>(logfile);
+    int pong_count = std::get<1>(logfile);
+
+    // Confirm that the ping and pong counts both match, and the value also
+    // matches.
+    test_event_loop->MakeWatcher("/test",
+                                 [&ping_count](const examples::Ping &ping) {
+                                   EXPECT_EQ(ping.value(), ping_count + 1);
+                                   ++ping_count;
+                                 });
+    test_event_loop->MakeWatcher(
+        "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+          EXPECT_EQ(pong.value(), pong_count + 1);
+          ++pong_count;
+          EXPECT_EQ(ping_count, pong_count);
+        });
+
+    reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+    EXPECT_EQ(ping_count, std::get<1>(logfile) + 1000);
+  }
+}
+
 // Tests that we can read and write rotated log files.
 TEST_F(LoggerTest, RotatedLogFile) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
@@ -117,9 +246,9 @@
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-    Logger logger(
-        std::make_unique<LocalLogNamer>(base_name, logger_event_loop->node()),
-        logger_event_loop.get(), std::chrono::milliseconds(100));
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
     logger.Rotate();
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
@@ -216,8 +345,9 @@
                            chrono::microseconds(50));
     });
 
-    Logger logger(base_name, logger_event_loop.get(),
-                  std::chrono::milliseconds(100));
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
 
     event_loop_factory_.RunFor(chrono::milliseconds(1000));
   }
@@ -287,11 +417,13 @@
   }
 
   void StartLogger(LoggerState *logger) {
-    logger->logger = std::make_unique<Logger>(
-        std::make_unique<MultiNodeLogNamer>(logfile_base_,
-                                            logger->event_loop->configuration(),
-                                            logger->event_loop->node()),
-        logger->event_loop.get(), chrono::milliseconds(100));
+    logger->logger = std::make_unique<Logger>(logger->event_loop.get());
+    logger->logger->set_polling_period(std::chrono::milliseconds(100));
+    logger->event_loop->OnRun([this, logger]() {
+      logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
+          logfile_base_, logger->event_loop->configuration(),
+          logger->event_loop->node()));
+    });
   }
 
   // Config and factory.
@@ -973,16 +1105,116 @@
     event_loop_factory_.RunFor(chrono::milliseconds(2000));
   }
 
-  const std::vector<std::vector<std::string>> sorted_parts =
-      SortParts(logfiles_);
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+
+  EXPECT_EQ(sorted_parts.size(), 2u);
+
+  // Count up the number of UUIDs and make sure they are what we expect as a
+  // sanity check.
+  std::set<std::string> logger_uuids;
+  std::set<std::string> parts_uuids;
+  std::set<std::string> both_uuids;
+
+  size_t missing_rt_count = 0;
+
+  for (const LogFile &log_file : sorted_parts) {
+    EXPECT_FALSE(log_file.logger_uuid.empty());
+    logger_uuids.insert(log_file.logger_uuid);
+    both_uuids.insert(log_file.logger_uuid);
+
+    for (const LogParts &part : log_file.parts) {
+      EXPECT_NE(part.monotonic_start_time, aos::monotonic_clock::min_time)
+          << ": " << part;
+      missing_rt_count +=
+          part.realtime_start_time == aos::realtime_clock::min_time;
+
+      EXPECT_TRUE(logger_uuids.find(part.logger_uuid) != logger_uuids.end());
+      EXPECT_NE(part.node, "");
+      parts_uuids.insert(part.parts_uuid);
+      both_uuids.insert(part.parts_uuid);
+    }
+  }
+
+  // We won't have RT timestamps for 5 log files.  We don't log the RT start
+  // time on remote nodes because we don't know it and would be guessing.  And
+  // the log reader can actually do a better job.
+  EXPECT_EQ(missing_rt_count, 5u);
+
+  EXPECT_EQ(logger_uuids.size(), 2u);
+  EXPECT_EQ(parts_uuids.size(), ToLogReaderVector(sorted_parts).size());
+  EXPECT_EQ(logger_uuids.size() + parts_uuids.size(), both_uuids.size());
 
   // Test that each list of parts is in order.  Don't worry about the ordering
   // between part file lists though.
   // (inner vectors all need to be in order, but outer one doesn't matter).
-  EXPECT_THAT(sorted_parts,
+  EXPECT_THAT(ToLogReaderVector(sorted_parts),
               ::testing::UnorderedElementsAreArray(structured_logfiles_));
 }
 
+// Tests that if we remap a remapped channel, it shows up correctly.
+TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(structured_logfiles_);
+
+  // Remap just on pi1.
+  reader.RemapLoggedChannel<aos::timing::Report>(
+      "/aos", configuration::GetNode(reader.configuration(), "pi1"));
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+  // else should have moved.
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  pi1_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> full_pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  full_pi1_event_loop->SkipTimingReport();
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+  pi2_event_loop->SkipTimingReport();
+
+  MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
+                                                        "/aos");
+  MessageCounter<aos::timing::Report> full_pi1_timing_report(
+      full_pi1_event_loop.get(), "/pi1/aos");
+  MessageCounter<aos::timing::Report> pi1_original_timing_report(
+      pi1_event_loop.get(), "/original/aos");
+  MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
+      full_pi1_event_loop.get(), "/original/pi1/aos");
+  MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
+                                                        "/aos");
+
+  log_reader_factory.Run();
+
+  EXPECT_EQ(pi1_timing_report.count(), 0u);
+  EXPECT_EQ(full_pi1_timing_report.count(), 0u);
+  EXPECT_NE(pi1_original_timing_report.count(), 0u);
+  EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
+  EXPECT_NE(pi2_timing_report.count(), 0u);
+
+  reader.Deregister();
+}
+
 // TODO(austin): We can write a test which recreates a logfile and confirms that
 // we get it back.  That is the ultimate test.
 
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
new file mode 100644
index 0000000..a8cafda
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.cc
@@ -0,0 +1,194 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+namespace {
+
+// Returns if `status` is not an error code, otherwise logs the appropriate
+// error message and crashes.
+void CheckLzmaCodeIsOk(lzma_ret status) {
+  switch (status) {
+    case LZMA_OK:
+    case LZMA_STREAM_END:
+      return;
+    case LZMA_MEM_ERROR:
+      LOG(FATAL) << "Memory allocation failed:" << status;
+    case LZMA_OPTIONS_ERROR:
+      LOG(FATAL) << "The given compression preset or decompression options are "
+                    "not supported: "
+                 << status;
+    case LZMA_UNSUPPORTED_CHECK:
+      LOG(FATAL) << "The given check type is not supported: " << status;
+    case LZMA_PROG_ERROR:
+      LOG(FATAL) << "One or more of the parameters have values that will never "
+                    "be valid: "
+                 << status;
+    case LZMA_MEMLIMIT_ERROR:
+      LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
+                    "memory usage limit: "
+                 << status;
+    case LZMA_FORMAT_ERROR:
+      LOG(FATAL) << "File format not recognized: " << status;
+    case LZMA_DATA_ERROR:
+      LOG(FATAL) << "Compressed file is corrupt: " << status;
+    case LZMA_BUF_ERROR:
+      LOG(FATAL) << "Compressed file is truncated or corrupt: " << status;
+    default:
+      LOG(FATAL) << "Unexpected return value: " << status;
+  }
+}
+
+}  // namespace
+
+LzmaEncoder::LzmaEncoder(const uint32_t compression_preset)
+    : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
+  CHECK_GE(compression_preset_, 0u)
+      << ": Compression preset must be in the range [0, 9].";
+  CHECK_LE(compression_preset_, 9u)
+      << ": Compression preset must be in the range [0, 9].";
+
+  lzma_ret status =
+      lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
+  CheckLzmaCodeIsOk(status);
+  stream_.avail_out = 0;
+  VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+}
+
+LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
+
+void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+  CHECK(in.data()) << ": Encode called with nullptr.";
+
+  stream_.next_in = in.data();
+  stream_.avail_in = in.size();
+
+  RunLzmaCode(LZMA_RUN);
+}
+
+void LzmaEncoder::Finish() { RunLzmaCode(LZMA_FINISH); }
+
+void LzmaEncoder::Clear(const int n) {
+  CHECK_GE(n, 0);
+  CHECK_LE(static_cast<size_t>(n), queue_size());
+  queue_.erase(queue_.begin(), queue_.begin() + n);
+  if (queue_.empty()) {
+    stream_.next_out = nullptr;
+    stream_.avail_out = 0;
+  }
+}
+
+std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
+  std::vector<absl::Span<const uint8_t>> queue;
+  if (queue_.empty()) {
+    return queue;
+  }
+  for (size_t i = 0; i < queue_.size() - 1; ++i) {
+    queue.emplace_back(
+        absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
+  }
+  // For the last buffer in the queue, we must account for the possibility that
+  // the buffer isn't full yet.
+  queue.emplace_back(absl::MakeConstSpan(
+      queue_.back().data(), queue_.back().size() - stream_.avail_out));
+  return queue;
+}
+
+size_t LzmaEncoder::queued_bytes() const {
+  size_t bytes = queue_size() * kEncodedBufferSizeBytes;
+  // Subtract the bytes that the encoder hasn't filled yet.
+  bytes -= stream_.avail_out;
+  return bytes;
+}
+
+void LzmaEncoder::RunLzmaCode(lzma_action action) {
+  CHECK(!finished_);
+
+  // This is to keep track of how many bytes resulted from encoding this input
+  // buffer.
+  size_t last_avail_out = stream_.avail_out;
+
+  while (stream_.avail_in > 0 || action == LZMA_FINISH) {
+    // If output buffer is full, create a new one, queue it up, and resume
+    // encoding. This could happen in the first call to Encode after
+    // construction or a Reset, or when an input buffer is large enough to fill
+    // more than one output buffer.
+    if (stream_.avail_out == 0) {
+      queue_.emplace_back();
+      queue_.back().resize(kEncodedBufferSizeBytes);
+      stream_.next_out = queue_.back().data();
+      stream_.avail_out = kEncodedBufferSizeBytes;
+      // Update the byte count.
+      total_bytes_ += last_avail_out;
+      last_avail_out = stream_.avail_out;
+    }
+
+    // Encode the data.
+    lzma_ret status = lzma_code(&stream_, action);
+    CheckLzmaCodeIsOk(status);
+    if (action == LZMA_FINISH) {
+      if (status == LZMA_STREAM_END) {
+        // This is returned when lzma_code is all done.
+        finished_ = true;
+        break;
+      }
+    } else {
+      CHECK(status != LZMA_STREAM_END);
+    }
+    VLOG(2) << "LzmaEncoder: Encoded chunk.";
+  }
+
+  // Update the number of resulting encoded bytes.
+  total_bytes_ += last_avail_out - stream_.avail_out;
+}
+
+LzmaDecoder::LzmaDecoder(std::string_view filename)
+    : dummy_decoder_(filename), stream_(LZMA_STREAM_INIT) {
+  compressed_data_.resize(kBufSize);
+
+  lzma_ret status =
+      lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
+  CheckLzmaCodeIsOk(status);
+  stream_.avail_out = 0;
+  VLOG(2) << "LzmaDecoder: Initialization succeeded.";
+}
+
+LzmaDecoder::~LzmaDecoder() { lzma_end(&stream_); }
+
+size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+  if (finished_) {
+    return 0;
+  }
+
+  // Write into the given range.
+  stream_.next_out = begin;
+  stream_.avail_out = end - begin;
+  // Keep decompressing until we run out of buffer space.
+  while (stream_.avail_out > 0) {
+    if (action_ == LZMA_RUN && stream_.avail_in == 0) {
+      // Read more bytes from the file if we're all out.
+      const size_t count =
+          dummy_decoder_.Read(compressed_data_.begin(), compressed_data_.end());
+      if (count == 0) {
+        // No more data to read in the file, begin the finishing operation.
+        action_ = LZMA_FINISH;
+      } else {
+        stream_.next_in = compressed_data_.data();
+        stream_.avail_in = count;
+      }
+    }
+    // Decompress the data.
+    const lzma_ret status = lzma_code(&stream_, action_);
+    // Return if we're done.
+    if (status == LZMA_STREAM_END) {
+      CHECK_EQ(action_, LZMA_FINISH)
+          << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
+      finished_ = true;
+      return (end - begin) - stream_.avail_out;
+    }
+    CheckLzmaCodeIsOk(status);
+  }
+  return end - begin;
+}
+
+}  // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
new file mode 100644
index 0000000..d7080a9
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.h
@@ -0,0 +1,72 @@
+#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/buffer_encoder.h"
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Encodes buffers using liblzma.
+class LzmaEncoder final : public DetachedBufferEncoder {
+ public:
+  // Initializes the LZMA stream and encoder.
+  explicit LzmaEncoder(uint32_t compression_preset);
+  // Gracefully shuts down the encoder.
+  ~LzmaEncoder() final;
+
+  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  void Finish() final;
+  void Clear(int n) final;
+  std::vector<absl::Span<const uint8_t>> queue() const final;
+  size_t queued_bytes() const final;
+  size_t total_bytes() const final { return total_bytes_; }
+  size_t queue_size() const final { return queue_.size(); }
+
+ private:
+  static constexpr size_t kEncodedBufferSizeBytes{1024};
+
+  void RunLzmaCode(lzma_action action);
+
+  lzma_stream stream_;
+  uint32_t compression_preset_;
+  std::vector<ResizeableBuffer> queue_;
+  bool finished_ = false;
+  // Total bytes that resulted from encoding raw data since the last call to
+  // Reset.
+  size_t total_bytes_ = 0;
+};
+
+// Decompresses data with liblzma.
+class LzmaDecoder final : public DataDecoder {
+ public:
+  explicit LzmaDecoder(std::string_view filename);
+  ~LzmaDecoder();
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  // Size of temporary buffer to use.
+  static constexpr size_t kBufSize{256 * 1024};
+
+  // Temporary buffer for storing compressed data.
+  ResizeableBuffer compressed_data_;
+  // Used for reading data from the file.
+  DummyDecoder dummy_decoder_;
+  // Stream for decompression.
+  lzma_stream stream_;
+  // The current action. This is LZMA_RUN until we've run out of data to read
+  // from the file.
+  lzma_action action_ = LZMA_RUN;
+  // Flag that represents whether or not all the data from the file has been
+  // successfully decoded.
+  bool finished_ = false;
+};
+
+}  // namespace aos::logger
+
+#endif  // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
new file mode 100644
index 0000000..2d619c4
--- /dev/null
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -0,0 +1,18 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+INSTANTIATE_TEST_CASE_P(
+    Lzma, BufferEncoderTest,
+    ::testing::Combine(::testing::Values([]() {
+                         return std::make_unique<LzmaEncoder>(2);
+                       }),
+                       ::testing::Values([](std::string_view filename) {
+                         return std::make_unique<LzmaDecoder>(filename);
+                       }),
+                       ::testing::Range(0, 100)));
+
+}  // namespace aos::logger::testing
diff --git a/aos/events/logging/uuid.cc b/aos/events/logging/uuid.cc
index 9298c8b..376fa95 100644
--- a/aos/events/logging/uuid.cc
+++ b/aos/events/logging/uuid.cc
@@ -57,4 +57,10 @@
   return result;
 }
 
+UUID UUID::Zero() {
+  UUID result;
+  result.data_.fill(0);
+  return result;
+}
+
 }  // namespace aos
diff --git a/aos/events/logging/uuid.h b/aos/events/logging/uuid.h
index b81b811..0387a4b 100644
--- a/aos/events/logging/uuid.h
+++ b/aos/events/logging/uuid.h
@@ -13,6 +13,8 @@
   // Returns a randomly generated UUID.  This is known as a UUID4.
   static UUID Random();
 
+  static UUID Zero();
+
   std::string_view string_view() const {
     return std::string_view(data_.data(), data_.size());
   }
diff --git a/aos/events/logging/uuid_test.cc b/aos/events/logging/uuid_test.cc
index d0320de..4ea351c 100644
--- a/aos/events/logging/uuid_test.cc
+++ b/aos/events/logging/uuid_test.cc
@@ -12,6 +12,8 @@
   LOG(INFO) << UUID::Random().string_view();
 
   EXPECT_NE(UUID::Random(), UUID::Random());
+  EXPECT_NE(UUID::Random(), UUID::Zero());
+  EXPECT_EQ(UUID::Zero(), UUID::Zero());
 }
 
 }  // namespace testing
diff --git a/aos/events/message_counter.h b/aos/events/message_counter.h
new file mode 100644
index 0000000..418c1eb
--- /dev/null
+++ b/aos/events/message_counter.h
@@ -0,0 +1,28 @@
+#ifndef AOS_EVENTS_MESSAGE_COUNTER_H_
+#define AOS_EVENTS_MESSAGE_COUNTER_H_
+
+#include "aos/events/event_loop.h"
+
+namespace aos {
+namespace testing {
+
+// Simple class to count messages on a channel easily.  This only counts
+// messages published while running.
+template <typename T>
+class MessageCounter {
+ public:
+  MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
+    event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
+  }
+
+  // Returns the number of messages seen so far.
+  size_t count() const { return count_; }
+
+ private:
+  size_t count_ = 0;
+};
+
+}  // namespace testing
+}  // namespace aos
+
+#endif  // AOS_EVENTS_MESSAGE_COUNTER_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 7de0540..81937ca 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -4,6 +4,7 @@
 
 #include "aos/events/event_loop_param_test.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/message_counter.h"
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/test_message_generated.h"
@@ -14,6 +15,11 @@
 
 namespace aos {
 namespace testing {
+namespace {
+
+std::string ConfigPrefix() { return "aos/"; }
+
+}  // namespace
 
 namespace chrono = ::std::chrono;
 
@@ -254,25 +260,12 @@
             0.0);
 }
 
-template <typename T>
-class MessageCounter {
- public:
-  MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
-    event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
-  }
-
-  size_t count() const { return count_; }
-
- private:
-  size_t count_ = 0;
-};
-
 // Tests that ping and pong work when on 2 different nodes, and the message
 // gateway messages are sent out as expected.
 TEST(SimulatedEventLoopTest, MultinodePingPong) {
   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
-      aos::configuration::ReadConfig(
-          "aos/events/multinode_pingpong_config.json");
+      aos::configuration::ReadConfig(ConfigPrefix() +
+                                     "events/multinode_pingpong_config.json");
   const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
   const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
   const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -573,8 +566,8 @@
 // ServerStatistics correctly.
 TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
-      aos::configuration::ReadConfig(
-          "aos/events/multinode_pingpong_config.json");
+      aos::configuration::ReadConfig(ConfigPrefix() +
+                                     "events/multinode_pingpong_config.json");
   const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
   const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
   const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -673,8 +666,8 @@
 // Test that disabling statistics actually disables them.
 TEST(SimulatedEventLoopTest, MultinodeWithoutStatistics) {
   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
-      aos::configuration::ReadConfig(
-          "aos/events/multinode_pingpong_config.json");
+      aos::configuration::ReadConfig(ConfigPrefix() +
+                                     "events/multinode_pingpong_config.json");
   const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
   const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
   const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -774,5 +767,104 @@
   EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
 }
 
+// Tests that the time offset having a slope doesn't break the world.
+// SimulatedMessageBridge has enough self consistency CHECK statements to
+// confirm, and we can can also check a message in each direction to make sure
+// it gets delivered as expected.
+TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ConfigPrefix() +
+                                     "events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+  NodeEventLoopFactory *pi2_factory =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
+
+  // Move the pi far into the future so the slope is significant.  And set it to
+  // something reasonable.
+  constexpr chrono::milliseconds kOffset{150100};
+  pi2_factory->SetDistributedOffset(kOffset, 1.0001);
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi1_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
+  std::unique_ptr<EventLoop> pi2_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
+
+  aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+      pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+      pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
+
+  aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
+      pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
+  aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
+      pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
+
+  // End after a pong message comes back.  This will leave the latest messages
+  // on all channels so we can look at timestamps easily and check they make
+  // sense.
+  std::unique_ptr<EventLoop> pi1_pong_ender =
+      simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
+  int count = 0;
+  pi1_pong_ender->MakeWatcher(
+      "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
+        if (++count == 100) {
+          simulated_event_loop_factory.Exit();
+        }
+      });
+
+  // Run enough that messages should be delivered.
+  simulated_event_loop_factory.Run();
+
+  // Grab the latest messages.
+  EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
+  EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
+  EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
+  EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
+
+  // Compute their time on the global distributed clock so we can compute
+  // distance betwen them.
+  const distributed_clock::time_point pi1_ping_time =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
+          ->ToDistributedClock(
+              ping_on_pi1_fetcher.context().monotonic_event_time);
+  const distributed_clock::time_point pi2_ping_time =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+          ->ToDistributedClock(
+              ping_on_pi2_fetcher.context().monotonic_event_time);
+  const distributed_clock::time_point pi1_pong_time =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
+          ->ToDistributedClock(
+              pong_on_pi1_fetcher.context().monotonic_event_time);
+  const distributed_clock::time_point pi2_pong_time =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+          ->ToDistributedClock(
+              pong_on_pi2_fetcher.context().monotonic_event_time);
+
+  // And confirm the delivery delay is just about exactly 150 uS for both
+  // directions like expected.  There will be a couple ns of rounding errors in
+  // the conversion functions that aren't worth accounting for right now.  This
+  // will either be really close, or really far.
+  EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
+                               pi1_ping_time);
+  EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
+                               pi1_ping_time);
+
+  EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
+                               pi2_pong_time);
+  EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
+                               pi2_pong_time);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 825c830..5660331 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -82,7 +82,11 @@
         DeliveredTime(fetcher_->context());
 
     CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
-        << ": Trying to deliver message in the past...";
+        << ": Trying to deliver message in the past on channel "
+        << configuration::StrippedChannelToString(fetcher_->channel())
+        << " to node " << send_event_loop_->node()->name()->string_view()
+        << " sent from " << fetcher_->channel()->source_node()->string_view()
+        << " at " << fetch_node_factory_->monotonic_now();
 
     server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
                                             1);
@@ -142,11 +146,9 @@
     const distributed_clock::time_point distributed_sent_time =
         fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
 
-    return aos::monotonic_clock::epoch() +
-           (distributed_sent_time - send_node_factory_->ToDistributedClock(
-                                        aos::monotonic_clock::epoch())) +
-           send_node_factory_->network_delay() +
-           send_node_factory_->send_delay();
+    return send_node_factory_->FromDistributedClock(
+        distributed_sent_time + send_node_factory_->network_delay() +
+        send_node_factory_->send_delay());
   }
 
   // Factories used for time conversion.
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index bab006a..5da7466 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -5,6 +5,7 @@
 #include <string_view>
 
 #include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
 #include "aos/macros.h"
 #include "flatbuffers/flatbuffers.h"
 #include "glog/logging.h"
@@ -164,15 +165,16 @@
 class FlatbufferVector : public Flatbuffer<T> {
  public:
   // Builds a Flatbuffer around a vector.
-  FlatbufferVector(std::vector<uint8_t> &&data) : data_(std::move(data)) {}
+  FlatbufferVector(ResizeableBuffer &&data) : data_(std::move(data)) {}
 
   // Builds a Flatbuffer by copying the data from the other flatbuffer.
-  FlatbufferVector(const Flatbuffer<T> &other)
-      : data_(other.data(), other.data() + other.size()) {}
+  FlatbufferVector(const Flatbuffer<T> &other) {
+    data_.resize(other.size());
+    memcpy(data_.data(), other.data(), data_.size());
+  }
 
   // Copy constructor.
-  FlatbufferVector(const FlatbufferVector<T> &other)
-      : data_(other.data(), other.data() + other.size()) {}
+  FlatbufferVector(const FlatbufferVector<T> &other) : data_(other.data_) {}
 
   // Move constructor.
   FlatbufferVector(FlatbufferVector<T> &&other)
@@ -180,7 +182,7 @@
 
   // Copies the data from the other flatbuffer.
   FlatbufferVector &operator=(const FlatbufferVector<T> &other) {
-    data_ = std::vector<uint8_t>(other.data(), other.data() + other.size());
+    data_ = other.data_;
     return *this;
   }
   FlatbufferVector &operator=(FlatbufferVector<T> &&other) {
@@ -190,7 +192,7 @@
 
   // Constructs an empty flatbuffer of type T.
   static FlatbufferVector<T> Empty() {
-    return FlatbufferVector<T>(std::vector<uint8_t>{});
+    return FlatbufferVector<T>(ResizeableBuffer());
   }
 
   virtual ~FlatbufferVector() override {}
@@ -200,7 +202,7 @@
   size_t size() const override { return data_.size(); }
 
  private:
-  std::vector<uint8_t> data_;
+  ResizeableBuffer data_;
 };
 
 // This object associates the message type with the memory storing the
@@ -359,6 +361,16 @@
   flatbuffers::DetachedBuffer buffer_;
 };
 
+inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
+    absl::Span<const uint8_t> span) {
+  // Copy the data from the span.
+  uint8_t *buf = flatbuffers::DefaultAllocator().allocate(span.size());
+  memcpy(buf, span.data(), span.size());
+  // Then give it to a DetachedBuffer to manage.
+  return flatbuffers::DetachedBuffer(nullptr, false, buf, span.size(), buf,
+                                     span.size());
+}
+
 }  // namespace aos
 
 #endif  // AOS_FLATBUFFERS_H_
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index d5add53..fed9a8a 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -295,10 +295,10 @@
 
 cc_binary(
     name = "print_lockless_queue_memory",
-    visibility = ["//visibility:public"],
     srcs = [
         "print_lockless_queue_memory.cc",
     ],
+    visibility = ["//visibility:public"],
     deps = [
         ":lockless_queue",
     ],
diff --git a/aos/json_to_flatbuffer.h b/aos/json_to_flatbuffer.h
index 381621e..c997f03 100644
--- a/aos/json_to_flatbuffer.h
+++ b/aos/json_to_flatbuffer.h
@@ -122,8 +122,12 @@
 template <typename T>
 inline FlatbufferVector<T> FileToFlatbuffer(const std::string_view path) {
   std::ifstream instream(std::string(path), std::ios::in | std::ios::binary);
-  std::vector<uint8_t> data((std::istreambuf_iterator<char>(instream)),
-                            std::istreambuf_iterator<char>());
+  ResizeableBuffer data;
+  std::istreambuf_iterator<char> it(instream);
+  while (it != std::istreambuf_iterator<char>()) {
+    data.push_back(*it);
+    ++it;
+  }
   return FlatbufferVector<T>(std::move(data));
 }
 
diff --git a/aos/realtime.cc b/aos/realtime.cc
index a0af97a..a41eb0d 100644
--- a/aos/realtime.cc
+++ b/aos/realtime.cc
@@ -74,7 +74,7 @@
   WriteCoreDumps();
   PCHECK(mlockall(MCL_CURRENT | MCL_FUTURE) == 0);
 
-#if !__has_feature(address_sanitizer)
+#if !__has_feature(address_sanitizer) && !__has_feature(memory_sanitizer)
   // Don't give freed memory back to the OS.
   CHECK_EQ(1, mallopt(M_TRIM_THRESHOLD, -1));
   // Don't use mmap for large malloc chunks.
diff --git a/debian/BUILD b/debian/BUILD
index f498dff..6f955ae 100644
--- a/debian/BUILD
+++ b/debian/BUILD
@@ -65,11 +65,18 @@
     gstreamer_armhf_debs = "files",
 )
 load(
-    "//debian:m4.bzl",
+    ":m4.bzl",
     m4_debs = "files",
 )
-
-load("//debian:packages.bzl", "download_packages", "generate_deb_tarball")
+load(
+    ":lzma_amd64.bzl",
+    lzma_amd64_debs = "files",
+)
+load(
+    ":lzma_arm64.bzl",
+    lzma_arm64_debs = "files",
+)
+load(":packages.bzl", "download_packages", "generate_deb_tarball")
 
 filegroup(
     name = "matplotlib_patches",
@@ -341,6 +348,23 @@
     files = m4_debs,
 )
 
+download_packages(
+    name = "download_lzma",
+    packages = [
+        "liblzma-dev",
+    ],
+)
+
+generate_deb_tarball(
+    name = "lzma_amd64",
+    files = lzma_amd64_debs,
+)
+
+generate_deb_tarball(
+    name = "lzma_arm64",
+    files = lzma_arm64_debs,
+)
+
 exports_files([
     "ssh_wrapper.sh",
 ])
diff --git a/debian/lzma_amd64.bzl b/debian/lzma_amd64.bzl
new file mode 100644
index 0000000..1dbfa38
--- /dev/null
+++ b/debian/lzma_amd64.bzl
@@ -0,0 +1,3 @@
+files = {
+    "liblzma-dev_5.2.4-1_amd64.deb": "8373145ed28563aff33b116c434399347ab1fa77ba672d8d94151dd8f68c8ece",
+}
diff --git a/debian/lzma_arm64.bzl b/debian/lzma_arm64.bzl
new file mode 100644
index 0000000..a38d900
--- /dev/null
+++ b/debian/lzma_arm64.bzl
@@ -0,0 +1,3 @@
+files = {
+    "liblzma-dev_5.2.2-1.3_arm64.deb": "eb730c11f65c9bd10ae6c39b5a461e16294c21a423c9a8bc4011390798387e82",
+}
diff --git a/debian/matplotlib.BUILD b/debian/matplotlib.BUILD
index e7e9b6f..881521c 100644
--- a/debian/matplotlib.BUILD
+++ b/debian/matplotlib.BUILD
@@ -1,6 +1,9 @@
 load("@//debian:matplotlib.bzl", "build_matplotlib")
 
-build_matplotlib("3", tkinter_py_version="3.5")
+build_matplotlib(
+    "3",
+    tkinter_py_version = "3.5",
+)
 
 build_matplotlib(
     "2.7",
diff --git a/frc971/codelab/README.md b/frc971/codelab/README.md
index e274f24..2fdf08c 100644
--- a/frc971/codelab/README.md
+++ b/frc971/codelab/README.md
@@ -1,5 +1,12 @@
 # FRC971 "Codelab"
 
+## Flatbuffers tutorial
+
+Flatbuffers has pretty good [tutorials](https://google.github.io/flatbuffers/flatbuffers_guide_tutorial.html) for how to use them.  We recommend
+going through them for more information.
+
+## Basic control loop
+
 Welcome! This folder contains a "codelab" where you can go through the process
 of fleshing out a basic control-loop using the same infrastructure as we do for
 the control loops that normally run on our robots. Currently, this just consists
diff --git a/frc971/codelab/basic.cc b/frc971/codelab/basic.cc
index 36e64c5..6144fed 100644
--- a/frc971/codelab/basic.cc
+++ b/frc971/codelab/basic.cc
@@ -30,7 +30,8 @@
     Status::Builder builder = status->MakeBuilder<Status>();
     // TODO(you): Fill out the Status message! In order to populate fields, use
     // the add_field_name() method on the builder, just like we do with the
-    // Output message above.
+    // Output message above.  Look at the definition of Status in
+    // basic_status.fbs and populate the field according to the comments there.
 
     status->Send(builder.Finish());
   }
diff --git a/frc971/codelab/basic.h b/frc971/codelab/basic.h
index 944048f..b75a87f 100644
--- a/frc971/codelab/basic.h
+++ b/frc971/codelab/basic.h
@@ -15,6 +15,9 @@
 // This codelab helps build basic knowledge of how to use 971 control loop
 // primatives.
 //
+// For flatbuffers background, we recommend checking out
+// https://google.github.io/flatbuffers/flatbuffers_guide_tutorial.html
+//
 // The meat of the task is to make the tests pass.
 //
 // Run the tests with:
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index 1f362f3..faed4af 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -64,8 +64,8 @@
     if (!FLAGS_output_file.empty()) {
       unlink(FLAGS_output_file.c_str());
       logger_event_loop_ = MakeEventLoop("logger");
-      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
-                                                      logger_event_loop_.get());
+      logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+      logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
     }
 
     // Run for enough time to allow the gyro/imu zeroing code to run.
@@ -125,11 +125,9 @@
     // TODO(james): Handle Euler angle singularities...
     const double down_estimator_yaw =
         CHECK_NOTNULL(drivetrain_status_fetcher_->down_estimator())->yaw();
-    const double localizer_yaw =
-        drivetrain_status_fetcher_->theta();
-    EXPECT_LT(
-        std::abs(aos::math::DiffAngle(down_estimator_yaw, localizer_yaw)),
-        1e-2);
+    const double localizer_yaw = drivetrain_status_fetcher_->theta();
+    EXPECT_LT(std::abs(aos::math::DiffAngle(down_estimator_yaw, localizer_yaw)),
+              1e-2);
     const double true_yaw = (drivetrain_plant_.GetRightPosition() -
                              drivetrain_plant_.GetLeftPosition()) /
                             (dt_config_.robot_radius * 2.0);
@@ -558,9 +556,11 @@
     auto builder = drivetrain_goal_sender_.MakeBuilder();
 
     flatbuffers::Offset<flatbuffers::Vector<float>> spline_x_offset =
-        builder.fbb()->CreateVector<float>({0.0, -0.25, -0.5, -0.5, -0.75, -1.0});
+        builder.fbb()->CreateVector<float>(
+            {0.0, -0.25, -0.5, -0.5, -0.75, -1.0});
     flatbuffers::Offset<flatbuffers::Vector<float>> spline_y_offset =
-        builder.fbb()->CreateVector<float>({0.0, 0.0, -0.25, -0.75, -1.0, -1.0});
+        builder.fbb()->CreateVector<float>(
+            {0.0, 0.0, -0.25, -0.75, -1.0, -1.0});
 
     MultiSpline::Builder multispline_builder =
         builder.MakeBuilder<MultiSpline>();
diff --git a/frc971/wpilib/ADIS16470.cc b/frc971/wpilib/ADIS16470.cc
index fa61d9b..ec2ee9e 100644
--- a/frc971/wpilib/ADIS16470.cc
+++ b/frc971/wpilib/ADIS16470.cc
@@ -296,8 +296,9 @@
                   (0 << 2) /* internal clock mode */ |
                   (0 << 1) /* sync polarity, doesn't matter */ |
                   (1 << 0) /* data ready is active high */);
+          // Rate of the output will be 2000 / (DEC_RATE + 1) Hz.
           WriteRegister(registers::DEC_RATE,
-                        0 /* no internal decimation (averaging) */);
+                        1 /* Average every pair of values. */);
 
           // Start a sensor self test.
           WriteRegister(registers::GLOB_CMD, 1 << 2);
diff --git a/third_party/BUILD b/third_party/BUILD
index bcba978..2f22b0e 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -113,3 +113,15 @@
         "roborio": ["@webrtc_rio//:webrtc"],
     }),
 )
+
+# TODO(Brian): Properly restrict this to specific platforms and remove the
+# default deps once we have support for that which interacts with select properly.
+cc_library(
+    name = "lzma",
+    visibility = ["//visibility:public"],
+    deps = select({
+        "//tools:cpu_k8": ["@lzma_amd64//:lib"],
+        "//tools:cpu_arm64": ["@lzma_arm64//:lib"],
+        "//conditions:default": [],
+    }),
+)
diff --git a/tools/BUILD b/tools/BUILD
index ac76ee9..d77abf8 100644
--- a/tools/BUILD
+++ b/tools/BUILD
@@ -49,6 +49,11 @@
 )
 
 config_setting(
+    name = "cpu_arm64",
+    values = {"cpu": "arm64"},
+)
+
+config_setting(
     name = "has_asan",
     values = {"define": "have_asan=true"},
 )
@@ -75,11 +80,14 @@
 
 environment(name = "cortex-m4f-k22")
 
+environment(name = "arm64")
+
 environment_group(
     name = "cpus",
     defaults = [
         ":k8",
         ":roborio",
+        ":arm64",
         ":armhf-debian",
     ],
     environments = [
@@ -89,5 +97,6 @@
         ":armhf-debian",
         ":cortex-m4f",
         ":cortex-m4f-k22",
+        ":arm64",
     ],
 )
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index f7b69e4..98f3eeb 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -42,7 +42,8 @@
   log_writer_event_loop->SkipTimingReport();
   log_writer_event_loop->SkipAosLog();
   CHECK(nullptr == log_writer_event_loop->node());
-  aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
+  aos::logger::Logger writer(log_writer_event_loop.get());
+  writer.StartLoggingLocalNamerOnRun(FLAGS_output_file);
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
       reader.event_loop_factory()->MakeEventLoop("drivetrain");
diff --git a/y2020/config_gen.bzl b/y2020/config_gen.bzl
deleted file mode 100644
index aed1302..0000000
--- a/y2020/config_gen.bzl
+++ /dev/null
@@ -1,13 +0,0 @@
-def generate_pi_config(number):
-  native.genrule(
-      name = "generate_pi_config_%d" % (number),
-      srcs = ["y2020_pi_template.json"],
-      outs = ["y2020_pi%d.json" % (number)],
-      cmd = " ".join([
-          "$(location //y2020:generate_pi_config)",
-          "$(location y2020_pi_template.json)",
-          "'{\"NUM\": %d}'" % (number),
-          "$(OUTS)",
-      ]),
-      tools = ["//y2020:generate_pi_config"],
-  )
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 0dd5a1a..0ed57c5 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -49,7 +49,8 @@
   std::unique_ptr<aos::EventLoop> log_writer_event_loop =
       reader.event_loop_factory()->MakeEventLoop("log_writer", node);
   log_writer_event_loop->SkipTimingReport();
-  aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
+  aos::logger::Logger writer(log_writer_event_loop.get());
+  writer.StartLoggingLocalNamerOnRun(FLAGS_output_file);
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
       reader.event_loop_factory()->MakeEventLoop("drivetrain", node);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index ae9f887..dbb4f2f 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -26,10 +26,10 @@
 using frc971::control_loops::drivetrain::DrivetrainConfig;
 using frc971::control_loops::drivetrain::Goal;
 using frc971::control_loops::drivetrain::LocalizerControl;
+using frc971::vision::sift::CameraCalibrationT;
+using frc971::vision::sift::CameraPoseT;
 using frc971::vision::sift::ImageMatchResult;
 using frc971::vision::sift::ImageMatchResultT;
-using frc971::vision::sift::CameraPoseT;
-using frc971::vision::sift::CameraCalibrationT;
 using frc971::vision::sift::TransformationMatrixT;
 
 namespace {
@@ -90,9 +90,9 @@
 }  // namespace
 
 namespace chrono = std::chrono;
-using frc971::control_loops::drivetrain::testing::DrivetrainSimulation;
-using frc971::control_loops::drivetrain::DrivetrainLoop;
 using aos::monotonic_clock;
+using frc971::control_loops::drivetrain::DrivetrainLoop;
+using frc971::control_loops::drivetrain::testing::DrivetrainSimulation;
 
 class LocalizedDrivetrainTest : public aos::testing::ControlLoopTest {
  protected:
@@ -136,8 +136,8 @@
 
     if (!FLAGS_output_file.empty()) {
       logger_event_loop_ = MakeEventLoop("logger", roborio_);
-      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
-                                                      logger_event_loop_.get());
+      logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+      logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
     }
 
     test_event_loop_->MakeWatcher(
@@ -152,7 +152,7 @@
               last_frame_ = monotonic_now();
             }
           }
-          });
+        });
 
     test_event_loop_->AddPhasedLoop(
         [this](int) {
@@ -337,8 +337,7 @@
   aos::Sender<aos::message_bridge::ServerStatistics> server_statistics_sender_;
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop_;
-  const frc971::control_loops::drivetrain::DrivetrainConfig<double>
-      dt_config_;
+  const frc971::control_loops::drivetrain::DrivetrainConfig<double> dt_config_;
 
   std::unique_ptr<aos::EventLoop> pi1_event_loop_;
   aos::Sender<ImageMatchResult> camera_sender_;
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index 8a0589c..59296d9 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -440,8 +440,8 @@
     if (!FLAGS_output_file.empty()) {
       unlink(FLAGS_output_file.c_str());
       logger_event_loop_ = MakeEventLoop("logger", roborio_);
-      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
-                                                      logger_event_loop_.get());
+      logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+      logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
     }
   }
 
diff --git a/y2020/y2020.json b/y2020/y2020.json
index 808a796..59f3fc0 100644
--- a/y2020/y2020.json
+++ b/y2020/y2020.json
@@ -1,5 +1,5 @@
 {
-  "channel_storage_duration": 5000000000,
+  "channel_storage_duration": 2000000000,
   "maps": [
     {
       "match": {