Add a sent too fast check for simulation and shm
Sending now returns an error if more than queue_size (frequency *
channel_storage_duration) messages were sent within one
channel_storage_duration.
Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 67c4472..08b0064 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -26,6 +26,8 @@
return "RawSender::Error::kOk";
case RawSender::Error::kMessagesSentTooFast:
return "RawSender::Error::kMessagesSentTooFast";
+ case RawSender::Error::kInvalidRedzone:
+ return "RawSender::Error::kInvalidRedzone";
}
LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
}
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e810e3f..c2498b3 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -139,13 +139,16 @@
public:
using SharedSpan = std::shared_ptr<const absl::Span<const uint8_t>>;
- enum class [[nodiscard]] Error{
+ enum class [[nodiscard]] Error {
// Represents success and no error
kOk,
// Error for messages on channels being sent faster than their
// frequency and channel storage duration allow
- kMessagesSentTooFast};
+ kMessagesSentTooFast,
+ // A write outside the message's buffer was detected (invalid redzone)
+ kInvalidRedzone
+ };
RawSender(EventLoop *event_loop, const Channel *channel);
RawSender(const RawSender &) = delete;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fca25d4..18f5f96 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -4,6 +4,7 @@
#include <unordered_map>
#include <unordered_set>
+#include "aos/events/test_message_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/logging/log_message_generated.h"
#include "aos/logging/logging.h"
@@ -2567,5 +2568,148 @@
"May only send the buffer detached from this Sender");
}
+int TestChannelFrequency(EventLoop *event_loop) {  // Frequency of "/test".
+  return event_loop->GetChannel<TestMessage>("/test")->frequency();
+}
+
+int TestChannelQueueSize(EventLoop *event_loop) {
+  const int frequency = TestChannelFrequency(event_loop);
+  const auto channel_storage_duration = std::chrono::nanoseconds(
+      event_loop->configuration()->channel_storage_duration());
+  const int queue_size =
+      frequency * std::chrono::duration_cast<std::chrono::duration<double>>(
+                      channel_storage_duration)
+                      .count();
+  // queue_size = frequency * storage duration in seconds, truncated to int.
+  return queue_size;
+}
+
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) {
+  aos::Sender<TestMessage>::Builder builder = sender.MakeBuilder();
+  TestMessage::Builder test_message_builder =
+      builder.MakeBuilder<TestMessage>();
+  test_message_builder.add_value(0);  // Every test message carries value 0.
+  return builder.Send(test_message_builder.Finish());
+}
+
+// Test that sending messages too fast returns
+// RawSender::Error::kMessagesSentTooFast.
+TEST_P(AbstractEventLoopTest, SendingMessagesTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // Send one message in the beginning, then wait until the
+  // channel_storage_duration is almost done and start sending messages rapidly,
+  // having some come in the next channel_storage_duration. With a queue_size of
+  // 1600, the 1601st message will be the last valid one (the initial message
+  // having been sent more than a channel_storage_duration ago), and trying to
+  // send the 1602nd message should return
+  // RawSender::Error::kMessagesSentTooFast.
+  EXPECT_EQ(SendTestMessage(sender), RawSender::Error::kOk);
+  int msgs_sent = 1;
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+  // Each tick sends once; the send after queue_size + 1 earlier ones must fail.
+  const auto timer = event_loop->AddTimer([&]() {
+    const bool done = (msgs_sent == queue_size + 1);
+    ASSERT_EQ(
+        SendTestMessage(sender),
+        done ? RawSender::Error::kMessagesSentTooFast : RawSender::Error::kOk);
+    msgs_sent++;
+    if (done) {
+      Exit();
+    }
+  });
+  // First tick: queue_size / 2 periods short of one channel_storage_duration.
+  const auto kRepeatOffset = std::chrono::milliseconds(1);
+  const auto base_offset =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration()) -
+      (kRepeatOffset * (queue_size / 2));
+  event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() {
+    timer->Setup(event_loop->monotonic_now() + base_offset, kRepeatOffset);
+  });
+
+  Run();
+}
+
+// Tests that we are able to send messages successfully after sending messages
+// too fast and waiting while continuously attempting to send messages.
+// Also tests that SendFailureCounter is working correctly in this
+// situation.
+TEST_P(AbstractEventLoopTest, SendingAfterSendingTooFast) {
+  auto event_loop = MakePrimary();
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+  // We are sending messages at 1 kHz, so we will be sending too fast after
+  // queue_size (1600) ms. After this, keep sending messages, and exactly a
+  // channel storage duration (2s) after we send the first message we should
+  // be able to successfully send a message.
+
+  const monotonic_clock::duration kInterval = std::chrono::milliseconds(1);
+  const monotonic_clock::duration channel_storage_duration =
+      std::chrono::nanoseconds(
+          event_loop->configuration()->channel_storage_duration());
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+
+  int msgs_sent = 0;
+  SendFailureCounter counter;
+  auto start = monotonic_clock::min_time;
+
+  event_loop->AddPhasedLoop(
+      [&](int) {
+        const auto actual_err = SendTestMessage(sender);
+        const bool done_waiting = (start != monotonic_clock::min_time &&
+                                   sender.monotonic_sent_time() >=
+                                       (start + channel_storage_duration));
+        const auto expected_err =
+            (msgs_sent < queue_size || done_waiting
+                 ? RawSender::Error::kOk
+                 : RawSender::Error::kMessagesSentTooFast);
+        // Remember when the first message was sent.
+        if (start == monotonic_clock::min_time) {
+          start = sender.monotonic_sent_time();
+        }
+
+        ASSERT_EQ(actual_err, expected_err);
+        counter.Count(actual_err);
+        msgs_sent++;
+        // All sends past queue_size must have failed, except a final kOk send.
+        EXPECT_EQ(counter.failures(),
+                  msgs_sent <= queue_size
+                      ? 0
+                      : (msgs_sent - queue_size) -
+                            (actual_err == RawSender::Error::kOk ? 1 : 0));
+        EXPECT_EQ(counter.just_failed(), actual_err != RawSender::Error::kOk);
+
+        if (done_waiting) {
+          Exit();
+        }
+      },
+      kInterval);
+  Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when messages are sent too fast from senders in different loops
+TEST_P(AbstractEventLoopTest, SendingTooFastWithMultipleLoops) {
+  auto loop1 = MakePrimary();
+  auto loop2 = Make();
+
+  auto sender1 = loop1->MakeSender<TestMessage>("/test");
+  auto sender2 = loop2->MakeSender<TestMessage>("/test");
+  // Send queue_size messages split between the senders. NOTE(review): each
+  // sends queue_size / 2, so this presumably relies on an even queue_size.
+  const int queue_size = TestChannelQueueSize(loop1.get());
+  for (int i = 0; i < queue_size / 2; i++) {
+    ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk);
+    ASSERT_EQ(SendTestMessage(sender2), RawSender::Error::kOk);
+  }
+
+  // Since queue_size messages have been sent, this should return an error
+  EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index a9d280e..fad8dea 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -359,6 +359,12 @@
std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders);
+  // Sent-too-fast test helper: PhasedLoop sending at exactly the channel rate.
+  // NOTE(review): only a free function of this name is defined in this change.
+  void TestSentTooFastCheckEdgeCase(
+      const std::function<RawSender::Error(int, int)> expected_err,
+      const bool send_twice_at_end);
+
private:
const ::std::unique_ptr<EventLoopTestFactory> factory_;
@@ -367,6 +373,13 @@
using AbstractEventLoopDeathTest = AbstractEventLoopTest;
+// Returns the frequency of the /test TestMessage channel
+int TestChannelFrequency(EventLoop *event_loop);
+// Returns the /test queue size: frequency * channel_storage_duration (seconds)
+int TestChannelQueueSize(EventLoop *event_loop);
+// Sends a TestMessage with value 0 on the given sender; returns the send result
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender);
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index ed3d099..9159553 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -522,7 +522,10 @@
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_sender_(VerifySender(
- ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+ ipc_lib::LocklessQueueSender::Make(
+ lockless_queue_memory_.queue(),
+ std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration())),
channel)),
wake_upper_(lockless_queue_memory_.queue()) {}
@@ -557,17 +560,17 @@
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
- CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
- realtime_remote_time, remote_queue_index,
- source_boot_uuid, &monotonic_sent_time_,
- &realtime_sent_time_, &sent_queue_index_))
+ const auto result = lockless_queue_sender_.Send(
+ length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
+ source_boot_uuid, &monotonic_sent_time_, &realtime_sent_time_,
+ &sent_queue_index_);
+ CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
<< ": Somebody wrote outside the buffer of their message on channel "
<< configuration::CleanedChannelToString(channel());
wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
: 0);
- // TODO(Milind): check for messages sent too fast
- return Error::kOk;
+ return CheckLocklessQueueResult(result);
}
Error DoSend(const void *msg, size_t length,
@@ -579,16 +582,19 @@
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
- CHECK(lockless_queue_sender_.Send(
+ const auto result = lockless_queue_sender_.Send(
reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
realtime_remote_time, remote_queue_index, source_boot_uuid,
- &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
- << ": Somebody wrote outside the buffer of their message on channel "
+ &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
+
+ CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
+ << ": Somebody wrote outside the buffer of their message on "
+ "channel "
<< configuration::CleanedChannelToString(channel());
wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
: 0);
- // TODO(austin): Return an error if we send too fast.
- return RawSender::Error::kOk;
+
+ return CheckLocklessQueueResult(result);
}
absl::Span<char> GetSharedMemory() const {
@@ -605,6 +611,20 @@
return static_cast<const ShmEventLoop *>(event_loop());
}
+  RawSender::Error CheckLocklessQueueResult(
+      const ipc_lib::LocklessQueueSender::Result &result) {
+    switch (result) {  // Map the queue's send result onto RawSender::Error.
+      case ipc_lib::LocklessQueueSender::Result::GOOD:
+        return Error::kOk;
+      case ipc_lib::LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST:
+        return Error::kMessagesSentTooFast;
+      case ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE:
+        return Error::kInvalidRedzone;
+    }  // No default: falling out means result matched none of the cases.
+    LOG(FATAL) << "Unknown lockless queue sender result "
+               << static_cast<int>(result);
+  }
+
MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueSender lockless_queue_sender_;
ipc_lib::LocklessQueueWakeUpper wake_upper_;
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 01ca92b..f4107b0 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -116,6 +116,59 @@
ShmEventLoopTestFactory *factory() { return &factory_; }
+  // Helper functions for testing that FetchNext() dies with a helpful message
+  // when the next message is no longer available (presumably overwritten)
+  void TestNextMessageNotAvailable(const bool skip_timing_report) {
+    auto loop1 = factory()->MakePrimary("loop1");
+    if (skip_timing_report) {
+      loop1->SkipTimingReport();
+    }
+    auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+    auto loop2 = factory()->Make("loop2");
+    auto sender = loop2->MakeSender<TestMessage>("/test");
+    bool ran = false;
+    loop1->AddPhasedLoop(
+        [&sender](int) {
+          auto builder = sender.MakeBuilder();
+          TestMessage::Builder test_builder(*builder.fbb());
+          test_builder.add_value(0);
+          builder.CheckOk(builder.Send(test_builder.Finish()));
+        },
+        std::chrono::milliseconds(2));  // Send one message every 2 ms.
+    loop1
+        ->AddTimer([this, &fetcher, &ran]() {  // Set up below to fire at +4 s.
+          EXPECT_DEATH(fetcher.FetchNext(),
+                       "The next message is no longer "
+                       "available.*\"/test\".*\"aos\\.TestMessage\"");
+          factory()->Exit();
+          ran = true;  // Proves the timer callback ran.
+        })
+        ->Setup(loop1->monotonic_now() + std::chrono::seconds(4));
+    factory()->Run();
+    EXPECT_TRUE(ran);
+  }
+  void TestNextMessageNotAvailableNoRun(const bool skip_timing_report) {
+    auto loop1 = factory()->MakePrimary("loop1");
+    if (skip_timing_report) {
+      loop1->SkipTimingReport();
+    }
+    auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+    auto loop2 = factory()->Make("loop2");
+    auto sender = loop2->MakeSender<TestMessage>("/test");
+    time::PhasedLoop phased_loop(std::chrono::milliseconds(2),
+                                 loop2->monotonic_now());
+    for (int i = 0; i < 2000; ++i) {  // 2000 sends, one per 2 ms phase.
+      auto builder = sender.MakeBuilder();
+      TestMessage::Builder test_builder(*builder.fbb());
+      test_builder.add_value(0);
+      builder.CheckOk(builder.Send(test_builder.Finish()));
+      phased_loop.SleepUntilNext();  // Paces the sends without Run().
+    }
+    EXPECT_DEATH(fetcher.FetchNext(),
+                 "The next message is no longer "
+                 "available.*\"/test\".*\"aos\\.TestMessage\"");
+  }
+
private:
ShmEventLoopTestFactory factory_;
};
@@ -351,89 +404,25 @@
// Tests that the next message not being available prints a helpful error in the
// normal case.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailable) {
- auto loop1 = factory()->MakePrimary("loop1");
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- bool ran = false;
- loop1->OnRun([this, &sender, &fetcher, &ran]() {
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
- factory()->Exit();
- ran = true;
- });
- factory()->Run();
- EXPECT_TRUE(ran);
+ TestNextMessageNotAvailable(false);
}
// Tests that the next message not being available prints a helpful error with
// timing reports disabled.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoTimingReports) {
- auto loop1 = factory()->MakePrimary("loop1");
- loop1->SkipTimingReport();
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- bool ran = false;
- loop1->OnRun([this, &sender, &fetcher, &ran]() {
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
- factory()->Exit();
- ran = true;
- });
- factory()->Run();
- EXPECT_TRUE(ran);
+ TestNextMessageNotAvailable(true);
}
// Tests that the next message not being available prints a helpful error even
// when Run is never called.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRun) {
- auto loop1 = factory()->MakePrimary("loop1");
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
+ TestNextMessageNotAvailableNoRun(false);
}
// Tests that the next message not being available prints a helpful error even
// when Run is never called without timing reports.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRunNoTimingReports) {
- auto loop1 = factory()->MakePrimary("loop1");
- loop1->SkipTimingReport();
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
+ TestNextMessageNotAvailableNoRun(true);
}
// TODO(austin): Test that missing a deadline with a timer recovers as expected.
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a1ef95d..69e6638 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -151,10 +151,12 @@
class SimulatedChannel {
public:
explicit SimulatedChannel(const Channel *channel,
- std::chrono::nanoseconds channel_storage_duration)
+ std::chrono::nanoseconds channel_storage_duration,
+ const EventScheduler *scheduler)
: channel_(channel),
channel_storage_duration_(channel_storage_duration),
- next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+ next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
+ scheduler_(scheduler) {
available_buffer_indices_.resize(number_buffers());
for (int i = 0; i < number_buffers(); ++i) {
available_buffer_indices_[i] = i;
@@ -296,6 +298,11 @@
int sender_count_ = 0;
std::vector<uint16_t> available_buffer_indices_;
+
+ const EventScheduler *scheduler_;
+
+ // Queue of all the message send times in the last channel_storage_duration_
+ std::queue<monotonic_clock::time_point> last_times_;
};
namespace {
@@ -782,14 +789,14 @@
const Channel *channel) {
auto it = channels_->find(SimpleChannel(channel));
if (it == channels_->end()) {
- it =
- channels_
- ->emplace(
- SimpleChannel(channel),
- std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
- channel, std::chrono::nanoseconds(
- configuration()->channel_storage_duration()))))
- .first;
+ it = channels_
+ ->emplace(SimpleChannel(channel),
+ std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+ channel,
+ std::chrono::nanoseconds(
+ configuration()->channel_storage_duration()),
+ scheduler_)))
+ .first;
}
return it->second.get();
}
@@ -930,7 +937,21 @@
std::optional<uint32_t> SimulatedChannel::Send(
std::shared_ptr<SimulatedMessage> message) {
- std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+ const auto now = scheduler_->monotonic_now();
+ // Remove times that are greater than or equal to a channel_storage_duration_
+ // ago
+ while (!last_times_.empty() &&
+ (now - last_times_.front() >= channel_storage_duration_)) {
+ last_times_.pop();
+ }
+
+ // Check that we are not sending messages too fast
+ if (static_cast<int>(last_times_.size()) >= queue_size()) {
+ return std::nullopt;
+ }
+
+ const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+ last_times_.push(now);
message->context.queue_index = *queue_index;
// Points to the actual data depending on the size set in context. Data may
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 09202ff..a83243d 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1,6 +1,7 @@
#include "aos/events/simulated_event_loop.h"
#include <chrono>
+#include <functional>
#include <string_view>
#include "aos/events/event_loop_param_test.h"
@@ -139,13 +140,13 @@
};
class FunctionEvent : public EventScheduler::Event {
- public:
- FunctionEvent(std::function<void()> fn) : fn_(fn) {}
+ public:
+ FunctionEvent(std::function<void()> fn) : fn_(fn) {}
- void Handle() noexcept override { fn_(); }
+ void Handle() noexcept override { fn_(); }
- private:
- std::function<void()> fn_;
+ private:
+ std::function<void()> fn_;
};
// Test that creating an event and running the scheduler runs the event.
@@ -204,14 +205,6 @@
EXPECT_EQ(counter, 1);
}
-void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
- aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
- TestMessage::Builder test_message_builder =
- builder.MakeBuilder<TestMessage>();
- test_message_builder.add_value(value);
- ASSERT_EQ(builder.Send(test_message_builder.Finish()), RawSender::Error::kOk);
-}
-
// Test that sending a message after running gets properly notified.
TEST(SimulatedEventLoopTest, SendAfterRunFor) {
SimulatedEventLoopTestFactory factory;
@@ -223,7 +216,7 @@
simulated_event_loop_factory.MakeEventLoop("ping");
aos::Sender<TestMessage> test_message_sender =
ping_event_loop->MakeSender<TestMessage>("/test");
- SendTestMessage(&test_message_sender, 1);
+ ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
std::unique_ptr<EventLoop> pong1_event_loop =
simulated_event_loop_factory.MakeEventLoop("pong");
@@ -243,7 +236,7 @@
// Pauses in the middle don't count though, so this should be counted.
// But, the fresh watcher shouldn't pick it up yet.
- SendTestMessage(&test_message_sender, 2);
+ ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
EXPECT_EQ(test_message_counter1.count(), 0u);
EXPECT_EQ(test_message_counter2.count(), 0u);
@@ -253,6 +246,63 @@
EXPECT_EQ(test_message_counter2.count(), 0u);
}
+void TestSentTooFastCheckEdgeCase(
+    const std::function<RawSender::Error(int, int)> expected_err,
+    const bool send_twice_at_end) {
+  SimulatedEventLoopTestFactory factory;
+
+  auto event_loop = factory.MakePrimary("primary");
+
+  auto sender = event_loop->MakeSender<TestMessage>("/test");
+  // expected_err(msgs_sent, queue_size) is the expected result of each send.
+  const int queue_size = TestChannelQueueSize(event_loop.get());
+  int msgs_sent = 0;
+  event_loop->AddPhasedLoop(
+      [&](int) {
+        EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
+        msgs_sent++;
+
+        // If send_twice_at_end, send the last two messages (message
+        // queue_size and queue_size + 1) in the same iteration, meaning that
+        // we would be sending very slightly too fast. Otherwise, we will send
+        // message queue_size + 1 in the next iteration and we will continue
+        // to be sending exactly at the channel frequency.
+        if (send_twice_at_end && (msgs_sent == queue_size)) {
+          EXPECT_EQ(SendTestMessage(sender),
+                    expected_err(msgs_sent, queue_size));
+          msgs_sent++;
+        }
+        // Stop once more than queue_size sends have been attempted.
+        if (msgs_sent > queue_size) {
+          factory.Exit();
+        }
+      },
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::duration<double>(
+              1.0 / TestChannelFrequency(event_loop.get()))));
+
+  factory.Run();
+}
+
+// Tests that every send returns RawSender::Error::kOk (never
+// kMessagesSentTooFast) when sending at exactly the channel frequency.
+TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
+  TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
+                               false);
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned for the one
+// extra message when exactly one more message than queue_size is sent in a
+// channel storage duration.
+TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
+  TestSentTooFastCheckEdgeCase(
+      [](const int msgs_sent, const int queue_size) {
+        return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
+                                        : RawSender::Error::kOk);
+      },
+      true);
+}
+
// Test that creating an event loop while running dies.
TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
SimulatedEventLoopTestFactory factory;
@@ -1338,7 +1388,6 @@
EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
<< " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
-
EXPECT_EQ(pi1_pong_counter.count(), 601u);
EXPECT_EQ(pi2_pong_counter.count(), 601u);
@@ -1707,18 +1756,17 @@
});
// Confirm that reboot changes the UUID.
- pi2->OnShutdown(
- [&expected_boot_uuid, &boot_number, &expected_connection_time, pi1, pi2,
- pi2_boot1]() {
- expected_boot_uuid = pi2_boot1;
- ++boot_number;
- LOG(INFO) << "OnShutdown triggered for pi2";
- pi2->OnStartup(
- [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
- EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
- expected_connection_time = pi1->monotonic_now();
- });
- });
+ pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
+ pi1, pi2, pi2_boot1]() {
+ expected_boot_uuid = pi2_boot1;
+ ++boot_number;
+ LOG(INFO) << "OnShutdown triggered for pi2";
+ pi2->OnStartup(
+ [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
+ EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
+ expected_connection_time = pi1->monotonic_now();
+ });
+ });
// Let a couple of ServerStatistics messages show up before rebooting.
factory.RunFor(chrono::milliseconds(2002));