blob: 178b63d429a4be10fca7659c8a2f4e01089b9d25 [file] [log] [blame]
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
import numpy as np
import queue
import threading
import copy
from multispline import Multispline
from libspline import Spline, DistanceSpline, Trajectory
from matplotlib.backends.backend_gtk3agg import (FigureCanvasGTK3Agg as
FigureCanvas)
from matplotlib.figure import Figure
class Graph(Gtk.Bin):
def __init__(self):
super(Graph, self).__init__()
fig = Figure(figsize=(5, 4), dpi=100)
self.axis = fig.add_subplot(111)
self.canvas = FigureCanvas(fig) # a Gtk.DrawingArea
self.canvas.set_vexpand(True)
self.canvas.set_size_request(800, 250)
self.callback_id = self.canvas.mpl_connect('motion_notify_event',
self.on_mouse_move)
self.add(self.canvas)
# The current graph data
self.data = None
# The size of a timestep
self.dt = 0.00505
# The position of the cursor in seconds
self.cursor = 0
# Reference to the parent gtk Widget that wants to get redrawn
# when user moves the cursor
self.cursor_watcher = None
self.cursor_line = None
self.queue = queue.Queue(maxsize=1)
thread = threading.Thread(target=self.worker)
thread.daemon = True
thread.start()
def find_cursor(self):
"""Gets the cursor position as a distance along the spline"""
if self.data is None:
return None
cursor_index = int(self.cursor / self.dt)
if cursor_index > self.data.size:
return None
# use the time to index into the position data
distance_at_cursor = self.data[0][cursor_index - 1]
multispline_index = int(self.data[5][cursor_index - 1])
return (multispline_index, distance_at_cursor)
def place_cursor(self, multispline_index, distance):
"""Places the cursor at a certain distance along the spline"""
if self.data is None:
return
# find the section that is the current multispline
start_of_multispline = np.searchsorted(self.data[5],
multispline_index,
side='left')
end_of_multispline = np.searchsorted(self.data[5],
multispline_index,
side='right')
multispline_region = self.data[0][
start_of_multispline:end_of_multispline]
# convert distance along this multispline to time along trajectory
index = np.searchsorted(multispline_region, distance,
side='left') + start_of_multispline
time = index * self.dt
self.cursor = time
self.redraw_cursor()
def on_mouse_move(self, event):
"""Updates the cursor and all the canvases that watch it on mouse move"""
if self.data is None:
return
total_steps_taken = self.data.shape[1]
total_time = self.dt * total_steps_taken
if event.xdata is not None:
# clip the position if still on the canvas, but off the graph
self.cursor = np.clip(event.xdata, 0, total_time)
self.redraw_cursor()
# tell the field to update too
if self.cursor_watcher is not None:
self.cursor_watcher.queue_draw()
def redraw_cursor(self):
"""Redraws the cursor line"""
# TODO: This redraws the entire graph and isn't very snappy
if self.cursor_line: self.cursor_line.remove()
self.cursor_line = self.axis.axvline(self.cursor)
self.canvas.draw_idle()
def schedule_recalculate(self, multisplines):
"""Submits points to be graphed
Can be superseded by newer points if an old one isn't finished processing.
"""
new_copy = copy.deepcopy(multisplines)
# empty the queue
try:
self.queue.get_nowait()
except queue.Empty:
pass # was already empty
# replace with new request
self.queue.put_nowait(new_copy)
def worker(self):
while True:
self.recalculate_graph(self.queue.get())
def recalculate_graph(self, multisplines):
if len(multisplines) == 0: return
# call C++ wrappers to calculate the trajectory
full_data = None
for multispline_index, multispline in enumerate(multisplines):
multispline.update_lib_spline()
if len(multispline.getLibsplines()) == 0: continue
distanceSpline = DistanceSpline(multispline.getLibsplines())
traj = Trajectory(distanceSpline)
multispline.addConstraintsToTrajectory(traj)
traj.Plan()
XVA = traj.GetPlanXVA(self.dt)
if XVA is None: continue
position, _, _ = XVA
voltages = np.transpose([traj.Voltage(x) for x in position])
data = np.append(XVA, voltages, axis=0)
indicies = np.full((1, XVA.shape[1]), multispline_index, dtype=int)
data = np.append(data, indicies, axis=0)
if full_data is not None:
full_data = np.append(full_data, data, axis=1)
else:
full_data = data
if full_data is None: return
self.data = full_data
# extract values to be graphed
total_steps_taken = full_data.shape[1]
total_time = self.dt * total_steps_taken
times = np.linspace(0, total_time, num=total_steps_taken)
position, velocity, acceleration, left_voltage, right_voltage, _ = full_data
# update graph
self.axis.clear()
self.axis.plot(times, velocity)
self.axis.plot(times, acceleration)
self.axis.plot(times, left_voltage)
self.axis.plot(times, right_voltage)
self.axis.legend(
["Velocity", "Acceleration", "Left Voltage", "Right Voltage"])
self.axis.xaxis.set_label_text("Time (sec)")
# renumber the x-axis to include the last point,
# the total time to drive the spline
self.axis.xaxis.set_ticks(np.linspace(0, total_time, num=8))
# redraw
self.canvas.draw_idle()