Add start/end time options to log_web_proxy
This makes it so that if you only want to plot a portion of a very long
logfile you can do so.
Change-Id: I7b4841d63b5533dd42a6e21c452d78f44efc247d
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 782c2be..1cc8e17 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -15,6 +15,8 @@
DEFINE_string(data_dir, "www", "Directory to serve data files from");
DEFINE_string(node, "", "Directory to serve data files from");
DEFINE_int32(buffer_size, -1, "-1 if infinite, in # of messages / channel.");
+DEFINE_double(monotonic_start_time, -1.0, "Start time (sec)");
+DEFINE_double(monotonic_end_time, -1.0, "End time (sec)");
int main(int argc, char **argv) {
aos::InitGoogle(&argc, &argv);
@@ -43,6 +45,15 @@
event_loop->SkipTimingReport();
+ if (FLAGS_monotonic_start_time > 0) {
+ event_loop->AddTimer([&reader]() { reader.event_loop_factory()->Exit(); })
+ ->Setup(aos::monotonic_clock::time_point(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(FLAGS_monotonic_start_time))));
+
+ reader.event_loop_factory()->Run();
+ }
+
aos::web_proxy::WebProxy web_proxy(
event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
@@ -51,5 +62,12 @@
// Keep the web proxy alive past when we finish reading the logfile.
reader.set_exit_on_finish(false);
+ if (FLAGS_monotonic_end_time > 0) {
+ event_loop->AddTimer([&web_proxy]() { web_proxy.StopRecording(); })
+ ->Setup(aos::monotonic_clock::time_point(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(FLAGS_monotonic_end_time))));
+ }
+
reader.event_loop_factory()->Run();
}
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 7161ee3..d384f72 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -59,7 +59,7 @@
}
TimerHandler *const timer = event_loop_->AddTimer([this]() {
for (auto &subscriber : subscribers_) {
- if (subscriber) subscriber->RunIteration();
+ if (subscriber) subscriber->RunIteration(recording_);
}
});
@@ -201,54 +201,58 @@
global_epoll = nullptr;
}
-void Subscriber::RunIteration() {
- if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
- fetcher_->Fetch();
- message_buffer_.clear();
- return;
- }
+void WebProxy::StopRecording() { websocket_handler_->StopRecording(); }
- while (fetcher_->FetchNext()) {
- // If we aren't building up a buffer, short-circuit the FetchNext().
- if (buffer_size_ == 0) {
+void Subscriber::RunIteration(bool fetch_new) {
+ if (fetch_new) {
+ if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
fetcher_->Fetch();
+ message_buffer_.clear();
+ return;
}
- Message message;
- message.index = fetcher_->context().queue_index;
- VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
- << "packets";
- for (int packet_index = 0;
- packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
- // Pack directly into the mbuffer. This is admittedly a bit painful.
- const size_t packet_size =
- PackedMessageSize(fetcher_->context(), packet_index);
- struct mbuf *mbuffer = mbuf_alloc(packet_size);
- {
- // Wrap a pre-allocated builder around the mbuffer.
- PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
- flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
- flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
- &fbb, fetcher_->context(), channel_index_, packet_index);
- fbb.Finish(message_offset);
-
- // Now, the flatbuffer is built from the back to the front. So any
- // extra memory will be at the front. Setup the end and start pointers
- // on the mbuf.
- mbuf_set_end(mbuffer, packet_size);
- mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
+ while (fetcher_->FetchNext()) {
+ // If we aren't building up a buffer, short-circuit the FetchNext().
+ if (buffer_size_ == 0) {
+ fetcher_->Fetch();
}
+ Message message;
+ message.index = fetcher_->context().queue_index;
+ VLOG(2) << "Packing a message with "
+ << GetPacketCount(fetcher_->context()) << "packets";
+ for (int packet_index = 0;
+ packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
+ // Pack directly into the mbuffer. This is admittedly a bit painful.
+ const size_t packet_size =
+ PackedMessageSize(fetcher_->context(), packet_index);
+ struct mbuf *mbuffer = mbuf_alloc(packet_size);
- message.data.emplace_back(
- std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
- }
- message_buffer_.push_back(std::move(message));
- // If we aren't keeping a buffer, then we should only do one iteration of
- // the while loop--otherwise, if additional messages arrive between the
- // first FetchNext() and the second iteration then we can end up behaving
- // poorly (since we do a Fetch() when buffer_size_ == 0).
- if (buffer_size_ == 0) {
- break;
+ {
+ // Wrap a pre-allocated builder around the mbuffer.
+ PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
+ flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
+ flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
+ &fbb, fetcher_->context(), channel_index_, packet_index);
+ fbb.Finish(message_offset);
+
+ // Now, the flatbuffer is built from the back to the front. So any
+ // extra memory will be at the front. Setup the end and start
+ // pointers on the mbuf.
+ mbuf_set_end(mbuffer, packet_size);
+ mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
+ }
+
+ message.data.emplace_back(
+ std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
+ }
+ message_buffer_.push_back(std::move(message));
+ // If we aren't keeping a buffer, then we should only do one iteration of
+ // the while loop--otherwise, if additional messages arrive between the
+ // first FetchNext() and the second iteration then we can end up behaving
+ // poorly (since we do a Fetch() when buffer_size_ == 0).
+ if (buffer_size_ == 0) {
+ break;
+ }
}
}
for (auto &conn : channels_) {
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index baca26e..0c1d1dc 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -41,6 +41,10 @@
void onData(::seasocks::WebSocket *sock, const uint8_t *data,
size_t size) override;
void onDisconnect(::seasocks::WebSocket *sock) override;
+ // Stops recording data, even if the event loop continues running. This allows
+ // us to continue serving the webserver + websocket server, without having to
+ // load more actual data.
+ void StopRecording() { recording_ = false; }
private:
::seasocks::Server *server_;
@@ -51,6 +55,8 @@
connections_;
EventLoop *const event_loop_;
+ // Whether to pay attention to new messages.
+ bool recording_ = true;
};
// Wrapper class that manages the seasocks server and WebsocketHandler.
@@ -75,6 +81,9 @@
void SetDataPath(const char *path) { server_.setStaticPath(path); }
+ // Stops recording data. Useful for setting end times in log replay.
+ void StopRecording();
+
private:
WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
StoreHistory store_history, int per_channel_buffer_size_bytes);
@@ -121,7 +130,12 @@
store_history_(store_history == StoreHistory::kYes),
buffer_size_(buffer_size) {}
- void RunIteration();
+ // Runs a single iteration of going through and fetching new data as needed
+ // and servicing any WebRTC channels that are requesting messages.
+ // fetch_new specifies whether we should actually attempt to retrieve new data
+ // on the channel--if false, will only worry about sending existing data to
+ // any clients.
+ void RunIteration(bool fetch_new);
void AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
TransferMethod transfer_method);