// blob: a123dcc7b433ebf7ef206ffb9b9e3eee2c3b5a36
#ifndef AOS_EVENTS_LOGGER_H_
#define AOS_EVENTS_LOGGER_H_
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <tuple>
#include <vector>

#include "Eigen/Dense"
#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
namespace aos {
namespace logger {
// Interface which decides which DetachedBufferWriter each piece of log data
// (headers, channel data, and delivery timestamps) is written to.
class LogNamer {
 public:
  // `node` is the node being logged. It may be nullptr for single-node
  // configurations (derived classes decide whether they support that). It is
  // recorded as the first entry of nodes().
  explicit LogNamer(const Node *node) : node_(node) {
    nodes_.emplace_back(node_);
  }
  virtual ~LogNamer() = default;

  // Writes the serialized header held in `fbb` to the log file(s) belonging to
  // `node`.
  virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
                           const Node *node) = 0;
  // Returns the writer to use for the data sent on `channel`. Implementations
  // may return nullptr when the channel's data should not be logged.
  virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
  // Returns the writer to use for the delivery timestamps of `channel`.
  // Implementations may return nullptr when they should not be logged.
  virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;

  // Nodes whose data is logged through this namer. Always starts with node();
  // derived classes may append more.
  const std::vector<const Node *> &nodes() const { return nodes_; }
  // The node this namer logs on.
  const Node *node() const { return node_; }

 protected:
  const Node *const node_;
  std::vector<const Node *> nodes_;
};
// LogNamer which sends every header, message, and delivery timestamp to a
// single caller-provided writer. Every channel handed to it must be usable on
// node().
class LocalLogNamer : public LogNamer {
 public:
  // `writer` is not owned. It must outlive this namer because it is flushed
  // in the destructor.
  LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
      : LogNamer(node), writer_(writer) {}
  ~LocalLogNamer() override { writer_->Flush(); }

