This repository has been archived by the owner on Oct 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathqgs_reader.py
358 lines (286 loc) · 11.9 KB
/
qgs_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import os
import re
from xml.etree import ElementTree
class QGSReader:
"""QGSReader class
Read QGIS 2.18 or 3.x projects and extract data for QWC config.
"""
def __init__(self, logger):
"""Constructor
:param Logger logger: Application logger
"""
self.logger = logger
self.root = None
self.qgis_version = 0
# get path to QGIS projects from ENV
self.qgs_resources_path = os.environ.get('QGIS_RESOURCES_PATH', 'qgs/')
def read(self, qgs_path):
"""Read QGIS project file and return True on success.
:param str qgs_path: QGS name with optional path relative to
QGIS_RESOURCES_PATH
"""
qgs_file = "%s.qgs" % qgs_path
qgs_path = os.path.join(self.qgs_resources_path, qgs_file)
if not os.path.exists(qgs_path):
self.logger.warn("Could not find QGS file '%s'" % qgs_path)
return False
try:
tree = ElementTree.parse(qgs_path)
self.root = tree.getroot()
if self.root.tag != 'qgis':
self.logger.warn("'%s' is not a QGS file" % qgs_path)
return False
# extract QGIS version
version = self.root.get('version')
major, minor, rev = [
int(v) for v in version.split('-')[0].split('.')
]
self.qgis_version = major * 10000 + minor * 100 + rev
except Exception as e:
self.logger.error(e)
return False
return True
def layer_metadata(self, layer_name):
"""Collect layer metadata from QGS.
:param str layer_name: Layer shortname
"""
config = {}
if self.root is None:
self.logger.warning("Root element is empty")
return config
# find layer by shortname
for maplayer in self.root.findall('.//maplayer'):
if maplayer.find('shortname') is not None:
maplayer_name = maplayer.find('shortname').text
else:
maplayer_name = maplayer.find('layername').text
if maplayer_name == layer_name:
provider = maplayer.find('provider').text
if provider != 'postgres':
self.logger.info("Not a PostgreSQL layer")
continue
datasource = maplayer.find('datasource').text
config['database'] = self.db_connection(datasource)
config.update(self.table_metadata(datasource))
config.update(self.attributes_metadata(maplayer))
break
return config
def db_connection(self, datasource):
"""Parse QGIS datasource URI and return SQLALchemy DB connection
string for a PostgreSQL database or connection service.
:param str datasource: QGIS datasource URI
"""
connection_string = None
if 'service=' in datasource:
# PostgreSQL connection service
m = re.search(r"service='([\w ]+)'", datasource)
if m is not None:
connection_string = 'postgresql:///?service=%s' % m.group(1)
elif 'dbname=' in datasource:
# PostgreSQL database
dbname, host, port, user, password = '', '', '', '', ''
m = re.search(r"dbname='(.+?)' \w+=", datasource)
if m is not None:
dbname = m.group(1)
m = re.search(r"host=([\w\.]+)", datasource)
if m is not None:
host = m.group(1)
m = re.search(r"port=(\d+)", datasource)
if m is not None:
port = m.group(1)
m = re.search(r"user='(.+?)' \w+=", datasource)
if m is not None:
user = m.group(1)
# unescape \' and \\'
user = re.sub(r"\\'", "'", user)
user = re.sub(r"\\\\", r"\\", user)
m = re.search(r"password='(.+?)' \w+=", datasource)
if m is not None:
password = m.group(1)
# unescape \' and \\'
password = re.sub(r"\\'", "'", password)
password = re.sub(r"\\\\", r"\\", password)
# postgresql://user:password@host:port/dbname
connection_string = 'postgresql://'
if user and password:
connection_string += "%s:%s@" % (user, password)
connection_string += "%s:%s/%s" % (host, port, dbname)
return connection_string
def table_metadata(self, datasource):
"""Parse QGIS datasource URI and return table metadata.
:param str datasource: QGIS datasource URI
"""
metadata = {}
# parse schema, table and geometry column
m = re.search(r'table="(.+?)" \((\w+)\) \w+=', datasource)
if m is not None:
table = m.group(1)
parts = table.split('"."')
metadata['schema'] = parts[0]
metadata['table_name'] = parts[1]
metadata['geometry_column'] = m.group(2)
else:
m = re.search(r'table="(.+?)" \w+=', datasource)
if m is not None:
table = m.group(1)
parts = table.split('"."')
metadata['schema'] = parts[0]
metadata['table_name'] = parts[1]
m = re.search(r"key='(.+?)' \w+=", datasource)
if m is not None:
metadata['primary_key'] = m.group(1)
m = re.search(r"type=([\w.]+)", datasource)
if m is not None:
metadata['geometry_type'] = m.group(1).upper()
m = re.search(r"srid=([\d.]+)", datasource)
if m is not None:
metadata['srid'] = int(m.group(1))
return metadata
def attributes_metadata(self, maplayer):
"""Collect layer attributes.
:param Element maplayer: QGS maplayer node
"""
attributes = []
fields = {}
aliases = maplayer.find('aliases')
for alias in aliases.findall('alias'):
field = alias.get('field')
if self.field_hidden(maplayer, field):
# skip hidden fields
continue
attributes.append(field)
fields[field] = {}
# get alias
name = alias.get('name')
if name:
fields[field]['alias'] = name
# get any constraints from edit widgets
constraints = self.edit_widget_constraints(maplayer, field)
if constraints:
fields[field]['constraints'] = constraints
return {
'attributes': attributes,
'fields': fields
}
def edit_widget_constraints(self, maplayer, field):
"""Get any constraints from edit widget config.
:param Element maplayer: QGS maplayer node
:param str field: Field name
"""
if self.qgis_version > 30000:
return self.edit_widget_constraints_v3(maplayer, field)
else:
return self.edit_widget_constraints_v2(maplayer, field)
def edit_widget_constraints_v2(self, maplayer, field):
"""Get any constraints from edit widget config (QGIS 2.18).
:param Element maplayer: QGS maplayer node
:param str field: Field name
"""
constraints = {}
edittype = maplayer.find("edittypes/edittype[@name='%s']" % field)
widget_config = edittype.find('widgetv2config')
if widget_config.get('fieldEditable') == '0':
constraints['readonly'] = True
if (not constraints.get('readonly', False) and
widget_config.get('notNull') == '1'):
constraints['required'] = True
constraint_desc = widget_config.get('constraintDescription', '')
if len(constraint_desc) > 0:
constraints['placeholder'] = constraint_desc
if edittype.get('widgetv2type') == 'Range':
constraints.update({
'min': self.parse_number(widget_config.get('Min')),
'max': self.parse_number(widget_config.get('Max')),
'step': self.parse_number(widget_config.get('Step'))
})
elif edittype.get('widgetv2type') == 'ValueMap':
values = []
for value in widget_config.findall('value'):
values.append({
'label': value.get('key'),
'value': value.get('value')
})
if values:
constraints['values'] = values
return constraints
def edit_widget_constraints_v3(self, maplayer, field):
"""Get any constraints from edit widget config (QGIS 3.x).
:param Element maplayer: QGS maplayer node
:param str field: Field name
"""
constraints = {}
# NOTE: <editable /> is empty if Attributes Form is not configured
editable_field = maplayer.find("editable/field[@name='%s']" % field)
if (editable_field is not None and
editable_field.get('editable') == '0'):
constraints['readonly'] = True
if not constraints.get('readonly', False):
# ConstraintNotNull = 1
constraints['required'] = int(
maplayer.find("constraints/constraint[@field='%s']" % field)
.get('constraints')
) & 1 > 0
constraint_desc = maplayer.find(
"constraintExpressions/constraint[@field='%s']" % field
).get('desc')
if len(constraint_desc) > 0:
constraints['placeholder'] = constraint_desc
edit_widget = maplayer.find(
"fieldConfiguration/field[@name='%s']/editWidget" % field
)
if edit_widget.get('type') == 'Range':
min_option = edit_widget.find(
"config/Option/Option[@name='Min']")
max_option = edit_widget.find(
"config/Option/Option[@name='Max']")
step_option = edit_widget.find(
"config/Option/Option[@name='Step']")
constraints.update({
'min': self.parse_number(
min_option.get('value')) if min_option else -2147483648,
'max': self.parse_number(
max_option.get('value')) if max_option else 2147483647,
'step': self.parse_number(
step_option.get('value')) if step_option else 1
})
elif edit_widget.get('type') == 'ValueMap':
values = []
for option_map in edit_widget.findall(
"config/Option/Option[@type='List']/Option"
):
option = option_map.find("Option")
values.append({
'label': option.get('name'),
'value': option.get('value')
})
if values:
constraints['values'] = values
return constraints
def field_hidden(self, maplayer, field):
"""Return whether field is hidden.
:param Element maplayer: QGS maplayer node
:param str field: Field name
"""
if self.qgis_version > 30000:
edit_widget = maplayer.find(
"fieldConfiguration/field[@name='%s']/editWidget" % field
)
return edit_widget.get('type') == 'Hidden'
else:
edittype = maplayer.find("edittypes/edittype[@name='%s']" % field)
return edittype.get('widgetv2type') == 'Hidden'
def parse_number(self, value):
"""Parse string as int or float, or return string if neither.
:param str value: Number value as string
"""
result = value
try:
result = int(value)
except ValueError:
# int conversion failed
try:
result = float(value)
except ValueError:
# float conversion failed
pass
return result