started cleaning up the socket mess

removed unused #include + dependency

more formatting fixes + fixed users of ReceiveSocket

cleaned more stuff up (converted from references to pointers is one)

wip. started rewriting everything, not quite finished

got everything except SensorOutput done (I think...)

got everything compiling except for missing SensorReceiver

worked on implementing the logic. didn't finish

made everything compile and finished implementing SensorReceiver

pulling over Austin's mock time stuff

added IncrementMockTime

finished up and started on tests

remembered something else
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index 8dd1370..95b7fd6 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -83,9 +83,11 @@
       ],
       'dependencies': [
         '<(AOS)/common/common.gyp:common',
+        'time',
       ],
       'export_dependent_settings': [
         '<(AOS)/common/common.gyp:common',
+        'time',
       ],
     },
     {
@@ -142,11 +144,13 @@
         '<(AOS)/common/messages/messages.gyp:aos_queues',
         '<(AOS)/build/aos.gyp:logging',
         'timing',
+        'time',
       ],
       'export_dependent_settings': [
         '<(AOS)/common/messages/messages.gyp:aos_queues',
         '<(AOS)/build/aos.gyp:logging',
         'timing',
+        'time',
       ],
     },
     {
diff --git a/aos/common/control_loop/ControlLoop-tmpl.h b/aos/common/control_loop/ControlLoop-tmpl.h
index 80d592a..a7289ea 100644
--- a/aos/common/control_loop/ControlLoop-tmpl.h
+++ b/aos/common/control_loop/ControlLoop-tmpl.h
@@ -113,7 +113,7 @@
 template <class T, bool has_position>
 void ControlLoop<T, has_position>::Run() {
   while (true) {
-    ::aos::time::PhasedLoop10MS(0);
+    ::aos::time::PhasedLoopXMS(kLoopFrequency.ToMSec(), 0);
     Iterate();
   }
 }
diff --git a/aos/common/control_loop/ControlLoop.h b/aos/common/control_loop/ControlLoop.h
index e866bb7..0799c05 100644
--- a/aos/common/control_loop/ControlLoop.h
+++ b/aos/common/control_loop/ControlLoop.h
@@ -7,6 +7,7 @@
 #include "aos/common/control_loop/Timing.h"
 #include "aos/common/type_traits.h"
 #include "aos/common/queue.h"
+#include "aos/common/time.h"
 
 namespace aos {
 namespace control_loops {
@@ -36,6 +37,9 @@
   virtual uint32_t UniqueID() = 0;
 };
 
+// Control loops run this often, "starting" at time 0.
+const time::Time kLoopFrequency = time::Time::InSeconds(0.01);
+
 // Provides helper methods to assist in writing control loops.
 // This template expects to be constructed with a queue group as an argument
 // that has a goal, position, status, and output queue.
diff --git a/aos/common/input/SensorInput-tmpl.h b/aos/common/input/SensorInput-tmpl.h
deleted file mode 100644
index 667d2e8..0000000
--- a/aos/common/input/SensorInput-tmpl.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#ifndef __VXWORKS__
-#include "aos/common/network/ReceiveSocket.h"
-#include "aos/common/Configuration.h"
-#endif
-#include "aos/common/logging/logging.h"
-
-namespace aos {
-
-#ifdef __VXWORKS__
-template<class Values> SEM_ID SensorInput<Values>::lock_ = semBCreate(SEM_Q_PRIORITY, SEM_FULL);
-template<class Values> std::vector<SensorInput<Values> *> SensorInput<Values>::running_;
-#endif
-template<class Values> void SensorInput<Values>::Run() {
-#ifndef __VXWORKS__
-	ReceiveSocket sock(NetworkPort::kSensors);
-  Values values;
-	while (true) {
-		if (sock.Recv(&values, sizeof(values)) == -1) {
-      LOG(WARNING, "socket receive failed\n");
-      continue;
-    }
-    RunIteration(values);
-	}
-#else
-  semTake(lock_, WAIT_FOREVER);
-  running_.push_back(this);
-  semGive(lock_);
-#endif
-}
-
-#ifdef __VXWORKS__
-template<class Values> void SensorInput<Values>::RunIterationAll(Values &vals) {
-  semTake(lock_, WAIT_FOREVER);
-  for (auto it = running_.begin(); it != running_.end(); ++it) {
-    (*it)->RunIteration(vals);
-  }
-  semGive(lock_);
-}
-#endif
-
-} // namespace aos
diff --git a/aos/common/input/SensorInput.h b/aos/common/input/SensorInput.h
deleted file mode 100644
index fa99bab..0000000
--- a/aos/common/input/SensorInput.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#ifndef AOS_INPUT_SENSOR_INPUT_H_
-#define AOS_INPUT_SENSOR_INPUT_H_
-
-#ifdef __VXWORKS__
-#include <vector>
-#include <semLib.h>
-#endif
-
-namespace aos {
-
-// Class for implementing code that takes information from a sensor struct and
-// places it into queues. Subclasses should be compiled for both the atom and
-// the crio to support crio control loops.
-template<class Values> class SensorInput {
- protected:
-  virtual void RunIteration(Values &values) = 0;
- public:
-  // Enters an infinite loop that reads values and calls RunIteration.
-  void Run();
-
-#ifdef __VXWORKS__
-  // Calls RunIteration on all instances with values.
-  static void RunIterationAll(Values &values);
- private:
-  static SEM_ID lock_;
-  static std::vector<SensorInput *> running_;
-#endif
-};
-
-} // namespace aos
-
-#include "SensorInput-tmpl.h"
-
-#endif
-
diff --git a/aos/common/network/SendSocket.h b/aos/common/network/SendSocket.h
index 27021e0..a0c1fe2 100644
--- a/aos/common/network/SendSocket.h
+++ b/aos/common/network/SendSocket.h
@@ -7,6 +7,7 @@
 
 class SendSocket : public Socket {
  public:
+  //inline int Send(const void *buf, int length) { return Socket::Send(buf, length); }
   // Connect must be called before use.
   SendSocket() {}
   // Calls Connect automatically.
diff --git a/aos/common/network/Socket.cpp b/aos/common/network/Socket.cpp
index 1ffc775..0366e7c 100644
--- a/aos/common/network/Socket.cpp
+++ b/aos/common/network/Socket.cpp
@@ -32,7 +32,9 @@
 
   return last_ret_ = 0;
 }
+
 Socket::Socket() : socket_(-1), last_ret_(2) {}
+
 Socket::~Socket() {
   close(socket_);
 }
@@ -45,28 +47,31 @@
   last_ret_ = 0;
 }
 
-int Socket::Recv(void *buf, int length) {
+int Socket::Receive(void *buf, int length) {
   const int ret = recv(socket_, static_cast<char *>(buf), length, 0);
   last_ret_ = (ret == -1) ? -1 : 0;
   return ret;
 }
-int Socket::Recv(void *buf, int length, long usec) {
-	timeval tv;
-	tv.tv_sec = 0;
-	tv.tv_usec = usec;
-	fd_set fds;
-	FD_ZERO(&fds);
-	FD_SET(socket_, &fds);
-	switch (select(FD_SETSIZE, &fds, NULL, NULL, &tv)) {
-		case 1:
-      return Recv(buf, length);
-		case 0:
-			return last_ret_ = 0;
-		default:
-			perror("select on socket to receive from failed");
-			return last_ret_ = -1;
-	}
+
+int Socket::Receive(void *buf, int length, time::Time timeout) {
+  timeval timeout_timeval = timeout.ToTimeval();
+  fd_set fds;
+  FD_ZERO(&fds);
+  FD_SET(socket_, &fds);
+  switch (select(FD_SETSIZE, &fds, NULL, NULL, &timeout_timeval)) {
+    case 1:
+      return Receive(buf, length);
+    case 0:
+      return last_ret_ = 0;
+    default:
+      if (errno == EINTR) {
+        return last_ret_ = 0;
+      }
+      LOG(FATAL, "select(FD_SETSIZE, %p, NULL, NULL, %p) failed with %d: %s\n",
+          &fds, &timeout_timeval, errno, strerror(errno));
+  }
 }
+
 int Socket::Send(const void *buf, int length) {
   const int ret = write(socket_,
                         lame_unconst(static_cast<const char *>(buf)), length);
diff --git a/aos/common/network/Socket.h b/aos/common/network/Socket.h
index b6d2c70..68fd32c 100644
--- a/aos/common/network/Socket.h
+++ b/aos/common/network/Socket.h
@@ -18,9 +18,21 @@
  public:
   int LastStatus() const { return last_ret_; }
 
-	int Send(const void *buf, int length);
-	int Recv(void *buf, int length);
-	int Recv(void *buf, int length, long usec); // returns 0 if timed out
+  int Send(const void *buf, int length);
+
+  // buf is where to put the data and length is the maximum amount of data to
+  // put in for all overloads.
+  // All overloads return how many bytes were received or -1 for error. 0 is a
+  // valid return value for all overloads.
+  // No timeout.
+  int Receive(void *buf, int length);
+  // DEPRECATED(brians): use the time::Time overload instead
+  int Receive(void *buf, int length, long usec_timeout) {
+    return Receive(buf, length, time::Time::InUS(usec_timeout));
+  }
+  // timeout is relative
+  int Receive(void *buf, int length, time::Time timeout);
+
  protected:
   int Connect(NetworkPort port, const char *address, int type = SOCK_DGRAM);
   Socket();
diff --git a/aos/common/queue.h b/aos/common/queue.h
index 7b38e67..e93c655 100644
--- a/aos/common/queue.h
+++ b/aos/common/queue.h
@@ -9,7 +9,7 @@
 #undef USE_UNSAFE
 #endif
 
-#include "aos/aos_core.h"
+#include "aos/common/time.h"
 #include "aos/common/macros.h"
 #ifndef USE_UNSAFE
 #include "aos/atom_code/ipc_lib/queue.h"
diff --git a/aos/common/sensors/sensor_broadcaster-tmpl.h b/aos/common/sensors/sensor_broadcaster-tmpl.h
new file mode 100644
index 0000000..fa3f2fd
--- /dev/null
+++ b/aos/common/sensors/sensor_broadcaster-tmpl.h
@@ -0,0 +1,52 @@
+#include "aos/common/Configuration.h"
+
+namespace aos {
+namespace sensors {
+
+template<class Values>
+SensorBroadcaster<Values>::SensorBroadcaster(
+    SensorPackerInterface<Values> *packer)
+    : packer_(packer),
+      notifier_(StaticNotify, this),
+      socket_(NetworkPort::kSensors,
+              configuration::GetIPAddress(
+                  configuration::NetworkDevice::kAtom)),
+      crio_control_loop_runner_(NULL) {
+  static_assert(shm_ok<SensorData<Values>>::value,
+                "it is going to get sent over a socket");
+  data_.count = 0;
+}
+
+template<class Values>
+void SensorBroadcaster<Values>::Start() {
+  notifier_.StartPeriodic(kSensorSendFrequency);
+  if (!notifier_.IsExact()) {
+    LOG(FATAL, "bad choice for kSensorSendFrequency\n");
+  }
+}
+
+template<class Values>
+void SensorBroadcaster<Values>::RegisterControlLoopRunner(
+    SensorSinkInterface<Values> *crio_control_loop_runner) {
+  if (crio_control_loop_runner_ != NULL) {
+    LOG(FATAL, "trying to register loop runner %p but already have %p\n",
+    crio_control_loop_runner, crio_control_loop_runner_);
+  }
+  crio_control_loop_runner_ = crio_control_loop_runner;
+}
+
+template<class Values>
+void SensorBroadcaster<Values>::Notify() {
+  packer_->PackInto(&data_.values);
+  socket_.Send(&data_, sizeof(data_));
+  ++data_.count;
+
+  if ((data_.count % kSendsPerCycle) == 0) {
+    if (crio_control_loop_runner_ != NULL) {
+      crio_control_loop_runner_->Process(&data_);
+    }
+  }
+}
+
+}  // namespace sensors
+}  // namespace aos
diff --git a/aos/common/sensors/sensor_broadcaster.h b/aos/common/sensors/sensor_broadcaster.h
new file mode 100644
index 0000000..1e61208
--- /dev/null
+++ b/aos/common/sensors/sensor_broadcaster.h
@@ -0,0 +1,57 @@
+#ifndef AOS_COMMON_SENSORS_SENSOR_BROADCASTER_H_
+#define AOS_COMMON_SENSORS_SENSOR_BROADCASTER_H_
+
+#include "aos/crio/shared_libs/interrupt_bridge.h"
+#include "aos/common/network/SendSocket.h"
+#include "aos/common/sensors/sensors.h"
+#include "aos/common/sensors/sensor_packer.h"
+#include "aos/common/sensors/sensor_sink.h"
+#include "aos/common/macros.h"
+
+namespace aos {
+namespace crio {
+
+template<class Values>
+class CRIOControlLoopRunner;
+
+}  // namespace crio
+namespace sensors {
+
+// A class that handles sending sensor values to the atom.
+// See sensors.h for an overview of where this fits in.
+template<class Values>
+class SensorBroadcaster {
+ public:
+  // Does not take ownership of packer.
+  SensorBroadcaster(SensorPackerInterface<Values> *packer);
+
+  void Start();
+
+ private:
+  // So that it can access RegisterControlLoopRunner.
+  friend class crio::CRIOControlLoopRunner<Values>;
+
+  // Registers an object to get run on control loop timing. Only designed to be
+  // called by crio::CRIOControlLoopRunner.
+  // Does not take ownership of crio_control_loop_runner.
+  void RegisterControlLoopRunner(
+      SensorSinkInterface<Values> *crio_control_loop_runner);
+
+  static void StaticNotify(SensorBroadcaster<Values> *self) { self->Notify(); }
+  void Notify();
+
+  SensorPackerInterface<Values> *const packer_;
+  crio::WDInterruptNotifier<SensorBroadcaster<Values>> notifier_;
+  SendSocket socket_;
+  SensorData<Values> data_;
+  SensorSinkInterface<Values> *crio_control_loop_runner_;
+
+  DISALLOW_COPY_AND_ASSIGN(SensorBroadcaster<Values>);
+};
+
+}  // namespace sensors
+}  // namespace aos
+
+#include "sensor_broadcaster-tmpl.h"
+
+#endif  // AOS_COMMON_SENSORS_SENSOR_BROADCASTER_H_
diff --git a/aos/common/sensors/sensor_packer.h b/aos/common/sensors/sensor_packer.h
new file mode 100644
index 0000000..cfcf045
--- /dev/null
+++ b/aos/common/sensors/sensor_packer.h
@@ -0,0 +1,22 @@
+#ifndef AOS_COMMON_SENSORS_SENSOR_PACKER_H_
+#define AOS_COMMON_SENSORS_SENSOR_PACKER_H_
+
+namespace aos {
+namespace sensors {
+
+// An interface that handles reading input data and putting it into the sensor
+// values struct.
+// See sensors.h for an overview of where this fits in.
+template<class Values>
+class SensorPackerInterface {
+ public:
+  virtual ~SensorPackerInterface() {}
+
+  // Reads the inputs (from WPILib etc) and writes the data into *values.
+  virtual void PackInto(Values *values) = 0;
+};
+
+}  // namespace sensors
+}  // namespace aos
+
+#endif  // AOS_COMMON_SENSORS_SENSOR_PACKER_H_
diff --git a/aos/common/sensors/sensor_receiver-tmpl.h b/aos/common/sensors/sensor_receiver-tmpl.h
new file mode 100644
index 0000000..9d37b8c
--- /dev/null
+++ b/aos/common/sensors/sensor_receiver-tmpl.h
@@ -0,0 +1,154 @@
+#include "aos/common/Configuration.h"
+#include "aos/common/inttypes.h"
+
+namespace aos {
+namespace sensors {
+
+template<class Values>
+const time::Time SensorReceiver<Values>::kJitterDelay =
+    time::Time::InSeconds(0.0025);
+
+template<class Values>
+SensorReceiver<Values>::SensorReceiver(
+    SensorUnpackerInterface<Values> *unpacker)
+    : unpacker_(unpacker),
+      synchronized_(false) {}
+
+template<class Values>
+void SensorReceiver<Values>::RunIteration() {
+  if (synchronized_) {
+    if (ReceiveData()) {
+      LOG(DEBUG, "receive said to try a reset\n");
+      synchronized_ = false;
+      return;
+    }
+    if (GoodPacket()) {
+      unpacker_->UnpackFrom(&data_.values);
+    }
+  } else {
+    LOG(INFO, "resetting to try receiving data\n");
+    Reset();
+    if (Synchronize()) {
+      LOG(INFO, "synchronized successfully\n");
+      synchronized_ = true;
+    }
+  }
+}
+
+template<class Values>
+bool SensorReceiver<Values>::GoodPacket() {
+  // If it's a multiple of kSendsPerCycle from start_count_.
+  if (((data_.count - start_count_) % kSendsPerCycle) == 0) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+// Looks for when the timestamps transition from before where we want to after
+// and then picks whichever one was closer. After that, reads kTestCycles and
+// makes sure that at most 1 is bad.
+template<class Values>
+bool SensorReceiver<Values>::Synchronize() {
+  time::Time old_received_time(0, 0);
+  time::Time start_time = time::Time::Now();
+  // When we want to send out the next set of values.
+  time::Time goal_time = (start_time / kLoopFrequency.ToNSec()) *
+      kLoopFrequency.ToNSec() +
+      kLoopFrequency - kJitterDelay;
+  while (true) {
+    if (ReceiveData()) return false;
+    time::Time received_time = time::Time::Now();
+    if (received_time > goal_time) {
+      // If this was the very first one we got, try again.
+      if (old_received_time == time::Time(0, 0)) return false;
+
+      assert(old_received_time < goal_time);
+
+      // If the most recent one is closer than the last one.
+      if ((received_time - goal_time).abs() <
+          (old_received_time - goal_time).abs()) {
+        start_count_ = data_.count;
+      } else {
+        start_count_ = data_.count - 1;
+      }
+
+      int bad_count = 0;
+      for (int i = 0; i < kTestCycles;) {
+        ReceiveData();
+        received_time = time::Time::Now();
+        if (GoodPacket()) {
+          LOG(DEBUG, "checking packet count=%"PRId32
+              " received at %"PRId32"s%"PRId32"ns\n",
+              data_.count, received_time.sec(), received_time.nsec());
+          // If |the difference between the goal time for this numbered packet
+          // and the time we actually got this one| is too big.
+          if (((goal_time +
+                kSensorSendFrequency * (data_.count - start_count_)) -
+               received_time).abs() > kSensorSendFrequency) {
+            LOG(INFO, "rejected time of the last good packet\n");
+            ++bad_count;
+          }
+          ++i;
+        }
+        if (bad_count > 1) {
+          LOG(WARNING, "got multiple packets with bad timestamps\n");
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    old_received_time = received_time;
+  }
+}
+
+template<class Values>
+bool SensorReceiver<Values>::ReceiveData() {
+  const int32_t old_count = data_.count;  // Same type as count for PRId32.
+  DoReceiveData();  // Blocks until 1 new set of data has been read into data_.
+  if (data_.count < 0) {
+    LOG(FATAL, "data count overflowed. currently %"PRId32"\n", data_.count);
+  }
+  if (data_.count < old_count) {
+    LOG(INFO, "count reset. was %"PRId32", now %"PRId32"\n",
+        old_count, data_.count);
+    return true;  // The count went backwards: start over again with timing.
+  }
+  return false;
+}
+
+template<class Values>
+const time::Time NetworkSensorReceiver<Values>::kWarmupTime =
+    time::Time::InSeconds(0.125);
+
+template<class Values>
+NetworkSensorReceiver<Values>::NetworkSensorReceiver(
+    SensorUnpackerInterface<Values> *unpacker)
+    : SensorReceiver<Values>(unpacker),
+      socket_(NetworkPort::kSensors) {}
+
+template<class Values>
+void NetworkSensorReceiver<Values>::Reset() {
+  LOG(INFO, "beginning warm up\n");
+  time::Time start = time::Time::Now();
+  while ((time::Time::Now() - start) < kWarmupTime) {
+    socket_.Receive(this->data(), sizeof(*this->data()));
+  }
+  LOG(INFO, "done warming up\n");
+}
+
+template<class Values>
+void NetworkSensorReceiver<Values>::DoReceiveData() {
+  while (true) {
+    if (socket_.Receive(this->data(), sizeof(*this->data())) ==
+        sizeof(*this->data())) {
+      return;
+    }
+    LOG(WARNING, "received incorrect amount of data\n");
+  }
+}
+
+}  // namespace sensors
+}  // namespace aos
diff --git a/aos/common/sensors/sensor_receiver.h b/aos/common/sensors/sensor_receiver.h
new file mode 100644
index 0000000..1fe3003
--- /dev/null
+++ b/aos/common/sensors/sensor_receiver.h
@@ -0,0 +1,98 @@
+#ifndef AOS_COMMON_SENSORS_SENSOR_RECEIVER_H_
+#define AOS_COMMON_SENSORS_SENSOR_RECEIVER_H_
+
+#include "aos/common/sensors/sensor_unpacker.h"
+#include "aos/common/network/ReceiveSocket.h"
+#include "aos/common/sensors/sensors.h"
+#include "aos/common/time.h"
+#include "aos/common/gtest_prod.h"
+
+namespace aos {
+namespace sensors {
+namespace testing {
+
+FORWARD_DECLARE_TEST_CASE(SensorReceiverTest, Simple);
+
+}  // namespace testing
+
+// A class that handles receiving sensor values from the cRIO.
+// See sensors.h for an overview of where this fits in.
+//
+// Abstract class to make testing the complex logic for choosing which data to
+// use easier.
+template<class Values>
+class SensorReceiver {
+ public:
+  // Does not take ownership of unpacker.
+  SensorReceiver(SensorUnpackerInterface<Values> *unpacker);
+
+  void RunIteration();
+
+ protected:
+  SensorData<Values> *data() { return &data_; }
+
+ private:
+  // How long before the control loops run to aim for receiving sensor data (to
+  // prevent jitter if some packets arrive up to this much later).
+  static const time::Time kJitterDelay;
+  // How many cycles not to send data out to make sure that we're in phase
+  // (during this time, the code verifies that <= 1 cycle is not within 1
+  // cycle's time of kJitterDelay).
+  static const int kTestCycles = 10;
+
+  FRIEND_TEST_NAMESPACE(SensorReceiverTest, Simple, testing);
+
+  // Subclasses need to implement this to read 1 set of data (blocking until it
+  // is available) into data().
+  virtual void DoReceiveData() = 0;
+
+  // Optional: if subclasses can do anything to reinitialize after there are
+  // problems, they should do it here.
+  // This will be called right before calling DoReceiveData() the first time.
+  virtual void Reset() {}
+
+  // Returns whether the current packet looks like a good one to use.
+  bool GoodPacket();
+
+  // Synchronizes with incoming packets and sets start_count_ to where we
+  // started reading.
+  // Returns whether it succeeded in locking on.
+  bool Synchronize();
+  // Receives a set of values and makes sure that it's sane.
+  // Returns whether to start over again with timing.
+  bool ReceiveData();
+
+  SensorUnpackerInterface<Values> *const unpacker_;
+  SensorData<Values> data_;
+  // The count that we started out (all other sent packets will be multiples of
+  // this).
+  int32_t start_count_;
+  bool synchronized_;
+
+  DISALLOW_COPY_AND_ASSIGN(SensorReceiver<Values>);
+};
+
+// A SensorReceiver that receives data from a SensorBroadcaster.
+template<class Values>
+class NetworkSensorReceiver : public SensorReceiver<Values> {
+ public:
+  NetworkSensorReceiver(SensorUnpackerInterface<Values> *unpacker);
+
+ private:
+  // How long to read data as fast as possible for (to clear out buffers etc).
+  static const time::Time kWarmupTime;
+
+  virtual void DoReceiveData();
+  virtual void Reset();
+
+  ReceiveSocket socket_;
+
+  DISALLOW_COPY_AND_ASSIGN(NetworkSensorReceiver<Values>);
+};
+
+}  // namespace sensors
+}  // namespace aos
+
+#include "aos/common/sensors/sensor_receiver-tmpl.h"
+
+#endif  // AOS_COMMON_SENSORS_SENSOR_RECEIVER_H_
diff --git a/aos/common/sensors/sensor_receiver_test.cc b/aos/common/sensors/sensor_receiver_test.cc
new file mode 100644
index 0000000..5a66432
--- /dev/null
+++ b/aos/common/sensors/sensor_receiver_test.cc
@@ -0,0 +1,84 @@
+#include "aos/common/sensors/sensor_receiver.h"
+
+#include "gtest/gtest.h"
+
+#include "aos/common/sensors/sensors.h"
+#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+
+using ::aos::time::Time;
+
+namespace aos {
+namespace sensors {
+namespace testing {
+
+struct TestValues {
+  int count;
+  int more_data;
+};
+class TestSensorReceiver : public SensorReceiver<TestValues>,
+    public SensorUnpackerInterface<TestValues> {
+ public:
+  TestSensorReceiver()
+      : SensorReceiver<TestValues>(this),
+        resets_(0),
+        unpacks_(0) {
+    data()->count = 0;
+  }
+
+  void Reset() {
+    LOG(DEBUG, "reset for the %dth time\n", ++resets_);
+  }
+  void DoReceiveData() {
+    last_received_count_ = ++data()->count;
+    data()->values.count = last_received_count_;
+    Time::IncrementMockTime(kSensorSendFrequency);
+  }
+
+  int resets() { return resets_; }
+  int unpacks() { return unpacks_; }
+  using SensorReceiver<TestValues>::data;
+
+  void UnpackFrom(TestValues *data) {
+    // Make sure that it didn't lose one that we gave it.
+    EXPECT_EQ(last_received_count_, data->count);
+    ++unpacks_;
+  }
+ 
+ private:
+  int resets_;
+  int unpacks_;
+  int last_received_count_;
+};
+
+class SensorReceiverTest : public ::testing::Test {
+ protected:
+  SensorReceiverTest() {
+    ::aos::common::testing::EnableTestLogging();
+    Time::EnableMockTime(Time(971, 254));
+  }
+
+  TestSensorReceiver &receiver() { return receiver_; }
+
+ private:
+  TestSensorReceiver receiver_;
+};
+
+TEST_F(SensorReceiverTest, Simple) {
+  static const int kIterations = 53;
+  for (int i = 0; i < kIterations; ++i) {
+    receiver().RunIteration();
+  }
+  EXPECT_EQ(1, receiver().resets());
+  // expected value is kIterations/kSendsPerCycle (rounded up) (the number of
+  // times that it should get a good one) - 1 (to compensate for the iteration
+  // when it synced itself up)
+  EXPECT_EQ((kIterations + kSendsPerCycle - 1) / kSendsPerCycle - 1,
+            receiver().unpacks());
+}
+
+// TODO(brians) finish writing tests
+
+}  // namespace testing
+}  // namespace sensors
+}  // namespace aos
diff --git a/aos/common/sensors/sensor_sink.h b/aos/common/sensors/sensor_sink.h
new file mode 100644
index 0000000..81bc491
--- /dev/null
+++ b/aos/common/sensors/sensor_sink.h
@@ -0,0 +1,21 @@
+#ifndef AOS_COMMON_SENSORS_SENSOR_SINK_H_
+#define AOS_COMMON_SENSORS_SENSOR_SINK_H_
+
+#include "aos/common/sensors/sensors.h"
+
+namespace aos {
+namespace sensors {
+
+// Generic interface for something that can do something with sensor data.
+template<class Values>
+class SensorSinkInterface {
+ public:
+  virtual ~SensorSinkInterface() {}
+  // Handles 1 set of sensor data. Pure virtual so implementations override it.
+  virtual void Process(SensorData<Values> *data) = 0;
+};
+
+}  // namespace sensors
+}  // namespace aos
+
+#endif  // AOS_COMMON_SENSORS_SENSOR_SINK_H_
diff --git a/aos/common/sensors/sensor_unpacker.h b/aos/common/sensors/sensor_unpacker.h
new file mode 100644
index 0000000..b19e0c6
--- /dev/null
+++ b/aos/common/sensors/sensor_unpacker.h
@@ -0,0 +1,22 @@
+#ifndef AOS_COMMON_SENSORS_SENSOR_UNPACKER_H_
+#define AOS_COMMON_SENSORS_SENSOR_UNPACKER_H_
+
+namespace aos {
+namespace sensors {
+
+// An interface that handles taking data from the sensor Values struct and
+// putting it into queues (for control loops etc).
+// See sensors.h for an overview of where this fits in.
+template<class Values>
+class SensorUnpackerInterface {
+ public:
+  virtual ~SensorUnpackerInterface() {}
+
+  // Takes the data in *values and writes it out into queues etc.
+  virtual void UnpackFrom(Values *values) = 0;
+};
+
+}  // namespace sensors
+}  // namespace aos
+
+#endif  // AOS_COMMON_SENSORS_SENSOR_UNPACKER_H_
diff --git a/aos/common/sensors/sensors.cc b/aos/common/sensors/sensors.cc
new file mode 100644
index 0000000..8b774d2
--- /dev/null
+++ b/aos/common/sensors/sensors.cc
@@ -0,0 +1,10 @@
+#include "aos/common/sensors/sensors.h"
+
+namespace aos {
+namespace sensors {
+
+const time::Time kSensorSendFrequency =
+    ::aos::control_loops::kLoopFrequency / kSendsPerCycle;
+
+}  // namespace sensors
+}  // namespace aos
diff --git a/aos/common/sensors/sensors.gyp b/aos/common/sensors/sensors.gyp
new file mode 100644
index 0000000..2c49369
--- /dev/null
+++ b/aos/common/sensors/sensors.gyp
@@ -0,0 +1,87 @@
+{
+  'targets': [
+    {
+      'target_name': 'sensor_sink',
+      'type': 'static_library',
+      'sources': [
+      ],
+      'dependencies': [
+        'sensors',
+      ],
+      'export_dependent_settings': [
+        'sensors',
+      ],
+    },
+    {
+      'target_name': 'sensors',
+      'type': 'static_library',
+      'sources': [
+        'sensors.cc'
+      ],
+      'dependencies': [
+        '<(AOS)/common/common.gyp:time',
+        '<(AOS)/common/common.gyp:controls',
+      ],
+      'export_dependent_settings': [
+        '<(AOS)/common/common.gyp:time',
+        '<(AOS)/common/common.gyp:controls',
+      ],
+    },
+    {
+      'target_name': 'sensor_receiver',
+      'type': 'static_library',
+      'sources': [
+        #'sensor_receiver-tmpl.h'
+      ],
+      'dependencies': [
+        '<(AOS)/common/network/network.gyp:socket',
+        '<(AOS)/common/common.gyp:common',
+        'sensors',
+        '<(AOS)/common/common.gyp:time',
+        '<(AOS)/common/common.gyp:gtest_prod',
+      ],
+      'export_dependent_settings': [
+        '<(AOS)/common/network/network.gyp:socket',
+        '<(AOS)/common/common.gyp:common',
+        'sensors',
+        '<(AOS)/common/common.gyp:time',
+        '<(AOS)/common/common.gyp:gtest_prod',
+      ],
+    },
+    {
+      'target_name': 'sensor_receiver_test',
+      'type': 'executable',
+      'sources': [
+        'sensor_receiver_test.cc',
+      ],
+      'dependencies': [
+        '<(EXTERNALS):gtest',
+        'sensor_receiver',
+        '<(AOS)/common/common.gyp:time',
+        'sensors',
+        '<(AOS)/common/common.gyp:queue_testutils',
+      ],
+    },
+    {
+      'target_name': 'sensor_broadcaster',
+      'type': 'static_library',
+      'sources': [
+        #'sensor_broadcaster-tmpl.h',
+      ],
+      'dependencies': [
+        '<(AOS)/crio/shared_libs/shared_libs.gyp:interrupt_notifier',
+        '<(AOS)/common/network/network.gyp:socket',
+        '<(AOS)/common/common.gyp:common',
+        'sensors',
+        'sensor_sink',
+      ],
+      'export_dependent_settings': [
+        '<(AOS)/crio/shared_libs/shared_libs.gyp:interrupt_notifier',
+        '<(AOS)/common/network/network.gyp:socket',
+        '<(AOS)/common/common.gyp:common',
+        'sensors',
+        'sensor_sink',
+      ],
+    },
+  ],
+}
diff --git a/aos/common/sensors/sensors.h b/aos/common/sensors/sensors.h
new file mode 100644
index 0000000..06f5253
--- /dev/null
+++ b/aos/common/sensors/sensors.h
@@ -0,0 +1,48 @@
+#ifndef AOS_COMMON_SENSORS_SENSORS_H_
+#define AOS_COMMON_SENSORS_SENSORS_H_
+
+#include "aos/common/time.h"
+
+#include "aos/common/control_loop/ControlLoop.h"
+
+namespace aos {
+// This namespace contains all of the stuff for dealing with reading sensors and
+// communicating it to everything that needs it. There are 4 main classes whose
+// instances actually process the data. They must all be registered in the
+// appropriate ::aos::crio::ControlsManager hooks.
+//
+// SensorPackers get run on the cRIO to read inputs (from WPILib or elsewhere)
+// and put the values into the Values struct (which is templated for all of the
+// classes that use it).
+// SensorUnpackers get run on both the atom and the cRIO to take the data from
+// the Values struct and put them into queues for control loops etc.
+// SensorBroadcasters (on the cRIO) send the data to a SensorReceiver (on the
+// atom) to pass to its SensorUnpacker there.
+// CRIOControlLoopRunners register with a SensorBroadcaster to get called right
+// after reading the sensor data so that they can immediately pass it to a
+// SensorUnpacker and then run their control loops.
+// The actual SensorPacker and SensorUnpacker classes have the Interface suffix
+// on them.
+namespace sensors {
+
+// How many times per ::aos::control_loops::kLoopFrequency sensor
+// values get sent out by the cRIO.
+// This must evenly divide that frequency into multiples of sysClkRateGet().
+const int kSendsPerCycle = 10;
+// ::aos::control_loops::kLoopFrequency / kSendsPerCycle for
+// convenience.
+extern const time::Time kSensorSendFrequency;
+using ::aos::control_loops::kLoopFrequency;
+
+// This is the struct that actually gets sent over the UDP socket.
+template<class Values>
+struct SensorData {
+  Values values;
+  // Starts at 0 and goes up.
+  int32_t count;
+} __attribute__((packed));
+
+}  // namespace sensors
+}  // namespace aos
+
+#endif  // AOS_COMMON_SENSORS_SENSORS_H_
diff --git a/aos/common/time.cc b/aos/common/time.cc
index 00c936d..995011a 100644
--- a/aos/common/time.cc
+++ b/aos/common/time.cc
@@ -8,21 +8,66 @@
 
 #include "aos/common/logging/logging.h"
 #include "aos/common/inttypes.h"
+#include "aos/common/mutex.h"
 
 namespace aos {
 namespace time {
 
-Time Time::Now(clockid_t clock) {
-  timespec temp;
-  if (clock_gettime(clock, &temp) != 0) {
-    // TODO(aschuh): There needs to be a pluggable low level logging interface
-    // so we can break this dependency loop.  This also would help during
-    // startup.
-    LOG(FATAL, "clock_gettime(%jd, %p) failed with %d: %s\n",
-        static_cast<uintmax_t>(clock), &temp, errno, strerror(errno));
-  }
-  return Time(temp);
+// State required to enable and use mock time.
+namespace {
+// True if mock time is enabled.
+// This does not need to be checked with the mutex held because setting time to
+// be enabled or disabled is atomic, and all future operations are atomic
+// anyways.  If there is a race condition setting or clearing whether time is
+// enabled or not, it will still be a race condition if current_mock_time is
+// also set atomically with enabled.
+bool mock_time_enabled = false;
+// Mutex to make time reads and writes thread safe.
+Mutex time_mutex;
+// Current time when time is mocked.
+Time current_mock_time(0, 0);
+
+// TODO(aschuh): This doesn't include SleepFor and SleepUntil.
+// TODO(aschuh): Create a clock source object and change the default?
+//  That would let me create a MockTime clock source.
 }
+
+void Time::EnableMockTime(const Time now) {
+  MutexLocker time_mutex_locker(&time_mutex);  // Lock first, like the others.
+  current_mock_time = now;  // Set before enabling so Now() never sees stale.
+  mock_time_enabled = true;
+}
+
+void Time::DisableMockTime() {
+  MutexLocker time_mutex_locker(&time_mutex);
+  mock_time_enabled = false;
+}
+
+void Time::SetMockTime(const Time now) {
+  MutexLocker time_mutex_locker(&time_mutex);
+  if (!mock_time_enabled) {
+    LOG(FATAL, "Tried to set mock time and mock time is not enabled\n");
+  }
+  current_mock_time = now;
+}
+
+Time Time::Now(clockid_t clock) {
+  if (mock_time_enabled) {
+    MutexLocker time_mutex_locker(&time_mutex);
+    return current_mock_time;
+  } else {
+    timespec temp;
+    if (clock_gettime(clock, &temp) != 0) {
+      // TODO(aschuh): There needs to be a pluggable low level logging interface
+      // so we can break this dependency loop.  This also would help during
+      // startup.
+      LOG(FATAL, "clock_gettime(%jd, %p) failed with %d: %s\n",
+          static_cast<uintmax_t>(clock), &temp, errno, strerror(errno));
+    }
+    return Time(temp);
+  }
+}
+
 void Time::Check() {
   if (nsec_ >= kNSecInSec || nsec_ < 0) {
     LOG(FATAL, "0 <= nsec_(%"PRId32") < %"PRId32" isn't true.\n",
diff --git a/aos/common/time.h b/aos/common/time.h
index 1d7ccae..c081e09 100644
--- a/aos/common/time.h
+++ b/aos/common/time.h
@@ -60,6 +60,7 @@
     return ans;
   }
   #endif  // SWIG
+
   // CLOCK_MONOTONIC on the fitpc and CLOCK_REALTIME on the cRIO because the
   // cRIO doesn't have any others.
   // CLOCK_REALTIME is the default realtime clock and CLOCK_MONOTONIC doesn't
@@ -113,6 +114,10 @@
   int ToTicks() const {
     return ToNSec() / static_cast<int64_t>(kNSecInSec / sysClkRateGet());
   }
+  // Constructs a Time representing ticks (sysClkRateGet() ticks per second).
+  static Time InTicks(int ticks) {
+    return Time::InSeconds(ticks / static_cast<double>(sysClkRateGet()));
+  }
 #endif
 
   // Returns the time represented in milliseconds.
@@ -159,6 +164,25 @@
     Check();
   }
 
+  // Absolute value. Whole-second negatives (nsec_ == 0) only flip sec_.
+  Time abs() const {
+    if (sec_ >= 0) return *this;  // nsec_ is never negative (see Check()).
+    return nsec_ == 0 ? Time(-sec_, 0) : Time(-sec_ - 1, kNSecInSec - nsec_);
+  }
+
+  // Enables returning the mock time value for Now instead of checking the
+  // system clock.  This should only be used when testing things depending on
+  // time, or many things may/will break.
+  static void EnableMockTime(const Time now);
+  // Sets now when time is being mocked.
+  static void SetMockTime(const Time now);
+  // Convenience function to just increment the mock time by a certain amount.
+  static void IncrementMockTime(const Time amount) {
+    SetMockTime(Now() + amount);
+  }
+  // Disables mocking time.
+  static void DisableMockTime();
+
  private:
   int32_t sec_, nsec_;
   // LOG(FATAL)s if nsec_ is >= kNSecInSec.
diff --git a/aos/common/time_test.cc b/aos/common/time_test.cc
index e72e5bd..678c013 100644
--- a/aos/common/time_test.cc
+++ b/aos/common/time_test.cc
@@ -141,6 +141,14 @@
   EXPECT_EQ(254971, t.ToMSec());
 }
 
+TEST(TimeTest, Abs) {
+  EXPECT_EQ(MACRO_DARG(Time(971, 1114)), MACRO_DARG(Time(971, 1114).abs()));
+  EXPECT_EQ(MACRO_DARG(Time(253, Time::kNSecInSec * 0.3)),
+            MACRO_DARG(Time(-254, Time::kNSecInSec * 0.7).abs()));
+  EXPECT_EQ(MACRO_DARG(-Time(-971, 973).ToNSec()),
+            MACRO_DARG(Time(970, Time::kNSecInSec - 973).ToNSec()));
+}
+
 }  // namespace testing
 }  // namespace time
 }  // namespace aos