Run yapf on all python files in the repo
Signed-off-by: Ravago Jones <ravagojones@gmail.com>
Change-Id: I221e04c3f517fab8535b22551553799e0fee7a80
diff --git a/aos/seasocks/gen_embedded.py b/aos/seasocks/gen_embedded.py
index fdcdfd9..b6f43aa 100755
--- a/aos/seasocks/gen_embedded.py
+++ b/aos/seasocks/gen_embedded.py
@@ -13,18 +13,18 @@
output = sys.argv[1].replace('"', '')
if not os.path.exists(os.path.dirname(output)):
- os.makedirs(os.path.dirname(output))
+ os.makedirs(os.path.dirname(output))
if len(sys.argv) >= 3:
- web = sys.argv[2:]
+ web = sys.argv[2:]
else:
- web = []
- for root, dirs, files in os.walk("./www_defaults", topdown=False):
- for name in files + dirs:
- web.append(os.path.join(root, name))
+ web = []
+ for root, dirs, files in os.walk("./www_defaults", topdown=False):
+ for name in files + dirs:
+ web.append(os.path.join(root, name))
with open(output, 'w') as o:
- o.write("""
+ o.write("""
#include "internal/Embedded.h"
#include <string>
@@ -35,14 +35,14 @@
std::unordered_map<std::string, EmbeddedContent> embedded = {
""")
- for f in web:
- with open(f, 'rb') as file:
- bytes = file.read()
- o.write('{"/%s", {' % os.path.basename(f))
- o.write('"' + "".join(['\\x%02x' % x for x in bytes]) + '"')
- o.write(',%d }},' % len(bytes))
+ for f in web:
+ with open(f, 'rb') as file:
+ bytes = file.read()
+ o.write('{"/%s", {' % os.path.basename(f))
+ o.write('"' + "".join(['\\x%02x' % x for x in bytes]) + '"')
+ o.write(',%d }},' % len(bytes))
- o.write("""
+ o.write("""
};
} // namespace
diff --git a/aos/util/log_to_mcap_test.py b/aos/util/log_to_mcap_test.py
index 72c9078..7bdffe4 100644
--- a/aos/util/log_to_mcap_test.py
+++ b/aos/util/log_to_mcap_test.py
@@ -12,25 +12,32 @@
def main(argv: Sequence[Text]):
parser = argparse.ArgumentParser()
- parser.add_argument("--log_to_mcap", required=True, help="Path to log_to_mcap binary.")
+ parser.add_argument("--log_to_mcap",
+ required=True,
+ help="Path to log_to_mcap binary.")
parser.add_argument("--mcap", required=True, help="Path to mcap binary.")
- parser.add_argument("--generate_log", required=True, help="Path to logfile generator.")
+ parser.add_argument("--generate_log",
+ required=True,
+ help="Path to logfile generator.")
args = parser.parse_args(argv)
with tempfile.TemporaryDirectory() as tmpdir:
log_name = tmpdir + "/test_log/"
mcap_name = tmpdir + "/log.mcap"
- subprocess.run([args.generate_log, "--output_folder", log_name]).check_returncode()
+ subprocess.run([args.generate_log, "--output_folder",
+ log_name]).check_returncode()
# Run with a really small chunk size, to force a multi-chunk file.
- subprocess.run(
- [args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size", "1000", "--mode", "json",
- log_name]).check_returncode()
+ subprocess.run([
+ args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size",
+ "1000", "--mode", "json", log_name
+ ]).check_returncode()
# MCAP attempts to find $HOME/.mcap.yaml, and dies on $HOME not existing. So
# give it an arbitrary config location (it seems to be fine with a non-existent config).
- doctor_result = subprocess.run(
- [args.mcap, "doctor", mcap_name, "--config", tmpdir + "/.mcap.yaml"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding='utf-8')
+ doctor_result = subprocess.run([
+ args.mcap, "doctor", mcap_name, "--config", tmpdir + "/.mcap.yaml"
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='utf-8')
print(doctor_result.stdout)
print(doctor_result.stderr)
# mcap doctor doesn't actually return a non-zero exit code on certain failures...
diff --git a/aos/util/trapezoid_profile.py b/aos/util/trapezoid_profile.py
index af85339..76c9758 100644
--- a/aos/util/trapezoid_profile.py
+++ b/aos/util/trapezoid_profile.py
@@ -2,8 +2,9 @@
import numpy
+
class TrapezoidProfile(object):
- """Computes a trapezoidal motion profile
+ """Computes a trapezoidal motion profile
Attributes:
_acceleration_time: the amount of time the robot will travel at the
@@ -21,136 +22,140 @@
_timestep: time between calls to Update (delta_time)
_output: output array containing distance to goal and velocity
"""
- def __init__(self, delta_time):
- """Constructs a TrapezoidProfile.
+
+ def __init__(self, delta_time):
+ """Constructs a TrapezoidProfile.
Args:
delta_time: time between calls to Update (seconds)
"""
- self._acceleration_time = 0
- self._acceleration = 0
- self._constant_time = 0
- self._deceleration_time = 0
- self._deceleration = 0
+ self._acceleration_time = 0
+ self._acceleration = 0
+ self._constant_time = 0
+ self._deceleration_time = 0
+ self._deceleration = 0
- self._maximum_acceleration = 0
- self._maximum_velocity = 0
- self._timestep = delta_time
+ self._maximum_acceleration = 0
+ self._maximum_velocity = 0
+ self._timestep = delta_time
- self._output = numpy.array(numpy.zeros((2,1)))
+ self._output = numpy.array(numpy.zeros((2, 1)))
- # Updates the state
- def Update(self, goal_position, goal_velocity):
- self._CalculateTimes(goal_position - self._output[0], goal_velocity)
+ # Updates the state
+ def Update(self, goal_position, goal_velocity):
+ self._CalculateTimes(goal_position - self._output[0], goal_velocity)
- next_timestep = self._timestep
+ next_timestep = self._timestep
- # We now have the amount of time we need to accelerate to follow the
- # profile, the amount of time we need to move at constant velocity
- # to follow the profile, and the amount of time we need to decelerate to
- # follow the profile. Do as much of that as we have time left in dt.
- if self._acceleration_time > next_timestep:
- self._UpdateVals(self._acceleration, next_timestep)
- else:
- self._UpdateVals(self._acceleration, self._acceleration_time)
-
- next_timestep -= self._acceleration_time
- if self._constant_time > next_timestep:
- self._UpdateVals(0, next_timestep)
- else:
- self._UpdateVals(0, self._constant_time)
- next_timestep -= self._constant_time;
- if self._deceleration_time > next_timestep:
- self._UpdateVals(self._deceleration, next_timestep)
+ # We now have the amount of time we need to accelerate to follow the
+ # profile, the amount of time we need to move at constant velocity
+ # to follow the profile, and the amount of time we need to decelerate to
+ # follow the profile. Do as much of that as we have time left in dt.
+ if self._acceleration_time > next_timestep:
+ self._UpdateVals(self._acceleration, next_timestep)
else:
- self._UpdateVals(self._deceleration, self._deceleration_time)
- next_timestep -= self._deceleration_time
- self._UpdateVals(0, next_timestep)
+ self._UpdateVals(self._acceleration, self._acceleration_time)
- return self._output
+ next_timestep -= self._acceleration_time
+ if self._constant_time > next_timestep:
+ self._UpdateVals(0, next_timestep)
+ else:
+ self._UpdateVals(0, self._constant_time)
+ next_timestep -= self._constant_time
+ if self._deceleration_time > next_timestep:
+ self._UpdateVals(self._deceleration, next_timestep)
+ else:
+ self._UpdateVals(self._deceleration,
+ self._deceleration_time)
+ next_timestep -= self._deceleration_time
+ self._UpdateVals(0, next_timestep)
- # Useful for preventing windup etc.
- def MoveCurrentState(self, current):
- self._output = current
+ return self._output
- # Useful for preventing windup etc.
- def MoveGoal(self, dx):
- self._output[0] += dx
+ # Useful for preventing windup etc.
+ def MoveCurrentState(self, current):
+ self._output = current
- def SetGoal(self, x):
- self._output[0] = x
+ # Useful for preventing windup etc.
+ def MoveGoal(self, dx):
+ self._output[0] += dx
- def set_maximum_acceleration(self, maximum_acceleration):
- self._maximum_acceleration = maximum_acceleration
+ def SetGoal(self, x):
+ self._output[0] = x
- def set_maximum_velocity(self, maximum_velocity):
- self._maximum_velocity = maximum_velocity
+ def set_maximum_acceleration(self, maximum_acceleration):
+ self._maximum_acceleration = maximum_acceleration
- def _UpdateVals(self, acceleration, delta_time):
- self._output[0, 0] += (self._output[1, 0] * delta_time
- + 0.5 * acceleration * delta_time * delta_time)
- self._output[1, 0] += acceleration * delta_time
+ def set_maximum_velocity(self, maximum_velocity):
+ self._maximum_velocity = maximum_velocity
- def _CalculateTimes(self, distance_to_target, goal_velocity):
- if distance_to_target == 0:
- self._acceleration_time = 0
- self._acceleration = 0
- self._constant_time = 0
- self._deceleration_time = 0
- self._deceleration = 0
- return
- elif distance_to_target < 0:
- # Recurse with everything inverted.
- self._output[1] *= -1
- self._CalculateTimes(-distance_to_target, -goal_velocity)
- self._output[1] *= -1
- self._acceleration *= -1
- self._deceleration *= -1
- return
+ def _UpdateVals(self, acceleration, delta_time):
+ self._output[0, 0] += (self._output[1, 0] * delta_time +
+ 0.5 * acceleration * delta_time * delta_time)
+ self._output[1, 0] += acceleration * delta_time
- self._constant_time = 0
- self._acceleration = self._maximum_acceleration
- maximum_acceleration_velocity = (
- distance_to_target * 2 * numpy.abs(self._acceleration)
- + self._output[1] * self._output[1])
- if maximum_acceleration_velocity > 0:
- maximum_acceleration_velocity = numpy.sqrt(maximum_acceleration_velocity)
- else:
- maximum_acceleration_velocity = -numpy.sqrt(-maximum_acceleration_velocity)
+ def _CalculateTimes(self, distance_to_target, goal_velocity):
+ if distance_to_target == 0:
+ self._acceleration_time = 0
+ self._acceleration = 0
+ self._constant_time = 0
+ self._deceleration_time = 0
+ self._deceleration = 0
+ return
+ elif distance_to_target < 0:
+ # Recurse with everything inverted.
+ self._output[1] *= -1
+ self._CalculateTimes(-distance_to_target, -goal_velocity)
+ self._output[1] *= -1
+ self._acceleration *= -1
+ self._deceleration *= -1
+ return
- # Since we know what we'd have to do if we kept after it to decelerate, we
- # know the sign of the acceleration.
- if maximum_acceleration_velocity > goal_velocity:
- self._deceleration = -self._maximum_acceleration
- else:
- self._deceleration = self._maximum_acceleration
+ self._constant_time = 0
+ self._acceleration = self._maximum_acceleration
+ maximum_acceleration_velocity = (
+ distance_to_target * 2 * numpy.abs(self._acceleration) +
+ self._output[1] * self._output[1])
+ if maximum_acceleration_velocity > 0:
+ maximum_acceleration_velocity = numpy.sqrt(
+ maximum_acceleration_velocity)
+ else:
+ maximum_acceleration_velocity = -numpy.sqrt(
+ -maximum_acceleration_velocity)
- # We now know the top velocity we can get to.
- top_velocity = numpy.sqrt((distance_to_target +
- (self._output[1] * self._output[1]) /
- (2.0 * self._acceleration) +
- (goal_velocity * goal_velocity) /
- (2.0 * self._deceleration)) /
- (-1.0 / (2.0 * self._deceleration) +
- 1.0 / (2.0 * self._acceleration)))
+ # Since we know what we'd have to do if we kept after it to decelerate, we
+ # know the sign of the acceleration.
+ if maximum_acceleration_velocity > goal_velocity:
+ self._deceleration = -self._maximum_acceleration
+ else:
+ self._deceleration = self._maximum_acceleration
- # If it can go too fast, we now know how long we get to accelerate for and
- # how long to go at constant velocity.
- if top_velocity > self._maximum_velocity:
- self._acceleration_time = ((self._maximum_velocity - self._output[1]) /
- self._maximum_acceleration)
- self._constant_time = (distance_to_target +
- (goal_velocity * goal_velocity -
- self._maximum_velocity * self._maximum_velocity) /
- (2.0 * self._maximum_acceleration)) / self._maximum_velocity
- else:
- self._acceleration_time = (
- (top_velocity - self._output[1]) / self._acceleration)
+ # We now know the top velocity we can get to.
+ top_velocity = numpy.sqrt(
+ (distance_to_target + (self._output[1] * self._output[1]) /
+ (2.0 * self._acceleration) + (goal_velocity * goal_velocity) /
+ (2.0 * self._deceleration)) / (-1.0 /
+ (2.0 * self._deceleration) + 1.0 /
+ (2.0 * self._acceleration)))
- if self._output[1] > self._maximum_velocity:
- self._constant_time = 0
- self._acceleration_time = 0
+ # If it can go too fast, we now know how long we get to accelerate for and
+ # how long to go at constant velocity.
+ if top_velocity > self._maximum_velocity:
+ self._acceleration_time = (
+ (self._maximum_velocity - self._output[1]) /
+ self._maximum_acceleration)
+ self._constant_time = (
+ distance_to_target +
+ (goal_velocity * goal_velocity -
+ self._maximum_velocity * self._maximum_velocity) /
+ (2.0 * self._maximum_acceleration)) / self._maximum_velocity
+ else:
+ self._acceleration_time = ((top_velocity - self._output[1]) /
+ self._acceleration)
- self._deceleration_time = (
- (goal_velocity - top_velocity) / self._deceleration)
+ if self._output[1] > self._maximum_velocity:
+ self._constant_time = 0
+ self._acceleration_time = 0
+ self._deceleration_time = ((goal_velocity - top_velocity) /
+ self._deceleration)
diff --git a/aos/vision/download/downloader.py b/aos/vision/download/downloader.py
index 3dafb48..0d15b87 100644
--- a/aos/vision/download/downloader.py
+++ b/aos/vision/download/downloader.py
@@ -12,39 +12,51 @@
import os
import os.path
+
def RunAndSplitShas(shcmd):
- out = subprocess.check_output(shcmd)
- return [line.split(' ')[0] for line in filter(lambda x: x, out.split('\n'))]
+ out = subprocess.check_output(shcmd)
+ return [
+ line.split(' ')[0] for line in filter(lambda x: x, out.split('\n'))
+ ]
+
def GetChecksums(fnames):
- return RunAndSplitShas(["sha256sum"] + fnames)
+ return RunAndSplitShas(["sha256sum"] + fnames)
+
def GetJetsonChecksums(ssh_target, fnames):
- target_files = ["/root/%s" % fname for fname in fnames]
- subprocess.check_call(["ssh", ssh_target, "touch " + " ".join(target_files)])
- cmds = ["ssh", ssh_target, "sha256sum " + " ".join(target_files)]
- return RunAndSplitShas(cmds)
+ target_files = ["/root/%s" % fname for fname in fnames]
+ subprocess.check_call(
+ ["ssh", ssh_target, "touch " + " ".join(target_files)])
+ cmds = ["ssh", ssh_target, "sha256sum " + " ".join(target_files)]
+ return RunAndSplitShas(cmds)
+
def ToJetsonFname(fname):
- if (fname[-9:] == ".stripped"):
- fname = fname[:-9]
- return os.path.basename(fname)
+ if (fname[-9:] == ".stripped"):
+ fname = fname[:-9]
+ return os.path.basename(fname)
+
def VerifyCheckSumsAndUpload(fnames, ssh_target):
- jetson_fnames = [ToJetsonFname(fname) for fname in fnames]
- checksums = GetChecksums(fnames)
- jetson_checksums = GetJetsonChecksums(ssh_target, jetson_fnames)
- for i in range(len(fnames)):
- if (checksums[i] != jetson_checksums[i]):
- # if empty, unlink
- subprocess.check_call(["ssh", ssh_target, "unlink " + jetson_fnames[i]])
- subprocess.check_call(["scp", fnames[i], ssh_target + ":" + jetson_fnames[i]])
+ jetson_fnames = [ToJetsonFname(fname) for fname in fnames]
+ checksums = GetChecksums(fnames)
+ jetson_checksums = GetJetsonChecksums(ssh_target, jetson_fnames)
+ for i in range(len(fnames)):
+ if (checksums[i] != jetson_checksums[i]):
+ # if empty, unlink
+ subprocess.check_call(
+ ["ssh", ssh_target, "unlink " + jetson_fnames[i]])
+ subprocess.check_call(
+ ["scp", fnames[i], ssh_target + ":" + jetson_fnames[i]])
+
def main(argv):
- args = argv[argv.index('--') + 1:]
- files = argv[1:argv.index('--')]
+ args = argv[argv.index('--') + 1:]
+ files = argv[1:argv.index('--')]
- VerifyCheckSumsAndUpload(files, args[-1])
+ VerifyCheckSumsAndUpload(files, args[-1])
+
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)