-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.py
131 lines (95 loc) · 4.5 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""Module with core PTransform, a unique processing step between sources and sinks."""
import logging
from datetime import date, timedelta
import apache_beam as beam
from apache_beam.transforms.window import IntervalWindow
from pipe_gaps.pipeline.processes import CoreProcess
logger = logging.getLogger(__name__)
def window_intersects_with_range(
    date_range: tuple[date, date], window: IntervalWindow, offset: int = 0
):
    """Decide whether ``window`` should be kept for ``date_range``.

    Despite the name, only the upper end of the range is tested:
      1. The window start is converted to a timezone-aware UTC datetime.
      2. It is pushed forward by ``offset`` seconds.
      3. Its calendar date must be strictly earlier than ``date_range[1]``.

    ``date_range[0]`` is never read. A window lying entirely before the range
    is therefore still reported as intersecting.

    Args:
        date_range: Pair of dates; only the second item is used.
        window: Window whose ``start`` is inspected.
        offset: Number of seconds added to the window start before comparing.

    Returns:
        Whether the shifted start date precedes ``date_range[1]``.
    """
    start_utc = window.start.to_utc_datetime(has_tz=True)
    shifted_day = (start_utc + timedelta(seconds=offset)).date()
    range_end = date_range[1]
    return shifted_day < range_end
class Core(beam.PTransform):
    """Core PTransform: a single processing step between sources and sinks.

    Grouping key, time window, group/boundary processing and output type are
    all taken from the ``CoreProcess`` instance passed at construction time.
    """

    def __init__(self, core_process: CoreProcess, side_inputs=None):
        """A core PTransform for pipelines.

        This is meant to be a unique processing step between sources and sinks.

        This PTransform will:
            1. Group the input PCollection into consecutive closed sets
               (that may or may not overlap) using the grouping key
               and time window defined in core_process.
            2. Process the interior of the sets obtained in 1.
            3. Process the union of the boundaries of each pair of
               consecutive sets.
            4. Join the outputs from 2 and 3 and assign the output type
               defined in core_process (``core_process.output_type()``).

        Args:
            core_process: The instance that defines the core process.
            side_inputs: A PCollection with side inputs that will be used
                to process the union of the boundaries.
        """
        # Let beam.PTransform set up its own state (label, type hints,
        # resource hints) instead of relying on class-level fallbacks.
        super().__init__()
        self._process = core_process
        self._side_inputs = side_inputs

    def set_side_inputs(self, side_inputs):
        """Replace the side-input PCollection used when processing boundaries.

        The value is read in ``expand`` (via ``_process_boundaries``), so it
        must be set before this transform is applied.
        """
        self._side_inputs = side_inputs

    def expand(self, pcoll):
        """Group ``pcoll``, process groups and boundaries, and flatten both outputs."""
        logger.info("Grouping inputs by keys: %s.", self._process.grouping_key())

        groups = pcoll | self.group_by_key_and_timestamp()

        out_boundaries = groups | self.process_boundaries()
        out_groups = groups | self.process_groups()

        return (out_groups, out_boundaries) | self.join_outputs()

    def assign_sliding_windows(self):
        """Returns the SlidingWindows PTransform.

        Each element is timestamped with ``e["timestamp"]`` and assigned to
        sliding windows of size ``period + offset`` that advance by ``period``.
        The extra ``offset`` makes consecutive windows overlap.
        """
        # NOTE(review): assumes elements are dict-like with a "timestamp" value
        # accepted by TimestampedValue (e.g. seconds since epoch) — confirm.
        period, offset = self._process.time_window_period_and_offset()
        size = period + offset

        return "SlidingWindows" >> (
            beam.Map(lambda e: beam.window.TimestampedValue(e, e["timestamp"]))
            | beam.WindowInto(beam.window.SlidingWindows(size=size, period=period, offset=offset))
        )

    def group_by_key_and_timestamp(self):
        """Returns the GroupByKeyAndTime PTransform.

        Windows the input, groups it by the process grouping key and, when the
        process defines a date range, drops windows outside that range.
        """
        key = self._process.grouping_key()
        # NOTE(review): reads a private attribute of the core process; a public
        # accessor on CoreProcess would be preferable if one exists.
        date_range = self._process._date_range

        tr = (
            self.assign_sliding_windows()
            | self.group_by_key()
        )

        if date_range is not None:
            tr = tr | self._filter_windows_out_of_range(date_range)

        return f"GroupBy{key.name()}AndTime" >> tr

    def group_by_key(self):
        """Returns the GroupByKey PTransform.

        ``key.func`` is unpacked as keyword arguments into ``beam.GroupBy``.
        """
        key = self._process.grouping_key()
        return f"GroupBy{key.name()}" >> beam.GroupBy(**key.func)

    def process_groups(self):
        """Returns the ProcessGroups PTransform.

        Applies ``process_group`` to every group, then moves the results into
        the global window so they can be flattened with the boundary outputs.
        """
        return "ProcessGroups" >> (
            beam.FlatMap(self._process.process_group)
            | beam.WindowInto(beam.window.GlobalWindows())
        )

    def process_boundaries(self):
        """Returns the ProcessBoundaries PTransform."""
        return "ProcessBoundaries" >> self._process_boundaries()

    def join_outputs(self):
        """Returns the JoinOutputs PTransform.

        Flattens the group and boundary outputs and declares the process
        output type on the result.
        """
        return "JoinOutputs" >> beam.Flatten().with_output_types(self._process.output_type())

    def _process_boundaries(self):
        """Build the boundary-processing chain.

        Steps:
          1. Extract each group's boundary with ``get_group_boundary``.
          2. Move the boundaries into the global window.
          3. Regroup them by the grouping key.
          4. Run ``process_boundaries``.

        When side inputs are set, they are grouped by the same key and passed
        to ``process_boundaries`` as an ``AsMultiMap`` under the
        ``side_inputs`` keyword. Otherwise ``side_inputs=None`` is passed.
        """
        side_inputs = None
        if self._side_inputs is not None:
            side_inputs = beam.pvalue.AsMultiMap(
                self._side_inputs | self.group_by_key()
            )

        tr = (
            beam.Map(self._process.get_group_boundary)
            | beam.WindowInto(beam.window.GlobalWindows())
            | self.group_by_key()
            | beam.FlatMap(self._process.process_boundaries, side_inputs=side_inputs)
        )

        return tr

    def _filter_windows_out_of_range(self, date_range):
        """Return a Filter keeping windows accepted by ``window_intersects_with_range``.

        The process window offset (seconds) is forwarded so the check uses the
        window start shifted past its overlapping part.
        """
        _, offset = self._process.time_window_period_and_offset()

        # The WindowParam default makes Beam inject the element's window.
        return beam.Filter(
            lambda _, window=beam.DoFn.WindowParam: window_intersects_with_range(
                date_range, window, offset=offset
            )
        )