  // Writes the header in `fbb` to the writer. Only headers for node() are
  // accepted.
  void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
                   const Node *node) override {
    CHECK_EQ(node, this->node());
    writer_->WriteSizedFlatbuffer(
        absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
  }

  // Returns the single writer. The channel must be sendable on node().
  DetachedBufferWriter *MakeWriter(const Channel *channel) override {
    CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
        << ": Message is not sendable on this node.";
    return writer_;
  }

  // Returns the single writer for delivery timestamps. This requires a
  // non-null node, a channel delivered to that node, and delivery times
  // configured to be logged on it.
  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
    // Reject the single-node case before node() is handed to any of the
    // configuration helpers below. This way the failure message is always the
    // descriptive one.
    CHECK(node() != nullptr) << ": Can't log timestamps in a single node world";
    CHECK(configuration::ChannelIsReadableOnNode(channel, node()))
        << ": Message is not delivered to this node.";
    CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node(),
                                                              node()))
        << ": Delivery times aren't logged for this channel on this node.";
    return writer_;
  }

 private:
  // Not owned; see the constructor.
  DetachedBufferWriter *writer_;
};
// TODO(austin): Split naming files from making files so we can re-use the
// naming code to predict the log file names for a provided base name.
class MultiNodeLogNamer : public LogNamer {
public:
MultiNodeLogNamer(std::string_view base_name,
const Configuration *configuration, const Node *node)
: LogNamer(node),
base_name_(base_name),
configuration_(configuration),
data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
// Writes the header to all log files for a specific node. This function
// needs to be called after all the writers are created.
void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
if (node == this->node()) {
data_writer_->WriteSizedFlatbuffer(
absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
} else {
for (std::pair<const Channel *const,
std::unique_ptr<DetachedBufferWriter>> &data_writer :
data_writers_) {
if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
fbb->GetBufferPointer(), fbb->GetSize()));
}
}
}
}
// Makes a data logger for a specific channel.
DetachedBufferWriter *MakeWriter(const Channel *channel) {
// See if we can read the data on this node at all.
const bool is_readable =
configuration::ChannelIsReadableOnNode(channel, this->node());
if (!is_readable) {
return nullptr;
}
// Then, see if we are supposed to log the data here.
const bool log_message =
configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
if (!log_message) {
return nullptr;
}
// Now, sort out if this is data generated on this node, or not. It is
// generated if it is sendable on this node.
if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
return data_writer_.get();
} else {
// Ok, we have data that is being forwarded to us that we are supposed to
// log. It needs to be logged with send timestamps, but be sorted enough
// to be able to be processed.
CHECK(data_writers_.find(channel) == data_writers_.end());
// Track that this node is being logged.
if (configuration::MultiNode(configuration_)) {
const Node *source_node = configuration::GetNode(
configuration_, channel->source_node()->string_view());
if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
nodes_.end()) {
nodes_.emplace_back(source_node);
}
}
return data_writers_
.insert(std::make_pair(
channel,
std::make_unique<DetachedBufferWriter>(absl::StrCat(
base_name_, "_", channel->source_node()->string_view(),
"_data", channel->name()->string_view(), "/",
channel->type()->string_view(), ".bfbs"))))
.first->second.get();
}
}
// Makes a timestamp (or timestamp and data) logger for a channel and
// forwarding connection.
DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
const bool log_delivery_times =
(this->node() == nullptr)
? false
: configuration::ConnectionDeliveryTimeIsLoggedOnNode(
channel, this->node(), this->node());
if (!log_delivery_times) {
return nullptr;
}
return data_writer_.get();
}
const std::vector<const Node *> &nodes() const { return nodes_; }
private:
const std::string base_name_;
const Configuration *const configuration_;
// File to write both delivery timestamps and local data to.
std::unique_ptr<DetachedBufferWriter> data_writer_;
// Files to write remote data to. We want one per channel.
std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
data_writers_;
};
// Logs all channels available in the event loop to disk, polling them every
// polling_period (100 ms by default).
// Start by logging one message per channel to capture any state and
// configuration that is sent rarely on a channel and would affect execution.
class Logger {
 public:
  // Logs the channels of `event_loop` to `writer`.
  Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
         std::chrono::milliseconds polling_period =
             std::chrono::milliseconds(100));
  // Logs the channels of `event_loop`, using `log_namer` to choose the writer
  // for each piece of data.
  Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
         std::chrono::milliseconds polling_period =
             std::chrono::milliseconds(100));

  // Rotates the log file with the new writer. This writes out the header
  // again, but keeps going as if nothing else happened.
  void Rotate(DetachedBufferWriter *writer);
  // Same as above, but switches to a new LogNamer.
  void Rotate(std::unique_ptr<LogNamer> log_namer);

 private:
  void WriteHeader();
  void WriteHeader(const Node *node);

  void DoLogData();

  // Default-initialized so the pointer is never indeterminate, even before a
  // constructor assigns it.
  EventLoop *event_loop_ = nullptr;
  std::unique_ptr<LogNamer> log_namer_;

  // Structure to track both a fetcher, and if the data fetched has been
  // written. We may want to delay writing data to disk so that we don't let
  // data get too far out of order when written to disk so we can avoid making
  // it too hard to sort when reading.
  struct FetcherStruct {
    std::unique_ptr<RawFetcher> fetcher;
    // Whether the fetched data has been written yet.
    bool written = false;

    // Channel index; -1 until set. NOTE(review): presumably an index into the
    // event loop's configuration.
    int channel_index = -1;

    LogType log_type = LogType::kLogMessage;

    // Destinations for this channel's data and delivery timestamps.
    // NOTE(review): presumably nullptr when that part isn't logged, mirroring
    // LogNamer::MakeWriter/MakeTimestampWriter returning nullptr.
    DetachedBufferWriter *writer = nullptr;
    DetachedBufferWriter *timestamp_writer = nullptr;
  };

  std::vector<FetcherStruct> fetchers_;
  // Null until the polling timer is created.
  TimerHandler *timer_handler_ = nullptr;

  // Period to poll the channels.
  const std::chrono::milliseconds polling_period_;

  // Last time that data was written for all channels to disk.
  monotonic_clock::time_point last_synchronized_time_;

  monotonic_clock::time_point monotonic_start_time_;
  realtime_clock::time_point realtime_start_time_;

  // Max size that the header has consumed. This much extra data will be
  // reserved in the builder to avoid reallocating.
  size_t max_header_size_ = 0;
};
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
//   -> Replayed just on the source node.
//
// Forwarding timestamps only logged from the perspective of the destination
// node.
//   -> Matched with data on source node and logged.
//
// Forwarding timestamps with data logged as the destination node.
//   -> Replayed just as the destination.
//   -> Replayed as the source (much harder, ordering is not defined).
//
// Additionally, the same data may end up logged more than once (duplicate
// data).
//   -> CHECK that the copies match and explode otherwise.
//
// This can be boiled down to a set of constraints and tools.
//
// Constraints:
// 1) Forwarding timestamps and data need to be logged separately.
// 2) Any forwarded data logged on the destination node needs to be logged
//    separately such that it can be sorted.
//
// Tools:
// 1) Log reader needs to be able to sort a list of log files.
// 2) Log reader needs to be able to merge sorted lists of log files.
// 3) Log reader needs to be able to match timestamps with messages.
//
// We also need to be able to generate multiple views of a log file depending on
// the target.
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
// If you want to supply a new configuration that will be used for replay
// (e.g., to change message rates, or to populate an updated schema), then
// pass it in here. It must provide all the channels that the original logged
// config did.
//
// Log filenames are in the following format:
//
// {
// {log1_part0, log1_part1, ...},
// {log2}
// }
// The inner vector is a list of log file chunks which form up a log file.
// The outer vector is a list of log files with subsets of the messages, or
// messages from different nodes.
//
// If the outer vector isn't provided, it is assumed to be of size 1.
LogReader(std::string_view filename,
const Configuration *replay_configuration = nullptr);
LogReader(const std::vector<std::string> &filenames,
const Configuration *replay_configuration = nullptr);
LogReader(const std::vector<std::vector<std::string>> &filenames,
const Configuration *replay_configuration = nullptr);
~LogReader();
// Registers all the callbacks to send the log file data out on an event loop
// created in event_loop_factory. This also updates time to be at the start
// of the log file by running until the log file starts.
// Note: the configuration used in the factory should be configuration()
// below, but can be anything as long as the locations needed to send
// everything are available.
void Register(SimulatedEventLoopFactory *event_loop_factory);
// Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
// and then calls Register.
void Register();
// Registers callbacks for all the events after the log file starts. This is
// only useful when replaying live.
void Register(EventLoop *event_loop);
// Unregisters the senders. You only need to call this if you separately
// supplied an event loop or event loop factory and the lifetimes are such
// that they need to be explicitly destroyed before the LogReader destructor
// gets called.
void Deregister();
// Returns the configuration from the log file.
const Configuration *logged_configuration() const;
// Returns the configuration being used for replay.
// The pointer is invalidated whenever RemapLoggedChannel is called.
const Configuration *configuration() const;
// Returns the nodes that this log file was created on. This is a list of
// pointers to a node in the nodes() list inside configuration(). The
// pointers here are invalidated whenever RemapLoggedChannel is called.
std::vector<const Node *> Nodes() const;
// Returns the starting timestamp for the log file.
monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
// Causes the logger to publish the provided channel on a different name so
// that replayed applications can publish on the proper channel name without
// interference. This operates on raw channel names, without any node or
// application specific mappings.
void RemapLoggedChannel(std::string_view name, std::string_view type,
std::string_view add_prefix = "/original");
template <typename T>
void RemapLoggedChannel(std::string_view name,
std::string_view add_prefix = "/original") {
RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
}
template <typename T>
bool HasChannel(std::string_view name) {
return configuration::GetChannel(log_file_header()->configuration(), name,
T::GetFullyQualifiedName(), "",
nullptr) != nullptr;
}
SimulatedEventLoopFactory *event_loop_factory() {
return event_loop_factory_;
}
const LogFileHeader *log_file_header() const {
return &log_file_header_.message();
}
private:
const Channel *RemapChannel(const EventLoop *event_loop,
const Channel *channel);
// Queues at least max_out_of_order_duration_ messages into channels_.
void QueueMessages();
// Handle constructing a configuration with all the additional remapped
// channels from calls to RemapLoggedChannel.
void MakeRemappedConfig();
const std::vector<std::vector<std::string>> filenames_;
// This is *a* log file header used to provide the logged config. The rest of
// the header is likely distracting.
FlatbufferVector<LogFileHeader> log_file_header_;
Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
// State per node.
class State {
public:
State(std::unique_ptr<ChannelMerger> channel_merger);
// Returns the timestamps, channel_index, and message from a channel.
// update_time (will be) set to true when popping this message causes the
// filter to change the time offset estimation function.
std::tuple<TimestampMerger::DeliveryTimestamp, int,
FlatbufferVector<MessageHeader>>
PopOldest(bool *update_time);
// Returns the monotonic time of the oldest message.
monotonic_clock::time_point OldestMessageTime() const;
// Primes the queues inside State. Should be called before calling
// OldestMessageTime.
void SeedSortedMessages();
// Updates the timestamp filter with the timestamp. Returns true if the
// provided timestamp was actually a forwarding timestamp and used, and
// false otherwise.
bool MaybeUpdateTimestamp(
const TimestampMerger::DeliveryTimestamp &channel_timestamp,
int channel_index);
// Returns the starting time for this node.
monotonic_clock::time_point monotonic_start_time() const {
return channel_merger_->monotonic_start_time();
}
realtime_clock::time_point realtime_start_time() const {
return channel_merger_->realtime_start_time();
}
// Sets the node event loop factory for replaying into a
// SimulatedEventLoopFactory. Returns the EventLoop to use.
EventLoop *SetNodeEventLoopFactory(
NodeEventLoopFactory *node_event_loop_factory);
// Sets and gets the event loop to use.
void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
EventLoop *event_loop() { return event_loop_; }
// Returns the oldest timestamp for the provided channel. This should only
// be called before SeedSortedMessages();
TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
size_t channel) {
return channel_merger_->OldestTimestampForChannel(channel);
}
// Sets the current realtime offset from the monotonic clock for this node
// (if we are on a simulated event loop).
void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
realtime_clock::time_point realtime_time) {
if (node_event_loop_factory_ != nullptr) {
node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
realtime_time);
}
}
// Converts a timestamp from the monotonic clock on this node to the
// distributed clock.
distributed_clock::time_point ToDistributedClock(
monotonic_clock::time_point time) {
return node_event_loop_factory_->ToDistributedClock(time);
}
// Sets the offset (and slope) from the distributed clock.
void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
double distributed_slope) {
node_event_loop_factory_->SetDistributedOffset(distributed_offset,
distributed_slope);
}
// Returns the current time on the remote node which sends messages on
// channel_index.
monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
return channel_target_event_loop_factory_[channel_index]->monotonic_now();
}
// Sets the node we will be merging as, and returns true if there is any
// data on it.
bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
// Sets the number of channels.
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
void SetChannel(
size_t channel, std::unique_ptr<RawSender> sender,
std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
NodeEventLoopFactory *channel_target_event_loop_factory);
// Returns if we have read all the messages from all the logs.
bool at_end() const { return channel_merger_->at_end(); }
// Unregisters everything so we can destory the event loop.
void Deregister();
// Sets the current TimerHandle for the replay callback.
void set_timer_handler(TimerHandler *timer_handler) {
timer_handler_ = timer_handler;
}
// Sets the next wakeup time on the replay callback.
void Setup(monotonic_clock::time_point next_time) {
timer_handler_->Setup(next_time);
}
// Sends a buffer on the provided channel index.
bool Send(size_t channel_index, const void *data, size_t size,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index) {
return channels_[channel_index]->Send(data, size, monotonic_remote_time,
realtime_remote_time,
remote_queue_index);
}
// Returns a debug string for the channel merger.
std::string DebugString() const { return channel_merger_->DebugString(); }
private:
// Log file.
std::unique_ptr<ChannelMerger> channel_merger_;
std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
FlatbufferVector<MessageHeader>>>
sorted_messages_;
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
// Factory (if we are in sim) that this loop was created on.
NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
// Event loop.
EventLoop *event_loop_ = nullptr;
// And timer used to send messages.
TimerHandler *timer_handler_;
// Filters (or nullptr if it isn't a forwarded channel) for each channel.
// This corresponds to the object which is shared among all the channels
// going between 2 nodes. The second element in the tuple indicates if this
// is the primary direction or not.
std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
filters_;
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
};
// Node index -> State.
std::vector<std::unique_ptr<State>> states_;
// Creates the requested filter if it doesn't exist, regardless of whether
// these nodes can actually communicate directly. The second return value
// reports if this is the primary direction or not.
std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
const Node *node_a, const Node *node_b);
// FILE to write offsets to (if populated).
FILE *offset_fp_ = nullptr;
// Timestamp of the first piece of data used for the horizontal axis on the
// plot.
aos::realtime_clock::time_point first_time_;
// List of filters for a connection. The pointer to the first node will be
// less than the second node.
std::map<std::tuple<const Node *, const Node *>,
message_bridge::ClippedAverageFilter>
filters_;
// Returns the offset from the monotonic clock for a node to the distributed
// clock. distributed = monotonic + offset;
std::chrono::nanoseconds offset(int node_index) const {
CHECK_LT(node_index, offset_matrix_.rows())
<< ": Got too high of a node index.";
return -std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(offset_matrix_(node_index))) -
base_offset_matrix_(node_index);
}
// Updates the offset matrix solution and sets the per-node distributed
// offsets in the factory.
void UpdateOffsets();
// sample_matrix_ = map_matrix_ * offset_matrix_
Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
// Base offsets. The actual offset is the sum of this and the offset matrix.
// This removes some of the dynamic range challenges from the double above.
Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
base_offset_matrix_;
std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
remapped_configuration_buffer_;
std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
// Map of channel indices to new name. The channel index will be an index into
// logged_configuration(), and the string key will be the name of the channel
// to send on instead of the logged channel name.
std::map<size_t, std::string> remapped_channels_;
// Number of nodes which still have data to send. This is used to figure out
// when to exit.
size_t live_nodes_ = 0;
const Configuration *remapped_configuration_ = nullptr;
const Configuration *replay_configuration_ = nullptr;
// If true, the replay timer will ignore any missing data. This is used
// during startup when we are bootstrapping everything and trying to get to
// the start of all the log files.
bool ignore_missing_data_ = false;
};
} // namespace logger
} // namespace aos
#endif // AOS_EVENTS_LOGGER_H_