Merge "Make matplotlibcpp work with upstream Python"
diff --git a/aos/ftrace.cc b/aos/ftrace.cc
index 0afbcf5..9806661 100644
--- a/aos/ftrace.cc
+++ b/aos/ftrace.cc
@@ -9,13 +9,17 @@
 
 namespace aos {
 
+// Opens |file| write-only if ftrace is enabled; returns -1 if it is disabled.
+// Check-fails if the open fails.
+int MaybeCheckOpen(const char *file) {
+  if (!FLAGS_enable_ftrace) return -1;
+  int result = open(file, O_WRONLY);
+  PCHECK(result >= 0) << ": Failed to open " << file;
+  return result;
+}
+
 Ftrace::Ftrace()
-    : message_fd_(FLAGS_enable_ftrace
-                      ? open("/sys/kernel/debug/tracing/trace_marker", O_WRONLY)
-                      : -1),
-      on_fd_(FLAGS_enable_ftrace
-                 ? open("/sys/kernel/debug/tracing/tracing_on", O_WRONLY)
-                 : -1) {}
+    : message_fd_(MaybeCheckOpen("/sys/kernel/debug/tracing/trace_marker")),
+      on_fd_(MaybeCheckOpen("/sys/kernel/debug/tracing/tracing_on")) {
+}
 
 Ftrace::~Ftrace() {
   if (message_fd_ != -1) {
diff --git a/aos/ipc_lib/data_alignment.h b/aos/ipc_lib/data_alignment.h
index 72a7456..3bf38ce 100644
--- a/aos/ipc_lib/data_alignment.h
+++ b/aos/ipc_lib/data_alignment.h
@@ -9,11 +9,13 @@
 // alignment for their end. Flatbuffers aligns from the end, so this is what
 // matters.
 //
-// 64 is a reasonable choice for now:
+// 128 is a reasonable choice for now:
 //   Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
 //   cache lines.
 //   V4L2 requires 64 byte alignment for USERPTR buffers.
-static constexpr size_t kChannelDataAlignment = 64;
+//
+//   The Rock Pi's V4L2 requires 128 byte alignment for USERPTR buffers.
+static constexpr size_t kChannelDataAlignment = 128;
 
 template <typename T>
 inline void CheckChannelDataAlignment(T *data, size_t size) {
diff --git a/aos/scoped/scoped_fd.h b/aos/scoped/scoped_fd.h
index e098d2c..116985a 100644
--- a/aos/scoped/scoped_fd.h
+++ b/aos/scoped/scoped_fd.h
@@ -11,7 +11,18 @@
 class ScopedFD {
  public:
   explicit ScopedFD(int fd = -1) : fd_(fd) {}
+  ScopedFD(const ScopedFD &) = delete;
+  ScopedFD(ScopedFD &&other) noexcept : ScopedFD(other.release()) {}
+
+  ScopedFD &operator=(const ScopedFD &) = delete;
+  ScopedFD &operator=(ScopedFD &&other) noexcept {
+    // Swap fds so our old fd is closed when |other| is destroyed.
+    const int tmp = fd_;
+    fd_ = other.fd_;
+    other.fd_ = tmp;
+    return *this;
+  }
+
   ~ScopedFD() { Close(); }
+
   int get() const { return fd_; }
   int release() {
     const int r = fd_;
@@ -28,8 +39,8 @@
 
  private:
   int fd_;
+
   void Close();
-  DISALLOW_COPY_AND_ASSIGN(ScopedFD);
 };
 
 }  // namespace aos
diff --git a/y2022/image_streamer/image_streamer.cc b/y2022/image_streamer/image_streamer.cc
index 9a990ed..ef6657e 100644
--- a/y2022/image_streamer/image_streamer.cc
+++ b/y2022/image_streamer/image_streamer.cc
@@ -46,9 +46,11 @@
 GST_PLUGIN_STATIC_DECLARE(videotestsrc);
 GST_PLUGIN_STATIC_DECLARE(x264);
 }
+
 DEFINE_string(config, "y2022/aos_config.json",
               "Name of the config file to replay using.");
-DEFINE_string(device, "/dev/video0", "Camera fd");
+DEFINE_string(device, "/dev/video0",
+              "Camera device path. Ignored if reading from a channel.");
 DEFINE_string(data_dir, "image_streamer_www",
               "Directory to serve data files from");
 DEFINE_int32(width, 400, "Image width");
@@ -59,6 +61,10 @@
 DEFINE_int32(bitrate, 500000, "H264 encode bitrate");
 DEFINE_int32(min_port, 5800, "Min rtp port");
 DEFINE_int32(max_port, 5810, "Max rtp port");
+DEFINE_string(listen_on, "",
+              "Channel to receive frames from. Used in place of the internal "
+              "V4L2 reader. Note: width and height MUST match the expected "
+              "size of channel images.");
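+// A sketch of how these flags combine (the channel name is hypothetical):
+//   image_streamer --listen_on=/camera/decimated --width=400 --height=300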
 
 class Connection;
 
@@ -68,13 +74,140 @@
 using aos::web_proxy::WebSocketMessage;
 using aos::web_proxy::WebSocketSdp;
 
+class GstSampleSource {
+ public:
+  GstSampleSource() = default;
+  GstSampleSource(const GstSampleSource &) = delete;
+  GstSampleSource &operator=(const GstSampleSource &) = delete;
+
+  virtual ~GstSampleSource() = default;
+};
+
+class V4L2Source : public GstSampleSource {
+ public:
+  explicit V4L2Source(std::function<void(GstSample *)> callback)
+      : callback_(std::move(callback)) {
+    GError *error = NULL;
+
+    // Create a pipeline to read raw YUY2 frames from the camera and hand them
+    // to the callback. The v4l2 device should already be configured with the
+    // correct settings from v4l2-ctl. do-timestamp marks each frame with the
+    // time it was captured, so we can tell when it should be dropped under
+    // latency.
+
+    // With the Pi's hardware encoder, we can encode and package the stream once
+    // and the clients will jump in at any point unsynchronized. With the stream
+    // from x264enc this doesn't seem to work. For now, just reencode for each
+    // client since we don't expect more than 1 or 2.
+
+    pipeline_ = gst_parse_launch(
+        absl::StrFormat("v4l2src device=%s do-timestamp=true "
+                        "extra-controls=\"c,brightness=%d,auto_exposure=1,"
+                        "exposure_time_absolute=%d\" ! "
+                        "video/x-raw,width=%d,height=%d,framerate=%d/"
+                        "1,format=YUY2 ! appsink "
+                        "name=appsink "
+                        "emit-signals=true sync=false async=false "
+                        "caps=video/x-raw,format=YUY2",
+                        FLAGS_device, FLAGS_brightness, FLAGS_exposure,
+                        FLAGS_width, FLAGS_height, FLAGS_framerate)
+            .c_str(),
+        &error);
+
+    if (error != NULL) {
+      LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message;
+    }
+
+    appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink");
+    if (appsink_ == NULL) {
+      LOG(FATAL) << "Could not get appsink";
+    }
+
+    g_signal_connect(appsink_, "new-sample",
+                     G_CALLBACK(V4L2Source::OnSampleCallback),
+                     static_cast<gpointer>(this));
+
+    gst_element_set_state(pipeline_, GST_STATE_PLAYING);
+  }
+
+  ~V4L2Source() {
+    if (pipeline_ != NULL) {
+      gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL);
+      gst_object_unref(GST_OBJECT(pipeline_));
+      gst_object_unref(GST_OBJECT(appsink_));
+    }
+  }
+
+ private:
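+  // Trampoline from the C "new-sample" signal into OnSample().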
+  static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) {
+    static_cast<V4L2Source *>(user_data)->OnSample();
+    return GST_FLOW_OK;
+  }
+
+  void OnSample() {
+    GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_));
+    if (sample == NULL) {
+      LOG(WARNING) << "Received null sample";
+      return;
+    }
+    callback_(sample);
+    gst_sample_unref(sample);
+  }
+
+  GstElement *pipeline_;
+  GstElement *appsink_;
+
+  std::function<void(GstSample *)> callback_;
+};
+
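+// Reads CameraImage messages from an AOS channel and repackages them as raw
+// YUY2 GstSamples, in place of reading directly from a V4L2 camera.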
+class ChannelSource : public GstSampleSource {
+ public:
+  ChannelSource(aos::ShmEventLoop *event_loop,
+                std::function<void(GstSample *)> callback)
+      : callback_(std::move(callback)) {
+    event_loop->MakeWatcher(
+        FLAGS_listen_on,
+        [this](const frc971::vision::CameraImage &image) { OnImage(image); });
+  }
+
+ private:
+  void OnImage(const frc971::vision::CameraImage &image) {
+    if (!image.has_rows() || !image.has_cols() || !image.has_data()) {
+      VLOG(2) << "Skipping CameraImage with no data";
+      return;
+    }
+    CHECK_EQ(image.rows(), FLAGS_height);
+    CHECK_EQ(image.cols(), FLAGS_width);
+
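+    // g_bytes_new() copies the flatbuffer data, so the GstBuffer owns its own
+    // copy of the image.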
+    GBytes *bytes = g_bytes_new(image.data()->data(), image.data()->size());
+    GstBuffer *buffer = gst_buffer_new_wrapped_bytes(bytes);
+
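+    // Use the image's monotonic capture timestamp as the presentation
+    // timestamp.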
+    GST_BUFFER_PTS(buffer) = image.monotonic_timestamp_ns();
+
+    GstCaps *caps = CHECK_NOTNULL(gst_caps_new_simple(
+        "video/x-raw", "width", G_TYPE_INT, image.cols(), "height", G_TYPE_INT,
+        image.rows(), "format", G_TYPE_STRING, "YUY2", nullptr));
+
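+    // Bundle the buffer and caps into a sample, matching what appsink
+    // delivers in the V4L2 path.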
+    GstSample *sample = gst_sample_new(buffer, caps, nullptr, nullptr);
+
+    callback_(sample);
+
+    gst_sample_unref(sample);
+    gst_caps_unref(caps);
+    gst_buffer_unref(buffer);
+    g_bytes_unref(bytes);
+  }
+
+  std::function<void(GstSample *)> callback_;
+};
+
 // Basic class that handles receiving new websocket connections. Creates a new
 // Connection to manage the rest of the negotiation and data passing. When the
 // websocket closes, it deletes the Connection.
 class WebsocketHandler : public ::seasocks::WebSocket::Handler {
  public:
   WebsocketHandler(aos::ShmEventLoop *event_loop, ::seasocks::Server *server);
-  ~WebsocketHandler() override;
+  ~WebsocketHandler() override = default;
 
   void onConnect(::seasocks::WebSocket *sock) override;
   void onData(::seasocks::WebSocket *sock, const uint8_t *data,
@@ -82,17 +215,11 @@
   void onDisconnect(::seasocks::WebSocket *sock) override;
 
  private:
-  static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) {
-    static_cast<WebsocketHandler *>(user_data)->OnSample();
-    return GST_FLOW_OK;
-  }
-
-  void OnSample();
+  void OnSample(GstSample *sample);
 
   std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
   ::seasocks::Server *server_;
-  GstElement *pipeline_;
-  GstElement *appsink_;
+  std::unique_ptr<GstSampleSource> source_;
 
   aos::Sender<frc971::vision::CameraImage> sender_;
 };
@@ -157,56 +284,14 @@
 
 WebsocketHandler::WebsocketHandler(aos::ShmEventLoop *event_loop,
                                    ::seasocks::Server *server)
-    : server_(server),
-      sender_(event_loop->MakeSender<frc971::vision::CameraImage>("/camera")) {
-  GError *error = NULL;
-
-  // Create pipeline to read from camera, pack into rtp stream, and dump stream
-  // to callback.
-  // v4l2 device should already be configured with correct bitrate from
-  // v4l2-ctl. do-timestamp marks the time the frame was taken to track when it
-  // should be dropped under latency.
-
-  // With the Pi's hardware encoder, we can encode and package the stream once
-  // and the clients will jump in at any point unsynchronized. With the stream
-  // from x264enc this doesn't seem to work. For now, just reencode for each
-  // client since we don't expect more than 1 or 2.
-
-  pipeline_ = gst_parse_launch(
-      absl::StrFormat("v4l2src device=%s do-timestamp=true "
-                      "extra-controls=\"c,brightness=%d,auto_exposure=1,"
-                      "exposure_time_absolute=%d\" ! "
-                      "video/x-raw,width=%d,height=%d,framerate=%d/"
-                      "1,format=YUY2 ! appsink "
-                      "name=appsink "
-                      "emit-signals=true sync=false async=false "
-                      "caps=video/x-raw,format=YUY2",
-                      FLAGS_device, FLAGS_brightness, FLAGS_exposure,
-                      FLAGS_width, FLAGS_height, FLAGS_framerate)
-          .c_str(),
-      &error);
-
-  if (error != NULL) {
-    LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message;
-  }
-
-  appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink");
-  if (appsink_ == NULL) {
-    LOG(FATAL) << "Could not get appsink";
-  }
-
-  g_signal_connect(appsink_, "new-sample",
-                   G_CALLBACK(WebsocketHandler::OnSampleCallback),
-                   static_cast<gpointer>(this));
-
-  gst_element_set_state(pipeline_, GST_STATE_PLAYING);
-}
-
-WebsocketHandler::~WebsocketHandler() {
-  if (pipeline_ != NULL) {
-    gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL);
-    gst_object_unref(GST_OBJECT(pipeline_));
-    gst_object_unref(GST_OBJECT(appsink_));
+    : server_(server) {
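+  // Read directly from the camera unless a channel was specified. Only
+  // create the /camera sender when we own the camera, so ChannelSource
+  // images are not re-sent.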
+  if (FLAGS_listen_on.empty()) {
+    sender_ = event_loop->MakeSender<frc971::vision::CameraImage>("/camera");
+    source_ =
+        std::make_unique<V4L2Source>([this](auto sample) { OnSample(sample); });
+  } else {
+    source_ = std::make_unique<ChannelSource>(
+        event_loop, [this](auto sample) { OnSample(sample); });
   }
 }
 
@@ -221,18 +306,12 @@
   connections_[sock]->HandleWebSocketData(data, size);
 }
 
-void WebsocketHandler::OnSample() {
-  GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_));
-  if (sample == NULL) {
-    LOG(WARNING) << "Received null sample";
-    return;
-  }
-
+void WebsocketHandler::OnSample(GstSample *sample) {
   for (auto iter = connections_.begin(); iter != connections_.end(); ++iter) {
     iter->second->OnSample(sample);
   }
 
-  {
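+  // Only republish to /camera when we own the camera; ChannelSource frames
+  // already came from a channel.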
+  if (sender_.valid()) {
     const GstCaps *caps = CHECK_NOTNULL(gst_sample_get_caps(sample));
     CHECK_GT(gst_caps_get_size(caps), 0U);
     const GstStructure *str = gst_caps_get_structure(caps, 0);
@@ -261,8 +340,6 @@
 
     builder.CheckOk(builder.Send(image_builder.Finish()));
   }
-
-  gst_sample_unref(sample);
 }
 
 void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {