blob: 43d37de1ae3bda467501743623ef23d7a728b877 [file] [log] [blame]
Brian Silvermanf7bd1c22015-12-24 16:07:11 -08001/*----------------------------------------------------------------------------*/
2/* Copyright (c) FIRST 2015. All Rights Reserved. */
3/* Open Source Software - may be modified and shared by FRC teams. The code */
4/* must be accompanied by the FIRST BSD license file in the root directory of */
5/* the project. */
6/*----------------------------------------------------------------------------*/
7
8#include "RpcServer.h"
9
10#include "Log.h"
11
12using namespace nt;
13
14ATOMIC_STATIC_INIT(RpcServer)
15
16RpcServer::RpcServer() {
17 m_active = false;
18 m_terminating = false;
19}
20
21RpcServer::~RpcServer() {
22 Logger::GetInstance().SetLogger(nullptr);
23 Stop();
24 m_terminating = true;
25 m_poll_cond.notify_all();
26}
27
28void RpcServer::Start() {
29 {
30 std::lock_guard<std::mutex> lock(m_mutex);
31 if (m_active) return;
32 m_active = true;
33 }
34 {
35 std::lock_guard<std::mutex> lock(m_shutdown_mutex);
36 m_shutdown = false;
37 }
38 m_thread = std::thread(&RpcServer::ThreadMain, this);
39}
40
41void RpcServer::Stop() {
42 m_active = false;
43 if (m_thread.joinable()) {
44 // send notification so the thread terminates
45 m_call_cond.notify_one();
46 // join with timeout
47 std::unique_lock<std::mutex> lock(m_shutdown_mutex);
48 auto timeout_time =
49 std::chrono::steady_clock::now() + std::chrono::seconds(1);
50 if (m_shutdown_cv.wait_until(lock, timeout_time,
51 [&] { return m_shutdown; }))
52 m_thread.join();
53 else
54 m_thread.detach(); // timed out, detach it
55 }
56}
57
58void RpcServer::ProcessRpc(StringRef name, std::shared_ptr<Message> msg,
59 RpcCallback func, unsigned int conn_id,
60 SendMsgFunc send_response) {
61 std::unique_lock<std::mutex> lock(m_mutex);
62
63 if (func)
64 m_call_queue.emplace(name, msg, func, conn_id, send_response);
65 else
66 m_poll_queue.emplace(name, msg, func, conn_id, send_response);
67
68 lock.unlock();
69
70 if (func)
71 m_call_cond.notify_one();
72 else
73 m_poll_cond.notify_one();
74}
75
76bool RpcServer::PollRpc(bool blocking, RpcCallInfo* call_info) {
77 std::unique_lock<std::mutex> lock(m_mutex);
78 while (m_poll_queue.empty()) {
79 if (!blocking || m_terminating) return false;
80 m_poll_cond.wait(lock);
81 }
82
83 auto& item = m_poll_queue.front();
84 unsigned int call_uid = (item.conn_id << 16) | item.msg->seq_num_uid();
85 call_info->rpc_id = item.msg->id();
86 call_info->call_uid = call_uid;
87 call_info->name = std::move(item.name);
88 call_info->params = item.msg->str();
89 m_response_map.insert(std::make_pair(std::make_pair(item.msg->id(), call_uid),
90 item.send_response));
91 m_poll_queue.pop();
92 return true;
93}
94
95void RpcServer::PostRpcResponse(unsigned int rpc_id, unsigned int call_uid,
96 llvm::StringRef result) {
97 auto i = m_response_map.find(std::make_pair(rpc_id, call_uid));
98 if (i == m_response_map.end()) {
99 WARNING("posting RPC response to nonexistent call (or duplicate response)");
100 return;
101 }
102 (i->getSecond())(Message::RpcResponse(rpc_id, call_uid, result));
103 m_response_map.erase(i);
104}
105
106void RpcServer::ThreadMain() {
107 std::unique_lock<std::mutex> lock(m_mutex);
108 std::string tmp;
109 while (m_active) {
110 while (m_call_queue.empty()) {
111 m_call_cond.wait(lock);
112 if (!m_active) goto done;
113 }
114
115 while (!m_call_queue.empty()) {
116 if (!m_active) goto done;
117 auto item = std::move(m_call_queue.front());
118 m_call_queue.pop();
119
120 DEBUG4("rpc calling " << item.name);
121
122 if (item.name.empty() || !item.msg || !item.func || !item.send_response)
123 continue;
124
125 // Don't hold mutex during callback execution!
126 lock.unlock();
127 auto result = item.func(item.name, item.msg->str());
128 item.send_response(Message::RpcResponse(item.msg->id(),
129 item.msg->seq_num_uid(), result));
130 lock.lock();
131 }
132 }
133
134done:
135 // use condition variable to signal thread shutdown
136 {
137 std::lock_guard<std::mutex> lock(m_shutdown_mutex);
138 m_shutdown = true;
139 m_shutdown_cv.notify_one();
140 }
141}