Skip to content

Commit

Permalink
Merge pull request apache#1 from giusdp/main
Browse files Browse the repository at this point in the history
Add streamer server
  • Loading branch information
sciabarracom authored Dec 9, 2024
2 parents 5715288 + dfbcaae commit 8226eb5
Show file tree
Hide file tree
Showing 8 changed files with 472 additions and 1 deletion.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
# openserverless-streamer
Apache openserverless

The openserverless streamer is a tool to relay a stream from OpenWhisk actions to an outside
HTTP client.

The streamer is a simple HTTP server that exposes an endpoint /stream/{namespace}/{action} to
invoke the relative OpenWhisk action, open a socket for the action to write to, and relay the
output to the client.

It expects 2 environment variables to be set:
- `APIHOST`: the OpenWhisk API host
- `STREAMER_ADDR`: the address of the streamer server for the OpenWhisk actions to connect to

20 changes: 20 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module github.com/apache/openserverless-streaming-proxy

go 1.23.3

require github.com/apache/openwhisk-client-go v0.0.0-20241028140229-bb8408824b9b

require (
github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/nicksnyder/go-i18n v1.10.3 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
55 changes: 55 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
github.com/apache/openwhisk-client-go v0.0.0-20241028140229-bb8408824b9b h1:TH5kG6vfWi6t0T3XLP5i+HBY2iZ0UtHvP43n92ttx2Y=
github.com/apache/openwhisk-client-go v0.0.0-20241028140229-bb8408824b9b/go.mod h1:2ipmJ/3d2lYbfhmHq3bNj5Q876f20daGybloGmdrbzQ=
github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21 h1:tuijfIjZyjZaHq9xDUh0tNitwXshJpbLkqMOJv4H3do=
github.com/cloudfoundry/jibber_jabber v0.0.0-20151120183258-bcc4c8345a21/go.mod h1:po7NpZ/QiTKzBKyrsEAxwnTamCoh8uDk/egRpQ7siIc=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8=
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nicksnyder/go-i18n v1.10.3 h1:0U60fnLBNrLBVt8vb8Q67yKNs+gykbQuLsIkiesJL+w=
github.com/nicksnyder/go-i18n v1.10.3/go.mod h1:hvLG5HTlZ4UfSuVLSRuX7JRUomIaoKQM19hm6f+no7o=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
152 changes: 152 additions & 0 deletions httpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package main

import (
"context"
"encoding/json"
"log"
"net"
"net/http"
"os"
"strings"
)

func startHTTPServer(streamingProxyAddr string, apihost string) {
httpPort := os.Getenv("HTTP_SERVER_PORT")
if httpPort == "" {
httpPort = "80"
}

router := http.NewServeMux()

router.HandleFunc("GET /", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Streamer proxy running"))
})
router.HandleFunc("POST /stream/{ns}/{action}", handleHTTPStream(streamingProxyAddr, apihost))
router.HandleFunc("POST /stream/{ns}/{pkg}/{action}", handleHTTPStream(streamingProxyAddr, apihost))

server := &http.Server{
Addr: ":" + httpPort,
Handler: router,
}

log.Println("HTTP server listening on port", httpPort)
if err := server.ListenAndServe(); err != nil {
log.Println("Error starting HTTP server:", err)
}
}

func handleHTTPStream(streamingProxyAddr string, apihost string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx, done := context.WithCancel(context.Background())

streamDataChan := make(chan []byte)

namespace := r.PathValue("ns")
pkg := r.PathValue("pkg")
action := r.PathValue("action")

actionToInvoke := action
if pkg != "" {
actionToInvoke = pkg + "/" + action
}

log.Println("Received request for", namespace, pkg, action)

apiKey := r.Header.Get("Authorization")
if apiKey == "" {
http.Error(w, "Missing Authorization header", http.StatusBadRequest)
done()
return
}

// get the apikey without the Bearer prefix
apiKey = strings.TrimPrefix(apiKey, "Bearer ")

// Create OpenWhisk client
client := NewOpenWhiskClient(apihost, apiKey, namespace)

// opens a socket for listening in a random port
socketServer, err := startTCPServer(ctx, streamingProxyAddr, streamDataChan)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}
go socketServer.WaitToCleanUp()

tcpServerHost, tcpServerPort, err := net.SplitHostPort(socketServer.listener.Addr().String())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}

// parse the json body and add STREAM_HOST and STREAM_PORT
body := r.Body
defer body.Close()

