Enable snappy compression in logger
In doing so, increase the parameterization of logger_test a bit, so up
the shard count.
Change-Id: I9d8d571b4eccc56266c9ba839783f3df9f9c36ab
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index a923cf6..a0ae78a 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -3,6 +3,7 @@
#include "absl/strings/str_format.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/log_reader.h"
+#include "aos/events/logging/snappy_encoder.h"
#include "aos/events/logging/log_writer.h"
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
@@ -415,8 +416,30 @@
}
}
+struct CompressionParams {
+ std::string_view extension;
+ std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory;
+};
+
+std::ostream &operator<<(std::ostream &ostream,
+ const CompressionParams ¶ms) {
+ ostream << "\"" << params.extension << "\"";
+ return ostream;
+}
+
+std::vector<CompressionParams> SupportedCompressionAlgorithms() {
+ return {{"", []() { return std::make_unique<DummyEncoder>(); }},
+ {SnappyDecoder::kExtension,
+ []() { return std::make_unique<SnappyEncoder>(); }},
+#ifdef LZMA
+ {LzmaDecoder::kExtension,
+ []() { return std::make_unique<LzmaEncoder>(3); }}
+#endif // LZMA
+ };
+}
+
// Parameters to run all the tests with.
-struct Param {
+struct ConfigParams {
// The config file to use.
std::string config;
// If true, the RemoteMessage channel should be shared between all the remote
@@ -427,11 +450,18 @@
std::string_view sha256;
};
-class MultinodeLoggerTest : public ::testing::TestWithParam<struct Param> {
+std::ostream &operator<<(std::ostream &ostream, const ConfigParams ¶ms) {
+ ostream << "{config: \"" << params.config << "\", shared: " << params.shared
+ << ", sha256: \"" << params.sha256 << "\"}";
+ return ostream;
+}
+
+class MultinodeLoggerTest : public ::testing::TestWithParam<
+ std::tuple<ConfigParams, CompressionParams>> {
public:
MultinodeLoggerTest()
- : config_(aos::configuration::ReadConfig(ArtifactPath(
- absl::StrCat("aos/events/logging/", GetParam().config)))),
+ : config_(aos::configuration::ReadConfig(ArtifactPath(absl::StrCat(
+ "aos/events/logging/", std::get<0>(GetParam()).config)))),
time_converter_(configuration::NodesCount(&config_.message())),
event_loop_factory_(&config_.message()),
pi1_(event_loop_factory_.GetNodeEventLoopFactory("pi1")),
@@ -447,7 +477,7 @@
logfiles_(MakeLogFiles(logfile_base1_, logfile_base2_)),
pi1_single_direction_logfiles_(MakePi1SingleDirectionLogfiles()),
structured_logfiles_(StructureLogFiles()) {
- LOG(INFO) << "Config " << GetParam().config;
+ LOG(INFO) << "Config " << std::get<0>(GetParam()).config;
event_loop_factory_.SetTimeConverter(&time_converter_);
// Go through and remove the logfiles if they already exist.
@@ -472,79 +502,89 @@
pi2_->OnStartup([this]() { pi2_->AlwaysStart<Pong>("pong"); });
}
- bool shared() const { return GetParam().shared; }
+ bool shared() const { return std::get<0>(GetParam()).shared; }
std::vector<std::string> MakeLogFiles(std::string logfile_base1,
std::string logfile_base2,
size_t pi1_data_count = 2,
size_t pi2_data_count = 2) {
std::vector<std::string> result;
- result.emplace_back(
- absl::StrCat(logfile_base1, "_", GetParam().sha256, ".bfbs"));
- result.emplace_back(
- absl::StrCat(logfile_base2, "_", GetParam().sha256, ".bfbs"));
+ result.emplace_back(absl::StrCat(
+ logfile_base1, "_", std::get<0>(GetParam()).sha256, Extension()));
+ result.emplace_back(absl::StrCat(
+ logfile_base2, "_", std::get<0>(GetParam()).sha256, Extension()));
for (size_t i = 0; i < pi1_data_count; ++i) {
result.emplace_back(
- absl::StrCat(logfile_base1, "_pi1_data.part", i, ".bfbs"));
+ absl::StrCat(logfile_base1, "_pi1_data.part", i, Extension()));
}
result.emplace_back(logfile_base1 +
- "_pi2_data/test/aos.examples.Pong.part0.bfbs");
+ "_pi2_data/test/aos.examples.Pong.part0" + Extension());
result.emplace_back(logfile_base1 +
- "_pi2_data/test/aos.examples.Pong.part1.bfbs");
+ "_pi2_data/test/aos.examples.Pong.part1" + Extension());
for (size_t i = 0; i < pi2_data_count; ++i) {
result.emplace_back(
- absl::StrCat(logfile_base2, "_pi2_data.part", i, ".bfbs"));
+ absl::StrCat(logfile_base2, "_pi2_data.part", i, Extension()));
}
- result.emplace_back(
- logfile_base2 +
- "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs");
- result.emplace_back(
- logfile_base2 +
- "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs");
- result.emplace_back(
- logfile_base1 +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs");
- result.emplace_back(
- logfile_base1 +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs");
+ result.emplace_back(logfile_base2 +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0" +
+ Extension());
+ result.emplace_back(logfile_base2 +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1" +
+ Extension());
+ result.emplace_back(logfile_base1 +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0" +
+ Extension());
+ result.emplace_back(logfile_base1 +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1" +
+ Extension());
if (shared()) {
result.emplace_back(logfile_base1 +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base1 +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
result.emplace_back(logfile_base2 +
"_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base2 +
"_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
} else {
result.emplace_back(logfile_base1 +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base1 +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
result.emplace_back(logfile_base2 +
"_timestamps/pi2/aos/remote_timestamps/pi1/pi2/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base2 +
"_timestamps/pi2/aos/remote_timestamps/pi1/pi2/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
result.emplace_back(logfile_base1 +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base1 +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
}
return result;
@@ -552,99 +592,111 @@
std::vector<std::string> MakePi1RebootLogfiles() {
std::vector<std::string> result;
- result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
- result.emplace_back(logfile_base1_ + "_pi1_data.part1.bfbs");
- result.emplace_back(logfile_base1_ + "_pi1_data.part2.bfbs");
+ result.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_data.part1" + Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
result.emplace_back(logfile_base1_ +
- "_pi2_data/test/aos.examples.Pong.part0.bfbs");
+ "_pi2_data/test/aos.examples.Pong.part0" + Extension());
result.emplace_back(logfile_base1_ +
- "_pi2_data/test/aos.examples.Pong.part1.bfbs");
+ "_pi2_data/test/aos.examples.Pong.part1" + Extension());
result.emplace_back(logfile_base1_ +
- "_pi2_data/test/aos.examples.Pong.part2.bfbs");
+ "_pi2_data/test/aos.examples.Pong.part2" + Extension());
result.emplace_back(logfile_base1_ +
- "_pi2_data/test/aos.examples.Pong.part3.bfbs");
- result.emplace_back(
- logfile_base1_ +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs");
- result.emplace_back(
- logfile_base1_ +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs");
- result.emplace_back(
- logfile_base1_ +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2.bfbs");
- result.emplace_back(
- logfile_base1_ +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part3.bfbs");
- result.emplace_back(
- absl::StrCat(logfile_base1_, "_", GetParam().sha256, ".bfbs"));
+ "_pi2_data/test/aos.examples.Pong.part3" + Extension());
+ result.emplace_back(logfile_base1_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0" +
+ Extension());
+ result.emplace_back(logfile_base1_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1" +
+ Extension());
+ result.emplace_back(logfile_base1_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2" +
+ Extension());
+ result.emplace_back(logfile_base1_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part3" +
+ Extension());
+ result.emplace_back(absl::StrCat(
+ logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
if (shared()) {
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.message_bridge.RemoteMessage.part2.bfbs");
+ "aos.message_bridge.RemoteMessage.part2" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.message_bridge.RemoteMessage.part3.bfbs");
+ "aos.message_bridge.RemoteMessage.part3" +
+ Extension());
} else {
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part2.bfbs");
+ "aos.message_bridge.RemoteMessage.part2" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
- "aos.message_bridge.RemoteMessage.part3.bfbs");
+ "aos.message_bridge.RemoteMessage.part3" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
- "aos.message_bridge.RemoteMessage.part0.bfbs");
+ "aos.message_bridge.RemoteMessage.part0" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
- "aos.message_bridge.RemoteMessage.part1.bfbs");
+ "aos.message_bridge.RemoteMessage.part1" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
- "aos.message_bridge.RemoteMessage.part2.bfbs");
+ "aos.message_bridge.RemoteMessage.part2" +
+ Extension());
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
- "aos.message_bridge.RemoteMessage.part3.bfbs");
+ "aos.message_bridge.RemoteMessage.part3" +
+ Extension());
}
return result;
}
std::vector<std::string> MakePi1SingleDirectionLogfiles() {
std::vector<std::string> result;
- result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
- result.emplace_back(logfile_base1_ + "_pi1_data.part1.bfbs");
- result.emplace_back(
- logfile_base1_ +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs");
- result.emplace_back(
- absl::StrCat(logfile_base1_, "_", GetParam().sha256, ".bfbs"));
+ result.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
+ result.emplace_back(logfile_base1_ + "_pi1_data.part1" + Extension());
+ result.emplace_back(logfile_base1_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0" +
+ Extension());
+ result.emplace_back(absl::StrCat(
+ logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
return result;
}
std::vector<std::string> MakePi1DeadNodeLogfiles() {
std::vector<std::string> result;
- result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
- result.emplace_back(
- absl::StrCat(logfile_base1_, "_", GetParam().sha256, ".bfbs"));
+ result.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
+ result.emplace_back(absl::StrCat(
+ logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
return result;
}
@@ -666,6 +718,10 @@
return result;
}
+ std::string Extension() {
+ return absl::StrCat(".bfbs", std::get<1>(GetParam()).extension);
+ }
+
struct LoggerState {
std::unique_ptr<EventLoop> event_loop;
std::unique_ptr<Logger> logger;
@@ -690,8 +746,7 @@
nullptr};
}
- void StartLogger(LoggerState *logger, std::string logfile_base = "",
- bool compress = false) {
+ void StartLogger(LoggerState *logger, std::string logfile_base = "") {
if (logfile_base.empty()) {
if (logger->event_loop->node()->name()->string_view() == "pi1") {
logfile_base = logfile_base1_;
@@ -705,20 +760,13 @@
logger->logger->set_polling_period(std::chrono::milliseconds(100));
logger->logger->set_name(absl::StrCat(
"name_prefix_", logger->event_loop->node()->name()->str()));
- logger->event_loop->OnRun([logger, logfile_base, compress]() {
+ logger->event_loop->OnRun([logger, logfile_base]() {
std::unique_ptr<MultiNodeLogNamer> namer =
std::make_unique<MultiNodeLogNamer>(
logfile_base, logger->configuration, logger->event_loop.get(),
logger->node);
- if (compress) {
-#ifdef LZMA
- namer->set_extension(".xz");
- namer->set_encoder_factory(
- []() { return std::make_unique<aos::logger::LzmaEncoder>(3); });
-#else
- LOG(FATAL) << "Compression unsupported";
-#endif
- }
+ namer->set_extension(std::get<1>(GetParam()).extension);
+ namer->set_encoder_factory(std::get<1>(GetParam()).encoder_factory);
logger->log_namer = namer.get();
logger->logger->StartLogging(std::move(namer));
@@ -1678,7 +1726,7 @@
}
// TODO(austin): Should we flip out if the file can't open?
- const std::string kEmptyFile("foobarinvalidfiledoesnotexist.bfbs");
+ const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
aos::util::WriteStringToFileOrDie(kEmptyFile, "");
logfiles_.emplace_back(kEmptyFile);
@@ -1687,39 +1735,9 @@
VerifyParts(sorted_parts, {kEmptyFile});
}
-#ifdef LZMA
-// Tests that we can sort a bunch of parts with an empty .xz file in there. The
-// empty file should be ignored.
-TEST_P(MultinodeLoggerTest, SortEmptyCompressedParts) {
- time_converter_.StartEqual();
- // Make a bunch of parts.
- {
- LoggerState pi1_logger = MakeLogger(pi1_);
- LoggerState pi2_logger = MakeLogger(pi2_);
-
- event_loop_factory_.RunFor(chrono::milliseconds(95));
-
- StartLogger(&pi1_logger, "", true);
- StartLogger(&pi2_logger, "", true);
-
- event_loop_factory_.RunFor(chrono::milliseconds(2000));
- }
-
- // TODO(austin): Should we flip out if the file can't open?
- const std::string kEmptyFile("foobarinvalidfiledoesnotexist.bfbs.xz");
-
- AddExtension(".xz");
-
- aos::util::WriteStringToFileOrDie(kEmptyFile, "");
- logfiles_.emplace_back(kEmptyFile);
-
- const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
- VerifyParts(sorted_parts, {kEmptyFile});
-}
-
-// Tests that we can sort a bunch of parts with the end missing off a compressed
+// Tests that we can sort a bunch of parts with the end missing off a
// file. We should use the part we can read.
-TEST_P(MultinodeLoggerTest, SortTruncatedCompressedParts) {
+TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
time_converter_.StartEqual();
// Make a bunch of parts.
{
@@ -1728,27 +1746,25 @@
event_loop_factory_.RunFor(chrono::milliseconds(95));
- StartLogger(&pi1_logger, "", true);
- StartLogger(&pi2_logger, "", true);
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
event_loop_factory_.RunFor(chrono::milliseconds(2000));
}
- // Append everything with .xz.
- AddExtension(".xz");
-
// Strip off the end of one of the files. Pick one with a lot of data.
+ // For snappy, needs to have enough data to be >1 chunk of compressed data so
+ // that we don't corrupt the entire log part.
::std::string compressed_contents =
- aos::util::ReadFileToStringOrDie(logfiles_[2]);
+ aos::util::ReadFileToStringOrDie(logfiles_[3]);
aos::util::WriteStringToFileOrDie(
- logfiles_[2],
+ logfiles_[3],
compressed_contents.substr(0, compressed_contents.size() - 100));
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
VerifyParts(sorted_parts);
}
-#endif
// Tests that if we remap a remapped channel, it shows up correctly.
TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
@@ -2560,17 +2576,23 @@
INSTANTIATE_TEST_SUITE_P(
All, MultinodeLoggerTest,
- ::testing::Values(Param{"multinode_pingpong_combined_config.json", true,
- kCombinedConfigSha1},
- Param{"multinode_pingpong_split_config.json", false,
- kSplitConfigSha1}));
+ ::testing::Combine(::testing::Values(
+ ConfigParams{
+ "multinode_pingpong_combined_config.json", true,
+ kCombinedConfigSha1},
+ ConfigParams{"multinode_pingpong_split_config.json",
+ false, kSplitConfigSha1}),
+ ::testing::ValuesIn(SupportedCompressionAlgorithms())));
INSTANTIATE_TEST_SUITE_P(
All, MultinodeLoggerDeathTest,
- ::testing::Values(Param{"multinode_pingpong_combined_config.json", true,
- kCombinedConfigSha1},
- Param{"multinode_pingpong_split_config.json", false,
- kSplitConfigSha1}));
+ ::testing::Combine(::testing::Values(
+ ConfigParams{
+ "multinode_pingpong_combined_config.json", true,
+ kCombinedConfigSha1},
+ ConfigParams{"multinode_pingpong_split_config.json",
+ false, kSplitConfigSha1}),
+ ::testing::ValuesIn(SupportedCompressionAlgorithms())));
// Tests that we can relog with a different config. This makes most sense when
// you are trying to edit a log and want to use channel renaming + the original