add support for replaying log messages
For example, this allows reproducing logged but hard to physically
reproduce problems to control loops with more logging to help debug the
problem, and then verifying that it does something different under the
same conditions.
Change-Id: I1e55e7a7c0b3154bcaf86373a9e6c57c594310a0
diff --git a/aos/linux_code/ipc_lib/shared_mem.c b/aos/linux_code/ipc_lib/shared_mem.c
index 79b747a..65305ac 100644
--- a/aos/linux_code/ipc_lib/shared_mem.c
+++ b/aos/linux_code/ipc_lib/shared_mem.c
@@ -20,7 +20,7 @@
#define SIZEOFSHMSEG (4096 * 0x3000)
void init_shared_mem_core(aos_shm_core *shm_core) {
- clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
+ memset(&shm_core->time_offset, 0 , sizeof(shm_core->time_offset));
memset(&shm_core->msg_alloc_lock, 0, sizeof(shm_core->msg_alloc_lock));
shm_core->queues.pointer = NULL;
memset(&shm_core->queues.lock, 0, sizeof(shm_core->queues.lock));
diff --git a/aos/linux_code/ipc_lib/shared_mem.h b/aos/linux_code/ipc_lib/shared_mem.h
index a33290c..423fd0c 100644
--- a/aos/linux_code/ipc_lib/shared_mem.h
+++ b/aos/linux_code/ipc_lib/shared_mem.h
@@ -11,7 +11,7 @@
extern "C" {
#endif
-extern struct aos_core *global_core;
+extern struct aos_core *global_core __attribute__((weak));
// Where the shared memory segment starts in each process's address space.
// Has to be the same in all of them so that stuff in shared memory
@@ -26,13 +26,17 @@
} aos_global_pointer;
typedef struct aos_shm_core_t {
- // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
- // this shared memory area
- struct timespec identifier;
// Gets 0-initialized at the start (as part of shared memory) and
// the owner sets as soon as it finishes setting stuff up.
aos_condition creation_condition;
+ // An offset from CLOCK_REALTIME to times for all the code.
+ // This is currently only set to non-zero by the log replay code.
+ // There is no synchronization on this to avoid the overhead because it is
+ // only updated with proper memory barriers when only a single task is
+ // running.
+ struct timespec time_offset;
+
struct aos_mutex msg_alloc_lock;
void *msg_alloc;
diff --git a/aos/linux_code/logging/log_replay.cc b/aos/linux_code/logging/log_replay.cc
new file mode 100644
index 0000000..32991de
--- /dev/null
+++ b/aos/linux_code/logging/log_replay.cc
@@ -0,0 +1,44 @@
+#include "aos/linux_code/logging/log_replay.h"
+
+namespace aos {
+namespace logging {
+namespace linux_code {
+
+bool LogReplayer::ProcessMessage() {
+ const LogFileMessageHeader *message = reader_->ReadNextMessage(false);
+ if (message == nullptr) return true;
+ if (message->type != LogFileMessageHeader::MessageType::kStruct) return false;
+
+ const char *position = reinterpret_cast<const char *>(message + 1);
+
+ ::std::string process(position, message->name_size);
+ position += message->name_size;
+
+ uint32_t type_id;
+ memcpy(&type_id, position, sizeof(type_id));
+ position += sizeof(type_id);
+
+ uint32_t message_length;
+ memcpy(&message_length, position, sizeof(message_length));
+ position += sizeof(message_length);
+ ::std::string message_text(position, message_length);
+ position += message_length;
+
+ size_t split_index = message_text.find_first_of(':') + 2;
+ split_index = message_text.find_first_of(':', split_index) + 2;
+ message_text = message_text.substr(split_index);
+
+ auto handler = handlers_.find(Key(process, message_text));
+ if (handler == handlers_.end()) return false;
+
+ handler->second->HandleStruct(
+ ::aos::time::Time(message->time_sec, message->time_nsec), type_id,
+ position,
+ message->message_size -
+ (sizeof(type_id) + sizeof(message_length) + message_length));
+ return false;
+}
+
+} // namespace linux_code
+} // namespace logging
+} // namespace aos
diff --git a/aos/linux_code/logging/log_replay.h b/aos/linux_code/logging/log_replay.h
new file mode 100644
index 0000000..942f26f
--- /dev/null
+++ b/aos/linux_code/logging/log_replay.h
@@ -0,0 +1,164 @@
+#ifndef AOS_LINUX_CODE_LOGGING_LOG_REPLAY_H_
+#define AOS_LINUX_CODE_LOGGING_LOG_REPLAY_H_
+
+#include <unordered_map>
+#include <string>
+#include <functional>
+#include <memory>
+
+#include "aos/linux_code/logging/binary_log_file.h"
+#include "aos/common/queue.h"
+#include "aos/common/logging/logging.h"
+#include "aos/common/macros.h"
+#include "aos/linux_code/ipc_lib/queue.h"
+#include "aos/common/queue_types.h"
+
+namespace aos {
+namespace logging {
+namespace linux_code {
+
+// Manages pulling logged queue messages out of log files.
+//
+// Basic usage:
+// - Use the Add* methods to register handlers for various message sources.
+// - Call OpenFile to open a log file.
+// - Call ProcessMessage repeatedly until it returns true.
+//
+// This code could do something to adapt similar-but-not-identical
+// messages to the current versions, but currently it will LOG(FATAL) if any of
+// the messages don't match up exactly.
+class LogReplayer {
+ public:
+ LogReplayer() {}
+
+ // Gets ready to read messages from fd.
+ // Does not take ownership of fd.
+ void OpenFile(int fd) {
+ reader_.reset(new LogFileReader(fd));
+ }
+ // Closes the currently open file.
+ void CloseCurrentFile() { reader_.reset(); }
+ // Returns true if we have a file which is currently open.
+ bool HasCurrentFile() const { return reader_.get() != nullptr; }
+
+ // Processes a single message from the currently open file.
+ // Returns true if there are no more messages in the file.
+ // This will not call any of the handlers if the next message either has no
+ // registered handlers or is not a queue message.
+ bool ProcessMessage();
+
+ // Adds a handler for messages with a certain string from a certain process.
+ // T must be a Message with the same format as the messages generated by
+ // the .q files.
+ // LOG(FATAL)s for duplicate handlers.
+ template <class T>
+ void AddHandler(const ::std::string &process_name,
+ const ::std::string &log_message,
+ ::std::function<void(const T &message)> handler) {
+ CHECK(handlers_.emplace(
+ ::std::piecewise_construct,
+ ::std::forward_as_tuple(process_name, log_message),
+ ::std::forward_as_tuple(::std::unique_ptr<StructHandlerInterface>(
+ new TypedStructHandler<T>(handler)))).second);
+ }
+
+ // Adds a handler which takes messages and places them directly on a queue.
+ // T must be a Message with the same format as the messages generated by
+ // the .q files.
+ template <class T>
+ void AddDirectQueueSender(const ::std::string &process_name,
+ const ::std::string &log_message,
+ const ::aos::Queue<T> &queue) {
+ AddHandler(process_name, log_message,
+ ::std::function<void(const T &)>(
+ QueueDumpStructHandler<T>(queue.name())));
+ }
+
+ private:
+ // A generic handler of struct log messages.
+ class StructHandlerInterface {
+ public:
+ virtual ~StructHandlerInterface() {}
+
+ virtual void HandleStruct(::aos::time::Time log_time, uint32_t type_id,
+ const void *data, size_t data_size) = 0;
+ };
+
+ // Converts struct log messages to a message type and passes it to an
+ // ::std::function.
+ template <class T>
+ class TypedStructHandler : public StructHandlerInterface {
+ public:
+ TypedStructHandler(::std::function<void(const T &message)> handler)
+ : handler_(handler) {}
+
+ void HandleStruct(::aos::time::Time log_time, uint32_t type_id,
+ const void *data, size_t data_size) override {
+ CHECK_EQ(type_id, T::GetType()->id);
+ T message;
+ CHECK_EQ(data_size, T::Size());
+ CHECK_EQ(data_size, message.Deserialize(static_cast<const char *>(data)));
+ message.sent_time = log_time;
+ handler_(message);
+ }
+
+ private:
+ const ::std::function<void(T message)> handler_;
+ };
+
+ // A callable class which dumps messages straight to a queue.
+ template <class T>
+ class QueueDumpStructHandler {
+ public:
+ QueueDumpStructHandler(const ::std::string &queue_name)
+ : queue_(RawQueue::Fetch(queue_name.c_str(), sizeof(T), T::kHash,
+ T::kQueueLength)) {}
+
+ void operator()(const T &message) {
+ LOG_STRUCT(DEBUG, "re-sending", message);
+ void *raw_message = queue_->GetMessage();
+ CHECK_NOTNULL(raw_message);
+ memcpy(raw_message, &message, sizeof(message));
+ CHECK(queue_->WriteMessage(raw_message, RawQueue::kOverride));
+ }
+
+ private:
+ ::aos::RawQueue *const queue_;
+ };
+
+ // A key for specifying log messages to give to a certain handler.
+ struct Key {
+ Key(const ::std::string &process_name, const ::std::string &log_message)
+ : process_name(process_name), log_message(log_message) {}
+
+ ::std::string process_name;
+ ::std::string log_message;
+ };
+
+ struct KeyHash {
+ size_t operator()(const Key &key) const {
+ return string_hash(key.process_name) ^
+ (string_hash(key.log_message) << 1);
+ }
+
+ private:
+ const ::std::hash<::std::string> string_hash = ::std::hash<::std::string>();
+ };
+ struct KeyEqual {
+ bool operator()(const Key &a, const Key &b) const {
+ return a.process_name == b.process_name && a.log_message == b.log_message;
+ }
+ };
+
+ ::std::unordered_map<const Key, ::std::unique_ptr<StructHandlerInterface>,
+ KeyHash, KeyEqual> handlers_;
+ ::std::unique_ptr<LogFileReader> reader_;
+
+ DISALLOW_COPY_AND_ASSIGN(LogReplayer);
+};
+
+} // namespace linux_code
+} // namespace logging
+} // namespace aos
+
+#endif // AOS_LINUX_CODE_LOGGING_LOG_REPLAY_H_
diff --git a/aos/linux_code/logging/logging.gyp b/aos/linux_code/logging/logging.gyp
index 65939df..2ae9e0b 100644
--- a/aos/linux_code/logging/logging.gyp
+++ b/aos/linux_code/logging/logging.gyp
@@ -2,6 +2,19 @@
'targets': [
# linux_* is dealt with by aos/build/aos.gyp:logging.
{
+ 'target_name': 'log_replay',
+ 'type': 'static_library',
+ 'sources': [
+ 'log_replay.cc',
+ ],
+ 'dependencies': [
+ 'binary_log_file',
+ '<(AOS)/common/common.gyp:queues',
+ '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
+ ],
+ },
+ {
'target_name': 'binary_log_writer',
'type': 'executable',
'sources': [