Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi Deployment - multiple dmsg-discovery configuration [WIP] #281

Merged
merged 9 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 145 additions & 210 deletions cmd/dmsgcurl/commands/dmsgcurl.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/dmsgpty-host/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func fillConfigFromENV(conf dmsgpty.Config) (dmsgpty.Config, error) {
return conf, fmt.Errorf("failed to parse dmsg port: %w", err)
}

conf.DmsgPort = uint16(dmsgPort)
conf.DmsgPort = uint16(dmsgPort) //nolint
}

if val, ok := os.LookupEnv(envPrefix + "_CLINET"); ok {
Expand Down
8 changes: 3 additions & 5 deletions cmd/dmsgweb/commands/dmsgweb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/spf13/cobra"
"golang.org/x/net/proxy"

"github.com/skycoin/dmsg/internal/cli"
dmsg "github.com/skycoin/dmsg/pkg/dmsg"
"github.com/skycoin/dmsg/pkg/dmsghttp"
)
Expand Down Expand Up @@ -61,7 +62,7 @@ func init() {
RootCmd.Flags().StringVarP(&addProxy, "addproxy", "r", scriptExecString("${ADDPROXY}", dmsgwebconffile), "configure additional socks5 proxy for dmsgweb (i.e. 127.0.0.1:1080)")
RootCmd.Flags().UintSliceVarP(&webPort, "port", "p", scriptExecUintSlice("${WEBPORT[@]:-8080}", dmsgwebconffile), "port(s) to serve the web application")
RootCmd.Flags().StringSliceVarP(&resolveDmsgAddr, "resolve", "t", scriptExecStringSlice("${RESOLVEPK[@]}", dmsgwebconffile), "resolve the specified dmsg address:port on the local port & disable proxy")
RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "d", dmsgDisc, "dmsg discovery url")
RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", dmsgDisc, "dmsg discovery url(s)")
RootCmd.Flags().StringVarP(&proxyAddr, "proxy", "x", "", "connect to dmsg via proxy (i.e. '127.0.0.1:1080')")
RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", scriptExecInt("${DMSGSESSIONS:-1}", dmsgwebconffile), "number of dmsg servers to connect to")
RootCmd.Flags().BoolSliceVarP(&rawTCP, "rt", "c", scriptExecBoolSlice("${RAWTCP[@]:-false}", dmsgwebconffile), "proxy local port as raw TCP")
Expand Down Expand Up @@ -170,9 +171,6 @@ dmsgweb conf file detected: ` + dmsgwebconffile
if filterDomainSuffix == "" {
dmsgWebLog.Fatal("domain suffix to filter cannot be an empty string")
}
if dmsgDisc == "" {
dmsgDisc = dmsg.DiscAddr(false)
}
ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgWebLog)
defer cancel()

Expand Down Expand Up @@ -220,7 +218,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile
ctx = context.WithValue(context.Background(), "socks5_proxy", proxyAddr) //nolint
}

dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk)
dmsgC, closeDmsg, err := cli.StartDmsg(ctx, dmsgWebLog, pk, sk, &httpC, dmsgDisc, dmsgSessions)
if err != nil {
dmsgWebLog.WithError(err).Fatal("failed to start dmsg")
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/dmsgweb/commands/dmsgwebsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func init() {
srvCmd.Flags().UintSliceVarP(&localPort, "lport", "l", scriptExecUintSlice("${LOCALPORT[@]:-8086}", dmsgwebsrvconffile), "local application http interface port(s)")
srvCmd.Flags().UintSliceVarP(&dmsgPort, "dport", "d", scriptExecUintSlice("${DMSGPORT[@]:-80}", dmsgwebsrvconffile), "dmsg port(s) to serve")
srvCmd.Flags().StringSliceVarP(&wl, "wl", "w", scriptExecStringSlice("${WHITELISTPKS[@]}", dmsgwebsrvconffile), "whitelisted keys for dmsg authenticated routes\r")
srvCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", dmsgDisc, "dmsg discovery url")
srvCmd.Flags().StringVarP(&proxyAddr, "proxy", "x", "", "connect to dmsg via proxy (i.e. '127.0.0.1:1080')")
srvCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", dmsgDisc, "dmsg discovery url(s)")
srvCmd.Flags().StringVarP(&proxyAddr, "proxy", "x", proxyAddr, "connect to dmsg via proxy (i.e. '127.0.0.1:1080')")
srvCmd.Flags().IntVarP(&dmsgSess, "dsess", "e", scriptExecInt("${DMSGSESSIONS:-1}", dmsgwebsrvconffile), "dmsg sessions")
srvCmd.Flags().BoolSliceVarP(&rawTCP, "rt", "c", scriptExecBoolSlice("${RAWTCP[@]:-false}", dmsgwebsrvconffile), "proxy local port as raw TCP")
if os.Getenv("DMSGWEBSRVSK") != "" {
Expand Down
8 changes: 3 additions & 5 deletions cmd/dmsgweb/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
package commands

import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
Expand All @@ -21,7 +19,6 @@ import (
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging"
"golang.org/x/net/proxy"

"github.com/skycoin/dmsg/pkg/disc"
dmsg "github.com/skycoin/dmsg/pkg/dmsg"
)

Expand Down Expand Up @@ -63,7 +60,8 @@ func Execute() {
}
}

func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) {
/*
func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey, dmsgDisc string) (dmsgC *dmsg.Client, stop func(), err error) {
dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, httpClient, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions})
go dmsgC.Serve(ctx)

Expand All @@ -86,7 +84,7 @@ func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *
return dmsgC, stop, nil
}
}

*/
//TODO: these functions are more or less duplicated in several places - need to standardize and put in it's own library import in "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/..."

func scriptExecString(s, envfile string) string {
Expand Down
74 changes: 74 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Package cli internal/cli/cli.go
package cli

import (
"context"
"fmt"
"net/http"

"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging"

"github.com/skycoin/dmsg/pkg/disc"
"github.com/skycoin/dmsg/pkg/dmsg"
)

// StartDmsg starts dmsg returns a dmsg client for the given dmsg discovery
func StartDmsg(ctx context.Context, dmsgLogger *logging.Logger, pk cipher.PubKey, sk cipher.SecKey, httpClient *http.Client, dmsgDisc string, dmsgSessions int) (dmsgC *dmsg.Client, stop func(), err error) {
dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, httpClient, dmsgLogger), &dmsg.Config{MinSessions: dmsgSessions})
go dmsgC.Serve(context.Background())

stop = func() {
err := dmsgC.Close()
dmsgLogger.WithError(err).Debug("Disconnected from dmsg network.")
fmt.Printf("\n")
}
dmsgLogger.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc).
Debug("Connecting to dmsg network...")
select {
case <-ctx.Done():
stop()
return nil, nil, ctx.Err()

case <-dmsgC.Ready():
dmsgLogger.Debug("Dmsg network ready.")
return dmsgC, stop, nil
}
}

//TODO
/*
func startDmsgDirect(ctx context.Context, v *Visor, log *logging.Logger) error { //nolint:all
var keys cipher.PubKeys
servers := v.conf.Dmsg.Servers

if len(servers) == 0 {
return nil
}

keys = append(keys, v.conf.PK)
entries := direct.GetAllEntries(keys, servers)
dClient := direct.NewClient(entries, v.MasterLogger().PackageLogger("dmsg_http:direct_client"))

dmsgDC, closeDmsgDC, err := direct.StartDmsg(ctx, v.MasterLogger().PackageLogger("dmsg_http:dmsgDC"),
v.conf.PK, v.conf.SK, dClient, dmsg.DefaultConfig())
if err != nil {
return fmt.Errorf("failed to start dmsg: %w", err)
}

dmsgHTTP := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgDC)}

v.pushCloseStack("dmsg_http", func() error {
closeDmsgDC()
return nil
})

v.initLock.Lock()
v.dClient = dClient
v.dmsgHTTP = &dmsgHTTP
v.dmsgDC = dmsgDC
v.initLock.Unlock()
time.Sleep(time.Duration(len(entries)) * time.Second)
return nil
}
*/
3 changes: 2 additions & 1 deletion internal/dmsg-discovery/api/entries_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func TestEntriesEndpoint(t *testing.T) {
contentType: "application/json",
responseIsEntry: true,
entry: baseEntry,
entryPreHook: func(t *testing.T, e *disc.Entry, body *string) { //nolint
// entryPreHook: func(t *testing.T, e *disc.Entry, body *string) {
entryPreHook: func(t *testing.T, e *disc.Entry, _ *string) {
err := e.Sign(sk)
require.NoError(t, err)
},
Expand Down
6 changes: 4 additions & 2 deletions pkg/disc/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,17 @@ func TestNewMockUpdateEntriesEndpoint(t *testing.T) {
name: "update entry iteration",
responseShouldError: false,
secretKey: sk,
storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) { //nolint
// storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) {
storerPreHook: func(_ disc.APIClient, e *disc.Entry) {
e.Server.Address = "different one"
},
},
{
name: "update entry unauthorized",
responseShouldError: true,
secretKey: ephemeralSk1,
storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) { //nolint
// storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) {
storerPreHook: func(_ disc.APIClient, e *disc.Entry) {
e.Server.Address = "different one"
},
},
Expand Down
35 changes: 35 additions & 0 deletions pkg/dmsg/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,45 @@ const (
DefaultCommunityDmsgServerType = "community"
)

// DmsghttpJSON is dmsghttp-config.json embedded in skywire.DmsghttpJSON
var DmsghttpJSON = skywire.DmsghttpJSON

// DiscAddr returns the address of the dmsg discovery
func DiscAddr(testenv bool) string {
if testenv {
return skywire.Test.DmsgDiscovery
}
return skywire.Prod.DmsgDiscovery
}

// DmsghttpConfig is the struct that corresponds to the json data of the dmsghttp-config.json
type DmsghttpConfig struct {
Test struct {
DmsgServers []struct {
Static string `json:"static"`
Server struct {
Address string `json:"address"`
} `json:"server"`
} `json:"dmsg_servers"`
DmsgDiscovery string `json:"dmsg_discovery"`
TransportDiscovery string `json:"transport_discovery"`
AddressResolver string `json:"address_resolver"`
RouteFinder string `json:"route_finder"`
UptimeTracker string `json:"uptime_tracker"`
ServiceDiscovery string `json:"service_discovery"`
} `json:"test"`
Prod struct {
DmsgServers []struct {
Static string `json:"static"`
Server struct {
Address string `json:"address"`
} `json:"server"`
} `json:"dmsg_servers"`
DmsgDiscovery string `json:"dmsg_discovery"`
TransportDiscovery string `json:"transport_discovery"`
AddressResolver string `json:"address_resolver"`
RouteFinder string `json:"route_finder"`
UptimeTracker string `json:"uptime_tracker"`
ServiceDiscovery string `json:"service_discovery"`
} `json:"prod"`
}
3 changes: 2 additions & 1 deletion pkg/dmsghttp/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func ExampleMakeHTTPTransport() {
}()

r := chi.NewRouter()
r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { //nolint
// r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("<html><body><h1>Hello World!</h1></body></html>")) //nolint:errcheck
})
go func() { _ = http.Serve(lis, r) }() //nolint
Expand Down
2 changes: 1 addition & 1 deletion pkg/dmsghttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (r httpClientResult) Assert(t *testing.T, i int) {
func startHTTPServer(t *testing.T, results chan httpServerResult, lis net.Listener) {
r := chi.NewRouter()

r.HandleFunc(endpointHTML, func(w http.ResponseWriter, r *http.Request) { //nolint
r.HandleFunc(endpointHTML, func(w http.ResponseWriter, _ *http.Request) {
result := httpServerResult{Path: endpointHTML}

n, err := w.Write(endpointHTMLData)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dmsghttp/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func UpdateServers(ctx context.Context, dClient disc.APIClient, dmsgDisc string,
for {
select {
case <-ctx.Done():
return //nolint
return entries
case <-ticker.C:
servers, err := dmsgclient.AllServers(ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dmsgpty/cli_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func getPtySize(t *os.File) (*pty.Winsize, error) {
// prepareStdin sets stdin to raw mode and provides a function to restore the original state.
func (cli *CLI) prepareStdin() (restore func(), err error) {
var oldState *term.State
if oldState, err = term.MakeRaw(int(os.Stdin.Fd())); err != nil {
if oldState, err = term.MakeRaw(int(os.Stdin.Fd())); err != nil { //nolint
cli.Log.
WithError(err).
Warn("Failed to set stdin to raw mode.")
return
}
restore = func() {
// Attempt to restore state.
if err = term.Restore(int(os.Stdin.Fd()), oldState); err != nil {
if err = term.Restore(int(os.Stdin.Fd()), oldState); err != nil { //nolint
cli.Log.
WithError(err).
Error("Failed to restore original stdin state.")
Expand Down
Loading