-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathglobal_data_manager.py
418 lines (332 loc) · 12 KB
/
global_data_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# Copyright 2023 DFKI GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
from typing import Union, Dict
from util_arolib.types import *
from util_arolib.types import get_copy as get_copy_aro
from util_arolib.geometry import correct_polygon
from silo_planning.types import *
class GlobalDataManager:
    """ Class used to register and access all objects in the problem, incl. fields, machines, silos and compactors

    Objects are stored in per-kind dictionaries keyed by the object's ``id``.
    The ``register_*`` methods store a copy of the given object (made with ``get_copy_aro`` or
    ``copy.deepcopy``). The ``get_*`` methods and the dictionary properties return the stored
    objects themselves, whereas the ``get_*_copy`` methods return copies of them.
    """

    # @todo: replace with realistic values
    # Fallback values used by register_machine / register_compactor when the registered object
    # holds a non-positive value for the respective attribute.
    DEF_HARV_UNLOADING_SPEED_MASS = 100.0
    DEF_HARV_UNLOADING_SPEED_VOLUME = 100.0
    DEF_TV_UNLOADING_SPEED_MASS = 200.0
    DEF_TV_UNLOADING_SPEED_VOLUME = 200.0
    DEF_MASS_PER_SWEEP = 500.0

    def __init__(self):
        self.__fields: Dict[int, Field] = dict()
        """ Registered fields: {field_id: field}"""

        self.__machines: Dict[int, Machine] = dict()
        """ Registered machines (harvesters, transport vehicles): {machine_id: machine}"""

        self.__silos: Dict[int, SiloExtended] = dict()
        """ Registered silos: {silo_id: silo}"""

        self.__compactors: Dict[int, Compactor] = dict()
        """ Registered compactors: {compactor_id: compactor}"""

    def register_field(self, field: Field) -> bool:
        """ Register/add a field with unique id

        Note: the saved object is an (adjusted) copy of the input object

        Parameters
        ----------
        field : Field
            Field

        Returns
        ----------
        success : bool
            True on success; False if the id is already registered or the field could not be initialized
        """
        if field.id in self.__fields:
            print(f'[ERROR]: a field with given id was already registered')
            return False

        field_new = self.__initialize_field(field)
        if field_new is None:
            return False

        self.__fields[field_new.id] = field_new
        return True

    def register_machine(self, machine: Machine) -> bool:
        """ Register/add a machine with unique id

        Note: the saved object is an (adjusted) copy of the input object: non-positive unloading
        speeds are replaced by the class defaults (harvester defaults for harvesters, transport
        vehicle defaults otherwise) and negative bunker capacities are clamped to 0.

        Parameters
        ----------
        machine : Machine
            Machine

        Returns
        ----------
        success : bool
            True on success; False if the id is already registered
        """
        if machine.id in self.__machines:
            print(f'[ERROR]: a machine with given id {machine.id} was already registered')
            return False

        m = get_copy_aro(machine)

        # Compare by value, not identity.
        # NOTE(review): MachineType is defined outside this file; if it is an extension-module
        # enum, `is` can be False for equal values. `==` is equivalent for a standard enum.Enum.
        is_harvester = m.machinetype == MachineType.HARVESTER

        if m.unloading_speed_mass <= 0.0:
            m.unloading_speed_mass = self.DEF_HARV_UNLOADING_SPEED_MASS if is_harvester \
                else self.DEF_TV_UNLOADING_SPEED_MASS

        if m.unloading_speed_volume <= 0.0:
            # The volume default must be written to the volume attribute (it used to overwrite
            # unloading_speed_mass, leaving the volume speed invalid).
            m.unloading_speed_volume = self.DEF_HARV_UNLOADING_SPEED_VOLUME if is_harvester \
                else self.DEF_TV_UNLOADING_SPEED_VOLUME

        m.bunker_mass = max(m.bunker_mass, 0.0)
        m.bunker_volume = max(m.bunker_volume, 0.0)

        self.__machines[machine.id] = m
        return True

    def register_silo(self, silo: SiloExtended) -> bool:
        """ Register/add a silo with unique id

        Note: the saved object is a (deep) copy of the input object

        Parameters
        ----------
        silo : SiloExtended
            Silo

        Returns
        ----------
        success : bool
            True on success; False if the id is already registered
        """
        if silo.id in self.__silos:
            print(f'[ERROR]: a silo with given id was already registered')
            return False

        self.__silos[silo.id] = copy.deepcopy(silo)
        return True

    def register_compactor(self, compactor: Compactor) -> bool:
        """ Register/add a compactor with unique id

        The silo referenced by the compactor (silo_id) must have been registered beforehand.

        Note: the saved object is an (adjusted) copy of the input object: a non-positive
        mass_per_sweep is replaced by DEF_MASS_PER_SWEEP.

        Parameters
        ----------
        compactor : Compactor
            Compactor

        Returns
        ----------
        success : bool
            True on success; False if the id is already registered or the silo is not registered
        """
        if compactor.id in self.__compactors:
            print(f'[ERROR]: a compactor with given id {compactor.id} was already registered')
            return False

        if compactor.silo_id not in self.__silos:
            print(f'[ERROR]: a compactor with given id {compactor.id} belongs to an unregistered silo')
            return False

        c = copy.deepcopy(compactor)
        if c.mass_per_sweep <= 0.0:
            # Repair the attribute that was checked (the default used to be written to
            # unloading_speed_mass, so mass_per_sweep stayed invalid).
            c.mass_per_sweep = self.DEF_MASS_PER_SWEEP

        self.__compactors[compactor.id] = c
        return True

    @property
    def fields(self) -> Dict[int, Field]:
        """ Get all registered fields

        Note: this is the internal dictionary, not a copy

        Returns
        ----------
        fields : Dict[int, Field]
            Registered fields: {field_id: field}
        """
        return self.__fields

    @property
    def machines(self) -> Dict[int, Machine]:
        """ Get all registered machines (harvesters, transport vehicles)

        Note: this is the internal dictionary, not a copy

        Returns
        ----------
        machines : Dict[int, Machine]
            Registered machines: {machine_id: machine}
        """
        return self.__machines

    @property
    def silos(self) -> Dict[int, SiloExtended]:
        """ Get all registered silos

        Note: this is the internal dictionary, not a copy

        Returns
        ----------
        silos : Dict[int, SiloExtended]
            Registered silos: {silo_id: silo}
        """
        return self.__silos

    @property
    def compactors(self) -> Dict[int, Compactor]:
        """ Get all registered compactors

        Note: this is the internal dictionary, not a copy

        Returns
        ----------
        compactors : Dict[int, Compactor]
            Registered compactors: {compactor_id: compactor}
        """
        return self.__compactors

    def get_field(self, field_id: int) -> Union[Field, None]:
        """ Get the registered field with the given id

        Parameters
        ----------
        field_id : int
            Field id

        Returns
        ----------
        field : Field
            Registered field (None if not found)
        """
        return self.__fields.get(field_id)

    def get_field_with_silos(self, field_id: int, silos: List[ Union[SiloExtended, ResourcePoint] ] = None) -> Union[Field, None]:
        """ Get a copy of a registered field adding the given silos/resource points

        The resource points of every subfield of the copy are replaced by deep copies of the
        given silos / resource points; the registered field itself is left untouched.

        Parameters
        ----------
        field_id : int
            Field id
        silos : List[ Union[SiloExtended, ResourcePoint] ]
            Silos / resource points. If None, it will add all registered silos.

        Returns
        ----------
        field : Field
            Copy of the registered field with silos (None if not found)
        """
        f = self.__fields.get(field_id)
        if f is None:
            return None

        # The source of resource points is the same for all subfields, so select it once
        resource_points = self.__silos.values() if silos is None else silos

        f_new = get_copy_aro(f)
        for sf in f_new.subfields:
            sf.resource_points = ResourcePointVector()
            for silo in resource_points:
                sf.resource_points.append( copy.deepcopy(silo) )
        return f_new

    def get_machine(self, machine_id: int) -> Union[Machine, None]:
        """ Get the registered machine with the given id

        Parameters
        ----------
        machine_id : int
            Machine id

        Returns
        ----------
        machine : Machine
            Registered machine (None if not found)
        """
        return self.__machines.get(machine_id)

    def get_silo(self, silo_id: int) -> Union[SiloExtended, None]:
        """ Get the registered silo with the given id

        Parameters
        ----------
        silo_id : int
            Silo id

        Returns
        ----------
        silo : SiloExtended
            Registered silo (None if not found)
        """
        return self.__silos.get(silo_id)

    def get_compactor(self, compactor_id: int) -> Union[Compactor, None]:
        """ Get the registered compactor with the given id

        Parameters
        ----------
        compactor_id : int
            Compactor id

        Returns
        ----------
        compactor : Compactor
            Registered compactor (None if not found)
        """
        return self.__compactors.get(compactor_id)

    def get_field_copy(self, field_id: int) -> Union[Field, None]:
        """ Get a copy of the registered field with the given id

        Parameters
        ----------
        field_id : int
            Field id

        Returns
        ----------
        field : Field
            Copy of the registered field (None if not found; an error message is printed in that case)
        """
        f = self.__fields.get(field_id)
        if f is None:
            print(f'[ERROR]: invalid field id')
            return None
        return get_copy_aro(f)

    def get_machine_copy(self, machine_id: int) -> Union[Machine, None]:
        """ Get a copy of the registered machine with the given id

        Parameters
        ----------
        machine_id : int
            Machine id

        Returns
        ----------
        machine : Machine
            Copy of the registered machine (None if not found; an error message is printed in that case)
        """
        m = self.__machines.get(machine_id)
        if m is None:
            print(f'[ERROR]: invalid machine id')
            return None
        return get_copy_aro(m)

    def get_silo_copy(self, silo_id: int) -> Union[SiloExtended, None]:
        """ Get a copy of the registered silo with the given id

        Parameters
        ----------
        silo_id : int
            Silo id

        Returns
        ----------
        silo : SiloExtended
            Copy of the registered silo (None if not found; an error message is printed in that case)
        """
        s = self.__silos.get(silo_id)
        if s is None:
            print(f'[ERROR]: invalid silo id')
            return None
        return copy.deepcopy(s)

    def get_compactor_copy(self, compactor_id: int) -> Union[Compactor, None]:
        """ Get a copy of the registered compactor with the given id

        Parameters
        ----------
        compactor_id : int
            Compactor id

        Returns
        ----------
        compactor : Compactor
            Copy of the registered compactor (None if not found; an error message is printed in that case)
        """
        c = self.__compactors.get(compactor_id)
        if c is None:
            print(f'[ERROR]: invalid compactor id')
            return None
        return copy.deepcopy(c)

    @staticmethod
    def __initialize_field(_field: Field) -> Union[Field, None]:
        """ Initialize a field

        Builds a copy of the input field that holds only the first valid subfield, i.e. the first
        subfield with more than 2 outer-boundary points, at least one access point and a first
        reference line with more than 1 point. The resource points of that subfield are cleared
        and the field / subfield boundaries are passed through correct_polygon.

        Parameters
        ----------
        _field : Field
            Field

        Returns
        ----------
        initialize_field : Field
            Initialized field (None on error)
        """
        if len(_field.subfields) == 0:
            print(f'[ERROR]: invalid field: the field has no subfield')
            return None

        field = get_copy_aro(_field)
        field.subfields = SubfieldVector()

        # Keep only the first subfield that is complete enough to be used
        # NOTE(review): the subfield is taken from the input field without an explicit copy;
        # whether SubfieldVector.append copies its argument is not visible here - confirm that the
        # adjustments below cannot leak into the caller's object.
        for sf in _field.subfields:
            if len(sf.boundary_outer.points) > 2 \
                    and len(sf.access_points) > 0 \
                    and len(sf.reference_lines) > 0 \
                    and len(sf.reference_lines[0].points) > 1:
                field.subfields.append(sf)
                break

        if len(field.subfields) == 0:
            print(f'[ERROR]: invalid field: the field has no valid subfield '
                  f'(boundary, access points, reference lines, ...)')
            return None

        # Resource points are not stored with the field (see get_field_with_silos)
        sf = field.subfields[0]
        sf.resource_points = ResourcePointVector()

        correct_polygon(field.outer_boundary, True)
        for sf in field.subfields:
            correct_polygon(sf.boundary_outer, True)
            correct_polygon(sf.boundary_inner, True)

        return field