blob: 714796e0e50e20f177826b2a42e25d2ddc9b9d68 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
4#include <limits.h>
5#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8
9#include <vector>
10
Austin Schuh05b70472020-01-01 17:11:17 -080011#include "aos/configuration.h"
Austin Schuha36c8902019-12-30 18:07:15 -080012#include "aos/events/logging/logger_generated.h"
13#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080014#include "gflags/gflags.h"
15#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016
17DEFINE_int32(flush_size, 1000000,
18 "Number of outstanding bytes to allow before flushing to disk.");
19
20namespace aos {
21namespace logger {
22
Austin Schuh05b70472020-01-01 17:11:17 -080023namespace chrono = std::chrono;
24
Austin Schuha36c8902019-12-30 18:07:15 -080025DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
26 : fd_(open(std::string(filename).c_str(),
27 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
28 PCHECK(fd_ != -1) << ": Failed to open " << filename;
29}
30
31DetachedBufferWriter::~DetachedBufferWriter() {
32 Flush();
33 PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
34}
35
36void DetachedBufferWriter::QueueSizedFlatbuffer(
37 flatbuffers::FlatBufferBuilder *fbb) {
38 QueueSizedFlatbuffer(fbb->Release());
39}
40
41void DetachedBufferWriter::QueueSizedFlatbuffer(
42 flatbuffers::DetachedBuffer &&buffer) {
43 queued_size_ += buffer.size();
44 queue_.emplace_back(std::move(buffer));
45
46 // Flush if we are at the max number of iovs per writev, or have written
47 // enough data. Otherwise writev will fail with an invalid argument.
48 if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
49 queue_.size() == IOV_MAX) {
50 Flush();
51 }
52}
53
54void DetachedBufferWriter::Flush() {
55 if (queue_.size() == 0u) {
56 return;
57 }
58 iovec_.clear();
59 iovec_.reserve(queue_.size());
60 size_t counted_size = 0;
61 for (size_t i = 0; i < queue_.size(); ++i) {
62 struct iovec n;
63 n.iov_base = queue_[i].data();
64 n.iov_len = queue_[i].size();
65 counted_size += n.iov_len;
66 iovec_.emplace_back(std::move(n));
67 }
68 CHECK_EQ(counted_size, queued_size_);
69 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
70
71 PCHECK(written == static_cast<ssize_t>(queued_size_))
72 << ": Wrote " << written << " expected " << queued_size_;
73
74 queued_size_ = 0;
75 queue_.clear();
76 // TODO(austin): Handle partial writes in some way other than crashing...
77}
78
79flatbuffers::Offset<MessageHeader> PackMessage(
80 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
81 int channel_index, LogType log_type) {
82 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
83
84 switch (log_type) {
85 case LogType::kLogMessage:
86 case LogType::kLogMessageAndDeliveryTime:
87 data_offset =
88 fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
89 break;
90
91 case LogType::kLogDeliveryTimeOnly:
92 break;
93 }
94
95 MessageHeader::Builder message_header_builder(*fbb);
96 message_header_builder.add_channel_index(channel_index);
97 message_header_builder.add_queue_index(context.queue_index);
98 message_header_builder.add_monotonic_sent_time(
99 context.monotonic_event_time.time_since_epoch().count());
100 message_header_builder.add_realtime_sent_time(
101 context.realtime_event_time.time_since_epoch().count());
102
103 switch (log_type) {
104 case LogType::kLogMessage:
105 message_header_builder.add_data(data_offset);
106 break;
107
108 case LogType::kLogMessageAndDeliveryTime:
109 message_header_builder.add_data(data_offset);
110 [[fallthrough]];
111
112 case LogType::kLogDeliveryTimeOnly:
113 message_header_builder.add_monotonic_remote_time(
114 context.monotonic_remote_time.time_since_epoch().count());
115 message_header_builder.add_realtime_remote_time(
116 context.realtime_remote_time.time_since_epoch().count());
117 message_header_builder.add_remote_queue_index(context.remote_queue_index);
118 break;
119 }
120
121 return message_header_builder.Finish();
122}
123
Austin Schuh05b70472020-01-01 17:11:17 -0800124SpanReader::SpanReader(std::string_view filename)
125 : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
126 PCHECK(fd_ != -1) << ": Failed to open " << filename;
127}
128
129absl::Span<const uint8_t> SpanReader::ReadMessage() {
130 // Make sure we have enough for the size.
131 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
132 if (!ReadBlock()) {
133 return absl::Span<const uint8_t>();
134 }
135 }
136
137 // Now make sure we have enough for the message.
138 const size_t data_size =
139 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
140 sizeof(flatbuffers::uoffset_t);
141 while (data_.size() < consumed_data_ + data_size) {
142 if (!ReadBlock()) {
143 return absl::Span<const uint8_t>();
144 }
145 }
146
147 // And return it, consuming the data.
148 const uint8_t *data_ptr = data_.data() + consumed_data_;
149
150 consumed_data_ += data_size;
151
152 return absl::Span<const uint8_t>(data_ptr, data_size);
153}
154
155bool SpanReader::MessageAvailable() {
156 // Are we big enough to read the size?
157 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
158 return false;
159 }
160
161 // Then, are we big enough to read the full message?
162 const size_t data_size =
163 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
164 sizeof(flatbuffers::uoffset_t);
165 if (data_.size() < consumed_data_ + data_size) {
166 return false;
167 }
168
169 return true;
170}
171
172bool SpanReader::ReadBlock() {
173 if (end_of_file_) {
174 return false;
175 }
176
177 // Appends 256k. This is enough that the read call is efficient. We don't
178 // want to spend too much time reading small chunks because the syscalls for
179 // that will be expensive.
180 constexpr size_t kReadSize = 256 * 1024;
181
182 // Strip off any unused data at the front.
183 if (consumed_data_ != 0) {
184 data_.erase(data_.begin(), data_.begin() + consumed_data_);
185 consumed_data_ = 0;
186 }
187
188 const size_t starting_size = data_.size();
189
190 // This should automatically grow the backing store. It won't shrink if we
191 // get a small chunk later. This reduces allocations when we want to append
192 // more data.
193 data_.resize(data_.size() + kReadSize);
194
195 ssize_t count = read(fd_, &data_[starting_size], kReadSize);
196 data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
197 if (count == 0) {
198 end_of_file_ = true;
199 return false;
200 }
201 PCHECK(count > 0);
202
203 return true;
204}
205
206MessageReader::MessageReader(std::string_view filename)
207 : span_reader_(filename) {
208 // Make sure we have enough to read the size.
209 absl::Span<const uint8_t> config_data = span_reader_.ReadMessage();
210
211 // Make sure something was read.
212 CHECK(config_data != absl::Span<const uint8_t>());
213
214 // And copy the config so we have it forever.
215 configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
216
217 max_out_of_order_duration_ = std::chrono::nanoseconds(
218 flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
219 ->max_out_of_order_duration());
220}
221
222std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
223 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
224 if (msg_data == absl::Span<const uint8_t>()) {
225 return std::nullopt;
226 }
227
228 FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
229 msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end())};
230
231 const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
232 chrono::nanoseconds(result.message().monotonic_sent_time()));
233
234 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
235 return result;
236}
237
238SortedMessageReader::SortedMessageReader(std::string_view filename)
239 : message_reader_(filename) {
240 channels_.resize(configuration()->channels()->size());
241
242 QueueMessages();
243}
244
245void SortedMessageReader::EmplaceDataBack(
246 FlatbufferVector<MessageHeader> &&new_data) {
247 const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
248 chrono::nanoseconds(new_data.message().monotonic_sent_time()));
249 const size_t channel_index = new_data.message().channel_index();
250 CHECK_LT(channel_index, channels_.size());
251
252 if (channels_[channel_index].data.size() == 0) {
253 channels_[channel_index].oldest_timestamp = timestamp;
254 PushChannelHeap(timestamp, channel_index);
255 }
256 channels_[channel_index].data.emplace_back(std::move(new_data));
257}
258
259namespace {
260
261bool ChannelHeapCompare(
262 const std::pair<monotonic_clock::time_point, int> first,
263 const std::pair<monotonic_clock::time_point, int> second) {
264 if (first.first > second.first) {
265 return true;
266 } else if (first.first == second.first) {
267 return first.second > second.second;
268 } else {
269 return false;
270 }
271}
272
273} // namespace
274
275void SortedMessageReader::PushChannelHeap(monotonic_clock::time_point timestamp,
276 int channel_index) {
277 channel_heap_.push_back(std::make_pair(timestamp, channel_index));
278
279 // The default sort puts the newest message first. Use a custom comparator to
280 // put the oldest message first.
281 std::push_heap(channel_heap_.begin(), channel_heap_.end(),
282 ChannelHeapCompare);
283}
284
285void SortedMessageReader::QueueMessages() {
286 while (true) {
287 // Don't queue if we have enough data already.
288 // When a log file starts, there should be a message from each channel.
289 // Those messages might be very old. Make sure to read a chunk past the
290 // starting time.
291 if (channel_heap_.size() > 0 &&
292 message_reader_.newest_timestamp() >
293 std::max(oldest_message().first, monotonic_start_time()) +
294 message_reader_.max_out_of_order_duration()) {
295 break;
296 }
297
298 if (std::optional<FlatbufferVector<MessageHeader>> msg =
299 message_reader_.ReadMessage()) {
300 EmplaceDataBack(std::move(msg.value()));
301 } else {
302 break;
303 }
304 }
305}
306
307std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
308SortedMessageReader::PopOldestChannel() {
309 std::pair<monotonic_clock::time_point, int> oldest_channel_data =
310 channel_heap_.front();
311 std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
312 &ChannelHeapCompare);
313 channel_heap_.pop_back();
314
315 struct ChannelData &channel = channels_[oldest_channel_data.second];
316
317 FlatbufferVector<MessageHeader> front = std::move(channel.front());
318
319 channel.data.pop_front();
320
321 // Re-push it and update the oldest timestamp.
322 if (channel.data.size() != 0) {
323 const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
324 chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
325 PushChannelHeap(timestamp, oldest_channel_data.second);
326 channel.oldest_timestamp = timestamp;
327 } else {
328 channel.oldest_timestamp = monotonic_clock::min_time;
329 }
330
331 if (oldest_channel_data.first > message_reader_.queue_data_time()) {
332 QueueMessages();
333 }
334
335 return std::make_tuple(oldest_channel_data.first, oldest_channel_data.second,
336 std::move(front));
337}
338
Austin Schuha36c8902019-12-30 18:07:15 -0800339} // namespace logger
340} // namespace aos