jsonBody := make(map[string]interface{})
err = json.NewDecoder(body).Decode(&jsonBody)
if err != nil {
http.Error(w, "Error decoding JSON body: "+err.Error(), http.StatusInternalServerError)
done()
return
}

jsonBody["STREAM_HOST"] = tcpServerHost
jsonBody["STREAM_PORT"] = tcpServerPort

log.Println("Enriched JSON body with STREAM_HOST and STREAM_PORT")

// invoke the action
res, httpResp, err := client.Actions.Invoke(actionToInvoke, jsonBody, false, false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}

if httpResp.StatusCode != http.StatusAccepted {
http.Error(w, "Error invoking action: "+httpResp.Status, http.StatusInternalServerError)
done()
return
}

if m, ok := res.(map[string]interface{}); ok {
log.Println("Action invoked:", m["activationId"])
} else {
http.Error(w, "Unexpected reply from action invocation", http.StatusInternalServerError)
done()
return
}

// Flush the headers
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
done()
return
}

log.Println("Setup streaming data to client")

for {
select {
case data := <-streamDataChan:
log.Println("Sending data to client:", string(data))
_, err := w.Write([]byte("data: " + string(data) + "\n\n"))
if err != nil {
log.Println("Error writing to HTTP response:", err)
done()
return
}
flusher.Flush()
case <-r.Context().Done():
log.Println("HTTP Client closed connection")
done()
return
}
}
}

}
17 changes: 17 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import "os"

func main() {
owApihost := os.Getenv("OW_APIHOST")
if owApihost == "" {
panic("OW_APIHOST is not set")
}

streamerAddr := os.Getenv("STREAMER_ADDR")
if streamerAddr == "" {
panic("STREAMER_ADDR is not set")
}

startHTTPServer(streamerAddr, owApihost)
}
22 changes: 22 additions & 0 deletions ow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"net/http"

"github.com/apache/openwhisk-client-go/whisk"
)

func NewOpenWhiskClient(apiHost string, apiKey string, namespace string) *whisk.Client {
client, err := whisk.NewClient(http.DefaultClient,
&whisk.Config{
Host: apiHost,
Namespace: namespace,
AuthToken: apiKey,
})

if err != nil {
panic(err)
}

return client
}
107 changes: 107 additions & 0 deletions tcpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package main

import (
"context"
"errors"
"io"
"log"
"net"
"sync"
"time"
)

type SocketsServer struct {
ctx context.Context
listener net.Listener
streamDataChan chan []byte
wg sync.WaitGroup
}

func startTCPServer(ctx context.Context, streamingProxyAddr string, streamDataChan chan []byte) (*SocketsServer, error) {
listener, err := net.Listen("tcp", streamingProxyAddr+":0")
if err != nil {
return nil, errors.New("Error starting TCP server")
}

s := &SocketsServer{
ctx: ctx,
listener: listener,
streamDataChan: streamDataChan,
}

s.wg.Add(1)
go s.acceptConnections()

log.Println("New TCP server listening on:", s.listener.Addr().String())
return s, nil
}

func (s *SocketsServer) acceptConnections() {
defer s.wg.Done()

for {
log.Println("Accepting connections on", s.listener.Addr().String())
conn, err := s.listener.Accept()
if err != nil {
select {
case <-s.ctx.Done():
log.Println("Stopped accepting connections")
return
default:
log.Println("accept error", err.Error())
}
} else {
s.wg.Add(1)
go func() {
s.handleConnection(conn)
s.wg.Done()
}()
}
}

}
func (s *SocketsServer) handleConnection(conn net.Conn) {
defer conn.Close()
buf := make([]byte, 2048)

ReadLoop:
for {
select {

case <-s.ctx.Done():
log.Println("Closing TCP connection")
return

default:
conn.SetDeadline(time.Now().Add(100 * time.Millisecond))

for {
n, err := conn.Read(buf)

if err != nil {
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
continue ReadLoop
} else if err != io.EOF {
log.Println("Error reading from TCP connection", err)
return
}
}

if n == 0 {
continue ReadLoop
}

s.streamDataChan <- buf[:n]
}

}
}
}

func (s *SocketsServer) WaitToCleanUp() {
<-s.ctx.Done()
log.Println("Stopping listening on", s.listener.Addr().String())
s.listener.Close()
s.wg.Wait()
log.Print("TCP server on", s.listener.Addr().String(), "closed\n\n")
}
Loading

0 comments on commit 8226eb5

Please sign in to comment.