-
Notifications
You must be signed in to change notification settings - Fork 560
/
Copy pathcompositor.py
202 lines (169 loc) · 10.3 KB
/
compositor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to the Spot CAM Compositor service."""
from google.protobuf.wrappers_pb2 import BoolValue
from bosdyn.api.spot_cam import compositor_pb2, service_pb2_grpc
from bosdyn.client.common import BaseClient, handle_common_header_errors
class CompositorClient(BaseClient):
"""A client calling Spot CAM Compositor services.
"""
default_service_name = 'spot-cam-compositor'
service_type = 'bosdyn.api.spot_cam.CompositorService'
def __init__(self):
super(CompositorClient, self).__init__(service_pb2_grpc.CompositorServiceStub)
def set_screen(self, name, **kwargs):
"""Change the current view that is being streamed over the network"""
request = compositor_pb2.SetScreenRequest(name=name)
return self.call(self._stub.SetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def set_screen_async(self, name, **kwargs):
"""Async version of set_screen()"""
request = compositor_pb2.SetScreenRequest(name=name)
return self.call_async(self._stub.SetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_screen(self, **kwargs):
"""Get the currently selected screen"""
request = compositor_pb2.GetScreenRequest()
return self.call(self._stub.GetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_screen_async(self, **kwargs):
"""Async version of get_screen()"""
request = compositor_pb2.GetScreenRequest()
return self.call_async(self._stub.GetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def list_screens(self, **kwargs):
"""List available screens"""
request = compositor_pb2.ListScreensRequest()
return self.call(self._stub.ListScreens, request, self._screens_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def list_screens_async(self, **kwargs):
"""Async version of list_screens()"""
request = compositor_pb2.ListScreensRequest()
return self.call_async(self._stub.ListScreens, request, self._screens_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_visible_cameras(self, **kwargs):
"""List cameras on Spot CAM"""
request = compositor_pb2.GetVisibleCamerasRequest()
return self.call(self._stub.GetVisibleCameras, request, self._streams_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_visible_cameras_async(self, **kwargs):
"""Async version of get_visible_cameras()"""
request = compositor_pb2.GetVisibleCamerasRequest()
return self.call_async(self._stub.GetVisibleCameras, request, self._streams_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def set_ir_colormap(self, colormap, min_temp, max_temp, auto_scale, **kwargs):
"""Set IR colormap to use on Spot CAM
Args:
colormap (bosdyn.api.spot_cam.compositor_pb2.IrColorMap.ColorMap): IR display colormap
min_temp (Float): minimum temperature on the temperature scale
max_temp (Float): maximum temperature on the temperature scale
auto_scale (Boolean): Auto-scale the color map. This is the most human-understandable
option. min_temp and max_temp are ignored if this is set to True
kwargs: extra arguments for controlling RPC details.
"""
scale = compositor_pb2.IrColorMap.ScalingPair(min=min_temp, max=max_temp)
auto = BoolValue(value=auto_scale)
ir_colormap = compositor_pb2.IrColorMap(colormap=colormap, scale=scale, auto_scale=auto)
request = compositor_pb2.SetIrColormapRequest(map=ir_colormap)
return self.call(self._stub.SetIrColormap, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def set_ir_colormap_async(self, colormap, min_temp, max_temp, auto_scale, **kwargs):
"""Async version of set_ir_colormap()"""
scale = compositor_pb2.IrColorMap.ScalingPair(min=min_temp, max=max_temp)
auto = BoolValue(value=auto_scale)
ir_colormap = compositor_pb2.IrColorMap(colormap=colormap, scale=scale, auto_scale=auto)
request = compositor_pb2.SetIrColormapRequest(map=ir_colormap)
return self.call_async(self._stub.SetIrColormap, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_ir_colormap(self, **kwargs):
"""Get currently selected IR colormap on Spot CAM"""
request = compositor_pb2.GetIrColormapRequest()
return self.call(self._stub.GetIrColormap, request, self._colormap_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_ir_colormap_async(self, **kwargs):
"""Async version of get_ir_colormap()"""
request = compositor_pb2.GetIrColormapRequest()
return self.call_async(self._stub.GetIrColormap, request, self._colormap_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def set_ir_meter_overlay(self, x, y, enable, unit, **kwargs):
"""Set IR reticle position to use on Spot CAM IR
Args:
x (Float): (0,1) horizontal coordinate of reticle
y (Float): (0,1) vertical coordinate of reticle
enable (Boolean): Enable the reticle on the display
unit (TempUnit): Temperature unit to display
kwargs: extra arguments for controlling RPC details.
"""
coords = compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y)
# setting both coords and meter fields for backwards compatibility
overlay = compositor_pb2.IrMeterOverlay(enable=enable, coords=coords, meter=[coords],
unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def set_ir_meter_overlay_async(self, x, y, enable, unit, **kwargs):
"""Async version of set_ir_meter_overlay()"""
coords = compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y)
# setting both coords and meter fields for backwards compatibility
overlay = compositor_pb2.IrMeterOverlay(enable=enable, coords=coords, meter=[coords],
unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call_async(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def set_multi_ir_meter_overlay(self, coords, enable, unit, **kwargs):
"""Set multiple IR reticle positions to use on Spot CAM IR
Args:
coords (List[Tuple(Float, Float)]): List of (x, y) reticle coordinates in range (0,1)
e.g. [(0.1, 0.2), (0.2, 0.4), (0.7, 0.7)]
enable (Boolean): Enable the reticles on the display
unit (TempUnit): Temperature unit to display
kwargs: extra arguments for controlling RPC details.
"""
coords_proto = [
compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y) for x, y in coords
]
overlay = compositor_pb2.IrMeterOverlay(enable=enable, meter=coords_proto, unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def set_multi_ir_meter_overlay_async(self, coords, enable, unit, **kwargs):
"""Async version of set_multi_ir_meter_overlay()"""
coords_proto = [
compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y) for x, y in coords
]
overlay = compositor_pb2.IrMeterOverlay(enable=enable, meter=coords_proto, unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call_async(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_ir_meter_overlay(self, **kwargs):
"""Get current IR reticle positions"""
request = compositor_pb2.GetIrMeterOverlayRequest()
return self.call(self._stub.GetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
def get_ir_meter_overlay_async(self, **kwargs):
"""Async version of get_ir_meter_overlay()"""
request = compositor_pb2.GetIrMeterOverlayRequest()
return self.call_async(self._stub.GetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
@staticmethod
def _return_response(response):
return response
@staticmethod
def _name_from_response(response):
return response.name
@staticmethod
def _screens_from_response(response):
return response.screens
@staticmethod
def _streams_from_response(response):
return response.streams
@staticmethod
def _colormap_from_response(response):
return response.map
@staticmethod
@handle_common_header_errors
def _compositor_error_from_response(response): # pylint: disable=unused-argument
return None