blob: dc1801d080a538bc093489befddeebb5eeeadfdf [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
4#include <limits.h>
5#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8
9#include <vector>
10
11#include "aos/events/logging/logger_generated.h"
12#include "flatbuffers/flatbuffers.h"
13
// Command-line flag: byte threshold for data that has been queued but not yet
// written.  DetachedBufferWriter::QueueSizedFlatbuffer() flushes the queue to
// the file once the queued byte count exceeds this value.
14DEFINE_int32(flush_size, 1000000,
15 "Number of outstanding bytes to allow before flushing to disk.");
16
17namespace aos {
18namespace logger {
19
20DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
21 : fd_(open(std::string(filename).c_str(),
22 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
23 PCHECK(fd_ != -1) << ": Failed to open " << filename;
24}
25
26DetachedBufferWriter::~DetachedBufferWriter() {
27 Flush();
28 PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
29}
30
31void DetachedBufferWriter::QueueSizedFlatbuffer(
32 flatbuffers::FlatBufferBuilder *fbb) {
33 QueueSizedFlatbuffer(fbb->Release());
34}
35
36void DetachedBufferWriter::QueueSizedFlatbuffer(
37 flatbuffers::DetachedBuffer &&buffer) {
38 queued_size_ += buffer.size();
39 queue_.emplace_back(std::move(buffer));
40
41 // Flush if we are at the max number of iovs per writev, or have written
42 // enough data. Otherwise writev will fail with an invalid argument.
43 if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
44 queue_.size() == IOV_MAX) {
45 Flush();
46 }
47}
48
49void DetachedBufferWriter::Flush() {
50 if (queue_.size() == 0u) {
51 return;
52 }
53 iovec_.clear();
54 iovec_.reserve(queue_.size());
55 size_t counted_size = 0;
56 for (size_t i = 0; i < queue_.size(); ++i) {
57 struct iovec n;
58 n.iov_base = queue_[i].data();
59 n.iov_len = queue_[i].size();
60 counted_size += n.iov_len;
61 iovec_.emplace_back(std::move(n));
62 }
63 CHECK_EQ(counted_size, queued_size_);
64 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
65
66 PCHECK(written == static_cast<ssize_t>(queued_size_))
67 << ": Wrote " << written << " expected " << queued_size_;
68
69 queued_size_ = 0;
70 queue_.clear();
71 // TODO(austin): Handle partial writes in some way other than crashing...
72}
73
74flatbuffers::Offset<MessageHeader> PackMessage(
75 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
76 int channel_index, LogType log_type) {
77 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
78
79 switch (log_type) {
80 case LogType::kLogMessage:
81 case LogType::kLogMessageAndDeliveryTime:
82 data_offset =
83 fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
84 break;
85
86 case LogType::kLogDeliveryTimeOnly:
87 break;
88 }
89
90 MessageHeader::Builder message_header_builder(*fbb);
91 message_header_builder.add_channel_index(channel_index);
92 message_header_builder.add_queue_index(context.queue_index);
93 message_header_builder.add_monotonic_sent_time(
94 context.monotonic_event_time.time_since_epoch().count());
95 message_header_builder.add_realtime_sent_time(
96 context.realtime_event_time.time_since_epoch().count());
97
98 switch (log_type) {
99 case LogType::kLogMessage:
100 message_header_builder.add_data(data_offset);
101 break;
102
103 case LogType::kLogMessageAndDeliveryTime:
104 message_header_builder.add_data(data_offset);
105 [[fallthrough]];
106
107 case LogType::kLogDeliveryTimeOnly:
108 message_header_builder.add_monotonic_remote_time(
109 context.monotonic_remote_time.time_since_epoch().count());
110 message_header_builder.add_realtime_remote_time(
111 context.realtime_remote_time.time_since_epoch().count());
112 message_header_builder.add_remote_queue_index(context.remote_queue_index);
113 break;
114 }
115
116 return message_header_builder.Finish();
117}
118
119} // namespace logger
120} // namespace aos