Merge "Add ability to get source aos::Node* from a Channel*"
diff --git a/aos/containers/resizeable_buffer.h b/aos/containers/resizeable_buffer.h
index 2f6ff32..9f9967f 100644
--- a/aos/containers/resizeable_buffer.h
+++ b/aos/containers/resizeable_buffer.h
@@ -118,6 +118,20 @@
   }
 };
 
+// Allocates aligned memory.
+template <size_t alignment>
+class AlignedReallocator {
+ public:
+  static void *Realloc(void *old, size_t old_size, size_t new_capacity) {
+    void *new_memory = std::aligned_alloc(alignment, new_capacity);
+    if (old) {
+      memcpy(new_memory, old, old_size);
+      free(old);
+    }
+    return new_memory;
+  }
+};
+
 // A resizable buffer which uses realloc when it needs to grow to attempt to
 // avoid full coppies.
 class ResizeableBuffer : public AllocatorResizeableBuffer<Reallocator> {};
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index eadefa8..3b5f57a 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1014,7 +1014,9 @@
     srcs = [
         "ssd_profiler.cc",
     ],
+    visibility = ["//visibility:public"],
     deps = [
+        ":log_backend",
         "//aos:init",
         "//aos/containers:resizeable_buffer",
         "//aos/time",
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
index 394992c..a155bb5 100644
--- a/aos/events/logging/buffer_encoder.h
+++ b/aos/events/logging/buffer_encoder.h
@@ -15,6 +15,10 @@
  public:
   virtual ~DataEncoder() = default;
 
+  // Size of an aligned sector used to detect when the data is aligned enough to
+  // use O_DIRECT instead.
+  static constexpr size_t kSector = 512u;
+
   // Interface to copy data into a buffer.
   class Copier {
    public:
@@ -114,21 +118,7 @@
  private:
   size_t total_bytes_ = 0;
 
-  // A class which uses aligned_alloc to allocate sector aligned blocks of
-  // memory.
-  class AlignedReallocator {
-   public:
-    static void *Realloc(void *old, size_t old_size, size_t new_capacity) {
-      void *new_memory = std::aligned_alloc(512, new_capacity);
-      if (old) {
-        memcpy(new_memory, old, old_size);
-        free(old);
-      }
-      return new_memory;
-    }
-  };
-
-  AllocatorResizeableBuffer<AlignedReallocator> input_buffer_;
+  AllocatorResizeableBuffer<aos::AlignedReallocator<kSector>> input_buffer_;
   std::vector<absl::Span<const uint8_t>> return_queue_;
 };
 
diff --git a/aos/events/logging/log_backend_test.cc b/aos/events/logging/log_backend_test.cc
index 1e95c10..0603b33 100644
--- a/aos/events/logging/log_backend_test.cc
+++ b/aos/events/logging/log_backend_test.cc
@@ -246,19 +246,10 @@
 struct FileWriteTestBase : public ::testing::Test {
   uint8_t NextRandom() { return distribution(engine); }
 
-  class AlignedReallocator {
-   public:
-    static void *Realloc(void *old, size_t old_size, size_t new_capacity) {
-      void *new_memory = std::aligned_alloc(512, new_capacity);
-      if (old) {
-        memcpy(new_memory, old, old_size);
-        free(old);
-      }
-      return new_memory;
-    }
-  };
+  AllocatorResizeableBuffer<AlignedReallocator<aos::logger::FileHandler::kSector
 
-  AllocatorResizeableBuffer<AlignedReallocator> buffer;
+                                               >>
+      buffer;
 
   void TestRecipe(const WriteRecipe &recipe) {
     VLOG(1) << "Starting";
@@ -374,8 +365,8 @@
 
 // Test an aligned to unaligned transition to make sure everything works.
 TEST_F(FileWriteTestBase, AlignedToUnaligned) {
-  AllocatorResizeableBuffer<AlignedReallocator> aligned_buffer;
-  AllocatorResizeableBuffer<AlignedReallocator> unaligned_buffer;
+  AllocatorResizeableBuffer<AlignedReallocator<512>> aligned_buffer;
+  AllocatorResizeableBuffer<AlignedReallocator<512>> unaligned_buffer;
 
   aligned_buffer.resize(FileHandler::kSector * 4);
   std::generate(std::begin(aligned_buffer), std::end(aligned_buffer),
diff --git a/aos/events/logging/log_reader_utils_test.cc b/aos/events/logging/log_reader_utils_test.cc
index 9cd64f2..ff2484e 100644
--- a/aos/events/logging/log_reader_utils_test.cc
+++ b/aos/events/logging/log_reader_utils_test.cc
@@ -157,4 +157,75 @@
   EXPECT_EQ(logs.front().name, log_file);
 }
 
+// Tests that FindLogs returns reasonable results.
+TEST(LogfileSorting, FindLogs) {
+  std::string log_folder = aos::testing::TestTmpDir() + "/log_folder";
+  util::UnlinkRecursive(log_folder);
+  std::filesystem::create_directories(log_folder);
+
+  std::filesystem::create_directories(log_folder + "/log1/a");
+  std::ofstream(log_folder + "/log1/a/part1.bfbs").good();
+  std::ofstream(log_folder + "/log1/a/part2.bfbs").good();
+  std::ofstream(log_folder + "/log1/a/randomfile").good();
+  std::filesystem::create_directories(log_folder + "/log1/b");
+  std::ofstream(log_folder + "/log1/b/part1.bfbs").good();
+  std::ofstream(log_folder + "/log1/b/randomfile").good();
+  std::filesystem::create_directories(log_folder + "/log1/c");
+  std::ofstream(log_folder + "/log1/c/part1.bfbs").good();
+  std::ofstream(log_folder + "/log1/c/part2.bfbs").good();
+  std::ofstream(log_folder + "/log1/c/part3.bfbs").good();
+
+  std::filesystem::create_directories(log_folder + "/log2/a");
+  std::ofstream(log_folder + "/log2/a/part1.bfbs").good();
+  std::ofstream(log_folder + "/log2/a/part2.bfbs").good();
+  std::ofstream(log_folder + "/log2/a/part3.bfbs").good();
+  std::ofstream(log_folder + "/log2/a/randomfile").good();
+
+  std::filesystem::create_directories(log_folder + "/log3/b");
+  std::ofstream(log_folder + "/log3/b/part1.bfbs").good();
+  std::filesystem::create_directories(log_folder + "/log3/c");
+  std::ofstream(log_folder + "/log3/c/part1.bfbs").good();
+  std::ofstream(log_folder + "/log3/c/part2.bfbs").good();
+  std::ofstream(log_folder + "/log3/c/part3.bfbs").good();
+
+  auto empty_file_with_name = [](std::string_view name) {
+    return ::testing::AllOf(
+        ::testing::Field(&internal::FileOperations::File::name, name),
+        ::testing::Field(&internal::FileOperations::File::size, 0u));
+  };
+
+  {
+    std::vector<internal::FileOperations::File> result = FindLogs(
+        std::vector<std::string>{log_folder + "/log1", log_folder + "/log3"});
+
+    EXPECT_EQ(result.size(), 10);
+  }
+
+  {
+    std::vector<internal::FileOperations::File> result =
+        FindLogs(std::vector<std::string>{log_folder + "/log1"});
+
+    EXPECT_THAT(result,
+                ::testing::UnorderedElementsAre(
+                    empty_file_with_name(log_folder + "/log1/a/part1.bfbs"),
+                    empty_file_with_name(log_folder + "/log1/a/part2.bfbs"),
+                    empty_file_with_name(log_folder + "/log1/b/part1.bfbs"),
+                    empty_file_with_name(log_folder + "/log1/c/part1.bfbs"),
+                    empty_file_with_name(log_folder + "/log1/c/part2.bfbs"),
+                    empty_file_with_name(log_folder + "/log1/c/part3.bfbs")));
+  }
+
+  {
+    std::vector<internal::FileOperations::File> result =
+        FindLogs(std::vector<std::string>{log_folder + "/log3"});
+
+    EXPECT_THAT(result,
+                ::testing::UnorderedElementsAre(
+                    empty_file_with_name(log_folder + "/log3/b/part1.bfbs"),
+                    empty_file_with_name(log_folder + "/log3/c/part1.bfbs"),
+                    empty_file_with_name(log_folder + "/log3/c/part2.bfbs"),
+                    empty_file_with_name(log_folder + "/log3/c/part3.bfbs")));
+  }
+}
+
 }  // namespace aos::logger::testing
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 4c859c2..fdb4eab 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -126,17 +126,35 @@
   return files;
 }
 
+namespace {
+
+void AddLogfiles(std::string_view filename,
+                 std::vector<internal::FileOperations::File> *found_logfiles) {
+  const auto file_operations = MakeFileOperations(filename);
+  if (file_operations->Exists()) {
+    file_operations->FindLogs(found_logfiles);
+  } else {
+    LOG(FATAL) << "File " << filename << " does not exist";
+  }
+}
+
+};  // namespace
+
 std::vector<internal::FileOperations::File> FindLogs(int argc, char **argv) {
   std::vector<internal::FileOperations::File> found_logfiles;
 
   for (int i = 1; i < argc; i++) {
-    std::string filename = argv[i];
-    const auto file_operations = MakeFileOperations(filename);
-    if (file_operations->Exists()) {
-      file_operations->FindLogs(&found_logfiles);
-    } else {
-      LOG(FATAL) << "File " << filename << " does not exist";
-    }
+    AddLogfiles(argv[i], &found_logfiles);
+  }
+  return found_logfiles;
+}
+
+std::vector<internal::FileOperations::File> FindLogs(
+    const std::vector<std::string> &paths) {
+  std::vector<internal::FileOperations::File> found_logfiles;
+
+  for (const std::string &filename : paths) {
+    AddLogfiles(filename, &found_logfiles);
   }
   return found_logfiles;
 }
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 4f2a2e2..52f44e5 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -167,6 +167,8 @@
 
 // Recursively searches for logfiles in argv[1] and onward.
 std::vector<internal::FileOperations::File> FindLogs(int argc, char **argv);
+std::vector<internal::FileOperations::File> FindLogs(
+    const std::vector<std::string> &paths);
 
 // Proxy container to bind log parts with log source. It helps with reading logs
 // from virtual media such as memory or S3.
diff --git a/aos/events/logging/ssd_profiler.cc b/aos/events/logging/ssd_profiler.cc
index e7493ff..093e17b 100644
--- a/aos/events/logging/ssd_profiler.cc
+++ b/aos/events/logging/ssd_profiler.cc
@@ -12,6 +12,7 @@
 #include "glog/logging.h"
 
 #include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/log_backend.h"
 #include "aos/init.h"
 #include "aos/realtime.h"
 #include "aos/time/time.h"
@@ -39,19 +40,6 @@
               "Write speed in MB/s to simulate. This is only used when "
               "--rate_limit is specified.");
 
-// Stolen from aos/events/logging/DummyEncoder
-class AlignedReallocator {
- public:
-  static void *Realloc(void *old, size_t old_size, size_t new_capacity) {
-    void *new_memory = std::aligned_alloc(512, new_capacity);
-    if (old) {
-      memcpy(new_memory, old, old_size);
-      free(old);
-    }
-    return new_memory;
-  }
-};
-
 void trap_sig(int signum) { exit(signum); }
 
 aos::monotonic_clock::time_point start_time = aos::monotonic_clock::min_time;
@@ -82,7 +70,9 @@
     std::signal(SIGHUP, trap_sig);
     std::atexit(cleanup);
   }
-  aos::AllocatorResizeableBuffer<AlignedReallocator> data;
+  aos::AllocatorResizeableBuffer<
+      aos::AlignedReallocator<aos::logger::FileHandler::kSector>>
+      data;
 
   {
     // We want uncompressible data.  The easiest way to do this is to grab a