blob: 942f26fcd52f08dcb414cadb8fc127b7822c9f07 [file] [log] [blame]
Brian Silvermand0575692015-02-21 16:24:02 -05001#ifndef AOS_LINUX_CODE_LOGGING_LOG_REPLAY_H_
2#define AOS_LINUX_CODE_LOGGING_LOG_REPLAY_H_
3
4#include <unordered_map>
5#include <string>
6#include <functional>
7#include <memory>
8
9#include "aos/linux_code/logging/binary_log_file.h"
10#include "aos/common/queue.h"
11#include "aos/common/logging/logging.h"
12#include "aos/common/macros.h"
13#include "aos/linux_code/ipc_lib/queue.h"
14#include "aos/common/queue_types.h"
15
16namespace aos {
17namespace logging {
18namespace linux_code {
19
20// Manages pulling logged queue messages out of log files.
21//
22// Basic usage:
23// - Use the Add* methods to register handlers for various message sources.
24// - Call OpenFile to open a log file.
25// - Call ProcessMessage repeatedly until it returns true.
26//
27// This code could do something to adapt similar-but-not-identical
28// messages to the current versions, but currently it will LOG(FATAL) if any of
29// the messages don't match up exactly.
30class LogReplayer {
31 public:
32 LogReplayer() {}
33
34 // Gets ready to read messages from fd.
35 // Does not take ownership of fd.
36 void OpenFile(int fd) {
37 reader_.reset(new LogFileReader(fd));
38 }
39 // Closes the currently open file.
40 void CloseCurrentFile() { reader_.reset(); }
41 // Returns true if we have a file which is currently open.
42 bool HasCurrentFile() const { return reader_.get() != nullptr; }
43
44 // Processes a single message from the currently open file.
45 // Returns true if there are no more messages in the file.
46 // This will not call any of the handlers if the next message either has no
47 // registered handlers or is not a queue message.
48 bool ProcessMessage();
49
50 // Adds a handler for messages with a certain string from a certain process.
51 // T must be a Message with the same format as the messages generated by
52 // the .q files.
53 // LOG(FATAL)s for duplicate handlers.
54 template <class T>
55 void AddHandler(const ::std::string &process_name,
56 const ::std::string &log_message,
57 ::std::function<void(const T &message)> handler) {
58 CHECK(handlers_.emplace(
59 ::std::piecewise_construct,
60 ::std::forward_as_tuple(process_name, log_message),
61 ::std::forward_as_tuple(::std::unique_ptr<StructHandlerInterface>(
62 new TypedStructHandler<T>(handler)))).second);
63 }
64
65 // Adds a handler which takes messages and places them directly on a queue.
66 // T must be a Message with the same format as the messages generated by
67 // the .q files.
68 template <class T>
69 void AddDirectQueueSender(const ::std::string &process_name,
70 const ::std::string &log_message,
71 const ::aos::Queue<T> &queue) {
72 AddHandler(process_name, log_message,
73 ::std::function<void(const T &)>(
74 QueueDumpStructHandler<T>(queue.name())));
75 }
76
77 private:
78 // A generic handler of struct log messages.
79 class StructHandlerInterface {
80 public:
81 virtual ~StructHandlerInterface() {}
82
83 virtual void HandleStruct(::aos::time::Time log_time, uint32_t type_id,
84 const void *data, size_t data_size) = 0;
85 };
86
87 // Converts struct log messages to a message type and passes it to an
88 // ::std::function.
89 template <class T>
90 class TypedStructHandler : public StructHandlerInterface {
91 public:
92 TypedStructHandler(::std::function<void(const T &message)> handler)
93 : handler_(handler) {}
94
95 void HandleStruct(::aos::time::Time log_time, uint32_t type_id,
96 const void *data, size_t data_size) override {
97 CHECK_EQ(type_id, T::GetType()->id);
98 T message;
99 CHECK_EQ(data_size, T::Size());
100 CHECK_EQ(data_size, message.Deserialize(static_cast<const char *>(data)));
101 message.sent_time = log_time;
102 handler_(message);
103 }
104
105 private:
106 const ::std::function<void(T message)> handler_;
107 };
108
109 // A callable class which dumps messages straight to a queue.
110 template <class T>
111 class QueueDumpStructHandler {
112 public:
113 QueueDumpStructHandler(const ::std::string &queue_name)
114 : queue_(RawQueue::Fetch(queue_name.c_str(), sizeof(T), T::kHash,
115 T::kQueueLength)) {}
116
117 void operator()(const T &message) {
118 LOG_STRUCT(DEBUG, "re-sending", message);
119 void *raw_message = queue_->GetMessage();
120 CHECK_NOTNULL(raw_message);
121 memcpy(raw_message, &message, sizeof(message));
122 CHECK(queue_->WriteMessage(raw_message, RawQueue::kOverride));
123 }
124
125 private:
126 ::aos::RawQueue *const queue_;
127 };
128
129 // A key for specifying log messages to give to a certain handler.
130 struct Key {
131 Key(const ::std::string &process_name, const ::std::string &log_message)
132 : process_name(process_name), log_message(log_message) {}
133
134 ::std::string process_name;
135 ::std::string log_message;
136 };
137
138 struct KeyHash {
139 size_t operator()(const Key &key) const {
140 return string_hash(key.process_name) ^
141 (string_hash(key.log_message) << 1);
142 }
143
144 private:
145 const ::std::hash<::std::string> string_hash = ::std::hash<::std::string>();
146 };
147 struct KeyEqual {
148 bool operator()(const Key &a, const Key &b) const {
149 return a.process_name == b.process_name && a.log_message == b.log_message;
150 }
151 };
152
153 ::std::unordered_map<const Key, ::std::unique_ptr<StructHandlerInterface>,
154 KeyHash, KeyEqual> handlers_;
155 ::std::unique_ptr<LogFileReader> reader_;
156
157 DISALLOW_COPY_AND_ASSIGN(LogReplayer);
158};
159
160} // namespace linux_code
161} // namespace logging
162} // namespace aos
163
164#endif // AOS_LINUX_CODE_LOGGING_LOG_REPLAY_H_