| /*----------------------------------------------------------------------------*/ |
| /* Copyright (c) FIRST 2015. All Rights Reserved. */ |
| /* Open Source Software - may be modified and shared by FRC teams. The code */ |
| /* must be accompanied by the FIRST BSD license file in the root directory of */ |
| /* the project. */ |
| /*----------------------------------------------------------------------------*/ |
| |
| #include "Storage.h" |
| |
| #include <cctype> |
| #include <string> |
| #include <tuple> |
| |
| #include "llvm/StringExtras.h" |
| #include "Base64.h" |
| #include "Log.h" |
| #include "NetworkConnection.h" |
| |
| using namespace nt; |
| |
| ATOMIC_STATIC_INIT(Storage) |
| |
| Storage::Storage() |
| : Storage(Notifier::GetInstance(), RpcServer::GetInstance()) {} |
| |
| Storage::Storage(Notifier& notifier, RpcServer& rpc_server) |
| : m_notifier(notifier), m_rpc_server(rpc_server) { |
| m_terminating = false; |
| } |
| |
| Storage::~Storage() { |
| Logger::GetInstance().SetLogger(nullptr); |
| m_terminating = true; |
| m_rpc_results_cond.notify_all(); |
| } |
| |
| void Storage::SetOutgoing(QueueOutgoingFunc queue_outgoing, bool server) { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| m_queue_outgoing = queue_outgoing; |
| m_server = server; |
| } |
| |
| void Storage::ClearOutgoing() { |
| m_queue_outgoing = nullptr; |
| } |
| |
| NT_Type Storage::GetEntryType(unsigned int id) const { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| if (id >= m_idmap.size()) return NT_UNASSIGNED; |
| Entry* entry = m_idmap[id]; |
| if (!entry || !entry->value) return NT_UNASSIGNED; |
| return entry->value->type(); |
| } |
| |
| void Storage::ProcessIncoming(std::shared_ptr<Message> msg, |
| NetworkConnection* conn, |
| std::weak_ptr<NetworkConnection> conn_weak) { |
| std::unique_lock<std::mutex> lock(m_mutex); |
| switch (msg->type()) { |
| case Message::kKeepAlive: |
| break; // ignore |
| case Message::kClientHello: |
| case Message::kProtoUnsup: |
| case Message::kServerHelloDone: |
| case Message::kServerHello: |
| case Message::kClientHelloDone: |
| // shouldn't get these, but ignore if we do |
| break; |
| case Message::kEntryAssign: { |
| unsigned int id = msg->id(); |
| StringRef name = msg->str(); |
| Entry* entry; |
| bool may_need_update = false; |
| if (m_server) { |
| // if we're a server, id=0xffff requests are requests for an id |
| // to be assigned, and we need to send the new assignment back to |
| // the sender as well as all other connections. |
| if (id == 0xffff) { |
| // see if it was already assigned; ignore if so. |
| if (m_entries.count(name) != 0) return; |
| |
| // create it locally |
| id = m_idmap.size(); |
| auto& new_entry = m_entries[name]; |
| if (!new_entry) new_entry.reset(new Entry(name)); |
| entry = new_entry.get(); |
| entry->value = msg->value(); |
| entry->flags = msg->flags(); |
| entry->id = id; |
| m_idmap.push_back(entry); |
| |
| // update persistent dirty flag if it's persistent |
| if (entry->IsPersistent()) m_persistent_dirty = true; |
| |
| // notify |
| m_notifier.NotifyEntry(name, entry->value, NT_NOTIFY_NEW); |
| |
| // send the assignment to everyone (including the originator) |
| if (m_queue_outgoing) { |
| auto queue_outgoing = m_queue_outgoing; |
| auto outmsg = Message::EntryAssign( |
| name, id, entry->seq_num.value(), msg->value(), msg->flags()); |
| lock.unlock(); |
| queue_outgoing(outmsg, nullptr, nullptr); |
| } |
| return; |
| } |
| if (id >= m_idmap.size() || !m_idmap[id]) { |
| // ignore arbitrary entry assignments |
| // this can happen due to e.g. assignment to deleted entry |
| lock.unlock(); |
| DEBUG("server: received assignment to unknown entry"); |
| return; |
| } |
| entry = m_idmap[id]; |
| } else { |
| // clients simply accept new assignments |
| if (id == 0xffff) { |
| lock.unlock(); |
| DEBUG("client: received entry assignment request?"); |
| return; |
| } |
| if (id >= m_idmap.size()) m_idmap.resize(id+1); |
| entry = m_idmap[id]; |
| if (!entry) { |
| // create local |
| auto& new_entry = m_entries[name]; |
| if (!new_entry) { |
| // didn't exist at all (rather than just being a response to a |
| // id assignment request) |
| new_entry.reset(new Entry(name)); |
| new_entry->value = msg->value(); |
| new_entry->flags = msg->flags(); |
| new_entry->id = id; |
| m_idmap[id] = new_entry.get(); |
| |
| // notify |
| m_notifier.NotifyEntry(name, new_entry->value, NT_NOTIFY_NEW); |
| return; |
| } |
| may_need_update = true; // we may need to send an update message |
| entry = new_entry.get(); |
| entry->id = id; |
| m_idmap[id] = entry; |
| |
| // if the received flags don't match what we sent, we most likely |
| // updated flags locally in the interim; send flags update message. |
| if (msg->flags() != entry->flags) { |
| auto queue_outgoing = m_queue_outgoing; |
| auto outmsg = Message::FlagsUpdate(id, entry->flags); |
| lock.unlock(); |
| queue_outgoing(outmsg, nullptr, nullptr); |
| lock.lock(); |
| } |
| } |
| } |
| |
| // common client and server handling |
| |
| // already exists; ignore if sequence number not higher than local |
| SequenceNumber seq_num(msg->seq_num_uid()); |
| if (seq_num < entry->seq_num) { |
| if (may_need_update) { |
| auto queue_outgoing = m_queue_outgoing; |
| auto outmsg = Message::EntryUpdate(entry->id, entry->seq_num.value(), |
| entry->value); |
| lock.unlock(); |
| queue_outgoing(outmsg, nullptr, nullptr); |
| } |
| return; |
| } |
| |
| // sanity check: name should match id |
| if (msg->str() != entry->name) { |
| lock.unlock(); |
| DEBUG("entry assignment for same id with different name?"); |
| return; |
| } |
| |
| unsigned int notify_flags = NT_NOTIFY_UPDATE; |
| |
| // don't update flags from a <3.0 remote (not part of message) |
| // don't update flags if this is a server response to a client id request |
| if (!may_need_update && conn->proto_rev() >= 0x0300) { |
| // update persistent dirty flag if persistent flag changed |
| if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT)) |
| m_persistent_dirty = true; |
| if (entry->flags != msg->flags()) |
| notify_flags |= NT_NOTIFY_FLAGS; |
| entry->flags = msg->flags(); |
| } |
| |
| // update persistent dirty flag if the value changed and it's persistent |
| if (entry->IsPersistent() && *entry->value != *msg->value()) |
| m_persistent_dirty = true; |
| |
| // update local |
| entry->value = msg->value(); |
| entry->seq_num = seq_num; |
| |
| // notify |
| m_notifier.NotifyEntry(name, entry->value, notify_flags); |
| |
| // broadcast to all other connections (note for client there won't |
| // be any other connections, so don't bother) |
| if (m_server && m_queue_outgoing) { |
| auto queue_outgoing = m_queue_outgoing; |
| auto outmsg = |
| Message::EntryAssign(entry->name, id, msg->seq_num_uid(), |
| msg->value(), entry->flags); |
| lock.unlock(); |
| queue_outgoing(outmsg, nullptr, conn); |
| } |
| break; |
| } |
| case Message::kEntryUpdate: { |
| unsigned int id = msg->id(); |
| if (id >= m_idmap.size() || !m_idmap[id]) { |
| // ignore arbitrary entry updates; |
| // this can happen due to deleted entries |
| lock.unlock(); |
| DEBUG("received update to unknown entry"); |
| return; |
| } |
| Entry* entry = m_idmap[id]; |
| |
| // ignore if sequence number not higher than local |
| SequenceNumber seq_num(msg->seq_num_uid()); |
| if (seq_num <= entry->seq_num) return; |
| |
| // update local |
| entry->value = msg->value(); |
| entry->seq_num = seq_num; |
| |
| // update persistent dirty flag if it's a persistent value |
| if (entry->IsPersistent()) m_persistent_dirty = true; |
| |
| // notify |
| m_notifier.NotifyEntry(entry->name, entry->value, NT_NOTIFY_UPDATE); |
| |
| // broadcast to all other connections (note for client there won't |
| // be any other connections, so don't bother) |
| if (m_server && m_queue_outgoing) { |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, conn); |
| } |
| break; |
| } |
| case Message::kFlagsUpdate: { |
| unsigned int id = msg->id(); |
| if (id >= m_idmap.size() || !m_idmap[id]) { |
| // ignore arbitrary entry updates; |
| // this can happen due to deleted entries |
| lock.unlock(); |
| DEBUG("received flags update to unknown entry"); |
| return; |
| } |
| Entry* entry = m_idmap[id]; |
| |
| // ignore if flags didn't actually change |
| if (entry->flags == msg->flags()) return; |
| |
| // update persistent dirty flag if persistent flag changed |
| if ((entry->flags & NT_PERSISTENT) != (msg->flags() & NT_PERSISTENT)) |
| m_persistent_dirty = true; |
| |
| // update local |
| entry->flags = msg->flags(); |
| |
| // notify |
| m_notifier.NotifyEntry(entry->name, entry->value, NT_NOTIFY_FLAGS); |
| |
| // broadcast to all other connections (note for client there won't |
| // be any other connections, so don't bother) |
| if (m_server && m_queue_outgoing) { |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, conn); |
| } |
| break; |
| } |
| case Message::kEntryDelete: { |
| unsigned int id = msg->id(); |
| if (id >= m_idmap.size() || !m_idmap[id]) { |
| // ignore arbitrary entry updates; |
| // this can happen due to deleted entries |
| lock.unlock(); |
| DEBUG("received delete to unknown entry"); |
| return; |
| } |
| Entry* entry = m_idmap[id]; |
| |
| // update persistent dirty flag if it's a persistent value |
| if (entry->IsPersistent()) m_persistent_dirty = true; |
| |
| // delete it from idmap |
| m_idmap[id] = nullptr; |
| |
| // get entry (as we'll need it for notify) and erase it from the map |
| // it should always be in the map, but sanity check just in case |
| auto i = m_entries.find(entry->name); |
| if (i != m_entries.end()) { |
| auto entry2 = std::move(i->getValue()); // move the value out |
| m_entries.erase(i); |
| |
| // notify |
| m_notifier.NotifyEntry(entry2->name, entry2->value, NT_NOTIFY_DELETE); |
| } |
| |
| // broadcast to all other connections (note for client there won't |
| // be any other connections, so don't bother) |
| if (m_server && m_queue_outgoing) { |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, conn); |
| } |
| break; |
| } |
| case Message::kClearEntries: { |
| // update local |
| EntriesMap map; |
| m_entries.swap(map); |
| m_idmap.resize(0); |
| |
| // set persistent dirty flag |
| m_persistent_dirty = true; |
| |
| // notify |
| for (auto& entry : map) |
| m_notifier.NotifyEntry(entry.getKey(), entry.getValue()->value, |
| NT_NOTIFY_DELETE); |
| |
| // broadcast to all other connections (note for client there won't |
| // be any other connections, so don't bother) |
| if (m_server && m_queue_outgoing) { |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, conn); |
| } |
| break; |
| } |
| case Message::kExecuteRpc: { |
| if (!m_server) return; // only process on server |
| unsigned int id = msg->id(); |
| if (id >= m_idmap.size() || !m_idmap[id]) { |
| // ignore call to non-existent RPC |
| // this can happen due to deleted entries |
| lock.unlock(); |
| DEBUG("received RPC call to unknown entry"); |
| return; |
| } |
| Entry* entry = m_idmap[id]; |
| if (!entry->value->IsRpc()) { |
| lock.unlock(); |
| DEBUG("received RPC call to non-RPC entry"); |
| return; |
| } |
| m_rpc_server.ProcessRpc(entry->name, msg, entry->rpc_callback, |
| conn->uid(), [=](std::shared_ptr<Message> msg) { |
| auto c = conn_weak.lock(); |
| if (c) c->QueueOutgoing(msg); |
| }); |
| break; |
| } |
| case Message::kRpcResponse: { |
| if (m_server) return; // only process on client |
| m_rpc_results.insert(std::make_pair( |
| std::make_pair(msg->id(), msg->seq_num_uid()), msg->str())); |
| m_rpc_results_cond.notify_all(); |
| break; |
| } |
| default: |
| break; |
| } |
| } |
| |
| void Storage::GetInitialAssignments( |
| NetworkConnection& conn, std::vector<std::shared_ptr<Message>>* msgs) { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| conn.set_state(NetworkConnection::kSynchronized); |
| for (auto& i : m_entries) { |
| Entry* entry = i.getValue().get(); |
| msgs->emplace_back(Message::EntryAssign(i.getKey(), entry->id, |
| entry->seq_num.value(), |
| entry->value, entry->flags)); |
| } |
| } |
| |
| void Storage::ApplyInitialAssignments( |
| NetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs, |
| bool new_server, std::vector<std::shared_ptr<Message>>* out_msgs) { |
| std::unique_lock<std::mutex> lock(m_mutex); |
| if (m_server) return; // should not do this on server |
| |
| conn.set_state(NetworkConnection::kSynchronized); |
| |
| std::vector<std::shared_ptr<Message>> update_msgs; |
| |
| // clear existing id's |
| for (auto& i : m_entries) i.getValue()->id = 0xffff; |
| |
| // clear existing idmap |
| m_idmap.resize(0); |
| |
| // apply assignments |
| for (auto& msg : msgs) { |
| if (!msg->Is(Message::kEntryAssign)) { |
| DEBUG("client: received non-entry assignment request?"); |
| continue; |
| } |
| |
| unsigned int id = msg->id(); |
| if (id == 0xffff) { |
| DEBUG("client: received entry assignment request?"); |
| continue; |
| } |
| |
| SequenceNumber seq_num(msg->seq_num_uid()); |
| StringRef name = msg->str(); |
| |
| auto& entry = m_entries[name]; |
| if (!entry) { |
| // doesn't currently exist |
| entry.reset(new Entry(name)); |
| entry->value = msg->value(); |
| entry->flags = msg->flags(); |
| entry->seq_num = seq_num; |
| // notify |
| m_notifier.NotifyEntry(name, entry->value, NT_NOTIFY_NEW); |
| } else { |
| // if reconnect and sequence number not higher than local, then we |
| // don't update the local value and instead send it back to the server |
| // as an update message |
| if (!new_server && seq_num <= entry->seq_num) { |
| update_msgs.emplace_back(Message::EntryUpdate( |
| entry->id, entry->seq_num.value(), entry->value)); |
| } else { |
| entry->value = msg->value(); |
| entry->seq_num = seq_num; |
| unsigned int notify_flags = NT_NOTIFY_UPDATE; |
| // don't update flags from a <3.0 remote (not part of message) |
| if (conn.proto_rev() >= 0x0300) { |
| if (entry->flags != msg->flags()) notify_flags |= NT_NOTIFY_FLAGS; |
| entry->flags = msg->flags(); |
| } |
| // notify |
| m_notifier.NotifyEntry(name, entry->value, notify_flags); |
| } |
| } |
| |
| // set id and save to idmap |
| entry->id = id; |
| if (id >= m_idmap.size()) m_idmap.resize(id+1); |
| m_idmap[id] = entry.get(); |
| } |
| |
| // generate assign messages for unassigned local entries |
| for (auto& i : m_entries) { |
| Entry* entry = i.getValue().get(); |
| if (entry->id != 0xffff) continue; |
| out_msgs->emplace_back(Message::EntryAssign(entry->name, entry->id, |
| entry->seq_num.value(), |
| entry->value, entry->flags)); |
| } |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| for (auto& msg : update_msgs) queue_outgoing(msg, nullptr, nullptr); |
| } |
| |
| std::shared_ptr<Value> Storage::GetEntryValue(StringRef name) const { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| auto i = m_entries.find(name); |
| return i == m_entries.end() ? nullptr : i->getValue()->value; |
| } |
| |
| bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) { |
| if (name.empty()) return true; |
| if (!value) return true; |
| std::unique_lock<std::mutex> lock(m_mutex); |
| auto& new_entry = m_entries[name]; |
| if (!new_entry) new_entry.reset(new Entry(name)); |
| Entry* entry = new_entry.get(); |
| auto old_value = entry->value; |
| if (old_value && old_value->type() != value->type()) |
| return false; // error on type mismatch |
| entry->value = value; |
| |
| // if we're the server, assign an id if it doesn't have one |
| if (m_server && entry->id == 0xffff) { |
| unsigned int id = m_idmap.size(); |
| entry->id = id; |
| m_idmap.push_back(entry); |
| } |
| |
| // update persistent dirty flag if value changed and it's persistent |
| if (entry->IsPersistent() && *old_value != *value) m_persistent_dirty = true; |
| |
| // notify (for local listeners) |
| if (m_notifier.local_notifiers()) { |
| if (!old_value) |
| m_notifier.NotifyEntry(name, value, NT_NOTIFY_NEW | NT_NOTIFY_LOCAL); |
| else if (*old_value != *value) |
| m_notifier.NotifyEntry(name, value, NT_NOTIFY_UPDATE | NT_NOTIFY_LOCAL); |
| } |
| |
| // generate message |
| if (!m_queue_outgoing) return true; |
| auto queue_outgoing = m_queue_outgoing; |
| if (!old_value) { |
| auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(), |
| value, entry->flags); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } else if (*old_value != *value) { |
| ++entry->seq_num; |
| // don't send an update if we don't have an assigned id yet |
| if (entry->id != 0xffff) { |
| auto msg = |
| Message::EntryUpdate(entry->id, entry->seq_num.value(), value); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } |
| } |
| return true; |
| } |
| |
| void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) { |
| if (name.empty()) return; |
| if (!value) return; |
| std::unique_lock<std::mutex> lock(m_mutex); |
| auto& new_entry = m_entries[name]; |
| if (!new_entry) new_entry.reset(new Entry(name)); |
| Entry* entry = new_entry.get(); |
| auto old_value = entry->value; |
| entry->value = value; |
| if (old_value && *old_value == *value) return; |
| |
| // if we're the server, assign an id if it doesn't have one |
| if (m_server && entry->id == 0xffff) { |
| unsigned int id = m_idmap.size(); |
| entry->id = id; |
| m_idmap.push_back(entry); |
| } |
| |
| // update persistent dirty flag if it's a persistent value |
| if (entry->IsPersistent()) m_persistent_dirty = true; |
| |
| // notify (for local listeners) |
| if (m_notifier.local_notifiers()) { |
| if (!old_value) |
| m_notifier.NotifyEntry(name, value, NT_NOTIFY_NEW | NT_NOTIFY_LOCAL); |
| else |
| m_notifier.NotifyEntry(name, value, NT_NOTIFY_UPDATE | NT_NOTIFY_LOCAL); |
| } |
| |
| // generate message |
| if (!m_queue_outgoing) return; |
| auto queue_outgoing = m_queue_outgoing; |
| if (!old_value || old_value->type() != value->type()) { |
| ++entry->seq_num; |
| auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(), |
| value, entry->flags); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } else { |
| ++entry->seq_num; |
| // don't send an update if we don't have an assigned id yet |
| if (entry->id != 0xffff) { |
| auto msg = |
| Message::EntryUpdate(entry->id, entry->seq_num.value(), value); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } |
| } |
| } |
| |
| void Storage::SetEntryFlags(StringRef name, unsigned int flags) { |
| if (name.empty()) return; |
| std::unique_lock<std::mutex> lock(m_mutex); |
| auto i = m_entries.find(name); |
| if (i == m_entries.end()) return; |
| Entry* entry = i->getValue().get(); |
| if (entry->flags == flags) return; |
| |
| // update persistent dirty flag if persistent flag changed |
| if ((entry->flags & NT_PERSISTENT) != (flags & NT_PERSISTENT)) |
| m_persistent_dirty = true; |
| |
| entry->flags = flags; |
| |
| // notify |
| m_notifier.NotifyEntry(name, entry->value, NT_NOTIFY_FLAGS | NT_NOTIFY_LOCAL); |
| |
| // generate message |
| if (!m_queue_outgoing) return; |
| auto queue_outgoing = m_queue_outgoing; |
| unsigned int id = entry->id; |
| // don't send an update if we don't have an assigned id yet |
| if (id != 0xffff) { |
| lock.unlock(); |
| queue_outgoing(Message::FlagsUpdate(id, flags), nullptr, nullptr); |
| } |
| } |
| |
| unsigned int Storage::GetEntryFlags(StringRef name) const { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| auto i = m_entries.find(name); |
| return i == m_entries.end() ? 0 : i->getValue()->flags; |
| } |
| |
| void Storage::DeleteEntry(StringRef name) { |
| std::unique_lock<std::mutex> lock(m_mutex); |
| auto i = m_entries.find(name); |
| if (i == m_entries.end()) return; |
| auto entry = std::move(i->getValue()); |
| unsigned int id = entry->id; |
| |
| // update persistent dirty flag if it's a persistent value |
| if (entry->IsPersistent()) m_persistent_dirty = true; |
| |
| m_entries.erase(i); // erase from map |
| if (id < m_idmap.size()) m_idmap[id] = nullptr; |
| |
| if (!entry->value) return; |
| |
| // notify |
| m_notifier.NotifyEntry(name, entry->value, |
| NT_NOTIFY_DELETE | NT_NOTIFY_LOCAL); |
| |
| // if it had a value, generate message |
| // don't send an update if we don't have an assigned id yet |
| if (id != 0xffff) { |
| if (!m_queue_outgoing) return; |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| queue_outgoing(Message::EntryDelete(id), nullptr, nullptr); |
| } |
| } |
| |
| void Storage::DeleteAllEntries() { |
| std::unique_lock<std::mutex> lock(m_mutex); |
| if (m_entries.empty()) return; |
| EntriesMap map; |
| m_entries.swap(map); |
| m_idmap.resize(0); |
| |
| // set persistent dirty flag |
| m_persistent_dirty = true; |
| |
| // notify |
| if (m_notifier.local_notifiers()) { |
| for (auto& entry : map) |
| m_notifier.NotifyEntry(entry.getKey(), entry.getValue()->value, |
| NT_NOTIFY_DELETE | NT_NOTIFY_LOCAL); |
| } |
| |
| // generate message |
| if (!m_queue_outgoing) return; |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| queue_outgoing(Message::ClearEntries(), nullptr, nullptr); |
| } |
| |
| std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix, |
| unsigned int types) { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| std::vector<EntryInfo> infos; |
| for (auto& i : m_entries) { |
| if (!i.getKey().startswith(prefix)) continue; |
| Entry* entry = i.getValue().get(); |
| auto value = entry->value; |
| if (!value) continue; |
| if (types != 0 && (types & value->type()) == 0) continue; |
| EntryInfo info; |
| info.name = i.getKey(); |
| info.type = value->type(); |
| info.flags = entry->flags; |
| info.last_change = value->last_change(); |
| infos.push_back(std::move(info)); |
| } |
| return infos; |
| } |
| |
| void Storage::NotifyEntries(StringRef prefix, |
| EntryListenerCallback only) const { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| for (auto& i : m_entries) { |
| if (!i.getKey().startswith(prefix)) continue; |
| m_notifier.NotifyEntry(i.getKey(), i.getValue()->value, NT_NOTIFY_IMMEDIATE, |
| only); |
| } |
| } |
| |
| /* Escapes and writes a string, including start and end double quotes */ |
| static void WriteString(std::ostream& os, llvm::StringRef str) { |
| os << '"'; |
| for (auto c : str) { |
| switch (c) { |
| case '\\': |
| os << "\\\\"; |
| break; |
| case '\t': |
| os << "\\t"; |
| break; |
| case '\n': |
| os << "\\n"; |
| break; |
| case '"': |
| os << "\\\""; |
| break; |
| default: |
| if (std::isprint(c)) { |
| os << c; |
| break; |
| } |
| |
| // Write out the escaped representation. |
| os << "\\x"; |
| os << llvm::hexdigit((c >> 4) & 0xF); |
| os << llvm::hexdigit((c >> 0) & 0xF); |
| } |
| } |
| os << '"'; |
| } |
| |
| bool Storage::GetPersistentEntries( |
| bool periodic, |
| std::vector<std::pair<std::string, std::shared_ptr<Value>>>* entries) |
| const { |
| // copy values out of storage as quickly as possible so lock isn't held |
| { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| // for periodic, don't re-save unless something has changed |
| if (periodic && !m_persistent_dirty) return false; |
| m_persistent_dirty = false; |
| entries->reserve(m_entries.size()); |
| for (auto& i : m_entries) { |
| Entry* entry = i.getValue().get(); |
| // only write persistent-flagged values |
| if (!entry->IsPersistent()) continue; |
| entries->emplace_back(i.getKey(), entry->value); |
| } |
| } |
| |
| // sort in name order |
| std::sort(entries->begin(), entries->end(), |
| [](const std::pair<std::string, std::shared_ptr<Value>>& a, |
| const std::pair<std::string, std::shared_ptr<Value>>& b) { |
| return a.first < b.first; |
| }); |
| return true; |
| } |
| |
| static void SavePersistentImpl( |
| std::ostream& os, |
| llvm::ArrayRef<std::pair<std::string, std::shared_ptr<Value>>> entries) { |
| std::string base64_encoded; |
| |
| // header |
| os << "[NetworkTables Storage 3.0]\n"; |
| |
| for (auto& i : entries) { |
| // type |
| auto v = i.second; |
| if (!v) continue; |
| switch (v->type()) { |
| case NT_BOOLEAN: |
| os << "boolean "; |
| break; |
| case NT_DOUBLE: |
| os << "double "; |
| break; |
| case NT_STRING: |
| os << "string "; |
| break; |
| case NT_RAW: |
| os << "raw "; |
| break; |
| case NT_BOOLEAN_ARRAY: |
| os << "array boolean "; |
| break; |
| case NT_DOUBLE_ARRAY: |
| os << "array double "; |
| break; |
| case NT_STRING_ARRAY: |
| os << "array string "; |
| break; |
| default: |
| continue; |
| } |
| |
| // name |
| WriteString(os, i.first); |
| |
| // = |
| os << '='; |
| |
| // value |
| switch (v->type()) { |
| case NT_BOOLEAN: |
| os << (v->GetBoolean() ? "true" : "false"); |
| break; |
| case NT_DOUBLE: |
| os << v->GetDouble(); |
| break; |
| case NT_STRING: |
| WriteString(os, v->GetString()); |
| break; |
| case NT_RAW: |
| Base64Encode(v->GetRaw(), &base64_encoded); |
| os << base64_encoded; |
| break; |
| case NT_BOOLEAN_ARRAY: { |
| bool first = true; |
| for (auto elem : v->GetBooleanArray()) { |
| if (!first) os << ','; |
| first = false; |
| os << (elem ? "true" : "false"); |
| } |
| break; |
| } |
| case NT_DOUBLE_ARRAY: { |
| bool first = true; |
| for (auto elem : v->GetDoubleArray()) { |
| if (!first) os << ','; |
| first = false; |
| os << elem; |
| } |
| break; |
| } |
| case NT_STRING_ARRAY: { |
| bool first = true; |
| for (auto& elem : v->GetStringArray()) { |
| if (!first) os << ','; |
| first = false; |
| WriteString(os, elem); |
| } |
| break; |
| } |
| default: |
| break; |
| } |
| |
| // eol |
| os << '\n'; |
| } |
| } |
| |
| void Storage::SavePersistent(std::ostream& os, bool periodic) const { |
| std::vector<std::pair<std::string, std::shared_ptr<Value>>> entries; |
| if (!GetPersistentEntries(periodic, &entries)) return; |
| SavePersistentImpl(os, entries); |
| } |
| |
| const char* Storage::SavePersistent(StringRef filename, bool periodic) const { |
| std::string fn = filename; |
| std::string tmp = filename; |
| tmp += ".tmp"; |
| std::string bak = filename; |
| bak += ".bak"; |
| |
| // Get entries before creating file |
| std::vector<std::pair<std::string, std::shared_ptr<Value>>> entries; |
| if (!GetPersistentEntries(periodic, &entries)) return nullptr; |
| |
| const char* err = nullptr; |
| |
| // start by writing to temporary file |
| std::ofstream os(tmp); |
| if (!os) { |
| err = "could not open file"; |
| goto done; |
| } |
| DEBUG("saving persistent file '" << filename << "'"); |
| SavePersistentImpl(os, entries); |
| os.flush(); |
| if (!os) { |
| os.close(); |
| std::remove(tmp.c_str()); |
| err = "error saving file"; |
| goto done; |
| } |
| os.close(); |
| |
| // Safely move to real file. We ignore any failures related to the backup. |
| std::remove(bak.c_str()); |
| std::rename(fn.c_str(), bak.c_str()); |
| if (std::rename(tmp.c_str(), fn.c_str()) != 0) { |
| std::rename(bak.c_str(), fn.c_str()); // attempt to restore backup |
| err = "could not rename temp file to real file"; |
| goto done; |
| } |
| |
| done: |
| // try again if there was an error |
| if (err && periodic) m_persistent_dirty = true; |
| return err; |
| } |
| |
| /* Extracts an escaped string token. Does not unescape the string. |
| * If a string cannot be matched, an empty string is returned. |
| * If the string is unterminated, an empty tail string is returned. |
| * The returned token includes the starting and trailing quotes (unless the |
| * string is unterminated). |
| * Returns a pair containing the extracted token (if any) and the remaining |
| * tail string. |
| */ |
| static std::pair<llvm::StringRef, llvm::StringRef> ReadStringToken( |
| llvm::StringRef source) { |
| // Match opening quote |
| if (source.empty() || source.front() != '"') |
| return std::make_pair(llvm::StringRef(), source); |
| |
| // Scan for ending double quote, checking for escaped as we go. |
| std::size_t size = source.size(); |
| std::size_t pos; |
| for (pos = 1; pos < size; ++pos) { |
| if (source[pos] == '"' && source[pos - 1] != '\\') { |
| ++pos; // we want to include the trailing quote in the result |
| break; |
| } |
| } |
| return std::make_pair(source.slice(0, pos), source.substr(pos)); |
| } |
| |
| static int fromxdigit(char ch) { |
| if (ch >= 'a' && ch <= 'f') |
| return (ch - 'a' + 10); |
| else if (ch >= 'A' && ch <= 'F') |
| return (ch - 'A' + 10); |
| else |
| return ch - '0'; |
| } |
| |
| static void UnescapeString(llvm::StringRef source, std::string* dest) { |
| assert(source.size() >= 2 && source.front() == '"' && source.back() == '"'); |
| dest->clear(); |
| dest->reserve(source.size() - 2); |
| for (auto s = source.begin() + 1, end = source.end() - 1; s != end; ++s) { |
| if (*s != '\\') { |
| dest->push_back(*s); |
| continue; |
| } |
| switch (*++s) { |
| case '\\': |
| case '"': |
| dest->push_back(s[-1]); |
| break; |
| case 't': |
| dest->push_back('\t'); |
| break; |
| case 'n': |
| dest->push_back('\n'); |
| break; |
| case 'x': { |
| if (!isxdigit(*(s+1))) { |
| dest->push_back('x'); // treat it like a unknown escape |
| break; |
| } |
| int ch = fromxdigit(*++s); |
| if (isxdigit(*(s+1))) { |
| ch <<= 4; |
| ch |= fromxdigit(*++s); |
| } |
| dest->push_back(static_cast<char>(ch)); |
| break; |
| } |
| default: |
| dest->push_back(s[-1]); |
| break; |
| } |
| } |
| } |
| |
| bool Storage::LoadPersistent( |
| std::istream& is, |
| std::function<void(std::size_t line, const char* msg)> warn) { |
| std::string line_str; |
| std::size_t line_num = 1; |
| |
| // entries to add |
| std::vector<std::pair<std::string, std::shared_ptr<Value>>> entries; |
| |
| // declare these outside the loop to reduce reallocs |
| std::string name, str; |
| std::vector<int> boolean_array; |
| std::vector<double> double_array; |
| std::vector<std::string> string_array; |
| |
| // ignore blank lines and lines that start with ; or # (comments) |
| while (std::getline(is, line_str)) { |
| llvm::StringRef line = llvm::StringRef(line_str).trim(); |
| if (!line.empty() && line.front() != ';' && line.front() != '#') |
| break; |
| } |
| |
| // header |
| if (line_str != "[NetworkTables Storage 3.0]") { |
| if (warn) warn(line_num, "header line mismatch, ignoring rest of file"); |
| return false; |
| } |
| |
| while (std::getline(is, line_str)) { |
| llvm::StringRef line = llvm::StringRef(line_str).trim(); |
| ++line_num; |
| |
| // ignore blank lines and lines that start with ; or # (comments) |
| if (line.empty() || line.front() == ';' || line.front() == '#') |
| continue; |
| |
| // type |
| llvm::StringRef type_tok; |
| std::tie(type_tok, line) = line.split(' '); |
| NT_Type type = NT_UNASSIGNED; |
| if (type_tok == "boolean") type = NT_BOOLEAN; |
| else if (type_tok == "double") type = NT_DOUBLE; |
| else if (type_tok == "string") type = NT_STRING; |
| else if (type_tok == "raw") type = NT_RAW; |
| else if (type_tok == "array") { |
| llvm::StringRef array_tok; |
| std::tie(array_tok, line) = line.split(' '); |
| if (array_tok == "boolean") type = NT_BOOLEAN_ARRAY; |
| else if (array_tok == "double") type = NT_DOUBLE_ARRAY; |
| else if (array_tok == "string") type = NT_STRING_ARRAY; |
| } |
| if (type == NT_UNASSIGNED) { |
| if (warn) warn(line_num, "unrecognized type"); |
| continue; |
| } |
| |
| // name |
| llvm::StringRef name_tok; |
| std::tie(name_tok, line) = ReadStringToken(line); |
| if (name_tok.empty()) { |
| if (warn) warn(line_num, "missing name"); |
| continue; |
| } |
| if (name_tok.back() != '"') { |
| if (warn) warn(line_num, "unterminated name string"); |
| continue; |
| } |
| UnescapeString(name_tok, &name); |
| |
| // = |
| line = line.ltrim(" \t"); |
| if (line.empty() || line.front() != '=') { |
| if (warn) warn(line_num, "expected = after name"); |
| continue; |
| } |
| line = line.drop_front().ltrim(" \t"); |
| |
| // value |
| std::shared_ptr<Value> value; |
| switch (type) { |
| case NT_BOOLEAN: |
| // only true or false is accepted |
| if (line == "true") |
| value = Value::MakeBoolean(true); |
| else if (line == "false") |
| value = Value::MakeBoolean(false); |
| else { |
| if (warn) |
| warn(line_num, "unrecognized boolean value, not 'true' or 'false'"); |
| goto next_line; |
| } |
| break; |
| case NT_DOUBLE: { |
| // need to convert to null-terminated string for strtod() |
| str.clear(); |
| str += line; |
| char* end; |
| double v = std::strtod(str.c_str(), &end); |
| if (*end != '\0') { |
| if (warn) warn(line_num, "invalid double value"); |
| goto next_line; |
| } |
| value = Value::MakeDouble(v); |
| break; |
| } |
| case NT_STRING: { |
| llvm::StringRef str_tok; |
| std::tie(str_tok, line) = ReadStringToken(line); |
| if (str_tok.empty()) { |
| if (warn) warn(line_num, "missing string value"); |
| goto next_line; |
| } |
| if (str_tok.back() != '"') { |
| if (warn) warn(line_num, "unterminated string value"); |
| goto next_line; |
| } |
| UnescapeString(str_tok, &str); |
| value = Value::MakeString(std::move(str)); |
| break; |
| } |
| case NT_RAW: |
| Base64Decode(line, &str); |
| value = Value::MakeRaw(std::move(str)); |
| break; |
| case NT_BOOLEAN_ARRAY: { |
| llvm::StringRef elem_tok; |
| boolean_array.clear(); |
| while (!line.empty()) { |
| std::tie(elem_tok, line) = line.split(','); |
| elem_tok = elem_tok.trim(" \t"); |
| if (elem_tok == "true") |
| boolean_array.push_back(1); |
| else if (elem_tok == "false") |
| boolean_array.push_back(0); |
| else { |
| if (warn) |
| warn(line_num, |
| "unrecognized boolean value, not 'true' or 'false'"); |
| goto next_line; |
| } |
| } |
| |
| value = Value::MakeBooleanArray(std::move(boolean_array)); |
| break; |
| } |
| case NT_DOUBLE_ARRAY: { |
| llvm::StringRef elem_tok; |
| double_array.clear(); |
| while (!line.empty()) { |
| std::tie(elem_tok, line) = line.split(','); |
| elem_tok = elem_tok.trim(" \t"); |
| // need to convert to null-terminated string for strtod() |
| str.clear(); |
| str += elem_tok; |
| char* end; |
| double v = std::strtod(str.c_str(), &end); |
| if (*end != '\0') { |
| if (warn) warn(line_num, "invalid double value"); |
| goto next_line; |
| } |
| double_array.push_back(v); |
| } |
| |
| value = Value::MakeDoubleArray(std::move(double_array)); |
| break; |
| } |
| case NT_STRING_ARRAY: { |
| llvm::StringRef elem_tok; |
| string_array.clear(); |
| while (!line.empty()) { |
| std::tie(elem_tok, line) = ReadStringToken(line); |
| if (elem_tok.empty()) { |
| if (warn) warn(line_num, "missing string value"); |
| goto next_line; |
| } |
| if (elem_tok.back() != '"') { |
| if (warn) warn(line_num, "unterminated string value"); |
| goto next_line; |
| } |
| |
| UnescapeString(elem_tok, &str); |
| string_array.push_back(std::move(str)); |
| |
| line = line.ltrim(" \t"); |
| if (line.empty()) break; |
| if (line.front() != ',') { |
| if (warn) warn(line_num, "expected comma between strings"); |
| goto next_line; |
| } |
| line = line.drop_front().ltrim(" \t"); |
| } |
| |
| value = Value::MakeStringArray(std::move(string_array)); |
| break; |
| } |
| default: |
| break; |
| } |
| if (!name.empty() && value) |
| entries.push_back(std::make_pair(std::move(name), std::move(value))); |
| next_line: |
| ; |
| } |
| |
| // copy values into storage as quickly as possible so lock isn't held |
| { |
| std::vector<std::shared_ptr<Message>> msgs; |
| std::unique_lock<std::mutex> lock(m_mutex); |
| for (auto& i : entries) { |
| auto& new_entry = m_entries[i.first]; |
| if (!new_entry) new_entry.reset(new Entry(i.first)); |
| Entry* entry = new_entry.get(); |
| auto old_value = entry->value; |
| entry->value = i.second; |
| bool was_persist = entry->IsPersistent(); |
| if (!was_persist) entry->flags |= NT_PERSISTENT; |
| |
| // if we're the server, assign an id if it doesn't have one |
| if (m_server && entry->id == 0xffff) { |
| unsigned int id = m_idmap.size(); |
| entry->id = id; |
| m_idmap.push_back(entry); |
| } |
| |
| // notify (for local listeners) |
| if (m_notifier.local_notifiers()) { |
| if (!old_value) |
| m_notifier.NotifyEntry(i.first, i.second, |
| NT_NOTIFY_NEW | NT_NOTIFY_LOCAL); |
| else if (*old_value != *i.second) { |
| unsigned int notify_flags = NT_NOTIFY_UPDATE | NT_NOTIFY_LOCAL; |
| if (!was_persist) notify_flags |= NT_NOTIFY_FLAGS; |
| m_notifier.NotifyEntry(i.first, i.second, notify_flags); |
| } |
| } |
| |
| if (!m_queue_outgoing) continue; // shortcut |
| ++entry->seq_num; |
| |
| // put on update queue |
| if (!old_value || old_value->type() != i.second->type()) |
| msgs.emplace_back(Message::EntryAssign(i.first, entry->id, |
| entry->seq_num.value(), |
| i.second, entry->flags)); |
| else if (entry->id != 0xffff) { |
| // don't send an update if we don't have an assigned id yet |
| if (*old_value != *i.second) |
| msgs.emplace_back(Message::EntryUpdate( |
| entry->id, entry->seq_num.value(), i.second)); |
| if (!was_persist) |
| msgs.emplace_back(Message::FlagsUpdate(entry->id, entry->flags)); |
| } |
| } |
| |
| if (m_queue_outgoing) { |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| for (auto& msg : msgs) queue_outgoing(std::move(msg), nullptr, nullptr); |
| } |
| } |
| |
| return true; |
| } |
| |
| const char* Storage::LoadPersistent( |
| StringRef filename, |
| std::function<void(std::size_t line, const char* msg)> warn) { |
| std::ifstream is(filename); |
| if (!is) return "could not open file"; |
| if (!LoadPersistent(is, warn)) return "error reading file"; |
| return nullptr; |
| } |
| |
| void Storage::CreateRpc(StringRef name, StringRef def, RpcCallback callback) { |
| if (name.empty() || def.empty() || !callback) return; |
| std::unique_lock<std::mutex> lock(m_mutex); |
| if (!m_server) return; // only server can create RPCs |
| |
| auto& new_entry = m_entries[name]; |
| if (!new_entry) new_entry.reset(new Entry(name)); |
| Entry* entry = new_entry.get(); |
| auto old_value = entry->value; |
| auto value = Value::MakeRpc(def); |
| entry->value = value; |
| |
| // set up the new callback |
| entry->rpc_callback = callback; |
| |
| // start the RPC server |
| if (!m_rpc_server.active()) m_rpc_server.Start(); |
| |
| if (old_value && *old_value == *value) return; |
| |
| // assign an id if it doesn't have one |
| if (entry->id == 0xffff) { |
| unsigned int id = m_idmap.size(); |
| entry->id = id; |
| m_idmap.push_back(entry); |
| } |
| |
| // generate message |
| if (!m_queue_outgoing) return; |
| auto queue_outgoing = m_queue_outgoing; |
| if (!old_value || old_value->type() != value->type()) { |
| ++entry->seq_num; |
| auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(), |
| value, entry->flags); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } else { |
| ++entry->seq_num; |
| auto msg = Message::EntryUpdate(entry->id, entry->seq_num.value(), value); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } |
| } |
| |
| void Storage::CreatePolledRpc(StringRef name, StringRef def) { |
| if (name.empty() || def.empty()) return; |
| std::unique_lock<std::mutex> lock(m_mutex); |
| if (!m_server) return; // only server can create RPCs |
| |
| auto& new_entry = m_entries[name]; |
| if (!new_entry) new_entry.reset(new Entry(name)); |
| Entry* entry = new_entry.get(); |
| auto old_value = entry->value; |
| auto value = Value::MakeRpc(def); |
| entry->value = value; |
| |
| // a nullptr callback indicates a polled RPC |
| entry->rpc_callback = nullptr; |
| |
| if (old_value && *old_value == *value) return; |
| |
| // assign an id if it doesn't have one |
| if (entry->id == 0xffff) { |
| unsigned int id = m_idmap.size(); |
| entry->id = id; |
| m_idmap.push_back(entry); |
| } |
| |
| // generate message |
| if (!m_queue_outgoing) return; |
| auto queue_outgoing = m_queue_outgoing; |
| if (!old_value || old_value->type() != value->type()) { |
| ++entry->seq_num; |
| auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(), |
| value, entry->flags); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } else { |
| ++entry->seq_num; |
| auto msg = Message::EntryUpdate(entry->id, entry->seq_num.value(), value); |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } |
| } |
| |
| unsigned int Storage::CallRpc(StringRef name, StringRef params) { |
| std::unique_lock<std::mutex> lock(m_mutex); |
| auto i = m_entries.find(name); |
| if (i == m_entries.end()) return 0; |
| auto& entry = i->getValue(); |
| if (!entry->value->IsRpc()) return 0; |
| |
| ++entry->rpc_call_uid; |
| if (entry->rpc_call_uid > 0xffff) entry->rpc_call_uid = 0; |
| unsigned int combined_uid = (entry->id << 16) | entry->rpc_call_uid; |
| auto msg = Message::ExecuteRpc(entry->id, entry->rpc_call_uid, params); |
| if (m_server) { |
| // RPCs are unlikely to be used locally on the server, but handle it |
| // gracefully anyway. |
| auto rpc_callback = entry->rpc_callback; |
| lock.unlock(); |
| m_rpc_server.ProcessRpc( |
| name, msg, rpc_callback, 0xffffU, [this](std::shared_ptr<Message> msg) { |
| std::lock_guard<std::mutex> lock(m_mutex); |
| m_rpc_results.insert(std::make_pair( |
| std::make_pair(msg->id(), msg->seq_num_uid()), msg->str())); |
| m_rpc_results_cond.notify_all(); |
| }); |
| } else { |
| auto queue_outgoing = m_queue_outgoing; |
| lock.unlock(); |
| queue_outgoing(msg, nullptr, nullptr); |
| } |
| return combined_uid; |
| } |
| |
| bool Storage::GetRpcResult(bool blocking, unsigned int call_uid, |
| std::string* result) { |
| std::unique_lock<std::mutex> lock(m_mutex); |
| for (;;) { |
| auto i = |
| m_rpc_results.find(std::make_pair(call_uid >> 16, call_uid & 0xffff)); |
| if (i == m_rpc_results.end()) { |
| if (!blocking || m_terminating) return false; |
| m_rpc_results_cond.wait(lock); |
| if (m_terminating) return false; |
| continue; |
| } |
| result->swap(i->getSecond()); |
| m_rpc_results.erase(i); |
| return true; |
| } |
| } |