blob: d44a885d08bdd51024559733b12f5f6bea60dca8 [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
#include <sys/uio.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <new>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
14
15namespace aos {
16namespace logger {
17
18// This class manages efficiently writing a sequence of detached buffers to a
19// file. It queues them up and batches the write operation.
20class DetachedBufferWriter {
21 public:
22 DetachedBufferWriter(absl::string_view filename);
23 ~DetachedBufferWriter();
24
25 // TODO(austin): Snappy compress the log file if it ends with .snappy!
26
27 // Queues up a finished FlatBufferBuilder to be written. Steals the detached
28 // buffer from it.
29 void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
30 // Queues up a detached buffer directly.
31 void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
32
33 // Triggers data to be provided to the kernel and written.
34 void Flush();
35
36 private:
37 int fd_ = -1;
38
39 // Size of all the data in the queue.
40 size_t queued_size_ = 0;
41
42 // List of buffers to flush.
43 std::vector<flatbuffers::DetachedBuffer> queue_;
44 // List of iovecs to use with writev. This is a member variable to avoid
45 // churn.
46 std::vector<struct iovec> iovec_;
47};
48
// Describes how messages on a channel are recorded in this node's log.
enum class LogType : uint8_t {
  // Originated on this node: the message itself is logged here.
  kLogMessage = 0,
  // Originated on another node: only the delivery times are logged here.
  kLogDeliveryTimeOnly = 1,
  // Originated on another node: the message is logged here together with its
  // delivery times. The message_gateway is responsible for logging any
  // messages which didn't get delivered.
  kLogMessageAndDeliveryTime = 2,
};
Austin Schuh646b7b82019-12-22 21:38:55 -080060
Austin Schuhe309d2a2019-11-29 13:25:21 -080061// Logs all channels available in the event loop to disk every 100 ms.
62// Start by logging one message per channel to capture any state and
63// configuration that is sent rately on a channel and would affect execution.
64class Logger {
65 public:
66 Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
67 std::chrono::milliseconds polling_period =
68 std::chrono::milliseconds(100));
69
70 private:
71 void DoLogData();
72
73 EventLoop *event_loop_;
74 DetachedBufferWriter *writer_;
75
76 // Structure to track both a fetcher, and if the data fetched has been
77 // written. We may want to delay writing data to disk so that we don't let
78 // data get too far out of order when written to disk so we can avoid making
79 // it too hard to sort when reading.
80 struct FetcherStruct {
81 std::unique_ptr<RawFetcher> fetcher;
82 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -080083
84 LogType log_type;
Austin Schuhe309d2a2019-11-29 13:25:21 -080085 };
86
87 std::vector<FetcherStruct> fetchers_;
88 TimerHandler *timer_handler_;
89
90 // Period to poll the channels.
91 const std::chrono::milliseconds polling_period_;
92
93 // Last time that data was written for all channels to disk.
94 monotonic_clock::time_point last_synchronized_time_;
95
96 // Max size that the header has consumed. This much extra data will be
97 // reserved in the builder to avoid reallocating.
98 size_t max_header_size_ = 0;
99};
100
Austin Schuh15649d62019-12-28 16:36:38 -0800101// Packes a message pointed to by the context into a MessageHeader.
102flatbuffers::Offset<MessageHeader> PackMessage(
103 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
104 int channel_index, LogType log_type);
105
Austin Schuhe309d2a2019-11-29 13:25:21 -0800106// Replays all the channels in the logfile to the event loop.
107class LogReader {
108 public:
109 LogReader(absl::string_view filename);
James Kuszmaul7daef362019-12-31 18:28:17 -0800110 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800111
112 // Registers the timer and senders used to resend the messages from the log
113 // file.
114 void Register(EventLoop *event_loop);
Austin Schuh92547522019-12-28 14:33:43 -0800115 // Registers everything, but also updates the real time time in sync. Runs
116 // until the log file starts.
117 void Register(SimulatedEventLoopFactory *factory);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800118 // Unregisters the senders.
119 void Deregister();
120
121 // TODO(austin): Remap channels?
122
123 // Returns the configuration from the log file.
Austin Schuh15649d62019-12-28 16:36:38 -0800124 const Configuration *configuration() const;
125
126 // Returns the node that this log file was created on.
127 const Node *node() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800128
129 // Returns the starting timestamp for the log file.
130 monotonic_clock::time_point monotonic_start_time();
131 realtime_clock::time_point realtime_start_time();
132
133 // TODO(austin): Add the ability to re-publish the fetched messages. Add 2
134 // options, one which publishes them *now*, and another which publishes them
135 // to the simulated event loop factory back in time where they actually
136 // happened.
137
138 private:
139 // Reads a chunk of data into data_. Returns false if no data was read.
140 bool ReadBlock();
141
142 // Returns true if there is a full message available in the buffer, or if we
143 // will have to read more data from disk.
144 bool MessageAvailable();
145
Austin Schuh15649d62019-12-28 16:36:38 -0800146 // Returns a span with the data for a message from the log file, excluding
147 // the size.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800148 absl::Span<const uint8_t> ReadMessage();
149
150 // Queues at least max_out_of_order_duration_ messages into channels_.
151 void QueueMessages();
152
153 // We need to read a large chunk at a time, then kit it up into parts and
154 // sort.
155 //
156 // We want to read 256 KB chunks at a time. This is the fastest read size.
157 // This leaves us with a fragmentation problem though.
158 //
159 // The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
160 // chunks into single flatbuffer messages and manage them in a sorted queue.
161 // Everything is copied three times (into 256 kb buffer, then into separate
162 // buffer, then into sender), but none of it is all that expensive. We can
163 // optimize if it is slow later.
164 //
Austin Schuh15649d62019-12-28 16:36:38 -0800165 // As we place the elements in the sorted list of times, keep doing this
166 // until we read a message that is newer than the threshold.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800167 //
168 // Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
169 // small state machine so we can resume), and keep pulling messages back out
170 // and sending.
171 //
Austin Schuh15649d62019-12-28 16:36:38 -0800172 // For sorting, we want to use the fact that each channel is sorted, and
173 // then merge sort the channels. Have a vector of deques, and then hold a
174 // sorted list of pointers to those.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800175 //
176 // TODO(austin): Multithreaded read at some point. Gotta go faster!
177 // Especially if we start compressing.
178
179 // Allocator which doesn't zero initialize memory.
180 template <typename T>
181 struct DefaultInitAllocator {
182 typedef T value_type;
183
184 template <typename U>
185 void construct(U *p) {
186 ::new (static_cast<void *>(p)) U;
187 }
188
189 template <typename U, typename... Args>
190 void construct(U *p, Args &&... args) {
191 ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
192 }
193
194 T *allocate(std::size_t n) {
195 return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
196 }
197
198 template <typename U>
199 void deallocate(U *p, std::size_t /*n*/) {
200 ::operator delete(static_cast<void *>(p));
201 }
202 };
203
Austin Schuh15649d62019-12-28 16:36:38 -0800204 // Minimum amount of data to queue up for sorting before we are guarenteed
205 // to not see data out of order.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800206 std::chrono::nanoseconds max_out_of_order_duration_;
207
208 // File descriptor for the log file.
209 int fd_ = -1;
210
Austin Schuh92547522019-12-28 14:33:43 -0800211 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
212 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
213 EventLoop *event_loop_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800214 TimerHandler *timer_handler_;
215
Austin Schuh15649d62019-12-28 16:36:38 -0800216 // Vector to read into. This uses an allocator which doesn't zero
217 // initialize the memory.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800218 std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
219
220 // Amount of data consumed already in data_.
221 size_t consumed_data_ = 0;
222
223 // Vector holding the data for the configuration.
224 std::vector<uint8_t> configuration_;
225
226 // Moves the message to the correct channel queue.
227 void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
228
229 // Pushes a pointer to the channel for the given timestamp to the sorted
230 // channel list.
231 void PushChannelHeap(monotonic_clock::time_point timestamp,
232 int channel_index);
233
234 // Returns a pointer to the channel with the oldest message in it, and the
235 // timestamp.
236 const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
237 return channel_heap_.front();
238 }
239
240 // Pops a pointer to the channel with the oldest message in it, and the
241 // timestamp.
242 std::pair<monotonic_clock::time_point, int> PopOldestChannel();
243
Austin Schuh15649d62019-12-28 16:36:38 -0800244 // Datastructure to hold the list of messages, cached timestamp for the
245 // oldest message, and sender to send with.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800246 struct ChannelData {
247 monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
248 std::deque<FlatbufferVector<MessageHeader>> data;
249 std::unique_ptr<RawSender> raw_sender;
250
251 // Returns the oldest message.
252 const FlatbufferVector<MessageHeader> &front() { return data.front(); }
253
254 // Returns the timestamp for the oldest message.
255 const monotonic_clock::time_point front_timestamp() {
256 return monotonic_clock::time_point(
257 std::chrono::nanoseconds(front().message().monotonic_sent_time()));
258 }
259 };
260
261 // List of channels and messages for them.
262 std::vector<ChannelData> channels_;
263
264 // Heap of channels so we can track which channel to send next.
265 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
266
267 // Timestamp of the newest message in a channel queue.
268 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
269
270 // The time at which we need to read another chunk from the logfile.
271 monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
272
273 // Cached bit for if we have reached the end of the file. Otherwise we will
274 // hammer on the kernel asking for more data each time we send.
275 bool end_of_file_ = false;
276};
277
278} // namespace logger
279} // namespace aos
280
281#endif // AOS_EVENTS_LOGGER_H_