Merge "Make downloading work on a stock buster installation"
diff --git a/aos/configuration.h b/aos/configuration.h
index 55b64c9..ef30fce 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -49,6 +49,16 @@
const Node *node) {
return GetChannel(&config.message(), name, type, application_name, node);
}
+// Convenience wrapper for getting a channel from a specified config if you
+// already have the name/type in a Channel object--this is useful if you Channel
+// object you have does not point to memory within config.
+inline const Channel *GetChannel(const Configuration *config,
+ const Channel *channel,
+ const std::string_view application_name,
+ const Node *node) {
+ return GetChannel(config, channel->name()->string_view(),
+ channel->type()->string_view(), application_name, node);
+}
// Returns the Node out of the config with the matching name, or nullptr if it
// can't be found.
diff --git a/aos/controls/control_loop_test.h b/aos/controls/control_loop_test.h
index a2dbb7b..25e1a45 100644
--- a/aos/controls/control_loop_test.h
+++ b/aos/controls/control_loop_test.h
@@ -102,7 +102,7 @@
}
::aos::monotonic_clock::time_point monotonic_now() {
- return event_loop_factory_.monotonic_now();
+ return robot_status_event_loop_->monotonic_now();
}
::std::chrono::nanoseconds dt() const { return dt_; }
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 79200bd..6f1f453 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -250,6 +250,18 @@
)
cc_library(
+ name = "simple_channel",
+ srcs = ["simple_channel.cc"],
+ hdrs = ["simple_channel.h"],
+ deps = [
+ "//aos:configuration_fbs",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/strings",
+ ],
+)
+
+cc_library(
name = "simulated_event_loop",
srcs = [
"event_scheduler.cc",
@@ -262,6 +274,7 @@
visibility = ["//visibility:public"],
deps = [
":event_loop",
+ ":simple_channel",
"//aos/ipc_lib:index",
"//aos/util:phased_loop",
"@com_google_absl//absl/container:btree",
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 714796e..e55b80d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -10,6 +10,7 @@
#include "aos/configuration.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
#include "flatbuffers/flatbuffers.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -235,13 +236,45 @@
return result;
}
-SortedMessageReader::SortedMessageReader(std::string_view filename)
- : message_reader_(filename) {
+SortedMessageReader::SortedMessageReader(
+ const std::vector<std::string> &filenames)
+ : filenames_(filenames),
+ log_file_header_(FlatbufferDetachedBuffer<LogFileHeader>::Empty()) {
+ CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";
+
+ log_file_header_ = CopyFlatBuffer(message_reader_->log_file_header());
+
channels_.resize(configuration()->channels()->size());
QueueMessages();
}
+bool SortedMessageReader::NextLogFile() {
+ if (next_filename_index_ == filenames_.size()) {
+ return false;
+ }
+ message_reader_ =
+ std::make_unique<MessageReader>(filenames_[next_filename_index_]);
+
+ // We can't support the config diverging between two log file headers. See if
+ // they are the same.
+ if (next_filename_index_ != 0) {
+ // Since we copied before, we need to copy again to guarantee that things
+ // didn't get re-ordered.
+ const FlatbufferDetachedBuffer<LogFileHeader> new_log_file_header =
+ CopyFlatBuffer(message_reader_->log_file_header());
+ CHECK_EQ(new_log_file_header.size(), log_file_header_.size());
+ CHECK(memcmp(new_log_file_header.data(), log_file_header_.data(),
+ log_file_header_.size()) == 0)
+ << ": Header is different between log file chunks "
+ << filenames_[next_filename_index_] << " and "
+ << filenames_[next_filename_index_ - 1] << ", this is not supported.";
+ }
+
+ ++next_filename_index_;
+ return true;
+}
+
void SortedMessageReader::EmplaceDataBack(
FlatbufferVector<MessageHeader> &&new_data) {
const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
@@ -289,17 +322,19 @@
// Those messages might be very old. Make sure to read a chunk past the
// starting time.
if (channel_heap_.size() > 0 &&
- message_reader_.newest_timestamp() >
+ message_reader_->newest_timestamp() >
std::max(oldest_message().first, monotonic_start_time()) +
- message_reader_.max_out_of_order_duration()) {
+ message_reader_->max_out_of_order_duration()) {
break;
}
if (std::optional<FlatbufferVector<MessageHeader>> msg =
- message_reader_.ReadMessage()) {
+ message_reader_->ReadMessage()) {
EmplaceDataBack(std::move(msg.value()));
} else {
- break;
+ if (!NextLogFile()) {
+ break;
+ }
}
}
}
@@ -328,7 +363,7 @@
channel.oldest_timestamp = monotonic_clock::min_time;
}
- if (oldest_channel_data.first > message_reader_.queue_data_time()) {
+ if (oldest_channel_data.first > message_reader_->queue_data_time()) {
QueueMessages();
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 285fe05..6b8e9aa 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -5,6 +5,7 @@
#include <deque>
#include <optional>
+#include <string>
#include <string_view>
#include <vector>
@@ -198,11 +199,11 @@
// sorted list of pointers to those.
class SortedMessageReader {
public:
- SortedMessageReader(std::string_view filename);
+ SortedMessageReader(const std::vector<std::string> &filenames);
// Returns the header from the log file.
const LogFileHeader *log_file_header() const {
- return message_reader_.log_file_header();
+ return &log_file_header_.message();
}
// Returns a pointer to the channel with the oldest message in it, and the
@@ -250,6 +251,9 @@
PopOldestChannel();
private:
+ // Moves to the next log file in the list.
+ bool NextLogFile();
+
// Adds more messages to the sorted list.
void QueueMessages();
@@ -279,7 +283,11 @@
}
};
- MessageReader message_reader_;
+ std::vector<std::string> filenames_;
+ size_t next_filename_index_ = 0;
+
+ FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+ std::unique_ptr<MessageReader> message_reader_;
// TODO(austin): Multithreaded read at some point. Gotta go faster!
// Especially if we start compressing.
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 3e790b6..51dc10c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -91,71 +91,82 @@
// starts here. It needs to be after everything is fetched so that the
// fetchers are all pointed at the most recent message before the start
// time.
- const monotonic_clock::time_point monotonic_now =
- event_loop_->monotonic_now();
- const realtime_clock::time_point realtime_now = event_loop_->realtime_now();
- last_synchronized_time_ = monotonic_now;
+ monotonic_start_time_ = event_loop_->monotonic_now();
+ realtime_start_time_ = event_loop_->realtime_now();
+ last_synchronized_time_ = monotonic_start_time_;
- {
- // Now write the header with this timestamp in it.
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(1);
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
- flatbuffers::Offset<aos::Configuration> configuration_offset =
- CopyFlatBuffer(event_loop_->configuration(), &fbb);
-
- flatbuffers::Offset<flatbuffers::String> string_offset =
- fbb.CreateString(network::GetHostname());
-
- flatbuffers::Offset<Node> node_offset;
- if (event_loop_->node() != nullptr) {
- node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
- }
- LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
-
- aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
-
- log_file_header_builder.add_name(string_offset);
-
- // Only add the node if we are running in a multinode configuration.
- if (event_loop_->node() != nullptr) {
- log_file_header_builder.add_node(node_offset);
- }
-
- log_file_header_builder.add_configuration(configuration_offset);
- // The worst case theoretical out of order is the polling period times 2.
- // One message could get logged right after the boundary, but be for right
- // before the next boundary. And the reverse could happen for another
- // message. Report back 3x to be extra safe, and because the cost isn't
- // huge on the read side.
- log_file_header_builder.add_max_out_of_order_duration(
- std::chrono::duration_cast<std::chrono::nanoseconds>(3 *
- polling_period)
- .count());
-
- log_file_header_builder.add_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_now.time_since_epoch())
- .count());
- log_file_header_builder.add_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_now.time_since_epoch())
- .count());
-
- fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- writer_->QueueSizedFlatbuffer(&fbb);
- }
+ WriteHeader();
timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
polling_period);
});
}
+void Logger::WriteHeader() {
+ // Now write the header with this timestamp in it.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(1);
+
+ flatbuffers::Offset<aos::Configuration> configuration_offset =
+ CopyFlatBuffer(event_loop_->configuration(), &fbb);
+
+ flatbuffers::Offset<flatbuffers::String> string_offset =
+ fbb.CreateString(network::GetHostname());
+
+ flatbuffers::Offset<Node> node_offset;
+ if (event_loop_->node() != nullptr) {
+ node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
+ }
+
+ aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+ log_file_header_builder.add_name(string_offset);
+
+ // Only add the node if we are running in a multinode configuration.
+ if (event_loop_->node() != nullptr) {
+ log_file_header_builder.add_node(node_offset);
+ }
+
+ log_file_header_builder.add_configuration(configuration_offset);
+ // The worst case theoretical out of order is the polling period times 2.
+ // One message could get logged right after the boundary, but be for right
+ // before the next boundary. And the reverse could happen for another
+ // message. Report back 3x to be extra safe, and because the cost isn't
+ // huge on the read side.
+ log_file_header_builder.add_max_out_of_order_duration(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
+ .count());
+
+ log_file_header_builder.add_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_start_time_.time_since_epoch())
+ .count());
+ log_file_header_builder.add_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_start_time_.time_since_epoch())
+ .count());
+
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ writer_->QueueSizedFlatbuffer(&fbb);
+}
+
+void Logger::Rotate(DetachedBufferWriter *writer) {
+ // Force data up until now to be written.
+ DoLogData();
+
+ // Swap the writer out, and re-write the header.
+ writer_ = writer;
+ WriteHeader();
+}
+
void Logger::DoLogData() {
// We want to guarentee that messages aren't out of order by more than
// max_out_of_order_duration. To do this, we need sync points. Every write
// cycle should be a sync point.
- const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+ const monotonic_clock::time_point monotonic_now =
+ event_loop_->monotonic_now();
do {
// Move the sync point up by at most polling_period. This forces one sync
@@ -219,20 +230,41 @@
writer_->Flush();
}
-LogReader::LogReader(std::string_view filename)
- : sorted_message_reader_(filename) {
- channels_.resize(configuration()->channels()->size());
+LogReader::LogReader(std::string_view filename,
+ const Configuration *replay_configuration)
+ : LogReader(std::vector<std::string>{std::string(filename)},
+ replay_configuration) {}
+
+LogReader::LogReader(const std::vector<std::string> &filenames,
+ const Configuration *replay_configuration)
+ : sorted_message_reader_(filenames),
+ replay_configuration_(replay_configuration) {
+ channels_.resize(logged_configuration()->channels()->size());
+ MakeRemappedConfig();
}
-LogReader::~LogReader() {
- Deregister();
-}
+LogReader::~LogReader() { Deregister(); }
-const Configuration *LogReader::configuration() const {
+const Configuration *LogReader::logged_configuration() const {
return sorted_message_reader_.configuration();
}
-const Node *LogReader::node() const { return sorted_message_reader_.node(); }
+const Configuration *LogReader::configuration() const {
+ return remapped_configuration_;
+}
+
+const Node *LogReader::node() const {
+ // Because the Node pointer will only be valid if it actually points to memory
+ // owned by remapped_configuration_, we need to wait for the
+ // remapped_configuration_ to be populated before accessing it.
+ CHECK(remapped_configuration_ != nullptr)
+ << ": Need to call Register before the node() pointer will be valid.";
+ if (sorted_message_reader_.node() == nullptr) {
+ return nullptr;
+ }
+ return configuration::GetNode(
+ configuration(), sorted_message_reader_.node()->name()->string_view());
+}
monotonic_clock::time_point LogReader::monotonic_start_time() {
return sorted_message_reader_.monotonic_start_time();
@@ -258,7 +290,7 @@
Register(event_loop_unique_ptr_.get());
event_loop_factory_->RunFor(monotonic_start_time() -
- event_loop_factory_->monotonic_now());
+ event_loop_->monotonic_now());
}
void LogReader::Register(EventLoop *event_loop) {
@@ -268,16 +300,36 @@
event_loop_->SkipTimingReport();
for (size_t i = 0; i < channels_.size(); ++i) {
- CHECK_EQ(configuration()->channels()->Get(i)->name(),
- event_loop_->configuration()->channels()->Get(i)->name());
- CHECK_EQ(configuration()->channels()->Get(i)->type(),
- event_loop_->configuration()->channels()->Get(i)->type());
+ const Channel *const original_channel =
+ logged_configuration()->channels()->Get(i);
- channels_[i] = event_loop_->MakeRawSender(
- event_loop_->configuration()->channels()->Get(i));
+ std::string_view channel_name = original_channel->name()->string_view();
+ std::string_view channel_type = original_channel->type()->string_view();
+ // If the channel is remapped, find the correct channel name to use.
+ if (remapped_channels_.count(i) > 0) {
+ VLOG(2) << "Got remapped channel on "
+ << configuration::CleanedChannelToString(original_channel);
+ channel_name = remapped_channels_[i];
+ }
+
+ VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
+ const Channel *channel = configuration::GetChannel(
+ event_loop_->configuration(), channel_name, channel_type,
+ event_loop_->name(), event_loop_->node());
+
+ CHECK(channel != nullptr)
+ << ": Unable to send {\"name\": \"" << channel_name
+ << "\", \"type\": \"" << channel_type
+ << "\"} because it is not in the provided configuration.";
+
+ channels_[i] = event_loop_->MakeRawSender(channel);
}
timer_handler_ = event_loop_->AddTimer([this]() {
+ if (sorted_message_reader_.active_channel_count() == 0u) {
+ event_loop_factory_->Exit();
+ return;
+ }
monotonic_clock::time_point channel_timestamp;
int channel_index;
FlatbufferVector<MessageHeader> channel_data =
@@ -329,6 +381,14 @@
if (sorted_message_reader_.active_channel_count() > 0u) {
timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+ } else {
+ // Set a timer up immediately after now to die. If we don't do this, then
+ // the senders waiting on the message we just read will never get called.
+ if (event_loop_factory_ != nullptr) {
+ timer_handler_->Setup(monotonic_now +
+ event_loop_factory_->send_delay() +
+ std::chrono::nanoseconds(1));
+ }
}
});
@@ -352,5 +412,108 @@
event_loop_factory_ = nullptr;
}
+void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
+ std::string_view add_prefix) {
+ for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
+ const Channel *const channel = logged_configuration()->channels()->Get(ii);
+ if (channel->name()->str() == name &&
+ channel->type()->string_view() == type) {
+ CHECK_EQ(0u, remapped_channels_.count(ii))
+ << "Already remapped channel "
+ << configuration::CleanedChannelToString(channel);
+ remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
+ VLOG(1) << "Remapping channel "
+ << configuration::CleanedChannelToString(channel)
+ << " to have name " << remapped_channels_[ii];
+ MakeRemappedConfig();
+ return;
+ }
+ }
+ LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
+ << type;
+}
+
+void LogReader::MakeRemappedConfig() {
+ // If no remapping occurred and we are using the original config, then there
+ // is nothing interesting to do here.
+ if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
+ remapped_configuration_ = sorted_message_reader_.configuration();
+ return;
+ }
+ // Config to copy Channel definitions from. Use the specified
+ // replay_configuration_ if it has been provided.
+ const Configuration *const base_config = replay_configuration_ == nullptr
+ ? logged_configuration()
+ : replay_configuration_;
+ // The remapped config will be identical to the base_config, except that it
+ // will have a bunch of extra channels in the channel list, which are exact
+ // copies of the remapped channels, but with different names.
+ // Because the flatbuffers API is a pain to work with, this requires a bit of
+ // a song-and-dance to get copied over.
+ // The order of operations is to:
+ // 1) Make a flatbuffer builder for a config that will just contain a list of
+ // the new channels that we want to add.
+ // 2) For each channel that we are remapping:
+ // a) Make a buffer/builder and construct into it a Channel table that only
+ // contains the new name for the channel.
+ // b) Merge the new channel with just the name into the channel that we are
+ // trying to copy, built in the flatbuffer builder made in 1. This gives
+ // us the new channel definition that we need.
+ // 3) Using this list of offsets, build the Configuration of just new
+ // Channels.
+ // 4) Merge the Configuration with the new Channels into the base_config.
+ // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
+ // chance to sanitize the config.
+
+ // This is the builder that we use for the config containing all the new
+ // channels.
+ flatbuffers::FlatBufferBuilder new_config_fbb;
+ new_config_fbb.ForceDefaults(1);
+ std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+ for (auto &pair : remapped_channels_) {
+ // This is the builder that we use for creating the Channel with just the
+ // new name.
+ flatbuffers::FlatBufferBuilder new_name_fbb;
+ new_name_fbb.ForceDefaults(1);
+ const flatbuffers::Offset<flatbuffers::String> name_offset =
+ new_name_fbb.CreateString(pair.second);
+ ChannelBuilder new_name_builder(new_name_fbb);
+ new_name_builder.add_name(name_offset);
+ new_name_fbb.Finish(new_name_builder.Finish());
+ const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
+ // Retrieve the channel that we want to copy, confirming that it is actually
+ // present in base_config.
+ const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
+ base_config, logged_configuration()->channels()->Get(pair.first), "",
+ nullptr));
+ // Actually create the new channel and put it into the vector of Offsets
+ // that we will use to create the new Configuration.
+ channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
+ reinterpret_cast<const flatbuffers::Table *>(base_channel),
+ reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
+ &new_config_fbb));
+ }
+ // Create the Configuration containing the new channels that we want to add.
+ const auto new_name_vector_offsets =
+ new_config_fbb.CreateVector(channel_offsets);
+ ConfigurationBuilder new_config_builder(new_config_fbb);
+ new_config_builder.add_channels(new_name_vector_offsets);
+ new_config_fbb.Finish(new_config_builder.Finish());
+ const FlatbufferDetachedBuffer<Configuration> new_name_config =
+ new_config_fbb.Release();
+ // Merge the new channels configuration into the base_config, giving us the
+ // remapped configuration.
+ remapped_configuration_buffer_ =
+ std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
+ MergeFlatBuffers<Configuration>(base_config,
+ &new_name_config.message()));
+ // Call MergeConfiguration to deal with sanitizing the config.
+ remapped_configuration_buffer_ =
+ std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
+ configuration::MergeConfiguration(*remapped_configuration_buffer_));
+
+ remapped_configuration_ = &remapped_configuration_buffer_->message();
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e5b91ac..337109b 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -25,7 +25,13 @@
std::chrono::milliseconds polling_period =
std::chrono::milliseconds(100));
+ // Rotates the log file with the new writer. This writes out the header
+ // again, but keeps going as if nothing else happened.
+ void Rotate(DetachedBufferWriter *writer);
+
private:
+ void WriteHeader();
+
void DoLogData();
EventLoop *event_loop_;
@@ -51,6 +57,9 @@
// Last time that data was written for all channels to disk.
monotonic_clock::time_point last_synchronized_time_;
+ monotonic_clock::time_point monotonic_start_time_;
+ realtime_clock::time_point realtime_start_time_;
+
// Max size that the header has consumed. This much extra data will be
// reserved in the builder to avoid reallocating.
size_t max_header_size_ = 0;
@@ -59,21 +68,30 @@
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
- LogReader(std::string_view filename);
+ // If you want to supply a new configuration that will be used for replay
+ // (e.g., to change message rates, or to populate an updated schema), then
+ // pass it in here. It must provide all the channels that the original logged
+ // config did.
+ LogReader(std::string_view filename,
+ const Configuration *replay_configuration = nullptr);
+ LogReader(const std::vector<std::string> &filename,
+ const Configuration *replay_configuration = nullptr);
~LogReader();
- // Registers everything, but also updates the real time time in sync. Runs
- // until the log file starts.
- // Note that if you use any call other than the Register() call with no
- // arguments, the user is responsible for making sure that the config of the
- // supplied event loop (factory) provides any necessary remapped configs.
- void Register();
- // Does the same as Register(), except it uses a pre-provided event loop
- // factory.
+ // Registers all the callbacks to send the log file data out on an event loop
+ // created in event_loop_factory. This also updates time to be at the start
+ // of the log file by running until the log file starts.
+ // Note: the configuration used in the factory should be configuration()
+ // below, but can be anything as long as the locations needed to send
+ // everything are available.
void Register(SimulatedEventLoopFactory *event_loop_factory);
- // Registers the timer and senders used to resend the messages from the log
- // file.
+ // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
+ // and then calls Register.
+ void Register();
+ // Registers callbacks for all the events after the log file starts. This is
+ // only useful when replaying live.
void Register(EventLoop *event_loop);
+
// Unregisters the senders. You only need to call this if you separately
// supplied an event loop or event loop factory and the lifetimes are such
// that they need to be explicitly destroyed before the LogReader destructor
@@ -81,15 +99,34 @@
void Deregister();
// Returns the configuration from the log file.
+ const Configuration *logged_configuration() const;
+ // Returns the configuration being used for replay.
const Configuration *configuration() const;
- // Returns the node that this log file was created on.
+ const LogFileHeader *log_file_header() const {
+ return sorted_message_reader_.log_file_header();
+ }
+
+ // Returns the node that this log file was created on. This is a pointer to a
+ // node in the nodes() list inside configuration().
const Node *node() const;
// Returns the starting timestamp for the log file.
monotonic_clock::time_point monotonic_start_time();
realtime_clock::time_point realtime_start_time();
+ // Causes the logger to publish the provided channel on a different name so
+ // that replayed applications can publish on the proper channel name without
+ // interference. This operates on raw channel names, without any node or
+ // application specific mappings.
+ void RemapLoggedChannel(std::string_view name, std::string_view type,
+ std::string_view add_prefix = "/original");
+ template <typename T>
+ void RemapLoggedChannel(std::string_view name,
+ std::string_view add_prefix = "/original") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
+ }
+
SimulatedEventLoopFactory *event_loop_factory() {
return event_loop_factory_;
}
@@ -102,6 +139,9 @@
private:
// Queues at least max_out_of_order_duration_ messages into channels_.
void QueueMessages();
+ // Handle constructing a configuration with all the additional remapped
+ // channels from calls to RemapLoggedChannel.
+ void MakeRemappedConfig();
// Log chunk reader.
SortedMessageReader sorted_message_reader_;
@@ -117,6 +157,14 @@
std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
+
+ // Map of channel indices to new name. The channel index will be an index into
+ // logged_configuration(), and the string key will be the name of the channel
+ // to send on instead of the logged channel name.
+ std::map<size_t, std::string> remapped_channels_;
+
+ const Configuration *remapped_configuration_ = nullptr;
+ const Configuration *replay_configuration_ = nullptr;
};
} // namespace logger
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 569aeca..5323493 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -59,10 +59,12 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(logfile);
+ // Even though it doesn't make any difference here, exercise the logic for
+ // passing in a separate config.
+ LogReader reader(logfile, &config_.message());
- LOG(INFO) << "Config " << FlatbufferToJson(reader.configuration());
- EXPECT_EQ(reader.node(), nullptr);
+ // Confirm that we can remap logged channels to point to new buses.
+ reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
// This sends out the fetched messages and advances time to the start of the
// log file.
@@ -70,7 +72,70 @@
EXPECT_EQ(reader.node(), nullptr);
- EXPECT_EQ(reader.event_loop_factory()->node(), nullptr);
+ std::unique_ptr<EventLoop> test_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+ int ping_count = 10;
+ int pong_count = 10;
+
+ // Confirm that the ping value matches in the remapped channel location.
+ test_event_loop->MakeWatcher("/original/test",
+ [&ping_count](const examples::Ping &ping) {
+ EXPECT_EQ(ping.value(), ping_count + 1);
+ ++ping_count;
+ });
+ // Confirm that the ping and pong counts both match, and the value also
+ // matches.
+ test_event_loop->MakeWatcher(
+ "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+ EXPECT_EQ(pong.value(), pong_count + 1);
+ ++pong_count;
+ EXPECT_EQ(ping_count, pong_count);
+ });
+
+ reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+ EXPECT_EQ(ping_count, 2010);
+}
+
+// Tests that we can read and write rotated log files.
+TEST_F(LoggerTest, RotatedLogFile) {
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string logfile0 = tmpdir + "/logfile0.bfbs";
+ const ::std::string logfile1 = tmpdir + "/logfile1.bfbs";
+ // Remove it.
+ unlink(logfile0.c_str());
+ unlink(logfile1.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile0 << " and " << logfile1;
+
+ {
+ DetachedBufferWriter writer0(logfile0);
+ DetachedBufferWriter writer1(logfile1);
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger");
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ Logger logger(&writer0, logger_event_loop.get(),
+ std::chrono::milliseconds(100));
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ logger.Rotate(&writer1);
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ }
+
+ // Even though it doesn't make any difference here, exercise the logic for
+ // passing in a separate config.
+ LogReader reader(std::vector<std::string>{logfile0, logfile1},
+ &config_.message());
+
+ // Confirm that we can remap logged channels to point to new buses.
+ reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register();
+
+ EXPECT_EQ(reader.node(), nullptr);
std::unique_ptr<EventLoop> test_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_reader");
@@ -78,8 +143,8 @@
int ping_count = 10;
int pong_count = 10;
- // Confirm that the ping value matches.
- test_event_loop->MakeWatcher("/test",
+ // Confirm that the ping value matches in the remapped channel location.
+ test_event_loop->MakeWatcher("/original/test",
[&ping_count](const examples::Ping &ping) {
EXPECT_EQ(ping.value(), ping_count + 1);
++ping_count;
@@ -211,10 +276,12 @@
// TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
// messages. This won't work today yet until the log reading code gets
// significantly better.
+ SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
// log file.
- reader.Register();
+ reader.Register(&log_reader_factory);
ASSERT_NE(reader.node(), nullptr);
EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
@@ -222,7 +289,7 @@
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
std::unique_ptr<EventLoop> test_event_loop =
- reader.event_loop_factory()->MakeEventLoop("test");
+ log_reader_factory.MakeEventLoop("test");
int ping_count = 10;
int pong_count = 10;
@@ -250,9 +317,11 @@
EXPECT_EQ(ping_count, pong_count);
});
- reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+ log_reader_factory.RunFor(std::chrono::seconds(100));
EXPECT_EQ(ping_count, 2010);
EXPECT_EQ(pong_count, 2010);
+
+ reader.Deregister();
}
} // namespace testing
diff --git a/aos/events/simple_channel.cc b/aos/events/simple_channel.cc
new file mode 100644
index 0000000..00e30ba
--- /dev/null
+++ b/aos/events/simple_channel.cc
@@ -0,0 +1,31 @@
+#include "aos/events/simple_channel.h"
+
+#include "absl/strings/str_cat.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+SimpleChannel::SimpleChannel(const Channel *channel)
+ : name(CHECK_NOTNULL(CHECK_NOTNULL(channel)->name())->str()),
+ type(CHECK_NOTNULL(CHECK_NOTNULL(channel)->type())->str()) {}
+
+std::string SimpleChannel::DebugString() const {
+ return absl::StrCat("{ ", name, ", ", type, "}");
+}
+
+bool SimpleChannel::operator==(const SimpleChannel &other) const {
+ return name == other.name && type == other.type;
+}
+
+bool SimpleChannel::operator<(const SimpleChannel &other) const {
+ int name_compare = other.name.compare(name);
+ if (name_compare == 0) {
+ return other.type < type;
+ } else if (name_compare < 0) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+} // namespace aos
diff --git a/aos/events/simple_channel.h b/aos/events/simple_channel.h
new file mode 100644
index 0000000..d5b2be1
--- /dev/null
+++ b/aos/events/simple_channel.h
@@ -0,0 +1,25 @@
+#ifndef AOS_EVENTS_SIMPLE_CHANNEL_H_
+#define AOS_EVENTS_SIMPLE_CHANNEL_H_
+
+#include <string>
+
+#include "aos/configuration_generated.h"
+
+namespace aos {
+
+// Structure used to store both a name and a type and look it up in a map.
+struct SimpleChannel {
+ SimpleChannel(const Channel *channel);
+
+ std::string name;
+ std::string type;
+
+ std::string DebugString() const;
+
+ bool operator==(const SimpleChannel &other) const;
+ bool operator<(const SimpleChannel &other) const;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_SIMPLE_CHANNEL_H_
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 1dfb3af..d783a3a 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -216,7 +216,7 @@
SetMsg(msgs_.front());
msgs_.pop_front();
- return std::make_pair(true, queue_->monotonic_now());
+ return std::make_pair(true, event_loop()->monotonic_now());
}
std::pair<bool, monotonic_clock::time_point> DoFetch() override {
@@ -225,7 +225,7 @@
// simpler. And call clear, obviously.
if (!msg_ && queue_->latest_message()) {
SetMsg(queue_->latest_message());
- return std::make_pair(true, queue_->monotonic_now());
+ return std::make_pair(true, event_loop()->monotonic_now());
} else {
return std::make_pair(false, monotonic_clock::min_time);
}
@@ -235,7 +235,7 @@
// latest message from before we started.
SetMsg(msgs_.back());
msgs_.clear();
- return std::make_pair(true, queue_->monotonic_now());
+ return std::make_pair(true, event_loop()->monotonic_now());
}
private:
@@ -282,10 +282,6 @@
void Disable() override;
- ::aos::monotonic_clock::time_point monotonic_now() const {
- return scheduler_->monotonic_now();
- }
-
private:
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedTimerHandler> event_;
@@ -467,6 +463,10 @@
}
}
+std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
+ return send_delay_;
+}
+
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
@@ -540,7 +540,8 @@
}
void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
- monotonic_clock::time_point event_time = scheduler_->monotonic_now();
+ monotonic_clock::time_point event_time =
+ simulated_event_loop_->monotonic_now();
// Messages are queued in order. If we are the first, add ourselves.
// Otherwise, don't.
@@ -632,10 +633,6 @@
fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
}
-SimpleChannel::SimpleChannel(const Channel *channel)
- : name(CHECK_NOTNULL(CHECK_NOTNULL(channel)->name())->str()),
- type(CHECK_NOTNULL(CHECK_NOTNULL(channel)->type())->str()) {}
-
SimulatedTimerHandler::SimulatedTimerHandler(
EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn)
@@ -649,7 +646,7 @@
monotonic_clock::duration repeat_offset) {
Disable();
const ::aos::monotonic_clock::time_point monotonic_now =
- scheduler_->monotonic_now();
+ simulated_event_loop_->monotonic_now();
base_ = base;
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
@@ -665,7 +662,7 @@
void SimulatedTimerHandler::HandleEvent() {
const ::aos::monotonic_clock::time_point monotonic_now =
- scheduler_->monotonic_now();
+ simulated_event_loop_->monotonic_now();
if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index db0b840..de37c03 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -12,6 +12,7 @@
#include "absl/container/btree_map.h"
#include "aos/events/event_loop.h"
#include "aos/events/event_scheduler.h"
+#include "aos/events/simple_channel.h"
#include "aos/flatbuffer_merge.h"
#include "aos/flatbuffers.h"
#include "aos/ipc_lib/index.h"
@@ -22,30 +23,6 @@
// Class for simulated fetchers.
class SimulatedChannel;
-struct SimpleChannel {
- SimpleChannel(const Channel *channel);
- std::string name;
- std::string type;
-
- std::string DebugString() const {
- return std::string("{ ") + name + ", " + type + "}";
- }
-
- bool operator==(const SimpleChannel &other) const {
- return name == other.name && type == other.type;
- }
- bool operator<(const SimpleChannel &other) const {
- int name_compare = other.name.compare(name);
- if (name_compare == 0) {
- return other.type < type;
- } else if (name_compare < 0) {
- return true;
- } else {
- return false;
- }
- }
-};
-
class SimulatedEventLoopFactory {
public:
// Constructs a SimulatedEventLoopFactory with the provided configuration.
@@ -71,6 +48,7 @@
// Sets the simulated send delay for the factory.
void set_send_delay(std::chrono::nanoseconds send_delay);
+ std::chrono::nanoseconds send_delay() const;
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 4388687..c1c0db0 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -127,14 +127,23 @@
FlatbufferVector(const Flatbuffer<T> &other)
: data_(other.data(), other.data() + other.size()) {}
+ // Copy constructor.
+ FlatbufferVector(const FlatbufferVector<T> &other)
+ : data_(other.data(), other.data() + other.size()) {}
+
// Move constructor.
- FlatbufferVector(Flatbuffer<T> &&other) : data_(std::move(other.data())) {}
+ FlatbufferVector(FlatbufferVector<T> &&other)
+ : data_(std::move(other.data_)) {}
// Copies the data from the other flatbuffer.
- FlatbufferVector &operator=(const Flatbuffer<T> &other) {
+ FlatbufferVector &operator=(const FlatbufferVector<T> &other) {
data_ = std::vector<uint8_t>(other.data(), other.data() + other.size());
return *this;
}
+ FlatbufferVector &operator=(FlatbufferVector<T> &&other) {
+ data_ = std::move(other.data_);
+ return *this;
+ }
// Constructs an empty flatbuffer of type T.
static FlatbufferVector<T> Empty() {
diff --git a/y2017/control_loops/superstructure/vision_time_adjuster_test.cc b/y2017/control_loops/superstructure/vision_time_adjuster_test.cc
index 9a58049..701f8ed 100644
--- a/y2017/control_loops/superstructure/vision_time_adjuster_test.cc
+++ b/y2017/control_loops/superstructure/vision_time_adjuster_test.cc
@@ -82,7 +82,7 @@
void SendMessages() {
SendDrivetrainPosition();
- if (event_loop_factory_.monotonic_now().time_since_epoch() %
+ if (simulation_event_loop_->monotonic_now().time_since_epoch() %
kVisionTick ==
kVisionDelay) {
SendVisionTarget();
diff --git a/y2019/control_loops/drivetrain/BUILD b/y2019/control_loops/drivetrain/BUILD
index 1575584..bc76d7a 100644
--- a/y2019/control_loops/drivetrain/BUILD
+++ b/y2019/control_loops/drivetrain/BUILD
@@ -214,3 +214,21 @@
"//frc971/control_loops/drivetrain:drivetrain_test_lib",
],
)
+
+cc_binary(
+ name = "drivetrain_replay",
+ srcs = ["drivetrain_replay.cc"],
+ data = ["//y2019:config.json"],
+ deps = [
+ ":drivetrain_base",
+ ":event_loop_localizer",
+ "//aos:configuration",
+ "//aos:init",
+ "//aos/events:shm_event_loop",
+ "//aos/events:simulated_event_loop",
+ "//aos/events/logging:logger",
+ "//frc971/control_loops/drivetrain:drivetrain_lib",
+ "@com_github_gflags_gflags//:gflags",
+ "@com_github_google_glog//:glog",
+ ],
+)
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
new file mode 100644
index 0000000..cb07b68
--- /dev/null
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -0,0 +1,62 @@
+#include <iostream>
+
+#include "aos/configuration.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/init.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/network/team_number.h"
+#include "frc971/control_loops/drivetrain/drivetrain.h"
+#include "gflags/gflags.h"
+#include "y2019/control_loops/drivetrain/drivetrain_base.h"
+#include "y2019/control_loops/drivetrain/event_loop_localizer.h"
+
+DEFINE_string(logfile, "/tmp/logfile.bfbs",
+ "Name of the logfile to read from.");
+DEFINE_string(config, "y2019/config.json",
+ "Name of the config file to replay using.");
+DEFINE_string(output_file, "/tmp/replayed.bfbs",
+ "Name of the logfile to write replayed data to.");
+DEFINE_int32(team, 971, "Team number to use for logfile replay.");
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+
+ aos::network::OverrideTeamNumber(FLAGS_team);
+
+ aos::InitCreate();
+
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::logger::LogReader reader(FLAGS_logfile, &config.message());
+ // TODO(james): Actually enforce not sending on the same buses as the logfile
+ // spews out.
+ reader.RemapLoggedChannel("/drivetrain",
+ "frc971.control_loops.drivetrain.Status");
+ reader.RemapLoggedChannel("/drivetrain",
+ "frc971.control_loops.drivetrain.Output");
+ reader.Register();
+
+ aos::logger::DetachedBufferWriter file_writer(FLAGS_output_file);
+ std::unique_ptr<aos::EventLoop> log_writer_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("log_writer");
+ log_writer_event_loop->SkipTimingReport();
+ CHECK(nullptr == log_writer_event_loop->node());
+ aos::logger::Logger writer(&file_writer, log_writer_event_loop.get());
+
+ std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("drivetrain");
+ drivetrain_event_loop->SkipTimingReport();
+
+ y2019::control_loops::drivetrain::EventLoopLocalizer localizer(
+ drivetrain_event_loop.get(),
+ y2019::control_loops::drivetrain::GetDrivetrainConfig());
+ frc971::control_loops::drivetrain::DrivetrainLoop drivetrain(
+ y2019::control_loops::drivetrain::GetDrivetrainConfig(),
+ drivetrain_event_loop.get(), &localizer);
+
+ reader.event_loop_factory()->Run();
+
+ aos::Cleanup();
+ return 0;
+}