Skip to content

Commit

Permalink
[prism] Support AnyOf in Prism. (#33705)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Jan 22, 2025
1 parent 7daccb5 commit 8c4bec8
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
* Initial support for AllowedLateness added. ([#33542](https://github.com/apache/beam/pull/33542))
* The Go SDK's inprocess Prism runner (AKA the Go SDK default runner) now supports non-loopback mode environment types. ([#33572](https://github.com/apache/beam/pull/33572))
* Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651))
* Support the AnyOf Environment for execution in Prism ([#33705](https://github.com/apache/beam/pull/33705))
* This improves support for developing Xlang pipelines, when using a compatible cross language service.

## Breaking Changes

Expand Down
54 changes: 47 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"log/slog"
"os"
"os/exec"
"slices"
"time"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
Expand All @@ -46,16 +47,26 @@ import (

func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error {
logger := j.Logger.With(slog.String("envID", wk.Env))
// TODO fix broken abstraction.
// We're starting a worker pool here, because that's the loopback environment.
// It's sort of a mess, largely because of loopback, which has
// a different flow from a provisioned docker container.
e := j.Pipeline.GetComponents().GetEnvironments()[env]

if e.GetUrn() == urns.EnvAnyOf {
// We've been given a choice!
ap := &pipepb.AnyOfEnvironmentPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ap); err != nil {
logger.Error("unmarshaling any environment payload", "error", err)
return err
}
e = selectAnyOfEnv(ap)
logger.Info("AnyEnv resolved", "selectedUrn", e.GetUrn(), "worker", wk.ID)
// Process the environment as normal.
}

switch e.GetUrn() {
case urns.EnvExternal:
ep := &pipepb.ExternalPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil {
logger.Error("unmarshing external environment payload", "error", err)
logger.Error("unmarshaling external environment payload", "error", err)
return err
}
go func() {
externalEnvironment(ctx, ep, wk)
Expand All @@ -65,13 +76,15 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
case urns.EnvDocker:
dp := &pipepb.DockerPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), dp); err != nil {
logger.Error("unmarshing docker environment payload", "error", err)
logger.Error("unmarshaling docker environment payload", "error", err)
return err
}
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
case urns.EnvProcess:
pp := &pipepb.ProcessPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {
logger.Error("unmarshing docker environment payload", "error", err)
logger.Error("unmarshaling process environment payload", "error", err)
return err
}
go func() {
processEnvironment(ctx, pp, wk)
Expand All @@ -83,6 +96,33 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
}
}

func selectAnyOfEnv(ap *pipepb.AnyOfEnvironmentPayload) *pipepb.Environment {
// Prefer external, then process, then docker, unknown environments are 0.
ranks := map[string]int{
urns.EnvDocker: 1,
urns.EnvProcess: 5,
urns.EnvExternal: 10,
}

envs := ap.GetEnvironments()

slices.SortStableFunc(envs, func(a, b *pipepb.Environment) int {
rankA := ranks[a.GetUrn()]
rankB := ranks[b.GetUrn()]

// Reverse the comparison so our favourite is at the front
switch {
case rankA > rankB:
return -1 // Usually "greater than" would be 1
case rankA < rankB:
return 1
}
return 0
})
// Pick our favourite.
return envs[0]
}

func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) {
conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/environments_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"testing"

pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
)

func TestSelectAnyOf(t *testing.T) {
tests := []struct {
name, want string
wantTag string
envs []*pipepb.Environment
}{
{name: "singleDefault", want: urns.EnvDefault, envs: []*pipepb.Environment{{Urn: urns.EnvDefault}}},
{name: "singleDocker", want: urns.EnvDocker, envs: []*pipepb.Environment{{Urn: urns.EnvDocker}}},
{name: "singleProcess", want: urns.EnvProcess, envs: []*pipepb.Environment{{Urn: urns.EnvProcess}}},
{name: "singleExternal", want: urns.EnvExternal, envs: []*pipepb.Environment{{Urn: urns.EnvExternal}}},
{name: "multiplePickExternal_1", want: urns.EnvExternal, envs: []*pipepb.Environment{{Urn: urns.EnvExternal}, {Urn: urns.EnvDocker}, {Urn: urns.EnvProcess}}},
{name: "multiplePickExternal_2", want: urns.EnvExternal, envs: []*pipepb.Environment{{Urn: urns.EnvDocker}, {Urn: urns.EnvProcess}, {Urn: urns.EnvExternal}}},
{name: "multiplePickProcess", want: urns.EnvProcess, envs: []*pipepb.Environment{{Urn: urns.EnvDocker}, {Urn: urns.EnvProcess}}},
{name: "multiplePickDocker", want: urns.EnvDocker, envs: []*pipepb.Environment{{Urn: urns.EnvDefault}, {Urn: urns.EnvDocker}}},
{name: "multiplePickFirstExternal", want: urns.EnvExternal, wantTag: "first", envs: []*pipepb.Environment{{Urn: urns.EnvExternal, Payload: []byte("first")}, {Urn: urns.EnvExternal, Payload: []byte("second")}}},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
selected := selectAnyOfEnv(&pipepb.AnyOfEnvironmentPayload{Environments: test.envs})
if selected.GetUrn() != test.want {
t.Errorf("expected %v, got %v", test.want, selected.GetUrn())
}
if got, want := string(selected.GetPayload()), test.wantTag; got != want {
t.Errorf("expected payload with tag %v, got %v", want, got)
}
})
}

}
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,5 @@ var (
EnvProcess = envUrn(pipepb.StandardEnvironments_PROCESS)
EnvExternal = envUrn(pipepb.StandardEnvironments_EXTERNAL)
EnvDefault = envUrn(pipepb.StandardEnvironments_DEFAULT)
EnvAnyOf = envUrn(pipepb.StandardEnvironments_ANYOF)
)

0 comments on commit 8c4bec8

Please sign in to comment.