-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathd8_accumulator.py
455 lines (391 loc) · 16.8 KB
/
d8_accumulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
"""
Module for accumulating flow on a D8 flow grid. This class can be used to calculate drainage area and discharge,
and to accumulate any other tracer across a drainage network.
Builds a network of nodes from a D8 flow grid. Uses a queue-based algorithm to traverse the network in topological order,
modified from Braun & Willet (2013) DOI: 10.1016/j.geomorph.2012.10.008. This is faster than the recursive algorithm used in
original Landlab implementation as we use an iterative build_ordered_list algorithm (much faster). Most of the code is written
in Cython for speed. The approach is linear w.r.t. the number of nodes in the network. Class is designed to be used with
geospatial rasters, but can also be used with a numpy array of D8 flow directions with some loss of functionality.
See example.py and README.md for usage examples.
"""
import warnings
from typing import Tuple, List, Union
from geojson import MultiLineString
import json
import numpy as np
from osgeo import gdal
import cfuncs as cf
def read_geo_file(filename: str) -> Tuple[np.ndarray, gdal.Dataset]:
"""Reads a geospatial file"""
ds = gdal.Open(filename)
band = ds.GetRasterBand(1)
arr = band.ReadAsArray()
return arr, ds
def write_geotiff(filename: str, arr: np.ndarray, ds: gdal.Dataset) -> None:
"""Writes a numpy array to a geotiff"""
if arr.dtype == np.float32:
arr_type = gdal.GDT_Float32
elif arr.dtype == np.float64:
arr_type = gdal.GDT_Float64
else:
arr_type = gdal.GDT_Int32
driver = gdal.GetDriverByName("GTiff")
out_ds = driver.Create(filename, arr.shape[1], arr.shape[0], 1, arr_type)
out_ds.SetProjection(ds.GetProjection())
out_ds.SetGeoTransform(ds.GetGeoTransform())
band = out_ds.GetRasterBand(1)
band.WriteArray(arr)
band.FlushCache()
band.ComputeStatistics(False)
def elevation_to_d8(filepath: str) -> None:
"""
Converts a geospatial raster of elevations to a D8 flow grid and writes it to a file called [filename]_d8.tif.
Follows convention of ESRI D8 flow directions: right = 1, lower right = 2, bottom = 4, etc.
All boundary nodes are set to be sinks.
Parameters
----------
filepath : str
Path to the geospatial raster of elevations
Returns
-------
None
"""
elev, ds = read_geo_file(filepath)
d8 = cf.topo_to_d8(elev.astype(float))
# Write the D8 flow grid to a file called d8_filename.tif
write_geotiff("d8_" + filepath, d8, ds)
def write_geojson(filename: str, geojson: dict):
"""Writes a GeoJSON object to a file"""
with open(filename, "w") as f:
json.dump(geojson, f)
class D8Accumulator:
"""Class to accumulate flow on a D8 flow grid. This class can be used to calculate drainage area and discharge,
and to accumulate any other tracer across a drainage network. The class assumes that all boundary
nodes are sinks (i.e., no flow leaves the grid). This class can be used with any geospatial file that GDAL can read.
It can also be used with a numpy array of D8 flow directions.
Parameters
----------
filename : str
Path to the D8 flow grid geospatial file (e.g., .tif, .asc, etc.)
This can be to any file that GDAL can read. Expects a single band raster (ignores other bands).
This raster should be a 2D array of D8 flow directions according to ESRI convention:
Sink [no flow]= 0
Right = 1
Lower right = 2
Bottom = 4
Lower left = 8
Left = 16
Upper left = 32
Top = 64
Upper right = 128
Attributes
----------
receivers : np.ndarray
Array of receiver nodes (i.e., the ID of the node that receives the flow from the i'th node)
baselevel_nodes : np.ndarray
Array of baselevel nodes (i.e., nodes that do not donate flow to any other nodes)
order : np.ndarray
Array of nodes in order of upstream to downstream (breadth-first)
arr : np.ndarray
Array of D8 flow directions
ds : gdal.Dataset
GDAL Dataset object of the D8 flow grid. If the array is manually set, this will be None
extent : List[float]
Extent of the array in the accumulator as [xmin, xmax, ymin, ymax]. Can be used for plotting.
Methods
-------
accumulate(weights : np.ndarray = None)
Accumulate flow on the grid using the D8 flow directions
get_channel_segments(field : np.ndarray, threshold : float)
Get the profile segments of river channels where 'field' is greater than 'threshold'. Used for, e.g., plotting
the location of a river channel as a line-string.
get_profile(start_node : int)
Extract the downstream profile *from* a given node.
node_to_coord(node : int)
Converts a node index to a coordinate pair
coord_to_node(x : float, y : float)
Converts a coordinate pair to a node index
"""
def __init__(self, filename: str):
"""
Parameters
----------
filename : str
Path to the D8 flow grid
"""
# Check that filename is a string
if not isinstance(filename, str):
raise TypeError("Filename must be a string")
self._arr, self._ds = read_geo_file(filename)
self._arr = self._arr.astype(int)
self._receivers = cf.d8_to_receivers(self.arr)
self._baselevel_nodes = np.where(
self.receivers == np.arange(len(self.receivers))
)[0]
self._order = cf.build_ordered_list_iterative(
self.receivers, self.baselevel_nodes
)
def accumulate(self, weights: np.ndarray = None) -> np.ndarray:
"""Accumulate flow on the grid using the D8 flow directions
Parameters
----------
weights : np.ndarray [ndim = 2], optional
Array of weights for each node, defaults to giving each node a weight of 1, resulting in a map of the number of upstream nodes.
If the area of each node is known, this can be used to calculate drainage area. If run-off at each node is known,
this can be used to calculate discharge.
Returns
-------
np.ndarray [ndim = 2]
Array of accumulated weights (or number of upstream nodes if no weights are passed)
"""
if weights is None:
# If no weights are passed, assume all nodes have equal weight of 1.
# Output is array of # upstream nodes
weights = np.ones(len(self.receivers))
else:
if weights.shape != self.arr.shape:
raise ValueError("Weights must be have same shape as D8 array")
weights = weights.flatten()
return cf.accumulate_flow(self.receivers, self.order, weights=weights).reshape(
self._arr.shape
)
def get_channel_segments(
self, field: np.ndarray, threshold: float
) -> Union[List[List[int]], MultiLineString]:
"""Get the profile segments of river channels where 'field' is greater than 'threshold'. i.e.,
if 'field' is drainage area, this will return the profile segments of channels with drainage area greater than 'threshold'.
Generated by topologically traversing the network using a depth-first search algorithm from baselevel nodes, only
continuing to traverse a node if the value of 'field' is greater than 'threshold'. If the D8 flow grid is a geospatial
raster, this will return a GeoJSON MultiLineString object of the profile segments. If the D8 flow grid is a numpy array,
this will return a list of segments of node IDs.
Parameters
----------
field
Array of values to get profile segments according to
threshold
Threshold value for the profile segments
Returns
-------
- GeoJSON MultiLineString object of the profile segments if the D8 flow grid is a geospatial raster.
- List of segments of node IDs if the D8 flow grid is a numpy array (and no GDAL Dataset object exists)
"""
# Nodes where field is greater than threshold
gteq_thresh = (field > threshold).flatten()
# Nodes that are baselevel
is_baselevel = np.asarray(self.receivers) == np.arange(len(self.receivers))
# Starting nodes are where field is greater than threshold and are also baselevel
starting_nodes = np.where(np.logical_and(gteq_thresh, is_baselevel))[0]
n_donors = cf.count_donors(self._receivers)
delta = cf.ndonors_to_delta(n_donors)
donors = cf.make_donor_array(self._receivers, delta)
# Get the profile segments of node IDs
segments = cf.get_channel_segments(
starting_nodes, delta, donors, field.flatten(), threshold
)
# Convert to x,y indices
if self.ds is None:
warnings.warn(
"\nNo GDAL Dataset object exists. Cannot convert to x,y indices.\nReturning node ID segments"
)
return segments
else:
geotransform = self.ds.GetGeoTransform()
ULx = geotransform[0]
ULy = geotransform[3]
dx = geotransform[1]
dy = geotransform[5]
ncols = self.arr.shape[1]
coord_segs = cf.id_segments_to_coords_segments(
segments, ncols, dx, dy, ULx, ULy
)
return MultiLineString(coord_segs)
def get_profile(self, start_node: int) -> Tuple[np.ndarray[int], np.ndarray[float]]:
"""Extract the downstream profile *from* a given node. Returns the profile as a list
of node IDs in order upstream to downstream. Also returns the distance along the profile
*counting upstream from the mouth* in the same units as the geospatial file. i.e.,
a distance of 0 is the mouth of the river, and a distance of 100 is 100 units upstream from the mouth.
Parameters
----------
start_node : int
Node ID to start the profile from. Must be a valid node ID.
Returns
-------
Tuple[np.ndarray[int], np.nadrray[float]]
Tuple of the profile as an array of node IDs and the distance along the profile from the start node.
Raises
------
ValueError
If start_node is not a valid node ID
"""
if start_node < 0 or start_node >= self.arr.size:
raise ValueError("start_node must be a valid node index")
dx = self.ds.GetGeoTransform()[1]
dy = self.ds.GetGeoTransform()[5] * -1
profile, distance = cf.get_profile(
start_node, dx, dy, self._receivers, self.arr.flatten()
)
# Check length of outputs
if len(profile) == 0:
warnings.warn("\nProfile is empty. Returning empty arrays")
return np.asarray([]), np.asarray([])
else:
dstream_dist = np.asarray(distance)
return np.asarray(profile), np.amax(dstream_dist) - dstream_dist
def get_upstream_nodes(self, start_node: int) -> np.ndarray:
"""Get all nodes upstream of a given node. Returns an array of node IDs.
Parameters
----------
start_node : int
Node ID to start the profile from. Must be a valid node ID.
Returns
-------
np.ndarray
Array of node IDs upstream of the start node.
Raises
------
ValueError
If start_node is not a valid node ID
TypeError
If start_node is not an integer
"""
self._check_valid_node(start_node)
n_donors = cf.count_donors(self._receivers)
delta = cf.ndonors_to_delta(n_donors)
donors = cf.make_donor_array(self._receivers, delta)
return np.asarray(cf.get_upstream_nodes(start_node, delta, donors))
def node_to_coord(self, node: int) -> Tuple[float, float]:
"""Converts a node index to a coordinate pair for the centre of the pixel"""
self._check_valid_node(node)
_, ncols = self.arr.shape
x_ind = node % ncols
y_ind = node // ncols
ulx, dx, _, uly, _, dy = self.ds.GetGeoTransform()
x_coord = ulx + dx * x_ind
y_coord = uly + dy * y_ind
# Add dx/2 and dy/2 to get to the center of the pixel from the upper left corner
x_coord += dx / 2
y_coord += dy / 2 # recall that dy is negative
return x_coord, y_coord
def coord_to_node(self, x: float, y: float) -> int:
"""Converts a coordinate pair to a node index"""
nrows, ncols = self.arr.shape
ulx, dx, _, uly, _, dy = self.ds.GetGeoTransform()
# Casting to int rounds towards zero ('floor' for positive numbers; e.g, int(3.9) = 3)
x_ind = int((x - ulx) / dx)
y_ind = int((y - uly) / dy)
out = y_ind * ncols + x_ind
if out > ncols * nrows or out < 0:
raise ValueError("Coordinate is out of bounds")
return out
def _check_valid_node(self, node: int) -> None:
"""Checks if a node is valid"""
if node < 0 or node >= self.arr.size:
raise ValueError("Node is out of bounds")
if not isinstance(node, int) and not np.issubdtype(type(node), np.integer):
raise TypeError("Node must be an integer")
def get_sink(self, node: int) -> int:
"""Gets the sink node which flow from the input node ultimate goes to.
Parameters
----------
node : int
Node index
Returns
-------
int
Sink node index
Raises
------
TypeError
If node is not an int
ValueError
If node is out of bounds
"""
self._check_valid_node(node)
# Get the sink node
return cf.get_sink_node(node, self._receivers)
def get_node_mask(self, nodes: np.ndarray) -> np.ndarray:
"""Get a boolean mask of the nodes in the input array
Parameters
----------
nodes : np.ndarray
Array of node indices
Returns
-------
np.ndarray
Boolean mask of the nodes in the input array
"""
out = np.zeros(self.arr.shape,dtype=bool).flatten()
out.fill(False)
out[nodes] = True
return out.reshape(self.arr.shape)
@property
def receivers(self) -> np.ndarray:
"""Array of receiver nodes (i.e., the ID of the node that receives the flow from the i'th node)"""
return np.asarray(self._receivers)
@property
def baselevel_nodes(self) -> np.ndarray:
"""Array of baselevel nodes (i.e., nodes that do not donate flow to any other nodes)"""
return self._baselevel_nodes
@property
def order(self) -> np.ndarray:
"""Array of nodes in order of upstream to downstream"""
return np.asarray(self._order)
@property
def arr(self):
"""Array of D8 flow directions"""
return self._arr
@property
def extent(self) -> List[float]:
"""
Get the extent of the array in the accumulator. Can be used for plotting.
"""
trsfm = self.ds.GetGeoTransform()
minx = trsfm[0]
maxy = trsfm[3]
maxx = minx + trsfm[1] * self.arr.shape[1]
miny = maxy + trsfm[5] * self.arr.shape[0]
return [minx, maxx, miny, maxy]
@property
def ds(self):
"""GDAL Dataset object of the D8 flow grid"""
if self._ds is None:
warnings.warn("\nNo GDAL Dataset object exists.")
return self._ds
@arr.setter
def arr(self, arr):
warnings.warn("\nManually setting the array. Geospatial information lost")
if len(arr.shape) != 2:
raise ValueError("D8 Array must be 2D")
self._arr = arr
self._ds = None
self._receivers = cf.d8_to_receivers(self.arr)
self._baselevel_nodes = np.where(
self.receivers == np.arange(len(self.receivers))
)[0]
self._order = cf.build_ordered_list_iterative(
self.receivers, self.baselevel_nodes
)
@classmethod
def from_array(cls, arr: np.ndarray):
"""
Creates a D8Accumulator from a numpy array
Parameters
----------
arr : np.ndarray
2D array of D8 flow directions
"""
if len(arr.shape) != 2:
raise ValueError("D8 Array must be 2D")
# Create an instance of the class
instance = cls.__new__(cls)
# Initialize attributes
instance._arr = arr.astype(int)
instance._ds = None
instance._receivers = cf.d8_to_receivers(instance.arr)
instance._baselevel_nodes = np.where(
instance.receivers == np.arange(len(instance.receivers))
)[0]
instance._order = cf.build_ordered_list_iterative(
instance.receivers, instance.baselevel_nodes
)
return instance