blob: a332929dce705ec53e4aed94649f726b0fafdd53 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17namespace logger {
18
19void LogNamer::UpdateHeader(
20 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
21 const UUID &uuid, int parts_index) const {
22 header->mutable_message()->mutate_parts_index(parts_index);
23 CHECK_EQ(uuid.string_view().size(),
24 header->mutable_message()->mutable_parts_uuid()->size());
25 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
26 reinterpret_cast<char *>(
27 header->mutable_message()->mutable_parts_uuid()->Data()));
28}
29
30void LocalLogNamer::WriteHeader(
31 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
32 const Node *node) {
33 CHECK_EQ(node, this->node());
34 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070035 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070036}
37
38DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
39 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
40 return data_writer_.get();
41}
42
43void LocalLogNamer::Rotate(
44 const Node *node,
45 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
46 CHECK(node == this->node());
47 ++part_number_;
48 *data_writer_ = std::move(*OpenDataWriter());
49 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070050 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070051}
52
53DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
54 const Channel *channel) {
55 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
56 << ": Message is not delivered to this node.";
57 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
58 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
59 node_))
60 << ": Delivery times aren't logged for this channel on this node.";
61 return data_writer_.get();
62}
63
64DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
65 const Channel * /*channel*/, const Node * /*node*/) {
66 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
67 return nullptr;
68}
69
70MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
71 const Configuration *configuration,
72 const Node *node)
73 : LogNamer(node),
74 base_name_(base_name),
75 configuration_(configuration),
76 uuid_(UUID::Random()),
77 data_writer_(OpenDataWriter()) {}
78
79void MultiNodeLogNamer::WriteHeader(
80 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
81 const Node *node) {
82 if (node == this->node()) {
83 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070084 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070085 } else {
86 for (std::pair<const Channel *const, DataWriter> &data_writer :
87 data_writers_) {
88 if (node == data_writer.second.node) {
89 UpdateHeader(header, data_writer.second.uuid,
90 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -070091 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070092 }
93 }
94 }
95}
96
97void MultiNodeLogNamer::Rotate(
98 const Node *node,
99 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
100 if (node == this->node()) {
101 ++part_number_;
102 *data_writer_ = std::move(*OpenDataWriter());
103 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700104 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700105 } else {
106 for (std::pair<const Channel *const, DataWriter> &data_writer :
107 data_writers_) {
108 if (node == data_writer.second.node) {
109 ++data_writer.second.part_number;
110 data_writer.second.rotate(data_writer.first, &data_writer.second);
111 UpdateHeader(header, data_writer.second.uuid,
112 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700113 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700114 }
115 }
116 }
117}
118
119DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
120 // See if we can read the data on this node at all.
121 const bool is_readable =
122 configuration::ChannelIsReadableOnNode(channel, this->node());
123 if (!is_readable) {
124 return nullptr;
125 }
126
127 // Then, see if we are supposed to log the data here.
128 const bool log_message =
129 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
130
131 if (!log_message) {
132 return nullptr;
133 }
134
135 // Now, sort out if this is data generated on this node, or not. It is
136 // generated if it is sendable on this node.
137 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
138 return data_writer_.get();
139 }
140
141 // Ok, we have data that is being forwarded to us that we are supposed to
142 // log. It needs to be logged with send timestamps, but be sorted enough
143 // to be able to be processed.
144 CHECK(data_writers_.find(channel) == data_writers_.end());
145
146 // Track that this node is being logged.
147 const Node *source_node = configuration::GetNode(
148 configuration_, channel->source_node()->string_view());
149
150 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
151 nodes_.emplace_back(source_node);
152 }
153
154 DataWriter data_writer;
155 data_writer.node = source_node;
156 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
157 OpenWriter(channel, data_writer);
158 };
159 data_writer.rotate(channel, &data_writer);
160
161 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
162 .first->second.writer.get();
163}
164
165DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
166 const Channel *channel, const Node *node) {
167 // See if we can read the data on this node at all.
168 const bool is_readable =
169 configuration::ChannelIsReadableOnNode(channel, this->node());
170 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
171
172 CHECK(data_writers_.find(channel) == data_writers_.end());
173
174 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
175 nodes_.emplace_back(node);
176 }
177
178 DataWriter data_writer;
179 data_writer.node = node;
180 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
181 OpenForwardedTimestampWriter(channel, data_writer);
182 };
183 data_writer.rotate(channel, &data_writer);
184
185 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
186 .first->second.writer.get();
187}
188
189DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
190 const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700191 bool log_delivery_times = false;
192 if (this->node() != nullptr) {
193 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
194 channel, this->node(), this->node());
195 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700196 if (!log_delivery_times) {
197 return nullptr;
198 }
199
200 return data_writer_.get();
201}
202
Brian Silverman0465fcf2020-09-24 00:29:18 -0700203void MultiNodeLogNamer::Close() {
204 for (std::pair<const Channel *const, DataWriter> &data_writer :
205 data_writers_) {
206 if (data_writer.second.writer) {
207 data_writer.second.writer->Close();
208 if (data_writer.second.writer->ran_out_of_space()) {
209 ran_out_of_space_ = true;
210 data_writer.second.writer->acknowledge_out_of_space();
211 }
212 }
213 }
214 if (data_writer_) {
215 data_writer_->Close();
216 if (data_writer_->ran_out_of_space()) {
217 ran_out_of_space_ = true;
218 data_writer_->acknowledge_out_of_space();
219 }
220 }
221}
222
Austin Schuhcb5601b2020-09-10 15:29:59 -0700223void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
224 DataWriter *data_writer) {
225 std::string filename =
226 absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
227 "/", channel->type()->string_view(), ".part",
228 data_writer->part_number, ".bfbs");
Brian Silverman0465fcf2020-09-24 00:29:18 -0700229 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700230}
231
232void MultiNodeLogNamer::OpenWriter(const Channel *channel,
233 DataWriter *data_writer) {
234 const std::string filename = absl::StrCat(
235 base_name_, "_", channel->source_node()->string_view(), "_data",
236 channel->name()->string_view(), "/", channel->type()->string_view(),
237 ".part", data_writer->part_number, ".bfbs");
Brian Silverman0465fcf2020-09-24 00:29:18 -0700238 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700239}
240
241std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
242 return std::make_unique<DetachedBufferWriter>(
243 absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
Brian Silvermanf51499a2020-09-21 12:49:08 -0700244 part_number_, ".bfbs"),
245 std::make_unique<DummyEncoder>());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700246}
247
Brian Silverman0465fcf2020-09-24 00:29:18 -0700248void MultiNodeLogNamer::CreateBufferWriter(
249 std::string_view filename,
250 std::unique_ptr<DetachedBufferWriter> *destination) {
251 if (ran_out_of_space_) {
252 // Refuse to open any new files, which might skip data. Any existing files
253 // are in the same folder, which means they're on the same filesystem, which
254 // means they're probably going to run out of space and get stuck too.
255 return;
256 }
257 if (!destination->get()) {
258 *destination = std::make_unique<DetachedBufferWriter>(
259 filename, std::make_unique<DummyEncoder>());
260 return;
261 }
262 destination->get()->Close();
263 if (destination->get()->ran_out_of_space()) {
264 ran_out_of_space_ = true;
265 return;
266 }
267 *destination->get() =
268 DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
269}
270
Austin Schuhcb5601b2020-09-10 15:29:59 -0700271} // namespace logger
272} // namespace aos