// blob: 6604591468862eab38fe8aee2b3007499490a327 [file] [log] [blame]
#ifndef AOS_LOGGING_REPLAY_H_
#define AOS_LOGGING_REPLAY_H_
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "aos/events/event-loop.h"
#include "aos/ipc_lib/queue.h"
#include "aos/logging/binary_log_file.h"
#include "aos/logging/logging.h"
#include "aos/logging/queue_logging.h"
#include "aos/macros.h"
#include "aos/queue.h"
#include "aos/queue_types.h"
namespace aos {
namespace logging {
namespace linux_code {
// Manages pulling logged queue messages out of log files.
//
// Basic usage:
// - Use the Add* methods to register handlers for various message sources.
// - Call OpenFile to open a log file.
// - Call ProcessMessage repeatedly until it returns true.
//
// This code could do something to adapt similar-but-not-identical
// messages to the current versions, but currently it will LOG(FATAL) if any of
// the messages don't match up exactly.
class LogReplayer {
public:
LogReplayer() {}
// Gets ready to read messages from fd.
// Does not take ownership of fd.
void OpenFile(int fd) {
reader_.reset(new LogFileReader(fd));
}
// Closes the currently open file.
void CloseCurrentFile() { reader_.reset(); }
// Returns true if we have a file which is currently open.
bool HasCurrentFile() const { return reader_.get() != nullptr; }
// Processes a single message from the currently open file.
// Returns true if there are no more messages in the file.
// This will not call any of the handlers if the next message either has no
// registered handlers or is not a queue message.
bool ProcessMessage();
// Adds a handler for messages with a certain string from a certain process.
// T must be a Message with the same format as the messages generated by
// the .q files.
// LOG(FATAL)s for duplicate handlers.
template <class T>
void AddHandler(const ::std::string &process_name,
const ::std::string &log_message,
::std::function<void(const T &message)> handler) {
AOS_CHECK(handlers_
.emplace(::std::piecewise_construct,
::std::forward_as_tuple(process_name, log_message),
::std::forward_as_tuple(
::std::unique_ptr<StructHandlerInterface>(
new TypedStructHandler<T>(handler))))
.second);
}
// Adds a handler which takes messages and places them directly on a queue.
// T must be a Message with the same format as the messages generated by
// the .q files.
template <class T>
void AddDirectQueueSender(const ::std::string &process_name,
const ::std::string &log_message,
const ::std::string &name) {
AddHandler(process_name, log_message,
::std::function<void(const T &)>(
QueueDumpStructHandler<T>(name.c_str())));
}
private:
// A generic handler of struct log messages.
class StructHandlerInterface {
public:
virtual ~StructHandlerInterface() {}
virtual void HandleStruct(::aos::monotonic_clock::time_point log_time,
uint32_t type_id, const void *data,
size_t data_size) = 0;
};
// Converts struct log messages to a message type and passes it to an
// ::std::function.
template <class T>
class TypedStructHandler : public StructHandlerInterface {
public:
TypedStructHandler(::std::function<void(const T &message)> handler)
: handler_(handler) {}
void HandleStruct(::aos::monotonic_clock::time_point log_time,
uint32_t type_id, const void *data,
size_t data_size) override {
AOS_CHECK_EQ(type_id, T::GetType()->id);
T message;
AOS_CHECK_EQ(data_size, T::Size());
AOS_CHECK_EQ(data_size,
message.Deserialize(static_cast<const char *>(data)));
message.sent_time = log_time;
handler_(message);
}
private:
const ::std::function<void(T message)> handler_;
};
// A callable class which dumps messages straight to a queue.
template <class T>
class QueueDumpStructHandler {
public:
QueueDumpStructHandler(const ::std::string &queue_name)
: queue_(RawQueue::Fetch(queue_name.c_str(), sizeof(T), T::kHash,
T::kQueueLength)) {}
void operator()(const T &message) {
AOS_LOG_STRUCT(DEBUG, "re-sending", message);
void *raw_message = queue_->GetMessage();
AOS_CHECK_NOTNULL(raw_message);
memcpy(raw_message, &message, sizeof(message));
AOS_CHECK(queue_->WriteMessage(raw_message, RawQueue::kOverride));
}
private:
::aos::RawQueue *const queue_;
};
// A key for specifying log messages to give to a certain handler.
struct Key {
Key(const ::std::string &process_name, const ::std::string &log_message)
: process_name(process_name), log_message(log_message) {}
::std::string process_name;
::std::string log_message;
};
struct KeyHash {
size_t operator()(const Key &key) const {
return string_hash(key.process_name) ^
(string_hash(key.log_message) << 1);
}
private:
const ::std::hash<::std::string> string_hash = ::std::hash<::std::string>();
};
struct KeyEqual {
bool operator()(const Key &a, const Key &b) const {
return a.process_name == b.process_name && a.log_message == b.log_message;
}
};
::std::unordered_map<const Key, ::std::unique_ptr<StructHandlerInterface>,
KeyHash, KeyEqual> handlers_;
::std::unique_ptr<LogFileReader> reader_;
DISALLOW_COPY_AND_ASSIGN(LogReplayer);
};
} // namespace linux_code
} // namespace logging
} // namespace aos
#endif // AOS_LOGGING_REPLAY_H_