Merge "Use key field for looking up enum values in flatbuffer reflection."
diff --git a/README.md b/README.md
index af5ae6e..395a802 100644
--- a/README.md
+++ b/README.md
@@ -2,36 +2,66 @@
This is FRC Team 971's main code repository. There are `README*` files throughout the source tree documenting specifics for their respective folders.
## Access to the code
-The main central location for our code is our [Gerrit](https://www.gerritcodereview.com/) server at https://software.frc971.org/gerrit. To get a copy of the code on your computer to work with, follow these steps:
- 1. Contact Michael Schuh to get an SVN account.
- 2. Go to Gerrit and create an account.
- 3. Contact Brian Silverman with your SVN username to get access to the code in Gerrit.
- 4. Go to [the 971-Robot-Code project in Gerrit](https://software.frc971.org/gerrit/#/admin/projects/971-Robot-Code) and run the command.
- Running the `clone with commit-msg hook` command will save you trouble later.
+The main central location for our code is our [Gerrit](https://www.gerritcodereview.com/) server at https://software.frc971.org/gerrit. To download a copy of the 971 code on your computer, follow these steps:
+ 1. Fill out an SVN account request form to get an SVN account.
+ 1. Mention that you are part of the software team and need Gerrit access.
+ 2. We recommend using your first name in all lowercase for your username.
+ 2. Go to our [Gerrit server](https://software.frc971.org/gerrit) and create an account.
+ 3. Contact Brian Silverman or Stephan Massalt with your SVN username to get access to the code in Gerrit.
+ 4. When you log into Gerrit for the first time, please [add your email address](http://software.frc971.org/gerrit/settings/#EmailAddresses)
+ 5. Add your SSH key to Gerrit so that you will be able to check out code (see the example after this list).
+ 1. If you don't already have an SSH key, you can create one using `ssh-keygen`. This will create a public/private key pair-- the default location for your public key will be `~/.ssh/id_rsa.pub`.
+ 2. Log into Gerrit, go to `Settings->SSH Keys`, paste your public key into the `New SSH Key` text box, and click `ADD NEW SSH KEY`.
+ 6. Install `git`: `sudo apt install git`
+ 7. Go to [the 971-Robot-Code project in Gerrit](https://software.frc971.org/gerrit/#/admin/projects/971-Robot-Code) and run the command to download the 971-Robot-Code repository.
+ 1. We recommend downloading the code via SSH using the `clone with commit-msg hook` command.
+ 2. NOTE: Using the `clone with commit-msg hook` option will save you trouble later.
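+
+As referenced in step 5 above, here is a minimal sketch of creating an SSH key and printing the public key so it can be pasted into Gerrit (the default key path is an assumption; adjust it if you chose a different location):
+```console
+# Create a new key pair, accepting the default location (~/.ssh/id_rsa)
+ssh-keygen
+# Print the public key to paste into Gerrit's New SSH Key box
+cat ~/.ssh/id_rsa.pub
+```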
-To learn more about git, see git(1) (`man git` or [git(1)](http://manpages.debian.net/cgi-bin/man.cgi?query=git>) (especially the NOTES section).
+To learn more about git, open a terminal and run `man git`, or see [git(1)](https://manpages.debian.org/buster/git-man/git.1.en.html) (especially the NOTES section).
-## Code reviews
-We want all code to at least have a second person look over it before it gets merged into the `master` branch. Gerrit has [extensive documentation on starting reviews](https://software.frc971.org/gerrit/Documentation/user-upload.html). TL;DR: `git push origin HEAD:refs/for/master` and then click on the link to add reviewers.
-If you just upload a change without adding any reviewers, it might sit around for a long time before anybody else notices it.
-[git-review](http://manpages.debian.org/cgi-bin/man.cgi?query=git-review) can make the upload process simpler.
+
+## Prerequisites
+The instructions below assume you have the following:
+ 1. A host computer with an appropriate OS or VM to compile the 971 code using Bazel
+ 1. The currently supported operating system for building the code is amd64 Debian Buster.
+ 2. It is likely to work on any `x86_64 GNU/Linux` system (e.g., Ubuntu 20.04), but that's not at all well-tested.
+ 2. Your favorite text editor installed, e.g., `vim`, `emacs`
+ 3. Access to the 971-Robot-Code repository, with the source code downloaded
+ 4. The ability to `ssh` into target CPUs like the roboRIO and Raspberry Pi
+
## Building the code
-The currently supported operating system for building the code is amd64 Debian Buster. It is likely to work on any x86\_64 GNU/Linux system, but that's not at all well-tested.
+We use [Bazel](http://bazel.io) to build the code. Bazel has [extensive docs](https://docs.bazel.build/versions/master/build-ref.html), including a nice [build encyclopedia reference](https://docs.bazel.build/versions/master/be/overview.html), and does a great job with fast, correct incremental rebuilds.
-We use [Bazel](http://bazel.io) to build the code. Bazel has [extensive](https://docs.bazel.build/versions/master/build-ref.html) [docs](https://docs.bazel.build/versions/master/be/overview.html) and does a nice job with fast, correct increment rebuilds.
+There are two options for building the code given here: setting up your own computer, or using the frc971 build server.
### Steps to set up a computer to build the code:
- 0. Install any Bazel version. See [here](https://docs.bazel.build/versions/master/install-ubuntu.html)
- 1. Install the required packages:
-```console
-apt-get update
+ 1. Install any Bazel version.
+ 1. Check to see if the version of Linux you're running has an apt package for Bazel: `apt-cache search bazel` or just try `sudo apt install bazel`
+ 2. More likely, you'll need to install it manually-- see [here](https://docs.bazel.build/versions/master/install-ubuntu.html). We recommend using `apt-key` instead of `gnupg` when setting up the key:
+ 1. Step 1: Add Bazel distribution URI as a package source
+ ```
+ # apt-key ships with apt itself, so only curl needs installing
+ sudo apt install curl
+ curl -fsSL https://bazel.build/bazel-release.pub.gpg | sudo apt-key add -
+ echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | sudo tee /etc/apt/sources.list.d/bazel.list
+ ```
+ 2. Step 2: Install Bazel
+ ```
+ sudo apt update && sudo apt install bazel
+ ```
+
+ 2. Install the required packages:
+```sh
+sudo apt-get update
# TODO(james): Ideally, we shouldn't need to be installing libtinfo5...
-apt-get install bazel python libtinfo5
+sudo apt-get install python libtinfo5
```
- 2. Allow Bazel's sandboxing to work:
- Follow the direction in `doc/frc971.conf`.
+ 3. Change settings to allow Bazel's sandboxing to work-- follow the directions in `doc/frc971.conf`. For example, the commands to do this would be:
+ 1. `sudo cp doc/frc971.conf /etc/sysctl.d/`
+ 2. `sudo sysctl --system`
+
### Setting up access to a workspace on the build server
+In order to use the build server, you'll first need to get ssh access set up. (NOTE: you don't need to do any of the other setup steps done for your own computer, since things like `bazel`, `python`, etc. are already installed on the build server.)
1. Use ssh-keygen to create a public and private key.
```console
# IMPORTANT These are the Windows instructions.
@@ -51,106 +81,93 @@
```console
ssh [user]@build.frc971.org -p 2222 -i ./ssh/id_971_rsa -L 971:127.0.0.1:3389
```
- 5. So at this point you run the Remote Desktop app in windows.
-Once you get there, all you need to do is put `127.0.0.1:9971` for the computer name, and use your SVN usrname.
-Once you get connected accept the server certificate, and then enter your password that you gave Stephan. (Its either something unique or your SVN pwd)
-Then select the Default panel config.
-You can exit the Remote Desktop if you are good w/ the raw cmd line interface.
-And for future logins all you have to do is tunnel and then login using the app.
-Now if you have a graphical application that you are developing (ie spline UI), then you have to run the build command in the Remote Desktop application.
-
-# ONE VERY IMPORTANT LAST STEP
-In order for you to be able to commit, you need to run this command, replacing <YOUR EMAIL> w/ your email that is in gerrit.
+ 5. At this point, run the Remote Desktop app in Windows. All you
+need to do is put `127.0.0.1:9971` for the computer name and use your
+SVN username. Once you get connected, accept the server certificate
+and then enter the password that you gave Stephan (it's either
+something unique or your SVN password). Then select the Default panel
+config. You can exit the Remote Desktop session if you are fine with
+the raw command-line interface. For future logins, all you have to do
+is set up the tunnel and then log in using the app. Note that if you
+are developing a graphical application (e.g., the spline UI), you
+have to run the build command in the Remote Desktop session.
+ 6. Very important: In order for you to be able to commit, you need
+ to configure your email address in `git`. To do this, run the
+ following command, replacing `<YOUR EMAIL>` with the email that you are
+ using for Gerrit:
```console
git config --global user.email "<YOUR EMAIL>"
```
-One thing that also needs to be said is that you DO NOT need to do any of the installation steps, or the step w/ `frc971.conf`.
-If there are any questions, use #coding so that other people who may reach the same issue can refer back to that.
-Some people that you can @ would be Het <--Insert your name here if you are pingable-->
+If there are any questions, post to the #coding Slack channel so that other people who may run into the same issue can refer back to the discussion.
-### Some useful Bazel commands:
- * Build and test everything (on the host system):
-```console
+
+### Bazel commands for building, testing, and deploying the code:
+ * Build and test everything (on the host system, and for the roborio target-- note that this may take a while):
+```console
bazel test //...
-bazel build --cpu=roborio //...
+bazel build --cpu=roborio -c opt //...
```
* Build the code for a specific robot:
```console
-bazel build --cpu=roborio -c opt //y2019/...
+# For the roborio:
+bazel build --cpu=roborio -c opt //y2020/...
```
- * Download code to a robot:
-```console
-# First add a host entry in your ~/.ssh/known_hosts file for the roboRIO.
-# Do this by ssh'ing into the machine. If you problems doing this, see
-# the notes below for more information on how to connect to the roboRIO.
-ssh admin@roboRIO-971-frc.local
-# If you see an error like:
-# subprocess.CalledProcessError: Command '['rsync' ...
-# ERROR: Non-zero return code '1' from command: Process exited with status 1
-# The above "ssh admin@roboRIO-971-frc.local" step has not been sucessfully completed.
-# If the roboRIO has been configued to use a static IP address like 10.9.71.2,
-# set the laptop to have an IP address on the 10.9.71.x subnet with a netmask
-# of 255.0.0.0. The ".x" is different than the .2 for the roboRIO or any other
-# device on the network. The driver station uses .5 or .6 so avoid those.
-# The radio uses .1 or .50 so avoid those too. If you are at the school,
-# disconnect from the student wireless network or try setting your netmask to
-# 255.255.255.0 if you want to be on both networks. The student wireless
-# network is on a 10.?.?.? subnet which can cause problems with connecting to
-# the robot.
-bazel run --cpu=roborio --compilation_mode=opt //y2018:download_stripped -- admin@roboRIO-971-frc.local
-# If this does not work, try
-bazel run --cpu=roborio --compilation_mode=opt //y2018:download_stripped -- admin@10.9.71.2
-# If this does not work, it probably means that the robot and laptop are on
-# different subnets. They need to be on the same subnet for the laptop to
-# connect to the robot. Connecting can be confirmed by using ping.
-ping roboRIO-971-frc.local
-# or
-ping 10.9.71.2
-# If this does not work, perhaps the roboRIO has not been configured to have
-# a static IP address. Use a USB cable to connect from a Windows laptop to
-# the roboRIO and use Internet Explorer (IE) to configure the roboRIO
-# to have a static IP address of 10.9.71.2. Inside IE, browse to
-# http://roborio-971-frc.local or http://172.22.11.2. Click on the "Ethernet"
-# icon on the left, select "Static" for the "Configure IPv4 Address" option.
-# Set the "IPv4 Address" to 10.9.71.2. Set the "Subnet Mask" to "255.0.0.0".
-# Finally click on "Save" at the bottom of the screen. If you have trouble
-# using an Ethernet cable, try using a USB cable. USB cables are much
-# more reliable for connecting than using a Ethernet cabe. USB cables work
-# for connecting to the robot on Windows and Linux computers.
-# Another option is to configure the laptop to have a link-local connection
-# by going using the "Network Settings" GUI. The laptop will then be on the
-# same subnet in the address range of 169.254.0.0 to 169.254.255.255.
-# James thinks this will only work over Ethernet (i.e., not USB; he is not
-# sure what will happen if you attempt this over USB), and if the robot
-# does *not* have a static IP address set and there is no DHCP server
-# assigning an IP address to the roboRIO. James says to also note that this
-# implies that the roboRIO will also have a 169.254.*.* IP addresss, and
-# that the only simple way to figure it out is to use mDNS.
-#
```
- * To configure a freshly imaged roboRIO:
+# For the raspberry pi:
+bazel build --cpu=armhf-debian -c opt //y2020/...
+```
+
+ * Configuring a roboRIO: Freshly imaged roboRIOs need to be configured to run the 971 code
+at startup. This is done by using the `setup_roborio.sh` script.
```console
-# Freshly imaged roboRIOs need to be configured to run the 971 code
-# at startup. This is done by using the setup_roborio.sh script.
bazel run -c opt //frc971/config:setup_roborio -- roboRIO-XXX-frc.local
```
-### Some other useful packages
+ * Download code to a robot:
+```console
+# For the roborio
+bazel run --cpu=roborio -c opt //y2020:download_stripped -- roboRIO-971-frc.local
+```
+This assumes the roborio is reachable at `roboRIO-971-frc.local`. If that does not work, you can try with a static IP address like `10.9.71.2` (see the troubleshooting notes below).
+```console
+# For the Raspberry Pis
+bazel run --cpu=armhf-debian -c opt //y2020:pi_download_stripped -- 10.9.71.101
+```
+NOTE:
+ 1. The Raspberry Pis require that you have your SSH key installed on them in order to copy code over.
+ 2. They are configured to use IP addresses of the form 10.X.71.Y, where X is 9, 79, 89, or 99 depending on the robot number (971, 7971, 8971, or 9971, respectively), and Y is 101, 102, etc. for pi #1, #2, etc. For example, pi #2 on robot 7971 is at 10.79.71.102.
+
+ * Downloading specific targets to the robot
+ 1. Generally if you want to update what's running on the robot, you can use the `download_stripped` (or `pi_download_stripped`) targets. These will rsync only the changed files, and so are pretty efficient.
+ 2. If you have a need to compile a specific module, you can build stripped versions of the individual modules by adding "_stripped" to the module name. For example, to build the calibration code (`//y2020/vision:calibration`) for the pi (`armhf-debian`), run:
+ ```console
+ bazel run --cpu=armhf-debian -c opt //y2020/vision:calibration_stripped
+ ```
+ You will then need to manually copy the resulting file over to the robot.
+
+## Code reviews
+We want all code to at least have a second person look it over before it gets merged into the `master` branch. Gerrit has [extensive documentation on starting reviews](https://software.frc971.org/gerrit/Documentation/user-upload.html). There is also a good [intro User Guide](https://software.frc971.org/gerrit/Documentation/intro-user.html), an [intro to working with Gerrit](https://gerrit-review.googlesource.com/Documentation/intro-gerrit-walkthrough.html), and a presentation on [Gerrit workflows](https://docs.google.com/presentation/d/1C73UgQdzZDw0gzpaEqIC6SPujZJhqamyqO1XOHjH-uk).
+
+TL;DR: Make and commit your changes, do `git push origin HEAD:refs/for/master`, and then click on the provided link to add reviewers. If you just upload a change without adding any reviewers, it might sit around for a long time before anybody else notices it.
+
+[git-review](http://manpages.debian.org/cgi-bin/man.cgi?query=git-review) can make the upload process simpler.
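+
+As a concrete sketch of the TL;DR above (the staging commands and commit message are just illustrative):
+```console
+# Commit your changes locally.
+git add -u
+git commit -m "Describe your change"
+# Push the commit to Gerrit for review.
+git push origin HEAD:refs/for/master
+```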
+
+
+
+### Some other useful packages <TODO: Need to review these>
These aren't strictly necessary to build the code, but Michael found the
additional tools provided by these packages useful to install when working with
the code on May 13, 2018.
+
```console
# Get some useful packages including git and subversion.
apt-get update
- apt-get install git subversion ruby python vim-gtk3 subversion-tools
- apt-get install vim-doc git-doc git-gui git-svn exim4-doc-html ruby
- apt-get install python python-scipy python-matplotlib libpython-dev
- apt-get install bazel clang-format-3.8 clang-3.8 openjdk-8-jdk
- apt-get install gfortran libblas-dev liblapack-dev avahi-daemon
-# fix a key problem with llvm.
- wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key|sudo apt-key add -
- apt-get update
+ apt-get install git git-gui vim-gtk3
+ apt-get install vim-doc git-doc exim4-doc-html yapf
+ apt-get install bazel clang-format
+ apt-get install python avahi-daemon
# Install apt-file so that packages can be searched
apt-get install apt-file
apt-file update
@@ -164,6 +181,30 @@
# understanding the revision history of the repository and viewing
# log messages and changes.
apt-get install gitg
+# Also may want to install `buildifier` for formatting BUILD files
+```
+
+### Creating ssh aliases
+
+It is also handy to alias logins to the Raspberry Pis by adding lines like this to your `~/.ssh/config` file:
+```console
+Host pi-7971-2
+ User pi
+ ForwardAgent yes
+ HostName 10.79.71.102
+ StrictHostKeyChecking no
+```
+or, for the roborio:
+```console
+Host roborio-971
+ User admin
+ HostName 10.9.71.2
+ StrictHostKeyChecking no
+```
+This allows you to use the alias to `ping`, `ssh`, or run commands like:
+```console
+# Download code to robot #7971's raspberry pi #2
+bazel run --cpu=armhf-debian -c opt //y2020:pi_download_stripped -- pi-7971-2
```
### Roborio Kernel Traces
@@ -183,3 +224,63 @@
```
You can then scp the `trace.dat` file to your computer and run `kernelshark
trace.dat` (may require installing the `kernelshark` apt package).
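+
+For example, a sketch of pulling the trace off the roborio and viewing it (this assumes the trace was saved as `trace.dat` in admin's home directory):
+```console
+scp admin@roboRIO-971-frc.local:trace.dat .
+kernelshark trace.dat
+```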
+
+
+### Notes on troubleshooting network setup
+If the roboRIO has been configured to use a static IP address like
+10.9.71.2, set the laptop to have an IP address on the 10.9.71.x
+subnet with a netmask of 255.0.0.0. The ".x" is different from the
+.2 for the roboRIO or any other device on the network. The driver
+station uses .5 or .6 so avoid those. The radio uses .1 or .50 so
+avoid those too. A good choice might be in the 90-99 range. If you
+are at the school, disconnect from the student wireless network or
+try setting your netmask to 255.255.255.0 if you want to be on both
+networks. The student wireless network is on a 10.?.?.? subnet
+which can cause problems with connecting to the robot.
+
+If running Bazel on the `download_stripped` target does not work for
+the IP address you're using for the roborio or the raspberry pi, it
+probably means that the robot and laptop are on different subnets.
+They need to be on the same subnet for the laptop to connect to the
+robot. Connecting can be confirmed by using ping.
+```console
+ping roboRIO-971-frc.local
+```
+or
+```console
+ping 10.9.71.2
+```
+
+If this does not work, perhaps the roboRIO has not been configured to
+have a static IP address. Use a USB cable to connect from a Windows
+laptop to the roboRIO and use a web browser (Chrome is preferred,
+IE/Edge is not-- see [this technical
+note](https://docs.wpilib.org/en/stable/docs/software/roborio-info/roborio-web-dashboard.html))
+to configure the roboRIO to have a static IP address of 10.9.71.2.
+Browse to http://roborio-971-frc.local or http://172.22.11.2. Click
+on the "Ethernet" icon on the left, select "Static" for the "Configure
+IPv4 Address" option. Set the "IPv4 Address" to 10.9.71.2. Set the
+"Subnet Mask" to "255.0.0.0". Finally click on "Save" at the bottom
+of the screen. If you have trouble using an Ethernet cable, try using
+a USB cable (USB A->B).
+
+Another option is to configure the laptop to have a link-local connection
+by using the "Network Settings" GUI. The laptop will then be on the
+same subnet in the address range of 169.254.0.0 to 169.254.255.255.
+James thinks this will only work over Ethernet (i.e., not USB; he is not
+sure what will happen if you attempt this over USB), and if the robot
+does *not* have a static IP address set and there is no DHCP server
+assigning an IP address to the roboRIO. James says to also note that this
+implies that the roboRIO will also have a 169.254.*.* IP address, and
+that the only simple way to figure it out is to use mDNS.
+
+
+### Other resources
+ 1. Intro to [AOS](./aos/README.md), our robot Operating System
+ 2. Introductory example: [ping/pong](./aos/events/README.md)
+ 3. Example of [logging](./aos/events/README.md)
+
+TODOs:
+ 1. Add more on networking setup and troubleshooting
+ 2. Move Roborio Kernel Traces out of here, maybe into documentation/
+ 3. Currently requires `apt install libsigsegv2`, but this should be temporary
diff --git a/WORKSPACE b/WORKSPACE
index eb7ed28..9ddecff 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -70,6 +70,14 @@
"//debian:m4.bzl",
m4_debs = "files",
)
+load(
+ "//debian:lzma_amd64.bzl",
+ lzma_amd64_debs = "files",
+)
+load(
+ "//debian:lzma_arm64.bzl",
+ lzma_arm64_debs = "files",
+)
load("//debian:packages.bzl", "generate_repositories_for_debs")
generate_repositories_for_debs(python_debs)
@@ -106,6 +114,10 @@
generate_repositories_for_debs(m4_debs)
+generate_repositories_for_debs(lzma_amd64_debs)
+
+generate_repositories_for_debs(lzma_arm64_debs)
+
http_archive(
name = "python_repo",
build_file = "@//debian:python.BUILD",
@@ -753,3 +765,45 @@
sha256 = "ee8dfe664ac8c1d066bab64f71bd076a021875581b3cc47dac4a14a475f50b15",
url = "http://www.frc971.org/Build-Dependencies/m4.tar.gz",
)
+
+# //debian:lzma_amd64
+http_archive(
+ name = "lzma_amd64",
+ build_file_content = """
+cc_library(
+ name = "lib",
+ srcs = [
+ "usr/lib/x86_64-linux-gnu/liblzma.a",
+ ],
+ hdrs = glob([
+ "usr/include/lzma/*.h",
+ "usr/include/*.h",
+ ]),
+ strip_include_prefix = "usr/include",
+ visibility = ["//visibility:public"],
+)
+""",
+ sha256 = "e0ccaa7f793e44638e9f89570e00f146073a98a5928e0b547146c8184488bb19",
+ urls = ["http://www.frc971.org/Build-Dependencies/lzma_amd64.tar.gz"],
+)
+
+# //debian:lzma_arm64
+http_archive(
+ name = "lzma_arm64",
+ build_file_content = """
+cc_library(
+ name = "lib",
+ srcs = [
+ "usr/lib/aarch64-linux-gnu/liblzma.a",
+ ],
+ hdrs = glob([
+ "usr/include/lzma/*.h",
+ "usr/include/*.h",
+ ]),
+ strip_include_prefix = "usr/include",
+ visibility = ["//visibility:public"],
+)
+""",
+ sha256 = "18db35669ee49a5f8324a344071dd4ab553e716f385fb75747b909bd1de959f5",
+ urls = ["http://www.frc971.org/Build-Dependencies/lzma_arm64.tar.gz"],
+)
diff --git a/aos/BUILD b/aos/BUILD
index 286882f..a46911e 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -453,6 +453,7 @@
visibility = ["//visibility:public"],
deps = [
"//aos:macros",
+ "//aos/containers:resizeable_buffer",
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/strings",
diff --git a/aos/README.md b/aos/README.md
new file mode 100644
index 0000000..cbf8a56
--- /dev/null
+++ b/aos/README.md
@@ -0,0 +1,35 @@
+See [Top level README](../README.md) for overall information
+
+### `aos_dump`
+
+For examples of viewing events and logging, see the [README.md](./events/README.md) file in `aos/events`.
+
+
+### NOTES
+
+Some functions need to be in separate translation units in order for them to be guaranteed to work. As the C standard says,
+
+> Alternatively, an implementation might perform various optimizations
+> within each translation unit, such that the actual semantics would
+> agree with the abstract semantics only when making function calls
+> across translation unit boundaries. In such an implementation, at the
+> time of each function entry and function return where the calling
+> function and the called function are in different translation units,
+> the values of all externally linked objects and of all objects
+> accessible via pointers therein would agree with the abstract
+> semantics. Furthermore, at the time of each such function entry the
+> values of the parameters of the called function and of all objects
+> accessible via pointers therein would agree with the abstract
+> semantics. In this type of implementation, objects referred to by
+> interrupt service routines activated by the signal function would
+> require explicit specification of volatile storage, as well as other
+> implementation-defined restrictions.
+
+### FILES <TODO: I believe these are no longer correct>
+- `config/` has some configuration files
+ - `aos.conf` (currently in `aos` folder) has directions for setting up resource limits so you can run the code on any linux machine (the directions are there to keep them with the file)
+ - `setup_rc_caps.sh` (currently in `aos` folder) is a shell script (you have to run as root) that lets you run the realtime code without installing aos.conf for an individual file
+ - `starter` is an init.d file
+ - install it by putting it in `/etc/init.d` and running `update-rc.d starter defaults`
+ - restart it by running `invoke-rc.d starter restart` (doesn't always work very well...)
+ - the .config files are for building Linux kernels
diff --git a/aos/README.txt b/aos/README.txt
deleted file mode 100644
index e0f9377..0000000
--- a/aos/README.txt
+++ /dev/null
@@ -1,34 +0,0 @@
-see ../README.txt for overall information
-
-[FILES]
-config/ has some configuration files
- aos.conf has directions for setting up resource limits so you can run the code on any linux machine (the directions are there to keep them with the file)
- setup_rc_caps.sh is a shell script (you have to run as root) that lets you run the realtime code without installing aos.conf for an individual file
- starter is an init.d file
- install it by putting it in /etc/init.d an running "update-rc.d starter defaults"
- restart it by running "invoke-rc.d starter restart" (doesn't always work very well...)
- the .config files are for building linux kernels
-
-linux/ has code that only runs on linux systems
-
-common/ is where stuff that runs on all platforms is
-common/input/ is where the framework code for reading stuff into the queues system is
-common/output/ is where the framework for writing things out of the queues system is
-common/messages is where the c++ wrappers for "queues" are
-
-[NOTES]
-Some functions need to be in separate translation units in order for them to be guaranteed to work. As the C standard says,
- Alternatively, an implementation might perform various optimizations within each translation unit, such
- that the actual semantics would agree with the abstract semantics only when making function calls across
- translation unit boundaries. In such an implementation, at the time of each function entry and function
- return where the calling function and the called function are in different translation units, the values of all
- externally linked objects and of all objects accessible via pointers therein would agree with the abstract
- semantics. Furthermore, at the time of each such function entry the values of the parameters of the called
- function and of all objects accessible via pointers therein would agree with the abstract semantics. In this
- type of implementation, objects referred to by interrupt service routines activated by the signal function
- would require explicit specification of volatile storage, as well as other implementation-defined
- restrictions.
-
-The C++ namespace aos is used for all of the aos code. The crio and linux_code namespaces are for APIs that only make sense on one platform or the other.
-
-Almost everything in aos/ is thread-safe. Files with a main() might not be, and some pieces of code explicitly say at their declaration that they aren't. Also, code which shares non-const pointers (including this) is not generally thread-safe. Other than those exceptions, everything is assumed to be thread-safe so the comments don't say so explicitly.
diff --git a/aos/aos_dump.cc b/aos/aos_dump.cc
index 055700f..a0cca5d 100644
--- a/aos/aos_dump.cc
+++ b/aos/aos_dump.cc
@@ -14,6 +14,9 @@
"If true, fetch the current message on the channel first");
DEFINE_bool(pretty, false,
"If true, pretty print the messages on multiple lines");
+DEFINE_bool(all, false,
+ "If true, print out the channels for all nodes, not just the "
+ "channels which are visible on this node.");
DEFINE_bool(print_timestamps, true, "If true, timestamps are printed.");
DEFINE_uint64(count, 0,
"If >0, aos_dump will exit after printing this many messages.");
@@ -75,8 +78,11 @@
if (argc == 1) {
std::cout << "Channels:\n";
for (const aos::Channel *channel : *config_msg->channels()) {
- std::cout << channel->name()->c_str() << ' ' << channel->type()->c_str()
- << '\n';
+ if (FLAGS_all || aos::configuration::ChannelIsReadableOnNode(
+ channel, event_loop.node())) {
+ std::cout << channel->name()->c_str() << ' ' << channel->type()->c_str()
+ << '\n';
+ }
}
return 0;
}
@@ -86,6 +92,10 @@
config_msg->channels();
bool found_exact = false;
for (const aos::Channel *channel : *channels) {
+ if (!FLAGS_all && !aos::configuration::ChannelIsReadableOnNode(
+ channel, event_loop.node())) {
+ continue;
+ }
if (channel->name()->c_str() != channel_name) {
continue;
}
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index 9a24c8a..890bad0 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -131,6 +131,10 @@
// The node that this application will be started on.
// TODO(austin): Teach starter how to use this for starting applications.
node:string;
+
+ // The user to run this application as. If this field is unset, run it as
+ // the current user of the application starter.
+ user:string;
}
// Per node data and connection information.
diff --git a/aos/containers/BUILD b/aos/containers/BUILD
index 3e24fab..8277212 100644
--- a/aos/containers/BUILD
+++ b/aos/containers/BUILD
@@ -56,3 +56,13 @@
"//aos/testing:googletest",
],
)
+
+cc_library(
+ name = "resizeable_buffer",
+ hdrs = [
+ "resizeable_buffer.h",
+ ],
+ deps = [
+ "@com_github_google_glog//:glog",
+ ],
+)
diff --git a/aos/containers/resizeable_buffer.h b/aos/containers/resizeable_buffer.h
new file mode 100644
index 0000000..b94f8cc
--- /dev/null
+++ b/aos/containers/resizeable_buffer.h
@@ -0,0 +1,101 @@
+#ifndef AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+#define AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <memory>
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// Kind of like a subset of vector<uint8_t>, but with fewer destructor calls.
+// When building unoptimized, especially with sanitizers, the vector<uint8_t>
+// version ends up being really slow in tests.
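+//
+// Illustrative usage (a sketch, not exhaustive API documentation):
+//   ResizeableBuffer buffer;
+//   buffer.resize(4);
+//   memcpy(buffer.data(), "abcd", 4);
+//   buffer.push_back('e');  // buffer now holds 5 bytes
+//   buffer.erase_front(1);  // drops 'a', leaving "bcde"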
+class ResizeableBuffer {
+ public:
+ ResizeableBuffer() = default;
+
+ ResizeableBuffer(const ResizeableBuffer &other) { *this = other; }
+ ResizeableBuffer(ResizeableBuffer &&other) { *this = std::move(other); }
+ ResizeableBuffer &operator=(const ResizeableBuffer &other) {
+ resize(other.size());
+ memcpy(storage_.get(), other.storage_.get(), size());
+ return *this;
+ }
+ ResizeableBuffer &operator=(ResizeableBuffer &&other) {
+ std::swap(storage_, other.storage_);
+ std::swap(size_, other.size_);
+ std::swap(capacity_, other.capacity_);
+ return *this;
+ }
+
+ uint8_t *data() { return static_cast<uint8_t *>(storage_.get()); }
+ const uint8_t *data() const {
+ return static_cast<const uint8_t *>(storage_.get());
+ }
+
+ uint8_t *begin() { return data(); }
+ const uint8_t *begin() const { return data(); }
+ uint8_t *end() { return data() + size(); }
+ const uint8_t *end() const { return data() + size(); }
+
+ size_t size() const { return size_; }
+ size_t capacity() const { return capacity_; }
+
+ void resize(size_t new_size) {
+ if (new_size > capacity_) {
+ Allocate(new_size);
+ }
+ size_ = new_size;
+ }
+
+ void erase_front(size_t count) {
+ if (count == 0) {
+ return;
+ }
+ CHECK_LE(count, size_);
+ memmove(static_cast<void *>(data()), static_cast<void *>(data() + count),
+ size_ - count);
+ size_ -= count;
+ }
+
+ void push_back(uint8_t x) {
+ if (size_ == capacity_) {
+ Allocate(std::max<size_t>(16, size_ * 2));
+ }
+ *end() = x;
+ ++size_;
+ CHECK_LE(size_, capacity_);
+ }
+
+ private:
+ // We need this silly function because C++ is bad, and extensions to it which
+ // we work with make it a true nightmare.
+ //
+ // (a) You can't easily write out the signature of free because it depends on
+ // whether exceptions are enabled or not. You could use decltype, but see (b).
+ //
+ // (b) You can't easily write &free because CUDA overloads it with a
+ // __device__ version. You could cast to the appropriate version, but see (a).
+ //
+ // There's probably some kind of SFINAE thing which could find the matching
+ // signature from a set of choices, and then we could just
+ // static_cast<TheSignature>(&free). However, that sounds like a nightmare,
+ // especially because it has to conditionally enable the part mentioning CUDA
+ // identifiers in the preprocessor. This little function is way simpler.
+ static void DoFree(void *p) { free(p); }
+
+ void Allocate(size_t new_capacity) {
+ void *const old = storage_.release();
+ storage_.reset(CHECK_NOTNULL(realloc(old, new_capacity)));
+ capacity_ = new_capacity;
+ }
+
+ std::unique_ptr<void, decltype(&DoFree)> storage_{nullptr, &DoFree};
+ size_t size_ = 0, capacity_ = 0;
+};
+
+} // namespace aos
+
+#endif // AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 5912ac6..59968c3 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -271,6 +271,7 @@
shard_count = 4,
deps = [
":event_loop_param_test",
+ ":message_counter",
":ping_lib",
":pong_lib",
":simulated_event_loop",
@@ -341,3 +342,12 @@
"//aos/logging:log_message_fbs",
],
)
+
+cc_library(
+ name = "message_counter",
+ hdrs = ["message_counter.h"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":event_loop",
+ ],
+)
diff --git a/aos/events/README.md b/aos/events/README.md
new file mode 100644
index 0000000..875e1b7
--- /dev/null
+++ b/aos/events/README.md
@@ -0,0 +1,74 @@
+## How to run ping & pong
+
+Running ping<->pong is a nice way to test that you can run some basic code, and it shows how messaging can work between two nodes.
+
+### Setup real-time niceties:
+ 1. Add the following lines to `/etc/security/limits.d/rt.conf`, replacing "USERNAME" with the username you're running under. You'll probably need to do this as root, e.g., `sudo nano /etc/security/limits.d/rt.conf`
+```
+USERNAME - nice -20
+USERNAME - rtprio 95
+USERNAME - memlock unlimited
+```
+
+ 2. Reboot your machine to pick up the changes
+
+### Compile and run the code
+ 1. Compile the code for ping and pong, as well as aos_dump for looking at the messages. We'll assume throughout that you're running from the top level directory of the 971 code.
+ ```
+ bazel build -c opt //aos/events:ping //aos/events:pong //aos:aos_dump
+ ```
+
+ 2. In 2 separate windows, run the ping and pong commands using the `pingpong_config.json` config file:
+ 1. `bazel-bin/aos/events/ping --config bazel-bin/aos/events/pingpong_config.json`
+ 2. `bazel-bin/aos/events/pong --config bazel-bin/aos/events/pingpong_config.json`
+
+ 3. In a third window, explore the message stream using `aos_dump`. Some things you can do:
+ 1. List the channels:
+ `bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json`
+ 2. Listen to a specific topic on a channel-- copy one of the channels listed in the first step and put it at the end of the aos_dump command (e.g., "/test aos.examples.Ping")
+ `bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json /test aos.examples.Ping`
+ 3. Listen to multiple topics on a channel (e.g., all the topics published on "/test")
+ `bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json /test`
+
+
+NOTE: To make life easier, you can alias `aos_dump` to include the path and the config file (you may want to specify the full path to the aos_dump executable so it can be run from anywhere)
+```
+alias aos_dump='bazel-bin/aos/aos_dump --config bazel-bin/aos/events/pingpong_config.json'
+```
+
+## Logging
+
+In addition to running ping and pong, this example is a good way to explore event logging.
+
+ 1. Start by compiling the code:
+ ```
+ bazel build -c opt //aos/events/logging:logger_main
+ ```
+
+ 2. Create a folder for the log files, e.g.,
+ ```
+ mkdir /tmp/log_folder
+ ```
+
+ 3. Run the command to log the data:
+ ```
+ bazel-bin/aos/events/logging/logger_main --config bazel-bin/aos/events/pingpong_config.json --logging_folder /tmp/log_folder/
+ ```
+
+A log file should be created in `/tmp/log_folder`, with a name something like `fbs_log-001.bfbs`.
+
+If you're running ping and pong at the same time, you should be able to watch the log file grow in size as events are being logged. For example, running `ls -lh /tmp/log_folder/*` will list the files and their sizes. Doing this periodically (e.g., every 15-30 seconds) should show the new log file growing in size.
+
+ 4. View the contents of the log file (using `log_cat` functionality found in the `logging` sub-directory):
+ 1. Compile `log_cat`:
+ ```
+ bazel build -c opt //aos/events/logging:log_cat
+ ```
+
+ 2. Run the binary pointed at the recorded event log (this assumes you're running from the base directory of the repository and that the file is named `fbs_log-001.bfbs`):
+ ```
+ bazel-bin/aos/events/logging/log_cat /tmp/log_folder/fbs_log-001.bfbs
+ ```
+
+## EXERCISE:
+ 1. Modify the code in ping and pong to see a difference in their behavior, e.g., increment the counter by 2 instead of 1
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 1e7d25b..6ffb0fe 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -79,9 +79,10 @@
// underlying flatbuffer.
template <typename T>
FlatbufferVector<T> CopyFlatBuffer() const {
- return FlatbufferVector<T>(
- std::vector<uint8_t>(reinterpret_cast<const uint8_t *>(data),
- reinterpret_cast<const uint8_t *>(data) + size));
+ ResizeableBuffer buffer;
+ buffer.resize(size);
+ memcpy(buffer.data(), data, size);
+ return FlatbufferVector<T>(std::move(buffer));
}
};
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 56bef43..ad2f2d2 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -21,15 +21,153 @@
],
visibility = ["//visibility:public"],
deps = [
+ ":buffer_encoder",
":logger_fbs",
"//aos:configuration",
"//aos:flatbuffer_merge",
+ "//aos:flatbuffers",
+ "//aos/containers:resizeable_buffer",
"//aos/events:event_loop",
"//aos/util:file",
"@com_github_gflags_gflags//:gflags",
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/types:span",
+ ] + select({
+ "//tools:cpu_k8": [":lzma_encoder"],
+ "//tools:cpu_arm64": [":lzma_encoder"],
+ "//conditions:default": [],
+ }),
+)
+
+cc_binary(
+ name = "logfile_utils_out_of_space_test_runner",
+ testonly = True,
+ srcs = [
+ "logfile_utils_out_of_space_test_runner.cc",
+ ],
+ deps = [
+ ":logfile_utils",
+ "//aos:init",
+ "@com_github_gflags_gflags//:gflags",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+sh_test(
+ name = "logfile_utils_out_of_space_test",
+ srcs = [
+ "logfile_utils_out_of_space_test.sh",
+ ],
+ data = [
+ ":logfile_utils_out_of_space_test_runner",
+ ],
+ deps = [
+ "@bazel_tools//tools/bash/runfiles",
+ ],
+)
+
+cc_library(
+ name = "buffer_encoder",
+ srcs = [
+ "buffer_encoder.cc",
+ ],
+ hdrs = [
+ "buffer_encoder.h",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":logger_fbs",
+ "//aos:configuration_fbs",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/types:span",
+ ],
+)
+
+cc_test(
+ name = "buffer_encoder_test",
+ srcs = [
+ "buffer_encoder_test.cc",
+ ],
+ shard_count = 4,
+ deps = [
+ ":buffer_encoder",
+ ":buffer_encoder_param_test",
+ "//aos/testing:googletest",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+# TODO(Brian): Properly restrict this to specific platforms and
+# un-conditionalize the srcs and hdrs once we have support for that which
+# interacts with select properly.
+cc_library(
+ name = "lzma_encoder",
+ srcs = select({
+ "//tools:cpu_k8": [
+ "lzma_encoder.cc",
+ ],
+ "//tools:cpu_arm64": [
+ "lzma_encoder.cc",
+ ],
+ "//conditions:default": [],
+ }),
+ hdrs = select({
+ "//tools:cpu_k8": [
+ "lzma_encoder.h",
+ ],
+ "//tools:cpu_arm64": [
+ "lzma_encoder.h",
+ ],
+ "//conditions:default": [],
+ }),
+ visibility = ["//visibility:public"],
+ deps = [
+ ":buffer_encoder",
+ ":logger_fbs",
+ "//aos:configuration_fbs",
+ "//aos/containers:resizeable_buffer",
+ "//third_party:lzma",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/types:span",
+ ],
+)
+
+cc_test(
+ name = "lzma_encoder_test",
+ srcs = [
+ "lzma_encoder_test.cc",
+ ],
+ restricted_to = [
+ "//tools:k8",
+ "//tools:arm64",
+ ],
+ shard_count = 4,
+ deps = [
+ ":buffer_encoder_param_test",
+ ":lzma_encoder",
+ "//aos/testing:googletest",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_library(
+ name = "buffer_encoder_param_test",
+ testonly = True,
+ srcs = [
+ "buffer_encoder_param_test.cc",
+ ],
+ hdrs = [
+ "buffer_encoder_param_test.h",
+ ],
+ deps = [
+ ":logfile_utils",
+ ":logger_fbs",
+ "//aos/testing:googletest",
+ "//aos/testing:random_seed",
+ "@com_github_google_glog//:glog",
],
)
@@ -153,8 +291,10 @@
":multinode_pingpong_config",
"//aos/events:pingpong_config",
],
+ shard_count = 5,
deps = [
":logger",
+ "//aos/events:message_counter",
"//aos/events:ping_lib",
"//aos/events:pong_lib",
"//aos/events:simulated_event_loop",
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
new file mode 100644
index 0000000..10a7ed1
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.cc
@@ -0,0 +1,65 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+
+void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+ CHECK(in.data()) << ": Encode called with nullptr.";
+
+ total_bytes_ += in.size();
+ queue_.emplace_back(std::move(in));
+}
+
+void DummyEncoder::Clear(const int n) {
+ CHECK_GE(n, 0);
+ CHECK_LE(static_cast<size_t>(n), queue_size());
+ queue_.erase(queue_.begin(), queue_.begin() + n);
+}
+
+std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
+ std::vector<absl::Span<const uint8_t>> queue;
+ queue.reserve(queue_.size());
+ for (const auto &buffer : queue_) {
+ queue.emplace_back(buffer.data(), buffer.size());
+ }
+ return queue;
+}
+
+size_t DummyEncoder::queued_bytes() const {
+ size_t bytes = 0;
+ for (const auto &buffer : queue_) {
+ bytes += buffer.size();
+ }
+ return bytes;
+}
+
+DummyDecoder::DummyDecoder(std::string_view filename)
+ : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+ PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DummyDecoder::~DummyDecoder() {
+ int status = close(fd_);
+ if (status != 0) {
+ PLOG(ERROR) << "DummyDecoder: Failed to close file";
+ }
+}
+
+size_t DummyDecoder::Read(uint8_t *begin, uint8_t *end) {
+ if (end_of_file_) {
+ return 0;
+ }
+ const ssize_t count = read(fd_, begin, end - begin);
+ PCHECK(count >= 0) << ": Failed to read from file";
+ if (count == 0) {
+ end_of_file_ = true;
+ }
+ return static_cast<size_t>(count);
+}
+
+} // namespace aos::logger
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
new file mode 100644
index 0000000..cc16fea
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.h
@@ -0,0 +1,108 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Interface to encode DetachedBuffers as they are written to a file.
+//
+// The interface to a file is represented as a series of buffers, appropriate to
+// pass to writev(2). The backing storage for these is managed internally so
+// subclasses can handle resizing/moving as desired. Implementations must
+// enqueue as many of these backing buffers as demanded, leaving it up to the
+// caller to write them to a file when desired.
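+//
+// Illustrative write loop (a sketch; the file-writing mechanics are up to the
+// caller):
+//   encoder->Encode(std::move(buffer));    // enqueue data to be encoded
+//   for (const absl::Span<const uint8_t> span : encoder->queue()) {
+//     // write span.data(), span.size() to the file (e.g., with writev)
+//   }
+//   encoder->Clear(encoder->queue_size()); // drop everything just written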
+class DetachedBufferEncoder {
+ public:
+ virtual ~DetachedBufferEncoder() = default;
+
+ // Encodes and enqueues the given DetachedBuffer.
+ //
+ // Note that in may be moved-from, or it may be left unchanged, depending on
+ // what makes the most sense for a given implementation.
+ virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+
+ // If this returns true, the encoder may be bypassed by writing directly to
+ // the file.
+ virtual bool may_bypass() const { return false; }
+
+ // Finalizes the encoding process. After this, queue_size() represents the
+ // full extent of data which will be written to this file.
+ //
+ // Encode may not be called after this method.
+ virtual void Finish() = 0;
+
+ // Clears the first n encoded buffers from the queue.
+ virtual void Clear(int n) = 0;
+
+ // Returns a view of the queue of encoded buffers.
+ virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+
+ // Returns the total number of bytes currently queued up.
+ virtual size_t queued_bytes() const = 0;
+
+ // Returns the cumulative number of bytes which have been queued. This
+ // includes data which has been removed via Clear.
+ virtual size_t total_bytes() const = 0;
+
+ // Returns the number of elements in the queue.
+ virtual size_t queue_size() const = 0;
+};
+
+// This class does not encode the data. It just claims ownership of the raw data
+// and queues it up as is.
+class DummyEncoder final : public DetachedBufferEncoder {
+ public:
+ ~DummyEncoder() override = default;
+
+ // No encoding happens, the raw data is queued up as is.
+ void Encode(flatbuffers::DetachedBuffer &&in) final;
+ bool may_bypass() const final { return true; }
+ void Finish() final {}
+ void Clear(int n) final;
+ std::vector<absl::Span<const uint8_t>> queue() const final;
+ size_t queued_bytes() const final;
+ size_t total_bytes() const final { return total_bytes_; }
+ size_t queue_size() const final { return queue_.size(); }
+
+ private:
+ std::vector<flatbuffers::DetachedBuffer> queue_;
+ size_t total_bytes_ = 0;
+};
+
+// Interface to decode chunks of data. Implementations of this interface will
+// manage opening, reading, and closing the file stream.
+class DataDecoder {
+ public:
+ virtual ~DataDecoder() = default;
+
+ // Reads data into the given range. Returns the number of bytes read.
+ //
+ // Returns less than end-begin if all bytes have been read. Otherwise, this
+ // will always fill the whole range.
+ virtual size_t Read(uint8_t *begin, uint8_t *end) = 0;
+};
+
+// Simply reads the contents of the file into the target buffer.
+class DummyDecoder final : public DataDecoder {
+ public:
+ explicit DummyDecoder(std::string_view filename);
+ ~DummyDecoder() override;
+
+ size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+ // File descriptor for the log file.
+ int fd_;
+
+ // Cached bit for if we have reached the end of the file. Otherwise we will
+ // hammer on the kernel asking for more data each time we send.
+ bool end_of_file_ = false;
+};
+
+} // namespace aos::logger
+
+#endif // AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
diff --git a/aos/events/logging/buffer_encoder_param_test.cc b/aos/events/logging/buffer_encoder_param_test.cc
new file mode 100644
index 0000000..825696e
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.cc
@@ -0,0 +1,92 @@
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+#include "gmock/gmock.h"
+
+namespace aos::logger::testing {
+
+// Verifies that Clear affects the sizes as expected.
+TEST_P(BufferEncoderTest, ClearAndSizes) {
+ const auto encoder = MakeEncoder();
+
+ std::uniform_int_distribution<int> quantity_distribution(500, 600);
+ const int first_quantity = quantity_distribution(*random_number_generator());
+ const int second_quantity = quantity_distribution(*random_number_generator());
+
+ const auto first_buffers = CreateAndEncode(first_quantity, encoder.get());
+ const auto first_size = encoder->queued_bytes();
+ // We want at least 2 buffers to ensure this test is actually validating the
+ // behavior.
+ ASSERT_GT(first_size, 2u) << ": Encoder chunk size is too big for this test";
+ ASSERT_EQ(encoder->total_bytes(), first_size);
+ ASSERT_EQ(TotalSize(encoder->queue()), first_size);
+ ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+
+ encoder->Clear(encoder->queue_size());
+ ASSERT_EQ(encoder->queued_bytes(), 0);
+ ASSERT_EQ(encoder->total_bytes(), first_size);
+ ASSERT_EQ(encoder->queue().size(), 0);
+ ASSERT_EQ(encoder->queue_size(), 0);
+
+ const auto second_buffers = CreateAndEncode(second_quantity, encoder.get());
+ const auto second_size = encoder->queued_bytes();
+ ASSERT_GT(second_size, 2u) << ": Encoder chunk size is too big for this test";
+ ASSERT_EQ(encoder->total_bytes(), first_size + second_size);
+ ASSERT_EQ(TotalSize(encoder->queue()), second_size);
+ ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+}
+
+// Runs data in randomly-chunked sizes through the encoder and decoder to verify
+// it comes back out the same.
+TEST_P(BufferEncoderTest, RoundTrip) {
+ std::uniform_int_distribution<int> quantity_distribution(20, 60);
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+
+ std::vector<std::vector<uint8_t>> encoded_buffers;
+ {
+ const int encode_chunks = quantity_distribution(*random_number_generator());
+ const auto encoder = MakeEncoder();
+ encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
+ encoder->Finish();
+
+ std::ofstream output_file(file_path, std::ios::binary);
+ for (auto span : encoder->queue()) {
+ output_file.write(reinterpret_cast<const char *>(span.data()),
+ span.size());
+ }
+ }
+
+ const size_t total_encoded_size = TotalSize(encoded_buffers);
+
+ // Try decoding in multiple random chunkings.
+ for (int i = 0; i < 20; ++i) {
+ const auto decoder = MakeDecoder(file_path);
+ std::vector<std::vector<uint8_t>> decoded_buffers;
+ size_t total_decoded_size = 0;
+ while (true) {
+ const int chunk_size = quantity_distribution(*random_number_generator());
+ std::vector<uint8_t> chunk(chunk_size);
+ const size_t read_result =
+ decoder->Read(chunk.data(), chunk.data() + chunk_size);
+ if (read_result + total_decoded_size != total_encoded_size) {
+ // We're not at the end of the file yet, so we should've read the complete chunk.
+ ASSERT_EQ(read_result, chunk_size);
+ }
+ // Eventually we'll get here, once the decoder is really sure it's done.
+ if (read_result == 0) {
+ // Sanity check the math in the test code.
+ CHECK_EQ(total_decoded_size, TotalSize(decoded_buffers));
+ // Bail out because we're done.
+ break;
+ }
+ // If we're at the end, trim off the 0s so our comparison later works out.
+ chunk.resize(read_result);
+ total_decoded_size += read_result;
+ decoded_buffers.emplace_back(std::move(chunk));
+ }
+ ASSERT_THAT(Flatten(decoded_buffers),
+ ::testing::Eq(Flatten(encoded_buffers)));
+ }
+}
+
+} // namespace aos::logger::testing
diff --git a/aos/events/logging/buffer_encoder_param_test.h b/aos/events/logging/buffer_encoder_param_test.h
new file mode 100644
index 0000000..fd1a00c
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.h
@@ -0,0 +1,114 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+
+#include <functional>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/testing/random_seed.h"
+
+namespace aos::logger::testing {
+
+// Contains some helpful infrastructure for testing DetachedBufferEncoder
+// implementations.
+class BufferEncoderBaseTest : public ::testing::Test {
+ public:
+ // Creates and encodes n random buffers, returning a copy of the encoded data.
+ std::vector<std::vector<uint8_t>> CreateAndEncode(
+ int n, DetachedBufferEncoder *encoder) {
+ std::vector<std::vector<uint8_t>> result;
+ for (int i = 0; i < n; i++) {
+ flatbuffers::DetachedBuffer buffer = CreateRandomBuffer();
+ result.emplace_back(buffer.data(), buffer.data() + buffer.size());
+ encoder->Encode(std::move(buffer));
+ }
+ return result;
+ }
+
+ // Returns the total size of a vector full of objects with a size() method.
+ template <typename T>
+ static size_t TotalSize(const std::vector<T> &buffers) {
+ size_t result = 0;
+ for (const auto &v : buffers) {
+ result += v.size();
+ }
+ return result;
+ }
+
+ // Returns a flattened copy of a vector of sequences.
+ template <typename T>
+ static std::vector<typename T::value_type> Flatten(
+ const std::vector<T> &buffers) {
+ std::vector<typename T::value_type> result;
+ for (const auto &buffer : buffers) {
+ result.insert(result.end(), buffer.begin(), buffer.end());
+ }
+ return result;
+ }
+
+ void Reseed(int seed) {
+ // This algorithm allows a reasonable --runs_per_test to cover new seeds.
+ random_number_generator_ =
+ std::mt19937(::aos::testing::RandomSeed() + 1000 * seed);
+ }
+
+ std::mt19937 *random_number_generator() { return &random_number_generator_; }
+
+ private:
+ flatbuffers::DetachedBuffer CreateRandomBuffer() {
+ std::uniform_int_distribution<int> length_distribution(2800, 3000);
+ const size_t length = length_distribution(random_number_generator_);
+
+ flatbuffers::FlatBufferBuilder fbb;
+ uint8_t *data;
+ const flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+ fbb.CreateUninitializedVector(length, &data);
+ {
+ std::independent_bits_engine<std::mt19937, CHAR_BIT, uint8_t> engine(
+ std::ref(random_number_generator_));
+ const uint8_t byte = engine();
+ std::fill(data, data + length, byte);
+ }
+
+ MessageHeader::Builder builder(fbb);
+ builder.add_data(data_offset);
+ fbb.FinishSizePrefixed(builder.Finish());
+
+ return fbb.Release();
+ }
+
+ std::mt19937 random_number_generator_{
+ std::mt19937(::aos::testing::RandomSeed())};
+};
+
+// Tests some generic properties for any DetachedBufferEncoder+DataDecoder
+// implementation pair.
+//
+// First and second test parameters are methods to create instances of the
+// encoder and decoder. Third test parameter is a random seed, to allow testing
+// many different data patterns.
+class BufferEncoderTest
+ : public BufferEncoderBaseTest,
+ public ::testing::WithParamInterface<std::tuple<
+ std::function<std::unique_ptr<DetachedBufferEncoder>()>,
+ std::function<std::unique_ptr<DataDecoder>(std::string_view)>, int>> {
+ public:
+ BufferEncoderTest() { Reseed(std::get<2>(GetParam())); }
+
+ std::unique_ptr<DetachedBufferEncoder> MakeEncoder() const {
+ return std::get<0>(GetParam())();
+ }
+ std::unique_ptr<DataDecoder> MakeDecoder(std::string_view filename) const {
+ return std::get<1>(GetParam())(filename);
+ }
+};
+
+} // namespace aos::logger::testing
+
+#endif // AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
diff --git a/aos/events/logging/buffer_encoder_test.cc b/aos/events/logging/buffer_encoder_test.cc
new file mode 100644
index 0000000..a9c2bbb
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_test.cc
@@ -0,0 +1,106 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <algorithm>
+#include <fstream>
+#include <string>
+
+#include "glog/logging.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+namespace aos::logger::testing {
+
+class DummyEncoderTest : public BufferEncoderBaseTest {};
+
+// Tests that buffers are enqueued without any changes.
+TEST_F(DummyEncoderTest, QueuesBuffersAsIs) {
+ DummyEncoder encoder;
+ const auto expected = CreateAndEncode(100, &encoder);
+
+ auto queue = encoder.queue();
+ EXPECT_THAT(queue, ::testing::ElementsAreArray(expected));
+}
+
+// Checks that DummyDecoder can read into a buffer.
+TEST(DummyDecoderTest, ReadsIntoExactBuffer) {
+ static const std::string kTestString{"Just some random words."};
+
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+ std::ofstream(file_path, std::ios::binary) << kTestString;
+
+ // Read the contents of the file into the buffer.
+ DummyDecoder dummy_decoder(file_path.c_str());
+ std::vector<uint8_t> buffer(kTestString.size());
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+ for (int i = 0; i < 10; ++i) {
+ // Verify that there is no more data to read from the file.
+ ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+ }
+}
+
+// Checks that DummyDecoder can read into a buffer that can accommodate all the
+// data in the file.
+TEST(DummyDecoderTest, ReadsIntoLargerBuffer) {
+ static const std::string kTestString{"Just some random words."};
+
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+ std::ofstream(file_path, std::ios::binary) << kTestString;
+
+ DummyDecoder dummy_decoder(file_path.c_str());
+ std::vector<uint8_t> buffer(100);
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ buffer.resize(count);
+ ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+ // Verify that there is no more data to read from the file.
+ ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
+// Checks that DummyDecoder can repeatedly read the contents of the file into a
+// smaller buffer until there is no more to read.
+TEST(DummyDecoderTest, ReadsRepeatedlyIntoSmallerBuffer) {
+ static const std::string kTestString{"Just some random words."};
+
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+ std::ofstream(file_path, std::ios::binary) << kTestString;
+
+ DummyDecoder dummy_decoder(file_path.c_str());
+ std::vector<uint8_t> buffer((kTestString.size() + 1) / 2);
+
+ {
+ // Read into our buffer once, and verify the contents.
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ ASSERT_EQ(std::string(buffer.data(), buffer.data() + count),
+ kTestString.substr(0, buffer.size()));
+ }
+
+ {
+ // Read into the same buffer again, and verify the contents.
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ ASSERT_EQ(
+ std::string(buffer.data(), buffer.data() + count),
+ kTestString.substr(buffer.size(), kTestString.size() - buffer.size()));
+ }
+
+ // Verify that there is no more data to read from the file.
+ ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
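+// Runs the generic encoder/decoder tests against the pass-through
+// implementations, across 100 different random seeds.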
+INSTANTIATE_TEST_CASE_P(
+ Dummy, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ return std::make_unique<DummyEncoder>();
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<DummyDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
+} // namespace aos::logger::testing
diff --git a/aos/events/logging/eigen_mpq.h b/aos/events/logging/eigen_mpq.h
index 1463648..370d77e 100644
--- a/aos/events/logging/eigen_mpq.h
+++ b/aos/events/logging/eigen_mpq.h
@@ -8,8 +8,7 @@
// NumTraits for mpq_class. This is only really enough to use inverse().
template <>
-struct NumTraits<mpq_class>
- : GenericNumTraits<mpq_class> {
+struct NumTraits<mpq_class> : GenericNumTraits<mpq_class> {
typedef mpq_class Real;
typedef mpq_class Literal;
typedef mpq_class NonInteger;
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index f406d24..0afdc03 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -127,7 +127,7 @@
unsorted_logfiles.emplace_back(std::string(argv[i]));
}
- std::vector<std::vector<std::string>> logfiles =
+ const std::vector<aos::logger::LogFile> logfiles =
aos::logger::SortParts(unsorted_logfiles);
aos::logger::LogReader reader(logfiles);
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index 4078f9b..02a1402 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -15,46 +15,48 @@
"If true, replace the header on the log file with the JSON header.");
DEFINE_string(
header, "",
- "If provided, this is the path to the JSON with the log file header.");
+ "If provided, this is the path to the JSON with the log file header. If "
+ "not provided, _header.json will be appended to --logfile.");
int main(int argc, char **argv) {
gflags::SetUsageMessage(R"(This tool lets us manipulate log files.)");
aos::InitGoogle(&argc, &argv);
- if (!FLAGS_header.empty()) {
- if (FLAGS_replace) {
- const ::std::string header_json =
- aos::util::ReadFileToStringOrDie(FLAGS_header);
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
- flatbuffers::Offset<aos::logger::LogFileHeader> header =
- aos::JsonToFlatbuffer<aos::logger::LogFileHeader>(header_json, &fbb);
+ std::string header =
+ FLAGS_header.empty() ? (FLAGS_logfile + "_header.json") : FLAGS_header;
- fbb.FinishSizePrefixed(header);
+ if (FLAGS_replace) {
+ const ::std::string header_json = aos::util::ReadFileToStringOrDie(header);
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+ flatbuffers::Offset<aos::logger::LogFileHeader> header =
+ aos::JsonToFlatbuffer<aos::logger::LogFileHeader>(header_json, &fbb);
- const std::string orig_path = FLAGS_logfile + ".orig";
- PCHECK(rename(FLAGS_logfile.c_str(), orig_path.c_str()) == 0);
+ fbb.FinishSizePrefixed(header);
- aos::logger::SpanReader span_reader(orig_path);
- CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
+ const std::string orig_path = FLAGS_logfile + ".orig";
+ PCHECK(rename(FLAGS_logfile.c_str(), orig_path.c_str()) == 0);
- aos::logger::DetachedBufferWriter buffer_writer(FLAGS_logfile);
- buffer_writer.QueueSizedFlatbuffer(&fbb);
+ aos::logger::SpanReader span_reader(orig_path);
+ CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
- while (true) {
- absl::Span<const uint8_t> msg_data = span_reader.ReadMessage();
- if (msg_data == absl::Span<const uint8_t>()) {
- break;
- }
+ aos::logger::DetachedBufferWriter buffer_writer(
+ FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
+ buffer_writer.QueueSizedFlatbuffer(&fbb);
- buffer_writer.WriteSizedFlatbuffer(msg_data);
+ while (true) {
+ absl::Span<const uint8_t> msg_data = span_reader.ReadMessage();
+ if (msg_data == absl::Span<const uint8_t>()) {
+ break;
}
- } else {
- aos::logger::MessageReader reader(FLAGS_logfile);
- aos::util::WriteStringToFileOrDie(
- FLAGS_header, aos::FlatbufferToJson(reader.log_file_header(),
- {.multi_line = true}));
+
+ buffer_writer.QueueSpan(msg_data);
}
+ } else {
+ aos::logger::MessageReader reader(FLAGS_logfile);
+ aos::util::WriteStringToFileOrDie(
+ header,
+ aos::FlatbufferToJson(reader.log_file_header(), {.multi_line = true}));
}
return 0;
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index def0842..8dd5061 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -32,7 +32,7 @@
const Node *node) {
CHECK_EQ(node, this->node());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
}
DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
@@ -47,7 +47,7 @@
++part_number_;
*data_writer_ = std::move(*OpenDataWriter());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
}
DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
@@ -81,14 +81,14 @@
const Node *node) {
if (node == this->node()) {
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
} else {
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
if (node == data_writer.second.node) {
UpdateHeader(header, data_writer.second.uuid,
data_writer.second.part_number);
- data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ data_writer.second.writer->QueueSpan(header->full_span());
}
}
}
@@ -101,7 +101,7 @@
++part_number_;
*data_writer_ = std::move(*OpenDataWriter());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
} else {
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
@@ -110,7 +110,7 @@
data_writer.second.rotate(data_writer.first, &data_writer.second);
UpdateHeader(header, data_writer.second.uuid,
data_writer.second.part_number);
- data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ data_writer.second.writer->QueueSpan(header->full_span());
}
}
}
@@ -188,11 +188,11 @@
DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
const Channel *channel) {
- const bool log_delivery_times =
- (this->node() == nullptr)
- ? false
- : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- channel, this->node(), this->node());
+ bool log_delivery_times = false;
+ if (this->node() != nullptr) {
+ log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, this->node(), this->node());
+ }
if (!log_delivery_times) {
return nullptr;
}
@@ -200,37 +200,76 @@
return data_writer_.get();
}
+void MultiNodeLogNamer::Close() {
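+ // Close every writer, and remember if any of them ran out of space, so
+ // ran_out_of_space() reflects all of the files together.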
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (data_writer.second.writer) {
+ data_writer.second.writer->Close();
+ if (data_writer.second.writer->ran_out_of_space()) {
+ ran_out_of_space_ = true;
+ data_writer.second.writer->acknowledge_out_of_space();
+ }
+ }
+ }
+ if (data_writer_) {
+ data_writer_->Close();
+ if (data_writer_->ran_out_of_space()) {
+ ran_out_of_space_ = true;
+ data_writer_->acknowledge_out_of_space();
+ }
+ }
+}
+
void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
DataWriter *data_writer) {
std::string filename =
absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
"/", channel->type()->string_view(), ".part",
data_writer->part_number, ".bfbs");
-
- if (!data_writer->writer) {
- data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
- } else {
- *data_writer->writer = DetachedBufferWriter(filename);
- }
+ CreateBufferWriter(filename, &data_writer->writer);
}
void MultiNodeLogNamer::OpenWriter(const Channel *channel,
DataWriter *data_writer) {
const std::string filename = absl::StrCat(
- base_name_, "_", channel->source_node()->string_view(), "_data",
- channel->name()->string_view(), "/", channel->type()->string_view(),
- ".part", data_writer->part_number, ".bfbs");
- if (!data_writer->writer) {
- data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
- } else {
- *data_writer->writer = DetachedBufferWriter(filename);
- }
+ base_name_, "_", CHECK_NOTNULL(channel->source_node())->string_view(),
+ "_data", channel->name()->string_view(), "/",
+ channel->type()->string_view(), ".part", data_writer->part_number,
+ ".bfbs");
+ CreateBufferWriter(filename, &data_writer->writer);
}
std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
+ std::string name = base_name_;
+ if (node() != nullptr) {
+ name = absl::StrCat(name, "_", node()->name()->string_view());
+ }
return std::make_unique<DetachedBufferWriter>(
- absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
- part_number_, ".bfbs"));
+ absl::StrCat(name, "_data.part", part_number_, ".bfbs"),
+ std::make_unique<DummyEncoder>());
+}
+
+void MultiNodeLogNamer::CreateBufferWriter(
+ std::string_view filename,
+ std::unique_ptr<DetachedBufferWriter> *destination) {
+ if (ran_out_of_space_) {
+ // Refuse to open any new files, even though that means data gets skipped.
+ // Any existing files are in the same folder, which means they're on the same
+ // filesystem, which means they're probably going to run out of space and
+ // get stuck too.
+ return;
+ }
+ if (!destination->get()) {
+ *destination = std::make_unique<DetachedBufferWriter>(
+ filename, std::make_unique<DummyEncoder>());
+ return;
+ }
+ destination->get()->Close();
+ if (destination->get()->ran_out_of_space()) {
+ ran_out_of_space_ = true;
+ return;
+ }
+ *destination->get() =
+ DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
}
} // namespace logger
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 631016b..661f28d 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -33,17 +33,26 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
const Node *node) = 0;
- // Returns a writer for writing data logged on this channel (on the node
- // provided in the constructor).
+ // Returns a writer for writing data from messages on this channel (on the
+ // primary node).
+ //
+ // The returned pointer will stay valid across rotations, but the object it
+ // points to will be assigned to.
virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
- // Returns a writer for writing timestamps logged on this channel (on the node
- // provided in the constructor).
+ // Returns a writer for writing timestamps from messages on this channel (on
+ // the primary node).
+ //
+ // The returned pointer will stay valid across rotations, but the object it
+ // points to will be assigned to.
virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
// Returns a writer for writing timestamps delivered over the special
// /aos/remote_timestamps/* channels. node is the node that the timestamps
- // are forwarded back from.
+ // are forwarded back from (to the primary node).
+ //
+ // The returned pointer will stay valid across rotations, but the object it
+ // points to will be assigned to.
virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
const Channel *channel, const Node *node) = 0;
@@ -79,6 +88,7 @@
base_name_(base_name),
uuid_(UUID::Random()),
data_writer_(OpenDataWriter()) {}
+ ~LocalLogNamer() override = default;
void WriteHeader(
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -99,7 +109,8 @@
// Creates a new data writer with the new part number.
std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
return std::make_unique<DetachedBufferWriter>(
- absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+ absl::StrCat(base_name_, ".part", part_number_, ".bfbs"),
+ std::make_unique<aos::logger::DummyEncoder>());
}
const std::string base_name_;
@@ -113,6 +124,9 @@
public:
MultiNodeLogNamer(std::string_view base_name,
const Configuration *configuration, const Node *node);
+ ~MultiNodeLogNamer() override = default;
+
+ std::string_view base_name() const { return base_name_; }
void WriteHeader(
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -124,11 +138,41 @@
DetachedBufferWriter *MakeWriter(const Channel *channel) override;
- DetachedBufferWriter *MakeForwardedTimestampWriter(
- const Channel *channel, const Node *node) override;
+ DetachedBufferWriter *MakeForwardedTimestampWriter(const Channel *channel,
+ const Node *node) override;
DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+ // Indicates that at least one file ran out of space. Once this happens, we
+ // stop trying to open new files, to avoid writing any files with holes from
+ // previous parts.
+ //
+ // Aside from this accessor, this object silently stops logging data once
+ // this occurs, so if you want to be sure your log files are complete, you
+ // must check it.
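+ //
+ // A typical check at shutdown might look like (sketch):
+ //   log_namer.Close();
+ //   CHECK(!log_namer.ran_out_of_space()) << ": log files are incomplete";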
+ bool ran_out_of_space() const { return ran_out_of_space_; }
+
+ // Returns the maximum total_bytes() value for all existing
+ // DetachedBufferWriters.
+ //
+ // Returns 0 if no files are open.
+ size_t maximum_total_bytes() const {
+ size_t result = 0;
+ for (const std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ result = std::max(result, data_writer.second.writer->total_bytes());
+ }
+ if (data_writer_) {
+ result = std::max(result, data_writer_->total_bytes());
+ }
+ return result;
+ }
+
+ // Closes all existing log files. No more data may be written after this.
+ //
+ // This may set ran_out_of_space().
+ void Close();
+
private:
// Files to write remote data to. We want one per channel. Maps the channel
// to the writer, Node, and part number.
@@ -150,12 +194,17 @@
// Opens the main data writer file for this node, the one backing data_writer_.
std::unique_ptr<DetachedBufferWriter> OpenDataWriter();
+ void CreateBufferWriter(std::string_view filename,
+ std::unique_ptr<DetachedBufferWriter> *destination);
+
const std::string base_name_;
const Configuration *const configuration_;
const UUID uuid_;
size_t part_number_ = 0;
+ bool ran_out_of_space_ = false;
+
// File to write both delivery timestamps and local data to.
std::unique_ptr<DetachedBufferWriter> data_writer_;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7f53577..5226154 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1,32 +1,43 @@
#include "aos/events/logging/logfile_utils.h"
#include <fcntl.h>
-#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
-#include <vector>
+#include <algorithm>
+#include <climits>
#include "absl/strings/escaping.h"
#include "aos/configuration.h"
-#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/util/file.h"
#include "flatbuffers/flatbuffers.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
-DEFINE_int32(flush_size, 1000000,
+#if defined(__x86_64__)
+#define ENABLE_LZMA 1
+#elif defined(__aarch64__)
+#define ENABLE_LZMA 1
+#else
+#define ENABLE_LZMA 0
+#endif
+
+#if ENABLE_LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
+
+DEFINE_int32(flush_size, 128000,
"Number of outstanding bytes to allow before flushing to disk.");
-namespace aos {
-namespace logger {
+namespace aos::logger {
namespace chrono = std::chrono;
-DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
- : filename_(filename) {
+DetachedBufferWriter::DetachedBufferWriter(
+ std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+ : filename_(filename), encoder_(std::move(encoder)) {
util::MkdirP(filename, 0777);
fd_ = open(std::string(filename).c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
@@ -35,91 +46,161 @@
}
DetachedBufferWriter::~DetachedBufferWriter() {
- Flush();
- PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
- VLOG(1) << "Closed " << filename_;
+ Close();
+ if (ran_out_of_space_) {
+ CHECK(acknowledge_ran_out_of_space_)
+ << ": Unacknowledged out of disk space, log file was not completed";
+ }
}
-DetachedBufferWriter::DetachedBufferWriter(
- DetachedBufferWriter &&other) {
+DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
*this = std::move(other);
}
+// When other is destroyed "soon" (which it should be, since we're taking an
+// rvalue reference to it), it will flush and close all the data we had queued
+// up, because after the swap that data belongs to it.
DetachedBufferWriter &DetachedBufferWriter::operator=(
DetachedBufferWriter &&other) {
- Flush();
std::swap(filename_, other.filename_);
+ std::swap(encoder_, other.encoder_);
std::swap(fd_, other.fd_);
- std::swap(queued_size_, other.queued_size_);
- std::swap(written_size_, other.written_size_);
- std::swap(queue_, other.queue_);
+ std::swap(ran_out_of_space_, other.ran_out_of_space_);
+ std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
std::swap(iovec_, other.iovec_);
+ std::swap(max_write_time_, other.max_write_time_);
+ std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
+ std::swap(max_write_time_messages_, other.max_write_time_messages_);
+ std::swap(total_write_time_, other.total_write_time_);
+ std::swap(total_write_count_, other.total_write_count_);
+ std::swap(total_write_messages_, other.total_write_messages_);
+ std::swap(total_write_bytes_, other.total_write_bytes_);
return *this;
}
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::FlatBufferBuilder *fbb) {
- QueueSizedFlatbuffer(fbb->Release());
+void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
+ if (encoder_->may_bypass() && span.size() > 4096u) {
+ // Above this threshold, we assume it's cheaper to pay for an extra syscall
+ // and write the data out immediately than to copy it into the queue.
+
+ if (ran_out_of_space_) {
+ // We don't want any later data to be written after space becomes
+ // available, so refuse to write anything more once we've dropped data
+ // because we ran out of space.
+ VLOG(1) << "Ignoring span: " << span.size();
+ return;
+ }
+
+ // First, flush everything.
+ while (encoder_->queue_size() > 0u) {
+ Flush();
+ }
+
+ // Then, write it directly.
+ const auto start = aos::monotonic_clock::now();
+ const ssize_t written = write(fd_, span.data(), span.size());
+ const auto end = aos::monotonic_clock::now();
+ HandleWriteReturn(written, span.size());
+ UpdateStatsForWrite(end - start, written, 1);
+ } else {
+ encoder_->Encode(CopySpanAsDetachedBuffer(span));
+ }
+
+ FlushAtThreshold();
}
-void DetachedBufferWriter::WriteSizedFlatbuffer(
- absl::Span<const uint8_t> span) {
- // Cheat aggressively... Write out the queued up data, and then write this
- // data once without buffering. It is hard to make a DetachedBuffer out of
- // this data, and we don't want to worry about lifetimes.
- Flush();
- iovec_.clear();
- iovec_.reserve(1);
-
- struct iovec n;
- n.iov_base = const_cast<uint8_t *>(span.data());
- n.iov_len = span.size();
- iovec_.emplace_back(n);
-
- const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
- PCHECK(written == static_cast<ssize_t>(n.iov_len))
- << ": Wrote " << written << " expected " << n.iov_len;
- written_size_ += written;
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::DetachedBuffer &&buffer) {
- queued_size_ += buffer.size();
- queue_.emplace_back(std::move(buffer));
-
- // Flush if we are at the max number of iovs per writev, or have written
- // enough data. Otherwise writev will fail with an invalid argument.
- if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
- queue_.size() == IOV_MAX) {
+void DetachedBufferWriter::Close() {
+ if (fd_ == -1) {
+ return;
+ }
+ encoder_->Finish();
+ while (encoder_->queue_size() > 0) {
Flush();
}
+ if (close(fd_) == -1) {
+ if (errno == ENOSPC) {
+ ran_out_of_space_ = true;
+ } else {
+ PLOG(ERROR) << "Closing log file failed";
+ }
+ }
+ fd_ = -1;
+ VLOG(1) << "Closed " << filename_;
}
void DetachedBufferWriter::Flush() {
- if (queue_.size() == 0u) {
+ const auto queue = encoder_->queue();
+ if (queue.empty()) {
return;
}
- iovec_.clear();
- iovec_.reserve(queue_.size());
- size_t counted_size = 0;
- for (size_t i = 0; i < queue_.size(); ++i) {
- struct iovec n;
- n.iov_base = queue_[i].data();
- n.iov_len = queue_[i].size();
- counted_size += n.iov_len;
- iovec_.emplace_back(std::move(n));
+ if (ran_out_of_space_) {
+ // We don't want any later data to be written after space becomes available,
+ // so refuse to write anything more once we've dropped data because we ran
+ // out of space.
+ VLOG(1) << "Ignoring queue: " << queue.size();
+ encoder_->Clear(queue.size());
+ return;
}
- CHECK_EQ(counted_size, queued_size_);
+
+ iovec_.clear();
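+ // writev(2) only accepts up to IOV_MAX buffers per call, so clamp the batch
+ // size; anything beyond that stays queued for the next Flush().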
+ const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+ iovec_.resize(iovec_size);
+ size_t counted_size = 0;
+ for (size_t i = 0; i < iovec_size; ++i) {
+ iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
+ iovec_[i].iov_len = queue[i].size();
+ counted_size += iovec_[i].iov_len;
+ }
+
+ const auto start = aos::monotonic_clock::now();
const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
+ const auto end = aos::monotonic_clock::now();
+ HandleWriteReturn(written, counted_size);
- PCHECK(written == static_cast<ssize_t>(queued_size_))
- << ": Wrote " << written << " expected " << queued_size_;
- written_size_ += written;
+ encoder_->Clear(iovec_size);
- queued_size_ = 0;
- queue_.clear();
- // TODO(austin): Handle partial writes in some way other than crashing...
+ UpdateStatsForWrite(end - start, written, iovec_size);
+}
+
+void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
+ size_t write_size) {
+ if (write_return == -1 && errno == ENOSPC) {
+ ran_out_of_space_ = true;
+ return;
+ }
+ PCHECK(write_return >= 0) << ": write failed";
+ if (write_return < static_cast<ssize_t>(write_size)) {
+ // Sometimes this happens instead of ENOSPC. On a real filesystem, this
+ // never seems to happen in any other case. If we ever want to log to a
+ // socket, this will happen more often. However, until we get there, we'll
+ // just assume it means we ran out of space.
+ ran_out_of_space_ = true;
+ return;
+ }
+}
+
+void DetachedBufferWriter::UpdateStatsForWrite(
+ aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
+ if (duration > max_write_time_) {
+ max_write_time_ = duration;
+ max_write_time_bytes_ = written;
+ max_write_time_messages_ = iovec_size;
+ }
+ total_write_time_ += duration;
+ ++total_write_count_;
+ total_write_messages_ += iovec_size;
+ total_write_bytes_ += written;
+}
+
+void DetachedBufferWriter::FlushAtThreshold() {
+ // Flush if we are at the max number of iovs per writev, because there's no
+ // point queueing up any more data in memory. Also flush once we have enough
+ // data queued up.
+ while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
+ encoder_->queue_size() >= IOV_MAX) {
+ Flush();
+ }
}
flatbuffers::Offset<MessageHeader> PackMessage(
@@ -184,10 +265,17 @@
return message_header_builder.Finish();
}
-SpanReader::SpanReader(std::string_view filename)
- : filename_(filename),
- fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
- PCHECK(fd_ != -1) << ": Failed to open " << filename;
+SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
+ static const std::string_view kXz = ".xz";
+ // Check the length first so short filenames can't underflow the substr
+ // position.
+ if (filename.size() >= kXz.size() &&
+ filename.substr(filename.size() - kXz.size()) == kXz) {
+#if ENABLE_LZMA
+ decoder_ = std::make_unique<LzmaDecoder>(filename);
+#else
+ LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
+#endif
+ } else {
+ decoder_ = std::make_unique<DummyDecoder>(filename);
+ }
}
absl::Span<const uint8_t> SpanReader::ReadMessage() {
@@ -243,18 +331,13 @@
}
bool SpanReader::ReadBlock() {
- if (end_of_file_) {
- return false;
- }
-
- // Appends 256k. This is enough that the read call is efficient. We don't
- // want to spend too much time reading small chunks because the syscalls for
- // that will be expensive.
+ // This is the amount of data we grab at a time. Doing larger chunks minimizes
+ // syscalls and helps decompressors batch things more efficiently.
constexpr size_t kReadSize = 256 * 1024;
// Strip off any unused data at the front.
if (consumed_data_ != 0) {
- data_.erase(data_.begin(), data_.begin() + consumed_data_);
+ data_.erase_front(consumed_data_);
consumed_data_ = 0;
}
@@ -263,15 +346,14 @@
// This should automatically grow the backing store. It won't shrink if we
// get a small chunk later. This reduces allocations when we want to append
// more data.
- data_.resize(data_.size() + kReadSize);
+ data_.resize(starting_size + kReadSize);
- ssize_t count = read(fd_, &data_[starting_size], kReadSize);
- data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+ const size_t count =
+ decoder_->Read(data_.begin() + starting_size, data_.end());
+ data_.resize(starting_size + count);
if (count == 0) {
- end_of_file_ = true;
return false;
}
- PCHECK(count > 0);
return true;
}
@@ -285,8 +367,10 @@
<< ": Failed to read header from: " << filename;
// And copy the config so we have it forever, removing the size prefix.
- std::vector<uint8_t> data(
- config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
+ ResizeableBuffer data;
+ data.resize(config_data.size() - sizeof(flatbuffers::uoffset_t));
+ memcpy(data.data(), config_data.begin() + sizeof(flatbuffers::uoffset_t),
+ data.size());
return FlatbufferVector<LogFileHeader>(std::move(data));
}
@@ -302,9 +386,11 @@
<< ": Failed to read data from: " << filename;
}
- // And copy the data so we have it forever.
- std::vector<uint8_t> data(data_span.begin() + sizeof(flatbuffers::uoffset_t),
- data_span.end());
+ // And copy the data so we have it forever, removing the size prefix.
+ ResizeableBuffer data;
+ data.resize(data_span.size() - sizeof(flatbuffers::uoffset_t));
+ memcpy(data.data(), data_span.begin() + sizeof(flatbuffers::uoffset_t),
+ data.size());
return FlatbufferVector<MessageHeader>(std::move(data));
}
@@ -319,8 +405,11 @@
<< ": Failed to read header from: " << filename;
// And copy the header data so we have it forever.
- std::vector<uint8_t> header_data_copy(
- header_data.begin() + sizeof(flatbuffers::uoffset_t), header_data.end());
+ ResizeableBuffer header_data_copy;
+ header_data_copy.resize(header_data.size() - sizeof(flatbuffers::uoffset_t));
+ memcpy(header_data_copy.data(),
+ header_data.begin() + sizeof(flatbuffers::uoffset_t),
+ header_data_copy.size());
raw_log_file_header_ =
FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
@@ -337,8 +426,12 @@
return std::nullopt;
}
- FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
- msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end())};
+ ResizeableBuffer result_buffer;
+ result_buffer.resize(msg_data.size() - sizeof(flatbuffers::uoffset_t));
+ memcpy(result_buffer.data(),
+ msg_data.begin() + sizeof(flatbuffers::uoffset_t),
+ result_buffer.size());
+ FlatbufferVector<MessageHeader> result(std::move(result_buffer));
const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
chrono::nanoseconds(result.message().monotonic_sent_time()));
@@ -378,7 +471,7 @@
if (monotonic_start_time() == monotonic_clock::min_time) {
CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
// We should only be missing the monotonic start time when logging data
- // for remote nodes. We don't have a good way to deteremine the remote
+ // for remote nodes. We don't have a good way to determine the remote
// realtime offset, so it shouldn't be filled out.
// TODO(austin): If we have a good way, feel free to fill it out. It
// probably won't be better than we could do in post though with the same
@@ -564,8 +657,8 @@
chrono::nanoseconds(header.monotonic_sent_time()));
if (VLOG_IS_ON(2)) {
- LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this
- << " " << filename() << " ttq: " << time_to_queue_ << " now "
+ LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
+ << filename() << " ttq: " << time_to_queue_ << " now "
<< newest_timestamp() << " start time "
<< monotonic_start_time() << " " << FlatbufferToJson(&header);
} else if (VLOG_IS_ON(1)) {
@@ -1228,9 +1321,8 @@
(reader->node() != nullptr) && (target_node != nullptr) &&
(reader->node()->has_name() && target_node->has_name());
const bool node_names_identical =
- both_have_name &&
- (reader->node()->name()->string_view() ==
- target_node->name()->string_view());
+ both_have_name && (reader->node()->name()->string_view() ==
+ target_node->name()->string_view());
if (both_null || node_names_identical) {
if (!found_node) {
found_node = true;
@@ -1520,5 +1612,4 @@
return "";
}
-} // namespace logger
-} // namespace aos
+} // namespace aos::logger
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9534860..a3d16f7 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -15,12 +15,14 @@
#include <vector>
#include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffers.h"
#include "flatbuffers/flatbuffers.h"
-namespace aos {
-namespace logger {
+namespace aos::logger {
enum class LogType : uint8_t {
// The message originated on this node and should be logged here.
@@ -37,10 +39,11 @@
};
// This class manages efficiently writing a sequence of detached buffers to a
-// file. It queues them up and batches the write operation.
+// file. It encodes them, queues them up, and batches the write operation.
class DetachedBufferWriter {
public:
- DetachedBufferWriter(std::string_view filename);
+ DetachedBufferWriter(std::string_view filename,
+ std::unique_ptr<DetachedBufferEncoder> encoder);
DetachedBufferWriter(DetachedBufferWriter &&other);
DetachedBufferWriter(const DetachedBufferWriter &) = delete;
@@ -51,43 +54,112 @@
std::string_view filename() const { return filename_; }
- // Rewrites a location in a file (relative to the start) to have new data in
- // it. The main use case is updating start times after a log file starts.
- void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
+ // Queues up a finished FlatBufferBuilder to be encoded and written.
+ //
+ // Triggers a flush if there's enough data queued up.
+ //
+ // Steals the detached buffer from it.
+ void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
+ QueueSizedFlatbuffer(fbb->Release());
+ }
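+ //
+ // Typical usage (sketch): finish a size-prefixed flatbuffer, then hand the
+ // builder over:
+ //   fbb.FinishSizePrefixed(header);
+ //   writer.QueueSizedFlatbuffer(&fbb);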
+ // May steal the backing storage of buffer, or may leave it alone.
+ void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
+ encoder_->Encode(std::move(buffer));
+ FlushAtThreshold();
+ }
- // TODO(austin): Snappy compress the log file if it ends with .snappy!
+ // Queues up data in span. May copy or may write it to disk immediately.
+ void QueueSpan(absl::Span<const uint8_t> span);
- // Queues up a finished FlatBufferBuilder to be written. Steals the detached
- // buffer from it.
- void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
- // Queues up a detached buffer directly.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
- // Writes a Span. This is not terribly optimized right now.
- void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
+ // Indicates we got ENOSPC when trying to write. After this returns true, no
+ // further data is written.
+ bool ran_out_of_space() const { return ran_out_of_space_; }
- // Triggers data to be provided to the kernel and written.
- void Flush();
+ // To avoid silently failing to write logfiles, you must call this before
+ // destruction if ran_out_of_space() is true and the situation has been
+ // handled.
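+ // For example (sketch, mirroring the out-of-space test runner):
+ //   if (writer.ran_out_of_space()) {
+ //     writer.acknowledge_out_of_space();
+ //   }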
+ void acknowledge_out_of_space() {
+ CHECK(ran_out_of_space_);
+ acknowledge_ran_out_of_space_ = true;
+ }
- // Returns the number of bytes written.
- size_t written_size() const { return written_size_; }
+ // Fully flushes and closes the underlying file now. No additional data may be
+ // enqueued after calling this.
+ //
+ // This will be performed in the destructor automatically.
+ //
+ // Note that this may set ran_out_of_space().
+ void Close();
- // Returns the number of bytes written or currently queued.
- size_t total_size() const { return written_size_ + queued_size_; }
+ // Returns the total number of bytes written and currently queued.
+ size_t total_bytes() const { return encoder_->total_bytes(); }
+
+ // The maximum time for a single write call, or 0 if none have been performed.
+ std::chrono::nanoseconds max_write_time() const { return max_write_time_; }
+ // The number of bytes in the longest write call, or -1 if none have been
+ // performed.
+ int max_write_time_bytes() const { return max_write_time_bytes_; }
+ // The number of buffers in the longest write call, or -1 if none have been
+ // performed.
+ int max_write_time_messages() const { return max_write_time_messages_; }
+ // The total time spent in write calls.
+ std::chrono::nanoseconds total_write_time() const {
+ return total_write_time_;
+ }
+ // The total number of writes which have been performed.
+ int total_write_count() const { return total_write_count_; }
+ // The total number of messages which have been written.
+ int total_write_messages() const { return total_write_messages_; }
+ // The total number of bytes which have been written.
+ int total_write_bytes() const { return total_write_bytes_; }
+ void ResetStatistics() {
+ max_write_time_ = std::chrono::nanoseconds::zero();
+ max_write_time_bytes_ = -1;
+ max_write_time_messages_ = -1;
+ total_write_time_ = std::chrono::nanoseconds::zero();
+ total_write_count_ = 0;
+ total_write_messages_ = 0;
+ total_write_bytes_ = 0;
+ }
private:
+ // Performs a single writev call with as much of the data we have queued up as
+ // possible.
+ //
+ // This will normally take all of the data we have queued up, unless an
+ // encoder has spit out a big enough chunk all at once that we can't manage
+ // all of it.
+ void Flush();
+
+ // write_return is what write(2) or writev(2) returned. write_size is the
+ // number of bytes we expected it to write.
+ void HandleWriteReturn(ssize_t write_return, size_t write_size);
+
+ void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
+ ssize_t written, int iovec_size);
+
+ // Flushes data if we've reached the threshold to do that as part of normal
+ // operation.
+ void FlushAtThreshold();
+
std::string filename_;
+ std::unique_ptr<DetachedBufferEncoder> encoder_;
int fd_ = -1;
+ bool ran_out_of_space_ = false;
+ bool acknowledge_ran_out_of_space_ = false;
- // Size of all the data in the queue.
- size_t queued_size_ = 0;
- size_t written_size_ = 0;
-
- // List of buffers to flush.
- std::vector<flatbuffers::DetachedBuffer> queue_;
// List of iovecs to use with writev. This is a member variable to avoid
// churn.
std::vector<struct iovec> iovec_;
+
+ std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
+ int max_write_time_bytes_ = -1;
+ int max_write_time_messages_ = -1;
+ std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
+ int total_write_count_ = 0;
+ int total_write_messages_ = 0;
+ int total_write_bytes_ = 0;
};
// Packs a message pointed to by the context into a MessageHeader.
@@ -104,8 +176,6 @@
public:
SpanReader(std::string_view filename);
- ~SpanReader() { close(fd_); }
-
std::string_view filename() const { return filename_; }
// Returns a span with the data for a message from the log file, excluding
@@ -121,50 +191,22 @@
// Allocate the 256k blocks like we do today. But, refcount them with
// shared_ptr pointed to by the messageheader that is returned. This avoids
// the copy. Need to do more benchmarking.
+ // And (Brian): Consider just mmapping the file and handing out refcounted
+ // pointers into that too.
// Reads a chunk of data into data_. Returns false if no data was read.
bool ReadBlock();
const std::string filename_;
- // File descriptor for the log file.
- int fd_ = -1;
+ // File reader and data decoder.
+ std::unique_ptr<DataDecoder> decoder_;
- // Allocator which doesn't zero initialize memory.
- template <typename T>
- struct DefaultInitAllocator {
- typedef T value_type;
-
- template <typename U>
- void construct(U *p) {
- ::new (static_cast<void *>(p)) U;
- }
-
- template <typename U, typename... Args>
- void construct(U *p, Args &&... args) {
- ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
- }
-
- T *allocate(std::size_t n) {
- return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
- }
-
- template <typename U>
- void deallocate(U *p, std::size_t /*n*/) {
- ::operator delete(static_cast<void *>(p));
- }
- };
-
- // Vector to read into. This uses an allocator which doesn't zero
- // initialize the memory.
- std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+ // Vector to read into.
+ ResizeableBuffer data_;
// Amount of data consumed already in data_.
size_t consumed_data_ = 0;
-
- // Cached bit for if we have reached the end of the file. Otherwise we will
- // hammer on the kernel asking for more data each time we send.
- bool end_of_file_ = false;
};
// Class which handles reading the header and messages from the log file. This
@@ -668,7 +710,6 @@
// a single node.
std::string MaybeNodeName(const Node *);
-} // namespace logger
-} // namespace aos
+} // namespace aos::logger
#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
diff --git a/aos/events/logging/logfile_utils_out_of_space_test.sh b/aos/events/logging/logfile_utils_out_of_space_test.sh
new file mode 100755
index 0000000..f412e0d
--- /dev/null
+++ b/aos/events/logging/logfile_utils_out_of_space_test.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+# Tests DetachedBufferWriter's behavior when running out of disk space. We do
+# this by creating a small tmpfs in a new mount namespace, and then running a
+# test binary in that namespace. There's no easy way for a process with user
+# code to run itself in a new mount+user namespace, so we do that outside the
+# test process itself.
+
+# --- begin runfiles.bash initialization v2 ---
+# Copy-pasted from the Bazel Bash runfiles library v2.
+set -uo pipefail; f=bazel_tools/tools/bash/runfiles/runfiles.bash
+source "${RUNFILES_DIR:-/dev/null}/$f" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "${RUNFILES_MANIFEST_FILE:-/dev/null}" | cut -f2- -d' ')" 2>/dev/null || \
+source "$0.runfiles/$f" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "$0.runfiles_manifest" | cut -f2- -d' ')" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "$0.exe.runfiles_manifest" | cut -f2- -d' ')" 2>/dev/null || \
+ { echo>&2 "ERROR: cannot find $f"; exit 1; }; f=; set -e
+# --- end runfiles.bash initialization v2 ---
+
+set -euo pipefail
+
+TMPFS="${TEST_TMPDIR}/small_tmpfs"
+rm -rf "${TMPFS}"
+mkdir "${TMPFS}"
+
+function test {
+ SIZE="$1"
+ echo "Running test with ${SIZE}..." >&2
+ unshare --mount --map-root-user bash <<END
+set -euo pipefail
+
+mount -t tmpfs tmpfs -o size=${SIZE} "${TMPFS}"
+
+exec "$(rlocation org_frc971/aos/events/logging/logfile_utils_out_of_space_test_runner)" -tmpfs "${TMPFS}"
+END
+ echo "Finished test with ${SIZE}" >&2
+}
+
+# Run out of space exactly at the beginning of a block.
+test 81920
+
+# Run out of space 1 byte into a block.
+test 81921
+
+# Run out of space in the middle of a block.
+test 87040
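+
+# (The runner writes 10240-byte spans: 81920 is exactly 8 spans, 81921 is one
+# byte more, and 87040 is 8.5 spans.)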
diff --git a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
new file mode 100644
index 0000000..3010a6f
--- /dev/null
+++ b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
@@ -0,0 +1,30 @@
+// See :logfile_utils_out_of_space_test for usage and details.
+
+#include <array>
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/init.h"
+
+DECLARE_int32(flush_size);
+DEFINE_string(tmpfs, "", "tmpfs with the desired size");
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+ FLAGS_flush_size = 1;
+ CHECK(!FLAGS_tmpfs.empty()) << ": Must specify a tmpfs location";
+
+ aos::logger::DetachedBufferWriter writer(
+ FLAGS_tmpfs + "/file", std::make_unique<aos::logger::DummyEncoder>());
+ std::array<uint8_t, 10240> data;
+ data.fill(0);
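+ // 8 spans fit in even the smallest tmpfs the wrapper script creates; the
+ // 9th is guaranteed not to, whichever size is in use.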
+ for (int i = 0; i < 8; ++i) {
+ writer.QueueSpan(data);
+ CHECK(!writer.ran_out_of_space()) << ": " << i;
+ }
+ writer.QueueSpan(data);
+ CHECK(writer.ran_out_of_space());
+ writer.acknowledge_out_of_space();
+}
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index caccf03..859e24c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,44 +35,30 @@
namespace logger {
namespace chrono = std::chrono;
-
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
- std::chrono::milliseconds polling_period)
- : Logger(base_name, event_loop, event_loop->configuration(),
- polling_period) {}
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
- const Configuration *configuration,
- std::chrono::milliseconds polling_period)
- : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
- event_loop, configuration, polling_period) {}
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
- std::chrono::milliseconds polling_period)
- : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
- polling_period) {}
-
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
- const Configuration *configuration,
- std::chrono::milliseconds polling_period)
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+ std::function<bool(const Channel *)> should_log)
: event_loop_(event_loop),
- uuid_(UUID::Random()),
- log_namer_(std::move(log_namer)),
configuration_(configuration),
name_(network::GetHostname()),
- timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
- polling_period_(polling_period),
+ timer_handler_(event_loop_->AddTimer(
+ [this]() { DoLogData(event_loop_->monotonic_now()); })),
server_statistics_fetcher_(
configuration::MultiNode(event_loop_->configuration())
? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
"/aos")
: aos::Fetcher<message_bridge::ServerStatistics>()) {
- VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
- int channel_index = 0;
+ VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
// Find all the nodes which are logging timestamps on our node.
std::set<const Node *> timestamp_logger_nodes;
for (const Channel *channel : *configuration_->channels()) {
- if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
- !channel->has_destination_nodes()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
+ continue;
+ }
+ if (!channel->has_destination_nodes()) {
+ continue;
+ }
+ if (!should_log(channel)) {
continue;
}
for (const Connection *connection : *channel->destination_nodes()) {
@@ -104,44 +90,53 @@
<< event_loop_->node()->name()->string_view()
<< " but can't find channel /aos/remote_timestamps/"
<< node->name()->string_view();
+ if (!should_log(channel)) {
+ continue;
+ }
timestamp_logger_channels.insert(std::make_pair(channel, node));
}
- const size_t our_node_index = configuration::GetNodeIndex(
- configuration_, event_loop_->node());
+ const size_t our_node_index =
+ configuration::GetNodeIndex(configuration_, event_loop_->node());
- for (const Channel *config_channel : *configuration_->channels()) {
+ for (size_t channel_index = 0;
+ channel_index < configuration_->channels()->size(); ++channel_index) {
+ const Channel *const config_channel =
+ configuration_->channels()->Get(channel_index);
// The MakeRawFetcher method needs a channel which is in the event loop
// configuration() object, not the configuration_ object. Go look that up
// from the config.
const Channel *channel = aos::configuration::GetChannel(
event_loop_->configuration(), config_channel->name()->string_view(),
config_channel->type()->string_view(), "", event_loop_->node());
+ if (!should_log(channel)) {
+ continue;
+ }
FetcherStruct fs;
fs.node_index = our_node_index;
+ fs.channel_index = channel_index;
+ fs.channel = channel;
+
const bool is_local =
configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
const bool is_readable =
configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
- const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
- channel, event_loop_->node()) &&
- is_readable;
+ const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+ channel, event_loop_->node());
+ const bool log_message = is_logged && is_readable;
- const bool log_delivery_times =
- (event_loop_->node() == nullptr)
- ? false
- : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- channel, event_loop_->node(), event_loop_->node());
+ bool log_delivery_times = false;
+ if (event_loop_->node() != nullptr) {
+ log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, event_loop_->node(), event_loop_->node());
+ }
// Now, detect a MessageHeader timestamp logger where we should just log the
// contents to a file directly.
const bool log_contents = timestamp_logger_channels.find(channel) !=
timestamp_logger_channels.end();
- const Node *timestamp_node =
- log_contents ? timestamp_logger_channels.find(channel)->second
- : nullptr;
if (log_message || log_delivery_times || log_contents) {
fs.fetcher = event_loop->MakeRawFetcher(channel);
@@ -150,11 +145,11 @@
if (log_delivery_times) {
VLOG(1) << " Delivery times";
- fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+ fs.wants_timestamp_writer = true;
}
if (log_message) {
VLOG(1) << " Data";
- fs.writer = log_namer_->MakeWriter(channel);
+ fs.wants_writer = true;
if (!is_local) {
fs.log_type = LogType::kLogRemoteMessage;
}
@@ -162,44 +157,60 @@
if (log_contents) {
VLOG(1) << "Timestamp logger channel "
<< configuration::CleanedChannelToString(channel);
- fs.contents_writer =
- log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+ fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+ fs.wants_contents_writer = true;
fs.node_index =
- configuration::GetNodeIndex(configuration_, timestamp_node);
+ configuration::GetNodeIndex(configuration_, fs.timestamp_node);
}
- fs.channel_index = channel_index;
- fs.written = false;
fetchers_.emplace_back(std::move(fs));
}
- ++channel_index;
+ }
+}
+
+Logger::~Logger() {
+ if (log_namer_) {
+ // If we are replaying a log file, or in simulation, we want to force the
+ // last bit of data to be logged. The easiest way to deal with this is to
+ // poll everything as we go to destroy the class, i.e., shut down the
+ // logger, and write it all to disk.
+ StopLogging(event_loop_->monotonic_now());
+ }
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer) {
+ CHECK(!log_namer_) << ": Already logging";
+ log_namer_ = std::move(log_namer);
+ uuid_ = UUID::Random();
+ VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+ // We want to do as much work as possible before the initial Fetch. Time
+ // between that and actually starting to log opens up the possibility of
+ // falling off the end of the queue during that time.
+
+ for (FetcherStruct &f : fetchers_) {
+ if (f.wants_writer) {
+ f.writer = log_namer_->MakeWriter(f.channel);
+ }
+ if (f.wants_timestamp_writer) {
+ f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+ }
+ if (f.wants_contents_writer) {
+ f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+ f.channel, CHECK_NOTNULL(f.timestamp_node));
+ }
}
+ CHECK(node_state_.empty());
node_state_.resize(configuration::MultiNode(configuration_)
? configuration_->nodes()->size()
: 1u);
for (const Node *node : log_namer_->nodes()) {
- const int node_index =
- configuration::GetNodeIndex(configuration_, node);
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
node_state_[node_index].log_file_header = MakeHeader(node);
}
- // When things start, we want to log the header, then the most recent
- // messages available on each fetcher to capture the previous state, then
- // start polling.
- event_loop_->OnRun([this]() { StartLogging(); });
-}
-
-Logger::~Logger() {
- // If we are replaying a log file, or in simulation, we want to force the last
- // bit of data to be logged. The easiest way to deal with this is to poll
- // everything as we go to destroy the class, ie, shut down the logger, and
- // write it to disk.
- DoLogData();
-}
-
-void Logger::StartLogging() {
// Grab data from each channel right before we declare the log file started
// so we can capture the latest message on each channel. This lets us log
// non-periodic messages, like configuration, that were sent before starting.
@@ -221,6 +232,25 @@
polling_period_);
}
+std::unique_ptr<LogNamer> Logger::StopLogging(
+ aos::monotonic_clock::time_point end_time) {
+ CHECK(log_namer_) << ": Not logging right now";
+
+ if (end_time != aos::monotonic_clock::min_time) {
+ LogUntil(end_time);
+ }
+ timer_handler_->Disable();
+
+ for (FetcherStruct &f : fetchers_) {
+ f.writer = nullptr;
+ f.timestamp_writer = nullptr;
+ f.contents_writer = nullptr;
+ }
+ node_state_.clear();
+
+ return std::move(log_namer_);
+}
+
void Logger::WriteHeader() {
if (configuration::MultiNode(configuration_)) {
server_statistics_fetcher_.Fetch();
@@ -238,8 +268,7 @@
last_synchronized_time_ = monotonic_start_time;
for (const Node *node : log_namer_->nodes()) {
- const int node_index =
- configuration::GetNodeIndex(configuration_, node);
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
realtime_start_time);
log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
@@ -258,8 +287,7 @@
}
for (const Node *node : log_namer_->nodes()) {
- const int node_index =
- configuration::GetNodeIndex(configuration_, node);
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
if (MaybeUpdateTimestamp(
node, node_index,
server_statistics_fetcher_.context().monotonic_event_time,
@@ -296,7 +324,7 @@
const Node *node, int node_index,
aos::monotonic_clock::time_point monotonic_start_time,
aos::realtime_clock::time_point realtime_start_time) {
- // Bail early if there the start times are already set.
+ // Bail early if the start times are already set.
if (node_state_[node_index].monotonic_start_time !=
monotonic_clock::min_time) {
return false;
@@ -360,6 +388,7 @@
flatbuffers::Offset<flatbuffers::String> name_offset =
fbb.CreateString(name_);
+ CHECK(uuid_ != UUID::Zero());
flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
fbb.CreateString(uuid_.string_view());
@@ -388,8 +417,7 @@
// message. Report back 3x to be extra safe, and because the cost isn't
// huge on the read side.
log_file_header_builder.add_max_out_of_order_duration(
- std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
- .count());
+ std::chrono::nanoseconds(3 * polling_period_).count());
log_file_header_builder.add_monotonic_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
@@ -413,8 +441,7 @@
void Logger::Rotate() {
for (const Node *node : log_namer_->nodes()) {
- const int node_index =
- configuration::GetNodeIndex(configuration_, node);
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
}
}
@@ -527,113 +554,252 @@
last_synchronized_time_ = t;
}
-void Logger::DoLogData() {
- // We want to guarentee that messages aren't out of order by more than
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+ // We want to guarantee that messages aren't out of order by more than
// max_out_of_order_duration. To do this, we need sync points. Every write
// cycle should be a sync point.
- const monotonic_clock::time_point monotonic_now =
- event_loop_->monotonic_now();
do {
// Move the sync point up by at most polling_period. This forces one sync
// per iteration, even if it is small.
- LogUntil(
- std::min(last_synchronized_time_ + polling_period_, monotonic_now));
+ LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+ on_logged_period_();
// If we missed cycles, we could be pretty far behind. Spin until we are
// caught up.
- } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+ } while (last_synchronized_time_ + polling_period_ < end_time);
}
-std::vector<std::vector<std::string>> SortParts(
- const std::vector<std::string> &parts) {
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
// Start by grouping all parts by UUID, and extracting the part index.
- std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
+ // Data structure to hold all the info extracted from a set of parts which
+ // go together, so we can sort them afterwards.
+ struct UnsortedLogParts {
+ // Start times.
+ aos::monotonic_clock::time_point monotonic_start_time;
+ aos::realtime_clock::time_point realtime_start_time;
+
+ // Node to save.
+ std::string node;
+
+ // Pairs of the filename and the part index for sorting.
+ std::vector<std::pair<std::string, int>> parts;
+ };
+
+ // Map from logger_uuid to a second map, which maps parts_uuid to the list
+ // of parts to sort.
+ std::map<std::string, std::map<std::string, UnsortedLogParts>> parts_list;
// Sort part files without UUIDs and part indexes as well. Extract everything
// useful from the log in the first pass, then sort later.
- struct LogPart {
- std::string filename;
- monotonic_clock::time_point start_time;
- monotonic_clock::time_point first_message_time;
+ struct UnsortedOldParts {
+ // Part information with everything but the list of parts.
+ LogParts parts;
+
+ // Pairs of first-message time and filename, used for sorting after
+ // extraction.
+ std::vector<std::pair<monotonic_clock::time_point, std::string>>
+ unsorted_parts;
};
- std::vector<LogPart> old_parts;
+ // A list of all the old parts which we don't know how to sort using UUIDs.
+ // There are enough of these in the wild that this is worth supporting.
+ std::vector<UnsortedOldParts> old_parts;
+ // Now extract everything into our data structures above for sorting.
for (const std::string &part : parts) {
FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
+ const monotonic_clock::time_point monotonic_start_time(
+ chrono::nanoseconds(log_header.message().monotonic_start_time()));
+ const realtime_clock::time_point realtime_start_time(
+ chrono::nanoseconds(log_header.message().realtime_start_time()));
+
+ const std::string_view node =
+ log_header.message().has_node()
+ ? log_header.message().node()->name()->string_view()
+ : "";
+
// Looks like an old log. No UUID, index, and also single node. There are
// few to no multi-node log files in the wild without part UUIDs and
// indexes that we care much about.
if (!log_header.message().has_parts_uuid() &&
!log_header.message().has_parts_index() &&
!log_header.message().has_node()) {
- LogPart log_part;
- log_part.filename = part;
- log_part.start_time = monotonic_clock::time_point(
- chrono::nanoseconds(log_header.message().monotonic_start_time()));
FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
- log_part.first_message_time = monotonic_clock::time_point(
+ const monotonic_clock::time_point first_message_time(
chrono::nanoseconds(first_message.message().monotonic_sent_time()));
- old_parts.emplace_back(std::move(log_part));
+
+ // Find anything with a matching start time. They all go together.
+ auto result = std::find_if(
+ old_parts.begin(), old_parts.end(),
+ [&](const UnsortedOldParts &parts) {
+ return parts.parts.monotonic_start_time == monotonic_start_time &&
+ parts.parts.realtime_start_time == realtime_start_time;
+ });
+
+ if (result == old_parts.end()) {
+ old_parts.emplace_back();
+ old_parts.back().parts.monotonic_start_time = monotonic_start_time;
+ old_parts.back().parts.realtime_start_time = realtime_start_time;
+ old_parts.back().unsorted_parts.emplace_back(
+ std::make_pair(first_message_time, part));
+ } else {
+ result->unsorted_parts.emplace_back(
+ std::make_pair(first_message_time, part));
+ }
continue;
}
+ CHECK(log_header.message().has_logger_uuid());
CHECK(log_header.message().has_parts_uuid());
CHECK(log_header.message().has_parts_index());
+ const std::string logger_uuid = log_header.message().logger_uuid()->str();
const std::string parts_uuid = log_header.message().parts_uuid()->str();
- auto it = parts_list.find(parts_uuid);
- if (it == parts_list.end()) {
- it = parts_list
- .insert(std::make_pair(
- parts_uuid, std::vector<std::pair<std::string, int>>{}))
- .first;
+ int32_t parts_index = log_header.message().parts_index();
+
+ auto log_it = parts_list.find(logger_uuid);
+ if (log_it == parts_list.end()) {
+ log_it = parts_list
+ .insert(std::make_pair(
+ logger_uuid, std::map<std::string, UnsortedLogParts>()))
+ .first;
}
- it->second.emplace_back(
- std::make_pair(part, log_header.message().parts_index()));
+
+ auto it = log_it->second.find(parts_uuid);
+ if (it == log_it->second.end()) {
+ it = log_it->second.insert(std::make_pair(parts_uuid, UnsortedLogParts()))
+ .first;
+ it->second.monotonic_start_time = monotonic_start_time;
+ it->second.realtime_start_time = realtime_start_time;
+ it->second.node = std::string(node);
+ }
+
+ // First part might be min_time. If it is, try to put a better time on it.
+ if (it->second.monotonic_start_time == monotonic_clock::min_time) {
+ it->second.monotonic_start_time = monotonic_start_time;
+ } else if (monotonic_start_time != monotonic_clock::min_time) {
+ CHECK_EQ(it->second.monotonic_start_time, monotonic_start_time);
+ }
+ if (it->second.realtime_start_time == realtime_clock::min_time) {
+ it->second.realtime_start_time = realtime_start_time;
+ } else if (realtime_start_time != realtime_clock::min_time) {
+ CHECK_EQ(it->second.realtime_start_time, realtime_start_time);
+ }
+
+ it->second.parts.emplace_back(std::make_pair(part, parts_index));
}
CHECK_NE(old_parts.empty(), parts_list.empty())
<< ": Can't have a mix of old and new parts.";
+ // Now reformat old_parts to be in the right datastructure to report.
if (!old_parts.empty()) {
- // Confirm they all have the same start time. Old loggers always used the
- // same start time.
- for (const LogPart &p : old_parts) {
- CHECK_EQ(old_parts[0].start_time, p.start_time);
+ std::vector<LogFile> result;
+ for (UnsortedOldParts &p : old_parts) {
+ // Sort by the oldest message in each file.
+ std::sort(
+ p.unsorted_parts.begin(), p.unsorted_parts.end(),
+ [](const std::pair<monotonic_clock::time_point, std::string> &a,
+ const std::pair<monotonic_clock::time_point, std::string> &b) {
+ return a.first < b.first;
+ });
+ LogFile log_file;
+ for (std::pair<monotonic_clock::time_point, std::string> &f :
+ p.unsorted_parts) {
+ p.parts.parts.emplace_back(std::move(f.second));
+ }
+ log_file.parts.emplace_back(std::move(p.parts));
+ result.emplace_back(std::move(log_file));
}
- // Sort by the oldest message in each file.
- std::sort(old_parts.begin(), old_parts.end(),
- [](const LogPart &a, const LogPart &b) {
- return a.first_message_time < b.first_message_time;
- });
- // Produce the final form.
- std::vector<std::string> sorted_old_parts;
- sorted_old_parts.reserve(old_parts.size());
- for (LogPart &p : old_parts) {
- sorted_old_parts.emplace_back(std::move(p.filename));
- }
- return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
+ return result;
}
// Now, sort them and produce the final vector form.
- std::vector<std::vector<std::string>> result;
+ std::vector<LogFile> result;
result.reserve(parts_list.size());
- for (auto &part : parts_list) {
- std::sort(part.second.begin(), part.second.end(),
- [](const std::pair<std::string, int> &a,
- const std::pair<std::string, int> &b) {
- return a.second < b.second;
- });
- std::vector<std::string> result_line;
- result_line.reserve(part.second.size());
- for (std::pair<std::string, int> &p : part.second) {
- result_line.emplace_back(std::move(p.first));
+ for (std::pair<const std::string, std::map<std::string, UnsortedLogParts>>
+ &logs : parts_list) {
+ LogFile new_file;
+ new_file.logger_uuid = logs.first;
+ for (std::pair<const std::string, UnsortedLogParts> &parts : logs.second) {
+ LogParts new_parts;
+ new_parts.monotonic_start_time = parts.second.monotonic_start_time;
+ new_parts.realtime_start_time = parts.second.realtime_start_time;
+ new_parts.logger_uuid = logs.first;
+ new_parts.parts_uuid = parts.first;
+ new_parts.node = std::move(parts.second.node);
+
+ std::sort(parts.second.parts.begin(), parts.second.parts.end(),
+ [](const std::pair<std::string, int> &a,
+ const std::pair<std::string, int> &b) {
+ return a.second < b.second;
+ });
+ new_parts.parts.reserve(parts.second.parts.size());
+ for (std::pair<std::string, int> &p : parts.second.parts) {
+ new_parts.parts.emplace_back(std::move(p.first));
+ }
+ new_file.parts.emplace_back(std::move(new_parts));
}
- result.emplace_back(std::move(result_line));
+ result.emplace_back(std::move(new_file));
+ }
+ return result;
+}
+
+std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
+ stream << "{";
+ if (!file.logger_uuid.empty()) {
+ stream << "\"logger_uuid\": \"" << file.logger_uuid << "\", ";
+ }
+ stream << "\"parts\": [";
+ for (size_t i = 0; i < file.parts.size(); ++i) {
+ if (i != 0u) {
+ stream << ", ";
+ }
+ stream << file.parts[i];
+ }
+ stream << "]}";
+ return stream;
+}
+std::ostream &operator<<(std::ostream &stream, const LogParts &parts) {
+ stream << "{";
+ if (!parts.logger_uuid.empty()) {
+ stream << "\"logger_uuid\": \"" << parts.logger_uuid << "\", ";
+ }
+ if (!parts.parts_uuid.empty()) {
+ stream << "\"parts_uuid\": \"" << parts.parts_uuid << "\", ";
+ }
+ if (!parts.node.empty()) {
+ stream << "\"node\": \"" << parts.node << "\", ";
+ }
+ stream << "\"monotonic_start_time\": " << parts.monotonic_start_time
+ << ", \"realtime_start_time\": " << parts.realtime_start_time << ", [";
+
+ for (size_t i = 0; i < parts.parts.size(); ++i) {
+ if (i != 0u) {
+ stream << ", ";
+ }
+ stream << parts.parts[i];
+ }
+
+ stream << "]}";
+ return stream;
+}
+
+std::vector<std::vector<std::string>> ToLogReaderVector(
+ const std::vector<LogFile> &log_files) {
+ std::vector<std::vector<std::string>> result;
+ for (const LogFile &log_file : log_files) {
+ for (const LogParts &log_parts : log_file.parts) {
+ std::vector<std::string> parts;
+ for (const std::string &part : log_parts.parts) {
+ parts.emplace_back(part);
+ }
+ result.emplace_back(std::move(parts));
+ }
}
return result;
}
@@ -648,6 +814,12 @@
: LogReader(std::vector<std::vector<std::string>>{filenames},
replay_configuration) {}
+// TODO(austin): Make this the base and kill the others. This has much better
+// context for sorting.
+LogReader::LogReader(const std::vector<LogFile> &log_files,
+ const Configuration *replay_configuration)
+ : LogReader(ToLogReaderVector(log_files), replay_configuration) {}
+
LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
const Configuration *replay_configuration)
: filenames_(filenames),
@@ -721,7 +893,8 @@
return configuration::GetNodes(remapped_configuration_);
}
-monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
+monotonic_clock::time_point LogReader::monotonic_start_time(
+ const Node *node) const {
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
@@ -729,7 +902,8 @@
return state->monotonic_start_time();
}
-realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
+realtime_clock::time_point LogReader::realtime_start_time(
+ const Node *node) const {
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
@@ -745,6 +919,7 @@
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
+ remapped_configuration_ = event_loop_factory_->configuration();
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
@@ -1094,7 +1269,6 @@
}
}
-
void LogReader::Register(EventLoop *event_loop) {
State *state =
states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
@@ -1194,8 +1368,7 @@
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
"not matched? Use --skip_missing_forwarding_entries to "
- "ignore "
- "this.";
+ "ignore this.";
if (update_time) {
// Confirm that the message was sent on the sending node before the
@@ -1319,9 +1492,8 @@
});
for (size_t i = 0; i < states_.size(); ++i) {
- VLOG(1) << MaybeNodeName(
- states_[i]->event_loop()->node())
- << "before " << states_[i]->monotonic_now();
+ VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "before "
+ << states_[i]->monotonic_now();
}
UpdateOffsets();
@@ -1329,9 +1501,8 @@
<< state->monotonic_now();
for (size_t i = 0; i < states_.size(); ++i) {
- VLOG(1) << MaybeNodeName(
- states_[i]->event_loop()->node())
- << "after " << states_[i]->monotonic_now();
+ VLOG(1) << MaybeNodeName(states_[i]->event_loop()->node()) << "after "
+ << states_[i]->monotonic_now();
}
// TODO(austin): We should be perfect.
@@ -1422,6 +1593,50 @@
<< type;
}
+void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
+ const Node *node,
+ std::string_view add_prefix) {
+ VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
+ const Channel *remapped_channel =
+ configuration::GetChannel(logged_configuration(), name, type, "", node);
+ CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
+ << "\", \"type\": \"" << type << "\"}";
+ VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
+ << "\"}";
+ VLOG(1) << "Remapped "
+ << aos::configuration::StrippedChannelToString(remapped_channel);
+
+ // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
+ // if the heuristics fail, we want it to degrade to something that still
+ // just works.
+ //
+ // The easiest way to do this is going to be incredibly specific and verbose.
+ // Look up /spray and find that it maps to /0/spray. Then, prefix the result
+ // with /original to get /original/0/spray. Then, create a map from
+ // /original/spray to /original/0/spray for just the type we were asked for.
+ if (name != remapped_channel->name()->string_view()) {
+ MapT new_map;
+ new_map.match = std::make_unique<ChannelT>();
+ new_map.match->name = absl::StrCat(add_prefix, name);
+ new_map.match->type = type;
+ if (node != nullptr) {
+ new_map.match->source_node = node->name()->str();
+ }
+ new_map.rename = std::make_unique<ChannelT>();
+ new_map.rename->name =
+ absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+ maps_.emplace_back(std::move(new_map));
+ }
+
+ const size_t channel_index =
+ configuration::ChannelIndex(logged_configuration(), remapped_channel);
+ CHECK_EQ(0u, remapped_channels_.count(channel_index))
+ << "Already remapped channel "
+ << configuration::CleanedChannelToString(remapped_channel);
+ remapped_channels_[channel_index] =
+ absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+ MakeRemappedConfig();
+}
+
void LogReader::MakeRemappedConfig() {
for (std::unique_ptr<State> &state : states_) {
if (state) {
@@ -1490,10 +1705,46 @@
&new_config_fbb));
}
// Create the Configuration containing the new channels that we want to add.
- const auto new_name_vector_offsets =
+ const auto new_channel_vector_offsets =
new_config_fbb.CreateVector(channel_offsets);
+
+ // Now create the new maps.
+ std::vector<flatbuffers::Offset<Map>> map_offsets;
+ for (const MapT &map : maps_) {
+ const flatbuffers::Offset<flatbuffers::String> match_name_offset =
+ new_config_fbb.CreateString(map.match->name);
+ const flatbuffers::Offset<flatbuffers::String> match_type_offset =
+ new_config_fbb.CreateString(map.match->type);
+ const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
+ new_config_fbb.CreateString(map.rename->name);
+ flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
+ if (!map.match->source_node.empty()) {
+ match_source_node_offset =
+ new_config_fbb.CreateString(map.match->source_node);
+ }
+ Channel::Builder match_builder(new_config_fbb);
+ match_builder.add_name(match_name_offset);
+ match_builder.add_type(match_type_offset);
+ if (!map.match->source_node.empty()) {
+ match_builder.add_source_node(match_source_node_offset);
+ }
+ const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
+
+ Channel::Builder rename_builder(new_config_fbb);
+ rename_builder.add_name(rename_name_offset);
+ const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
+
+ Map::Builder map_builder(new_config_fbb);
+ map_builder.add_match(match_offset);
+ map_builder.add_rename(rename_offset);
+ map_offsets.emplace_back(map_builder.Finish());
+ }
+
+ const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+
ConfigurationBuilder new_config_builder(new_config_fbb);
- new_config_builder.add_channels(new_name_vector_offsets);
+ new_config_builder.add_channels(new_channel_vector_offsets);
+ new_config_builder.add_maps(new_maps_offsets);
new_config_fbb.Finish(new_config_builder.Finish());
const FlatbufferDetachedBuffer<Configuration> new_name_config =
new_config_fbb.Release();
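
For context on the new interfaces above: `SortParts` now returns structured `LogFile`/`LogParts` groupings instead of a bare vector of vectors. A minimal consumption sketch; the part-file paths are invented for illustration:

```cpp
#include "aos/events/logging/logger.h"

#include "glog/logging.h"

// Hypothetical part files; real names are produced by the LogNamer.
const std::vector<std::string> filenames = {
    "/tmp/logfile_data.part0.bfbs",
    "/tmp/logfile_data.part1.bfbs",
};

// Group and order the parts by logger_uuid/parts_uuid/parts_index.
const std::vector<aos::logger::LogFile> log_files =
    aos::logger::SortParts(filenames);
for (const aos::logger::LogFile &file : log_files) {
  LOG(INFO) << file;  // Uses the new JSON-style operator<<.
}

// Either construct the reader from the sorted structure directly...
aos::logger::LogReader reader(log_files);
// ...or flatten back to the legacy shape for older call sites.
const std::vector<std::vector<std::string>> legacy =
    aos::logger::ToLogReaderVector(log_files);
```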
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 2905dfa..c651c35 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -17,24 +17,24 @@
// Time this log file started on the monotonic clock in nanoseconds.
// If this isn't known (the log file is being recorded from another node
// where we don't know the time offset), both timestamps will be min_time.
- monotonic_start_time:int64 = -9223372036854775808;
+ monotonic_start_time:int64 = -9223372036854775808 (id: 0);
// Time this log file started on the realtime clock in nanoseconds.
- realtime_start_time:int64 = -9223372036854775808;
+ realtime_start_time:int64 = -9223372036854775808 (id: 1);
// Messages are not written in order to disk. They will be out of order by
// at most this duration (in nanoseconds). If the log reader buffers until
// it finds messages this much newer than its simulation time, it will never
// find a message out of order.
- max_out_of_order_duration:long;
+ max_out_of_order_duration:long (id: 2);
// The configuration of the channels.
- configuration:aos.Configuration;
+ configuration:aos.Configuration (id: 3);
// Name of the device which this log file is for.
- name:string;
+ name:string (id: 4);
// The current node, if known and running in a multi-node configuration.
- node:Node;
+ node:Node (id: 5);
// All UUIDs are uuid4.
@@ -45,10 +45,10 @@
// ---- basename_timestamps/pi1/aos/remote_timestamps/pi2/aos.logger.MessageHeader.part0.bfbs, etc.
// \-- basename_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs, etc.
- // All log files and parts from a single run of a logger executable will have
+ // All log files and parts from a single logging event will have
// the same uuid. This should be all the files generated on a single node.
// Used to correlate files recorded together.
- logger_uuid:string;
+ logger_uuid:string (id: 6);
// Part files which go together all have headers. When creating a log file
// with multiple parts, the logger should stop writing to part n-1 as soon
@@ -61,41 +61,41 @@
// will be created when the start time is known.
// All the parts which go together have the same uuid.
- parts_uuid:string;
+ parts_uuid:string (id: 7);
// And the parts_index corresponds to which part this is in the sequence. The
// index should start at 0.
- parts_index:int32;
+ parts_index:int32 (id: 8);
}
// Table holding a message.
table MessageHeader {
// Index into the channel datastructure in the log file header. This
// provides the data type.
- channel_index:uint;
+ channel_index:uint (id: 0);
// Time this message was sent on the monotonic clock in nanoseconds on this
// node.
- monotonic_sent_time:long;
+ monotonic_sent_time:long (id: 1);
// Time this message was sent on the realtime clock in nanoseconds on this
// node.
- realtime_sent_time:long;
+ realtime_sent_time:long (id: 2);
// Index into the ipc queue of this message. This should start with 0 and
// always monotonically increment if no messages were ever lost. It will
// wrap at a multiple of the queue size.
- queue_index:uint;
+ queue_index:uint (id: 3);
// TODO(austin): Format? Compressed?
// The nested flatbuffer.
- data:[ubyte];
+ data:[ubyte] (id: 4);
// Time this message was sent on the monotonic clock of the remote node in
// nanoseconds.
- monotonic_remote_time:int64 = -9223372036854775808;
+ monotonic_remote_time:int64 = -9223372036854775808 (id: 5);
// Time this message was sent on the realtime clock of the remote node in
// nanoseconds.
- realtime_remote_time:int64 = -9223372036854775808;
+ realtime_remote_time:int64 = -9223372036854775808 (id: 6);
// Queue index of this message on the remote node.
- remote_queue_index:uint32 = 4294967295;
+ remote_queue_index:uint32 = 4294967295 (id: 7);
}
root_type MessageHeader;
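
A note on the `(id: N)` attributes added throughout this schema: they pin each field to an explicit vtable slot, so the textual order of fields in the `.fbs` file no longer determines the wire format. As a hedged illustration (this table is hypothetical, not part of the logger schema), appending a field with the next unused id keeps every previously written log readable:

```
// Hypothetical table, for illustration only.
table ExampleHeader {
  monotonic_start_time:int64 = -9223372036854775808 (id: 0);
  name:string (id: 1);
  // A new field takes the next unused id; readers of old logs simply see it
  // as absent.
  compression_algorithm:string (id: 2);
}
```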
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 32c2022..e8856f3 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -32,66 +32,69 @@
class Logger {
public:
// Constructs a logger.
- // base_name/log_namer: Object used to write data to disk in one or more log
- // files. If a base_name is passed in, a LocalLogNamer is wrapped
- // around it.
// event_loop: The event loop used to read the messages.
- // polling_period: The period used to poll the data.
// configuration: When provided, this is the configuration to log, and the
// configuration to use for the channel list to log. If not provided,
// this becomes the configuration from the event loop.
- Logger(std::string_view base_name, EventLoop *event_loop,
- std::chrono::milliseconds polling_period =
- std::chrono::milliseconds(100));
- Logger(std::string_view base_name, EventLoop *event_loop,
- const Configuration *configuration,
- std::chrono::milliseconds polling_period =
- std::chrono::milliseconds(100));
- Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
- std::chrono::milliseconds polling_period =
- std::chrono::milliseconds(100));
- Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
- const Configuration *configuration,
- std::chrono::milliseconds polling_period =
- std::chrono::milliseconds(100));
+ // should_log: When provided, a filter for channels to log. If not provided,
+ // all available channels are logged.
+ Logger(EventLoop *event_loop)
+ : Logger(event_loop, event_loop->configuration()) {}
+ Logger(EventLoop *event_loop, const Configuration *configuration)
+ : Logger(event_loop, configuration,
+ [](const Channel *) { return true; }) {}
+ Logger(EventLoop *event_loop, const Configuration *configuration,
+ std::function<bool(const Channel *)> should_log);
~Logger();
// Overrides the name in the log file header.
void set_name(std::string_view name) { name_ = name; }
+ // Sets the callback to run after each period of data is logged. Defaults to
+ // doing nothing.
+ //
+ // This callback may safely do things like call Rotate().
+ void set_on_logged_period(std::function<void()> on_logged_period) {
+ on_logged_period_ = std::move(on_logged_period);
+ }
+
+ // Sets the period between polling the data. Defaults to 100ms.
+ //
+ // Changing this while a set of files is being written may result in
+ // unreadable files.
+ void set_polling_period(std::chrono::nanoseconds polling_period) {
+ polling_period_ = polling_period;
+ }
+
// Rotates the log file(s), triggering new part files to be written for each
// log file.
void Rotate();
+ // Starts logging to files with the given naming scheme.
+ void StartLogging(std::unique_ptr<LogNamer> log_namer);
+
+ // Stops logging. Ensures any messages through end_time make it into the log.
+ //
+ // If you want to stop ASAP, pass min_time to avoid reading any more messages.
+ //
+ // Returns the LogNamer in case the caller wants to do anything else with it
+ // before destroying it.
+ std::unique_ptr<LogNamer> StopLogging(
+ aos::monotonic_clock::time_point end_time);
+
+ // Returns whether a log is currently being written.
+ bool is_started() const { return static_cast<bool>(log_namer_); }
+
+ // Shortcut to call StartLogging with a LocalLogNamer when event processing
+ // starts.
+ void StartLoggingLocalNamerOnRun(std::string base_name) {
+ event_loop_->OnRun([this, base_name]() {
+ StartLogging(
+ std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
+ });
+ }
+
private:
- void WriteHeader();
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
- const Node *node);
-
- bool MaybeUpdateTimestamp(
- const Node *node, int node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time);
-
- void DoLogData();
-
- void WriteMissingTimestamps();
-
- void StartLogging();
-
- // Fetches from each channel until all the data is logged.
- void LogUntil(monotonic_clock::time_point t);
-
- EventLoop *event_loop_;
- const UUID uuid_;
- std::unique_ptr<LogNamer> log_namer_;
-
- // The configuration to place at the top of the log file.
- const Configuration *configuration_;
-
- // Name to save in the log file. Defaults to hostname.
- std::string name_;
-
// Structure to track both a fetcher, and if the data fetched has been
// written. We may want to delay writing data to disk so that we don't let
// data get too far out of order when written to disk so we can avoid making
@@ -101,41 +104,25 @@
bool written = false;
int channel_index = -1;
+ const Channel *channel = nullptr;
+ const Node *timestamp_node = nullptr;
LogType log_type = LogType::kLogMessage;
+ // We fill out the metadata at construction, but the actual writers have to
+ // be updated each time we start logging. To avoid duplicating the complex
+ // logic determining whether each writer should be initialized, we just
+ // stash the answer in separate member variables.
+ bool wants_writer = false;
DetachedBufferWriter *writer = nullptr;
+ bool wants_timestamp_writer = false;
DetachedBufferWriter *timestamp_writer = nullptr;
+ bool wants_contents_writer = false;
DetachedBufferWriter *contents_writer = nullptr;
- const Node *writer_node = nullptr;
- const Node *timestamp_node = nullptr;
+
int node_index = 0;
};
- std::vector<FetcherStruct> fetchers_;
- TimerHandler *timer_handler_;
-
- // Period to poll the channels.
- const std::chrono::milliseconds polling_period_;
-
- // Last time that data was written for all channels to disk.
- monotonic_clock::time_point last_synchronized_time_;
-
- monotonic_clock::time_point monotonic_start_time_;
- realtime_clock::time_point realtime_start_time_;
-
- // Max size that the header has consumed. This much extra data will be
- // reserved in the builder to avoid reallocating.
- size_t max_header_size_ = 0;
-
- // Fetcher for all the statistics from all the nodes.
- aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
-
- // Sets the start time for a specific node.
- void SetStartTime(size_t node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time);
-
struct NodeState {
aos::monotonic_clock::time_point monotonic_start_time =
aos::monotonic_clock::min_time;
@@ -145,12 +132,96 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
};
+
+ void WriteHeader();
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ const Node *node);
+
+ bool MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time);
+
+ void DoLogData(const monotonic_clock::time_point end_time);
+
+ void WriteMissingTimestamps();
+
+ // Fetches from each channel until all the data is logged.
+ void LogUntil(monotonic_clock::time_point t);
+
+ // Sets the start time for a specific node.
+ void SetStartTime(size_t node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time);
+
+ EventLoop *event_loop_;
+ UUID uuid_ = UUID::Zero();
+ std::unique_ptr<LogNamer> log_namer_;
+
+ // The configuration to place at the top of the log file.
+ const Configuration *const configuration_;
+
+ // Name to save in the log file. Defaults to hostname.
+ std::string name_;
+
+ std::function<void()> on_logged_period_ = []() {};
+
+ std::vector<FetcherStruct> fetchers_;
+ TimerHandler *timer_handler_;
+
+ // Period to poll the channels.
+ std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
+
+ // Last time that data was written for all channels to disk.
+ monotonic_clock::time_point last_synchronized_time_;
+
+ // Max size that the header has consumed. This much extra data will be
+ // reserved in the builder to avoid reallocating.
+ size_t max_header_size_ = 0;
+
+ // Fetcher for all the statistics from all the nodes.
+ aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
std::vector<NodeState> node_state_;
};
+// Datastructure to hold ordered parts.
+struct LogParts {
+ // Monotonic and realtime start times for this set of log files. For log
+ // files which started out unknown and then became known, this is the known
+ // start time.
+ aos::monotonic_clock::time_point monotonic_start_time;
+ aos::realtime_clock::time_point realtime_start_time;
+
+ // UUIDs if available.
+ std::string logger_uuid;
+ std::string parts_uuid;
+
+ // The node this represents, or empty if we are in a single node world.
+ std::string node;
+
+ // Pre-sorted list of parts.
+ std::vector<std::string> parts;
+};
+
+// Datastructure to hold parts from the same run of the logger which have no
+// ordering constraints relative to each other.
+struct LogFile {
+ // The UUID tying them all together (if available).
+ std::string logger_uuid;
+
+ // All the parts, unsorted.
+ std::vector<LogParts> parts;
+};
+
+std::ostream &operator<<(std::ostream &stream, const LogFile &file);
+std::ostream &operator<<(std::ostream &stream, const LogParts &parts);
+
// Takes a bunch of parts and sorts them based on part_uuid and part_index.
-std::vector<std::vector<std::string>> SortParts(
- const std::vector<std::string> &parts);
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
+
+std::vector<std::vector<std::string>> ToLogReaderVector(
+ const std::vector<LogFile> &log_files);
// We end up with one of the following 3 log file types.
//
@@ -205,6 +276,8 @@
const Configuration *replay_configuration = nullptr);
LogReader(const std::vector<std::vector<std::string>> &filenames,
const Configuration *replay_configuration = nullptr);
+ LogReader(const std::vector<LogFile> &log_files,
+ const Configuration *replay_configuration = nullptr);
~LogReader();
// Registers all the callbacks to send the log file data out on an event loop
@@ -233,7 +306,11 @@
// SimulatedEventLoopFactory, and then get the configuration from there for
// everything else.
const Configuration *logged_configuration() const;
- // Returns the configuration being used for replay.
+ // Returns the configuration being used for replay from the log file.
+ // Note that this may be different from the configuration actually used for
+ // handling events. You should generally only use this to create a
+ // SimulatedEventLoopFactory, and then get the configuration from there for
+ // everything else.
// The pointer is invalidated whenever RemapLoggedChannel is called.
const Configuration *configuration() const;
@@ -243,8 +320,10 @@
std::vector<const Node *> Nodes() const;
// Returns the starting timestamp for the log file.
- monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
- realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
+ monotonic_clock::time_point monotonic_start_time(
+ const Node *node = nullptr) const;
+ realtime_clock::time_point realtime_start_time(
+ const Node *node = nullptr) const;
// Causes the logger to publish the provided channel on a different name so
// that replayed applications can publish on the proper channel name without
@@ -258,11 +337,28 @@
RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
}
+ // Remaps the provided channel, though this respects node mappings, and
+ // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
+ // /original/aos -> /original/pi1/aos on the same node after renaming, just
+ // like you would hope.
+ //
+ // TODO(austin): If you have 2 nodes remapping something to the same channel,
+ // this doesn't handle that. No use cases exist yet for that, so it isn't
+ // being done yet.
+ void RemapLoggedChannel(std::string_view name, std::string_view type,
+ const Node *node,
+ std::string_view add_prefix = "/original");
template <typename T>
- bool HasChannel(std::string_view name) {
+ void RemapLoggedChannel(std::string_view name, const Node *node,
+ std::string_view add_prefix = "/original") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
+ }
+
+ template <typename T>
+ bool HasChannel(std::string_view name, const Node *node = nullptr) {
return configuration::GetChannel(log_file_header()->configuration(), name,
T::GetFullyQualifiedName(), "",
- nullptr) != nullptr;
+ node) != nullptr;
}
SimulatedEventLoopFactory *event_loop_factory() {
@@ -524,7 +620,7 @@
// constrain our time function.
//
// One is simple. The distributed clock is the average of all the clocks.
- // (ta + tb + tc + td) / num_nodex = t_distributed
+ // (ta + tb + tc + td) / num_nodes = t_distributed
//
// The second is a bit more complicated. Our basic time conversion function
// is:
@@ -540,7 +636,7 @@
// per-node times at two set distributed clock times, we will be able to
// recreate the linear function (we know it is linear). We can do a similar
// thing by breaking our equation up into:
- //
+ //
// [1/3 1/3 1/3 ] [ta] [t_distributed]
// [ 1 -1-m1 0 ] [tb] = [oab]
// [ 1 0 -1-m2 ] [tc] [oac]
@@ -583,6 +679,7 @@
// logged_configuration(), and the string key will be the name of the channel
// to send on instead of the logged channel name.
std::map<size_t, std::string> remapped_channels_;
+ std::vector<MapT> maps_;
// Number of nodes which still have data to send. This is used to figure out
// when to exit.
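
Putting the reworked header together: logging now has an explicit start/stop lifecycle instead of being tied to the constructor. A rough sketch, assuming `event_loop` is a valid `aos::EventLoop *` whose `OnRun` callbacks have already fired, and an invented base path:

```cpp
aos::logger::Logger logger(event_loop);
logger.set_polling_period(std::chrono::milliseconds(100));
logger.StartLogging(std::make_unique<aos::logger::LocalLogNamer>(
    "/tmp/example_log", event_loop->node()));
CHECK(logger.is_started());

// ... let the event loop run for a while ...

// Flushes everything through now. Passing aos::monotonic_clock::min_time
// instead stops without reading any more messages.
std::unique_ptr<aos::logger::LogNamer> namer =
    logger.StopLogging(event_loop->monotonic_now());
```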
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index f95fa17..b1dce96 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -1,8 +1,8 @@
#include "aos/configuration.h"
#include "aos/events/logging/logger.h"
#include "aos/events/shm_event_loop.h"
-#include "aos/logging/log_namer.h"
#include "aos/init.h"
+#include "aos/logging/log_namer.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -33,8 +33,9 @@
event_loop.node());
}
- aos::logger::Logger logger(std::move(log_namer), &event_loop,
- std::chrono::milliseconds(100));
+ aos::logger::Logger logger(&event_loop);
+ event_loop.OnRun(
+ [&log_namer, &logger]() { logger.StartLogging(std::move(log_namer)); });
event_loop.Run();
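
The `OnRun` wrapper here defers `StartLogging` until the event loop is actually running. For the local-namer case, the `StartLoggingLocalNamerOnRun` helper added in logger.h collapses the same pattern; a sketch with an invented base name:

```cpp
aos::logger::Logger logger(&event_loop);
logger.StartLoggingLocalNamerOnRun("/tmp/example_log");  // Wraps the OnRun dance.
event_loop.Run();
```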
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 47fe9e2..2095529 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,6 +1,7 @@
#include "aos/events/logging/logger.h"
#include "aos/events/event_loop.h"
+#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
@@ -14,6 +15,7 @@
namespace testing {
namespace chrono = std::chrono;
+using aos::testing::MessageCounter;
class LoggerTest : public ::testing::Test {
public:
@@ -39,6 +41,8 @@
Pong pong_;
};
+using LoggerDeathTest = LoggerTest;
+
// Tests that we can startup at all. This confirms that the channels are all in
// the config.
TEST_F(LoggerTest, Starts) {
@@ -56,8 +60,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(base_name, logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ Logger logger(logger_event_loop.get());
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
@@ -99,6 +104,130 @@
EXPECT_EQ(ping_count, 2010);
}
+// Tests calling StartLogging twice.
+TEST_F(LoggerDeathTest, ExtraStart) {
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string base_name1 = tmpdir + "/logfile1";
+ const ::std::string logfile1 = base_name1 + ".part0.bfbs";
+ const ::std::string base_name2 = tmpdir + "/logfile2";
+ const ::std::string logfile2 = base_name2 + ".part0.bfbs";
+ unlink(logfile1.c_str());
+ unlink(logfile2.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
+
+ {
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger");
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ Logger logger(logger_event_loop.get());
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger_event_loop->OnRun(
+ [base_name1, base_name2, &logger_event_loop, &logger]() {
+ logger.StartLogging(std::make_unique<LocalLogNamer>(
+ base_name1, logger_event_loop->node()));
+ EXPECT_DEATH(logger.StartLogging(std::make_unique<LocalLogNamer>(
+ base_name2, logger_event_loop->node())),
+ "Already logging");
+ });
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+}
+
+// Tests calling StopLogging twice.
+TEST_F(LoggerDeathTest, ExtraStop) {
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile = base_name + ".part0.bfbs";
+ // Remove it.
+ unlink(logfile.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile;
+
+ {
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger");
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ Logger logger(logger_event_loop.get());
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger_event_loop->OnRun([base_name, &logger_event_loop, &logger]() {
+ logger.StartLogging(std::make_unique<LocalLogNamer>(
+ base_name, logger_event_loop->node()));
+ logger.StopLogging(aos::monotonic_clock::min_time);
+ EXPECT_DEATH(logger.StopLogging(aos::monotonic_clock::min_time),
+ "Not logging right now");
+ });
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+}
+
+// Tests that we can start up twice.
+TEST_F(LoggerTest, StartsTwice) {
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string base_name1 = tmpdir + "/logfile1";
+ const ::std::string logfile1 = base_name1 + ".part0.bfbs";
+ const ::std::string base_name2 = tmpdir + "/logfile2";
+ const ::std::string logfile2 = base_name2 + ".part0.bfbs";
+ unlink(logfile1.c_str());
+ unlink(logfile2.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
+
+ {
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger");
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ Logger logger(logger_event_loop.get());
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLogging(
+ std::make_unique<LocalLogNamer>(base_name1, logger_event_loop->node()));
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ logger.StopLogging(logger_event_loop->monotonic_now());
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ logger.StartLogging(
+ std::make_unique<LocalLogNamer>(base_name2, logger_event_loop->node()));
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ }
+
+ for (const auto &logfile :
+ {std::make_tuple(logfile1, 10), std::make_tuple(logfile2, 2010)}) {
+ SCOPED_TRACE(std::get<0>(logfile));
+ LogReader reader(std::get<0>(logfile));
+ reader.Register();
+
+ EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(nullptr));
+
+ std::unique_ptr<EventLoop> test_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+ int ping_count = std::get<1>(logfile);
+ int pong_count = std::get<1>(logfile);
+
+ // Confirm that the ping and pong counts both match, and the value also
+ // matches.
+ test_event_loop->MakeWatcher("/test",
+ [&ping_count](const examples::Ping &ping) {
+ EXPECT_EQ(ping.value(), ping_count + 1);
+ ++ping_count;
+ });
+ test_event_loop->MakeWatcher(
+ "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+ EXPECT_EQ(pong.value(), pong_count + 1);
+ ++pong_count;
+ EXPECT_EQ(ping_count, pong_count);
+ });
+
+ reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+ EXPECT_EQ(ping_count, std::get<1>(logfile) + 1000);
+ }
+}
+
// Tests that we can read and write rotated log files.
TEST_F(LoggerTest, RotatedLogFile) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
@@ -117,9 +246,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(
- std::make_unique<LocalLogNamer>(base_name, logger_event_loop->node()),
- logger_event_loop.get(), std::chrono::milliseconds(100));
+ Logger logger(logger_event_loop.get());
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
event_loop_factory_.RunFor(chrono::milliseconds(10000));
logger.Rotate();
event_loop_factory_.RunFor(chrono::milliseconds(10000));
@@ -216,8 +345,9 @@
chrono::microseconds(50));
});
- Logger logger(base_name, logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ Logger logger(logger_event_loop.get());
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
event_loop_factory_.RunFor(chrono::milliseconds(1000));
}
@@ -287,11 +417,13 @@
}
void StartLogger(LoggerState *logger) {
- logger->logger = std::make_unique<Logger>(
- std::make_unique<MultiNodeLogNamer>(logfile_base_,
- logger->event_loop->configuration(),
- logger->event_loop->node()),
- logger->event_loop.get(), chrono::milliseconds(100));
+ logger->logger = std::make_unique<Logger>(logger->event_loop.get());
+ logger->logger->set_polling_period(std::chrono::milliseconds(100));
+ logger->event_loop->OnRun([this, logger]() {
+ logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
+ logfile_base_, logger->event_loop->configuration(),
+ logger->event_loop->node()));
+ });
}
// Config and factory.
@@ -973,16 +1105,116 @@
event_loop_factory_.RunFor(chrono::milliseconds(2000));
}
- const std::vector<std::vector<std::string>> sorted_parts =
- SortParts(logfiles_);
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+
+ EXPECT_EQ(sorted_parts.size(), 2u);
+
+ // Count up the number of UUIDs and make sure they are what we expect as a
+ // sanity check.
+ std::set<std::string> logger_uuids;
+ std::set<std::string> parts_uuids;
+ std::set<std::string> both_uuids;
+
+ size_t missing_rt_count = 0;
+
+ for (const LogFile &log_file : sorted_parts) {
+ EXPECT_FALSE(log_file.logger_uuid.empty());
+ logger_uuids.insert(log_file.logger_uuid);
+ both_uuids.insert(log_file.logger_uuid);
+
+ for (const LogParts &part : log_file.parts) {
+ EXPECT_NE(part.monotonic_start_time, aos::monotonic_clock::min_time)
+ << ": " << part;
+ missing_rt_count +=
+ part.realtime_start_time == aos::realtime_clock::min_time;
+
+ EXPECT_TRUE(logger_uuids.find(part.logger_uuid) != logger_uuids.end());
+ EXPECT_NE(part.node, "");
+ parts_uuids.insert(part.parts_uuid);
+ both_uuids.insert(part.parts_uuid);
+ }
+ }
+
+ // We won't have RT timestamps for 5 log files. We don't log the RT start
+ // time on remote nodes because we don't know it and would be guessing. And
+ // the log reader can actually do a better job.
+ EXPECT_EQ(missing_rt_count, 5u);
+
+ EXPECT_EQ(logger_uuids.size(), 2u);
+ EXPECT_EQ(parts_uuids.size(), ToLogReaderVector(sorted_parts).size());
+ EXPECT_EQ(logger_uuids.size() + parts_uuids.size(), both_uuids.size());
// Test that each list of parts is in order. Don't worry about the ordering
// between part file lists though.
// (inner vectors all need to be in order, but outer one doesn't matter).
- EXPECT_THAT(sorted_parts,
+ EXPECT_THAT(ToLogReaderVector(sorted_parts),
::testing::UnorderedElementsAreArray(structured_logfiles_));
}
+// Tests that if we remap a remapped channel, it shows up correctly.
+TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(structured_logfiles_);
+
+ // Remap just on pi1.
+ reader.RemapLoggedChannel<aos::timing::Report>(
+ "/aos", configuration::GetNode(reader.configuration(), "pi1"));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+ // else should have moved.
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> full_pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ full_pi1_event_loop->SkipTimingReport();
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+ pi2_event_loop->SkipTimingReport();
+
+ MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
+ "/aos");
+ MessageCounter<aos::timing::Report> full_pi1_timing_report(
+ full_pi1_event_loop.get(), "/pi1/aos");
+ MessageCounter<aos::timing::Report> pi1_original_timing_report(
+ pi1_event_loop.get(), "/original/aos");
+ MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
+ full_pi1_event_loop.get(), "/original/pi1/aos");
+ MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
+ "/aos");
+
+ log_reader_factory.Run();
+
+ EXPECT_EQ(pi1_timing_report.count(), 0u);
+ EXPECT_EQ(full_pi1_timing_report.count(), 0u);
+ EXPECT_NE(pi1_original_timing_report.count(), 0u);
+ EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
+ EXPECT_NE(pi2_timing_report.count(), 0u);
+
+ reader.Deregister();
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
new file mode 100644
index 0000000..a8cafda
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.cc
@@ -0,0 +1,194 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+namespace {
+
+// Returns normally if `status` is not an error code; otherwise logs the
+// appropriate error message and crashes.
+void CheckLzmaCodeIsOk(lzma_ret status) {
+ switch (status) {
+ case LZMA_OK:
+ case LZMA_STREAM_END:
+ return;
+ case LZMA_MEM_ERROR:
+ LOG(FATAL) << "Memory allocation failed:" << status;
+ case LZMA_OPTIONS_ERROR:
+ LOG(FATAL) << "The given compression preset or decompression options are "
+ "not supported: "
+ << status;
+ case LZMA_UNSUPPORTED_CHECK:
+ LOG(FATAL) << "The given check type is not supported: " << status;
+ case LZMA_PROG_ERROR:
+ LOG(FATAL) << "One or more of the parameters have values that will never "
+ "be valid: "
+ << status;
+ case LZMA_MEMLIMIT_ERROR:
+ LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
+ "memory usage limit: "
+ << status;
+ case LZMA_FORMAT_ERROR:
+ LOG(FATAL) << "File format not recognized: " << status;
+ case LZMA_DATA_ERROR:
+ LOG(FATAL) << "Compressed file is corrupt: " << status;
+ case LZMA_BUF_ERROR:
+ LOG(FATAL) << "Compressed file is truncated or corrupt: " << status;
+ default:
+ LOG(FATAL) << "Unexpected return value: " << status;
+ }
+}
+
+} // namespace
+
+LzmaEncoder::LzmaEncoder(const uint32_t compression_preset)
+ : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
+ CHECK_GE(compression_preset_, 0u)
+ << ": Compression preset must be in the range [0, 9].";
+ CHECK_LE(compression_preset_, 9u)
+ << ": Compression preset must be in the range [0, 9].";
+
+ lzma_ret status =
+ lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
+ CheckLzmaCodeIsOk(status);
+ stream_.avail_out = 0;
+ VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+}
+
+LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
+
+void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+ CHECK(in.data()) << ": Encode called with nullptr.";
+
+ stream_.next_in = in.data();
+ stream_.avail_in = in.size();
+
+ RunLzmaCode(LZMA_RUN);
+}
+
+void LzmaEncoder::Finish() { RunLzmaCode(LZMA_FINISH); }
+
+void LzmaEncoder::Clear(const int n) {
+ CHECK_GE(n, 0);
+ CHECK_LE(static_cast<size_t>(n), queue_size());
+ queue_.erase(queue_.begin(), queue_.begin() + n);
+ if (queue_.empty()) {
+ stream_.next_out = nullptr;
+ stream_.avail_out = 0;
+ }
+}
+
+std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
+ std::vector<absl::Span<const uint8_t>> queue;
+ if (queue_.empty()) {
+ return queue;
+ }
+ for (size_t i = 0; i < queue_.size() - 1; ++i) {
+ queue.emplace_back(
+ absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
+ }
+ // For the last buffer in the queue, we must account for the possibility that
+ // the buffer isn't full yet.
+ queue.emplace_back(absl::MakeConstSpan(
+ queue_.back().data(), queue_.back().size() - stream_.avail_out));
+ return queue;
+}
+
+size_t LzmaEncoder::queued_bytes() const {
+ size_t bytes = queue_size() * kEncodedBufferSizeBytes;
+ // Subtract the bytes that the encoder hasn't filled yet.
+ bytes -= stream_.avail_out;
+ return bytes;
+}
+
+void LzmaEncoder::RunLzmaCode(lzma_action action) {
+ CHECK(!finished_);
+
+ // This is to keep track of how many bytes resulted from encoding this input
+ // buffer.
+ size_t last_avail_out = stream_.avail_out;
+
+ while (stream_.avail_in > 0 || action == LZMA_FINISH) {
+ // If output buffer is full, create a new one, queue it up, and resume
+ // encoding. This could happen in the first call to Encode after
+ // construction or a Reset, or when an input buffer is large enough to fill
+ // more than one output buffer.
+ if (stream_.avail_out == 0) {
+ queue_.emplace_back();
+ queue_.back().resize(kEncodedBufferSizeBytes);
+ stream_.next_out = queue_.back().data();
+ stream_.avail_out = kEncodedBufferSizeBytes;
+ // Update the byte count.
+ total_bytes_ += last_avail_out;
+ last_avail_out = stream_.avail_out;
+ }
+
+ // Encode the data.
+ lzma_ret status = lzma_code(&stream_, action);
+ CheckLzmaCodeIsOk(status);
+ if (action == LZMA_FINISH) {
+ if (status == LZMA_STREAM_END) {
+ // This is returned when lzma_code is all done.
+ finished_ = true;
+ break;
+ }
+ } else {
+ CHECK(status != LZMA_STREAM_END);
+ }
+ VLOG(2) << "LzmaEncoder: Encoded chunk.";
+ }
+
+ // Update the number of resulting encoded bytes.
+ total_bytes_ += last_avail_out - stream_.avail_out;
+}
+
+LzmaDecoder::LzmaDecoder(std::string_view filename)
+ : dummy_decoder_(filename), stream_(LZMA_STREAM_INIT) {
+ compressed_data_.resize(kBufSize);
+
+ lzma_ret status =
+ lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
+ CheckLzmaCodeIsOk(status);
+ stream_.avail_out = 0;
+ VLOG(2) << "LzmaDecoder: Initialization succeeded.";
+}
+
+LzmaDecoder::~LzmaDecoder() { lzma_end(&stream_); }
+
+size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+ if (finished_) {
+ return 0;
+ }
+
+ // Write into the given range.
+ stream_.next_out = begin;
+ stream_.avail_out = end - begin;
+ // Keep decompressing until we run out of buffer space.
+ while (stream_.avail_out > 0) {
+ if (action_ == LZMA_RUN && stream_.avail_in == 0) {
+ // Read more bytes from the file if we're all out.
+ const size_t count =
+ dummy_decoder_.Read(compressed_data_.begin(), compressed_data_.end());
+ if (count == 0) {
+ // No more data to read in the file, begin the finishing operation.
+ action_ = LZMA_FINISH;
+ } else {
+ stream_.next_in = compressed_data_.data();
+ stream_.avail_in = count;
+ }
+ }
+ // Decompress the data.
+ const lzma_ret status = lzma_code(&stream_, action_);
+ // Return if we're done.
+ if (status == LZMA_STREAM_END) {
+ CHECK_EQ(action_, LZMA_FINISH)
+ << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
+ finished_ = true;
+ return (end - begin) - stream_.avail_out;
+ }
+ CheckLzmaCodeIsOk(status);
+ }
+ return end - begin;
+}
+
+} // namespace aos::logger
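
To make the `Read` contract above concrete: it fills as much of the caller's range as it can and returns 0 only once the whole stream has been decoded. A hedged sketch with an invented filename; real callers are expected to be the log-reading plumbing rather than hand-written loops:

```cpp
aos::logger::LzmaDecoder decoder("/tmp/example_log.part0.bfbs.xz");
std::vector<uint8_t> buffer(4096);
while (true) {
  const size_t count =
      decoder.Read(buffer.data(), buffer.data() + buffer.size());
  if (count == 0) {
    break;  // The stream is fully decoded.
  }
  // buffer[0, count) now holds decompressed bytes for the caller to parse.
}
```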
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
new file mode 100644
index 0000000..d7080a9
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.h
@@ -0,0 +1,72 @@
+#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/buffer_encoder.h"
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Encodes buffers using liblzma.
+class LzmaEncoder final : public DetachedBufferEncoder {
+ public:
+ // Initializes the LZMA stream and encoder.
+ explicit LzmaEncoder(uint32_t compression_preset);
+ // Gracefully shuts down the encoder.
+ ~LzmaEncoder() final;
+
+ void Encode(flatbuffers::DetachedBuffer &&in) final;
+ void Finish() final;
+ void Clear(int n) final;
+ std::vector<absl::Span<const uint8_t>> queue() const final;
+ size_t queued_bytes() const final;
+ size_t total_bytes() const final { return total_bytes_; }
+ size_t queue_size() const final { return queue_.size(); }
+
+ private:
+ static constexpr size_t kEncodedBufferSizeBytes{1024};
+
+ void RunLzmaCode(lzma_action action);
+
+ lzma_stream stream_;
+ uint32_t compression_preset_;
+ std::vector<ResizeableBuffer> queue_;
+ bool finished_ = false;
+ // Total bytes that resulted from encoding raw data since the last call to
+ // Reset.
+ size_t total_bytes_ = 0;
+};
+
+// Decompresses data with liblzma.
+class LzmaDecoder final : public DataDecoder {
+ public:
+ explicit LzmaDecoder(std::string_view filename);
+ ~LzmaDecoder();
+
+ size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+ // Size of temporary buffer to use.
+ static constexpr size_t kBufSize{256 * 1024};
+
+ // Temporary buffer for storing compressed data.
+ ResizeableBuffer compressed_data_;
+ // Used for reading data from the file.
+ DummyDecoder dummy_decoder_;
+ // Stream for decompression.
+ lzma_stream stream_;
+ // The current action. This is LZMA_RUN until we've run out of data to read
+ // from the file.
+ lzma_action action_ = LZMA_RUN;
+ // Flag that represents whether or not all the data from the file has been
+ // successfully decoded.
+ bool finished_ = false;
+};
+
+} // namespace aos::logger
+
+#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
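
The encoder side is a produce/drain cycle: `Encode` consumes detached flatbuffers, `queue()` exposes the compressed spans that are ready, and `Clear` releases buffers once the caller has persisted them. A sketch under those assumptions, where `WriteSpan` is a hypothetical sink:

```cpp
aos::logger::LzmaEncoder encoder(/*compression_preset=*/2);

flatbuffers::FlatBufferBuilder fbb;
// ... build and finish a message in fbb ...
encoder.Encode(fbb.Release());

for (const absl::Span<const uint8_t> span : encoder.queue()) {
  WriteSpan(span);  // Hypothetical: persist the compressed bytes.
}
encoder.Clear(static_cast<int>(encoder.queue_size()));  // All were written.

encoder.Finish();  // Flush the final LZMA block before shutdown.
```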
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
new file mode 100644
index 0000000..2d619c4
--- /dev/null
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -0,0 +1,18 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+INSTANTIATE_TEST_CASE_P(
+ Lzma, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ return std::make_unique<LzmaEncoder>(2);
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<LzmaDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
+} // namespace aos::logger::testing
diff --git a/aos/events/logging/uuid.cc b/aos/events/logging/uuid.cc
index 9298c8b..376fa95 100644
--- a/aos/events/logging/uuid.cc
+++ b/aos/events/logging/uuid.cc
@@ -57,4 +57,10 @@
return result;
}
+UUID UUID::Zero() {
+ UUID result;
+ result.data_.fill(0);
+ return result;
+}
+
} // namespace aos
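
`Zero()` provides a deterministic sentinel, which the logger now uses for its `uuid_` member before logging starts. A tiny sketch:

```cpp
aos::UUID uuid = aos::UUID::Zero();  // All-zero sentinel value.
CHECK(uuid == aos::UUID::Zero());    // Deterministic, unlike Random().
uuid = aos::UUID::Random();          // Fresh uuid4 once a log actually starts.
```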
diff --git a/aos/events/logging/uuid.h b/aos/events/logging/uuid.h
index b81b811..0387a4b 100644
--- a/aos/events/logging/uuid.h
+++ b/aos/events/logging/uuid.h
@@ -13,6 +13,8 @@
// Returns a randomly generated UUID. This is known as a UUID4.
static UUID Random();
+ static UUID Zero();
+
std::string_view string_view() const {
return std::string_view(data_.data(), data_.size());
}
diff --git a/aos/events/logging/uuid_test.cc b/aos/events/logging/uuid_test.cc
index d0320de..4ea351c 100644
--- a/aos/events/logging/uuid_test.cc
+++ b/aos/events/logging/uuid_test.cc
@@ -12,6 +12,8 @@
LOG(INFO) << UUID::Random().string_view();
EXPECT_NE(UUID::Random(), UUID::Random());
+ EXPECT_NE(UUID::Random(), UUID::Zero());
+ EXPECT_EQ(UUID::Zero(), UUID::Zero());
}
} // namespace testing
diff --git a/aos/events/message_counter.h b/aos/events/message_counter.h
new file mode 100644
index 0000000..418c1eb
--- /dev/null
+++ b/aos/events/message_counter.h
@@ -0,0 +1,28 @@
+#ifndef AOS_EVENTS_MESSAGE_COUNTER_H_
+#define AOS_EVENTS_MESSAGE_COUNTER_H_
+
+#include "aos/events/event_loop.h"
+
+namespace aos {
+namespace testing {
+
+// Simple helper class to count messages on a channel. This only counts
+// messages published while running.
+template <typename T>
+class MessageCounter {
+ public:
+ MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
+ event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
+ }
+
+ // Returns the number of messages seen so far.
+ size_t count() const { return count_; }
+
+ private:
+ size_t count_ = 0;
+};
+
+} // namespace testing
+} // namespace aos
+
+#endif // AOS_EVENTS_MESSAGE_COUNTER_H_
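
A usage sketch for the extracted helper, assuming `event_loop` is an `aos::EventLoop *` and the Ping example message used by the tests:

```cpp
aos::testing::MessageCounter<aos::examples::Ping> pings(event_loop, "/test");
// ... run the event loop ...
LOG(INFO) << "Saw " << pings.count() << " pings on /test";
```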
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 7de0540..81937ca 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -4,6 +4,7 @@
#include "aos/events/event_loop_param_test.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
@@ -14,6 +15,11 @@
namespace aos {
namespace testing {
+namespace {
+
+std::string ConfigPrefix() { return "aos/"; }
+
+} // namespace
namespace chrono = ::std::chrono;
@@ -254,25 +260,12 @@
0.0);
}
-template <typename T>
-class MessageCounter {
- public:
- MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
- event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
- }
-
- size_t count() const { return count_; }
-
- private:
- size_t count_ = 0;
-};
-
// Tests that ping and pong work when on 2 different nodes, and the message
// gateway messages are sent out as expected.
TEST(SimulatedEventLoopTest, MultinodePingPong) {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(
- "aos/events/multinode_pingpong_config.json");
+ aos::configuration::ReadConfig(ConfigPrefix() +
+ "events/multinode_pingpong_config.json");
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -573,8 +566,8 @@
// ServerStatistics correctly.
TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(
- "aos/events/multinode_pingpong_config.json");
+ aos::configuration::ReadConfig(ConfigPrefix() +
+ "events/multinode_pingpong_config.json");
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -673,8 +666,8 @@
// Test that disabling statistics actually disables them.
TEST(SimulatedEventLoopTest, MultinodeWithoutStatistics) {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
- aos::configuration::ReadConfig(
- "aos/events/multinode_pingpong_config.json");
+ aos::configuration::ReadConfig(ConfigPrefix() +
+ "events/multinode_pingpong_config.json");
const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
@@ -774,5 +767,104 @@
EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
}
+// Tests that the time offset having a slope doesn't break the world.
+// SimulatedMessageBridge has enough self-consistency CHECK statements to
+// confirm nothing breaks, and we can also check a message in each direction
+// to make sure it gets delivered as expected.
+TEST(SimulatedEventLoopTest, MultinodePingPongWithOffsetAndSlope) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(ConfigPrefix() +
+ "events/multinode_pingpong_config.json");
+ const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+ const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+ NodeEventLoopFactory *pi2_factory =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
+
+ // Move the pi far into the future so the slope is significant, and set the
+ // slope to something reasonable.
+ constexpr chrono::milliseconds kOffset{150100};
+ pi2_factory->SetDistributedOffset(kOffset, 1.0001);
+
+ std::unique_ptr<EventLoop> ping_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ Ping ping(ping_event_loop.get());
+
+ std::unique_ptr<EventLoop> pong_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ Pong pong(pong_event_loop.get());
+
+ std::unique_ptr<EventLoop> pi1_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi1_counter", pi1);
+ std::unique_ptr<EventLoop> pi2_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi2);
+
+ aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+ pi1_counter_event_loop->MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+ pi2_counter_event_loop->MakeFetcher<examples::Ping>("/test");
+
+ aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
+ pi2_counter_event_loop->MakeFetcher<examples::Pong>("/test");
+ aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
+ pi1_counter_event_loop->MakeFetcher<examples::Pong>("/test");
+
+ // End after a pong message comes back. This will leave the latest messages
+ // on all channels so we can look at timestamps easily and check they make
+ // sense.
+ std::unique_ptr<EventLoop> pi1_pong_ender =
+ simulated_event_loop_factory.MakeEventLoop("pi2_counter", pi1);
+ int count = 0;
+ pi1_pong_ender->MakeWatcher(
+ "/test", [&simulated_event_loop_factory, &count](const examples::Pong &) {
+ if (++count == 100) {
+ simulated_event_loop_factory.Exit();
+ }
+ });
+
+ // Run enough that messages should be delivered.
+ simulated_event_loop_factory.Run();
+
+ // Grab the latest messages.
+ EXPECT_TRUE(ping_on_pi1_fetcher.Fetch());
+ EXPECT_TRUE(ping_on_pi2_fetcher.Fetch());
+ EXPECT_TRUE(pong_on_pi1_fetcher.Fetch());
+ EXPECT_TRUE(pong_on_pi2_fetcher.Fetch());
+
+ // Compute their time on the global distributed clock so we can compute the
+ // distance between them.
+ const distributed_clock::time_point pi1_ping_time =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
+ ->ToDistributedClock(
+ ping_on_pi1_fetcher.context().monotonic_event_time);
+ const distributed_clock::time_point pi2_ping_time =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->ToDistributedClock(
+ ping_on_pi2_fetcher.context().monotonic_event_time);
+ const distributed_clock::time_point pi1_pong_time =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)
+ ->ToDistributedClock(
+ pong_on_pi1_fetcher.context().monotonic_event_time);
+ const distributed_clock::time_point pi2_pong_time =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->ToDistributedClock(
+ pong_on_pi2_fetcher.context().monotonic_event_time);
+
+ // And confirm the delivery delay is almost exactly 150 us in both
+ // directions, as expected. There will be a couple of ns of rounding error
+ // in the conversion functions that isn't worth accounting for right now.
+ // This will either be really close, or really far off.
+ EXPECT_GE(pi2_ping_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
+ pi1_ping_time);
+ EXPECT_LE(pi2_ping_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
+ pi1_ping_time);
+
+ EXPECT_GE(pi1_pong_time, chrono::microseconds(150) - chrono::nanoseconds(10) +
+ pi2_pong_time);
+ EXPECT_LE(pi1_pong_time, chrono::microseconds(150) + chrono::nanoseconds(10) +
+ pi2_pong_time);
+}
+
} // namespace testing
} // namespace aos
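
For intuition on the numbers in the slope test above (back-of-the-envelope only, not the `NodeEventLoopFactory` implementation; the sign and composition conventions below are assumptions): a slope of 1.0001 means the node clock gains 100 us per second of distributed time, so the two timelines separate quickly unless every conversion is fully affine.

```cpp
#include <chrono>
#include <cstdint>

// Sketch of an affine node clock: node_time = slope * distributed + offset.
constexpr double kSlope = 1.0001;
constexpr std::chrono::nanoseconds kOffset = std::chrono::milliseconds(150100);

std::chrono::nanoseconds NodeFromDistributed(std::chrono::nanoseconds t) {
  return std::chrono::nanoseconds(
             static_cast<int64_t>(t.count() * kSlope)) +
         kOffset;
}
// After 1000 seconds of distributed time this clock has gained 100 ms, which
// is why an offset-only conversion would fail the 10 ns bounds above.
```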
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 825c830..5660331 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -82,7 +82,11 @@
DeliveredTime(fetcher_->context());
CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
- << ": Trying to deliver message in the past...";
+ << ": Trying to deliver message in the past on channel "
+ << configuration::StrippedChannelToString(fetcher_->channel())
+ << " to node " << send_event_loop_->node()->name()->string_view()
+ << " sent from " << fetcher_->channel()->source_node()->string_view()
+ << " at " << fetch_node_factory_->monotonic_now();
server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
1);
@@ -142,11 +146,9 @@
const distributed_clock::time_point distributed_sent_time =
fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
- return aos::monotonic_clock::epoch() +
- (distributed_sent_time - send_node_factory_->ToDistributedClock(
- aos::monotonic_clock::epoch())) +
- send_node_factory_->network_delay() +
- send_node_factory_->send_delay();
+ return send_node_factory_->FromDistributedClock(
+ distributed_sent_time + send_node_factory_->network_delay() +
+ send_node_factory_->send_delay());
}
// Factories used for time conversion.
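
The rewritten return statement is what keeps the slope test in `simulated_event_loop_test.cc` honest: the removed code differenced the two epochs on the distributed clock, which is only an offset correction and silently assumes both node clocks run at exactly distributed rate. Restated as a sketch in terms of the surrounding members (explanatory only, not additional patch code):

```cpp
// Add the delays on the shared distributed timeline, then map the result
// through the destination node's full affine conversion, so a non-unity
// slope scales everything consistently.
const aos::distributed_clock::time_point delivered_distributed =
    distributed_sent_time + send_node_factory_->network_delay() +
    send_node_factory_->send_delay();
return send_node_factory_->FromDistributedClock(delivered_distributed);
```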
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index bab006a..5da7466 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -5,6 +5,7 @@
#include <string_view>
#include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
#include "aos/macros.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"
@@ -164,15 +165,16 @@
class FlatbufferVector : public Flatbuffer<T> {
public:
// Builds a Flatbuffer around a vector.
- FlatbufferVector(std::vector<uint8_t> &&data) : data_(std::move(data)) {}
+ FlatbufferVector(ResizeableBuffer &&data) : data_(std::move(data)) {}
// Builds a Flatbuffer by copying the data from the other flatbuffer.
- FlatbufferVector(const Flatbuffer<T> &other)
- : data_(other.data(), other.data() + other.size()) {}
+ FlatbufferVector(const Flatbuffer<T> &other) {
+ data_.resize(other.size());
+ memcpy(data_.data(), other.data(), data_.size());
+ }
// Copy constructor.
- FlatbufferVector(const FlatbufferVector<T> &other)
- : data_(other.data(), other.data() + other.size()) {}
+ FlatbufferVector(const FlatbufferVector<T> &other) : data_(other.data_) {}
// Move constructor.
FlatbufferVector(FlatbufferVector<T> &&other)
@@ -180,7 +182,7 @@
// Copies the data from the other flatbuffer.
FlatbufferVector &operator=(const FlatbufferVector<T> &other) {
- data_ = std::vector<uint8_t>(other.data(), other.data() + other.size());
+ data_ = other.data_;
return *this;
}
FlatbufferVector &operator=(FlatbufferVector<T> &&other) {
@@ -190,7 +192,7 @@
// Constructs an empty flatbuffer of type T.
static FlatbufferVector<T> Empty() {
- return FlatbufferVector<T>(std::vector<uint8_t>{});
+ return FlatbufferVector<T>(ResizeableBuffer());
}
virtual ~FlatbufferVector() override {}
@@ -200,7 +202,7 @@
size_t size() const override { return data_.size(); }
private:
- std::vector<uint8_t> data_;
+ ResizeableBuffer data_;
};
// This object associates the message type with the memory storing the
@@ -359,6 +361,16 @@
flatbuffers::DetachedBuffer buffer_;
};
+inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
+ absl::Span<const uint8_t> span) {
+ // Copy the data from the span.
+ uint8_t *buf = flatbuffers::DefaultAllocator().allocate(span.size());
+ memcpy(buf, span.data(), span.size());
+ // Then give it to a DetachedBuffer to manage.
+ return flatbuffers::DetachedBuffer(nullptr, false, buf, span.size(), buf,
+ span.size());
+}
+
} // namespace aos
#endif // AOS_FLATBUFFERS_H_
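
A hypothetical caller of the new helper (the table type and the origin of the span are made up): because `flatbuffers::DetachedBuffer` owns its allocation, the copy lets the resulting flatbuffer outlive whatever storage backed the span, e.g. a shared-memory queue slot.

```cpp
// Sketch: snapshot one message's bytes into an independently owned buffer.
aos::FlatbufferDetachedBuffer<ExampleTable> Snapshot(
    absl::Span<const uint8_t> span) {
  return aos::FlatbufferDetachedBuffer<ExampleTable>(
      aos::CopySpanAsDetachedBuffer(span));
}
```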
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index d5add53..fed9a8a 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -295,10 +295,10 @@
cc_binary(
name = "print_lockless_queue_memory",
- visibility = ["//visibility:public"],
srcs = [
"print_lockless_queue_memory.cc",
],
+ visibility = ["//visibility:public"],
deps = [
":lockless_queue",
],
diff --git a/aos/json_to_flatbuffer.h b/aos/json_to_flatbuffer.h
index 381621e..c997f03 100644
--- a/aos/json_to_flatbuffer.h
+++ b/aos/json_to_flatbuffer.h
@@ -122,8 +122,12 @@
template <typename T>
inline FlatbufferVector<T> FileToFlatbuffer(const std::string_view path) {
std::ifstream instream(std::string(path), std::ios::in | std::ios::binary);
- std::vector<uint8_t> data((std::istreambuf_iterator<char>(instream)),
- std::istreambuf_iterator<char>());
+ ResizeableBuffer data;
+ std::istreambuf_iterator<char> it(instream);
+ while (it != std::istreambuf_iterator<char>()) {
+ data.push_back(*it);
+ ++it;
+ }
return FlatbufferVector<T>(std::move(data));
}
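
Call sites are unchanged by the `ResizeableBuffer` switch; a hedged example (the path and table type are made up):

```cpp
// Sketch: read a binary flatbuffer from disk and inspect one field.
const aos::FlatbufferVector<aos::Configuration> config =
    aos::FileToFlatbuffer<aos::Configuration>("/tmp/example_config.bfbs");
LOG(INFO) << "channels: " << config.message().channels()->size();
```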
diff --git a/aos/realtime.cc b/aos/realtime.cc
index a0af97a..a41eb0d 100644
--- a/aos/realtime.cc
+++ b/aos/realtime.cc
@@ -74,7 +74,7 @@
WriteCoreDumps();
PCHECK(mlockall(MCL_CURRENT | MCL_FUTURE) == 0);
-#if !__has_feature(address_sanitizer)
+#if !__has_feature(address_sanitizer) && !__has_feature(memory_sanitizer)
// Don't give freed memory back to the OS.
CHECK_EQ(1, mallopt(M_TRIM_THRESHOLD, -1));
// Don't use mmap for large malloc chunks.
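
One portability note on this pattern (an observation about the idiom, not part of the patch): `__has_feature` is a Clang extension, so code that must also build under GCC commonly defines a fallback before the first use:

```cpp
// Common guard so sanitizer checks still compile on compilers without
// __has_feature; GCC exposes the sanitizers via macros instead.
#ifndef __has_feature
#define __has_feature(x) 0
#endif
```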
diff --git a/debian/BUILD b/debian/BUILD
index f498dff..6f955ae 100644
--- a/debian/BUILD
+++ b/debian/BUILD
@@ -65,11 +65,18 @@
gstreamer_armhf_debs = "files",
)
load(
- "//debian:m4.bzl",
+ ":m4.bzl",
m4_debs = "files",
)
-
-load("//debian:packages.bzl", "download_packages", "generate_deb_tarball")
+load(
+ ":lzma_amd64.bzl",
+ lzma_amd64_debs = "files",
+)
+load(
+ ":lzma_arm64.bzl",
+ lzma_arm64_debs = "files",
+)
+load(":packages.bzl", "download_packages", "generate_deb_tarball")
filegroup(
name = "matplotlib_patches",
@@ -341,6 +348,23 @@
files = m4_debs,
)
+download_packages(
+ name = "download_lzma",
+ packages = [
+ "liblzma-dev",
+ ],
+)
+
+generate_deb_tarball(
+ name = "lzma_amd64",
+ files = lzma_amd64_debs,
+)
+
+generate_deb_tarball(
+ name = "lzma_arm64",
+ files = lzma_arm64_debs,
+)
+
exports_files([
"ssh_wrapper.sh",
])
diff --git a/debian/lzma_amd64.bzl b/debian/lzma_amd64.bzl
new file mode 100644
index 0000000..1dbfa38
--- /dev/null
+++ b/debian/lzma_amd64.bzl
@@ -0,0 +1,3 @@
+files = {
+ "liblzma-dev_5.2.4-1_amd64.deb": "8373145ed28563aff33b116c434399347ab1fa77ba672d8d94151dd8f68c8ece",
+}
diff --git a/debian/lzma_arm64.bzl b/debian/lzma_arm64.bzl
new file mode 100644
index 0000000..a38d900
--- /dev/null
+++ b/debian/lzma_arm64.bzl
@@ -0,0 +1,3 @@
+files = {
+ "liblzma-dev_5.2.2-1.3_arm64.deb": "eb730c11f65c9bd10ae6c39b5a461e16294c21a423c9a8bc4011390798387e82",
+}
diff --git a/debian/matplotlib.BUILD b/debian/matplotlib.BUILD
index e7e9b6f..881521c 100644
--- a/debian/matplotlib.BUILD
+++ b/debian/matplotlib.BUILD
@@ -1,6 +1,9 @@
load("@//debian:matplotlib.bzl", "build_matplotlib")
-build_matplotlib("3", tkinter_py_version="3.5")
+build_matplotlib(
+ "3",
+ tkinter_py_version = "3.5",
+)
build_matplotlib(
"2.7",
diff --git a/frc971/codelab/README.md b/frc971/codelab/README.md
index e274f24..2fdf08c 100644
--- a/frc971/codelab/README.md
+++ b/frc971/codelab/README.md
@@ -1,5 +1,12 @@
# FRC971 "Codelab"
+## Flatbuffers tutorial
+
+Flatbuffers has a pretty good [tutorial](https://google.github.io/flatbuffers/flatbuffers_guide_tutorial.html) on how to use it. We recommend
+going through it for more information.
+
+## Basic control loop
+
Welcome! This folder contains a "codelab" where you can go through the process
of fleshing out a basic control-loop using the same infrastructure as we do for
the control loops that normally run on our robots. Currently, this just consists
diff --git a/frc971/codelab/basic.cc b/frc971/codelab/basic.cc
index 36e64c5..6144fed 100644
--- a/frc971/codelab/basic.cc
+++ b/frc971/codelab/basic.cc
@@ -30,7 +30,8 @@
Status::Builder builder = status->MakeBuilder<Status>();
// TODO(you): Fill out the Status message! In order to populate fields, use
// the add_field_name() method on the builder, just like we do with the
- // Output message above.
+ // Output message above. Look at the definition of Status in
+ // basic_status.fbs and populate the field according to the comments there.
status->Send(builder.Finish());
}
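
Spelling out the pattern the TODO points at (the field name below is hypothetical; the real fields and their meanings are documented in `basic_status.fbs`):

```cpp
// Sketch: every flatbuffer field gets a generated add_<field_name>() setter
// on the Builder; fill them in before Finish().
Status::Builder builder = status->MakeBuilder<Status>();
builder.add_estimated_position(0.0);  // hypothetical field
status->Send(builder.Finish());
```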
diff --git a/frc971/codelab/basic.h b/frc971/codelab/basic.h
index 944048f..b75a87f 100644
--- a/frc971/codelab/basic.h
+++ b/frc971/codelab/basic.h
@@ -15,6 +15,9 @@
// This codelab helps build basic knowledge of how to use 971 control loop
// primitives.
//
+// For flatbuffers background, we recommend checking out
+// https://google.github.io/flatbuffers/flatbuffers_guide_tutorial.html
+//
// The meat of the task is to make the tests pass.
//
// Run the tests with:
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index 1f362f3..faed4af 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -64,8 +64,8 @@
if (!FLAGS_output_file.empty()) {
unlink(FLAGS_output_file.c_str());
logger_event_loop_ = MakeEventLoop("logger");
- logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
- logger_event_loop_.get());
+ logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+ logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
}
// Run for enough time to allow the gyro/imu zeroing code to run.
@@ -125,11 +125,9 @@
// TODO(james): Handle Euler angle singularities...
const double down_estimator_yaw =
CHECK_NOTNULL(drivetrain_status_fetcher_->down_estimator())->yaw();
- const double localizer_yaw =
- drivetrain_status_fetcher_->theta();
- EXPECT_LT(
- std::abs(aos::math::DiffAngle(down_estimator_yaw, localizer_yaw)),
- 1e-2);
+ const double localizer_yaw = drivetrain_status_fetcher_->theta();
+ EXPECT_LT(std::abs(aos::math::DiffAngle(down_estimator_yaw, localizer_yaw)),
+ 1e-2);
const double true_yaw = (drivetrain_plant_.GetRightPosition() -
drivetrain_plant_.GetLeftPosition()) /
(dt_config_.robot_radius * 2.0);
@@ -558,9 +556,11 @@
auto builder = drivetrain_goal_sender_.MakeBuilder();
flatbuffers::Offset<flatbuffers::Vector<float>> spline_x_offset =
- builder.fbb()->CreateVector<float>({0.0, -0.25, -0.5, -0.5, -0.75, -1.0});
+ builder.fbb()->CreateVector<float>(
+ {0.0, -0.25, -0.5, -0.5, -0.75, -1.0});
flatbuffers::Offset<flatbuffers::Vector<float>> spline_y_offset =
- builder.fbb()->CreateVector<float>({0.0, 0.0, -0.25, -0.75, -1.0, -1.0});
+ builder.fbb()->CreateVector<float>(
+ {0.0, 0.0, -0.25, -0.75, -1.0, -1.0});
MultiSpline::Builder multispline_builder =
builder.MakeBuilder<MultiSpline>();
diff --git a/frc971/wpilib/ADIS16470.cc b/frc971/wpilib/ADIS16470.cc
index fa61d9b..ec2ee9e 100644
--- a/frc971/wpilib/ADIS16470.cc
+++ b/frc971/wpilib/ADIS16470.cc
@@ -296,8 +296,9 @@
(0 << 2) /* internal clock mode */ |
(0 << 1) /* sync polarity, doesn't matter */ |
(1 << 0) /* data ready is active high */);
+ // Rate of the output will be 2000 / (DEC_RATE + 1) Hz.
WriteRegister(registers::DEC_RATE,
- 0 /* no internal decimation (averaging) */);
+ 1 /* Average every pair of values. */);
// Start a sensor self test.
WriteRegister(registers::GLOB_CMD, 1 << 2);
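
To make the new comment concrete (a sketch restating the arithmetic, not patch code):

```cpp
// Output data rate as a function of DEC_RATE, per the comment above.
constexpr int OutputRateHz(int dec_rate) { return 2000 / (dec_rate + 1); }
static_assert(OutputRateHz(0) == 2000, "no decimation");
static_assert(OutputRateHz(1) == 1000, "averaging each pair halves the rate");
```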
diff --git a/third_party/BUILD b/third_party/BUILD
index bcba978..2f22b0e 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -113,3 +113,15 @@
"roborio": ["@webrtc_rio//:webrtc"],
}),
)
+
+# TODO(Brian): Properly restrict this to specific platforms and remove the
+# default deps once we have support for that which interacts with select properly.
+cc_library(
+ name = "lzma",
+ visibility = ["//visibility:public"],
+ deps = select({
+ "//tools:cpu_k8": ["@lzma_amd64//:lib"],
+ "//tools:cpu_arm64": ["@lzma_arm64//:lib"],
+ "//conditions:default": [],
+ }),
+)
diff --git a/tools/BUILD b/tools/BUILD
index ac76ee9..d77abf8 100644
--- a/tools/BUILD
+++ b/tools/BUILD
@@ -49,6 +49,11 @@
)
config_setting(
+ name = "cpu_arm64",
+ values = {"cpu": "arm64"},
+)
+
+config_setting(
name = "has_asan",
values = {"define": "have_asan=true"},
)
@@ -75,11 +80,14 @@
environment(name = "cortex-m4f-k22")
+environment(name = "arm64")
+
environment_group(
name = "cpus",
defaults = [
":k8",
":roborio",
+ ":arm64",
":armhf-debian",
],
environments = [
@@ -89,5 +97,6 @@
":armhf-debian",
":cortex-m4f",
":cortex-m4f-k22",
+ ":arm64",
],
)
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index f7b69e4..98f3eeb 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -42,7 +42,8 @@
log_writer_event_loop->SkipTimingReport();
log_writer_event_loop->SkipAosLog();
CHECK(nullptr == log_writer_event_loop->node());
- aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
+ aos::logger::Logger writer(log_writer_event_loop.get());
+ writer.StartLoggingLocalNamerOnRun(FLAGS_output_file);
std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
reader.event_loop_factory()->MakeEventLoop("drivetrain");
diff --git a/y2020/config_gen.bzl b/y2020/config_gen.bzl
deleted file mode 100644
index aed1302..0000000
--- a/y2020/config_gen.bzl
+++ /dev/null
@@ -1,13 +0,0 @@
-def generate_pi_config(number):
- native.genrule(
- name = "generate_pi_config_%d" % (number),
- srcs = ["y2020_pi_template.json"],
- outs = ["y2020_pi%d.json" % (number)],
- cmd = " ".join([
- "$(location //y2020:generate_pi_config)",
- "$(location y2020_pi_template.json)",
- "'{\"NUM\": %d}'" % (number),
- "$(OUTS)",
- ]),
- tools = ["//y2020:generate_pi_config"],
- )
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 0dd5a1a..0ed57c5 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -49,7 +49,8 @@
std::unique_ptr<aos::EventLoop> log_writer_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_writer", node);
log_writer_event_loop->SkipTimingReport();
- aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
+ aos::logger::Logger writer(log_writer_event_loop.get());
+ writer.StartLoggingLocalNamerOnRun(FLAGS_output_file);
std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
reader.event_loop_factory()->MakeEventLoop("drivetrain", node);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index ae9f887..dbb4f2f 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -26,10 +26,10 @@
using frc971::control_loops::drivetrain::DrivetrainConfig;
using frc971::control_loops::drivetrain::Goal;
using frc971::control_loops::drivetrain::LocalizerControl;
+using frc971::vision::sift::CameraCalibrationT;
+using frc971::vision::sift::CameraPoseT;
using frc971::vision::sift::ImageMatchResult;
using frc971::vision::sift::ImageMatchResultT;
-using frc971::vision::sift::CameraPoseT;
-using frc971::vision::sift::CameraCalibrationT;
using frc971::vision::sift::TransformationMatrixT;
namespace {
@@ -90,9 +90,9 @@
} // namespace
namespace chrono = std::chrono;
-using frc971::control_loops::drivetrain::testing::DrivetrainSimulation;
-using frc971::control_loops::drivetrain::DrivetrainLoop;
using aos::monotonic_clock;
+using frc971::control_loops::drivetrain::DrivetrainLoop;
+using frc971::control_loops::drivetrain::testing::DrivetrainSimulation;
class LocalizedDrivetrainTest : public aos::testing::ControlLoopTest {
protected:
@@ -136,8 +136,8 @@
if (!FLAGS_output_file.empty()) {
logger_event_loop_ = MakeEventLoop("logger", roborio_);
- logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
- logger_event_loop_.get());
+ logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+ logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
}
test_event_loop_->MakeWatcher(
@@ -152,7 +152,7 @@
last_frame_ = monotonic_now();
}
}
- });
+ });
test_event_loop_->AddPhasedLoop(
[this](int) {
@@ -337,8 +337,7 @@
aos::Sender<aos::message_bridge::ServerStatistics> server_statistics_sender_;
std::unique_ptr<aos::EventLoop> drivetrain_event_loop_;
- const frc971::control_loops::drivetrain::DrivetrainConfig<double>
- dt_config_;
+ const frc971::control_loops::drivetrain::DrivetrainConfig<double> dt_config_;
std::unique_ptr<aos::EventLoop> pi1_event_loop_;
aos::Sender<ImageMatchResult> camera_sender_;
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index 8a0589c..59296d9 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -440,8 +440,8 @@
if (!FLAGS_output_file.empty()) {
unlink(FLAGS_output_file.c_str());
logger_event_loop_ = MakeEventLoop("logger", roborio_);
- logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
- logger_event_loop_.get());
+ logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+ logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
}
}
diff --git a/y2020/y2020.json b/y2020/y2020.json
index 808a796..59f3fc0 100644
--- a/y2020/y2020.json
+++ b/y2020/y2020.json
@@ -1,5 +1,5 @@
{
- "channel_storage_duration": 5000000000,
+ "channel_storage_duration": 2000000000,
"maps": [
{
"match": {