FlowEnrichment stage #50

Open
ghost opened this issue Aug 25, 2016 · 11 comments

Comments

ghost commented Aug 25, 2016

While working with akka-streams, I had a couple of flows where I had, for example, an (id, message) tuple but the ID was just carried along the stream. Everything in the stream itself (maps and filters, mostly) only cared about the message. The resulting code was pretty ugly (pattern matching the tuple at every point, with type annotations since the type inference went haywire), so I wrote a GraphStage that wraps a Flow and adds/removes the ID.

Do you think this is something that can be added here?

ktoso (Contributor) commented Aug 25, 2016

I'd be very happy to see that, actually. I've seen such patterns in big data things before, but no one had a nice API for it.

An interesting idea would be:

  Source.single("name" -> 42)
    .focusOn(_._2) 
      .map(_ + 58)      // could these be SubFlow/Source?
      .filter(_ > 10)   // could these be SubFlow/Source?
    .unfocus
    .map { case (name, num) => s"$name -> $num" }

ghost (Author) commented Aug 25, 2016

I'll upload the code and open a PR later, so you can have a look.
The focus/unfocus looks nice, but I'm not sure how it would work. Where will it keep the "current" ID?

The current API looks like this:

    // The inner flow only sees the message, never the ID.
    val innerFlow = Flow[String].map(_.toUpperCase).filterNot(s => s.startsWith("T"))
    val foo = Source(List((1L, "one"), (2L, "two"), (3L, "three"), (4L, "four")))
      .via(enrich(innerFlow)) // enrich wraps innerFlow and adds/removes the ID
      .runWith(Sink.foreach(println))

ktoso (Contributor) commented Aug 25, 2016

Perhaps exact syntax of focusOn is unachievable, but I think the feature should be possible.
Effectively, .map(el => f(el)) on the focused part is actually .map(el => copy(el, f(extract(el)))) on the whole element.

We need copy and extract implemented. Extract is trivial: it's just _._2. Copy may be something we ask people to provide in focusOn, e.g. (t, s) => t.copy(bla = s).
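
As a rough illustration of the extract/copy idea, here is a minimal sketch in plain Scala collections rather than akka-streams. The `Focus` class and its `lift` method are hypothetical names invented for this example; they are not part of akka-streams, akka-stream-contrib, or Monocle, and the sketch only covers one-to-one operations such as map.

```scala
// Hypothetical helper for illustration only.
// T is the whole element, S is the part the inner operations care about.
final case class Focus[T, S](extract: T => S, copy: (T, S) => T) {
  // Lifts a function on the focused part into a function on the whole element.
  def lift(f: S => S): T => T = t => copy(t, f(extract(t)))
}

// Focus on the second component of a (name, number) pair.
val second: Focus[(String, Int), Int] =
  Focus[(String, Int), Int](_._2, (t, s) => t.copy(_2 = s))

val input = List("name" -> 42, "other" -> 1)

// Same shape as .map(el => copy(el, f(extract(el)))) with f = _ + 58.
val output = input.map(second.lift(_ + 58))
```

Here the name travels along untouched while only the number is transformed. Operations such as filter need more than this, because they do not produce exactly one output per input.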

It would be fun to see if we could use techniques from lenses here, see: https://github.com/julien-truffaut/Monocle

ghost (Author) commented Aug 25, 2016

Ooh, good idea. I'll try to see if I can make it work.

(Just to be clear, you don't mean to actually use Monocle/catz/whatever, right?)

ktoso (Contributor) commented Aug 25, 2016

We could; if it turns out to be awesome, it might be a very fun project.
It could live as a separate project in akka-stream-contrib.

A simple one would be nice anyway, and then perhaps the more generic Monocle-powered version.

drewhk commented Aug 25, 2016

The issue is many-to-many Flows (this has been considered before). It might work in some cases, but be aware of the issue.
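
To make the concern concrete, here is a small sketch using plain Scala collections instead of streams. It assumes the problem is an inner flow that does not emit exactly one element per input, and it assumes a naive strategy that re-attaches IDs by position; neither assumption is confirmed by the thread, and the value names are made up for the example.

```scala
val ids = List(1L, 2L, 3L, 4L)
val messages = List("one", "two", "three", "four")

// Inner processing that drops elements, so it is not one-to-one.
val processed = messages.map(_.toUpperCase).filterNot(_.startsWith("T"))

// Naive re-attachment by position pairs ID 2 with "FOUR", which is wrong.
val naive = ids.zip(processed)

// Carrying the ID next to each element keeps the pairing correct,
// at the cost of pattern matching the tuple in every step.
val tracked = ids.zip(messages)
  .map { case (id, m) => (id, m.toUpperCase) }
  .filterNot { case (_, m) => m.startsWith("T") }
```

The same mismatch can appear in the other direction when one input produces several outputs, since it is then unclear which ID each output should carry.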

ghost (Author) commented Aug 25, 2016

@ktoso
Here's the initial code I had: http://pastebin.com/CNx30ic9
I'll do some more work trying to get the "focus/unfocus" API (the current approach I'm trying is to extend FlowOpsMat...), and then tidy things up and upload to GH.

@drewhk
Yeah, I know. If it ends up here, that will have to be documented properly. Still, even in its current form it's pretty useful (to me, at least).

talpr commented Aug 26, 2016

@ktoso
I pushed the code to: https://github.com/talpr/akka-stream-contrib/commits/talpr-50-flow-tagger

I got focus/unfocus to work, but without support for materialized values (the inner flow is always materialized to NotUsed). The latest commit in the branch (77a8f63) is the one without materialization; the commit before that (14e4fa7) is my initial attempt to handle materialization.

The problem is that while the inner flow is being constructed, the original source/flow is not yet connected to the inner flow, so I couldn't get the types (and semantics) to line up. It's probably possible by making a new shape, and then importing the source/flow into it, but I'll have to fiddle with it some more and see where I get.

What do you think?

ktoso (Contributor) commented Aug 26, 2016

Cool! Could you submit that branch as a PR? It's easier to comment and check out this way :)

talpr commented Aug 27, 2016

@ktoso
I got materialized values to work. Not the prettiest thing in the world, but it works...
(PR #52)

johanandren (Member) commented

More discussion that may be relevant: akka/akka#15957
