Adding //aos/vision/debug:debug_framework.

This will allow easy construction of year-specific debug viewers.
Currently supported source types are:
- blobs over tcp
- jpegs from a camera
- blobs from a log
- a random list of jpegs

Change-Id: I1d73f82f98ca5f60b0135ea0dd588759056e0c40
diff --git a/aos/vision/debug/BUILD b/aos/vision/debug/BUILD
index 3b4fa7c..a96d25c 100644
--- a/aos/vision/debug/BUILD
+++ b/aos/vision/debug/BUILD
@@ -20,3 +20,33 @@
         ":overlay",
     ]
 )
+
+gtk_dependent_cc_library(
+  name = 'debug_framework',
+  srcs = [
+    'debug_framework.cc',
+    'jpeg_list-source.cc',
+    'tcp-source.cc',
+    'blob_log-source.cc',
+    'camera-source.cc'
+  ],
+  hdrs = ['debug_framework.h'],
+  deps = [
+    '//aos/common/logging:logging',
+    '//aos/common/logging:implementations',
+    '//aos/vision/blob:codec',
+    '//aos/vision/blob:range_image',
+    '//aos/vision/blob:stream_view',
+    '//aos/vision/blob:find_blob',
+    '//aos/vision/events:gtk_event',
+    '//aos/vision/events:epoll_events',
+    "//aos/vision/events:tcp_client",
+    '//aos/vision/image:jpeg_routines',
+    '//aos/vision/image:image_stream',
+    '//aos/vision/image:image_types',
+    '//aos/vision/debug:debug_viewer',
+    '//aos/common/util:global_factory',
+    '@usr_repo//:gtk+-3.0',
+  ],
+  alwayslink = 1,
+)
diff --git a/aos/vision/debug/blob_log-source.cc b/aos/vision/debug/blob_log-source.cc
new file mode 100644
index 0000000..f707342
--- /dev/null
+++ b/aos/vision/debug/blob_log-source.cc
@@ -0,0 +1,184 @@
+#include "aos/vision/debug/debug_framework.h"
+
+#include <gdk/gdk.h>
+#include <gtk/gtk.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <fstream>
+#include <functional>
+#include <string>
+
+#include "aos/vision/blob/codec.h"
+
+namespace aos {
+namespace vision {
+
+namespace {
+
+long GetFileSize(const std::string& filename) {
+  struct stat stat_buf;
+  int rc = stat(filename.c_str(), &stat_buf);
+  return rc == 0 ? stat_buf.st_size : -1;
+}
+
+// Parses the blob-log file format.
+// File format goes:
+//
+// Repeated:
+//
+// frame_length.
+// timestamp.
+// fmt.w
+// fmt.h
+// Encoded blob.
+class InputFile {
+ public:
+  InputFile(const std::string &fname)
+      : ifs_(fname, std::ifstream::in), len_(GetFileSize(fname)) {
+    if (len_ <= 0) {
+      LOG(FATAL, "File (%s) not found. Size (%d)\n", fname.c_str(), (int)len_);
+    }
+    // assert(len_ > 0);
+    tmp_buf_.resize(len_, 0);
+    ifs_.read(&tmp_buf_[0], len_);
+    buf_ = &tmp_buf_[0];
+  }
+
+  bool ReadNext(BlobList *blob_list, ImageFormat *fmt, uint64_t *timestamp) {
+    if (buf_ - &tmp_buf_[0] >= len_) return false;
+    if (prev_ != nullptr) prev_frames_.emplace_back(prev_);
+    prev_ = buf_;
+    DoRead(blob_list, fmt, timestamp);
+    return true;
+  }
+
+  bool ReadPrev(BlobList *blob_list, ImageFormat *fmt, uint64_t *timestamp) {
+    if (prev_frames_.empty()) return false;
+    buf_ = prev_frames_.back();
+    prev_frames_.pop_back();
+    buf_ += sizeof(uint32_t);
+    DoRead(blob_list, fmt, timestamp);
+    prev_ = nullptr;
+    return true;
+  }
+
+ private:
+  void DoRead(BlobList *blob_list, ImageFormat *fmt, uint64_t *timestamp) {
+    buf_ += sizeof(uint32_t);
+    *timestamp = Int64Codec::Read(&buf_);
+    fmt->w = Int32Codec::Read(&buf_);
+    fmt->h = Int32Codec::Read(&buf_);
+    buf_ = ParseBlobList(blob_list, buf_);
+  }
+  std::vector<const char *> prev_frames_;
+  const char *buf_;
+  const char *prev_ = nullptr;
+  std::ifstream ifs_;
+
+  long len_;
+  std::vector<char> tmp_buf_;
+};
+
+// A single parsed frame.
+class BlobStreamFrame {
+ public:
+  BlobList blob_list;
+  ImageFormat fmt;
+  uint64_t timestamp;
+  void ReadNext(InputFile *fin) {
+    blob_list.clear();
+    if (!fin->ReadNext(&blob_list, &fmt, &timestamp)) {
+      exit(0);
+      return;
+    }
+  }
+  bool ReadPrev(InputFile *fin) {
+    blob_list.clear();
+    return fin->ReadPrev(&blob_list, &fmt, &timestamp);
+  }
+};
+
+}  // namespace
+
+// class for installing a lambda as a gtk timeout.
+class TimeoutCallback {
+ public:
+  TimeoutCallback() {}
+
+  void Reset(guint32 interval, std::function<bool()> callback) {
+    Stop();
+    callback_ = callback;
+    timeout_key_ = g_timeout_add(interval, &TimeoutCallback::Callback, this);
+  }
+  void Stop() {
+    if (callback_) {
+      g_source_remove(timeout_key_);
+    }
+    callback_ = std::function<bool()>();
+  }
+
+ private:
+  static gint Callback(void *self) {
+    return reinterpret_cast<TimeoutCallback *>(self)->Callback();
+  }
+  gint Callback() {
+    auto callback = callback_;
+    if (!callback()) {
+      return FALSE;
+    }
+    return TRUE;
+  }
+  gint timeout_key_;
+  std::function<bool()> callback_;
+};
+
+class BlobLogImageSource : public ImageSource {
+ public:
+  void Init(const std::string &blob_log_filename,
+            DebugFrameworkInterface *interface) override {
+    interface_ = interface;
+    image_source_.reset(new InputFile(blob_log_filename));
+
+    // Tick 25 fps.
+    // TODO(parker): Make this FPS configurable.
+    cb_.Reset(1000 / 25, [this]() { return Tick(); });
+
+    frame_.ReadNext(image_source_.get());
+    interface_->NewBlobList(frame_.blob_list, frame_.fmt);
+    interface_->InstallKeyPress([this](uint32_t keyval) {
+      if (keyval == GDK_KEY_Left) {
+        frame_.ReadPrev(image_source_.get());
+        interface_->NewBlobList(frame_.blob_list, frame_.fmt);
+      } else if (keyval == GDK_KEY_Right) {
+        frame_.ReadNext(image_source_.get());
+        interface_->NewBlobList(frame_.blob_list, frame_.fmt);
+      } else {
+        return;
+      }
+    });
+  }
+
+  bool Tick() {
+    frame_.ReadNext(image_source_.get());
+    interface_->NewBlobList(frame_.blob_list, frame_.fmt);
+    return true;
+  }
+
+  const char *GetHelpMessage() override {
+    return &R"(
+    format_spec is the name of a file in blob list format.
+    This viewer source will stream blobs from the log.
+)"[1];
+  }
+
+ private:
+  TimeoutCallback cb_;
+  DebugFrameworkInterface *interface_ = nullptr;
+  std::unique_ptr<InputFile> image_source_;
+  BlobStreamFrame frame_;
+};
+
+REGISTER_IMAGE_SOURCE("blob_log", BlobLogImageSource);
+
+}  // namespace vision
+}  // namespace aos
diff --git a/aos/vision/debug/camera-source.cc b/aos/vision/debug/camera-source.cc
new file mode 100644
index 0000000..ef48a11
--- /dev/null
+++ b/aos/vision/debug/camera-source.cc
@@ -0,0 +1,72 @@
+#include "aos/vision/debug/debug_framework.h"
+
+#include <gdk/gdk.h>
+#include <fstream>
+#include <string>
+
+#include "aos/vision/image/image_stream.h"
+
+namespace aos {
+namespace vision {
+
+class CameraImageSource : public ImageSource {
+ public:
+  void Init(const std::string &jpeg_list_filename,
+            DebugFrameworkInterface *interface) override {
+    // TODO: Get these params from a config file passed in through the
+    // constructor.
+    camera::CameraParams params = {.width = 640 * 2,
+                                   .height = 480 * 2,
+                                   .exposure = 10,
+                                   .brightness = 128,
+                                   .gain = 0,
+                                   .fps = 30};
+    image_stream_.reset(new ImageStream(jpeg_list_filename, params, interface));
+  }
+
+  const char *GetHelpMessage() override {
+    return &R"(
+    format_spec is filename of the camera device.
+    example: camera:/dev/video0
+    This viewer source will stream video from a usb camera of your choice.
+)"[1];
+  }
+
+  class ImageStream : public ImageStreamEvent {
+   public:
+    ImageStream(const std::string &fname, camera::CameraParams params,
+                DebugFrameworkInterface *interface)
+        : ImageStreamEvent(fname, params), interface_(interface) {
+      interface_->Loop()->Add(this);
+
+      interface_->InstallKeyPress([this](uint32_t keyval) {
+        // Takes a picture when you press 'a'.
+        // TODO(parker): Allow setting directory.
+        if (keyval == GDK_KEY_a) {
+          std::ofstream ofs(
+              std::string("/tmp/out_jpegs/test") + std::to_string(i_) + ".jpg",
+              std::ofstream::out);
+          ofs << prev_data_;
+          ofs.close();
+          ++i_;
+        }
+      });
+    }
+    void ProcessImage(DataRef data, aos::monotonic_clock::time_point) override {
+      prev_data_ = std::string(data);
+      interface_->NewJpeg(data);
+    }
+
+   private:
+    int i_ = 0;
+    std::string prev_data_;
+    DebugFrameworkInterface *interface_;
+  };
+
+  std::unique_ptr<ImageStream> image_stream_;
+};
+
+REGISTER_IMAGE_SOURCE("camera", CameraImageSource);
+
+}  // namespace vision
+}  // namespace aos
diff --git a/aos/vision/debug/debug_framework.cc b/aos/vision/debug/debug_framework.cc
new file mode 100644
index 0000000..99dfd8b
--- /dev/null
+++ b/aos/vision/debug/debug_framework.cc
@@ -0,0 +1,128 @@
+#include "aos/vision/debug/debug_framework.h"
+
+#include <gtk/gtk.h>
+
+#include "aos/common/logging/implementations.h"
+#include "aos/common/logging/logging.h"
+#include "aos/vision/blob/find_blob.h"
+#include "aos/vision/blob/stream_view.h"
+#include "aos/vision/debug/debug_viewer.h"
+#include "aos/vision/events/epoll_events.h"
+#include "aos/vision/image/jpeg_routines.h"
+
+namespace aos {
+namespace vision {
+
+bool DecodeJpeg(aos::vision::DataRef data,
+                aos::vision::BlobStreamViewer *view) {
+  auto fmt = aos::vision::GetFmt(data);
+  auto value = view->img();
+  if (!value.fmt().Equals(fmt)) {
+    view->SetFormatAndClear(fmt);
+  }
+  return aos::vision::ProcessJpeg(data, view->img().data());
+}
+
+class DebugFramework : public DebugFrameworkInterface {
+ public:
+  explicit DebugFramework(FilterHarness *filter) : filter_(filter) {
+    view_.key_press_event = [this](uint32_t keyval) {
+      for (const auto &event : key_press_events()) {
+        event(keyval);
+      }
+    };
+    filter->InstallViewer(&view_);
+  }
+
+  // This the first stage in the pipeline that takes
+  void NewJpeg(DataRef data) override {
+    DecodeJpeg(data, &view_);
+
+    auto fmt = view_.img().fmt();
+    HandleBlobs(FindBlobs(filter_->Threshold(view_.img())), fmt);
+  }
+
+  void NewBlobList(BlobList blob_list, ImageFormat fmt) override {
+    view_.SetFormatAndClear(fmt);
+
+    HandleBlobs(std::move(blob_list), fmt);
+  }
+
+  void HandleBlobs(BlobList blob_list, ImageFormat fmt) {
+    filter_->HandleBlobs(std::move(blob_list), fmt);
+    view_.Redraw();
+  }
+
+  aos::events::EpollLoop *Loop() override { return &loop_; }
+
+ private:
+  FilterHarness *filter_;
+  BlobStreamViewer view_;
+
+  aos::events::EpollLoop loop_;
+};
+
+std::unique_ptr<ImageSource> MakeImageSource(
+    const std::string &image_source_string,
+    DebugFrameworkInterface *interface) {
+  (void)interface;
+  // Each of the image_source strings is of the form format_type:format_spec
+  auto it = image_source_string.find(':');
+  if (it == std::string::npos) {
+    fprintf(stderr, "invalid ImageSource: %s.\n", image_source_string.c_str());
+    exit(-1);
+  }
+  auto image_source_type = image_source_string.substr(0, it);
+  // Get std::function<std::unique_ptr<ImageSource>()> from the registration
+  // factory.
+  const auto &factory = ImageSourceGlobalFactory::Get(image_source_type);
+  if (!factory) {
+    fprintf(stderr, "invalid ImageSource: %s.\n", image_source_string.c_str());
+    exit(-1);
+  }
+  auto result = factory();
+  // Construct the image source.
+  result->Init(image_source_string.substr(it + 1), interface);
+  return result;
+}
+
+const char *kHelpMessage = R"(
+
+image_source is parsed out and selects where to get the images
+from. Each source type has a different configuration format string listed
+below. The colon separates the source specifier and the source config
+parameter. A single command line argument help will print this message.
+)";
+
+void DebugFrameworkMain(int argc, char **argv, FilterHarness *filter) {
+  ::aos::logging::Init();
+  ::aos::logging::AddImplementation(
+      new ::aos::logging::StreamLogImplementation(stdout));
+
+  gtk_init(&argc, &argv);
+
+  // Use fprintf because it is only supposed to be used interactively.
+  // This uses a registration system to pick out the individual file type
+  // registered by REGISTER_IMAGE_SOURCE.
+  // see jpeg_list-source.cc for ane sample of this.
+  if (argc < 2 || argv[1] == std::string("help")) {
+    fprintf(stderr, "Usage %s image_source:format_spec\n", argv[0]);
+    fprintf(stderr, "%s", kHelpMessage);
+    // Iterate through all registered entities in ImageSourceGlobalFactory
+    // and print out their individual help messages.
+    for (const auto &type : ImageSourceGlobalFactory::GetAll()) {
+      fprintf(stderr, "  %s:\n", type.first.c_str());
+      fprintf(stderr, "%s", type.second()->GetHelpMessage());
+    }
+    exit(-1);
+  }
+
+  DebugFramework replay(filter);
+
+  std::unique_ptr<ImageSource> image_source = MakeImageSource(argv[1], &replay);
+
+  replay.Loop()->RunWithGtkMain();
+}
+
+}  // namespace vision
+}  // namespace aos
diff --git a/aos/vision/debug/debug_framework.h b/aos/vision/debug/debug_framework.h
new file mode 100644
index 0000000..2f0fcd1
--- /dev/null
+++ b/aos/vision/debug/debug_framework.h
@@ -0,0 +1,85 @@
+#ifndef _AOS_VISION_DEBUG_DEBUG_FRAMEWORK_H_
+#define _AOS_VISION_DEBUG_DEBUG_FRAMEWORK_H_
+
+#include "aos/common/util/global_factory.h"
+#include "aos/vision/blob/range_image.h"
+#include "aos/vision/events/epoll_events.h"
+#include "aos/vision/image/image_types.h"
+
+namespace aos {
+namespace vision {
+
+class BlobStreamViewer;
+
+// Implement per-filter to draw debug viewer information from the filter to
+// the debug BlobStreamViewer.
+class FilterHarness {
+ public:
+  virtual ~FilterHarness() {}
+
+  // Apply the filter-specific thresholding logic.
+  // Blob sources may not have this called at all.
+  virtual RangeImage Threshold(ImagePtr image) = 0;
+
+  // Each filter can only be used by one debug viewer. This will
+  // get called before calling any other methods.
+  virtual void InstallViewer(BlobStreamViewer * /*viewer*/) {}
+
+  // One frame worth of blobs. Returns if the frame is "interesting".
+  virtual bool HandleBlobs(BlobList imgs, ImageFormat fmt) = 0;
+};
+
+// For ImageSource implementations only. Allows registering key press events
+// and installing new blob lists and jpegs.
+class DebugFrameworkInterface {
+ public:
+  virtual ~DebugFrameworkInterface() {}
+
+  void InstallKeyPress(std::function<void(uint32_t)> key_press_event) {
+    key_press_events_.emplace_back(std::move(key_press_event));
+  }
+
+  virtual void NewJpeg(DataRef data) = 0;
+
+  virtual void NewBlobList(BlobList blob_list, ImageFormat fmt) = 0;
+
+  // Expose a EpollLoop to allow waiting for events.
+  virtual aos::events::EpollLoop *Loop() = 0;
+
+ protected:
+  const std::vector<std::function<void(uint32_t)>> &key_press_events() {
+    return key_press_events_;
+  }
+
+ private:
+  std::vector<std::function<void(uint32_t)>> key_press_events_;
+};
+
+// Implemented by each source type. Will stream frames to
+// DebugFrameworkInterface.
+class ImageSource {
+ public:
+  virtual ~ImageSource() {}
+
+  // Printed when you call: debug_viewer help.
+  virtual const char *GetHelpMessage() { return "    No help string :(\n"; }
+
+  // Start streaming frames to DebugFrameworkInterface.
+  virtual void Init(const std::string &args,
+                    DebugFrameworkInterface *interface) = 0;
+};
+
+// Factory for ImageSource.
+SETUP_FACTORY(ImageSource);
+
+#define REGISTER_IMAGE_SOURCE(key, SubClass) \
+  REGISTER_SUBCLASS_BY_KEY(key, ::aos::vision::ImageSource, SubClass)
+
+// Runs loop and never returns.
+// Feeds into a generic filter.
+void DebugFrameworkMain(int argc, char **argv, FilterHarness *filter);
+
+}  // namespace vision
+}  // namespace aos
+
+#endif  // _AOS_VISION_DEBUG_DEBUG_FRAMEWORK_H_
diff --git a/aos/vision/debug/jpeg_list-source.cc b/aos/vision/debug/jpeg_list-source.cc
new file mode 100644
index 0000000..2dbbe44
--- /dev/null
+++ b/aos/vision/debug/jpeg_list-source.cc
@@ -0,0 +1,108 @@
+#include "aos/vision/debug/debug_framework.h"
+
+#include <gdk/gdk.h>
+#include <fstream>
+#include <string>
+
+namespace aos {
+namespace vision {
+
+namespace {
+std::string GetFileContents(const std::string &filename) {
+  std::ifstream in(filename, std::ios::in | std::ios::binary);
+  if (in) {
+    std::string contents;
+    in.seekg(0, std::ios::end);
+    contents.resize(in.tellg());
+    in.seekg(0, std::ios::beg);
+    in.read(&contents[0], contents.size());
+    in.close();
+    return (contents);
+  }
+  fprintf(stderr, "Could not read file: %s\n", filename.c_str());
+  exit(-1);
+}
+
+std::vector<std::string> Split(DataRef inp, char delim) {
+  size_t i = 0;
+  std::vector<size_t> pos;
+  while (i < inp.size()) {
+    i = inp.find(delim, i);
+    if (i == std::string::npos) break;
+    // fprintf(stderr, "k=%d, i=%d\n", k, (int)i);
+    pos.emplace_back(i);
+    i = i + 1;
+  }
+  std::vector<std::string> res;
+  res.reserve(pos.size() + 1);
+  i = 0;
+  for (auto p : pos) {
+    res.emplace_back(inp.substr(i, p - i).to_string());
+    i = p + 1;
+  }
+  res.emplace_back(inp.substr(i).to_string());
+  return res;
+}
+}  // namespace
+
+class JpegListImageSource : public ImageSource {
+ public:
+  void Init(const std::string &jpeg_list_filename,
+            DebugFrameworkInterface *interface) override {
+    interface_ = interface;
+    auto contents = GetFileContents(jpeg_list_filename);
+
+    std::string basename;
+    auto it = jpeg_list_filename.find_last_of('/');
+    if (it != std::string::npos) {
+      basename = jpeg_list_filename.substr(0, it + 1);
+    }
+
+    for (const auto &jpeg_filename : Split(contents, '\n')) {
+      [&]() {
+        if (jpeg_filename.empty()) return;
+        for (std::size_t i = 0; i < jpeg_filename.size(); ++i) {
+          if (jpeg_filename[i] == '#') return;
+          if (jpeg_filename[i] != ' ') break;
+        }
+        if (jpeg_filename[0] == '/') {
+          images_.emplace_back(GetFileContents(jpeg_filename));
+        } else {
+          images_.emplace_back(GetFileContents(basename + jpeg_filename));
+        }
+      }();
+    }
+    fprintf(stderr, "loaded %lu items\n", images_.size());
+    if (!images_.empty()) {
+      interface_->NewJpeg(images_[idx_]);
+      interface_->InstallKeyPress([this](uint32_t keyval) {
+        if (keyval == GDK_KEY_Left && idx_ > 0) {
+          --idx_;
+        } else if (keyval == GDK_KEY_Right && idx_ < images_.size()) {
+          idx_ = (idx_ + 1) % images_.size();
+        } else {
+          return;
+        }
+        interface_->NewJpeg(images_[idx_]);
+      });
+    }
+  }
+
+  const char *GetHelpMessage() override {
+    return &R"(
+    format_spec is the name of a file with each jpeg filename on a new line.
+    This viewer source will load each jpeg individually and cycle through them
+    with the arrow keys.
+)"[1];
+  }
+
+ private:
+  DebugFrameworkInterface *interface_ = nullptr;
+  std::vector<std::string> images_;
+  size_t idx_ = 0;
+};
+
+REGISTER_IMAGE_SOURCE("jpeg_list", JpegListImageSource);
+
+}  // namespace vision
+}  // namespace aos
diff --git a/aos/vision/debug/tcp-source.cc b/aos/vision/debug/tcp-source.cc
new file mode 100644
index 0000000..28feff3
--- /dev/null
+++ b/aos/vision/debug/tcp-source.cc
@@ -0,0 +1,137 @@
+#include "aos/vision/debug/debug_framework.h"
+
+#include <gdk/gdk.h>
+#include <gtk/gtk.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <cstdlib>
+#include <fstream>
+#include <functional>
+#include <string>
+
+#include "aos/vision/blob/codec.h"
+#include "aos/vision/events/tcp_client.h"
+
+namespace aos {
+namespace vision {
+
+class BufferedLengthDelimReader {
+ public:
+  union data_len {
+    uint32_t len;
+    char buf[4];
+  };
+  BufferedLengthDelimReader() {
+    num_read_ = 0;
+    img_read_ = -1;
+  }
+  template <typename Lamb>
+  void up(int fd, Lamb lam) {
+    ssize_t count;
+    if (img_read_ < 0) {
+      count = read(fd, &len_.buf[num_read_], sizeof(len_.buf) - num_read_);
+      if (count < 0) return;
+      num_read_ += count;
+      if (num_read_ < 4) return;
+      num_read_ = 0;
+      img_read_ = 0;
+      data_.clear();
+      if (len_.len > 200000) {
+        printf("bad size: %d\n", len_.len);
+        exit(-1);
+      }
+      data_.resize(len_.len);
+    } else {
+      count = read(fd, &data_[img_read_], len_.len - img_read_);
+      if (count < 0) return;
+      img_read_ += count;
+      if (img_read_ < (int)len_.len) return;
+      lam(DataRef{&data_[0], len_.len});
+      img_read_ = -1;
+    }
+  }
+
+ private:
+  data_len len_;
+  int num_read_;
+  std::vector<char> data_;
+  int img_read_;
+};
+
+bool ParsePort(const std::string &port, int *portno) {
+  if (port.empty()) return false;
+  int value = 0;
+  if (port[0] == '0') return false;
+  for (char item : port) {
+    if (item < '0' || item > '9') return false;
+    value = value * 10 + (item - '0');
+  }
+  *portno = value;
+  return true;
+}
+
+class TCPImageSource : public ImageSource {
+ public:
+  class Impl : public aos::events::TcpClient {
+   public:
+    Impl(const std::string &hostname, int portno,
+         DebugFrameworkInterface *interface)
+        : aos::events::TcpClient(hostname.c_str(), portno), interface_(interface) {}
+
+    void ReadEvent() override {
+      read_.up(fd(), [&](DataRef data) {
+        BlobList blobl;
+        const char *buf = data.data();
+        buf += sizeof(uint32_t);
+
+        ImageFormat fmt;
+        Int64Codec::Read(&buf);
+        fmt.w = Int32Codec::Read(&buf);
+        fmt.h = Int32Codec::Read(&buf);
+        buf = ParseBlobList(&blobl, buf);
+        interface_->NewBlobList(blobl, fmt);
+      });
+    }
+
+    BufferedLengthDelimReader read_;
+    DebugFrameworkInterface *interface_ = nullptr;
+  };
+
+  void Init(const std::string &addr_and_port,
+            DebugFrameworkInterface *interface) override {
+    auto it = addr_and_port.rfind(':');
+    if (it == std::string::npos) {
+      fprintf(stderr, "usage is: tcp:hostname:port\n");
+      exit(-1);
+    }
+    auto hostname = addr_and_port.substr(0, it);
+    auto port = addr_and_port.substr(it + 1);
+    int portno = 0;
+    if (!ParsePort(port, &portno)) {
+      fprintf(stderr, "usage is: tcp:hostname:port\n");
+      exit(-1);
+    }
+
+    impl_.reset(new Impl(hostname, portno, interface));
+
+    interface->Loop()->Add(impl_.get());
+
+    interface->InstallKeyPress([this](uint32_t /*keyval*/) {});
+  }
+
+  const char *GetHelpMessage() override {
+    return &R"(
+    format_spec is in ipaddr:port format.
+    This viewer soure connects to a target_sender binary and views the live
+    blob-stream.
+)"[1];
+  }
+
+ private:
+  std::unique_ptr<Impl> impl_;
+};
+
+REGISTER_IMAGE_SOURCE("tcp", TCPImageSource);
+
+}  // namespace vision
+}  // namespace aos