blob: a635682f707a6591e25804b5dc079f46dfe1e812 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17namespace logger {
18
19void LogNamer::UpdateHeader(
20 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
21 const UUID &uuid, int parts_index) const {
22 header->mutable_message()->mutate_parts_index(parts_index);
23 CHECK_EQ(uuid.string_view().size(),
24 header->mutable_message()->mutable_parts_uuid()->size());
25 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
26 reinterpret_cast<char *>(
27 header->mutable_message()->mutable_parts_uuid()->Data()));
28}
29
30void LocalLogNamer::WriteHeader(
31 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
32 const Node *node) {
33 CHECK_EQ(node, this->node());
34 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070035 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070036}
37
38DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
39 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
40 return data_writer_.get();
41}
42
43void LocalLogNamer::Rotate(
44 const Node *node,
45 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
46 CHECK(node == this->node());
47 ++part_number_;
48 *data_writer_ = std::move(*OpenDataWriter());
49 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070050 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070051}
52
53DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
54 const Channel *channel) {
55 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
56 << ": Message is not delivered to this node.";
57 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
58 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
59 node_))
60 << ": Delivery times aren't logged for this channel on this node.";
61 return data_writer_.get();
62}
63
64DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
65 const Channel * /*channel*/, const Node * /*node*/) {
66 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
67 return nullptr;
68}
69
70MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
71 const Configuration *configuration,
72 const Node *node)
73 : LogNamer(node),
74 base_name_(base_name),
75 configuration_(configuration),
76 uuid_(UUID::Random()),
77 data_writer_(OpenDataWriter()) {}
78
79void MultiNodeLogNamer::WriteHeader(
80 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
81 const Node *node) {
82 if (node == this->node()) {
83 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070084 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070085 } else {
86 for (std::pair<const Channel *const, DataWriter> &data_writer :
87 data_writers_) {
88 if (node == data_writer.second.node) {
89 UpdateHeader(header, data_writer.second.uuid,
90 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -070091 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070092 }
93 }
94 }
95}
96
97void MultiNodeLogNamer::Rotate(
98 const Node *node,
99 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
100 if (node == this->node()) {
101 ++part_number_;
102 *data_writer_ = std::move(*OpenDataWriter());
103 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700104 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700105 } else {
106 for (std::pair<const Channel *const, DataWriter> &data_writer :
107 data_writers_) {
108 if (node == data_writer.second.node) {
109 ++data_writer.second.part_number;
110 data_writer.second.rotate(data_writer.first, &data_writer.second);
111 UpdateHeader(header, data_writer.second.uuid,
112 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700113 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700114 }
115 }
116 }
117}
118
119DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
120 // See if we can read the data on this node at all.
121 const bool is_readable =
122 configuration::ChannelIsReadableOnNode(channel, this->node());
123 if (!is_readable) {
124 return nullptr;
125 }
126
127 // Then, see if we are supposed to log the data here.
128 const bool log_message =
129 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
130
131 if (!log_message) {
132 return nullptr;
133 }
134
135 // Now, sort out if this is data generated on this node, or not. It is
136 // generated if it is sendable on this node.
137 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
138 return data_writer_.get();
139 }
140
141 // Ok, we have data that is being forwarded to us that we are supposed to
142 // log. It needs to be logged with send timestamps, but be sorted enough
143 // to be able to be processed.
144 CHECK(data_writers_.find(channel) == data_writers_.end());
145
146 // Track that this node is being logged.
147 const Node *source_node = configuration::GetNode(
148 configuration_, channel->source_node()->string_view());
149
150 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
151 nodes_.emplace_back(source_node);
152 }
153
154 DataWriter data_writer;
155 data_writer.node = source_node;
156 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
157 OpenWriter(channel, data_writer);
158 };
159 data_writer.rotate(channel, &data_writer);
160
161 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
162 .first->second.writer.get();
163}
164
165DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
166 const Channel *channel, const Node *node) {
167 // See if we can read the data on this node at all.
168 const bool is_readable =
169 configuration::ChannelIsReadableOnNode(channel, this->node());
170 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
171
172 CHECK(data_writers_.find(channel) == data_writers_.end());
173
174 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
175 nodes_.emplace_back(node);
176 }
177
178 DataWriter data_writer;
179 data_writer.node = node;
180 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
181 OpenForwardedTimestampWriter(channel, data_writer);
182 };
183 data_writer.rotate(channel, &data_writer);
184
185 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
186 .first->second.writer.get();
187}
188
189DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
190 const Channel *channel) {
191 const bool log_delivery_times =
192 (this->node() == nullptr)
193 ? false
194 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
195 channel, this->node(), this->node());
196 if (!log_delivery_times) {
197 return nullptr;
198 }
199
200 return data_writer_.get();
201}
202
203void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
204 DataWriter *data_writer) {
205 std::string filename =
206 absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
207 "/", channel->type()->string_view(), ".part",
208 data_writer->part_number, ".bfbs");
209
210 if (!data_writer->writer) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700211 data_writer->writer = std::make_unique<DetachedBufferWriter>(
212 filename, std::make_unique<DummyEncoder>());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700213 } else {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700214 *data_writer->writer =
215 DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700216 }
217}
218
219void MultiNodeLogNamer::OpenWriter(const Channel *channel,
220 DataWriter *data_writer) {
221 const std::string filename = absl::StrCat(
222 base_name_, "_", channel->source_node()->string_view(), "_data",
223 channel->name()->string_view(), "/", channel->type()->string_view(),
224 ".part", data_writer->part_number, ".bfbs");
225 if (!data_writer->writer) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700226 data_writer->writer = std::make_unique<DetachedBufferWriter>(
227 filename, std::make_unique<DummyEncoder>());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700228 } else {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700229 *data_writer->writer =
230 DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700231 }
232}
233
234std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
235 return std::make_unique<DetachedBufferWriter>(
236 absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
Brian Silvermanf51499a2020-09-21 12:49:08 -0700237 part_number_, ".bfbs"),
238 std::make_unique<DummyEncoder>());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700239}
240
241} // namespace logger
242} // namespace aos