Add per channel storage durations
This lets us save images for less time than everything else, since they
are the main driver of queue memory usage.
Change-Id: I677791e20af6aedb0288d09cf5963b4dde30ecce
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index e674a75..5afbee0 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -968,7 +968,7 @@
std::map<std::string_view, flatbuffers::Offset<reflection::Schema>>
schema_cache;
- CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u)
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
<< ": Merging logic needs to be updated when the number of channel "
"fields changes.";
@@ -1065,6 +1065,10 @@
if (c->has_num_readers()) {
channel_builder.add_num_readers(c->num_readers());
}
+ if (c->has_channel_storage_duration()) {
+ channel_builder.add_channel_storage_duration(
+ c->channel_storage_duration());
+ }
channel_offsets.emplace_back(channel_builder.Finish());
}
channels_offset = fbb.CreateVector(channel_offsets);
@@ -1612,6 +1616,9 @@
chrono::nanoseconds ChannelStorageDuration(const Configuration *config,
const Channel *channel) {
CHECK(channel != nullptr);
+ if (channel->has_channel_storage_duration()) {
+ return chrono::nanoseconds(channel->channel_storage_duration());
+ }
return chrono::nanoseconds(config->channel_storage_duration());
}
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index 5283d9e..0b42f5b 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -105,6 +105,10 @@
//
// Currently, this must be set if and only if read_method is PIN.
num_readers:int (id: 12);
+
+ // Length of this channel in nanoseconds. This overrides
+ // channel_storage_duration below in Configuration for just this channel.
+ channel_storage_duration:long = 2000000000 (id: 13);
}
// Table to support renaming channel names.
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 0d7794e..0559aae 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -3332,5 +3332,24 @@
EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast);
}
+// Tests that a longer storage durations store more messages.
+TEST_P(AbstractEventLoopTest, SendingTooFastWithLongDuration) {
+ auto loop1 = MakePrimary();
+
+ auto sender1 = loop1->MakeSender<TestMessage>("/test3");
+
+ // Send queue_size messages split between the senders.
+ const int queue_size =
+ configuration::QueueSize(loop1->configuration(), sender1.channel());
+ EXPECT_EQ(queue_size, 100 * 10);
+ for (int i = 0; i < queue_size; i++) {
+ ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk);
+ }
+
+ // Since queue_size messages have been sent, and little time has elapsed,
+ // this should return an error.
+ EXPECT_EQ(SendTestMessage(sender1), RawSender::Error::kMessagesSentTooFast);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index fe85732..d466a1e 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -47,6 +47,11 @@
{
"name": "/test2",
"type": "aos.TestMessage"
+ },
+ {
+ "name": "/test3",
+ "type": "aos.TestMessage",
+ "channel_storage_duration": 10000000000
}
]
})config",
@@ -97,6 +102,11 @@
{
"name": "/test2",
"type": "aos.TestMessage"
+ },
+ {
+ "name": "/test3",
+ "type": "aos.TestMessage",
+ "channel_storage_duration": 10000000000
}
]
})config",
@@ -139,6 +149,13 @@
"type": "aos.TestMessage",
"read_method": "PIN",
"num_readers": 10
+ },
+ {
+ "name": "/test3",
+ "type": "aos.TestMessage",
+ "read_method": "PIN",
+ "num_readers": 10,
+ "channel_storage_duration": 10000000000
}
]
})config";
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 874fe43..e3dd904 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -98,6 +98,10 @@
std::string_view new_name,
std::string_view new_type,
flatbuffers::FlatBufferBuilder *fbb) {
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
+ << ": Merging logic needs to be updated when the number of channel "
+ "fields changes.";
+
flatbuffers::Offset<flatbuffers::String> name_offset =
fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
: new_name);
@@ -148,6 +152,9 @@
if (c->has_num_readers()) {
channel_builder.add_num_readers(c->num_readers());
}
+ if (c->has_channel_storage_duration()) {
+ channel_builder.add_channel_storage_duration(c->channel_storage_duration());
+ }
return channel_builder.Finish();
}
@@ -1475,7 +1482,7 @@
fbb.ForceDefaults(true);
std::vector<flatbuffers::Offset<Channel>> channel_offsets;
- CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u)
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
<< ": Merging logic needs to be updated when the number of channel "
"fields changes.";
@@ -1547,6 +1554,10 @@
if (c->has_frequency()) {
channel_builder.add_frequency(c->frequency());
}
+ if (c->has_channel_storage_duration()) {
+ channel_builder.add_channel_storage_duration(
+ c->channel_storage_duration());
+ }
channel_offsets.emplace_back(channel_builder.Finish());
}
break;
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index aaba8cb..e207179 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -60,13 +60,13 @@
};
constexpr std::string_view kCombinedConfigSha1() {
- return "433bcf2bddfbbd2745a4e0c3c9dda2f9832bb61c5b311e3efdd357b9a19e1b76";
+ return "d018002a9b780d45a69172a1e5dd1d6df49a7c6c63b9bae9125cdc0458ddc6ca";
}
constexpr std::string_view kSplitConfigSha1() {
- return "6956d86e4eeda28d6857c3365f79a7fb0344c74de44bcb5ebe4d51398a4a26d5";
+ return "562f80087c0e95d9304127c4cb46962659b4bfc11def84253c67702b4213e6cf";
}
constexpr std::string_view kReloggedSplitConfigSha1() {
- return "db53e99234ecec2cde4d6b9f7b77c8f5150e0a58f6a441030eebfc1e76a2c89c";
+ return "cb560559ee3111d7c67314e3e1a5fd7fc88e8b4cfd9d15ea71c8d1cae1c0480b";
}
LoggerState MakeLoggerState(NodeEventLoopFactory *node,