Move log file manipulation logic out of LogReader
It really had 4 separate layers that should each have been a class.
Split them out in perparation for multi-file file logs.
1) Read chunks of data from a file
2) Read the header and messages from a file.
3) Sort those messages
4) And then send them over the event loop.
Change-Id: Ib885e6f0ed027851a4d7faea71b9391c1b60cf19
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e44f65f..a35b453 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -3,8 +3,8 @@
#include <deque>
#include <vector>
+#include <string_view>
-#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logfile_utils.h"
@@ -59,7 +59,7 @@
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
- LogReader(absl::string_view filename);
+ LogReader(std::string_view filename);
~LogReader();
// Registers the timer and senders used to resend the messages from the log
@@ -89,143 +89,18 @@
// happened.
private:
- // Reads a chunk of data into data_. Returns false if no data was read.
- bool ReadBlock();
-
- // Returns true if there is a full message available in the buffer, or if we
- // will have to read more data from disk.
- bool MessageAvailable();
-
- // Returns a span with the data for a message from the log file, excluding
- // the size.
- absl::Span<const uint8_t> ReadMessage();
-
// Queues at least max_out_of_order_duration_ messages into channels_.
void QueueMessages();
- // We need to read a large chunk at a time, then kit it up into parts and
- // sort.
- //
- // We want to read 256 KB chunks at a time. This is the fastest read size.
- // This leaves us with a fragmentation problem though.
- //
- // The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
- // chunks into single flatbuffer messages and manage them in a sorted queue.
- // Everything is copied three times (into 256 kb buffer, then into separate
- // buffer, then into sender), but none of it is all that expensive. We can
- // optimize if it is slow later.
- //
- // As we place the elements in the sorted list of times, keep doing this
- // until we read a message that is newer than the threshold.
- //
- // Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
- // small state machine so we can resume), and keep pulling messages back out
- // and sending.
- //
- // For sorting, we want to use the fact that each channel is sorted, and
- // then merge sort the channels. Have a vector of deques, and then hold a
- // sorted list of pointers to those.
- //
- // TODO(austin): Multithreaded read at some point. Gotta go faster!
- // Especially if we start compressing.
+ // Log chunk reader.
+ SortedMessageReader sorted_message_reader_;
- // Allocator which doesn't zero initialize memory.
- template <typename T>
- struct DefaultInitAllocator {
- typedef T value_type;
-
- template <typename U>
- void construct(U *p) {
- ::new (static_cast<void *>(p)) U;
- }
-
- template <typename U, typename... Args>
- void construct(U *p, Args &&... args) {
- ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
- }
-
- T *allocate(std::size_t n) {
- return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
- }
-
- template <typename U>
- void deallocate(U *p, std::size_t /*n*/) {
- ::operator delete(static_cast<void *>(p));
- }
- };
-
- // Minimum amount of data to queue up for sorting before we are guarenteed
- // to not see data out of order.
- std::chrono::nanoseconds max_out_of_order_duration_;
-
- // File descriptor for the log file.
- int fd_ = -1;
+ std::vector<std::unique_ptr<RawSender>> channels_;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
EventLoop *event_loop_ = nullptr;
TimerHandler *timer_handler_;
-
- // Vector to read into. This uses an allocator which doesn't zero
- // initialize the memory.
- std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
-
- // Amount of data consumed already in data_.
- size_t consumed_data_ = 0;
-
- // Vector holding the data for the configuration.
- std::vector<uint8_t> configuration_;
-
- // Moves the message to the correct channel queue.
- void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
- // Pushes a pointer to the channel for the given timestamp to the sorted
- // channel list.
- void PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index);
-
- // Returns a pointer to the channel with the oldest message in it, and the
- // timestamp.
- const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
- return channel_heap_.front();
- }
-
- // Pops a pointer to the channel with the oldest message in it, and the
- // timestamp.
- std::pair<monotonic_clock::time_point, int> PopOldestChannel();
-
- // Datastructure to hold the list of messages, cached timestamp for the
- // oldest message, and sender to send with.
- struct ChannelData {
- monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
- std::deque<FlatbufferVector<MessageHeader>> data;
- std::unique_ptr<RawSender> raw_sender;
-
- // Returns the oldest message.
- const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
- // Returns the timestamp for the oldest message.
- const monotonic_clock::time_point front_timestamp() {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(front().message().monotonic_sent_time()));
- }
- };
-
- // List of channels and messages for them.
- std::vector<ChannelData> channels_;
-
- // Heap of channels so we can track which channel to send next.
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
-
- // Timestamp of the newest message in a channel queue.
- monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
-
- // The time at which we need to read another chunk from the logfile.
- monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
-
- // Cached bit for if we have reached the end of the file. Otherwise we will
- // hammer on the kernel asking for more data each time we send.
- bool end_of_file_ = false;
};
} // namespace logger