diff --git a/cmd/dmsgcurl/commands/dmsgcurl.go b/cmd/dmsgcurl/commands/dmsgcurl.go index 3bc9c506..e3fed8a1 100644 --- a/cmd/dmsgcurl/commands/dmsgcurl.go +++ b/cmd/dmsgcurl/commands/dmsgcurl.go @@ -1,9 +1,8 @@ -// Package commands cmd/dmsgcurl/commands/dmsgcurl.go +// Package commands cmd/dmsgcurl/commands package commands import ( "context" - "encoding/json" "errors" "fmt" "io" @@ -18,7 +17,6 @@ import ( "time" "github.com/sirupsen/logrus" - "github.com/skycoin/skywire" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/buildinfo" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cmdutil" @@ -26,16 +24,17 @@ import ( "github.com/spf13/cobra" "golang.org/x/net/proxy" - "github.com/skycoin/dmsg/pkg/disc" + "github.com/skycoin/dmsg/internal/cli" "github.com/skycoin/dmsg/pkg/dmsg" "github.com/skycoin/dmsg/pkg/dmsghttp" ) var ( - dmsgDisc string - dmsgSessions int - dmsgcurlData string - // dmsgcurlHeader string + ctxs []context.Context + cancels []func() + dmsgDiscs []string + dmsgSessions int + dmsgcurlData string sk cipher.SecKey dmsgcurlLog *logging.Logger dmsgcurlAgent string @@ -44,54 +43,44 @@ var ( dmsgcurlWait int dmsgcurlOutput string replace bool - proxyAddr string - httpClient *http.Client + proxyAddr []string + httpClients []*http.Client dialer = proxy.Direct ) func init() { - var envServices skywire.EnvServices - var services skywire.Services - if err := json.Unmarshal([]byte(skywire.ServicesJSON), &envServices); err == nil { - if err := json.Unmarshal(envServices.Prod, &services); err == nil { - dmsgDisc = services.DmsgDiscovery - } - } - RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "c", dmsgDisc, "dmsg discovery url") - RootCmd.Flags().StringVarP(&proxyAddr, "proxy", "p", "", "connect to dmsg via proxy (i.e. '127.0.0.1:1080')") + RootCmd.Flags().StringSliceVarP(&dmsgDiscs, "dmsg-disc", "c", []string{dmsg.DiscAddr(false)}, "dmsg discovery url(s)") + RootCmd.Flags().StringSliceVarP(&proxyAddr, "proxy", "p", proxyAddr, "connect to dmsg via proxy (i.e. '127.0.0.1:1080')") RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "fatal", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "fatal", "[ debug | warn | error | fatal | panic | trace | info ]") RootCmd.Flags().StringVarP(&dmsgcurlData, "data", "d", "", "dmsghttp POST data") - // RootCmd.Flags().StringVarP(&dmsgcurlHeader, "header", "H", "", "Pass custom header(s) to server") RootCmd.Flags().StringVarP(&dmsgcurlOutput, "out", "o", "", "output filepath") - RootCmd.Flags().BoolVarP(&replace, "replace", "r", false, "replace exist file with new downloaded") + RootCmd.Flags().BoolVarP(&replace, "replace", "r", false, "replace existing file with new downloaded") RootCmd.Flags().IntVarP(&dmsgcurlTries, "try", "t", 1, "download attempts (0 unlimits)") RootCmd.Flags().IntVarP(&dmsgcurlWait, "wait", "w", 0, "time to wait between fetches") RootCmd.Flags().StringVarP(&dmsgcurlAgent, "agent", "a", "dmsgcurl/"+buildinfo.Version(), "identify as `AGENT`") if os.Getenv("DMSGCURL_SK") != "" { sk.Set(os.Getenv("DMSGCURL_SK")) //nolint } - RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") + RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified") } -// RootCmd containsa the root dmsgcurl command +// RootCmd contains the root cli command var RootCmd = &cobra.Command{ Use: func() string { return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] }(), - Short: "DMSG curl utility", - Long: ` - ┌┬┐┌┬┐┌─┐┌─┐┌─┐┬ ┬┬─┐┬ - │││││└─┐│ ┬│ │ │├┬┘│ - ─┴┘┴ ┴└─┘└─┘└─┘└─┘┴└─┴─┘ -DMSG curl utility`, + Short: "DMSG curl utility", + Long: `DMSG curl utility`, SilenceErrors: true, SilenceUsage: true, DisableSuggestions: true, DisableFlagsInUseLine: true, Version: buildinfo.Version(), - RunE: func(_ *cobra.Command, args []string) error { + if len(dmsgDiscs) == 0 || dmsgDiscs[0] == "" { + dmsgDiscs = []string{dmsg.DiscAddr(false)} + } if dmsgcurlLog == nil { dmsgcurlLog = logging.MustGetLogger("dmsgcurl") } @@ -100,164 +89,154 @@ DMSG curl utility`, logging.SetLevel(lvl) } } - if dmsgDisc == "" { - dmsgcurlLog.Fatal("Dmsg Discovery URL not specified") - } - - ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgcurlLog) - defer cancel() - - httpClient = &http.Client{} - - if proxyAddr != "" { - // Use SOCKS5 proxy dialer if specified - dialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct) - if err != nil { - log.Fatalf("Error creating SOCKS5 dialer: %v", err) - } - transport := &http.Transport{ - Dial: dialer.Dial, - } - httpClient = &http.Client{ - Transport: transport, + dmsgcurlLog.Debug("DMSG Discovery: ", dmsgDiscs) + for i := range dmsgDiscs { + ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgcurlLog) + defer cancel() + ctxs = append(ctxs, ctx) + cancels = append(cancels, cancel) + + httpClient := &http.Client{} + + if i < len(proxyAddr) && proxyAddr[i] != "" { + // Use SOCKS5 proxy dialer if specified + dialer, err := proxy.SOCKS5("tcp", proxyAddr[i], nil, proxy.Direct) + if err != nil { + log.Fatalf("Error creating SOCKS5 dialer: %v", err) + } + transport := &http.Transport{ + Dial: dialer.Dial, + } + httpClient = &http.Client{ + Transport: transport, + } + ctxs[i] = context.WithValue(context.Background(), "socks5_proxy", proxyAddr[i]) //nolint } - ctx = context.WithValue(context.Background(), "socks5_proxy", proxyAddr) //nolint + httpClients = append(httpClients, httpClient) } - pk, err := sk.PubKey() if err != nil { pk, sk = cipher.GenerateKeyPair() } - - u, err := parseURL(args) + if len(args) == 0 { + return errors.New("no URL(s) provided") + } + if len(args) > 1 { + return errors.New("multiple URLs is not yet supported") + } + parsedURL, err := url.Parse(args[0]) if err != nil { dmsgcurlLog.WithError(err).Fatal("failed to parse provided URL") } - if dmsgcurlData != "" { - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("failed to start dmsg") - } - defer closeDmsg() - - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} - - req, err := http.NewRequest(http.MethodPost, u.URL.String(), strings.NewReader(dmsgcurlData)) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to formulate HTTP request.") - } - req.Header.Set("Content-Type", "text/plain") - - resp, err := httpC.Do(req) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to execute HTTP request.") - } - - defer func() { - if err := resp.Body.Close(); err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to close response body") + for i := range dmsgDiscs { + if dmsgcurlData != "" { + err = handlePostRequest(ctxs[i], dmsgcurlLog, pk, sk, httpClients[i], dmsgDiscs[i], dmsgSessions, parsedURL, dmsgcurlData) + if err == nil { + return nil } - }() - respBody, err := io.ReadAll(resp.Body) - if err != nil { - dmsgcurlLog.WithError(err).Fatal("Failed to read respose body.") + dmsgcurlLog.WithError(err).Debug("An error occurred") } - fmt.Println(string(respBody)) - } else { - file := os.Stdout - if dmsgcurlOutput != "" { - file, err = parseOutputFile(dmsgcurlOutput, replace) - } - if err != nil { - return fmt.Errorf("failed to prepare output file: %w", err) - } - defer func() { - if fErr := file.Close(); fErr != nil { - dmsgcurlLog.WithError(fErr).Warn("Failed to close output file.") - } - if err != nil { - if rErr := os.RemoveAll(file.Name()); rErr != nil { - dmsgcurlLog.WithError(rErr).Warn("Failed to remove output file.") - } - } - }() - - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) - if err != nil { - return fmt.Errorf("failed to start dmsg: %w", err) - } - defer closeDmsg() - - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} - - for i := 0; i < dmsgcurlTries; i++ { - if dmsgcurlOutput != "" { - dmsgcurlLog.Debugf("Download attempt %d/%d ...", i, dmsgcurlTries) - if _, err := file.Seek(0, 0); err != nil { - return fmt.Errorf("failed to reset file: %w", err) - } - } - if err := Download(ctx, dmsgcurlLog, &httpC, file, u.URL.String(), 0); err != nil { - dmsgcurlLog.WithError(err).Error() - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Duration(dmsgcurlWait) * time.Second): - continue - } - } - - // download successful. + err = handleDownload(ctxs[i], dmsgcurlLog, pk, sk, httpClients[i], dmsgDiscs[i], dmsgSessions, parsedURL) + if err == nil { return nil } - - return errors.New("all download attempts failed") - + dmsgcurlLog.WithError(err).Debug("An error occurred") } - return nil + return err }, } -// URL represents a dmsg http URL. -type URL struct { - dmsg.Addr - url.URL -} - -// Fill fills the internal fields from an URL string. -func (du *URL) fill(str string) error { - u, err := url.Parse(str) +func handlePostRequest(ctx context.Context, dmsgLogger *logging.Logger, pk cipher.PubKey, sk cipher.SecKey, httpClient *http.Client, dmsgDisc string, dmsgSessions int, parsedURL *url.URL, dmsgcurlData string) error { + dmsgC, closeDmsg, err := cli.StartDmsg(ctx, dmsgLogger, pk, sk, httpClient, dmsgDisc, dmsgSessions) if err != nil { + dmsgcurlLog.WithError(err).Warnf("Failed to start dmsg") return err } + defer closeDmsg() + + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} + req, err := http.NewRequest(http.MethodPost, parsedURL.String(), strings.NewReader(dmsgcurlData)) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to formulate HTTP request.") + } + req.Header.Set("Content-Type", "text/plain") - if u.Scheme == "" { - return errors.New("URL is missing a scheme") + resp, err := httpC.Do(req) + if err != nil { + dmsgcurlLog.WithError(err).Debug("Failed to execute HTTP request") } + defer closeResponseBody(resp) - if u.Host == "" { - return errors.New("URL is missing a host") + respBody, err := io.ReadAll(resp.Body) + if err != nil { + dmsgcurlLog.WithError(err).Debug("Failed to read response body.") + return err } + fmt.Println(string(respBody)) + return nil - du.URL = *u - return du.Addr.Set(u.Host) } -func parseURL(args []string) (*URL, error) { - if len(args) == 0 { - return nil, errors.New("no URL(s) provided") +func handleDownload(ctx context.Context, dmsgLogger *logging.Logger, pk cipher.PubKey, sk cipher.SecKey, httpClient *http.Client, dmsgDisc string, dmsgSessions int, parsedURL *url.URL) error { + file, err := prepareOutputFile() + if err != nil { + return fmt.Errorf("failed to prepare output file: %w", err) + } + defer closeAndCleanFile(file, err) + + dmsgC, closeDmsg, err := cli.StartDmsg(ctx, dmsgLogger, pk, sk, httpClient, dmsgDisc, dmsgSessions) + if err != nil { + dmsgcurlLog.WithError(err).Warnf("Failed to start dmsg") + return err + } + defer closeDmsg() + + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} + + for i := 0; i < dmsgcurlTries; i++ { + if dmsgcurlOutput != "" { + dmsgcurlLog.Debugf("Download attempt %d/%d ...", i, dmsgcurlTries) + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file: %w", err) + } + } + if err := download(ctx, dmsgcurlLog, &httpC, file, parsedURL.String(), 0); err != nil { + dmsgcurlLog.WithError(err).Error() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(dmsgcurlWait) * time.Second): + continue + } + } + + return nil } + return err +} - if len(args) > 1 { - return nil, errors.New("multiple URLs is not yet supported") +func prepareOutputFile() (*os.File, error) { + if dmsgcurlOutput == "" { + return os.Stdout, nil } + return parseOutputFile(dmsgcurlOutput, replace) +} - var out URL - if err := out.fill(args[0]); err != nil { - return nil, fmt.Errorf("provided URL is invalid: %w", err) +func closeAndCleanFile(file *os.File, err error) { + if fErr := file.Close(); fErr != nil { + dmsgcurlLog.WithError(fErr).Warn("Failed to close output file.") + } + if err != nil && file != os.Stdout { + if rErr := os.RemoveAll(file.Name()); rErr != nil { + dmsgcurlLog.WithError(rErr).Warn("Failed to remove output file.") + } } +} - return &out, nil +func closeResponseBody(resp *http.Response) { + if err := resp.Body.Close(); err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to close response body") + } } func parseOutputFile(output string, replace bool) (*os.File, error) { @@ -281,31 +260,7 @@ func parseOutputFile(output string, replace bool) (*os.File, error) { return nil, os.ErrExist } -func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { - dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, httpClient, dmsgcurlLog), &dmsg.Config{MinSessions: dmsgSessions}) - go dmsgC.Serve(context.Background()) - - stop = func() { - err := dmsgC.Close() - dmsgcurlLog.WithError(err).Debug("Disconnected from dmsg network.") - fmt.Printf("\n") - } - dmsgcurlLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). - Debug("Connecting to dmsg network...") - - select { - case <-ctx.Done(): - stop() - return nil, nil, ctx.Err() - - case <-dmsgC.Ready(): - dmsgcurlLog.Debug("Dmsg network ready.") - return dmsgC, stop, nil - } -} - -// Download downloads a file from the given URL into 'w'. -func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { +func download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { req, err := http.NewRequest(http.MethodGet, urlStr, nil) if err != nil { log.WithError(err).Fatal("Failed to formulate HTTP request.") @@ -314,20 +269,14 @@ func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w if err != nil { return fmt.Errorf("failed to connect to HTTP server: %w", err) } - if maxSize > 0 { - if resp.ContentLength > maxSize*1024 { - return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) - } + if maxSize > 0 && resp.ContentLength > maxSize*1024 { + return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) } - n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength) + n, err := cancellableCopy(ctx, w, resp.Body, resp.ContentLength) if err != nil { return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err) } - defer func() { - if err := resp.Body.Close(); err != nil { - log.WithError(err).Warn("HTTP Response body closed with non-nil error.") - } - }() + defer closeResponseBody(resp) return nil } @@ -336,38 +285,25 @@ type readerFunc func(p []byte) (n int, err error) func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } -// CancellableCopy will call the Reader and Writer interface multiple time, in order -// to copy by chunk (avoiding loading the whole file in memory). -func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { - - n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { - - // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations +func cancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { + n, err := io.Copy(io.MultiWriter(w, &progressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { select { - - // if context has been canceled case <-ctx.Done(): - // stop process and propagate "Download Canceled" error return 0, errors.New("Download Canceled") default: - // otherwise just run default io.Reader implementation return body.Read(p) } })) return n, err } -// ProgressWriter prints the progress of a download to stdout. -type ProgressWriter struct { - // atomic requires 64-bit alignment for struct field access +type progressWriter struct { Current int64 Total int64 } -// Write implements io.Writer -func (pw *ProgressWriter) Write(p []byte) (int, error) { +func (pw *progressWriter) Write(p []byte) (int, error) { n := len(p) - current := atomic.AddInt64(&pw.Current, int64(n)) total := atomic.LoadInt64(&pw.Total) pc := fmt.Sprintf("%d%%", current*100/total) @@ -379,11 +315,10 @@ func (pw *ProgressWriter) Write(p []byte) (int, error) { fmt.Print("\n") } } - return n, nil } -// Execute executes root CLI command. +// Execute executes the RootCmd func Execute() { if err := RootCmd.Execute(); err != nil { log.Fatal("Failed to execute command: ", err) diff --git a/cmd/dmsgpty-host/commands/root.go b/cmd/dmsgpty-host/commands/root.go index 6e89fbfa..62e05c4e 100644 --- a/cmd/dmsgpty-host/commands/root.go +++ b/cmd/dmsgpty-host/commands/root.go @@ -259,7 +259,7 @@ func fillConfigFromENV(conf dmsgpty.Config) (dmsgpty.Config, error) { return conf, fmt.Errorf("failed to parse dmsg port: %w", err) } - conf.DmsgPort = uint16(dmsgPort) + conf.DmsgPort = uint16(dmsgPort) //nolint } if val, ok := os.LookupEnv(envPrefix + "_CLINET"); ok { diff --git a/cmd/dmsgweb/commands/dmsgweb.go b/cmd/dmsgweb/commands/dmsgweb.go index cd93098b..2eb6e971 100644 --- a/cmd/dmsgweb/commands/dmsgweb.go +++ b/cmd/dmsgweb/commands/dmsgweb.go @@ -28,6 +28,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/net/proxy" + "github.com/skycoin/dmsg/internal/cli" dmsg "github.com/skycoin/dmsg/pkg/dmsg" "github.com/skycoin/dmsg/pkg/dmsghttp" ) @@ -61,7 +62,7 @@ func init() { RootCmd.Flags().StringVarP(&addProxy, "addproxy", "r", scriptExecString("${ADDPROXY}", dmsgwebconffile), "configure additional socks5 proxy for dmsgweb (i.e. 127.0.0.1:1080)") RootCmd.Flags().UintSliceVarP(&webPort, "port", "p", scriptExecUintSlice("${WEBPORT[@]:-8080}", dmsgwebconffile), "port(s) to serve the web application") RootCmd.Flags().StringSliceVarP(&resolveDmsgAddr, "resolve", "t", scriptExecStringSlice("${RESOLVEPK[@]}", dmsgwebconffile), "resolve the specified dmsg address:port on the local port & disable proxy") - RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "d", dmsgDisc, "dmsg discovery url") + RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", dmsgDisc, "dmsg discovery url(s)") RootCmd.Flags().StringVarP(&proxyAddr, "proxy", "x", "", "connect to dmsg via proxy (i.e. '127.0.0.1:1080')") RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", scriptExecInt("${DMSGSESSIONS:-1}", dmsgwebconffile), "number of dmsg servers to connect to") RootCmd.Flags().BoolSliceVarP(&rawTCP, "rt", "c", scriptExecBoolSlice("${RAWTCP[@]:-false}", dmsgwebconffile), "proxy local port as raw TCP") @@ -170,9 +171,6 @@ dmsgweb conf file detected: ` + dmsgwebconffile if filterDomainSuffix == "" { dmsgWebLog.Fatal("domain suffix to filter cannot be an empty string") } - if dmsgDisc == "" { - dmsgDisc = dmsg.DiscAddr(false) - } ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgWebLog) defer cancel() @@ -220,7 +218,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile ctx = context.WithValue(context.Background(), "socks5_proxy", proxyAddr) //nolint } - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) + dmsgC, closeDmsg, err := cli.StartDmsg(ctx, dmsgWebLog, pk, sk, &httpC, dmsgDisc, dmsgSessions) if err != nil { dmsgWebLog.WithError(err).Fatal("failed to start dmsg") } diff --git a/cmd/dmsgweb/commands/dmsgwebsrv.go b/cmd/dmsgweb/commands/dmsgwebsrv.go index fafe1bb0..314716c1 100644 --- a/cmd/dmsgweb/commands/dmsgwebsrv.go +++ b/cmd/dmsgweb/commands/dmsgwebsrv.go @@ -37,8 +37,8 @@ func init() { srvCmd.Flags().UintSliceVarP(&localPort, "lport", "l", scriptExecUintSlice("${LOCALPORT[@]:-8086}", dmsgwebsrvconffile), "local application http interface port(s)") srvCmd.Flags().UintSliceVarP(&dmsgPort, "dport", "d", scriptExecUintSlice("${DMSGPORT[@]:-80}", dmsgwebsrvconffile), "dmsg port(s) to serve") srvCmd.Flags().StringSliceVarP(&wl, "wl", "w", scriptExecStringSlice("${WHITELISTPKS[@]}", dmsgwebsrvconffile), "whitelisted keys for dmsg authenticated routes\r") - srvCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", dmsgDisc, "dmsg discovery url") - srvCmd.Flags().StringVarP(&proxyAddr, "proxy", "x", "", "connect to dmsg via proxy (i.e. '127.0.0.1:1080')") + srvCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", dmsgDisc, "dmsg discovery url(s)") + srvCmd.Flags().StringVarP(&proxyAddr, "proxy", "x", proxyAddr, "connect to dmsg via proxy (i.e. '127.0.0.1:1080')") srvCmd.Flags().IntVarP(&dmsgSess, "dsess", "e", scriptExecInt("${DMSGSESSIONS:-1}", dmsgwebsrvconffile), "dmsg sessions") srvCmd.Flags().BoolSliceVarP(&rawTCP, "rt", "c", scriptExecBoolSlice("${RAWTCP[@]:-false}", dmsgwebsrvconffile), "proxy local port as raw TCP") if os.Getenv("DMSGWEBSRVSK") != "" { diff --git a/cmd/dmsgweb/commands/root.go b/cmd/dmsgweb/commands/root.go index f24e3517..31671b66 100644 --- a/cmd/dmsgweb/commands/root.go +++ b/cmd/dmsgweb/commands/root.go @@ -2,12 +2,10 @@ package commands import ( - "context" "fmt" "log" "net" "net/http" - "os" "runtime" "strconv" "strings" @@ -21,7 +19,6 @@ import ( "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging" "golang.org/x/net/proxy" - "github.com/skycoin/dmsg/pkg/disc" dmsg "github.com/skycoin/dmsg/pkg/dmsg" ) @@ -63,7 +60,8 @@ func Execute() { } } -func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { +/* +func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey, dmsgDisc string) (dmsgC *dmsg.Client, stop func(), err error) { dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, httpClient, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions}) go dmsgC.Serve(ctx) @@ -86,7 +84,7 @@ func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC * return dmsgC, stop, nil } } - +*/ //TODO: these functions are more or less duplicated in several places - need to standardize and put in it's own library import in "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/..." func scriptExecString(s, envfile string) string { diff --git a/internal/cli/cli.go b/internal/cli/cli.go new file mode 100644 index 00000000..1f7312fd --- /dev/null +++ b/internal/cli/cli.go @@ -0,0 +1,74 @@ +// Package cli internal/cli/cli.go +package cli + +import ( + "context" + "fmt" + "net/http" + + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging" + + "github.com/skycoin/dmsg/pkg/disc" + "github.com/skycoin/dmsg/pkg/dmsg" +) + +// StartDmsg starts dmsg returns a dmsg client for the given dmsg discovery +func StartDmsg(ctx context.Context, dmsgLogger *logging.Logger, pk cipher.PubKey, sk cipher.SecKey, httpClient *http.Client, dmsgDisc string, dmsgSessions int) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, httpClient, dmsgLogger), &dmsg.Config{MinSessions: dmsgSessions}) + go dmsgC.Serve(context.Background()) + + stop = func() { + err := dmsgC.Close() + dmsgLogger.WithError(err).Debug("Disconnected from dmsg network.") + fmt.Printf("\n") + } + dmsgLogger.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). + Debug("Connecting to dmsg network...") + select { + case <-ctx.Done(): + stop() + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + dmsgLogger.Debug("Dmsg network ready.") + return dmsgC, stop, nil + } +} + +//TODO +/* +func startDmsgDirect(ctx context.Context, v *Visor, log *logging.Logger) error { //nolint:all + var keys cipher.PubKeys + servers := v.conf.Dmsg.Servers + + if len(servers) == 0 { + return nil + } + + keys = append(keys, v.conf.PK) + entries := direct.GetAllEntries(keys, servers) + dClient := direct.NewClient(entries, v.MasterLogger().PackageLogger("dmsg_http:direct_client")) + + dmsgDC, closeDmsgDC, err := direct.StartDmsg(ctx, v.MasterLogger().PackageLogger("dmsg_http:dmsgDC"), + v.conf.PK, v.conf.SK, dClient, dmsg.DefaultConfig()) + if err != nil { + return fmt.Errorf("failed to start dmsg: %w", err) + } + + dmsgHTTP := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgDC)} + + v.pushCloseStack("dmsg_http", func() error { + closeDmsgDC() + return nil + }) + + v.initLock.Lock() + v.dClient = dClient + v.dmsgHTTP = &dmsgHTTP + v.dmsgDC = dmsgDC + v.initLock.Unlock() + time.Sleep(time.Duration(len(entries)) * time.Second) + return nil +} +*/ diff --git a/internal/dmsg-discovery/api/entries_endpoint_test.go b/internal/dmsg-discovery/api/entries_endpoint_test.go index 4737190d..bce1d0b2 100644 --- a/internal/dmsg-discovery/api/entries_endpoint_test.go +++ b/internal/dmsg-discovery/api/entries_endpoint_test.go @@ -54,7 +54,8 @@ func TestEntriesEndpoint(t *testing.T) { contentType: "application/json", responseIsEntry: true, entry: baseEntry, - entryPreHook: func(t *testing.T, e *disc.Entry, body *string) { //nolint + // entryPreHook: func(t *testing.T, e *disc.Entry, body *string) { + entryPreHook: func(t *testing.T, e *disc.Entry, _ *string) { err := e.Sign(sk) require.NoError(t, err) }, diff --git a/pkg/disc/client_mock_test.go b/pkg/disc/client_mock_test.go index 31a95a7f..a057c3c2 100644 --- a/pkg/disc/client_mock_test.go +++ b/pkg/disc/client_mock_test.go @@ -293,7 +293,8 @@ func TestNewMockUpdateEntriesEndpoint(t *testing.T) { name: "update entry iteration", responseShouldError: false, secretKey: sk, - storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) { //nolint + // storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) { + storerPreHook: func(_ disc.APIClient, e *disc.Entry) { e.Server.Address = "different one" }, }, @@ -301,7 +302,8 @@ func TestNewMockUpdateEntriesEndpoint(t *testing.T) { name: "update entry unauthorized", responseShouldError: true, secretKey: ephemeralSk1, - storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) { //nolint + // storerPreHook: func(apiClient disc.APIClient, e *disc.Entry) { + storerPreHook: func(_ disc.APIClient, e *disc.Entry) { e.Server.Address = "different one" }, }, diff --git a/pkg/dmsg/const.go b/pkg/dmsg/const.go index dfe8a7e5..1debdbad 100644 --- a/pkg/dmsg/const.go +++ b/pkg/dmsg/const.go @@ -22,6 +22,9 @@ const ( DefaultCommunityDmsgServerType = "community" ) +// DmsghttpJSON is dmsghttp-config.json embedded in skywire.DmsghttpJSON +var DmsghttpJSON = skywire.DmsghttpJSON + // DiscAddr returns the address of the dmsg discovery func DiscAddr(testenv bool) string { if testenv { @@ -29,3 +32,35 @@ func DiscAddr(testenv bool) string { } return skywire.Prod.DmsgDiscovery } + +// DmsghttpConfig is the struct that corresponds to the json data of the dmsghttp-config.json +type DmsghttpConfig struct { + Test struct { + DmsgServers []struct { + Static string `json:"static"` + Server struct { + Address string `json:"address"` + } `json:"server"` + } `json:"dmsg_servers"` + DmsgDiscovery string `json:"dmsg_discovery"` + TransportDiscovery string `json:"transport_discovery"` + AddressResolver string `json:"address_resolver"` + RouteFinder string `json:"route_finder"` + UptimeTracker string `json:"uptime_tracker"` + ServiceDiscovery string `json:"service_discovery"` + } `json:"test"` + Prod struct { + DmsgServers []struct { + Static string `json:"static"` + Server struct { + Address string `json:"address"` + } `json:"server"` + } `json:"dmsg_servers"` + DmsgDiscovery string `json:"dmsg_discovery"` + TransportDiscovery string `json:"transport_discovery"` + AddressResolver string `json:"address_resolver"` + RouteFinder string `json:"route_finder"` + UptimeTracker string `json:"uptime_tracker"` + ServiceDiscovery string `json:"service_discovery"` + } `json:"prod"` +} diff --git a/pkg/dmsghttp/examples_test.go b/pkg/dmsghttp/examples_test.go index 6eadbe09..a4d7b5e1 100644 --- a/pkg/dmsghttp/examples_test.go +++ b/pkg/dmsghttp/examples_test.go @@ -74,7 +74,8 @@ func ExampleMakeHTTPTransport() { }() r := chi.NewRouter() - r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { //nolint + // r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("