diff --git a/README.md b/README.md index 2963ca4..9736574 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,13 @@ # openserverless-streamer -Apache openserverless + +The openserverless streamer is a tool to relay a stream from OpenWhisk actions to an outside +HTTP client. + +The streamer is a simple HTTP server that exposes an endpoint /stream/{namespace}/{action} to +invoke the relative OpenWhisk action, open a socket for the action to write to, and relay the +output to the client. + +It expects 2 environment variables to be set: +- `APIHOST`: the OpenWhisk API host +- `STREAMER_ADDR`: the address of the streamer server for the OpenWhisk actions to connect to + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..433db23 --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module github.com/apache/openserverless-streaming-proxy + +go 1.23.3 + +require github.com/apache/openwhisk-client-go v0.0.0-20241028140229-bb8408824b9b + +require ( + github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21 // indirect + github.com/fatih/color v1.17.0 // indirect + github.com/google/go-querystring v1.1.0 // indirect + github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/nicksnyder/go-i18n v1.10.3 // indirect + github.com/nxadm/tail v1.4.11 // indirect + github.com/pelletier/go-toml v1.9.5 // indirect + golang.org/x/net v0.32.0 // indirect + golang.org/x/sys v0.28.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4eced11 --- /dev/null +++ b/go.sum @@ -0,0 +1,55 @@ +github.com/apache/openwhisk-client-go v0.0.0-20241028140229-bb8408824b9b h1:TH5kG6vfWi6t0T3XLP5i+HBY2iZ0UtHvP43n92ttx2Y= +github.com/apache/openwhisk-client-go v0.0.0-20241028140229-bb8408824b9b/go.mod h1:2ipmJ/3d2lYbfhmHq3bNj5Q876f20daGybloGmdrbzQ= +github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21 h1:tuijfIjZyjZaHq9xDUh0tNitwXshJpbLkqMOJv4H3do= +github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21/go.mod h1:po7NpZ/QiTKzBKyrsEAxwnTamCoh8uDk/egRpQ7siIc= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= +github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= +github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= +github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8= +github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/nicksnyder/go-i18n v1.10.3 h1:0U60fnLBNrLBVt8vb8Q67yKNs+gykbQuLsIkiesJL+w= +github.com/nicksnyder/go-i18n v1.10.3/go.mod h1:hvLG5HTlZ4UfSuVLSRuX7JRUomIaoKQM19hm6f+no7o= +github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= +github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= +github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= +github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= +github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/httpserver.go b/httpserver.go new file mode 100644 index 0000000..8efe601 --- /dev/null +++ b/httpserver.go @@ -0,0 +1,152 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "net" + "net/http" + "os" + "strings" +) + +func startHTTPServer(streamingProxyAddr string, apihost string) { + httpPort := os.Getenv("HTTP_SERVER_PORT") + if httpPort == "" { + httpPort = "80" + } + + router := http.NewServeMux() + + router.HandleFunc("GET /", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("Streamer proxy running")) + }) + router.HandleFunc("POST /stream/{ns}/{action}", handleHTTPStream(streamingProxyAddr, apihost)) + router.HandleFunc("POST /stream/{ns}/{pkg}/{action}", handleHTTPStream(streamingProxyAddr, apihost)) + + server := &http.Server{ + Addr: ":" + httpPort, + Handler: router, + } + + log.Println("HTTP server listening on port", httpPort) + if err := server.ListenAndServe(); err != nil { + log.Println("Error starting HTTP server:", err) + } +} + +func handleHTTPStream(streamingProxyAddr string, apihost string) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + ctx, done := context.WithCancel(context.Background()) + + streamDataChan := make(chan []byte) + + namespace := r.PathValue("ns") + pkg := r.PathValue("pkg") + action := r.PathValue("action") + + actionToInvoke := action + if pkg != "" { + actionToInvoke = pkg + "/" + action + } + + log.Println("Received request for", namespace, pkg, action) + + apiKey := r.Header.Get("Authorization") + if apiKey == "" { + http.Error(w, "Missing Authorization header", http.StatusBadRequest) + done() + return + } + + // get the apikey without the Bearer prefix + apiKey = strings.TrimPrefix(apiKey, "Bearer ") + + // Create OpenWhisk client + client := NewOpenWhiskClient(apihost, apiKey, namespace) + + // opens a socket for listening in a random port + socketServer, err := startTCPServer(ctx, streamingProxyAddr, streamDataChan) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + done() + return + } + go socketServer.WaitToCleanUp() + + tcpServerHost, tcpServerPort, err := net.SplitHostPort(socketServer.listener.Addr().String()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + done() + return + } + + // parse the json body and add STREAM_HOST and STREAM_PORT + body := r.Body + defer body.Close() + + jsonBody := make(map[string]interface{}) + err = json.NewDecoder(body).Decode(&jsonBody) + if err != nil { + http.Error(w, "Error decoding JSON body: "+err.Error(), http.StatusInternalServerError) + done() + return + } + + jsonBody["STREAM_HOST"] = tcpServerHost + jsonBody["STREAM_PORT"] = tcpServerPort + + log.Println("Enriched JSON body with STREAM_HOST and STREAM_PORT") + + // invoke the action + res, httpResp, err := client.Actions.Invoke(actionToInvoke, jsonBody, false, false) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + done() + return + } + + if httpResp.StatusCode != http.StatusAccepted { + http.Error(w, "Error invoking action: "+httpResp.Status, http.StatusInternalServerError) + done() + return + } + + if m, ok := res.(map[string]interface{}); ok { + log.Println("Action invoked:", m["activationId"]) + } else { + http.Error(w, "Unexpected reply from action invocation", http.StatusInternalServerError) + done() + return + } + + // Flush the headers + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + done() + return + } + + log.Println("Setup streaming data to client") + + for { + select { + case data := <-streamDataChan: + log.Println("Sending data to client:", string(data)) + _, err := w.Write([]byte("data: " + string(data) + "\n\n")) + if err != nil { + log.Println("Error writing to HTTP response:", err) + done() + return + } + flusher.Flush() + case <-r.Context().Done(): + log.Println("HTTP Client closed connection") + done() + return + } + } + } + +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..9510236 --- /dev/null +++ b/main.go @@ -0,0 +1,17 @@ +package main + +import "os" + +func main() { + owApihost := os.Getenv("OW_APIHOST") + if owApihost == "" { + panic("OW_APIHOST is not set") + } + + streamerAddr := os.Getenv("STREAMER_ADDR") + if streamerAddr == "" { + panic("STREAMER_ADDR is not set") + } + + startHTTPServer(streamerAddr, owApihost) +} diff --git a/ow.go b/ow.go new file mode 100644 index 0000000..a2f992c --- /dev/null +++ b/ow.go @@ -0,0 +1,22 @@ +package main + +import ( + "net/http" + + "github.com/apache/openwhisk-client-go/whisk" +) + +func NewOpenWhiskClient(apiHost string, apiKey string, namespace string) *whisk.Client { + client, err := whisk.NewClient(http.DefaultClient, + &whisk.Config{ + Host: apiHost, + Namespace: namespace, + AuthToken: apiKey, + }) + + if err != nil { + panic(err) + } + + return client +} diff --git a/tcpserver.go b/tcpserver.go new file mode 100644 index 0000000..063b3e2 --- /dev/null +++ b/tcpserver.go @@ -0,0 +1,107 @@ +package main + +import ( + "context" + "errors" + "io" + "log" + "net" + "sync" + "time" +) + +type SocketsServer struct { + ctx context.Context + listener net.Listener + streamDataChan chan []byte + wg sync.WaitGroup +} + +func startTCPServer(ctx context.Context, streamingProxyAddr string, streamDataChan chan []byte) (*SocketsServer, error) { + listener, err := net.Listen("tcp", streamingProxyAddr+":0") + if err != nil { + return nil, errors.New("Error starting TCP server") + } + + s := &SocketsServer{ + ctx: ctx, + listener: listener, + streamDataChan: streamDataChan, + } + + s.wg.Add(1) + go s.acceptConnections() + + log.Println("New TCP server listening on:", s.listener.Addr().String()) + return s, nil +} + +func (s *SocketsServer) acceptConnections() { + defer s.wg.Done() + + for { + log.Println("Accepting connections on", s.listener.Addr().String()) + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.ctx.Done(): + log.Println("Stopped accepting connections") + return + default: + log.Println("accept error", err.Error()) + } + } else { + s.wg.Add(1) + go func() { + s.handleConnection(conn) + s.wg.Done() + }() + } + } + +} +func (s *SocketsServer) handleConnection(conn net.Conn) { + defer conn.Close() + buf := make([]byte, 2048) + +ReadLoop: + for { + select { + + case <-s.ctx.Done(): + log.Println("Closing TCP connection") + return + + default: + conn.SetDeadline(time.Now().Add(100 * time.Millisecond)) + + for { + n, err := conn.Read(buf) + + if err != nil { + if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { + continue ReadLoop + } else if err != io.EOF { + log.Println("Error reading from TCP connection", err) + return + } + } + + if n == 0 { + continue ReadLoop + } + + s.streamDataChan <- buf[:n] + } + + } + } +} + +func (s *SocketsServer) WaitToCleanUp() { + <-s.ctx.Done() + log.Println("Stopping listening on", s.listener.Addr().String()) + s.listener.Close() + s.wg.Wait() + log.Print("TCP server on", s.listener.Addr().String(), "closed\n\n") +} diff --git a/tcpserver_test.go b/tcpserver_test.go new file mode 100644 index 0000000..43f3be3 --- /dev/null +++ b/tcpserver_test.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "net" + "sync" + "testing" + "time" +) + +func TestHandleConnection(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamDataChan := make(chan []byte, 10) + server := &SocketsServer{ + ctx: ctx, + streamDataChan: streamDataChan, + } + + // Create a pipe to simulate a network connection + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.handleConnection(serverConn) + }() + + // Write data to the client connection + testData := []byte("test data") + clientConn.Write(testData) + + // Read data from the streamDataChan + select { + case receivedData := <-streamDataChan: + if string(receivedData) != string(testData) { + t.Errorf("Expected %s, but got %s", string(testData), string(receivedData)) + } + case <-time.After(1 * time.Second): + t.Error("Timeout waiting for data") + } + + // Cancel the context to stop the server + cancel() + wg.Wait() +} + +func TestHandleConnectionContextCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + streamDataChan := make(chan []byte, 10) + server := &SocketsServer{ + ctx: ctx, + streamDataChan: streamDataChan, + } + + // Create a pipe to simulate a network connection + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.handleConnection(serverConn) + }() + + // Cancel the context to stop the server + cancel() + wg.Wait() + + // Write data to the client connection after context is cancelled + testData := []byte("test data") + clientConn.Write(testData) + + // Ensure no data is read from the streamDataChan + select { + case <-streamDataChan: + t.Error("Expected no data, but got some") + case <-time.After(100 * time.Millisecond): + // Expected case + } +}