Add a sent too fast check for simulation and shm
Returns an error if more than queue_size (frequency *
channel_storage_duration) messages were sent in one
channel_storage_duration.
Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/.bazelrc b/.bazelrc
index 3a09925..d231812 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -77,7 +77,8 @@
# Show paths to a few more than just 1 target.
build --show_result 5
# Dump the output of the failing test to stdout.
-test --test_output=errors
+# Keep the default test timeouts, except set 'eternal' to 4500 seconds.
+test --test_output=errors --test_timeout=-1,-1,-1,4500
build --sandbox_base=/dev/shm/
build --experimental_multi_threaded_digest
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index c0d67b0..7595167 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -57,6 +57,9 @@
// Type name of the flatbuffer.
type:string (id: 1);
// Max frequency in messages/sec of the data published on this channel.
+ // The maximum number of messages that can be sent
+ // in a channel_storage_duration is
+ // frequency * channel_storage_duration (in seconds).
frequency:int = 100 (id: 2);
// Max size of the data being published. (This will hopefully be
// automatically computed in the future.)
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 67c4472..08b0064 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -26,6 +26,8 @@
return "RawSender::Error::kOk";
case RawSender::Error::kMessagesSentTooFast:
return "RawSender::Error::kMessagesSentTooFast";
+ case RawSender::Error::kInvalidRedzone:
+ return "RawSender::Error::kInvalidRedzone";
}
LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
}
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e810e3f..c2498b3 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -139,13 +139,16 @@
public:
using SharedSpan = std::shared_ptr<const absl::Span<const uint8_t>>;
- enum class [[nodiscard]] Error{
+ enum class [[nodiscard]] Error {
// Represents success and no error
kOk,
// Error for messages on channels being sent faster than their
// frequency and channel storage duration allow
- kMessagesSentTooFast};
+ kMessagesSentTooFast,
+ // A redzone around a message in the sender queue contained invalid data
+ kInvalidRedzone
+ };
RawSender(EventLoop *event_loop, const Channel *channel);
RawSender(const RawSender &) = delete;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fca25d4..18f5f96 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -4,6 +4,7 @@
#include <unordered_map>
#include <unordered_set>
+#include "aos/events/test_message_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/logging/log_message_generated.h"
#include "aos/logging/logging.h"
@@ -2567,5 +2568,148 @@
"May only send the buffer detached from this Sender");
}
+int TestChannelFrequency(EventLoop *event_loop) {
+ return event_loop->GetChannel<TestMessage>("/test")->frequency();
+}
+
+int TestChannelQueueSize(EventLoop *event_loop) {
+ const int frequency = TestChannelFrequency(event_loop);
+ const auto channel_storage_duration = std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration());
+ const int queue_size =
+ frequency * std::chrono::duration_cast<std::chrono::duration<double>>(
+ channel_storage_duration)
+ .count();
+
+ return queue_size;
+}
+
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender) {
+ aos::Sender<TestMessage>::Builder builder = sender.MakeBuilder();
+ TestMessage::Builder test_message_builder =
+ builder.MakeBuilder<TestMessage>();
+ test_message_builder.add_value(0);
+ return builder.Send(test_message_builder.Finish());
+}
+
+// Test that sending messages too fast returns
+// RawSender::Error::kMessagesSentTooFast.
+TEST_P(AbstractEventLoopTest, SendingMessagesTooFast) {
+ auto event_loop = MakePrimary();
+
+ auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+ // Send one message in the beginning, then wait until the
+ // channel_storage_duration is almost done and start sending messages rapidly,
+ // having some come in the next channel_storage_duration. The queue_size is
+ // 1600, so the 1601st message will be the last valid one (the initial message
+ // having been sent more than a channel_storage_duration ago), and trying to
+ // send the 1602nd message should return
+ // RawSender::Error::kMessagesSentTooFast.
+ EXPECT_EQ(SendTestMessage(sender), RawSender::Error::kOk);
+ int msgs_sent = 1;
+ const int queue_size = TestChannelQueueSize(event_loop.get());
+
+ const auto timer = event_loop->AddTimer([&]() {
+ const bool done = (msgs_sent == queue_size + 1);
+ ASSERT_EQ(
+ SendTestMessage(sender),
+ done ? RawSender::Error::kMessagesSentTooFast : RawSender::Error::kOk);
+ msgs_sent++;
+ if (done) {
+ Exit();
+ }
+ });
+
+ const auto kRepeatOffset = std::chrono::milliseconds(1);
+ const auto base_offset =
+ std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration()) -
+ (kRepeatOffset * (queue_size / 2));
+ event_loop->OnRun([&event_loop, &timer, &base_offset, &kRepeatOffset]() {
+ timer->Setup(event_loop->monotonic_now() + base_offset, kRepeatOffset);
+ });
+
+ Run();
+}
+
+// Tests that we are able to send messages successfully after sending messages
+// too fast and waiting while continuously attempting to send messages.
+// Also tests that SendFailureCounter is working correctly in this
+// situation
+TEST_P(AbstractEventLoopTest, SendingAfterSendingTooFast) {
+ auto event_loop = MakePrimary();
+
+ auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+ // We are sending messages at 1 kHz, so we will be sending too fast after
+ // queue_size (1600) ms. After this, keep sending messages, and exactly a
+ // channel storage duration (2s) after we send the first message we should
+ // be able to successfully send a message.
+
+ const monotonic_clock::duration kInterval = std::chrono::milliseconds(1);
+ const monotonic_clock::duration channel_storage_duration =
+ std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration());
+ const int queue_size = TestChannelQueueSize(event_loop.get());
+
+ int msgs_sent = 0;
+ SendFailureCounter counter;
+ auto start = monotonic_clock::min_time;
+
+ event_loop->AddPhasedLoop(
+ [&](int) {
+ const auto actual_err = SendTestMessage(sender);
+ const bool done_waiting = (start != monotonic_clock::min_time &&
+ sender.monotonic_sent_time() >=
+ (start + channel_storage_duration));
+ const auto expected_err =
+ (msgs_sent < queue_size || done_waiting
+ ? RawSender::Error::kOk
+ : RawSender::Error::kMessagesSentTooFast);
+
+ if (start == monotonic_clock::min_time) {
+ start = sender.monotonic_sent_time();
+ }
+
+ ASSERT_EQ(actual_err, expected_err);
+ counter.Count(actual_err);
+ msgs_sent++;
+
+ EXPECT_EQ(counter.failures(),
+ msgs_sent <= queue_size
+ ? 0
+ : (msgs_sent - queue_size) -
+ (actual_err == RawSender::Error::kOk ? 1 : 0));
+ EXPECT_EQ(counter.just_failed(), actual_err != RawSender::Error::kOk);
+
+ if (done_waiting) {
+ Exit();
+ }
+ },
+ kInterval);
+ Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when messages are sent too fast from senders in different loops
+TEST_P(AbstractEventLoopTest, SendingTooFastWithMultipleLoops) {
+ auto loop1 = MakePrimary();
+ auto loop2 = Make();
+
+ auto sender1 = loop1->MakeSender<TestMessage>("/test");
+ auto sender2 = loop2->MakeSender<TestMessage>("/test");
+
+ // Send queue_size messages split between the senders.
+ const int queue_size = TestChannelQueueSize(loop1.get());
+ for (int i = 0; i < queue_size / 2; i++) {
+ ASSERT_EQ(SendTestMessage(sender1), RawSender::Error::kOk);
+ ASSERT_EQ(SendTestMessage(sender2), RawSender::Error::kOk);
+ }
+
+ // Since queue_size messages have been sent, this should return an error
+ EXPECT_EQ(SendTestMessage(sender2), RawSender::Error::kMessagesSentTooFast);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index a9d280e..fad8dea 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -359,6 +359,12 @@
std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders);
+ // Helper function for testing the sent too fast check using a PhasedLoop with
+ // an interval that sends exactly at the frequency of the channel
+ void TestSentTooFastCheckEdgeCase(
+ const std::function<RawSender::Error(int, int)> expected_err,
+ const bool send_twice_at_end);
+
private:
const ::std::unique_ptr<EventLoopTestFactory> factory_;
@@ -367,6 +373,13 @@
using AbstractEventLoopDeathTest = AbstractEventLoopTest;
+// Returns the frequency of the /test TestMessage channel
+int TestChannelFrequency(EventLoop *event_loop);
+// Returns the queue size of the /test TestMessage channel
+int TestChannelQueueSize(EventLoop *event_loop);
+// Sends a test message with value 0 with the given sender
+RawSender::Error SendTestMessage(aos::Sender<TestMessage> &sender);
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index ed3d099..9159553 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -522,7 +522,10 @@
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_sender_(VerifySender(
- ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+ ipc_lib::LocklessQueueSender::Make(
+ lockless_queue_memory_.queue(),
+ std::chrono::nanoseconds(
+ event_loop->configuration()->channel_storage_duration())),
channel)),
wake_upper_(lockless_queue_memory_.queue()) {}
@@ -557,17 +560,17 @@
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
- CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
- realtime_remote_time, remote_queue_index,
- source_boot_uuid, &monotonic_sent_time_,
- &realtime_sent_time_, &sent_queue_index_))
+ const auto result = lockless_queue_sender_.Send(
+ length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
+ source_boot_uuid, &monotonic_sent_time_, &realtime_sent_time_,
+ &sent_queue_index_);
+ CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
<< ": Somebody wrote outside the buffer of their message on channel "
<< configuration::CleanedChannelToString(channel());
wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
: 0);
- // TODO(Milind): check for messages sent too fast
- return Error::kOk;
+ return CheckLocklessQueueResult(result);
}
Error DoSend(const void *msg, size_t length,
@@ -579,16 +582,19 @@
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
- CHECK(lockless_queue_sender_.Send(
+ const auto result = lockless_queue_sender_.Send(
reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
realtime_remote_time, remote_queue_index, source_boot_uuid,
- &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
- << ": Somebody wrote outside the buffer of their message on channel "
+ &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
+
+ CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
+ << ": Somebody wrote outside the buffer of their message on "
+ "channel "
<< configuration::CleanedChannelToString(channel());
wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
: 0);
- // TODO(austin): Return an error if we send too fast.
- return RawSender::Error::kOk;
+
+ return CheckLocklessQueueResult(result);
}
absl::Span<char> GetSharedMemory() const {
@@ -605,6 +611,20 @@
return static_cast<const ShmEventLoop *>(event_loop());
}
+ RawSender::Error CheckLocklessQueueResult(
+ const ipc_lib::LocklessQueueSender::Result &result) {
+ switch (result) {
+ case ipc_lib::LocklessQueueSender::Result::GOOD:
+ return Error::kOk;
+ case ipc_lib::LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST:
+ return Error::kMessagesSentTooFast;
+ case ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE:
+ return Error::kInvalidRedzone;
+ }
+ LOG(FATAL) << "Unknown lockless queue sender result"
+ << static_cast<int>(result);
+ }
+
MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueSender lockless_queue_sender_;
ipc_lib::LocklessQueueWakeUpper wake_upper_;
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 01ca92b..f4107b0 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -116,6 +116,59 @@
ShmEventLoopTestFactory *factory() { return &factory_; }
+ // Helper functions for testing when a fetcher cannot fetch the next message
+ // because it was overwritten
+ void TestNextMessageNotAvailable(const bool skip_timing_report) {
+ auto loop1 = factory()->MakePrimary("loop1");
+ if (skip_timing_report) {
+ loop1->SkipTimingReport();
+ }
+ auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+ auto loop2 = factory()->Make("loop2");
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+ bool ran = false;
+ loop1->AddPhasedLoop(
+ [&sender](int) {
+ auto builder = sender.MakeBuilder();
+ TestMessage::Builder test_builder(*builder.fbb());
+ test_builder.add_value(0);
+ builder.CheckOk(builder.Send(test_builder.Finish()));
+ },
+ std::chrono::milliseconds(2));
+ loop1
+ ->AddTimer([this, &fetcher, &ran]() {
+ EXPECT_DEATH(fetcher.FetchNext(),
+ "The next message is no longer "
+ "available.*\"/test\".*\"aos\\.TestMessage\"");
+ factory()->Exit();
+ ran = true;
+ })
+ ->Setup(loop1->monotonic_now() + std::chrono::seconds(4));
+ factory()->Run();
+ EXPECT_TRUE(ran);
+ }
+ void TestNextMessageNotAvailableNoRun(const bool skip_timing_report) {
+ auto loop1 = factory()->MakePrimary("loop1");
+ if (skip_timing_report) {
+ loop1->SkipTimingReport();
+ }
+ auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+ auto loop2 = factory()->Make("loop2");
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+ time::PhasedLoop phased_loop(std::chrono::milliseconds(2),
+ loop2->monotonic_now());
+ for (int i = 0; i < 2000; ++i) {
+ auto builder = sender.MakeBuilder();
+ TestMessage::Builder test_builder(*builder.fbb());
+ test_builder.add_value(0);
+ builder.CheckOk(builder.Send(test_builder.Finish()));
+ phased_loop.SleepUntilNext();
+ }
+ EXPECT_DEATH(fetcher.FetchNext(),
+ "The next message is no longer "
+ "available.*\"/test\".*\"aos\\.TestMessage\"");
+ }
+
private:
ShmEventLoopTestFactory factory_;
};
@@ -351,89 +404,25 @@
// Tests that the next message not being available prints a helpful error in the
// normal case.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailable) {
- auto loop1 = factory()->MakePrimary("loop1");
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- bool ran = false;
- loop1->OnRun([this, &sender, &fetcher, &ran]() {
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
- factory()->Exit();
- ran = true;
- });
- factory()->Run();
- EXPECT_TRUE(ran);
+ TestNextMessageNotAvailable(false);
}
// Tests that the next message not being available prints a helpful error with
// timing reports disabled.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoTimingReports) {
- auto loop1 = factory()->MakePrimary("loop1");
- loop1->SkipTimingReport();
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- bool ran = false;
- loop1->OnRun([this, &sender, &fetcher, &ran]() {
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
- factory()->Exit();
- ran = true;
- });
- factory()->Run();
- EXPECT_TRUE(ran);
+ TestNextMessageNotAvailable(true);
}
// Tests that the next message not being available prints a helpful error even
// when Run is never called.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRun) {
- auto loop1 = factory()->MakePrimary("loop1");
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
+ TestNextMessageNotAvailableNoRun(false);
}
// Tests that the next message not being available prints a helpful error even
// when Run is never called without timing reports.
TEST_P(ShmEventLoopDeathTest, NextMessageNotAvailableNoRunNoTimingReports) {
- auto loop1 = factory()->MakePrimary("loop1");
- loop1->SkipTimingReport();
- auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- auto loop2 = factory()->Make("loop2");
- auto sender = loop2->MakeSender<TestMessage>("/test");
- for (int i = 0; i < 2000; ++i) {
- auto builder = sender.MakeBuilder();
- TestMessage::Builder test_builder(*builder.fbb());
- test_builder.add_value(0);
- builder.CheckOk(builder.Send(test_builder.Finish()));
- }
- EXPECT_DEATH(fetcher.FetchNext(),
- "The next message is no longer "
- "available.*\"/test\".*\"aos\\.TestMessage\"");
+ TestNextMessageNotAvailableNoRun(true);
}
// TODO(austin): Test that missing a deadline with a timer recovers as expected.
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a1ef95d..69e6638 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -151,10 +151,12 @@
class SimulatedChannel {
public:
explicit SimulatedChannel(const Channel *channel,
- std::chrono::nanoseconds channel_storage_duration)
+ std::chrono::nanoseconds channel_storage_duration,
+ const EventScheduler *scheduler)
: channel_(channel),
channel_storage_duration_(channel_storage_duration),
- next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+ next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
+ scheduler_(scheduler) {
available_buffer_indices_.resize(number_buffers());
for (int i = 0; i < number_buffers(); ++i) {
available_buffer_indices_[i] = i;
@@ -296,6 +298,11 @@
int sender_count_ = 0;
std::vector<uint16_t> available_buffer_indices_;
+
+ const EventScheduler *scheduler_;
+
+ // Queue of all the message send times in the last channel_storage_duration_
+ std::queue<monotonic_clock::time_point> last_times_;
};
namespace {
@@ -782,14 +789,14 @@
const Channel *channel) {
auto it = channels_->find(SimpleChannel(channel));
if (it == channels_->end()) {
- it =
- channels_
- ->emplace(
- SimpleChannel(channel),
- std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
- channel, std::chrono::nanoseconds(
- configuration()->channel_storage_duration()))))
- .first;
+ it = channels_
+ ->emplace(SimpleChannel(channel),
+ std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+ channel,
+ std::chrono::nanoseconds(
+ configuration()->channel_storage_duration()),
+ scheduler_)))
+ .first;
}
return it->second.get();
}
@@ -930,7 +937,21 @@
std::optional<uint32_t> SimulatedChannel::Send(
std::shared_ptr<SimulatedMessage> message) {
- std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+ const auto now = scheduler_->monotonic_now();
+ // Drop send times that are at least a channel_storage_duration_ old; they no
+ // longer count toward the number of messages sent within the window.
+ while (!last_times_.empty() &&
+ (now - last_times_.front() >= channel_storage_duration_)) {
+ last_times_.pop();
+ }
+
+ // Check that we are not sending messages too fast
+ if (static_cast<int>(last_times_.size()) >= queue_size()) {
+ return std::nullopt;
+ }
+
+ const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+ last_times_.push(now);
message->context.queue_index = *queue_index;
// Points to the actual data depending on the size set in context. Data may
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 09202ff..a83243d 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1,6 +1,7 @@
#include "aos/events/simulated_event_loop.h"
#include <chrono>
+#include <functional>
#include <string_view>
#include "aos/events/event_loop_param_test.h"
@@ -139,13 +140,13 @@
};
class FunctionEvent : public EventScheduler::Event {
- public:
- FunctionEvent(std::function<void()> fn) : fn_(fn) {}
+ public:
+ FunctionEvent(std::function<void()> fn) : fn_(fn) {}
- void Handle() noexcept override { fn_(); }
+ void Handle() noexcept override { fn_(); }
- private:
- std::function<void()> fn_;
+ private:
+ std::function<void()> fn_;
};
// Test that creating an event and running the scheduler runs the event.
@@ -204,14 +205,6 @@
EXPECT_EQ(counter, 1);
}
-void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
- aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
- TestMessage::Builder test_message_builder =
- builder.MakeBuilder<TestMessage>();
- test_message_builder.add_value(value);
- ASSERT_EQ(builder.Send(test_message_builder.Finish()), RawSender::Error::kOk);
-}
-
// Test that sending a message after running gets properly notified.
TEST(SimulatedEventLoopTest, SendAfterRunFor) {
SimulatedEventLoopTestFactory factory;
@@ -223,7 +216,7 @@
simulated_event_loop_factory.MakeEventLoop("ping");
aos::Sender<TestMessage> test_message_sender =
ping_event_loop->MakeSender<TestMessage>("/test");
- SendTestMessage(&test_message_sender, 1);
+ ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
std::unique_ptr<EventLoop> pong1_event_loop =
simulated_event_loop_factory.MakeEventLoop("pong");
@@ -243,7 +236,7 @@
// Pauses in the middle don't count though, so this should be counted.
// But, the fresh watcher shouldn't pick it up yet.
- SendTestMessage(&test_message_sender, 2);
+ ASSERT_EQ(SendTestMessage(test_message_sender), RawSender::Error::kOk);
EXPECT_EQ(test_message_counter1.count(), 0u);
EXPECT_EQ(test_message_counter2.count(), 0u);
@@ -253,6 +246,63 @@
EXPECT_EQ(test_message_counter2.count(), 0u);
}
+void TestSentTooFastCheckEdgeCase(
+ const std::function<RawSender::Error(int, int)> expected_err,
+ const bool send_twice_at_end) {
+ SimulatedEventLoopTestFactory factory;
+
+ auto event_loop = factory.MakePrimary("primary");
+
+ auto sender = event_loop->MakeSender<TestMessage>("/test");
+
+ const int queue_size = TestChannelQueueSize(event_loop.get());
+ int msgs_sent = 0;
+ event_loop->AddPhasedLoop(
+ [&](int) {
+ EXPECT_EQ(SendTestMessage(sender), expected_err(msgs_sent, queue_size));
+ msgs_sent++;
+
+ // If send_twice_at_end, send the last two messages (message
+ // queue_size and queue_size + 1) in the same iteration, meaning that
+ // we would be sending very slightly too fast. Otherwise, we will send
+ // message queue_size + 1 in the next iteration and we will continue
+ // to be sending exactly at the channel frequency.
+ if (send_twice_at_end && (msgs_sent == queue_size)) {
+ EXPECT_EQ(SendTestMessage(sender),
+ expected_err(msgs_sent, queue_size));
+ msgs_sent++;
+ }
+
+ if (msgs_sent > queue_size) {
+ factory.Exit();
+ }
+ },
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(
+ 1.0 / TestChannelFrequency(event_loop.get()))));
+
+ factory.Run();
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is not returned
+// when messages are sent at the exact frequency of the channel.
+TEST(SimulatedEventLoopTest, SendingAtExactlyChannelFrequency) {
+ TestSentTooFastCheckEdgeCase([](int, int) { return RawSender::Error::kOk; },
+ false);
+}
+
+// Tests that RawSender::Error::kMessagesSentTooFast is returned
+// when sending exactly one more message than allowed in a channel storage
+// duration.
+TEST(SimulatedEventLoopTest, SendingSlightlyTooFast) {
+ TestSentTooFastCheckEdgeCase(
+ [](const int msgs_sent, const int queue_size) {
+ return (msgs_sent == queue_size ? RawSender::Error::kMessagesSentTooFast
+ : RawSender::Error::kOk);
+ },
+ true);
+}
+
// Test that creating an event loop while running dies.
TEST(SimulatedEventLoopDeathTest, MakeEventLoopWhileRunning) {
SimulatedEventLoopTestFactory factory;
@@ -1338,7 +1388,6 @@
EXPECT_EQ(ConnectedCount(pi3_client_statistics_fetcher.get(), "pi1"), 2u)
<< " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
-
EXPECT_EQ(pi1_pong_counter.count(), 601u);
EXPECT_EQ(pi2_pong_counter.count(), 601u);
@@ -1707,18 +1756,17 @@
});
// Confirm that reboot changes the UUID.
- pi2->OnShutdown(
- [&expected_boot_uuid, &boot_number, &expected_connection_time, pi1, pi2,
- pi2_boot1]() {
- expected_boot_uuid = pi2_boot1;
- ++boot_number;
- LOG(INFO) << "OnShutdown triggered for pi2";
- pi2->OnStartup(
- [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
- EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
- expected_connection_time = pi1->monotonic_now();
- });
- });
+ pi2->OnShutdown([&expected_boot_uuid, &boot_number, &expected_connection_time,
+ pi1, pi2, pi2_boot1]() {
+ expected_boot_uuid = pi2_boot1;
+ ++boot_number;
+ LOG(INFO) << "OnShutdown triggered for pi2";
+ pi2->OnStartup(
+ [&expected_boot_uuid, &expected_connection_time, pi1, pi2]() {
+ EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
+ expected_connection_time = pi1->monotonic_now();
+ });
+ });
// Let a couple of ServerStatistics messages show up before rebooting.
factory.RunFor(chrono::milliseconds(2002));
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index c124fe0..ef6c4f3 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -216,6 +216,7 @@
"//aos/events:epoll",
"//aos/testing:googletest",
"//aos/testing:prevent_exit",
+ "//aos/util:phased_loop",
],
)
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index fe86f6c..5f12423 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -4,6 +4,7 @@
#include <sys/types.h>
#include <syscall.h>
#include <unistd.h>
+
#include <algorithm>
#include <iomanip>
#include <iostream>
@@ -503,7 +504,7 @@
bool bad = false;
- for (size_t i = 0; i < redzone.size(); ++i) {
+ for (size_t i = 0; i < redzone.size() && !bad; ++i) {
if (memcmp(&redzone[i], &redzone_value, 1)) {
bad = true;
}
@@ -603,6 +604,7 @@
Message *const message =
memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i));
message->header.queue_index.Invalidate();
+ message->header.monotonic_sent_time = monotonic_clock::min_time;
FillRedzone(memory, message->PreRedzone(memory->message_data_size()));
FillRedzone(memory, message->PostRedzone(memory->message_data_size(),
memory->message_size()));
@@ -831,8 +833,16 @@
return count;
}
-LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
- : memory_(memory) {
+std::ostream &operator<<(std::ostream &os,
+ const LocklessQueueSender::Result r) {
+ os << static_cast<int>(r);
+ return os;
+}
+
+LocklessQueueSender::LocklessQueueSender(
+ LocklessQueueMemory *memory,
+ monotonic_clock::duration channel_storage_duration)
+ : memory_(memory), channel_storage_duration_(channel_storage_duration) {
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Since we already have the lock, go ahead and try cleaning up.
@@ -877,9 +887,9 @@
}
std::optional<LocklessQueueSender> LocklessQueueSender::Make(
- LocklessQueue queue) {
+ LocklessQueue queue, monotonic_clock::duration channel_storage_duration) {
queue.Initialize();
- LocklessQueueSender result(queue.memory());
+ LocklessQueueSender result(queue.memory(), channel_storage_duration);
if (result.sender_index_ != -1) {
return std::move(result);
} else {
@@ -904,7 +914,7 @@
return message->data(memory_->message_data_size());
}
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
const char *data, size_t length,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
@@ -921,7 +931,7 @@
realtime_sent_time, queue_index);
}
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid,
@@ -936,7 +946,7 @@
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
if (CheckBothRedzones(memory_, message)) {
- return false;
+ return Result::INVALID_REDZONE;
}
// We should have invalidated this when we first got the buffer. Verify that
@@ -995,11 +1005,14 @@
// This is just a best-effort check to skip reading the clocks if possible.
// If this fails, then the compare-exchange below definitely would, so we
// can bail out now.
+ const Message *message_to_replace = memory_->GetMessage(to_replace);
+ bool is_previous_index_valid = false;
{
const QueueIndex previous_index =
- memory_->GetMessage(to_replace)
- ->header.queue_index.RelaxedLoad(queue_size);
- if (previous_index != decremented_queue_index && previous_index.valid()) {
+ message_to_replace->header.queue_index.RelaxedLoad(queue_size);
+ is_previous_index_valid = previous_index.valid();
+ if (previous_index != decremented_queue_index &&
+ is_previous_index_valid) {
// Retry.
VLOG(3) << "Something fishy happened, queue index doesn't match. "
"Retrying. Previous index was "
@@ -1011,6 +1024,7 @@
message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
message->header.realtime_sent_time = ::aos::realtime_clock::now();
+
if (monotonic_sent_time != nullptr) {
*monotonic_sent_time = message->header.monotonic_sent_time;
}
@@ -1021,8 +1035,48 @@
*queue_index = next_queue_index.index();
}
+ const auto to_replace_monotonic_sent_time =
+ message_to_replace->header.monotonic_sent_time;
+
+ // If we are overwriting a message sent in the last
+ // channel_storage_duration_, that means that we would be sending more than
+ // queue_size messages and would therefore be sending too fast. If the
+ // previous index is not valid then the message hasn't been filled out yet
+ // so we aren't sending too fast. And, if it is not less than the sent time
+ // of the message that we are going to write, someone else beat us and the
+ // compare and exchange below will fail.
+ if (is_previous_index_valid &&
+ (to_replace_monotonic_sent_time <
+ message->header.monotonic_sent_time) &&
+ (message->header.monotonic_sent_time - to_replace_monotonic_sent_time <
+ channel_storage_duration_)) {
+ // There is a possibility that another context beat us to writing out the
+ // message in the queue, but we beat that context to acquiring the sent
+ // time. In this case our sent time is *greater than* the other context's
+ // sent time. Therefore, we can check if we got beat filling out this
+ // message *after* doing the above check to determine if we hit this edge
+ // case. Otherwise, messages are being sent too fast.
+ const QueueIndex previous_index =
+ message_to_replace->header.queue_index.Load(queue_size);
+ if (previous_index != decremented_queue_index && previous_index.valid()) {
+ VLOG(3) << "Got beat during check for messages being sent too fast"
+ "Retrying.";
+ continue;
+ } else {
+ VLOG(3) << "Messages sent too fast. Returning. Attempted index: "
+ << decremented_queue_index.index()
+ << " message sent time: " << message->header.monotonic_sent_time
+ << " message to replace sent time: "
+ << to_replace_monotonic_sent_time;
+ // Since we are not using the message obtained from scratch_index
+ // and we are not retrying, we need to invalidate its queue_index.
+ message->header.queue_index.Invalidate();
+ return Result::MESSAGES_SENT_TOO_FAST;
+ }
+ }
+
// Before we are fully done filling out the message, update the Sender state
- // with the new index to write. This re-uses the barrier for the
+ // with the new index to write. This re-uses the barrier for the
// queue_index store.
const Index index_to_write(next_queue_index, scratch_index.message_index());
@@ -1085,7 +1139,7 @@
// If anybody is looking at this message (they shouldn't be), then try telling
// them about it (best-effort).
memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
- return true;
+ return Result::GOOD;
}
int LocklessQueueSender::buffer_index() const {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index d6fb72f..f7a85c3 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -282,10 +282,19 @@
// scoped to this object's lifetime.
class LocklessQueueSender {
public:
+ // Enum of possible sending errors
+ // Send returns GOOD if the message was sent successfully, INVALID_REDZONE if
+ // one of a message's redzones has invalid data, or MESSAGES_SENT_TOO_FAST if
+ // more than queue_size messages were going to be sent in a
+ // channel_storage_duration_.
+ enum class Result { GOOD, INVALID_REDZONE, MESSAGES_SENT_TOO_FAST };
+
LocklessQueueSender(const LocklessQueueSender &) = delete;
LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
LocklessQueueSender(LocklessQueueSender &&other)
- : memory_(other.memory_), sender_index_(other.sender_index_) {
+ : memory_(other.memory_),
+ sender_index_(other.sender_index_),
+ channel_storage_duration_(other.channel_storage_duration_) {
other.memory_ = nullptr;
other.sender_index_ = -1;
}
@@ -299,7 +308,8 @@
// Creates a sender. If we couldn't allocate a sender, returns nullopt.
// TODO(austin): Change the API if we find ourselves with more errors.
- static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+ static std::optional<LocklessQueueSender> Make(
+ LocklessQueue queue, monotonic_clock::duration channel_storage_duration);
// Sends a message without copying the data.
// Copy at most size() bytes of data into the memory pointed to by Data(),
@@ -307,34 +317,43 @@
// Note: calls to Data() are expensive enough that you should cache it.
size_t size() const;
void *Data();
- bool Send(size_t length, monotonic_clock::time_point monotonic_remote_time,
- realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &source_boot_uuid,
- monotonic_clock::time_point *monotonic_sent_time = nullptr,
- realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
+ LocklessQueueSender::Result Send(
+ size_t length, monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid,
+ monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
// Sends up to length data. Does not wakeup the target.
- bool Send(const char *data, size_t length,
- monotonic_clock::time_point monotonic_remote_time,
- realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &source_boot_uuid,
- monotonic_clock::time_point *monotonic_sent_time = nullptr,
- realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
+ LocklessQueueSender::Result Send(
+ const char *data, size_t length,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid,
+ monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
int buffer_index() const;
private:
- LocklessQueueSender(LocklessQueueMemory *memory);
+ LocklessQueueSender(LocklessQueueMemory *memory,
+ monotonic_clock::duration channel_storage_duration);
// Pointer to the backing memory.
LocklessQueueMemory *memory_ = nullptr;
// Index into the sender list.
int sender_index_ = -1;
+
+ // Storage duration of the channel used to check if messages were sent too
+ // fast
+ const monotonic_clock::duration channel_storage_duration_;
};
+std::ostream &operator<<(std::ostream &os, const LocklessQueueSender::Result r);
+
// Pinner for blocks of data. The resources associated with a pinner are
// scoped to this object's lifetime.
class LocklessQueuePinner {
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 2217811..c5edb9e 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -558,6 +558,9 @@
static int kPinnedMessageIndex = 0;
+constexpr monotonic_clock::duration kChannelStorageDuration =
+ std::chrono::milliseconds(500);
+
} // namespace
// Tests that death during sends is recovered from correctly.
@@ -575,7 +578,7 @@
config.num_watchers = 2;
config.num_senders = 2;
config.num_pinners = 1;
- config.queue_size = 2;
+ config.queue_size = 10;
config.message_data_size = 32;
TestShmRobustness(
@@ -596,14 +599,16 @@
config);
// Now try to write some messages. We will get killed a bunch as this
// tries to happen.
- LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+ LocklessQueueSender sender =
+ LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
LocklessQueuePinner pinner = LocklessQueuePinner::Make(queue).value();
for (int i = 0; i < 5; ++i) {
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
- sender.Send(data, s + 1, monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffl, UUID::Zero(),
- nullptr, nullptr, nullptr);
+ ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, 0xffffffffl,
+ UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
// Pin a message, so when we keep writing we will exercise the pinning
// logic.
if (i == 1) {
@@ -613,7 +618,8 @@
},
[config, tid](void *raw_memory) {
::aos::ipc_lib::LocklessQueueMemory *const memory =
- reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+ reinterpret_cast< ::aos::ipc_lib::LocklessQueueMemory *>(
+ raw_memory);
// Confirm that we can create 2 senders (the number in the queue), and
// send a message. And that all the messages in the queue are valid.
LocklessQueue queue(memory, memory, config);
@@ -639,7 +645,7 @@
}
// Building and destroying a sender will clean up the queue.
- LocklessQueueSender::Make(queue).value();
+ LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
if (print) {
LOG(INFO) << "Cleaned up version:";
@@ -665,19 +671,21 @@
}
{
- LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+ LocklessQueueSender sender =
+ LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
{
// Make a second sender to confirm that the slot was freed.
// If the sender doesn't get cleaned up, this will fail.
- LocklessQueueSender::Make(queue).value();
+ LocklessQueueSender::Make(queue, kChannelStorageDuration).value();
}
// Send a message to make sure that the queue still works.
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", 971);
- sender.Send(data, s + 1, monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffl, UUID::Zero(),
- nullptr, nullptr, nullptr);
+ ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, 0xffffffffl,
+ UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
}
// Now loop through the queue and make sure the number in the snprintf
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 2b9f49c..57dd94b 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -16,6 +16,7 @@
#include "aos/ipc_lib/queue_racer.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
+#include "aos/util/phased_loop.h"
#include "gflags/gflags.h"
#include "gtest/gtest.h"
@@ -42,6 +43,9 @@
class LocklessQueueTest : public ::testing::Test {
public:
+ static constexpr monotonic_clock::duration kChannelStorageDuration =
+ std::chrono::milliseconds(500);
+
LocklessQueueTest() {
config_.num_watchers = 10;
config_.num_senders = 100;
@@ -99,8 +103,6 @@
LocklessQueueConfiguration config_;
};
-typedef LocklessQueueTest LocklessQueueDeathTest;
-
// Tests that wakeup doesn't do anything if nothing was registered.
TEST_F(LocklessQueueTest, NoWatcherWakeup) {
LocklessQueueWakeUpper wake_upper(queue());
@@ -190,9 +192,10 @@
TEST_F(LocklessQueueTest, TooManySenders) {
::std::vector<LocklessQueueSender> senders;
for (size_t i = 0; i < config_.num_senders; ++i) {
- senders.emplace_back(LocklessQueueSender::Make(queue()).value());
+ senders.emplace_back(
+ LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
}
- EXPECT_FALSE(LocklessQueueSender::Make(queue()));
+ EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
}
// Now, start 2 threads and have them receive the signals.
@@ -226,9 +229,11 @@
// Do a simple send test.
TEST_F(LocklessQueueTest, Send) {
- LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+ LocklessQueueSender sender =
+ LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
LocklessQueueReader reader(queue());
+ time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
// Send enough messages to wrap.
for (int i = 0; i < 20000; ++i) {
// Confirm that the queue index makes sense given the number of sends.
@@ -238,8 +243,10 @@
// Send a trivial piece of data.
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i);
- sender.Send(data, s, monotonic_clock::min_time, realtime_clock::min_time,
- 0xffffffffu, UUID::Zero(), nullptr, nullptr, nullptr);
+ EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
+ realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
+ nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
// Confirm that the queue index still makes sense. This is easier since the
// empty case has been handled.
@@ -271,6 +278,8 @@
if (read_result != LocklessQueueReader::Result::GOOD) {
EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
}
+
+ loop.SleepUntilNext();
}
}
@@ -342,7 +351,42 @@
} // namespace
-// Send enough messages to wrap the 32 bit send counter.
+class LocklessQueueTestTooFast : public LocklessQueueTest {
+ public:
+ LocklessQueueTestTooFast() {
+ // Force a scenario where senders get rate limited
+ config_.num_watchers = 1000;
+ config_.num_senders = 100;
+ config_.num_pinners = 5;
+ config_.queue_size = 100;
+ // Exercise the alignment code. This would throw off alignment.
+ config_.message_data_size = 101;
+
+ // Since our backing store is an array of uint64_t for alignment purposes,
+ // normalize by the size.
+ memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
+
+ Reset();
+ }
+};
+
+// Ensure we always return GOOD or MESSAGES_SENT_TOO_FAST under an extreme load
+// on the Sender Queue.
+TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
+ PinForTest pin_cpu;
+ uint64_t kNumMessages = 1000000;
+ QueueRacer racer(queue(),
+ {FLAGS_thread_count,
+ kNumMessages,
+ {LocklessQueueSender::Result::GOOD,
+ LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
+ std::chrono::milliseconds(500),
+ false});
+
+ EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+}
+
+// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
PinForTest pin_cpu;
uint64_t kNumMessages = 0x100010000ul;
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index b738805..414b7fb 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -26,7 +26,23 @@
QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
uint64_t num_messages)
- : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
+ : queue_(queue),
+ num_threads_(num_threads),
+ num_messages_(num_messages),
+ channel_storage_duration_(std::chrono::nanoseconds(1)),
+ expected_send_results_({LocklessQueueSender::Result::GOOD}),
+ check_writes_and_reads_(true) {
+ Reset();
+}
+
+QueueRacer::QueueRacer(LocklessQueue queue,
+ const QueueRacerConfiguration &config)
+ : queue_(queue),
+ num_threads_(config.num_threads),
+ num_messages_(config.num_messages),
+ channel_storage_duration_(config.channel_storage_duration),
+ expected_send_results_(config.expected_send_results),
+ check_writes_and_reads_(config.check_writes_and_reads) {
Reset();
}
@@ -117,7 +133,9 @@
EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
// latest_queue_index is an index, not a count. So it always reads 1
// low.
- EXPECT_GE(latest_queue_index + 1, finished_writes);
+ if (check_writes_and_reads_) {
+ EXPECT_GE(latest_queue_index + 1, finished_writes);
+ }
}
}
}
@@ -133,8 +151,8 @@
}
t.thread = ::std::thread([this, &t, thread_index, &run,
write_wrap_count]() {
- // Build up a sender.
- LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
+ LocklessQueueSender sender =
+ LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
// Signal that we are ready to start sending.
@@ -176,9 +194,16 @@
}
++started_writes_;
- sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
- aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
- nullptr, nullptr, nullptr);
+ auto result =
+ sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
+ aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+ nullptr, nullptr, nullptr);
+
+ CHECK(std::find(expected_send_results_.begin(),
+ expected_send_results_.end(),
+ result) != expected_send_results_.end())
+ << "Unexpected send result: " << result;
+
// Blank out the new scratch buffer, to catch other people using it.
{
char *const new_data = static_cast<char *>(sender.Data()) +
@@ -210,7 +235,9 @@
queue_index_racer.join();
}
- CheckReads(race_reads, write_wrap_count, &threads);
+ if (check_writes_and_reads_) {
+ CheckReads(race_reads, write_wrap_count, &threads);
+ }
// Reap all the threads.
if (race_reads) {
@@ -221,26 +248,28 @@
queue_index_racer.join();
}
- // Confirm that the number of writes matches the expected number of writes.
- ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
- started_writes_);
- ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
- finished_writes_);
+ if (check_writes_and_reads_) {
+ // Confirm that the number of writes matches the expected number of writes.
+ ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+ started_writes_);
+ ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
+ finished_writes_);
- // And that every thread sent the right number of messages.
- for (ThreadState &t : threads) {
- if (will_wrap) {
- if (!race_reads) {
- // If we are wrapping, there is a possibility that a thread writes
- // everything *before* we can read any of it, and it all gets
- // overwritten.
- ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
- t.event_count == (1 + write_wrap_count) * num_messages_)
- << ": Got " << t.event_count << " events, expected "
- << (1 + write_wrap_count) * num_messages_;
+ // And that every thread sent the right number of messages.
+ for (ThreadState &t : threads) {
+ if (will_wrap) {
+ if (!race_reads) {
+ // If we are wrapping, there is a possibility that a thread writes
+ // everything *before* we can read any of it, and it all gets
+ // overwritten.
+ ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
+ t.event_count == (1 + write_wrap_count) * num_messages_)
+ << ": Got " << t.event_count << " events, expected "
+ << (1 + write_wrap_count) * num_messages_;
+ }
+ } else {
+ ASSERT_EQ(t.event_count, num_messages_);
}
- } else {
- ASSERT_EQ(t.event_count, num_messages_);
}
}
}
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index ea0238e..3e5ca94 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -10,11 +10,28 @@
struct ThreadState;
+struct QueueRacerConfiguration {
+ // Number of threads that send messages
+ const int num_threads;
+ // Number of messages sent by each thread
+ const uint64_t num_messages;
+ // Allows QueueRacer to check for multiple returns from calling Send()
+ const std::vector<LocklessQueueSender::Result> expected_send_results = {
+ LocklessQueueSender::Result::GOOD};
+ // Channel Storage Duration for queue used by QueueRacer
+ const monotonic_clock::duration channel_storage_duration =
+ std::chrono::nanoseconds(1);
+ // Set to true if all writes and reads are expected to be successful.
+ // Setting it to false lets QueueRacer be used for checking failure scenarios.
+ const bool check_writes_and_reads;
+};
+
// Class to test the queue by spinning up a bunch of writing threads and racing
// them together to all write at once.
class QueueRacer {
public:
QueueRacer(LocklessQueue queue, int num_threads, uint64_t num_messages);
+ QueueRacer(LocklessQueue queue, const QueueRacerConfiguration &config);
// Runs an iteration of the race.
//
@@ -52,7 +69,10 @@
LocklessQueue queue_;
const uint64_t num_threads_;
const uint64_t num_messages_;
-
+ const monotonic_clock::duration channel_storage_duration_;
+ // Allows QueueRacer to check for multiple returns from calling Send()
+ const std::vector<LocklessQueueSender::Result> expected_send_results_;
+ const bool check_writes_and_reads_;
// The overall number of writes executed will always be between the two of
// these. We can't atomically count writes, so we have to bound them.
//
diff --git a/aos/network/message_bridge_test_combined_timestamps_common.json b/aos/network/message_bridge_test_combined_timestamps_common.json
index 74c932d..be79014 100644
--- a/aos/network/message_bridge_test_combined_timestamps_common.json
+++ b/aos/network/message_bridge_test_combined_timestamps_common.json
@@ -20,7 +20,7 @@
"name": "/pi1/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "pi1",
- "frequency": 10,
+ "frequency": 15,
"max_size": 200,
"destination_nodes": [
{
@@ -36,7 +36,7 @@
"name": "/pi2/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "pi2",
- "frequency": 10,
+ "frequency": 15,
"max_size": 200,
"destination_nodes": [
{
@@ -64,25 +64,25 @@
"name": "/pi1/aos",
"type": "aos.message_bridge.ClientStatistics",
"source_node": "pi1",
- "frequency": 2
+ "frequency": 15
},
{
"name": "/pi2/aos",
"type": "aos.message_bridge.ClientStatistics",
"source_node": "pi2",
- "frequency": 2
+ "frequency": 15
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
- "frequency": 10
+ "frequency": 15
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi2",
- "frequency": 10
+ "frequency": 15
},
{
"name": "/pi1/aos",
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index 99c80a9..4623edb 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -20,7 +20,7 @@
"name": "/pi1/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "pi1",
- "frequency": 10,
+ "frequency": 15,
"max_size": 200,
"destination_nodes": [
{
@@ -36,7 +36,7 @@
"name": "/pi2/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "pi2",
- "frequency": 10,
+ "frequency": 15,
"max_size": 200,
"destination_nodes": [
{
@@ -64,43 +64,43 @@
"name": "/pi1/aos",
"type": "aos.message_bridge.ClientStatistics",
"source_node": "pi1",
- "frequency": 2
+ "frequency": 15
},
{
"name": "/pi2/aos",
"type": "aos.message_bridge.ClientStatistics",
"source_node": "pi2",
- "frequency": 2
+ "frequency": 15
},
{
"name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
- "frequency": 10
+ "frequency": 15
},
{
"name": "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi2",
- "frequency": 10
+ "frequency": 15
},
{
"name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
- "frequency": 10
+ "frequency": 15
},
{
"name": "/pi2/aos/remote_timestamps/pi1/test/aos-examples-Pong",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi2",
- "frequency": 10
+ "frequency": 15
},
{
"name": "/pi1/aos/remote_timestamps/pi2/unreliable/aos-examples-Ping",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
- "frequency": 10
+ "frequency": 15
},
{
"name": "/pi1/aos",
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index cc04428..f0bcd88 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -603,7 +603,7 @@
// Tests that we don't blow up if we stop getting updates for an extended period
// of time and fall behind on fetching fron the cameras.
TEST_F(LocalizedDrivetrainTest, FetchersHandleTimeGap) {
- set_enable_cameras(true);
+ set_enable_cameras(false);
set_send_delay(std::chrono::seconds(0));
event_loop_factory()->set_network_delay(std::chrono::nanoseconds(1));
test_event_loop_
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index 1bbf42b..b13263a 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -686,8 +686,7 @@
goal_builder.add_turret(turret_offset);
goal_builder.add_shooter(shooter_offset);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
RunFor(chrono::seconds(10));
VerifyNearGoal();
@@ -733,8 +732,7 @@
goal_builder.add_turret(turret_offset);
goal_builder.add_shooter(shooter_offset);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
// Give it a lot of time to get there.
@@ -775,8 +773,7 @@
goal_builder.add_turret(turret_offset);
goal_builder.add_shooter(shooter_offset);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
RunFor(chrono::seconds(8));
VerifyNearGoal();
@@ -810,8 +807,7 @@
goal_builder.add_turret(turret_offset);
goal_builder.add_shooter(shooter_offset);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
superstructure_plant_.set_peak_hood_velocity(23.0);
// 30 hz sin wave on the hood causes acceleration to be ignored.
@@ -865,8 +861,7 @@
goal_builder.add_shooter(shooter_offset);
goal_builder.add_shooting(true);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
// In the beginning, the finisher and accelerator should not be ready
@@ -911,8 +906,7 @@
goal_builder.add_intake(intake_offset);
goal_builder.add_shooter(shooter_offset);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
// Give it a lot of time to get there.
@@ -961,8 +955,7 @@
goal_builder.add_climber_voltage(-10.0);
goal_builder.add_turret(turret_offset);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
// The turret needs to move out of the way first. This takes some time.
@@ -986,8 +979,7 @@
goal_builder.add_climber_voltage(10.0);
goal_builder.add_turret(turret_offset);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
RunFor(chrono::seconds(1));
@@ -1175,6 +1167,8 @@
"y2020.control_loops.superstructure.Status");
reader_.RemapLoggedChannel("/superstructure",
"y2020.control_loops.superstructure.Output");
+ reader_.RemapLoggedChannel("/drivetrain",
+ "frc971.control_loops.drivetrain.Status");
reader_.Register();
roborio_ = aos::configuration::GetNode(reader_.configuration(), "roborio");
@@ -1221,32 +1215,27 @@
constexpr double kShotDistance = 2.5;
const auto target = turret::OuterPortPose(aos::Alliance::kRed);
- // There was no target when this log was taken so send a position within range
- // of the interpolation table.
- test_event_loop_->AddPhasedLoop(
- [&](int) {
- auto builder = drivetrain_status_sender_.MakeBuilder();
+ // There was no target when this log was taken, so send a position within
+ // range of the interpolation table.
+ {
+ auto builder = drivetrain_status_sender_.MakeBuilder();
- const auto localizer_offset =
- builder
- .MakeBuilder<
- frc971::control_loops::drivetrain::LocalizerState>()
- .Finish();
+ const auto localizer_offset =
+ builder.MakeBuilder<frc971::control_loops::drivetrain::LocalizerState>()
+ .Finish();
- auto drivetrain_status_builder =
- builder.MakeBuilder<DrivetrainStatus>();
+ auto drivetrain_status_builder = builder.MakeBuilder<DrivetrainStatus>();
- // Set the robot up at kShotAngle off from the target, kShotDistance
- // away.
- drivetrain_status_builder.add_x(target.abs_pos().x() +
- std::cos(kShotAngle) * kShotDistance);
- drivetrain_status_builder.add_y(target.abs_pos().y() +
- std::sin(kShotAngle) * kShotDistance);
- drivetrain_status_builder.add_localizer(localizer_offset);
+ // Set the robot up at kShotAngle off from the target, kShotDistance
+ // away.
+ drivetrain_status_builder.add_x(target.abs_pos().x() +
+ std::cos(kShotAngle) * kShotDistance);
+ drivetrain_status_builder.add_y(target.abs_pos().y() +
+ std::sin(kShotAngle) * kShotDistance);
+ drivetrain_status_builder.add_localizer(localizer_offset);
- builder.CheckOk(builder.Send(drivetrain_status_builder.Finish()));
- },
- frc971::controls::kLoopFrequency);
+ builder.CheckOk(builder.Send(drivetrain_status_builder.Finish()));
+ }
reader_.event_loop_factory()->Run();
@@ -1289,8 +1278,7 @@
goal_builder.add_turret_tracking(true);
- ASSERT_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ ASSERT_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
{
@@ -1347,8 +1335,7 @@
goal_builder.add_shooter(shooter_goal);
goal_builder.add_hood(hood_offset);
- CHECK_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ CHECK_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
RunFor(chrono::seconds(10));
@@ -1415,8 +1402,7 @@
goal_builder.add_shooter_tracking(true);
goal_builder.add_hood_tracking(true);
- CHECK_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ CHECK_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
RunFor(chrono::seconds(10));
@@ -1486,8 +1472,7 @@
goal_builder.add_shooter_tracking(true);
goal_builder.add_hood_tracking(true);
- CHECK_EQ(builder.Send(goal_builder.Finish()),
- aos::RawSender::Error::kOk);
+ CHECK_EQ(builder.Send(goal_builder.Finish()), aos::RawSender::Error::kOk);
}
RunFor(chrono::seconds(10));
diff --git a/y2020/y2020_pi_template.json b/y2020/y2020_pi_template.json
index 5ad87eb..0f8d268 100644
--- a/y2020/y2020_pi_template.json
+++ b/y2020/y2020_pi_template.json
@@ -120,7 +120,7 @@
"name": "/pi{{ NUM }}/camera/detailed",
"type": "frc971.vision.sift.ImageMatchResult",
"source_node": "pi{{ NUM }}",
- "frequency": 25,
+ "frequency": 30,
"max_size": 1000000
},
{