#ifndef AOS_EVENTS_LOGGER_H_
#define AOS_EVENTS_LOGGER_H_

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <new>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"

namespace aos {
namespace logger {

Austin Schuhe309d2a2019-11-29 13:25:21 -080019// Logs all channels available in the event loop to disk every 100 ms.
20// Start by logging one message per channel to capture any state and
21// configuration that is sent rately on a channel and would affect execution.
22class Logger {
23 public:
24 Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
25 std::chrono::milliseconds polling_period =
26 std::chrono::milliseconds(100));
27
28 private:
29 void DoLogData();
30
31 EventLoop *event_loop_;
32 DetachedBufferWriter *writer_;
33
34 // Structure to track both a fetcher, and if the data fetched has been
35 // written. We may want to delay writing data to disk so that we don't let
36 // data get too far out of order when written to disk so we can avoid making
37 // it too hard to sort when reading.
38 struct FetcherStruct {
39 std::unique_ptr<RawFetcher> fetcher;
40 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -080041
42 LogType log_type;
Austin Schuhe309d2a2019-11-29 13:25:21 -080043 };
44
45 std::vector<FetcherStruct> fetchers_;
46 TimerHandler *timer_handler_;
47
48 // Period to poll the channels.
49 const std::chrono::milliseconds polling_period_;
50
51 // Last time that data was written for all channels to disk.
52 monotonic_clock::time_point last_synchronized_time_;
53
54 // Max size that the header has consumed. This much extra data will be
55 // reserved in the builder to avoid reallocating.
56 size_t max_header_size_ = 0;
57};
58
59// Replays all the channels in the logfile to the event loop.
60class LogReader {
61 public:
62 LogReader(absl::string_view filename);
James Kuszmaul7daef362019-12-31 18:28:17 -080063 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -080064
65 // Registers the timer and senders used to resend the messages from the log
66 // file.
67 void Register(EventLoop *event_loop);
Austin Schuh92547522019-12-28 14:33:43 -080068 // Registers everything, but also updates the real time time in sync. Runs
69 // until the log file starts.
70 void Register(SimulatedEventLoopFactory *factory);
Austin Schuhe309d2a2019-11-29 13:25:21 -080071 // Unregisters the senders.
72 void Deregister();
73
74 // TODO(austin): Remap channels?
75
76 // Returns the configuration from the log file.
Austin Schuh15649d62019-12-28 16:36:38 -080077 const Configuration *configuration() const;
78
79 // Returns the node that this log file was created on.
80 const Node *node() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -080081
82 // Returns the starting timestamp for the log file.
83 monotonic_clock::time_point monotonic_start_time();
84 realtime_clock::time_point realtime_start_time();
85
86 // TODO(austin): Add the ability to re-publish the fetched messages. Add 2
87 // options, one which publishes them *now*, and another which publishes them
88 // to the simulated event loop factory back in time where they actually
89 // happened.
90
91 private:
92 // Reads a chunk of data into data_. Returns false if no data was read.
93 bool ReadBlock();
94
95 // Returns true if there is a full message available in the buffer, or if we
96 // will have to read more data from disk.
97 bool MessageAvailable();
98
Austin Schuh15649d62019-12-28 16:36:38 -080099 // Returns a span with the data for a message from the log file, excluding
100 // the size.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800101 absl::Span<const uint8_t> ReadMessage();
102
103 // Queues at least max_out_of_order_duration_ messages into channels_.
104 void QueueMessages();
105
106 // We need to read a large chunk at a time, then kit it up into parts and
107 // sort.
108 //
109 // We want to read 256 KB chunks at a time. This is the fastest read size.
110 // This leaves us with a fragmentation problem though.
111 //
112 // The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
113 // chunks into single flatbuffer messages and manage them in a sorted queue.
114 // Everything is copied three times (into 256 kb buffer, then into separate
115 // buffer, then into sender), but none of it is all that expensive. We can
116 // optimize if it is slow later.
117 //
Austin Schuh15649d62019-12-28 16:36:38 -0800118 // As we place the elements in the sorted list of times, keep doing this
119 // until we read a message that is newer than the threshold.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800120 //
121 // Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
122 // small state machine so we can resume), and keep pulling messages back out
123 // and sending.
124 //
Austin Schuh15649d62019-12-28 16:36:38 -0800125 // For sorting, we want to use the fact that each channel is sorted, and
126 // then merge sort the channels. Have a vector of deques, and then hold a
127 // sorted list of pointers to those.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800128 //
129 // TODO(austin): Multithreaded read at some point. Gotta go faster!
130 // Especially if we start compressing.
131
132 // Allocator which doesn't zero initialize memory.
133 template <typename T>
134 struct DefaultInitAllocator {
135 typedef T value_type;
136
137 template <typename U>
138 void construct(U *p) {
139 ::new (static_cast<void *>(p)) U;
140 }
141
142 template <typename U, typename... Args>
143 void construct(U *p, Args &&... args) {
144 ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
145 }
146
147 T *allocate(std::size_t n) {
148 return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
149 }
150
151 template <typename U>
152 void deallocate(U *p, std::size_t /*n*/) {
153 ::operator delete(static_cast<void *>(p));
154 }
155 };
156
Austin Schuh15649d62019-12-28 16:36:38 -0800157 // Minimum amount of data to queue up for sorting before we are guarenteed
158 // to not see data out of order.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800159 std::chrono::nanoseconds max_out_of_order_duration_;
160
161 // File descriptor for the log file.
162 int fd_ = -1;
163
Austin Schuh92547522019-12-28 14:33:43 -0800164 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
165 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
166 EventLoop *event_loop_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800167 TimerHandler *timer_handler_;
168
Austin Schuh15649d62019-12-28 16:36:38 -0800169 // Vector to read into. This uses an allocator which doesn't zero
170 // initialize the memory.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800171 std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
172
173 // Amount of data consumed already in data_.
174 size_t consumed_data_ = 0;
175
176 // Vector holding the data for the configuration.
177 std::vector<uint8_t> configuration_;
178
179 // Moves the message to the correct channel queue.
180 void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
181
182 // Pushes a pointer to the channel for the given timestamp to the sorted
183 // channel list.
184 void PushChannelHeap(monotonic_clock::time_point timestamp,
185 int channel_index);
186
187 // Returns a pointer to the channel with the oldest message in it, and the
188 // timestamp.
189 const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
190 return channel_heap_.front();
191 }
192
193 // Pops a pointer to the channel with the oldest message in it, and the
194 // timestamp.
195 std::pair<monotonic_clock::time_point, int> PopOldestChannel();
196
Austin Schuh15649d62019-12-28 16:36:38 -0800197 // Datastructure to hold the list of messages, cached timestamp for the
198 // oldest message, and sender to send with.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800199 struct ChannelData {
200 monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
201 std::deque<FlatbufferVector<MessageHeader>> data;
202 std::unique_ptr<RawSender> raw_sender;
203
204 // Returns the oldest message.
205 const FlatbufferVector<MessageHeader> &front() { return data.front(); }
206
207 // Returns the timestamp for the oldest message.
208 const monotonic_clock::time_point front_timestamp() {
209 return monotonic_clock::time_point(
210 std::chrono::nanoseconds(front().message().monotonic_sent_time()));
211 }
212 };
213
214 // List of channels and messages for them.
215 std::vector<ChannelData> channels_;
216
217 // Heap of channels so we can track which channel to send next.
218 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
219
220 // Timestamp of the newest message in a channel queue.
221 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
222
223 // The time at which we need to read another chunk from the logfile.
224 monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
225
226 // Cached bit for if we have reached the end of the file. Otherwise we will
227 // hammer on the kernel asking for more data each time we send.
228 bool end_of_file_ = false;
229};

}  // namespace logger
}  // namespace aos

#endif  // AOS_EVENTS_LOGGER_H_