blob: fbdc2922f918e0dac7d834ee24423c8ff9afeb43 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17namespace logger {
18
19void LogNamer::UpdateHeader(
20 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
21 const UUID &uuid, int parts_index) const {
22 header->mutable_message()->mutate_parts_index(parts_index);
23 CHECK_EQ(uuid.string_view().size(),
24 header->mutable_message()->mutable_parts_uuid()->size());
25 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
26 reinterpret_cast<char *>(
27 header->mutable_message()->mutable_parts_uuid()->Data()));
28}
29
30void LocalLogNamer::WriteHeader(
31 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
32 const Node *node) {
33 CHECK_EQ(node, this->node());
34 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070035 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070036}
37
38DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
39 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
40 return data_writer_.get();
41}
42
43void LocalLogNamer::Rotate(
44 const Node *node,
45 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
46 CHECK(node == this->node());
47 ++part_number_;
48 *data_writer_ = std::move(*OpenDataWriter());
49 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070050 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070051}
52
53DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
54 const Channel *channel) {
55 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
56 << ": Message is not delivered to this node.";
57 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
58 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
59 node_))
60 << ": Delivery times aren't logged for this channel on this node.";
61 return data_writer_.get();
62}
63
64DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
65 const Channel * /*channel*/, const Node * /*node*/) {
66 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
67 return nullptr;
68}
69
70MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
71 const Configuration *configuration,
72 const Node *node)
73 : LogNamer(node),
74 base_name_(base_name),
75 configuration_(configuration),
Brian Silvermana621f522020-09-30 16:52:43 -070076 uuid_(UUID::Random()) {
77 OpenDataWriter();
78}
Austin Schuhcb5601b2020-09-10 15:29:59 -070079
80void MultiNodeLogNamer::WriteHeader(
81 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
82 const Node *node) {
83 if (node == this->node()) {
84 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070085 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070086 } else {
87 for (std::pair<const Channel *const, DataWriter> &data_writer :
88 data_writers_) {
89 if (node == data_writer.second.node) {
90 UpdateHeader(header, data_writer.second.uuid,
91 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -070092 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070093 }
94 }
95 }
96}
97
98void MultiNodeLogNamer::Rotate(
99 const Node *node,
100 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
101 if (node == this->node()) {
102 ++part_number_;
Brian Silvermana621f522020-09-30 16:52:43 -0700103 OpenDataWriter();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700104 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700105 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700106 } else {
107 for (std::pair<const Channel *const, DataWriter> &data_writer :
108 data_writers_) {
109 if (node == data_writer.second.node) {
110 ++data_writer.second.part_number;
111 data_writer.second.rotate(data_writer.first, &data_writer.second);
112 UpdateHeader(header, data_writer.second.uuid,
113 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700114 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700115 }
116 }
117 }
118}
119
120DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
121 // See if we can read the data on this node at all.
122 const bool is_readable =
123 configuration::ChannelIsReadableOnNode(channel, this->node());
124 if (!is_readable) {
125 return nullptr;
126 }
127
128 // Then, see if we are supposed to log the data here.
129 const bool log_message =
130 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
131
132 if (!log_message) {
133 return nullptr;
134 }
135
136 // Now, sort out if this is data generated on this node, or not. It is
137 // generated if it is sendable on this node.
138 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
139 return data_writer_.get();
140 }
141
142 // Ok, we have data that is being forwarded to us that we are supposed to
143 // log. It needs to be logged with send timestamps, but be sorted enough
144 // to be able to be processed.
145 CHECK(data_writers_.find(channel) == data_writers_.end());
146
147 // Track that this node is being logged.
148 const Node *source_node = configuration::GetNode(
149 configuration_, channel->source_node()->string_view());
150
151 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
152 nodes_.emplace_back(source_node);
153 }
154
155 DataWriter data_writer;
156 data_writer.node = source_node;
157 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
158 OpenWriter(channel, data_writer);
159 };
160 data_writer.rotate(channel, &data_writer);
161
162 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
163 .first->second.writer.get();
164}
165
166DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
167 const Channel *channel, const Node *node) {
168 // See if we can read the data on this node at all.
169 const bool is_readable =
170 configuration::ChannelIsReadableOnNode(channel, this->node());
171 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
172
173 CHECK(data_writers_.find(channel) == data_writers_.end());
174
175 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
176 nodes_.emplace_back(node);
177 }
178
179 DataWriter data_writer;
180 data_writer.node = node;
181 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
182 OpenForwardedTimestampWriter(channel, data_writer);
183 };
184 data_writer.rotate(channel, &data_writer);
185
186 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
187 .first->second.writer.get();
188}
189
190DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
191 const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700192 bool log_delivery_times = false;
193 if (this->node() != nullptr) {
194 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
195 channel, this->node(), this->node());
196 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700197 if (!log_delivery_times) {
198 return nullptr;
199 }
200
201 return data_writer_.get();
202}
203
Brian Silverman0465fcf2020-09-24 00:29:18 -0700204void MultiNodeLogNamer::Close() {
205 for (std::pair<const Channel *const, DataWriter> &data_writer :
206 data_writers_) {
207 if (data_writer.second.writer) {
208 data_writer.second.writer->Close();
209 if (data_writer.second.writer->ran_out_of_space()) {
210 ran_out_of_space_ = true;
211 data_writer.second.writer->acknowledge_out_of_space();
212 }
213 }
214 }
215 if (data_writer_) {
216 data_writer_->Close();
217 if (data_writer_->ran_out_of_space()) {
218 ran_out_of_space_ = true;
219 data_writer_->acknowledge_out_of_space();
220 }
221 }
222}
223
Austin Schuhcb5601b2020-09-10 15:29:59 -0700224void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
225 DataWriter *data_writer) {
226 std::string filename =
Brian Silvermana621f522020-09-30 16:52:43 -0700227 absl::StrCat("_timestamps", channel->name()->string_view(), "/",
228 channel->type()->string_view(), ".part",
Austin Schuhcb5601b2020-09-10 15:29:59 -0700229 data_writer->part_number, ".bfbs");
Brian Silverman0465fcf2020-09-24 00:29:18 -0700230 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700231}
232
233void MultiNodeLogNamer::OpenWriter(const Channel *channel,
234 DataWriter *data_writer) {
235 const std::string filename = absl::StrCat(
Brian Silvermana621f522020-09-30 16:52:43 -0700236 "_", CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
237 channel->name()->string_view(), "/", channel->type()->string_view(),
238 ".part", data_writer->part_number, ".bfbs");
Brian Silverman0465fcf2020-09-24 00:29:18 -0700239 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700240}
241
Brian Silvermana621f522020-09-30 16:52:43 -0700242void MultiNodeLogNamer::OpenDataWriter() {
243 std::string name;
Brian Silverman7af8c902020-09-29 16:14:04 -0700244 if (node() != nullptr) {
245 name = absl::StrCat(name, "_", node()->name()->string_view());
246 }
Brian Silvermana621f522020-09-30 16:52:43 -0700247 absl::StrAppend(&name, "_data.part", part_number_, ".bfbs");
248 CreateBufferWriter(name, &data_writer_);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700249}
250
Brian Silverman0465fcf2020-09-24 00:29:18 -0700251void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700252 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700253 if (ran_out_of_space_) {
254 // Refuse to open any new files, which might skip data. Any existing files
255 // are in the same folder, which means they're on the same filesystem, which
256 // means they're probably going to run out of space and get stuck too.
257 return;
258 }
Brian Silvermana621f522020-09-30 16:52:43 -0700259 const std::string filename = absl::StrCat(base_name_, path);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700260 if (!destination->get()) {
Brian Silvermana621f522020-09-30 16:52:43 -0700261 all_filenames_.emplace_back(path);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700262 *destination = std::make_unique<DetachedBufferWriter>(
263 filename, std::make_unique<DummyEncoder>());
264 return;
265 }
266 destination->get()->Close();
267 if (destination->get()->ran_out_of_space()) {
268 ran_out_of_space_ = true;
269 return;
270 }
Brian Silvermana621f522020-09-30 16:52:43 -0700271 all_filenames_.emplace_back(path);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700272 *destination->get() =
273 DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
274}
275
Austin Schuhcb5601b2020-09-10 15:29:59 -0700276} // namespace logger
277} // namespace aos