diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 15f9ecc101cb..3f8562f6ca9f 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -28,6 +28,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/apache/beam/sdks/v2/go/container/pool"
 	"github.com/apache/beam/sdks/v2/go/container/tools"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
@@ -44,6 +45,7 @@ import (
 var (
 	// Contract: https://s.apache.org/beam-fn-api-container-contract.
+	workerPool       = flag.Bool("worker_pool", false, "Run as worker pool (optional).")
 	id               = flag.String("id", "", "Local identifier (required).")
 	loggingEndpoint  = flag.String("logging_endpoint", "", "Local logging endpoint for FnHarness (required).")
 	artifactEndpoint = flag.String("artifact_endpoint", "", "Local artifact endpoint for FnHarness (required).")
@@ -56,6 +58,7 @@ const (
 	cloudProfilingJobName           = "CLOUD_PROF_JOB_NAME"
 	cloudProfilingJobID             = "CLOUD_PROF_JOB_ID"
 	enableGoogleCloudProfilerOption = "enable_google_cloud_profiler"
+	workerPoolIdEnv                 = "BEAM_GO_WORKER_POOL_ID"
 )
 
 func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logger, metadata map[string]string) error {
@@ -78,6 +81,30 @@ func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logg
 
 func main() {
 	flag.Parse()
+
+	if *workerPool {
+		workerPoolId := fmt.Sprintf("%d", os.Getpid())
+		bin, err := os.Executable()
+		if err != nil {
+			log.Fatalf("Error starting worker pool, couldn't find boot loader path: %v", err)
+		}
+
+		os.Setenv(workerPoolIdEnv, workerPoolId)
+		log.Printf("Starting worker pool %v: Go %v binary: %v", workerPoolId, ":50000", bin)
+
+		ctx := context.Background()
+		server, err := pool.New(ctx, 50000, bin)
+		if err != nil {
+			log.Fatalf("Error starting worker pool: %v", err)
+		}
+		defer server.Stop(ctx)
+		if err := server.ServeAndWait(); err != nil {
+			log.Fatalf("Error with worker pool: %v", err)
+		}
+		log.Print("Go SDK worker pool exited.")
+		os.Exit(0)
+	}
+
 	if *id == "" {
 		log.Fatal("No id provided.")
 	}
@@ -126,7 +153,13 @@ func main() {
 	// (3) The persist dir may be on a noexec volume, so we must
 	//     copy the binary to a different location to execute.
-	const prog = "/bin/worker"
+	tmpPrefix, err := os.MkdirTemp("/tmp/", "bin*")
+	if err != nil {
+		logger.Fatalf(ctx, "Failed to create temp directory for worker binary: %v", err)
+	}
+
+	prog := tmpPrefix + "/worker"
+	logger.Printf(ctx, "From: %q To: %q", filepath.Join(dir, name), prog)
 	if err := copyExe(filepath.Join(dir, name), prog); err != nil {
 		logger.Fatalf(ctx, "Failed to copy worker binary: %v", err)
 	}
@@ -233,6 +266,11 @@ func copyExe(from, to string) error {
 	}
 	defer src.Close()
 
+	// Ensure that the folder path exists locally.
+	if err := os.MkdirAll(filepath.Dir(to), 0755); err != nil {
+		return err
+	}
+
 	dst, err := os.OpenFile(to, os.O_WRONLY|os.O_CREATE, 0755)
 	if err != nil {
 		return err
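Note: the `--worker_pool` mode above turns the boot binary into a long-lived gRPC service implementing `BeamFnExternalWorkerPool`, which a runner drives remotely. Below is a minimal sketch of such a client, mirroring the calls prism makes later in this diff; the pool address, worker ID, and runner endpoint are hypothetical placeholders, not values fixed by this change.

```go
package main

import (
	"context"
	"log"

	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Dial the pool started by `boot --worker_pool` (assumed to listen on :50000).
	conn, err := grpc.Dial("localhost:50000", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("unable to dial worker pool: %v", err)
	}
	defer conn.Close()
	pool := fnpb.NewBeamFnExternalWorkerPoolClient(conn)

	// Hypothetical runner-side endpoint serving the control and logging services.
	endpoint := &pipepb.ApiServiceDescriptor{Url: "localhost:8099"}

	// Failures surface both as RPC errors and in the response's Error field.
	resp, err := pool.StartWorker(context.Background(), &fnpb.StartWorkerRequest{
		WorkerId:        "worker-1",
		ControlEndpoint: endpoint,
		LoggingEndpoint: endpoint,
	})
	if err != nil || resp.GetError() != "" {
		log.Fatalf("failed to start worker: err=%v resp=%v", err, resp.GetError())
	}

	// Later, shut the worker down by ID.
	if _, err := pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{WorkerId: "worker-1"}); err != nil {
		log.Fatalf("failed to stop worker: %v", err)
	}
}
```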
diff --git a/sdks/go/container/pool/workerpool.go b/sdks/go/container/pool/workerpool.go
new file mode 100644
index 000000000000..5f3642eac035
--- /dev/null
+++ b/sdks/go/container/pool/workerpool.go
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package pool facilitates an external worker service, as an alternate mode
+// for the standard Beam container.
+//
+// This is predominantly to serve as a process spawner within a given container
+// VM for an arbitrary number of jobs, instead of for a single worker instance.
+//
+// Workers are spawned as separate OS processes.
+package pool
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"net"
+	"os"
+	"os/exec"
+	"sync"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
+)
+
+// New initializes a process-based ExternalWorkerService at the given port.
+func New(ctx context.Context, port int, containerExecutable string) (*Process, error) {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+	if err != nil {
+		return nil, err
+	}
+	slog.Info("starting Process server", "addr", lis.Addr())
+	grpcServer := grpc.NewServer()
+	root, cancel := context.WithCancel(ctx)
+	s := &Process{lis: lis, root: root, rootCancel: cancel, workers: map[string]context.CancelFunc{},
+		grpcServer: grpcServer, containerExecutable: containerExecutable}
+	fnpb.RegisterBeamFnExternalWorkerPoolServer(grpcServer, s)
+	return s, nil
+}
+
+// ServeAndWait starts the ExternalWorkerService and blocks until exit.
+func (s *Process) ServeAndWait() error {
+	return s.grpcServer.Serve(s.lis)
+}
+
+// Process implements fnpb.BeamFnExternalWorkerPoolServer, by starting external
+// processes.
+type Process struct {
+	fnpb.UnimplementedBeamFnExternalWorkerPoolServer
+
+	containerExecutable string // The path to the container executable on the host.
+
+	lis        net.Listener
+	root       context.Context
+	rootCancel context.CancelFunc
+
+	mu      sync.Mutex
+	workers map[string]context.CancelFunc
+
+	grpcServer *grpc.Server
+}
+
+// StartWorker initializes a new worker harness, implementing BeamFnExternalWorkerPoolServer.StartWorker.
+func (s *Process) StartWorker(_ context.Context, req *fnpb.StartWorkerRequest) (*fnpb.StartWorkerResponse, error) {
+	slog.Info("starting worker", "id", req.GetWorkerId())
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.workers == nil {
+		return &fnpb.StartWorkerResponse{
+			Error: "worker pool shutting down",
+		}, nil
+	}
+
+	if _, ok := s.workers[req.GetWorkerId()]; ok {
+		return &fnpb.StartWorkerResponse{
+			Error: fmt.Sprintf("worker with ID %q already exists", req.GetWorkerId()),
+		}, nil
+	}
+	if req.GetLoggingEndpoint() == nil {
+		return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing logging endpoint for worker %v", req.GetWorkerId())}, nil
+	}
+	if req.GetControlEndpoint() == nil {
+		return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing control endpoint for worker %v", req.GetWorkerId())}, nil
+	}
+	if req.GetLoggingEndpoint().Authentication != nil || req.GetControlEndpoint().Authentication != nil {
+		return &fnpb.StartWorkerResponse{Error: "[BEAM-10610] Secure endpoints not supported."}, nil
+	}
+
+	ctx := grpcx.WriteWorkerID(s.root, req.GetWorkerId())
+	ctx, s.workers[req.GetWorkerId()] = context.WithCancel(ctx)
+
+	args := []string{
+		"--id=" + req.GetWorkerId(),
+		"--control_endpoint=" + req.GetControlEndpoint().GetUrl(),
+		"--artifact_endpoint=" + req.GetArtifactEndpoint().GetUrl(),
+		"--provision_endpoint=" + req.GetProvisionEndpoint().GetUrl(),
+		"--logging_endpoint=" + req.GetLoggingEndpoint().GetUrl(),
+	}
+
+	cmd := exec.CommandContext(ctx, s.containerExecutable, args...)
+	cmd.Stdin = os.Stdin
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	cmd.Env = nil // Use the current environment.
+
+	if err := cmd.Start(); err != nil {
+		return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Unable to start boot for worker %v: %v", req.GetWorkerId(), err)}, nil
+	}
+	return &fnpb.StartWorkerResponse{}, nil
+}
+
+// StopWorker terminates a worker harness, implementing BeamFnExternalWorkerPoolServer.StopWorker.
+func (s *Process) StopWorker(_ context.Context, req *fnpb.StopWorkerRequest) (*fnpb.StopWorkerResponse, error) {
+	slog.Info("stopping worker", "id", req.GetWorkerId())
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.workers == nil {
+		// Worker pool is already shutting down, so no action is needed.
+		return &fnpb.StopWorkerResponse{}, nil
+	}
+	if cancelfn, ok := s.workers[req.GetWorkerId()]; ok {
+		cancelfn()
+		delete(s.workers, req.GetWorkerId())
+		return &fnpb.StopWorkerResponse{}, nil
+	}
+	return &fnpb.StopWorkerResponse{
+		Error: fmt.Sprintf("no worker with id %q running", req.GetWorkerId()),
+	}, nil
+}
+
+// Stop terminates the service and stops all workers.
+func (s *Process) Stop(ctx context.Context) error {
+	s.mu.Lock()
+
+	slog.Debug("stopping Process", "worker_count", len(s.workers))
+	s.workers = nil
+	s.rootCancel()
+
+	// There can be a deadlock between the StopWorker RPC and GracefulStop,
+	// which waits for all RPCs to finish, so GracefulStop must be called
+	// outside the critical section.
+	s.mu.Unlock()
+
+	s.grpcServer.GracefulStop()
+	return nil
+}
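Note: for hosts other than the boot entrypoint, the package surface is just `New`, `ServeAndWait`, and `Stop`. Below is a hedged sketch of a program that embeds the pool and shuts it down on SIGINT/SIGTERM; the boot binary path is a hypothetical placeholder.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/apache/beam/sdks/v2/go/container/pool"
)

func main() {
	ctx := context.Background()

	// Hypothetical path to the SDK boot binary the pool re-execs per worker.
	server, err := pool.New(ctx, 50000, "/opt/apache/beam/boot")
	if err != nil {
		log.Fatalf("error starting worker pool: %v", err)
	}

	// Stop cancels all worker processes and gracefully stops the gRPC server.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sig
		server.Stop(ctx)
	}()

	// Blocks until the gRPC server exits.
	if err := server.ServeAndWait(); err != nil {
		log.Printf("worker pool exited: %v", err)
	}
}
```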
diff --git a/sdks/go/container/pool/workerpool_test.go b/sdks/go/container/pool/workerpool_test.go
new file mode 100644
index 000000000000..add2e25681fc
--- /dev/null
+++ b/sdks/go/container/pool/workerpool_test.go
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pool
+
+import (
+	"context"
+	"os/exec"
+	"testing"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+)
+
+func TestProcess(t *testing.T) {
+	// Use the no-op true binary if available; skip this test otherwise.
+	dummyExec, err := exec.LookPath("true")
+	if err != nil {
+		t.Skip("Binary `true` doesn't exist, skipping tests.")
+	}
+
+	endpoint := &pipepb.ApiServiceDescriptor{
+		Url: "localhost:0",
+	}
+	secureEndpoint := &pipepb.ApiServiceDescriptor{
+		Url: "localhost:0",
+		Authentication: &pipepb.AuthenticationSpec{
+			Urn: "beam:authentication:oauth2_client_credentials_grant:v1",
+		},
+	}
+
+	ctx, cancelFn := context.WithCancel(context.Background())
+	t.Cleanup(cancelFn)
+	server, err := New(ctx, 0, dummyExec)
+	if err != nil {
+		t.Fatalf("Unable to create server: %v", err)
+	}
+	go server.ServeAndWait()
+
+	startTests := []struct {
+		req         *fnpb.StartWorkerRequest
+		errExpected bool
+	}{
+		{
+			req: &fnpb.StartWorkerRequest{
+				WorkerId:        "Worker1",
+				ControlEndpoint: endpoint,
+				LoggingEndpoint: endpoint,
+			},
+		}, {
+			req: &fnpb.StartWorkerRequest{
+				WorkerId:        "Worker2",
+				ControlEndpoint: endpoint,
+				LoggingEndpoint: endpoint,
+			},
+		}, {
+			req: &fnpb.StartWorkerRequest{
+				WorkerId:        "Worker1",
+				ControlEndpoint: endpoint,
+				LoggingEndpoint: endpoint,
+			},
+			errExpected: true, // Repeated start
+		}, {
+			req: &fnpb.StartWorkerRequest{
+				WorkerId:        "missingControl",
+				LoggingEndpoint: endpoint,
+			},
+			errExpected: true,
+		}, {
+			req: &fnpb.StartWorkerRequest{
+				WorkerId:        "missingLogging",
+				ControlEndpoint: endpoint,
+			},
+			errExpected: true,
+		}, {
+			req: &fnpb.StartWorkerRequest{
+				WorkerId:        "secureLogging",
+				LoggingEndpoint: secureEndpoint,
+				ControlEndpoint: endpoint,
+			},
+			errExpected: true,
+		}, {
+			req: &fnpb.StartWorkerRequest{
+				WorkerId:        "secureControl",
+				LoggingEndpoint: endpoint,
+				ControlEndpoint: secureEndpoint,
+			},
+			errExpected: true,
+		},
+	}
+	for _, test := range startTests {
+		resp, err := server.StartWorker(ctx, test.req)
+		if test.errExpected {
+			if err != nil || resp.Error == "" {
+				t.Errorf("Expected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
+			}
+		} else {
+			if err != nil || resp.Error != "" {
+				t.Errorf("Unexpected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
+			}
+		}
+	}
+	stopTests := []struct {
+		req         *fnpb.StopWorkerRequest
+		errExpected bool
+	}{
+		{
+			req: &fnpb.StopWorkerRequest{
+				WorkerId: "Worker1",
+			},
+		}, {
+			req: &fnpb.StopWorkerRequest{
+				WorkerId: "Worker1",
+			},
+			errExpected: true,
+		}, {
+			req: &fnpb.StopWorkerRequest{
+				WorkerId: "NonExistent",
+			},
+			errExpected: true,
+		},
+	}
+	for _, test := range stopTests {
+		resp, err := server.StopWorker(ctx, test.req)
+		if test.errExpected {
+			if err != nil || resp.Error == "" {
+				t.Errorf("Expected error stopping %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
+			}
+		} else {
+			if err != nil || resp.Error != "" {
+				t.Errorf("Unexpected error stopping %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
+			}
+		}
+	}
+	if err := server.Stop(ctx); err != nil {
+		t.Fatalf("error stopping server: err: %v", err)
+	}
+}
diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go
index 118c3fba2fc3..0652fe24e863 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -61,8 +61,8 @@ var (
 	JobName = flag.String("job_name", "", "Job name (optional).")
 
 	// EnvironmentType is the environment type to run the user code.
-	EnvironmentType = flag.String("environment_type", "DOCKER",
-		"Environment Type. Possible options are DOCKER, and LOOPBACK.")
+	EnvironmentType = flag.String("environment_type", "",
+		"Environment Type. Possible options are DOCKER, EXTERNAL, and LOOPBACK.")
 
 	// EnvironmentConfig is the environment configuration for running the user code.
 	EnvironmentConfig = flag.String("environment_config",
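Note: with the hard-coded DOCKER default removed, a pipeline binary can select the EXTERNAL environment and point `--environment_config` at a worker pool. A hedged sketch follows; the runner selection, endpoints, and pool address in the invocation comment are illustrative, not values mandated by this change.

```go
// Illustrative invocation against a pool started with `boot --worker_pool`
// (assuming the pool listens on localhost:50000):
//
//	./mypipeline --runner=universal --endpoint=localhost:8099 \
//	    --environment_type=EXTERNAL --environment_config=localhost:50000
package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()
	beam.Create(s, 1, 2, 3) // Placeholder transform.

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("failed to execute job: %v", err)
	}
}
```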
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index be4809f5e2f6..25f533b50b9b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/protobuf/encoding/prototext"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/docker/docker/api/types/container"
@@ -73,7 +74,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
 func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) {
 	conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
-		panic(fmt.Sprintf("unable to dial sdk worker %v: %v", ep.GetEndpoint().GetUrl(), err))
+		panic(fmt.Sprintf("unable to dial sdk worker pool %v: %v", ep.GetEndpoint().GetUrl(), err))
 	}
 	defer conn.Close()
 	pool := fnpb.NewBeamFnExternalWorkerPoolClient(conn)
@@ -81,7 +82,12 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
 	endpoint := &pipepb.ApiServiceDescriptor{
 		Url: wk.Endpoint(),
 	}
-	pool.StartWorker(ctx, &fnpb.StartWorkerRequest{
+
+	// Use a background context for these workers to avoid premature
+	// cancellation issues when starting them.
+	bgContext := context.Background()
+
+	resp, err := pool.StartWorker(bgContext, &fnpb.StartWorkerRequest{
 		WorkerId:          wk.ID,
 		ControlEndpoint:   endpoint,
 		LoggingEndpoint:   endpoint,
@@ -89,6 +95,11 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
 		ProvisionEndpoint: endpoint,
 		Params:            ep.GetParams(),
 	})
+
+	if str := resp.GetError(); err != nil || str != "" {
+		panic(fmt.Sprintf("unable to start sdk worker %v error: %v, resp: %v", ep.GetEndpoint().GetUrl(), err, prototext.Format(resp)))
+	}
+
 	// Job processing happens here, but orchestrated by other goroutines
 	// This goroutine blocks until the context is cancelled, signalling
 	// that the pool runner should stop the worker.
@@ -96,7 +107,7 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
 
 	// Previous context cancelled so we need a new one
 	// for this request.
-	pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{
+	pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
 		WorkerId: wk.ID,
 	})
 	wk.Stop()
diff --git a/sdks/go/pkg/beam/runners/prism/prism.go b/sdks/go/pkg/beam/runners/prism/prism.go
index e260a7bb7ecd..02a43c58dc76 100644
--- a/sdks/go/pkg/beam/runners/prism/prism.go
+++ b/sdks/go/pkg/beam/runners/prism/prism.go
@@ -40,8 +40,7 @@ func init() {
 // Execute runs the given pipeline on prism. If no endpoint is set, then an in process instance
 // is started, and the job run against that.
 //
-// At present, loopback mode is forced, though this will change once prism is able to
-// use SDK containers.
+// If no environment type is set, loopback mode is used by default.
 func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	if *jobopts.Endpoint == "" {
 		// One hasn't been selected, so lets start one up and set the address.
@@ -50,7 +49,8 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
 		s := jobservices.NewServer(0, internal.RunPipeline)
 		*jobopts.Endpoint = s.Endpoint()
 		go s.Serve()
-		if !jobopts.IsLoopback() {
+		// If the environment type isn't set, use loopback instead.
+		if *jobopts.EnvironmentType == "" {
 			*jobopts.EnvironmentType = "loopback"
 		}
 	}
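Note: the prism change means `Execute` only falls back to loopback when the user hasn't chosen an environment, so `--environment_type=EXTERNAL` now flows through to the worker pool path above. A hedged sketch of the default (no flags set) case; the pipeline content is a placeholder.

```go
package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
)

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()
	beam.Create(s, "a", "b", "c") // Placeholder transform.

	// With no --endpoint set, Execute starts an in-process prism instance;
	// with no --environment_type set, it defaults to loopback mode.
	if _, err := prism.Execute(context.Background(), p); err != nil {
		log.Fatalf("prism execution failed: %v", err)
	}
}
```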