From e746cf16748a0eab02828f024859e4bc81edc902 Mon Sep 17 00:00:00 2001 From: Luke Berndt Date: Mon, 1 Apr 2024 14:53:19 -0400 Subject: [PATCH] code clean up --- axis-ptz-controller/axis_ptz_controller.py | 73 +++++++++++----------- axis-ptz-controller/camera_control.py | 3 +- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/axis-ptz-controller/axis_ptz_controller.py b/axis-ptz-controller/axis_ptz_controller.py index 88fa34b..ccacd27 100644 --- a/axis-ptz-controller/axis_ptz_controller.py +++ b/axis-ptz-controller/axis_ptz_controller.py @@ -32,11 +32,13 @@ from camera_configuration import CameraConfiguration from camera_control import CameraControl + class Status(Enum): SLEEPING = 0 SLEWING = 1 TRACKING = 2 + class AxisPtzController(BaseMQTTPubSub): """Point the camera at an object using a proportional rate controller, and capture images while in track.""" @@ -524,7 +526,9 @@ def _config_callback( "capture_interval", self.capture_interval ) # [s] self.capture_dir = config.get("capture_dir", self.capture_dir) - self.tracking_interval = config.get("tracking_interval", self.tracking_interval) # [s] + self.tracking_interval = config.get( + "tracking_interval", self.tracking_interval + ) # [s] # Cannot set tripod yaw, pitch, and roll because of side effects self.pan_gain = config.get("pan_gain", self.pan_gain) # [1/s] self.pan_rate_max = config.get("pan_rate_max", self.pan_rate_max) @@ -617,8 +621,6 @@ def _log_config(self) -> None: f"AxisPtzController configuration:\n{json.dumps(config, indent=4)}" ) - - def _compute_object_pointing(self, time_since_last_update=0) -> None: # Compute position in the geocentric (XYZ) coordinate system # of the object relative to the tripod at time zero, the @@ -630,13 +632,14 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None: # Assign lead time, computing and adding age of object # message, if enabled - time_delta = self.lead_time # [s] time_since_last_update + time_delta = self.lead_time # [s] time_since_last_update object_msg_age = 0 if self.include_age: - object_msg_age = time() - self.timestamp_o #datetime.utcnow().timestamp() - self.timestamp_o # [s] + object_msg_age = ( + time() - self.timestamp_o + ) # datetime.utcnow().timestamp() - self.timestamp_o # [s] logging.debug(f"Object msg age: {object_msg_age} [s]") time_delta += object_msg_age - # Compute position and velocity in the topocentric (ENz) # coordinate system of the object relative to the tripod at @@ -650,8 +653,12 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None: self.vertical_rate_o, ] ) - r_ENz_o_N_t = self.r_ENz_o_0_t + self.v_ENz_o_0_t * object_msg_age # where the object is now - r_ENz_o_1_t = self.r_ENz_o_0_t + self.v_ENz_o_0_t * time_delta # where the object will be in the future, for leading + r_ENz_o_N_t = ( + self.r_ENz_o_0_t + self.v_ENz_o_0_t * object_msg_age + ) # where the object is now + r_ENz_o_1_t = ( + self.r_ENz_o_0_t + self.v_ENz_o_0_t * time_delta + ) # where the object will be in the future, for leading # Compute position, at time one, and velocity, at time zero, # in the geocentric (XYZ) coordinate system of the object @@ -685,7 +692,9 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None: # Compute pan and tilt to point the camera at the object r_uvw_o_n_t = np.matmul(self.E_XYZ_to_uvw, self.r_XYZ_o_N_t) r_uvw_o_1_t = np.matmul(self.E_XYZ_to_uvw, self.r_XYZ_o_1_t) - self.rho_o_now = math.degrees(math.atan2(r_uvw_o_n_t[0], r_uvw_o_n_t[1])) # [deg] + self.rho_o_now = math.degrees( + math.atan2(r_uvw_o_n_t[0], r_uvw_o_n_t[1]) + ) # [deg] self.tau_o_now = math.degrees( math.atan2(r_uvw_o_n_t[2], axis_ptz_utilities.norm(r_uvw_o_n_t[0:2])) ) @@ -694,13 +703,14 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None: math.atan2(r_uvw_o_1_t[2], axis_ptz_utilities.norm(r_uvw_o_1_t[0:2])) ) # [deg] logging.debug(f"Object pan and tilt: {self.rho_o}, {self.tau_o} [deg]") - logging.debug(f"\t🔭\tObject - Pan: {self.rho_o}\t Obj ENz 1t: {r_ENz_o_1_t}\t Obj ENz 0t: {self.r_ENz_o_0_t}\t Obj Velo 0t: {self.v_ENz_o_0_t}\t Lead time: {time_delta}") + logging.debug( + f"\t🔭\tObject - Pan: {self.rho_o}\t Obj ENz 1t: {r_ENz_o_1_t}\t Obj ENz 0t: {self.r_ENz_o_0_t}\t Obj Velo 0t: {self.v_ENz_o_0_t}\t Lead time: {time_delta}" + ) def _track_object(self, time_since_last_update) -> None: - if self.status != Status.TRACKING: return - + # Make sure Object info is not updated while pointing is being computed self.object_lock.acquire() @@ -716,7 +726,6 @@ def _track_object(self, time_since_last_update) -> None: else: logging.debug(f"Controller pan and tilt: {self.rho_c}, {self.tau_c} [deg]") - # Compute angle delta between camera and object pan and tilt self.delta_rho = self._compute_angle_delta(self.rho_c, self.rho_o) self.delta_tau = self._compute_angle_delta(self.tau_c, self.tau_o) @@ -767,7 +776,9 @@ def _track_object(self, time_since_last_update) -> None: rho_2_dot_o = math.degrees(-omega_2[2]) tau_2_dot_o = math.degrees(omega_2[0]) - logging.info(f"Object pan and tilt orig rates: {self.rho_dot_o}, {self.tau_dot_o} [deg/s] vs {rho_2_dot_o}, {tau_2_dot_o} [deg/s]") + logging.info( + f"Object pan and tilt orig rates: {self.rho_dot_o}, {self.tau_dot_o} [deg/s] vs {rho_2_dot_o}, {tau_2_dot_o} [deg/s]" + ) logging.debug( f"Object pan and tilt rates: {self.rho_dot_o}, {self.tau_dot_o} [deg/s]" ) @@ -823,15 +834,12 @@ def _track_object(self, time_since_last_update) -> None: self.do_capture = True self.capture_time = time() - if ( self.do_capture - and time() - self.capture_time > self.capture_interval - ): + if self.do_capture and time() - self.capture_time > self.capture_interval: capture_thread = threading.Thread(target=self._capture_image) capture_thread.daemon = True capture_thread.start() - - #logging.info(f"\t⏱️\tRATES - 🎥 Pan: {self.rho_dot_c}\tTilt: {self.tau_dot_c}\t🛩️ Pan: {self.rho_dot_o}\tTilt: {self.tau_dot_o} ANGLES: 🎥 Pan: {self.rho_c}\tTilt: {self.tau_c}\t🛩️ Pan: {self.rho_o}\tTilt: {self.tau_o} ") + # logging.info(f"\t⏱️\tRATES - 🎥 Pan: {self.rho_dot_c}\tTilt: {self.tau_dot_c}\t🛩️ Pan: {self.rho_dot_o}\tTilt: {self.tau_dot_o} ANGLES: 🎥 Pan: {self.rho_c}\tTilt: {self.tau_c}\t🛩️ Pan: {self.rho_o}\tTilt: {self.tau_o} ") elapsed_time = time() - start_time # Log camera pointing using MQTT if self.log_to_mqtt: @@ -876,15 +884,11 @@ def _track_object(self, time_since_last_update) -> None: logging.debug(f"Publishing logger msg: {logger_msg}") self.publish_to_topic(self.logger_topic, logger_msg) - - - def _slew_camera(self) -> None: - if self.status == Status.SLEWING: logging.error("Camera is already slewing") return - + self.status = Status.SLEWING # Get camera pan and tilt @@ -898,9 +902,7 @@ def _slew_camera(self) -> None: f"Absolute move to pan: {self.rho_o}, and tilt: {self.tau_o}, with zoom: {self.zoom}" ) try: - self.camera_control.absolute_move( - self.rho_o, self.tau_o, self.zoom, 99 - ) + self.camera_control.absolute_move(self.rho_o, self.tau_o, self.zoom, 99) except Exception as e: logging.error(f"Error: {e}") else: @@ -926,7 +928,6 @@ def _slew_camera(self) -> None: # Start Tracking self.status = Status.TRACKING - def _orientation_callback( self, _client: Union[mqtt.Client, None], @@ -1038,7 +1039,9 @@ def _object_callback( ) <= set(data.keys()): logging.info(f"Required keys missing from object message data: {data}") return - logging.info(f"\t🗒️\tProcessing object msg data: {data['object_id']} \t {data['latitude']} \t {data['longitude']} \t {data['altitude']}") + logging.info( + f"\t🗒️\tProcessing object msg data: {data['object_id']} \t {data['latitude']} \t {data['longitude']} \t {data['altitude']}" + ) self.timestamp_o = float(data["timestamp"]) # [s] self.timestamp_c = self.timestamp_o self.lambda_o = data["longitude"] # [deg] @@ -1060,10 +1063,10 @@ def _object_callback( self.camera_control.stop_move() except Exception as e: logging.error(f"Error: {e}") - + # Reset the stop timer because we received an object message self._reset_stop_timer() - + if self.use_camera: # Point the camera at any new object directly if self.object_id != object_id: @@ -1243,7 +1246,6 @@ def _compute_pan_rate_index(self, rho_dot: float): else: self.pan_rate_index = round((100 / self.pan_rate_max) * rho_dot) - def _compute_tilt_rate_index(self, tau_dot: float): """Compute tilt rate index between -100 and 100 using rates in deg/s, limiting the results to the specified minimum and @@ -1420,7 +1422,6 @@ def main(self) -> None: self.manual_control_topic, self._manual_control_callback ) - update_tracking_time = time() # Enter the main loop @@ -1435,9 +1436,11 @@ def main(self) -> None: if not self.use_camera: self._update_pointing() - # Track object - if ( self.use_camera and time() - update_tracking_time > self.tracking_interval ): + if ( + self.use_camera + and time() - update_tracking_time > self.tracking_interval + ): time_since_last_update = time() - update_tracking_time update_tracking_time = time() self._track_object(time_since_last_update) diff --git a/axis-ptz-controller/camera_control.py b/axis-ptz-controller/camera_control.py index fedbc28..1c14a1c 100644 --- a/axis-ptz-controller/camera_control.py +++ b/axis-ptz-controller/camera_control.py @@ -1,5 +1,6 @@ import logging from sensecam_control import vapix_control + logging.getLogger("vapix_control").setLevel(logging.CRITICAL) logging.getLogger("sensecam_control").setLevel(logging.CRITICAL) from typing import Tuple, Union @@ -55,7 +56,7 @@ def get_ptz(self) -> Tuple[float, float, float, float]: """ try: - logging.disable(logging.INFO) + logging.disable(logging.INFO) resp = self._camera_command({"query": "position"}) logging.disable(logging.NOTSET) pan = float(resp.text.split()[0].split("=")[1])