blob: 2f1073cabb46fe17918eefb4100f9965d8aac8f9 [file] [log] [blame]
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved.                             */
/* Open Source Software - may be modified and shared by FRC teams. The code   */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project.                                                               */
/*----------------------------------------------------------------------------*/
7
8#ifndef NT_NETWORKCONNECTION_H_
9#define NT_NETWORKCONNECTION_H_
10
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "support/ConcurrentQueue.h"
#include "Message.h"
#include "ntcore_cpp.h"
19
20class NetworkStream;
21
22namespace nt {
23
24class Notifier;
25
26class NetworkConnection {
27 public:
28 enum State { kCreated, kInit, kHandshake, kSynchronized, kActive, kDead };
29
30 typedef std::function<bool(
31 NetworkConnection& conn,
32 std::function<std::shared_ptr<Message>()> get_msg,
33 std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs)>
34 HandshakeFunc;
35 typedef std::function<void(std::shared_ptr<Message> msg,
36 NetworkConnection* conn)> ProcessIncomingFunc;
37 typedef std::vector<std::shared_ptr<Message>> Outgoing;
38 typedef ConcurrentQueue<Outgoing> OutgoingQueue;
39
40 NetworkConnection(std::unique_ptr<NetworkStream> stream,
41 Notifier& notifier,
42 HandshakeFunc handshake,
43 Message::GetEntryTypeFunc get_entry_type);
44 ~NetworkConnection();
45
46 // Set the input processor function. This must be called before Start().
47 void set_process_incoming(ProcessIncomingFunc func) {
48 m_process_incoming = func;
49 }
50
51 void Start();
52 void Stop();
53
54 ConnectionInfo info() const;
55
56 bool active() const { return m_active; }
57 NetworkStream& stream() { return *m_stream; }
58
59 void QueueOutgoing(std::shared_ptr<Message> msg);
60 void PostOutgoing(bool keep_alive);
61
62 unsigned int uid() const { return m_uid; }
63
64 unsigned int proto_rev() const { return m_proto_rev; }
65 void set_proto_rev(unsigned int proto_rev) { m_proto_rev = proto_rev; }
66
67 State state() const { return static_cast<State>(m_state.load()); }
68 void set_state(State state) { m_state = static_cast<int>(state); }
69
70 std::string remote_id() const;
71 void set_remote_id(StringRef remote_id);
72
73 unsigned long long last_update() const { return m_last_update; }
74
75 NetworkConnection(const NetworkConnection&) = delete;
76 NetworkConnection& operator=(const NetworkConnection&) = delete;
77
78 private:
79 void ReadThreadMain();
80 void WriteThreadMain();
81
82 static std::atomic_uint s_uid;
83
84 unsigned int m_uid;
85 std::unique_ptr<NetworkStream> m_stream;
86 Notifier& m_notifier;
87 OutgoingQueue m_outgoing;
88 HandshakeFunc m_handshake;
89 Message::GetEntryTypeFunc m_get_entry_type;
90 ProcessIncomingFunc m_process_incoming;
91 std::thread m_read_thread;
92 std::thread m_write_thread;
93 std::atomic_bool m_active;
94 std::atomic_uint m_proto_rev;
95 std::atomic_int m_state;
96 mutable std::mutex m_remote_id_mutex;
97 std::string m_remote_id;
98 std::atomic_ullong m_last_update;
99 std::chrono::steady_clock::time_point m_last_post;
100
101 std::mutex m_pending_mutex;
102 Outgoing m_pending_outgoing;
103 std::vector<std::pair<std::size_t, std::size_t>> m_pending_update;
104
105 // Condition variables for shutdown
106 std::mutex m_shutdown_mutex;
107 std::condition_variable m_read_shutdown_cv;
108 std::condition_variable m_write_shutdown_cv;
109 bool m_read_shutdown = false;
110 bool m_write_shutdown = false;
111};
112
113} // namespace nt
114
115#endif // NT_NETWORKCONNECTION_H_