| #ifndef AOS_COMMON_LOGGING_REPLAY_H_ |
| #define AOS_COMMON_LOGGING_REPLAY_H_ |
| |
| #include <unordered_map> |
| #include <string> |
| #include <functional> |
| #include <memory> |
| |
| #include "aos/common/logging/binary_log_file.h" |
| #include "aos/common/queue.h" |
| #include "aos/common/logging/logging.h" |
| #include "aos/common/macros.h" |
| #include "aos/linux_code/ipc_lib/queue.h" |
| #include "aos/common/queue_types.h" |
| |
| namespace aos { |
| namespace logging { |
| namespace linux_code { |
| |
| // Manages pulling logged queue messages out of log files. |
| // |
| // Basic usage: |
| // - Use the Add* methods to register handlers for various message sources. |
| // - Call OpenFile to open a log file. |
| // - Call ProcessMessage repeatedly until it returns true. |
| // |
| // This code could do something to adapt similar-but-not-identical |
| // messages to the current versions, but currently it will LOG(FATAL) if any of |
| // the messages don't match up exactly. |
| class LogReplayer { |
| public: |
| LogReplayer() {} |
| |
| // Gets ready to read messages from fd. |
| // Does not take ownership of fd. |
| void OpenFile(int fd) { |
| reader_.reset(new LogFileReader(fd)); |
| } |
| // Closes the currently open file. |
| void CloseCurrentFile() { reader_.reset(); } |
| // Returns true if we have a file which is currently open. |
| bool HasCurrentFile() const { return reader_.get() != nullptr; } |
| |
| // Processes a single message from the currently open file. |
| // Returns true if there are no more messages in the file. |
| // This will not call any of the handlers if the next message either has no |
| // registered handlers or is not a queue message. |
| bool ProcessMessage(); |
| |
| // Adds a handler for messages with a certain string from a certain process. |
| // T must be a Message with the same format as the messages generated by |
| // the .q files. |
| // LOG(FATAL)s for duplicate handlers. |
| template <class T> |
| void AddHandler(const ::std::string &process_name, |
| const ::std::string &log_message, |
| ::std::function<void(const T &message)> handler) { |
| CHECK(handlers_.emplace( |
| ::std::piecewise_construct, |
| ::std::forward_as_tuple(process_name, log_message), |
| ::std::forward_as_tuple(::std::unique_ptr<StructHandlerInterface>( |
| new TypedStructHandler<T>(handler)))).second); |
| } |
| |
| // Adds a handler which takes messages and places them directly on a queue. |
| // T must be a Message with the same format as the messages generated by |
| // the .q files. |
| template <class T> |
| void AddDirectQueueSender(const ::std::string &process_name, |
| const ::std::string &log_message, |
| const ::aos::Queue<T> &queue) { |
| AddHandler(process_name, log_message, |
| ::std::function<void(const T &)>( |
| QueueDumpStructHandler<T>(queue.name()))); |
| } |
| |
| private: |
| // A generic handler of struct log messages. |
| class StructHandlerInterface { |
| public: |
| virtual ~StructHandlerInterface() {} |
| |
| virtual void HandleStruct(::aos::monotonic_clock::time_point log_time, |
| uint32_t type_id, const void *data, |
| size_t data_size) = 0; |
| }; |
| |
| // Converts struct log messages to a message type and passes it to an |
| // ::std::function. |
| template <class T> |
| class TypedStructHandler : public StructHandlerInterface { |
| public: |
| TypedStructHandler(::std::function<void(const T &message)> handler) |
| : handler_(handler) {} |
| |
| void HandleStruct(::aos::monotonic_clock::time_point log_time, |
| uint32_t type_id, const void *data, |
| size_t data_size) override { |
| CHECK_EQ(type_id, T::GetType()->id); |
| T message; |
| CHECK_EQ(data_size, T::Size()); |
| CHECK_EQ(data_size, message.Deserialize(static_cast<const char *>(data))); |
| message.sent_time = log_time; |
| handler_(message); |
| } |
| |
| private: |
| const ::std::function<void(T message)> handler_; |
| }; |
| |
| // A callable class which dumps messages straight to a queue. |
| template <class T> |
| class QueueDumpStructHandler { |
| public: |
| QueueDumpStructHandler(const ::std::string &queue_name) |
| : queue_(RawQueue::Fetch(queue_name.c_str(), sizeof(T), T::kHash, |
| T::kQueueLength)) {} |
| |
| void operator()(const T &message) { |
| LOG_STRUCT(DEBUG, "re-sending", message); |
| void *raw_message = queue_->GetMessage(); |
| CHECK_NOTNULL(raw_message); |
| memcpy(raw_message, &message, sizeof(message)); |
| CHECK(queue_->WriteMessage(raw_message, RawQueue::kOverride)); |
| } |
| |
| private: |
| ::aos::RawQueue *const queue_; |
| }; |
| |
| // A key for specifying log messages to give to a certain handler. |
| struct Key { |
| Key(const ::std::string &process_name, const ::std::string &log_message) |
| : process_name(process_name), log_message(log_message) {} |
| |
| ::std::string process_name; |
| ::std::string log_message; |
| }; |
| |
| struct KeyHash { |
| size_t operator()(const Key &key) const { |
| return string_hash(key.process_name) ^ |
| (string_hash(key.log_message) << 1); |
| } |
| |
| private: |
| const ::std::hash<::std::string> string_hash = ::std::hash<::std::string>(); |
| }; |
| struct KeyEqual { |
| bool operator()(const Key &a, const Key &b) const { |
| return a.process_name == b.process_name && a.log_message == b.log_message; |
| } |
| }; |
| |
| ::std::unordered_map<const Key, ::std::unique_ptr<StructHandlerInterface>, |
| KeyHash, KeyEqual> handlers_; |
| ::std::unique_ptr<LogFileReader> reader_; |
| |
| DISALLOW_COPY_AND_ASSIGN(LogReplayer); |
| }; |
| |
| } // namespace linux_code |
| } // namespace logging |
| } // namespace aos |
| |
| #endif // AOS_COMMON_LOGGING_REPLAY_H_ |