blob: f03aa64abe9ee7f8b21845f7548598e77166ac05 [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
#include <sys/uio.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <new>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
13
14namespace aos {
15namespace logger {
16
17// This class manages efficiently writing a sequence of detached buffers to a
18// file. It queues them up and batches the write operation.
19class DetachedBufferWriter {
20 public:
21 DetachedBufferWriter(absl::string_view filename);
22 ~DetachedBufferWriter();
23
24 // TODO(austin): Snappy compress the log file if it ends with .snappy!
25
26 // Queues up a finished FlatBufferBuilder to be written. Steals the detached
27 // buffer from it.
28 void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
29 // Queues up a detached buffer directly.
30 void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
31
32 // Triggers data to be provided to the kernel and written.
33 void Flush();
34
35 private:
36 int fd_ = -1;
37
38 // Size of all the data in the queue.
39 size_t queued_size_ = 0;
40
41 // List of buffers to flush.
42 std::vector<flatbuffers::DetachedBuffer> queue_;
43 // List of iovecs to use with writev. This is a member variable to avoid
44 // churn.
45 std::vector<struct iovec> iovec_;
46};
47
48// Logs all channels available in the event loop to disk every 100 ms.
49// Start by logging one message per channel to capture any state and
50// configuration that is sent rately on a channel and would affect execution.
51class Logger {
52 public:
53 Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
54 std::chrono::milliseconds polling_period =
55 std::chrono::milliseconds(100));
56
57 private:
58 void DoLogData();
59
60 EventLoop *event_loop_;
61 DetachedBufferWriter *writer_;
62
63 // Structure to track both a fetcher, and if the data fetched has been
64 // written. We may want to delay writing data to disk so that we don't let
65 // data get too far out of order when written to disk so we can avoid making
66 // it too hard to sort when reading.
67 struct FetcherStruct {
68 std::unique_ptr<RawFetcher> fetcher;
69 bool written = false;
70 };
71
72 std::vector<FetcherStruct> fetchers_;
73 TimerHandler *timer_handler_;
74
75 // Period to poll the channels.
76 const std::chrono::milliseconds polling_period_;
77
78 // Last time that data was written for all channels to disk.
79 monotonic_clock::time_point last_synchronized_time_;
80
81 // Max size that the header has consumed. This much extra data will be
82 // reserved in the builder to avoid reallocating.
83 size_t max_header_size_ = 0;
84};
85
86// Replays all the channels in the logfile to the event loop.
87class LogReader {
88 public:
89 LogReader(absl::string_view filename);
90
91 // Registers the timer and senders used to resend the messages from the log
92 // file.
93 void Register(EventLoop *event_loop);
94 // Unregisters the senders.
95 void Deregister();
96
97 // TODO(austin): Remap channels?
98
99 // Returns the configuration from the log file.
100 const Configuration *configuration();
101
102 // Returns the starting timestamp for the log file.
103 monotonic_clock::time_point monotonic_start_time();
104 realtime_clock::time_point realtime_start_time();
105
106 // TODO(austin): Add the ability to re-publish the fetched messages. Add 2
107 // options, one which publishes them *now*, and another which publishes them
108 // to the simulated event loop factory back in time where they actually
109 // happened.
110
111 private:
112 // Reads a chunk of data into data_. Returns false if no data was read.
113 bool ReadBlock();
114
115 // Returns true if there is a full message available in the buffer, or if we
116 // will have to read more data from disk.
117 bool MessageAvailable();
118
119 // Returns a span with the data for a message from the log file, excluding the
120 // size.
121 absl::Span<const uint8_t> ReadMessage();
122
123 // Queues at least max_out_of_order_duration_ messages into channels_.
124 void QueueMessages();
125
126 // We need to read a large chunk at a time, then kit it up into parts and
127 // sort.
128 //
129 // We want to read 256 KB chunks at a time. This is the fastest read size.
130 // This leaves us with a fragmentation problem though.
131 //
132 // The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
133 // chunks into single flatbuffer messages and manage them in a sorted queue.
134 // Everything is copied three times (into 256 kb buffer, then into separate
135 // buffer, then into sender), but none of it is all that expensive. We can
136 // optimize if it is slow later.
137 //
138 // As we place the elements in the sorted list of times, keep doing this until
139 // we read a message that is newer than the threshold.
140 //
141 // Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
142 // small state machine so we can resume), and keep pulling messages back out
143 // and sending.
144 //
145 // For sorting, we want to use the fact that each channel is sorted, and then
146 // merge sort the channels. Have a vector of deques, and then hold a sorted
147 // list of pointers to those.
148 //
149 // TODO(austin): Multithreaded read at some point. Gotta go faster!
150 // Especially if we start compressing.
151
152 // Allocator which doesn't zero initialize memory.
153 template <typename T>
154 struct DefaultInitAllocator {
155 typedef T value_type;
156
157 template <typename U>
158 void construct(U *p) {
159 ::new (static_cast<void *>(p)) U;
160 }
161
162 template <typename U, typename... Args>
163 void construct(U *p, Args &&... args) {
164 ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
165 }
166
167 T *allocate(std::size_t n) {
168 return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
169 }
170
171 template <typename U>
172 void deallocate(U *p, std::size_t /*n*/) {
173 ::operator delete(static_cast<void *>(p));
174 }
175 };
176
177 // Minimum amount of data to queue up for sorting before we are guarenteed to
178 // not see data out of order.
179 std::chrono::nanoseconds max_out_of_order_duration_;
180
181 // File descriptor for the log file.
182 int fd_ = -1;
183
184 EventLoop *event_loop_;
185 TimerHandler *timer_handler_;
186
187 // Vector to read into. This uses an allocator which doesn't zero initialize
188 // the memory.
189 std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
190
191 // Amount of data consumed already in data_.
192 size_t consumed_data_ = 0;
193
194 // Vector holding the data for the configuration.
195 std::vector<uint8_t> configuration_;
196
197 // Moves the message to the correct channel queue.
198 void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
199
200 // Pushes a pointer to the channel for the given timestamp to the sorted
201 // channel list.
202 void PushChannelHeap(monotonic_clock::time_point timestamp,
203 int channel_index);
204
205 // Returns a pointer to the channel with the oldest message in it, and the
206 // timestamp.
207 const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
208 return channel_heap_.front();
209 }
210
211 // Pops a pointer to the channel with the oldest message in it, and the
212 // timestamp.
213 std::pair<monotonic_clock::time_point, int> PopOldestChannel();
214
215 // Datastructure to hold the list of messages, cached timestamp for the oldest
216 // message, and sender to send with.
217 struct ChannelData {
218 monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
219 std::deque<FlatbufferVector<MessageHeader>> data;
220 std::unique_ptr<RawSender> raw_sender;
221
222 // Returns the oldest message.
223 const FlatbufferVector<MessageHeader> &front() { return data.front(); }
224
225 // Returns the timestamp for the oldest message.
226 const monotonic_clock::time_point front_timestamp() {
227 return monotonic_clock::time_point(
228 std::chrono::nanoseconds(front().message().monotonic_sent_time()));
229 }
230 };
231
232 // List of channels and messages for them.
233 std::vector<ChannelData> channels_;
234
235 // Heap of channels so we can track which channel to send next.
236 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
237
238 // Timestamp of the newest message in a channel queue.
239 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
240
241 // The time at which we need to read another chunk from the logfile.
242 monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
243
244 // Cached bit for if we have reached the end of the file. Otherwise we will
245 // hammer on the kernel asking for more data each time we send.
246 bool end_of_file_ = false;
247};
248
249} // namespace logger
250} // namespace aos
251
252#endif // AOS_EVENTS_LOGGER_H_