diff --git a/.env b/.env new file mode 100644 index 0000000..13468cb --- /dev/null +++ b/.env @@ -0,0 +1,3 @@ +# Env file defines useful environment variables for editors. +GOOS=js +GOARCH=wasm diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/go.mod b/go.mod index 3d77f42..d74349d 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module github.com/hack-pad/go-webworkers go 1.19 require ( - github.com/hack-pad/go-indexeddb v0.3.2 // indirect - github.com/hack-pad/safejs v0.1.0 // indirect - golang.org/x/mod v0.7.0 // indirect - golang.org/x/sys v0.4.0 // indirect - golang.org/x/tools v0.5.0 // indirect + github.com/hack-pad/safejs v0.1.1 + github.com/pkg/errors v0.9.1 ) diff --git a/go.sum b/go.sum index 8ef6360..3c0f7d3 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,7 @@ -github.com/hack-pad/go-indexeddb v0.3.2 h1:DTqeJJYc1usa45Q5r52t01KhvlSN02+Oq+tQbSBI91A= -github.com/hack-pad/go-indexeddb v0.3.2/go.mod h1:QvfTevpDVlkfomY498LhstjwbPW6QC4VC/lxYb0Kom0= -github.com/hack-pad/safejs v0.1.0 h1:qPS6vjreAqh2amUqj4WNG1zIw7qlRQJ9K10eDKMCnE8= -github.com/hack-pad/safejs v0.1.0/go.mod h1:HdS+bKF1NrE72VoXZeWzxFOVQVUSqZJAG0xNCnb+Tio= +github.com/hack-pad/safejs v0.1.1 h1:d5qPO0iQ7h2oVtpzGnLExE+Wn9AtytxIfltcS2b9KD8= +github.com/hack-pad/safejs v0.1.1/go.mod h1:HdS+bKF1NrE72VoXZeWzxFOVQVUSqZJAG0xNCnb+Tio= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= -golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4= -golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k= diff --git a/worker/message_event.go b/worker/message_event.go new file mode 100644 index 0000000..44adf70 --- /dev/null +++ b/worker/message_event.go @@ -0,0 +1,40 @@ +//go:build js && wasm + +package worker + +import ( + "github.com/hack-pad/safejs" + "github.com/pkg/errors" +) + +// MessageEvent is received from the channel returned by Listen(). +// Represents a JS MessageEvent. +type MessageEvent struct { + data safejs.Value + err error + target *messagePort +} + +// Data returns this event's data or a parse error +func (e MessageEvent) Data() (safejs.Value, error) { + return e.data, errors.Wrapf(e.err, "failed to parse MessageEvent %+v", e.data) +} + +func parseMessageEvent(v safejs.Value) MessageEvent { + value, err := v.Get("target") + if err != nil { + return MessageEvent{err: err} + } + target, err := wrapMessagePort(value) + if err != nil { + return MessageEvent{err: err} + } + data, err := v.Get("data") + if err != nil { + return MessageEvent{err: err} + } + return MessageEvent{ + data: data, + target: target, + } +} diff --git a/worker/message_port.go b/worker/message_port.go new file mode 100644 index 0000000..8fb0e56 --- /dev/null +++ b/worker/message_port.go @@ -0,0 +1,98 @@ +//go:build js && wasm + +package worker + +import ( + "context" + "fmt" + + "github.com/hack-pad/safejs" +) + +type messagePort struct { + jsMessagePort safejs.Value +} + +func wrapMessagePort(v safejs.Value) (*messagePort, error) { + someMethod, err := v.Get("postMessage") + if err != nil { + return nil, err + } + if truthy, err := someMethod.Truthy(); err != nil || !truthy { + return nil, fmt.Errorf("invalid MessagePort value: postMessage is not a function") + } + return &messagePort{v}, nil +} + +func (p *messagePort) PostMessage(data safejs.Value, transfers []safejs.Value) error { + args := append([]any{data}, toJSSlice(transfers)) + _, err := p.jsMessagePort.Call("postMessage", args...) + return err +} + +func toJSSlice[Type any](slice []Type) []any { + newSlice := make([]any, len(slice)) + for i := range slice { + newSlice[i] = slice[i] + } + return newSlice +} + +func (p *messagePort) Listen(ctx context.Context) (_ <-chan MessageEvent, err error) { + ctx, cancel := context.WithCancel(ctx) + defer func() { + if err != nil { + cancel() + } + }() + + events := make(chan MessageEvent) + messageHandler, err := nonBlocking(func(args []safejs.Value) { + events <- parseMessageEvent(args[0]) + }) + if err != nil { + return nil, err + } + errorHandler, err := nonBlocking(func(args []safejs.Value) { + events <- parseMessageEvent(args[0]) + }) + if err != nil { + return nil, err + } + + go func() { + <-ctx.Done() + _, err := p.jsMessagePort.Call("removeEventListener", "message", messageHandler) + if err == nil { + messageHandler.Release() + } + _, err = p.jsMessagePort.Call("removeEventListener", "messageerror", errorHandler) + if err == nil { + errorHandler.Release() + } + close(events) + }() + _, err = p.jsMessagePort.Call("addEventListener", "message", messageHandler) + if err != nil { + return nil, err + } + _, err = p.jsMessagePort.Call("addEventListener", "messageerror", errorHandler) + if err != nil { + return nil, err + } + if start, err := p.jsMessagePort.Get("start"); err == nil { + if truthy, err := start.Truthy(); err == nil && truthy { + if _, err := p.jsMessagePort.Call("start"); err != nil { + return nil, err + } + } + } + return events, nil +} + +func nonBlocking(fn func(args []safejs.Value)) (safejs.Func, error) { + return safejs.FuncOf(func(_ safejs.Value, args []safejs.Value) any { + go fn(args) + return nil + }) +} diff --git a/worker/self.go b/worker/self.go new file mode 100644 index 0000000..cd773cd --- /dev/null +++ b/worker/self.go @@ -0,0 +1,66 @@ +//go:build js && wasm + +package worker + +import ( + "context" + + "github.com/hack-pad/safejs" +) + +// GlobalSelf represents the global scope, named "self", in the context of using Workers. +// Supports sending and receiving messages via PostMessage() and Listen(). +type GlobalSelf struct { + self safejs.Value + port *messagePort +} + +// Self returns the global "self" +func Self() (*GlobalSelf, error) { + self, err := safejs.Global().Get("self") + if err != nil { + return nil, err + } + port, err := wrapMessagePort(self) + if err != nil { + return nil, err + } + return &GlobalSelf{ + self: self, + port: port, + }, nil +} + +// PostMessage sends data in a message to the main thread that spawned it, +// optionally transferring ownership of all items in transfers. +// +// The data may be any value handled by the "structured clone algorithm", which includes cyclical references. +// +// Transfers is an optional array of Transferable objects to transfer ownership of. +// If the ownership of an object is transferred, it becomes unusable in the context it was sent from and becomes available only to the worker it was sent to. +// Transferable objects are instances of classes like ArrayBuffer, MessagePort or ImageBitmap objects that can be transferred. +// null is not an acceptable value for transfer. +func (s *GlobalSelf) PostMessage(message safejs.Value, transfers []safejs.Value) error { + return s.port.PostMessage(message, transfers) +} + +// Listen sends message events on a channel for events fired by worker.postMessage() calls inside the main thread's global scope. +// Stops the listener and closes the channel when ctx is canceled. +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan MessageEvent, error) { + return s.port.Listen(ctx) +} + +// Close discards any tasks queued in the global scope's event loop, effectively closing this particular scope. +func (s *GlobalSelf) Close() error { + _, err := s.self.Call("close") + return err +} + +// Name returns the name that the Worker was (optionally) given when it was created. +func (s *GlobalSelf) Name() (string, error) { + name, err := s.self.Get("name") + if err != nil { + return "", err + } + return name.String() +} diff --git a/worker/self_test.go b/worker/self_test.go new file mode 100644 index 0000000..2221883 --- /dev/null +++ b/worker/self_test.go @@ -0,0 +1,35 @@ +//go:build js && wasm + +package worker + +import ( + "testing" + + "github.com/hack-pad/safejs" +) + +func TestSelf(t *testing.T) { + t.Parallel() + self, err := Self() + if err != nil { + t.Fatal(err) + } + if !self.self.Equal(safejs.MustGetGlobal("self")) { + t.Error("self is not equal to the global self") + } +} + +func TestSelfName(t *testing.T) { + t.Parallel() + self, err := Self() + if err != nil { + t.Fatal(err) + } + name, err := self.Name() + if err != nil { + t.Fatal(err) + } + if name != "" { + t.Errorf("Expected %q, got %q", "", name) + } +} diff --git a/worker/worker.go b/worker/worker.go index 2a1df85..17ec9b0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,16 +1,101 @@ //go:build js && wasm -// +build js,wasm // Package worker provides a Web Workers driver for Go code compiled to WebAssembly. package worker -import "errors" +import ( + "context" -// Worker is a Web Worker, which represents a background task that can be created via script. -// Workers can send messages back to its creator. -type Worker struct{} + "github.com/hack-pad/safejs" +) -// NewWorker returns a new Worker -func NewWorker() (*Worker, error) { - return nil, errors.New("not implemented") +var ( + jsWorker = safejs.MustGetGlobal("Worker") + jsURL = safejs.MustGetGlobal("URL") + jsBlob = safejs.MustGetGlobal("Blob") +) + +// Worker is a Web Worker, which represents a background task created via a script. +// Use Listen() and PostMessage() to communicate with the worker. +type Worker struct { + worker safejs.Value + port *messagePort +} + +// Options contains optional configuration for new Workers +type Options struct { + // Name specifies an identifying name for the DedicatedWorkerGlobalScope representing the scope of the worker, which is mainly useful for debugging purposes. + Name string +} + +func (w Options) toJSValue() (safejs.Value, error) { + options := make(map[string]any) + if w.Name != "" { + options["name"] = w.Name + } + return safejs.ValueOf(options) +} + +// New starts a worker with the given script's URL and returns it +func New(url string, options Options) (*Worker, error) { + jsOptions, err := options.toJSValue() + if err != nil { + return nil, err + } + worker, err := jsWorker.New(url, jsOptions) + if err != nil { + return nil, err + } + port, err := wrapMessagePort(worker) + if err != nil { + return nil, err + } + return &Worker{ + port: port, + worker: worker, + }, nil +} + +// NewFromScript is like New, but starts the worker with the given script (in JavaScript) +func NewFromScript(jsScript string, options Options) (*Worker, error) { + blob, err := jsBlob.New([]any{jsScript}, map[string]any{ + "type": "text/javascript", + }) + if err != nil { + return nil, err + } + objectURL, err := jsURL.Call("createObjectURL", blob) + if err != nil { + return nil, err + } + objectURLStr, err := objectURL.String() + if err != nil { + return nil, err + } + return New(objectURLStr, options) +} + +// Terminate immediately terminates the Worker. +// This does not offer the worker an opportunity to finish its operations; it is stopped at once. +func (w *Worker) Terminate() error { + _, err := w.worker.Call("terminate") + return err +} + +// PostMessage sends data in a message to the worker, optionally transferring ownership of all items in transfers. +// +// The data may be any value handled by the "structured clone algorithm", which includes cyclical references. +// +// Transfers is an optional array of Transferable objects to transfer ownership of. +// If the ownership of an object is transferred, it becomes unusable in the context it was sent from and becomes available only to the worker it was sent to. +// Transferable objects are instances of classes like ArrayBuffer, MessagePort or ImageBitmap objects that can be transferred. +// null is not an acceptable value for transfer. +func (w *Worker) PostMessage(data safejs.Value, transfers []safejs.Value) error { + return w.port.PostMessage(data, transfers) +} + +// Listen sends message events on a channel for events fired by self.postMessage() calls inside the Worker's global scope. +// Stops the listener and closes the channel when ctx is canceled. +func (w *Worker) Listen(ctx context.Context) (<-chan MessageEvent, error) { + return w.port.Listen(ctx) } diff --git a/worker/worker_test.go b/worker/worker_test.go index fa760b2..52eb094 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,14 +1,303 @@ //go:build js && wasm -// +build js,wasm package worker -import "testing" +import ( + "context" + "fmt" + "testing" + "time" -func TestNewWorker(t *testing.T) { + "github.com/hack-pad/safejs" +) + +var ( + jsJSON = safejs.MustGetGlobal("JSON") + jsUint8Array = safejs.MustGetGlobal("Uint8Array") +) + +func TestWorkerOptionsToJSValue(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + description string + options Options + expect any + }{ + { + description: "no options", + options: Options{}, + expect: map[string]any{}, + }, + { + description: "name", + options: Options{ + Name: "foo", + }, + expect: map[string]any{ + "name": "foo", + }, + }, + } { + tc := tc // enable parallel sub-tests + t.Run(tc.description, func(t *testing.T) { + t.Parallel() + value, err := tc.options.toJSValue() + if err != nil { + t.Fatal(err) + } + expect, err := safejs.ValueOf(tc.expect) + if err != nil { + t.Fatal(err) + } + expectJSON, actualJSON := stringify(t, expect), stringify(t, value) + if expectJSON != actualJSON { + t.Errorf("\nExpected %v\nActual: %v", expectJSON, actualJSON) + } + }) + } +} + +func stringify(t *testing.T, obj safejs.Value) string { + t.Helper() + json, err := jsJSON.Call("stringify", obj) + if err != nil { + t.Fatal(err) + } + str, err := json.String() + if err != nil { + t.Fatal(err) + } + return str +} + +func makeBlobURL(t *testing.T, contents []byte, contentType string) string { + t.Helper() + jsContents, err := jsUint8Array.New(len(contents)) + if err != nil { + t.Fatal(err) + } + _, err = safejs.CopyBytesToJS(jsContents, contents) + if err != nil { + t.Fatal(err) + } + blob, err := jsBlob.New([]any{jsContents}, map[string]any{ + "type": contentType, + }) + if err != nil { + t.Fatal(err) + } + url, err := jsURL.Call("createObjectURL", blob) + if err != nil { + t.Fatal(err) + } + urlString, err := url.String() + if err != nil { + t.Fatal(err) + } + return urlString +} + +func cleanUpWorker(t *testing.T, worker *Worker) { + t.Cleanup(func() { + err := worker.Terminate() + if err != nil { + t.Fatal(err) + } + }) +} + +func TestNew(t *testing.T) { + t.Parallel() + const messageText = "Hello, world!" + blobURL := makeBlobURL(t, []byte(fmt.Sprintf(`"use strict"; +self.postMessage(%q); +`, messageText)), "text/javascript") + worker, err := New(blobURL, Options{}) + if err != nil { + t.Fatal(err) + } + cleanUpWorker(t, worker) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Fatal(err) + } + if dataStr != messageText { + t.Errorf("Expected %q, got %q", messageText, dataStr) + } +} + +func TestNewFromScript(t *testing.T) { + t.Parallel() + const messageText = "Hello, world!" + script := fmt.Sprintf(` +"use strict"; + +self.postMessage(%q); +`, messageText) + worker, err := NewFromScript(script, Options{}) + if err != nil { + t.Fatal(err) + } + cleanUpWorker(t, worker) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Fatal(err) + } + if dataStr != messageText { + t.Errorf("Expected %q, got %q", messageText, dataStr) + } +} + +func TestWorkerTerminate(t *testing.T) { t.Parallel() - _, err := NewWorker() - if err == nil || err.Error() != "not implemented" { - t.Error("NewWorker should not be implemented, got:", err) + worker, err := NewFromScript(` +"use strict"; + +self.postMessage("start"); +self.setTimeout(() => self.postMessage("done waiting"), 200); +`, Options{}) + if err != nil { + t.Fatal(err) + } + cleanUpWorker(t, worker) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Error(err) + } + if dataStr != "start" { + t.Fatalf("Expected worker to send 'start', got %s", dataStr) + } + + err = worker.Terminate() + if err != nil { + t.Fatal(err) } + + select { + case message := <-messages: + t.Errorf("Should not receive the delayed message on a terminated worker, got: %v", message) + case <-time.After(400 * time.Millisecond): + } +} + +func TestWorkerPostMessage(t *testing.T) { + t.Parallel() + const pingPongScript = ` +"use strict"; + +self.addEventListener("message", event => { + self.postMessage(event.data + " pong!") +}); +` + pingMessage, err := safejs.ValueOf("ping!") + if err != nil { + t.Fatal(err) + } + + t.Run("listen before post", func(t *testing.T) { + t.Parallel() + worker, err := NewFromScript(pingPongScript, Options{}) + if err != nil { + t.Fatal(err) + } + cleanUpWorker(t, worker) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + + err = worker.PostMessage(pingMessage, nil) + if err != nil { + t.Fatal(err) + } + + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Error(err) + } + expectedResponse := "ping! pong!" + if dataStr != expectedResponse { + t.Errorf("Expected response %q, got: %q", expectedResponse, dataStr) + } + }) + + t.Run("listen after post", func(t *testing.T) { + t.Parallel() + worker, err := NewFromScript(pingPongScript, Options{}) + if err != nil { + t.Fatal(err) + } + cleanUpWorker(t, worker) + + err = worker.PostMessage(pingMessage, nil) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + + message := <-messages + data, err := message.Data() + if err != nil { + t.Error(err) + } + dataStr, err := data.String() + if err != nil { + t.Error(err) + } + expectedResponse := "ping! pong!" + if dataStr != expectedResponse { + t.Errorf("Expected response %q, got: %q", expectedResponse, dataStr) + } + }) }