Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix interval join to work with Bytewax 0.20.1 #5

Merged
merged 2 commits into from
Jul 5, 2024

Conversation

whoahbot
Copy link
Contributor

@whoahbot whoahbot commented Jul 3, 2024

Ignore test failures for the moment, I still have to fix GH actions for this repo layout.

@whoahbot whoahbot requested a review from davidselassie July 3, 2024 17:59
Copy link
Contributor

@davidselassie davidselassie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the semantics of the operator and does not produce the correct join behavior. This update does not carry through which "right side" each value is coming from and collapsing an N-sided join into a 2-sided join.

Comment on lines 693 to 697
sided_left = op.map_value("side_left_0", left, _add_side_builder(0))
sided_rights = [
op.map_value(f"side_right_{i + 1}", right, _add_side_builder(i + 1))
for i, right in enumerate(rights)
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still required. Otherwise how does the internal machinery of the join logic know which of the "rights" each value came from?

@@ -366,7 +360,7 @@ def _unwrap_interval_unpaired(event: _IntervalEvent[V, W]) -> Optional[V]:
def interval(
step_id: str,
left: KeyedStream[V],
clock: Clock[V, SC],
clock: Clock,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be able to be typed. The clock must accept items of type V. Although SC might be confusing the type checker because it isn't used again. The state of the clock can be anything.

Suggested change
clock: Clock,
clock: Clock[V, Any],

Comment on lines 466 to 468
def on_value(self, side: LeftRight, value: Tuple[str, V]) -> Iterable[_JoinState]:
join_side, join_value = value
self.state.set_val(join_side, join_value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The upstream type still needs to be something like Tuple[str, V] or Tuple[int, V] to describe "which right" each item came from and this unpacking still must happen.

The core interval operator only has the concept of "left" and "right" sides, but interval_join builds on top of that to introduce an "index" on the right side so that the _JoinState individually tracks all slots individually: one for the left, and one for each right.

E.g. you are joining 3 streams. Name on the left; email and age on the right. The join state needs 3 slots, not 2. This new code would replace an already seen email with an incoming age since they are both "right". You need to track each slot individually.

It was probably confusing for me to name the top level "interval side" as side and this inner "join side" as side; interval_side and join_side, or side and index would have been made it clearer they were different concepts.

- Keep the side assignment param to `on_value`
- Fix test code for interval.
- Fix clearing state.seen for all right hand sides.
@@ -0,0 +1 @@
"""Interval joins for Bytewax."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's your thinking on the premium package name structure? bytewax.interval_join.operators.interval seems kind of long and repetitive, but we can only have the namespace package at the top level. Put everything in bytewax.interval.operators? Or just bytewax.interval?

Also this specific module maybe just call interval because it does provide more functionality than just joins.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think bytewax.interval makes sense to me. As you point out, we provide more than interval_join.

I was originally thinking that bytewax.interval.operators makes sense, as other premium packages may provide things other than operators, but I could be convinced otherwise.

@@ -0,0 +1 @@
"""Interval joins for Bytewax."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Interval joins for Bytewax."""
"""Operators that find items on different streams close in time."""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. I'll apply this in another PR. I missed this suggestion before I merged this PR.

@whoahbot whoahbot merged commit 8fd633a into main Jul 5, 2024
0 of 13 checks passed
@whoahbot whoahbot deleted the update_bytewax_version branch August 2, 2024 17:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants