Add applications to benchmark various IPC mechanisms

Change-Id: I1d4f225bbf4be8a4f67055e3694f1bd664097edd
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 75452bd..1916e7a 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -223,3 +223,90 @@
         "@com_github_google_glog//:glog",
     ],
 )
+
+# Helpers shared by the IPC latency benchmarks below: a randomly-sleeping
+# timer thread that reports its own wakeup latency, and an ftrace wrapper.
+cc_library(
+    name = "latency_lib",
+    srcs = ["latency_lib.cc"],
+    hdrs = ["latency_lib.h"],
+    deps = [
+        "//aos:realtime",
+        "//aos/logging",
+        "//aos/time",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+# Measures wakeup latency of real-time POSIX signals received via signalfd.
+cc_binary(
+    name = "signal_stress",
+    srcs = [
+        "signal_stress.cc",
+    ],
+    deps = [
+        ":latency_lib",
+        "//aos:init",
+        "//aos:realtime",
+        "//aos/events:epoll",
+        "//aos/logging",
+        "//aos/time",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+# Measures wakeup latency of an aos::Condition broadcast.
+cc_binary(
+    name = "futex_latency",
+    srcs = [
+        "futex_latency.cc",
+    ],
+    deps = [
+        ":latency_lib",
+        "//aos:condition",
+        "//aos:init",
+        "//aos:realtime",
+        "//aos/logging",
+        "//aos/mutex",
+        "//aos/time",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+# Measures wakeup latency of 8-byte messages sent through a named pipe (FIFO).
+cc_binary(
+    name = "named_pipe_latency",
+    srcs = [
+        "named_pipe_latency.cc",
+    ],
+    deps = [
+        ":latency_lib",
+        "//aos:init",
+        "//aos:realtime",
+        "//aos/events:epoll",
+        "//aos/logging",
+        "//aos/time",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+# Measures wakeup latency of eventfd writes.
+cc_binary(
+    name = "eventfd_latency",
+    srcs = [
+        "eventfd_latency.cc",
+    ],
+    deps = [
+        ":latency_lib",
+        "//aos:init",
+        "//aos:realtime",
+        "//aos/events:epoll",
+        "//aos/logging",
+        "//aos/time",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/ipc_lib/eventfd_latency.cc b/aos/ipc_lib/eventfd_latency.cc
new file mode 100644
index 0000000..a281a4c
--- /dev/null
+++ b/aos/ipc_lib/eventfd_latency.cc
@@ -0,0 +1,198 @@
+#include "gflags/gflags.h"
+
+#include <sys/eventfd.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <algorithm>
+#include <chrono>
+#include <cinttypes>
+#include <cstring>
+#include <random>
+#include <thread>
+
+#include "aos/events/epoll.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/latency_lib.h"
+#include "aos/logging/implementations.h"
+#include "aos/logging/logging.h"
+#include "aos/realtime.h"
+#include "aos/time/time.h"
+
+// This is a demo program which uses an eventfd to communicate between two
+// threads. It measures both the latency of a random timer thread, and the
+// wakeup latency of the eventfd.
+//
+// Each message is the sender's monotonic_clock timestamp, written as the
+// 8-byte eventfd counter increment. Without EFD_SEMAPHORE, a read returns the
+// sum of every write since the previous read, so a timestamp only decodes
+// correctly if the receiver drains it before the next write lands.
+
+DEFINE_int32(seconds, 10, "Duration of the test to run");
+DEFINE_int32(
+    latency_threshold, 1000,
+    "Disable tracing when anything takes more than this many microseoncds");
+DEFINE_int32(core, 7, "Core to pin to");
+DEFINE_int32(sender_priority, 53, "RT priority to send at");
+DEFINE_int32(receiver_priority, 52, "RT priority to receive at");
+DEFINE_int32(timer_priority, 51, "RT priority to spin the timer at");
+
+DEFINE_bool(log_latency, false, "If true, log the latency");
+
+namespace chrono = ::std::chrono;
+
+namespace aos {
+
+// Timestamps are shipped as raw 8-byte payloads via memcpy.
+static_assert(sizeof(monotonic_clock::time_point) == 8,
+              "time_point must fit exactly in an 8-byte eventfd payload");
+
+// Writes a timestamp to |fd| at random 1-15ms intervals for FLAGS_seconds,
+// then writes a 1ns stop marker.
+void SenderThread(int fd) {
+  const monotonic_clock::time_point end_time =
+      monotonic_clock::now() + chrono::seconds(FLAGS_seconds);
+  // Standard mersenne_twister_engine seeded with 0
+  ::std::mt19937 generator(0);
+
+  // Sleep between 1 and 15 ms.
+  ::std::uniform_int_distribution<> distribution(1000, 15000);
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_sender_priority);
+  while (true) {
+    const monotonic_clock::time_point wakeup_time =
+        monotonic_clock::now() + chrono::microseconds(distribution(generator));
+
+    ::std::this_thread::sleep_until(wakeup_time);
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+    char sent_time_buffer[8];
+    memcpy(sent_time_buffer, &monotonic_now, sizeof(sent_time_buffer));
+    // write() returns -1 on failure, so check for the full 8 bytes rather than
+    // for a non-zero result.
+    PCHECK(write(fd, sent_time_buffer, sizeof(sent_time_buffer)) ==
+           static_cast<ssize_t>(sizeof(sent_time_buffer)));
+
+    if (monotonic_now > end_time) {
+      break;
+    }
+  }
+
+  {
+    // Let the receiver drain the last timestamp so the stop marker is not
+    // summed into it by the eventfd counter.
+    ::std::this_thread::sleep_for(chrono::milliseconds(100));
+    // The stop marker is 1ns rather than the epoch because writing 0 to an
+    // eventfd leaves the counter at 0 and so never wakes the reader.
+    const monotonic_clock::time_point stop_time(chrono::nanoseconds(1));
+    char sent_time_buffer[8];
+    memcpy(sent_time_buffer, &stop_time, sizeof(sent_time_buffer));
+    PCHECK(write(fd, sent_time_buffer, sizeof(sent_time_buffer)) ==
+           static_cast<ssize_t>(sizeof(sent_time_buffer)));
+  }
+  UnsetCurrentThreadRealtimePriority();
+}
+
+// Reads timestamps from |fd| until the stop marker arrives, accumulating
+// wakeup-latency statistics, and logs the max and average at the end.
+void ReceiverThread(int fd) {
+  Tracing t;
+  t.Start();
+
+  chrono::nanoseconds max_wakeup_latency = chrono::nanoseconds(0);
+
+  chrono::nanoseconds sum_latency = chrono::nanoseconds(0);
+  int latency_count = 0;
+
+  internal::EPoll epoll;
+
+  epoll.OnReadable(fd, [&t, &epoll, &max_wakeup_latency, &sum_latency,
+                        &latency_count, fd]() {
+    char sent_time_buffer[8];
+    const int ret = read(fd, static_cast<void *>(sent_time_buffer),
+                         sizeof(sent_time_buffer));
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+    CHECK_EQ(ret, 8);
+
+    monotonic_clock::time_point sent_time;
+    memcpy(&sent_time, sent_time_buffer, sizeof(sent_time_buffer));
+
+    if (sent_time == monotonic_clock::time_point(chrono::nanoseconds(1))) {
+      epoll.Quit();
+      return;
+    }
+
+    const chrono::nanoseconds wakeup_latency = monotonic_now - sent_time;
+
+    sum_latency += wakeup_latency;
+    ++latency_count;
+
+    max_wakeup_latency = ::std::max(wakeup_latency, max_wakeup_latency);
+
+    if (wakeup_latency > chrono::microseconds(FLAGS_latency_threshold)) {
+      t.Stop();
+      AOS_LOG(INFO, "Stopped tracing, latency %" PRId64 "\n",
+              static_cast<int64_t>(wakeup_latency.count()));
+    }
+
+    if (FLAGS_log_latency) {
+      AOS_LOG(INFO, "dt: %8d.%03d\n",
+              static_cast<int>(wakeup_latency.count() / 1000),
+              static_cast<int>(wakeup_latency.count() % 1000));
+    }
+  });
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_receiver_priority);
+  epoll.Run();
+  UnsetCurrentThreadRealtimePriority();
+  epoll.DeleteFd(fd);
+
+  // Avoid dividing by zero if no timestamps were received.
+  const chrono::nanoseconds average_latency =
+      latency_count == 0 ? chrono::nanoseconds(0)
+                         : sum_latency / latency_count;
+
+  AOS_LOG(INFO,
+          "Max eventfd wakeup latency: %d.%03d microseconds, average: %d.%03d "
+          "microseconds\n",
+          static_cast<int>(max_wakeup_latency.count() / 1000),
+          static_cast<int>(max_wakeup_latency.count() % 1000),
+          static_cast<int>(average_latency.count() / 1000),
+          static_cast<int>(average_latency.count() % 1000));
+}
+
+int Main(int /*argc*/, char ** /*argv*/) {
+  AOS_LOG(INFO, "Main!\n");
+  ::std::thread t([]() {
+    TimerThread(monotonic_clock::now() + chrono::seconds(FLAGS_seconds),
+                FLAGS_timer_priority);
+  });
+
+  const int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+  // eventfd() returns -1 on failure, and 0 is a valid descriptor.
+  PCHECK(fd != -1);
+
+  ::std::thread st([fd]() { SenderThread(fd); });
+
+  ReceiverThread(fd);
+  st.join();
+
+  // close() returns 0 on success, so compare explicitly.
+  PCHECK(close(fd) == 0);
+
+  t.join();
+  return 0;
+}
+
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  ::aos::logging::Init();
+  ::aos::logging::SetImplementation(
+      new ::aos::logging::StreamLogImplementation(stdout));
+
+  return ::aos::Main(argc, argv);
+}
diff --git a/aos/ipc_lib/futex_latency.cc b/aos/ipc_lib/futex_latency.cc
new file mode 100644
index 0000000..60e7a70
--- /dev/null
+++ b/aos/ipc_lib/futex_latency.cc
@@ -0,0 +1,186 @@
+#include "gflags/gflags.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <algorithm>
+#include <chrono>
+#include <cinttypes>
+#include <random>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/latency_lib.h"
+#include "aos/logging/implementations.h"
+#include "aos/logging/logging.h"
+#include "aos/mutex/mutex.h"
+#include "aos/realtime.h"
+#include "aos/time/time.h"
+
+// This is a demo program which wakes one thread from another by broadcasting
+// an aos::Condition. It measures both the latency of a random timer thread,
+// and the latency of the condition wakeup.
+
+DEFINE_int32(seconds, 10, "Duration of the test to run");
+DEFINE_int32(
+    latency_threshold, 1000,
+    "Disable tracing when anything takes more than this many microseoncds");
+DEFINE_int32(core, 7, "Core to pin to");
+DEFINE_int32(sender_priority, 53, "RT priority to send at");
+DEFINE_int32(receiver_priority, 52, "RT priority to receive at");
+DEFINE_int32(timer_priority, 51, "RT priority to spin the timer at");
+
+DEFINE_bool(log_latency, false, "If true, log the latency");
+
+namespace chrono = ::std::chrono;
+
+namespace aos {
+
+// State shared between the sender and receiver threads. Both threads only
+// touch the fields below while holding |mutex|.
+struct WakeupData {
+  Mutex mutex;
+  Condition condition;
+
+  WakeupData() : condition(&mutex) {}
+
+  // Send time of the pending wakeup, or epoch() once the receiver consumed it.
+  monotonic_clock::time_point wakeup_time = monotonic_clock::epoch();
+
+  // Set by the sender once the test is over.
+  bool done = false;
+};
+
+// Publishes the current time and broadcasts |data->condition| at random
+// 1-15ms intervals for FLAGS_seconds, then sets |data->done|.
+void SenderThread(WakeupData *data) {
+  const monotonic_clock::time_point end_time =
+      monotonic_clock::now() + chrono::seconds(FLAGS_seconds);
+  // Standard mersenne_twister_engine seeded with 0
+  ::std::mt19937 generator(0);
+
+  // Sleep between 1 and 15 ms.
+  ::std::uniform_int_distribution<> distribution(1000, 15000);
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_sender_priority);
+  while (true) {
+    const monotonic_clock::time_point wakeup_time =
+        monotonic_clock::now() + chrono::microseconds(distribution(generator));
+
+    ::std::this_thread::sleep_until(wakeup_time);
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+
+    {
+      MutexLocker locker(&data->mutex);
+      data->wakeup_time = monotonic_now;
+      data->condition.Broadcast();
+
+      if (monotonic_now > end_time) {
+        break;
+      }
+    }
+  }
+
+  {
+    MutexLocker locker(&data->mutex);
+    data->done = true;
+    data->condition.Broadcast();
+  }
+
+  UnsetCurrentThreadRealtimePriority();
+}
+
+// Waits for each wakeup published by SenderThread until |data->done| is set,
+// accumulating wakeup-latency statistics, and logs the max and average.
+void ReceiverThread(WakeupData *data) {
+  Tracing t;
+  t.Start();
+
+  chrono::nanoseconds max_wakeup_latency = chrono::nanoseconds(0);
+  chrono::nanoseconds sum_latency = chrono::nanoseconds(0);
+  int latency_count = 0;
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_receiver_priority);
+  while (true) {
+    chrono::nanoseconds wakeup_latency;
+    {
+      MutexLocker locker(&data->mutex);
+      while (data->wakeup_time == monotonic_clock::epoch() && !data->done) {
+        CHECK(!data->condition.Wait());
+      }
+
+      const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+
+      if (data->done) {
+        break;
+      }
+
+      wakeup_latency = monotonic_now - data->wakeup_time;
+      data->wakeup_time = monotonic_clock::epoch();
+    }
+
+    sum_latency += wakeup_latency;
+    ++latency_count;
+
+    max_wakeup_latency = ::std::max(wakeup_latency, max_wakeup_latency);
+
+    if (wakeup_latency > chrono::microseconds(FLAGS_latency_threshold)) {
+      t.Stop();
+      AOS_LOG(INFO, "Stopped tracing, latency %" PRId64 "\n",
+              static_cast<int64_t>(wakeup_latency.count()));
+    }
+
+    if (FLAGS_log_latency) {
+      AOS_LOG(INFO, "dt: %8d.%03d\n",
+              static_cast<int>(wakeup_latency.count() / 1000),
+              static_cast<int>(wakeup_latency.count() % 1000));
+    }
+  }
+  UnsetCurrentThreadRealtimePriority();
+
+  // Avoid dividing by zero if no wakeups were received.
+  const chrono::nanoseconds average_latency =
+      latency_count == 0 ? chrono::nanoseconds(0)
+                         : sum_latency / latency_count;
+
+  AOS_LOG(INFO,
+          "Max futex wakeup latency: %d.%03d microseconds, average: %d.%03d "
+          "microseconds\n",
+          static_cast<int>(max_wakeup_latency.count() / 1000),
+          static_cast<int>(max_wakeup_latency.count() % 1000),
+          static_cast<int>(average_latency.count() / 1000),
+          static_cast<int>(average_latency.count() % 1000));
+}
+
+int Main(int /*argc*/, char ** /*argv*/) {
+  WakeupData data;
+
+  AOS_LOG(INFO, "Main!\n");
+  ::std::thread t([]() {
+    TimerThread(monotonic_clock::now() + chrono::seconds(FLAGS_seconds),
+                FLAGS_timer_priority);
+  });
+
+  ::std::thread st([&data]() { SenderThread(&data); });
+
+  ReceiverThread(&data);
+
+  st.join();
+  t.join();
+  return 0;
+}
+
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  ::aos::logging::Init();
+  ::aos::logging::SetImplementation(
+      new ::aos::logging::StreamLogImplementation(stdout));
+
+  return ::aos::Main(argc, argv);
+}
diff --git a/aos/ipc_lib/histogram.gnuplot b/aos/ipc_lib/histogram.gnuplot
new file mode 100755
index 0000000..3ab0868
--- /dev/null
+++ b/aos/ipc_lib/histogram.gnuplot
@@ -0,0 +1,22 @@
+#!/usr/bin/gnuplot
+# Overlaid latency histograms; each input holds one value per line (column 1).
+n=1000 #number of intervals
+max=500. #max value (the trailing '.' keeps (max-min)/n out of integer division)
+min=0 #min value
+width=(max-min)/n #interval width
+#function used to map a value to the center of its interval
+hist(x,width)=width*floor(x/width)+width/2.0
+set boxwidth width*0.9
+set style fill solid 0.5 # fill style
+# Log-scale counts so rare outlier bins remain visible.
+set logscale y
+set yrange [0.1:]
+set xrange [0:200]
+# NOTE(review): bins extend to max=500, but xrange clips the plot at 200.
+#count samples per interval; st/pt/ft presumably signal/pipe/futex (TODO confirm)
+plot "/tmp/st_csv" u (hist($1,width)):(1.0) smooth freq w boxes, \
+     "/tmp/pt_csv" u (hist($1,width)):(1.0) smooth freq w boxes, \
+     "/tmp/ft_csv" u (hist($1,width)):(1.0) smooth freq w boxes
+# Wait for Enter before exiting so the plot window stays open.
+pause -1
+
diff --git a/aos/ipc_lib/latency_lib.cc b/aos/ipc_lib/latency_lib.cc
new file mode 100644
index 0000000..0fcb8de
--- /dev/null
+++ b/aos/ipc_lib/latency_lib.cc
@@ -0,0 +1,51 @@
+#include "aos/ipc_lib/latency_lib.h"
+
+#include <algorithm>
+#include <chrono>
+#include <random>
+#include <thread>
+
+#include "aos/logging/logging.h"
+#include "aos/realtime.h"
+#include "aos/time/time.h"
+
+namespace aos {
+
+namespace chrono = std::chrono;
+
+// Repeatedly sleeps for a random 1-15ms interval at |timer_priority| until
+// |end_time| passes, then logs the largest observed oversleep.
+void TimerThread(const monotonic_clock::time_point end_time,
+                 int timer_priority) {
+  // Standard mersenne_twister_engine seeded with 0
+  ::std::mt19937 generator(0);
+
+  // Sleep between 1 and 15 ms.
+  ::std::uniform_int_distribution<> distribution(1000, 15000);
+
+  chrono::nanoseconds max_wakeup_latency = chrono::nanoseconds(0);
+
+  SetCurrentThreadRealtimePriority(timer_priority);
+  while (true) {
+    const monotonic_clock::time_point wakeup_time =
+        monotonic_clock::now() + chrono::microseconds(distribution(generator));
+
+    ::std::this_thread::sleep_until(wakeup_time);
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+    const chrono::nanoseconds wakeup_latency = monotonic_now - wakeup_time;
+
+    max_wakeup_latency = ::std::max(wakeup_latency, max_wakeup_latency);
+
+    if (monotonic_now > end_time) {
+      break;
+    }
+  }
+  UnsetCurrentThreadRealtimePriority();
+
+  // %03d zero-pads the sub-microsecond digits so 1005ns prints as 1.005.
+  AOS_LOG(INFO, "Max wakeup latency: %d.%03d microseconds\n",
+          static_cast<int>(max_wakeup_latency.count() / 1000),
+          static_cast<int>(max_wakeup_latency.count() % 1000));
+}
+
+}  // namespace aos
diff --git a/aos/ipc_lib/latency_lib.h b/aos/ipc_lib/latency_lib.h
new file mode 100644
index 0000000..4d9b912
--- /dev/null
+++ b/aos/ipc_lib/latency_lib.h
@@ -0,0 +1,61 @@
+#ifndef AOS_IPC_LIB_LATENCY_LIB_H_
+#define AOS_IPC_LIB_LATENCY_LIB_H_
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <chrono>
+#include <cstring>
+
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Sleeps for random 1-15ms intervals at real-time priority |timer_priority|
+// until |end_time| passes, then logs the largest observed wakeup latency.
+void TimerThread(monotonic_clock::time_point end_time, int timer_priority);
+
+// Turns kernel tracing on and off through /sys/kernel/debug/tracing, so a
+// trace can be stopped right after a latency spike. Construction enables all
+// trace events and dies if the tracing files cannot be opened or written.
+class Tracing {
+ public:
+  Tracing() {
+    SetContentsOrDie("/sys/kernel/debug/tracing/events/enable", "1\n");
+    fd_ = open("/sys/kernel/debug/tracing/tracing_on",
+               O_WRONLY | O_TRUNC | O_CLOEXEC, 0);
+    // open() returns -1 on failure; 0 is a valid descriptor.
+    PCHECK(fd_ != -1);
+  }
+
+  // Tracing owns fd_, so a copy would close it twice.
+  Tracing(const Tracing &) = delete;
+  Tracing &operator=(const Tracing &) = delete;
+
+  ~Tracing() { close(fd_); }
+
+  // Writes "1" to tracing_on.
+  void Start() { PCHECK(write(fd_, "1\n", 2) == 2); }
+
+  // Writes "0" to tracing_on.
+  void Stop() { PCHECK(write(fd_, "0\n", 2) == 2); }
+
+ private:
+  // Overwrites |filename| with |data|, dying on any failure.
+  void SetContentsOrDie(const char *filename, const char *data) {
+    const int fd = open(filename, O_WRONLY | O_TRUNC | O_CLOEXEC);
+    PCHECK(fd != -1);
+    const size_t length = strlen(data);
+    PCHECK(write(fd, data, length) == static_cast<ssize_t>(length));
+    // close() returns 0 on success, so the result must be compared.
+    PCHECK(close(fd) == 0);
+  }
+
+  int fd_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_LATENCY_LIB_H_
diff --git a/aos/ipc_lib/named_pipe_latency.cc b/aos/ipc_lib/named_pipe_latency.cc
new file mode 100644
index 0000000..6683f59
--- /dev/null
+++ b/aos/ipc_lib/named_pipe_latency.cc
@@ -0,0 +1,205 @@
+#include "gflags/gflags.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <algorithm>
+#include <cerrno>
+#include <chrono>
+#include <cinttypes>
+#include <cstring>
+#include <random>
+#include <thread>
+
+#include "aos/events/epoll.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/latency_lib.h"
+#include "aos/logging/implementations.h"
+#include "aos/logging/logging.h"
+#include "aos/realtime.h"
+#include "aos/time/time.h"
+
+// This is a demo program which uses named pipes to communicate.
+// It measures both latency of a random timer thread, and latency of the
+// pipe.
+
+// NOTE(review): --sender is never read; both ends always run in-process.
+DEFINE_bool(sender, true, "If true, send signals to the other process.");
+DEFINE_string(fifo, "/dev/shm/aos/named_pipe_latency",
+              "FIFO to use for the test.");
+DEFINE_int32(seconds, 10, "Duration of the test to run");
+DEFINE_int32(
+    latency_threshold, 1000,
+    "Disable tracing when anything takes more than this many microseoncds");
+DEFINE_int32(core, 7, "Core to pin to");
+DEFINE_int32(sender_priority, 53, "RT priority to send at");
+DEFINE_int32(receiver_priority, 52, "RT priority to receive at");
+DEFINE_int32(timer_priority, 51, "RT priority to spin the timer at");
+
+DEFINE_bool(log_latency, false, "If true, log the latency");
+
+namespace chrono = ::std::chrono;
+
+namespace aos {
+
+// Writes a timestamp into the FIFO at random 1-15ms intervals for
+// FLAGS_seconds, then writes an all-zero stop marker.
+void SenderThread() {
+  // No O_NONBLOCK here: a non-blocking write-only open of a FIFO fails with
+  // ENXIO if no reader has it open yet, while a blocking open waits for
+  // ReceiverThread. O_CLOEXEC is the open() flag; FD_CLOEXEC is for fcntl()
+  // and has the same value as O_WRONLY.
+  const int pipefd =
+      open(FLAGS_fifo.c_str(), O_CLOEXEC | O_WRONLY | O_NOATIME);
+  PCHECK(pipefd != -1) << ": failed to open " << FLAGS_fifo;
+  const monotonic_clock::time_point end_time =
+      monotonic_clock::now() + chrono::seconds(FLAGS_seconds);
+  // Standard mersenne_twister_engine seeded with 0
+  ::std::mt19937 generator(0);
+
+  // Sleep between 1 and 15 ms.
+  ::std::uniform_int_distribution<> distribution(1000, 15000);
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_sender_priority);
+  while (true) {
+    const monotonic_clock::time_point wakeup_time =
+        monotonic_clock::now() + chrono::microseconds(distribution(generator));
+
+    ::std::this_thread::sleep_until(wakeup_time);
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+    char sent_time_buffer[8];
+    memcpy(sent_time_buffer, &monotonic_now, sizeof(sent_time_buffer));
+    // write() returns -1 on failure, so check for the full 8 bytes.
+    PCHECK(write(pipefd, static_cast<void *>(sent_time_buffer),
+                 sizeof(sent_time_buffer)) ==
+           static_cast<ssize_t>(sizeof(sent_time_buffer)));
+
+    if (monotonic_now > end_time) {
+      break;
+    }
+  }
+
+  {
+    // An all-zero payload is the stop marker; the receiver compares it
+    // against monotonic_clock::epoch().
+    char sent_time_buffer[8];
+    memset(sent_time_buffer, 0, sizeof(sent_time_buffer));
+    PCHECK(write(pipefd, static_cast<void *>(sent_time_buffer),
+                 sizeof(sent_time_buffer)) ==
+           static_cast<ssize_t>(sizeof(sent_time_buffer)));
+  }
+  UnsetCurrentThreadRealtimePriority();
+
+  PCHECK(close(pipefd) == 0);
+}
+
+// Reads timestamps from the FIFO until the stop marker arrives, accumulating
+// wakeup-latency statistics, and logs the max and average.
+void ReceiverThread() {
+  // O_NONBLOCK lets this read-only open succeed before any writer exists,
+  // which in turn unblocks the open() in SenderThread.
+  const int pipefd = open(FLAGS_fifo.c_str(),
+                          O_CLOEXEC | O_NONBLOCK | O_RDONLY | O_NOATIME);
+  PCHECK(pipefd != -1) << ": failed to open " << FLAGS_fifo;
+  Tracing t;
+  t.Start();
+
+  chrono::nanoseconds max_wakeup_latency = chrono::nanoseconds(0);
+
+  chrono::nanoseconds sum_latency = chrono::nanoseconds(0);
+  int latency_count = 0;
+
+  internal::EPoll epoll;
+
+  epoll.OnReadable(pipefd, [&t, &epoll, &max_wakeup_latency, &sum_latency,
+                            &latency_count, pipefd]() {
+    char sent_time_buffer[8];
+    const int ret = read(pipefd, static_cast<void *>(sent_time_buffer),
+                         sizeof(sent_time_buffer));
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+    CHECK_EQ(ret, 8);
+
+    monotonic_clock::time_point sent_time;
+    memcpy(&sent_time, sent_time_buffer, sizeof(sent_time_buffer));
+
+    if (sent_time == monotonic_clock::epoch()) {
+      epoll.Quit();
+      return;
+    }
+
+    const chrono::nanoseconds wakeup_latency = monotonic_now - sent_time;
+
+    sum_latency += wakeup_latency;
+    ++latency_count;
+
+    max_wakeup_latency = ::std::max(wakeup_latency, max_wakeup_latency);
+
+    if (wakeup_latency > chrono::microseconds(FLAGS_latency_threshold)) {
+      t.Stop();
+      AOS_LOG(INFO, "Stopped tracing, latency %" PRId64 "\n",
+              static_cast<int64_t>(wakeup_latency.count()));
+    }
+
+    if (FLAGS_log_latency) {
+      AOS_LOG(INFO, "dt: %8d.%03d\n",
+              static_cast<int>(wakeup_latency.count() / 1000),
+              static_cast<int>(wakeup_latency.count() % 1000));
+    }
+  });
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_receiver_priority);
+  epoll.Run();
+  UnsetCurrentThreadRealtimePriority();
+  epoll.DeleteFd(pipefd);
+
+  // Avoid dividing by zero if no timestamps were received.
+  const chrono::nanoseconds average_latency =
+      latency_count == 0 ? chrono::nanoseconds(0)
+                         : sum_latency / latency_count;
+
+  AOS_LOG(
+      INFO,
+      "Max named pipe wakeup latency: %d.%03d microseconds, average: %d.%03d "
+      "microseconds\n",
+      static_cast<int>(max_wakeup_latency.count() / 1000),
+      static_cast<int>(max_wakeup_latency.count() % 1000),
+      static_cast<int>(average_latency.count() / 1000),
+      static_cast<int>(average_latency.count() % 1000));
+
+  PCHECK(close(pipefd) == 0);
+}
+
+int Main(int /*argc*/, char ** /*argv*/) {
+  // A FIFO left over from a previous run is fine; any other failure is fatal.
+  PCHECK(mkfifo(FLAGS_fifo.c_str(), 0777) == 0 || errno == EEXIST)
+      << ": failed to create " << FLAGS_fifo;
+
+  AOS_LOG(INFO, "Main!\n");
+  ::std::thread t([]() {
+    TimerThread(monotonic_clock::now() + chrono::seconds(FLAGS_seconds),
+                FLAGS_timer_priority);
+  });
+
+  ::std::thread st([]() { SenderThread(); });
+
+  ReceiverThread();
+  st.join();
+
+  t.join();
+  return 0;
+}
+
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  ::aos::logging::Init();
+  ::aos::logging::SetImplementation(
+      new ::aos::logging::StreamLogImplementation(stdout));
+
+  return ::aos::Main(argc, argv);
+}
diff --git a/aos/ipc_lib/signal_stress.cc b/aos/ipc_lib/signal_stress.cc
new file mode 100644
index 0000000..ffbfb06
--- /dev/null
+++ b/aos/ipc_lib/signal_stress.cc
@@ -0,0 +1,219 @@
+#include "gflags/gflags.h"
+
+#include <signal.h>
+#include <sys/signalfd.h>
+#include <unistd.h>
+#include <algorithm>
+#include <chrono>
+#include <cinttypes>
+#include <random>
+#include <thread>
+
+#include "aos/events/epoll.h"
+#include "aos/init.h"
+#include "aos/ipc_lib/latency_lib.h"
+#include "aos/logging/implementations.h"
+#include "aos/logging/logging.h"
+#include "aos/realtime.h"
+#include "aos/time/time.h"
+
+// This is a demo program which uses Real-Time posix signals to communicate.
+// It measures both latency of a random timer thread, and latency of the
+// signals.
+//
+// To enable function graph:
+//   echo "function_graph" > current_tracer
+
+// NOTE(review): --sender is never read; the sender thread always runs.
+DEFINE_bool(sender, true, "If true, send signals to the other process.");
+DEFINE_int32(other_pid, -1, "PID of other process to ping");
+DEFINE_int32(seconds, 10, "Duration of the test to run");
+DEFINE_int32(
+    latency_threshold, 1000,
+    "Disable tracing when anything takes more than this many microseoncds");
+DEFINE_int32(core, 7, "Core to pin to");
+DEFINE_int32(sender_priority, 53, "RT priority to send at");
+DEFINE_int32(receiver_priority, 52, "RT priority to receive at");
+DEFINE_int32(timer_priority, 51, "RT priority to spin the timer at");
+
+DEFINE_bool(log_latency, false, "If true, log the latency");
+
+const uint32_t kSignalNumber = SIGRTMIN + 1;
+const uint32_t kQuitSignalNumber = SIGRTMIN + 2;
+
+namespace chrono = ::std::chrono;
+
+namespace aos {
+
+// Queues kSignalNumber at random 1-15ms intervals for FLAGS_seconds, each
+// carrying the low 32 bits of its send time, then queues kQuitSignalNumber.
+// Signals go to --other_pid if set, otherwise to this process.
+void SenderThread() {
+  const monotonic_clock::time_point end_time =
+      monotonic_clock::now() + chrono::seconds(FLAGS_seconds);
+  // Standard mersenne_twister_engine seeded with 0
+  ::std::mt19937 generator(0);
+
+  // Sleep between 1 and 15 ms.
+  ::std::uniform_int_distribution<> distribution(1000, 15000);
+
+  int pid = getpid();
+  if (FLAGS_other_pid != -1) {
+    pid = FLAGS_other_pid;
+  }
+  AOS_LOG(INFO, "Current PID: %d\n", pid);
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_sender_priority);
+  while (true) {
+    const monotonic_clock::time_point wakeup_time =
+        monotonic_clock::now() + chrono::microseconds(distribution(generator));
+
+    ::std::this_thread::sleep_until(wakeup_time);
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+    sigval s;
+    s.sival_int = static_cast<uint32_t>(
+        static_cast<uint64_t>(monotonic_now.time_since_epoch().count()) &
+        0xfffffffful);
+
+    // sigqueue() returns 0 on success, so the result must be compared.
+    PCHECK(sigqueue(pid, kSignalNumber, s) == 0);
+
+    if (monotonic_now > end_time) {
+      break;
+    }
+  }
+
+  {
+    sigval s;
+    s.sival_int = 0;
+    PCHECK(sigqueue(pid, kQuitSignalNumber, s) == 0);
+  }
+  UnsetCurrentThreadRealtimePriority();
+}
+
+// Receives signals through a signalfd until kQuitSignalNumber arrives,
+// accumulating wakeup-latency statistics, and logs the max and average.
+void ReceiverThread() {
+  Tracing t;
+  t.Start();
+
+  sigset_t x;
+  sigemptyset(&x);
+  sigaddset(&x, kSignalNumber);
+  sigaddset(&x, kQuitSignalNumber);
+
+  // signalfd() returns -1 on failure; 0 is a valid descriptor.
+  const int signalfd_fd = signalfd(-1, &x, SFD_NONBLOCK | SFD_CLOEXEC);
+  PCHECK(signalfd_fd != -1);
+  chrono::nanoseconds max_wakeup_latency = chrono::nanoseconds(0);
+
+  chrono::nanoseconds sum_latency = chrono::nanoseconds(0);
+  int latency_count = 0;
+
+  internal::EPoll epoll;
+
+  epoll.OnReadable(signalfd_fd, [&t, &epoll, &max_wakeup_latency, &sum_latency,
+                                 &latency_count, signalfd_fd]() {
+    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+    signalfd_siginfo si;
+    const int ret =
+        read(signalfd_fd, static_cast<void *>(&si), sizeof(signalfd_siginfo));
+    CHECK_EQ(ret, static_cast<int>(sizeof(signalfd_siginfo)));
+
+    if (si.ssi_signo == kQuitSignalNumber) {
+      epoll.Quit();
+      return;
+    }
+
+    // Only the low 32 bits of the send time fit in sival_int, so subtract them
+    // from the low 32 bits of now and unwrap results above 2^31 ns.
+    int64_t wakeup_latency_int64 =
+        static_cast<int64_t>(monotonic_now.time_since_epoch().count()) &
+        0xfffffffful;
+
+    wakeup_latency_int64 -= static_cast<int64_t>(si.ssi_int);
+
+    if (wakeup_latency_int64 > 0x80000000l) {
+      wakeup_latency_int64 -= 0x100000000l;
+    }
+
+    const chrono::nanoseconds wakeup_latency(wakeup_latency_int64);
+
+    sum_latency += wakeup_latency;
+    ++latency_count;
+
+    max_wakeup_latency = ::std::max(wakeup_latency, max_wakeup_latency);
+
+    if (wakeup_latency > chrono::microseconds(FLAGS_latency_threshold)) {
+      t.Stop();
+      AOS_LOG(INFO, "Stopped tracing, latency %" PRId64 "\n",
+              static_cast<int64_t>(wakeup_latency.count()));
+    }
+
+    if (FLAGS_log_latency) {
+      AOS_LOG(INFO, "signo: %d, sending pid: %d, dt: %8d.%03d\n", si.ssi_signo,
+              si.ssi_pid, static_cast<int>(wakeup_latency_int64 / 1000),
+              static_cast<int>(wakeup_latency_int64 % 1000));
+    }
+  });
+
+  PinCurrentThreadToCPU(FLAGS_core);
+  SetCurrentThreadRealtimePriority(FLAGS_receiver_priority);
+  epoll.Run();
+  UnsetCurrentThreadRealtimePriority();
+  epoll.DeleteFd(signalfd_fd);
+
+  // Avoid dividing by zero if no timestamps were received.
+  const chrono::nanoseconds average_latency =
+      latency_count == 0 ? chrono::nanoseconds(0)
+                         : sum_latency / latency_count;
+
+  AOS_LOG(INFO,
+          "Max signal wakeup latency: %d.%03d microseconds, average: %d.%03d "
+          "microseconds\n",
+          static_cast<int>(max_wakeup_latency.count() / 1000),
+          static_cast<int>(max_wakeup_latency.count() % 1000),
+          static_cast<int>(average_latency.count() / 1000),
+          static_cast<int>(average_latency.count() % 1000));
+
+  PCHECK(close(signalfd_fd) == 0);
+}
+
+int Main(int /*argc*/, char ** /*argv*/) {
+  // Block the test signals before any thread starts so every thread inherits
+  // the mask; blocked signals stay pending for the signalfd instead of being
+  // handled by their default disposition.
+  sigset_t x;
+  sigemptyset(&x);
+  sigaddset(&x, kSignalNumber);
+  sigaddset(&x, kQuitSignalNumber);
+  // pthread_sigmask() returns an error number instead of setting errno.
+  CHECK_EQ(pthread_sigmask(SIG_BLOCK, &x, nullptr), 0);
+
+  AOS_LOG(INFO, "Main!\n");
+  ::std::thread t([]() {
+    TimerThread(monotonic_clock::now() + chrono::seconds(FLAGS_seconds),
+                FLAGS_timer_priority);
+  });
+
+  ::std::thread st([]() { SenderThread(); });
+
+  ReceiverThread();
+  st.join();
+
+  t.join();
+  return 0;
+}
+
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  ::aos::logging::Init();
+  ::aos::logging::SetImplementation(
+      new ::aos::logging::StreamLogImplementation(stdout));
+
+  return ::aos::Main(argc, argv);
+}