Add library function to lookup storage duration per channel
This sets us up to actually add configuration per channel and have all
the call sites automatically update.
Change-Id: I1c1fe11b2096d6e2bdf578902e609b718c3bcb5c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index d100321..e674a75 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -367,7 +367,7 @@
std::numeric_limits<uint16_t>::max())
<< ": More messages/second configured than the queue can hold on "
<< CleanedChannelToString(c) << ", " << c->frequency() << "hz for "
- << config.message().channel_storage_duration() << "ns";
+ << ChannelStorageDuration(&config.message(), c).count() << "ns";
if (c->has_logger_nodes()) {
// Confirm that we don't have duplicate logger nodes.
@@ -1609,9 +1609,15 @@
return result;
}
+chrono::nanoseconds ChannelStorageDuration(const Configuration *config,
+ const Channel *channel) {
+ CHECK(channel != nullptr);
+ return chrono::nanoseconds(config->channel_storage_duration());
+}
+
int QueueSize(const Configuration *config, const Channel *channel) {
return QueueSize(channel->frequency(),
- chrono::nanoseconds(config->channel_storage_duration()));
+ ChannelStorageDuration(config, channel));
}
int QueueSize(size_t frequency, chrono::nanoseconds channel_storage_duration) {
diff --git a/aos/configuration.h b/aos/configuration.h
index 06f4db4..68b3079 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -231,6 +231,10 @@
GetSchemaDetachedBuffer(const Configuration *config,
std::string_view schema_type);
+// Returns the storage duration for a channel.
+std::chrono::nanoseconds ChannelStorageDuration(const Configuration *config,
+ const Channel *channel);
+
// Adds the specified channel to the config and returns the new, merged, config.
// The channel name is derived from the specified name, the type and schema from
// the provided schema, the source node from the specified node, and all other
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 7e79a8b..0d7794e 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -2241,7 +2241,9 @@
// Sanity check channel frequencies to ensure that we've designed the test
// correctly.
ASSERT_EQ(800, sender.channel()->frequency());
- ASSERT_EQ(2000000000, loop1->configuration()->channel_storage_duration());
+ ASSERT_EQ(2000000000, configuration::ChannelStorageDuration(
+ loop1->configuration(), sender.channel())
+ .count());
constexpr int kMaxAllowedMessages = 800 * 2;
constexpr int kSendMessages = kMaxAllowedMessages * 2;
constexpr int kDroppedMessages = kSendMessages - kMaxAllowedMessages;
@@ -3195,15 +3197,8 @@
}
int TestChannelQueueSize(EventLoop *event_loop) {
- const int frequency = TestChannelFrequency(event_loop);
- const auto channel_storage_duration = std::chrono::nanoseconds(
- event_loop->configuration()->channel_storage_duration());
- const int queue_size =
- frequency * std::chrono::duration_cast<std::chrono::duration<double>>(
- channel_storage_duration)
- .count();
-
- return queue_size;
+ return configuration::QueueSize(event_loop->configuration(),
+ event_loop->GetChannel<TestMessage>("/test"));
}
RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) {
@@ -3244,10 +3239,9 @@
});
const auto kRepeatOffset = std::chrono::milliseconds(1);
- const auto base_offset =
- std::chrono::nanoseconds(
- event_loop->configuration()->channel_storage_duration()) -
- (kRepeatOffset * (queue_size / 2));
+ const auto base_offset = configuration::ChannelStorageDuration(
+ event_loop->configuration(), sender.channel()) -
+ (kRepeatOffset * (queue_size / 2));
event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() {
timer->Schedule(event_loop->monotonic_now() + base_offset, kRepeatOffset);
});
@@ -3271,8 +3265,8 @@
const std::chrono::milliseconds kInterval = std::chrono::milliseconds(10);
const monotonic_clock::duration channel_storage_duration =
- std::chrono::nanoseconds(
- event_loop->configuration()->channel_storage_duration());
+ configuration::ChannelStorageDuration(event_loop->configuration(),
+ sender.channel());
const int queue_size = TestChannelQueueSize(event_loop.get());
int msgs_sent = 0;
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index ea265b5..e6913c1 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -146,7 +146,7 @@
current_message_time_ = context.monotonic_event_time;
channel_storage_duration_messages_.push(current_message_time_);
while (channel_storage_duration_messages_.front() +
- std::chrono::nanoseconds(config_->channel_storage_duration()) <=
+ aos::configuration::ChannelStorageDuration(config_, channel_) <=
current_message_time_) {
channel_storage_duration_messages_.pop();
}
@@ -188,7 +188,10 @@
double max_messages_per_sec() const {
return max_messages_per_period_ /
std::min(SecondsActive(),
- 1e-9 * config_->channel_storage_duration());
+ std::chrono::duration<double>(
+ aos::configuration::ChannelStorageDuration(config_,
+ channel_))
+ .count());
}
size_t avg_message_size() const {
return total_message_size_ / total_num_messages_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index f2cd702..5990a7a 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -545,10 +545,8 @@
});
constexpr std::chrono::microseconds kSendPeriod{10};
- const int max_legal_messages =
- ping_sender.channel()->frequency() *
- event_loop_factory.configuration()->channel_storage_duration() /
- 1000000000;
+ const int max_legal_messages = configuration::QueueSize(
+ event_loop_factory.configuration(), ping_sender.channel());
ping_spammer_event_loop->OnRun(
[&ping_spammer_event_loop, kSendPeriod, timer_handler]() {
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 09cd488..4d46d1d 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -379,12 +379,12 @@
: RawSender(event_loop, channel),
lockless_queue_memory_(shm_base, FLAGS_permissions,
event_loop->configuration(), channel),
- lockless_queue_sender_(VerifySender(
- ipc_lib::LocklessQueueSender::Make(
- lockless_queue_memory_.queue(),
- std::chrono::nanoseconds(
- event_loop->configuration()->channel_storage_duration())),
- channel)),
+ lockless_queue_sender_(
+ VerifySender(ipc_lib::LocklessQueueSender::Make(
+ lockless_queue_memory_.queue(),
+ configuration::ChannelStorageDuration(
+ event_loop->configuration(), channel)),
+ channel)),
wake_upper_(lockless_queue_memory_.queue()) {}
~ShmSender() override { shm_event_loop()->CheckCurrentThread(); }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index b17e5ff..5345b8d 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -808,8 +808,8 @@
->emplace(SimpleChannel(channel),
std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
channel,
- std::chrono::nanoseconds(
- configuration()->channel_storage_duration()),
+ configuration::ChannelStorageDuration(
+ configuration(), channel),
scheduler_)))
.first;
}