diff --git a/README.md b/README.md index bec29d7..e84aed8 100644 --- a/README.md +++ b/README.md @@ -2,45 +2,54 @@ This is a lightweight daemon that allows mining to a local (or remote) kaspa node using stratum-based miners. -This daemon is confirmed working with the miners below in both dual-mining and kaspa-only modes (for those that support it) and Windows/MacOs/Linux/HiveOs. +This daemon is confirmed working with the miners below in both kaspa-only and dual-mining modes (for those that support it) on Windows/MacOs/Linux/HiveOs. * bzminer * lolminer * srbminer * teamredminer +* IceRiver ASICs [*(setup details)*](#iceriver-asics-configuration-details) +* Bitmain ASICs -No fee, forever. Do what you want with it. +Hive setup: [detailed instructions here](docs/hive-setup.md) Discord discussions/issues: [here](https://discord.com/channels/599153230659846165/1025501807570600027) Huge shoutout to https://github.com/KaffinPX/KStratum for the inspiration -Tips appreciated: `kaspa:qp9v6090sr8jjlkq7r3f4h9un5rtfhu3raknfg3cca9eapzee57jzew0kxwlp` - -Try my 0-fee [Kaspa Pool](http://ghost-pool.io/) built with this code! - - -## Hive Setup -[detailed instructions here](hive-setup.md) +Tips appreciated: +- [@onemorebsmith](https://github.com/onemorebsmith): `kaspa:qp9v6090sr8jjlkq7r3f4h9un5rtfhu3raknfg3cca9eapzee57jzew0kxwlp` +- [@rdugan](https://github.com/rdugan): `kaspa:qrkhyhej7h0gmmvsuf8mmufget4n4xnlwx5j360sz70q7xvu0hlaxfmt9p8j8` # Features: -Shares-based work allocation with miner-like periodic stat output: +### Shares-based work allocation with miner-like periodic stat output + +``` +=============================================================================== + worker name | avg hashrate | acc/stl/inv | blocks | uptime +------------------------------------------------------------------------------- + octo12_1 | 43.36GH/s | 1183/0/0 | 1 | 53m18s + pc | 758.97MH/s | 1017/0/0 | 0 | 52m54s +------------------------------------------------------------------------------- + | 44.12GH/s | 2200/0/0 | 1 | 53m20s +========================================================== ks_bridge_v1.1.7 === +``` -![image](https://user-images.githubusercontent.com/59971111/191983487-479e19ec-a8cb-4edb-afc4-55a1165e79fc.png) +### Variable difficulty engine (vardiff) +Multiple miners with significantly different hashrates can be connected to the same stratum bridge instance, and the appropriate difficulty will automatically be decided for each one. Default settings target 20 shares/min, resulting in high confidence decisions regarding difficulty adjustments, and stable measured hashrates (1hr avg hashrates within +/- 10% of actual). -Optional monitoring UI: -https://github.com/onemorebsmith/kaspa-stratum-bridge/blob/main/monitoring-setup.md +### Optional monitoring UI -![image](https://user-images.githubusercontent.com/59971111/192025446-f20d74a5-f9e0-4290-b98b-9f56af8f23b4.png) +Detailed setup [instructions](/docs/monitoring-setup.md) -![image](https://user-images.githubusercontent.com/59971111/191980688-2d0faf6b-d551-4880-a316-de2303cfeb7d.png) +![Monitoring Dashboard](/docs/images/dashboard.png) -Prometheus API: +### Prometheus API If the app is run with the `-prom={port}` flag the application will host stats on the port specified by `{port}`; these stats are documented in the file [prom.go](src/kaspastratum/prom.go). This is intended to be used by prometheus but the stats can be fetched and used independently if desired. `curl http://localhost:2114/metrics | grep ks_` will get a listing of current stats.
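As an illustration, the same query can be done programmatically. The following is a minimal Go sketch, not part of this repo; the file name `metrics_grep.go` is hypothetical and it assumes the bridge is running locally with `-prom=:2114`:

```go
// metrics_grep.go - mirrors `curl http://localhost:2114/metrics | grep ks_`.
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://localhost:2114/metrics")
	if err != nil {
		log.Fatalf("failed to fetch metrics: %v", err)
	}
	defer resp.Body.Close()

	// Print only the bridge's own series, which all carry the ks_ prefix.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		if line := scanner.Text(); strings.HasPrefix(line, "ks_") {
			fmt.Println(line)
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatalf("error reading metrics: %v", err)
	}
}
```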
All published stats have a `ks_` prefix for ease of use. @@ -68,54 +77,173 @@ ks_worker_job_counter{ip="192.168.0.65",miner="BzMiner-v11.1.0",wallet="kaspa:qz ``` -# Install +# Installation -## Docker All-in-one +## Option 1: Build from source (native executable) -Note: This does requires that docker is installed. +* Install go 1.18 or later using whatever package manager is appropriate for your system, or from https://go.dev/doc/install. - +* run `cd cmd/kaspabridge;go build .` + +* Modify the config file in ./cmd/kaspabridge/config.yaml with your setup; the file comments explain the various flags + +* run `./kaspabridge` in the `cmd/kaspabridge` directory -`docker compose -f docker-compose-all.yml up -d` will run the bridge with default settings. This assumes a local kaspad node with default port settings and exposes port 5555 to incoming stratum connections. +All-in-one (build + run) `cd cmd/kaspabridge/;go build .;./kaspabridge` +## Option 2: Docker all-in-one + +*Best option for users who want access to reporting, and aren't already using Grafana/Prometheus. Requires a local copy of this repository, and docker installation.* +* [Install Docker](https://docs.docker.com/engine/install/) using the appropriate method for your OS. The docker commands below assume a server type installation - details may differ for a desktop installation. -This also spins up a local prometheus and grafana instance that gather stats and host the metrics dashboard. Once the services are up and running you can view the dashboard using `http://127.0.0.1:3000/d/x7cE7G74k/monitoring` +* Clone this repository using git (`git clone https://github.com/rdugan/kaspa-stratum-bridge.git`) or download and unpack the [zip file](https://github.com/rdugan/kaspa-stratum-bridge/archive/refs/heads/main.zip) -Default grafana user/pass: admin/admin +* Enter the 'kaspa-stratum-bridge' directory and type the command `docker compose -f docker-compose-all-src.yml up -d --build` [^1]. This will run the bridge assuming a local kaspad node with default port settings, and listen on port 5555 for incoming stratum connections. These settings can be updated in the [config.yaml](cmd/kaspabridge/config.yaml) file, or overridden by modifying/adding/deleting the parameters in the 'command' section of the [docker-compose-all-src.yml](docker-compose-all-src.yml) file. Additionally, Prometheus (the stats database) and Grafana (the dashboard) will be started and accessible on ports 9090 and 3000 respectively. Once all services are running, the dashboard should be reachable on port 3000, with default user/pass: admin/admin -Most of the stats on the graph are averaged over an hour time period, so keep in mind that the metrics might be inaccurate for the first hour or so that the bridge is up. +[^1]: This command builds the bridge component from source, rather than the previous behavior of pulling down a pre-built image. You may still use the pre-built image by replacing 'docker-compose-all-src.yml' with 'docker-compose-all.yml', but it is not guaranteed to be up to date, so compiling from source is the better alternative. +Many of the stats on the graph are averaged over a configurable time period (24hr default - use the 'resolution' dropdown to change this), so keep in mind that the metrics might be incomplete during this initial period. -## Docker (non-compose) -Note: This does not require pulling down the repo, it only requires that docker is installed.
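As a quick post-deployment sanity check, the sketch below dials the three documented ports. It is hypothetical (not included in this repo), written in Go, and assumes the default ports from the compose file: 5555 (stratum), 9090 (Prometheus), 3000 (Grafana); adjust the host and ports if you changed them.

```go
// compose_check.go - confirms the all-in-one services are listening.
package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	services := []struct{ name, addr string }{
		{"ks_bridge (stratum)", "127.0.0.1:5555"},
		{"prometheus", "127.0.0.1:9090"},
		{"grafana", "127.0.0.1:3000"},
	}
	for _, s := range services {
		conn, err := net.DialTimeout("tcp", s.addr, 3*time.Second)
		if err != nil {
			fmt.Printf("%-20s %s NOT reachable: %v\n", s.name, s.addr, err)
			continue
		}
		conn.Close()
		fmt.Printf("%-20s %s reachable\n", s.name, s.addr)
	}
}
```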
+## Option 3: Docker bridge only -`docker run -p 5555:5555 onemorebsmith/kaspa_bridge:latest --log=false` will run the bridge with default settings. This assumes a local kaspad node with default port settings and exposes port 5555 to incoming stratum connections. +*Best option for users who want docker encapsulation, and don't need reporting, or are already using Grafana/Prometheus. Requires a local copy of this repository, and docker installation.* +* [Install Docker](https://docs.docker.com/engine/install/) using the appropriate method for your OS. The docker commands below assume a server type installation - details may differ for a desktop installation. -Detailed: +* Clone this repository using git (`git clone https://github.com/rdugan/kaspa-stratum-bridge.git`) or download and unpack the [zip file](https://github.com/rdugan/kaspa-stratum-bridge/archive/refs/heads/main.zip) -`docker run -p {stratum_port}:5555 onemorebsmith/kaspa_bridge --log=false --kaspa={kaspad_address} --stats={false}` will run the bridge targeting a kaspad node at {kaspad_address}. stratum port accepting connections on {stratum_port}, and only logging connection activity, found blocks, and errors +* Enter the 'kaspa-stratum-bridge' directory and type the command `docker compose -f docker-compose-bridge-src.yml up -d --build` [^2]. This will run the bridge assuming a local kaspad node with default port settings, and listen on port 5555 for incoming stratum connections. These settings can be updated in the [config.yaml](cmd/kaspabridge/config.yaml) file, or overridden by modifying/adding/deleting the parameters in the 'command' section of the [docker-compose-bridge-src.yml](docker-compose-bridge-src.yml) file. No further services will be enabled. - +[^2]: This command builds the bridge component from source, rather than the previous behavior of pulling down a pre-built image. You may still use the pre-built image by issuing the command `docker run -p 5555:5555 onemorebsmith/kaspa_bridge:latest`, but it is not guaranteed to be up to date, so compiling from source is the better alternative. -## Manual build -Install go 1.18 using whatever package manager is approprate for your system +# Configuration - +Configuration for the bridge is done via the [config.yaml](cmd/kaspabridge/config.yaml) file in the same directory as the executable, or `./cmd/kaspabridge` from the project root if building from source / using docker. Available parameters are as follows: -run `cd cmd/kaspabridge;go build .` - +``` +# stratum_listen_port: the port that will be listening for incoming stratum +# traffic +# Note `:PORT` format is needed if not specifying a specific ip range +stratum_port: :5555 + +# kaspad_address: address/port of the rpc server for kaspad, typically 16110 +# For a list of public nodes, run `nslookup mainnet-dnsseed.daglabs-dev.com` +# uncomment to use a public node +# kaspad_address: 46.17.104.200:16110 +kaspad_address: localhost:16110 + +# min_share_diff: only accept shares of the specified difficulty (or higher) +# from the miner(s). Higher values will reduce the number of shares submitted, +# thereby reducing network traffic and server load, while lower values will +# increase the number of shares submitted, thereby reducing the amount of time +# needed for accurate hashrate measurements +# +# If var_diff is enabled, min_share_diff will be the starting difficulty. +# +# Default value is chosen to accommodate current top of the line IceRiver ASICs.
+# If you don't want to change the default to match your device(s), the vardiff +# engine will adjust to an appropriate diff for lower hashrate devices within a +# few minutes. +min_share_diff: 4096 + +# pow2_clamp: restrict difficulty to 2^n (e.g. 64, 128, 256, etc). This is +# required for IceRiver and BitMain ASICs, where difficulties further away from +# powers of 2 cause higher error rates. Using this feature will limit the +# functionality of vardiff, such that the shares_per_min becomes more of a +# minimum, rather than a target we can expect to converge on. +pow2_clamp: false + +# var_diff: if true, enables the auto-adjusting variable share diff mechanism. +# Starts with the value defined by the 'min_share_diff' setting, then checks +# every 10s whether each client is maintaining a 20 shares/minute submission +# rate, and sends an updated min diff per client if necessary. Max tolerance +# is +/- 5% after 4hrs. +var_diff: true + +# shares_per_min: number of shares per minute the vardiff engine should target. +# Default value is chosen to allow for 99% confidence in measurement accuracy, +# which affects fidelity of difficulty update decisions, as well as hashrate +# stability (measured 1hr avg hashrate should be within +/- 10% of actual, with +# the noted confidence.) Higher values will result in better vardiff engine +# performance and increased hashrate stability. Lower values will cause +# vardiff to behave more erratically, while measured hashrate will display +# larger variations. +# +# Incorrect configuration of this parameter may induce high error rates on +# IceRiver devices, so it is recommended to avoid unnecessary changes. +# +# Example values and their resulting confidence levels: +# 20 => 99%, 15 => 95%, 12 => 90% +shares_per_min: 20 + +# var_diff_stats: if true, print vardiff engine stats to the log every 10s +var_diff_stats: false + +# block_wait_time: time to wait since last new block message from kaspad before +# manually requesting a new block. Examples are '500ms', '3s', '1m', etc. +block_wait_time: 3s + +# extranonce_size: size in bytes of extranonce, from 0 (no extranonce) to 3. +# With no extranonce (0), all clients will search through the same nonce-space, +# therefore performing duplicate work unless the miner(s) implement client +# side nonce randomizing. More bytes allow for more clients with unique +# nonce-spaces (i.e. no overlapping work), but reduces the per client +# overall nonce-space (though with 1s block times, this shouldn't really +# be a concern). +# 1 byte = 256 clients, 2 bytes = 65536, 3 bytes = 16777216. 
+extranonce_size: 0 + +# print_stats: if true, stats will be printed to the console; if false, only +# workers joining/disconnecting, blocks found, and errors will be printed +print_stats: true + +# log_to_file: if true logs will be written to a file local to the executable +log_to_file: true + +# prom_port: if specified, prometheus will serve stats on the port provided +# see readme for summary on how to get prom up and running using docker +# you can get the raw metrics (along with default golang metrics) using +# `curl http://localhost:{prom_port}/metrics` +# Note `:PORT` format is needed if not specifying a specific ip range +prom_port: :2114 -Modify the config file in ./cmd/bridge/config.yaml with your setup, the file comments explain the various flags - +``` - +Config parameters can also be specified by command line flags, which have slightly different names (these would be added in the 'command' subsection of the 'ks_bridge' section of the appropriate 'docker-compose-*.yml' file for docker installations). This method takes precedence over the config.yaml file: -run `./kaspabridge` in the `cmd/kaspabridge` directory +``` + - '-log=true' # enable/disable logging + - '-stats=false' # include stats readout every 10s in log + - '-stratum=:5555' # port to which miners should connect + - '-prom=:2114' # port at which raw prometheus stats will be available + - '-kaspa=host.docker.internal:16110' # host/port at which kaspad node is running + - '-mindiff=64' # minimum share difficulty to accept from miner(s) + - '-vardiff=true' # enable auto-adjusting variable min diff + - '-pow2clamp=false' # limit diff to 2^n (e.g. 64, 128, 256, etc) + - '-sharespermin=20' # number of shares per minute the vardiff engine should target + - '-vardiffstats=false' # include vardiff stats readout every 10s in log + - '-extranonce=0' # size in bytes of extranonce + - '-blockwait=3s' # time to wait before manually requesting new block + - '-hcp=' # port at which healthcheck is exposed (at path '/readyz') +``` - +## IceRiver ASICs configuration details + +IceRiver ASICs require a 2-byte extranonce (extranonce_size=2), an increased minimum share difficulty (use vardiff, or see table below), and difficulty values limited to 2^n (pow2_clamp=true). Without these settings, you may experience lower than expected hashrates and/or high invalid rates. + +It is recommended to allow the variable difficulty engine to determine the proper diff setting per client (enabled by default), but if you prefer to set a fixed difficulty, disable vardiff, and consult the following table for the recommended settings for each of the different devices (should produce minimum 20 shares/min): + +|ASIC | Min Diff | +| ----- | ---- | +|KS0 | 64 | +|KS0PRO | 128 | +|KS1 | 512 | +|KS2 | 1024 | +|KS3L/M | 2048 | +|KS3 | 4096 | -all-in-one (build + run) `cd cmd/kaspabridge/;go build .;./kaspabridge` +See previous sections for details on setting these parameters for your particular installation.
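To make the extranonce_size tradeoff (and the 2-byte extranonce required by IceRiver ASICs) concrete, here is a small Go sketch. It is an illustration only, not part of this repo, and assumes the extranonce occupies the leading bytes of the 8-byte nonce, so each client keeps the remaining bits to search:

```go
// extranonce_space.go - shows clients vs. per-client nonce space per
// extranonce_size setting (0 to 3 bytes).
package main

import "fmt"

func main() {
	for size := 0; size <= 3; size++ {
		if size == 0 {
			fmt.Println("extranonce_size=0: all clients search the same 2^64 nonce space")
			continue
		}
		clients := uint64(1) << (8 * size) // 256, 65536, 16777216 unique prefixes
		bitsLeft := 64 - 8*size            // bits each client still searches
		perClient := uint64(1) << bitsLeft // nonce values left per client
		fmt.Printf("extranonce_size=%d: %d clients with disjoint work, 2^%d (%d) nonces each\n",
			size, clients, bitsLeft, perClient)
	}
}
```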
diff --git a/cmd/kaspabridge/config.yaml b/cmd/kaspabridge/config.yaml index 060910a..7adac2c 100644 --- a/cmd/kaspabridge/config.yaml +++ b/cmd/kaspabridge/config.yaml @@ -1,4 +1,5 @@ -# stratum_listen_port: the port that will be listening for incoming stratum traffic +# stratum_listen_port: the port that will be listening for incoming stratum +# traffic # Note `:PORT` format is needed if not specifiying a specific ip range stratum_port: :5555 @@ -8,16 +9,56 @@ stratum_port: :5555 # kaspad_address: 46.17.104.200:16110 kaspad_address: localhost:16110 -# min_share_diff: only accept shares of the specified difficulty (or higher) from -# the miner(s). Higher values will reduce the number of shares submitted, thereby -# reducing network traffic and server load, while lower values will increase the -# number of shares submitted, thereby reducing the amount of time needed for -# accurate hashrate measurements -# min_share_diff: 4 +# min_share_diff: only accept shares of the specified difficulty (or higher) +# from the miner(s). Higher values will reduce the number of shares submitted, +# thereby reducing network traffic and server load, while lower values will +# increase the number of shares submitted, thereby reducing the amount of time +# needed for accurate hashrate measurements +# +# If var_diff is enabled, min_share_diff will be the starting difficulty. +# +# Default value is chosen to accomodate current top of the line IceRiver ASICs. +# If you don't want to change the default to match your device(s), the vardiff +# engine will adjust to an appropriate diff for lower hashrate devices within a +# few minutes. +min_share_diff: 4096 + +# pow2_clamp: restrict difficulty to 2^n (e.g. 64, 128, 256, etc). This is +# required for IceRiver and BitMain ASICs, where difficulties further away from +# powers of 2 cause higher error rates. Using this feature will limit the +# functionality of vardiff, such that the shares_per_min becomes more of a +# minimum, rather than a target we can expect to converge on. +pow2_clamp: false + +# var_diff: if true, enables the auto-adjusting variable share diff mechanism. +# Starts with the value defined by the 'min_share_diff' setting, then checks +# every 10s whether each client is maintaining a 20 shares/minute submission +# rate, and sends an updated min diff per client if necessary. Max tolerance +# is +/- 5% after 4hrs. +var_diff: true + +# shares_per_min: number of shares per minute the vardiff engine should target. +# Default value is chosen to allow for 99% confidence in measurement accuracy, +# which affects fidelity of difficulty update decisions, as well as hashrate +# stability (measured 1hr avg hashrate should be within +/- 10% of actual, with +# the noted confidence.) Higher values will result in better vardiff engine +# performance and increased hashrate stability. Lower values will cause +# vardiff to behave more erratically, while measured hashrate will display +# larger variations. +# +# Incorrect configuration of this parameter may induce high error rates on +# IceRiver devices, so it is recommended to avoid unnecessary changes. +# +# Example values and their resulting confidence levels: +# 20 => 99%, 15 => 95%, 12 => 90% +shares_per_min: 20 + +# var_diff_stats: if true, print vardiff engine stats to the log every 10s +var_diff_stats: false # block_wait_time: time to wait since last new block message from kaspad before -# manually requesting a new block -# block_wait_time: 500ms +# manually requesting a new block. 
Examples are '500ms', '3s', '1m', etc. +block_wait_time: 3s # extranonce_size: size in bytes of extranonce, from 0 (no extranonce) to 3. # With no extranonce (0), all clients will search through the same nonce-space, @@ -27,7 +68,7 @@ kaspad_address: localhost:16110 # overall nonce-space (though with 1s block times, this shouldn't really # be a concern). # 1 byte = 256 clients, 2 bytes = 65536, 3 bytes = 16777216. -# extranonce_size: 0 +extranonce_size: 0 # print_stats: if true will print stats to the console, false just workers # joining/disconnecting, blocks found, and errors will be printed @@ -36,7 +77,7 @@ print_stats: true # log_to_file: if true logs will be written to a file local to the executable log_to_file: true -# prom_port: if this is specified prometheus will serve stats on the port provided +# prom_port: if specified, prometheus will serve stats on the port provided # see readme for summary on how to get prom up and running using docker # you can get the raw metrics (along with default golang metrics) using # `curl http://localhost:{prom_port}/metrics` diff --git a/cmd/kaspabridge/main.go b/cmd/kaspabridge/main.go index d508418..c3e0695 100644 --- a/cmd/kaspabridge/main.go +++ b/cmd/kaspabridge/main.go @@ -6,7 +6,6 @@ import ( "log" "os" "path" - "time" "github.com/onemorebsmith/kaspastratum/src/kaspastratum" "gopkg.in/yaml.v2" @@ -30,21 +29,18 @@ func main() { flag.StringVar(&cfg.StratumPort, "stratum", cfg.StratumPort, "stratum port to listen on, default `:5555`") flag.BoolVar(&cfg.PrintStats, "stats", cfg.PrintStats, "true to show periodic stats to console, default `true`") flag.StringVar(&cfg.RPCServer, "kaspa", cfg.RPCServer, "address of the kaspad node, default `localhost:16110`") - flag.DurationVar(&cfg.BlockWaitTime, "blockwait", cfg.BlockWaitTime, "time in ms to wait before manually requesting new block, default `500`") - flag.UintVar(&cfg.MinShareDiff, "mindiff", cfg.MinShareDiff, "minimum share difficulty to accept from miner(s), default `4`") + flag.DurationVar(&cfg.BlockWaitTime, "blockwait", cfg.BlockWaitTime, "time in ms to wait before manually requesting new block, default `3s`") + flag.UintVar(&cfg.MinShareDiff, "mindiff", cfg.MinShareDiff, "minimum share difficulty to accept from miner(s), default `4096`") + flag.BoolVar(&cfg.ClampPow2, "pow2clamp", cfg.ClampPow2, "true to limit diff to powers of 2, required for IceRiver/Bitmain ASICs, default `true`") + flag.BoolVar(&cfg.VarDiff, "vardiff", cfg.VarDiff, "true to enable auto-adjusting variable min diff, default `true`") + flag.UintVar(&cfg.SharesPerMin, "sharespermin", cfg.SharesPerMin, "number of shares per minute the vardiff engine should target, default `20`") + flag.BoolVar(&cfg.VarDiffStats, "vardiffstats", cfg.VarDiffStats, "include vardiff stats readout every 10s in log, default `false`") flag.UintVar(&cfg.ExtranonceSize, "extranonce", cfg.ExtranonceSize, "size in bytes of extranonce, default `0`") flag.StringVar(&cfg.PromPort, "prom", cfg.PromPort, "address to serve prom stats, default `:2112`") flag.BoolVar(&cfg.UseLogFile, "log", cfg.UseLogFile, "if true will output errors to log file, default `true`") flag.StringVar(&cfg.HealthCheckPort, "hcp", cfg.HealthCheckPort, `(rarely used) if defined will expose a health check on /readyz, default ""`) flag.Parse() - if cfg.MinShareDiff == 0 { - cfg.MinShareDiff = 4 - } - if cfg.BlockWaitTime == 0 { - cfg.BlockWaitTime = 5 * time.Second // this should never happen due to kas 1s block times - } - log.Println("----------------------------------") 
log.Printf("initializing bridge") log.Printf("\tkaspad: %s", cfg.RPCServer) @@ -53,6 +49,10 @@ func main() { log.Printf("\tstats: %t", cfg.PrintStats) log.Printf("\tlog: %t", cfg.UseLogFile) log.Printf("\tmin diff: %d", cfg.MinShareDiff) + log.Printf("\tpow2 clamp: %t", cfg.ClampPow2) + log.Printf("\tvar diff: %t", cfg.VarDiff) + log.Printf("\tshares per min: %d", cfg.SharesPerMin) + log.Printf("\tvar diff stats: %t", cfg.VarDiffStats) log.Printf("\tblock wait: %s", cfg.BlockWaitTime) log.Printf("\textranonce size: %d", cfg.ExtranonceSize) log.Printf("\thealth check: %s", cfg.HealthCheckPort) diff --git a/docker-compose-all-src.yml b/docker-compose-all-src.yml new file mode 100644 index 0000000..0da443f --- /dev/null +++ b/docker-compose-all-src.yml @@ -0,0 +1,42 @@ +volumes: + prometheus_data: {} +services: + ks_bridge: + build: + context: . + no_cache: true + container_name: ks_bridge + restart: unless-stopped + user: "0" + command: + - '-stats=false' + - '-kaspa=host.docker.internal:16110' + ports: + - 5555:5555 + - 2114:2114 + extra_hosts: + - host.docker.internal:host-gateway + grafana: + image: grafana/grafana-oss:latest + container_name: ks_grafana + restart: unless-stopped + user: "0" + volumes: + - ./docker/grafana:/var/lib/grafana +# env_file: +# - ./docker/grafana.env + ports: + - 3000:3000 + extra_hosts: + - host.docker.internal:host-gateway + prometheus: + image: prom/prometheus:latest + container_name: ks_prom + restart: unless-stopped + volumes: + - prometheus_data:/prometheus + - ./docker/prometheus-internal.yml:/etc/prometheus/prometheus.yml + ports: + - 9090:9090 + extra_hosts: + - host.docker.internal:host-gateway diff --git a/docker-compose-all.yml b/docker-compose-all.yml index 1d04811..fd8b896 100644 --- a/docker-compose-all.yml +++ b/docker-compose-all.yml @@ -12,6 +12,8 @@ services: - '-stratum=:5555' - '-prom=:2114' - '-kaspa=host.docker.internal:16110' + - '-mindiff=64' + - '-vardiff=true' ports: - 5555:5555 - 2114:2114 diff --git a/docker-compose-bridge-src.yml b/docker-compose-bridge-src.yml new file mode 100644 index 0000000..0ad2fb5 --- /dev/null +++ b/docker-compose-bridge-src.yml @@ -0,0 +1,16 @@ +services: + ks_bridge: + build: + context: . 
+ no_cache: true + container_name: ks_bridge + restart: unless-stopped + user: "0" + command: + - '-stats=false' + - '-kaspa=host.docker.internal:16110' + ports: + - 5555:5555 + - 2114:2114 + extra_hosts: + - host.docker.internal:host-gateway diff --git a/docker/grafana/grafana.db b/docker/grafana/grafana.db index 3d5e5f0..b23d9e1 100644 Binary files a/docker/grafana/grafana.db and b/docker/grafana/grafana.db differ diff --git a/hive-setup.md b/docs/hive-setup.md similarity index 100% rename from hive-setup.md rename to docs/hive-setup.md diff --git a/docs/images/dashboard.png b/docs/images/dashboard.png new file mode 100644 index 0000000..2c20a30 Binary files /dev/null and b/docs/images/dashboard.png differ diff --git a/docs/images/dashboard_no_data.png b/docs/images/dashboard_no_data.png new file mode 100644 index 0000000..9ebeec0 Binary files /dev/null and b/docs/images/dashboard_no_data.png differ diff --git a/monitoring-setup.md b/docs/monitoring-setup.md similarity index 59% rename from monitoring-setup.md rename to docs/monitoring-setup.md index 0cff872..7102b35 100644 --- a/monitoring-setup.md +++ b/docs/monitoring-setup.md @@ -1,6 +1,4 @@ -# Configuring monitoring (Grafana + Prom) - -Easiest setup, [join my 0-fee solo pool](http://grafana.ghost-pool.io/d/x7cE7G74k/pool-monitoring?orgId=1&refresh=5s) +# Configuring monitoring (Grafana + Prometheus) ## Reqirements @@ -23,7 +21,7 @@ At this point if you can not progress without docker installed. Go install it if For this example I'll be running everything in docker -- including the bridge. So type the following from the root folder to stand up everything: -`docker compose -f docker-compose-all.yml up -d` +`docker compose -f docker-compose-all-src.yml up -d --build` Youll see output about downloading images and such and eventually see output like below: @@ -39,20 +37,16 @@ You may point your miners the IP address of the computer you installed on at por ## Accessing grafana -Assuming the setup went correctly you'll be able to access grafana by visiting http://127.0.0.1:3000 +Assuming the setup went correctly you'll be able to access grafana by visiting ![image](https://user-images.githubusercontent.com/59971111/192024515-dd487a3a-3d15-4d21-bfbf-189b2db69782.png) The default user/password is admin/admin. Grafana will prompt you to change the password but you can just ignore it (hit skip). -You'll land on the main grafana page. There's a lot you can do here but for now just click the pre-made dashboard: - -![image](https://user-images.githubusercontent.com/59971111/192024840-f8ebd4b0-dda0-4249-b4da-3a971baf9836.png) - -This will drop you to the mining dashboard. It'll look like below until you start getting info from your miners. +You will then be redirected to the mining dashboard. It'll look like below until you start getting info from your miners. -![image](https://user-images.githubusercontent.com/59971111/192024903-ed629405-ac6f-4263-8005-8863399d227a.png) +![Monitoring Dashboard Without Data](/docs/images/dashboard_no_data.png) -At this point you're configured and good to go. Things to note here are that the stats will be inaccurate for the first hour or so that the bridge is running as most of the stats are based on 10-30m averages. Also note that there is a 'wallet_filter' and 'show_balances' toggle near the top of the screen. These filter the database and hide your balance if you don't want that exposed. The monitoring UI is also accessable on any device on your local network (including your phone!) 
if you use the host computers ip address -- just type in the ip and port such as `http://192.168.0.25/3000` (this is an example, this exact link probablly wont work for you) +At this point you're configured and good to go. Many of the stats on the graph are averaged over a configurable time period (24hr default - use the 'resolution' dropdown on the top left of the page to change this), so keep in mind that the metrics might be incomplete during this initial period. Also note that there are 'wallet_filter' and 'show_balances' dropdowns as well. These filter the database and hide your balance if you don't want that exposed. The monitoring UI is also accessable on any device on your local network (including your phone!) if you use the host computers ip address -- just type in the ip and port such as `http://192.168.0.25/3000` (this is an example, this exact link probablly wont work for you) diff --git a/go.mod b/go.mod index 1ccb673..8e9806f 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,14 @@ go 1.18 require ( github.com/google/go-cmp v0.5.8 - github.com/kaspanet/kaspad v0.12.7 + github.com/google/uuid v1.3.0 + github.com/kaspanet/kaspad v0.12.15 github.com/mattn/go-colorable v0.1.13 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.13.0 + go.uber.org/atomic v1.7.0 go.uber.org/zap v1.23.0 - golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 + golang.org/x/crypto v0.11.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -18,18 +20,16 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/jrick/logrotate v1.0.0 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect - golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/net v0.12.0 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08 // indirect google.golang.org/grpc v1.38.0 // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/go.sum b/go.sum index 6944b06..ad1fb04 100644 --- a/go.sum +++ b/go.sum @@ -148,8 +148,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kaspanet/go-muhash v0.0.4 h1:CQrm1RTJpQy+h4ZFjj9qq42K5fmA5QTGifzb47p4qWk= github.com/kaspanet/go-secp256k1 v0.0.7 h1:WHnrwopKB6ZeHSbdAwwxNhTqflm56XT1mM6LF4/OvOs= -github.com/kaspanet/kaspad v0.12.7 h1:ptppiM3nSkMAqumc/TDxD/qoTdk7rUujzA8GqEpL05o= -github.com/kaspanet/kaspad v0.12.7/go.mod h1:5fH29a2ZIeET3GDkBqAN9Yk7tOl9mYteNkOlw3F9kMA= +github.com/kaspanet/kaspad v0.12.15 h1:UytF5bEx0cZfbwc63igsL3ltA+gtoHUxM1b3Sq5uAMo= +github.com/kaspanet/kaspad v0.12.15/go.mod h1:7p8WgqEpdLQybHt71P7zx47Ob7kKJnwYWzKu4wQvxsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= 
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -237,8 +237,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= -golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -302,8 +302,9 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -361,8 +362,9 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -372,8 +374,9 @@ golang.org/x/text v0.3.2/go.mod 
h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/make_release.sh b/make_release.sh index 320ff34..d153043 100755 --- a/make_release.sh +++ b/make_release.sh @@ -2,7 +2,7 @@ CMD_PATH="../cmd/kaspabridge" rm -rf release mkdir -p release cd release -VERSION=1.1.6 +VERSION=1.2.2 ARCHIVE="ks_bridge-${VERSION}" OUTFILE="ks_bridge" OUTDIR="ks_bridge" diff --git a/src/gostratum/default_client.go b/src/gostratum/default_client.go index 908ebce..6bccecd 100644 --- a/src/gostratum/default_client.go +++ b/src/gostratum/default_client.go @@ -4,20 +4,25 @@ import ( "fmt" "regexp" "strings" + "time" "github.com/kaspanet/kaspad/util" "github.com/mattn/go-colorable" + "github.com/onemorebsmith/kaspastratum/src/utils" "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +var bitmainRegex = regexp.MustCompile(".*(GodMiner).*") + type StratumMethod string const ( - StratumMethodSubscribe StratumMethod = "mining.subscribe" - StratumMethodAuthorize StratumMethod = "mining.authorize" - StratumMethodSubmit StratumMethod = "mining.submit" + StratumMethodSubscribe StratumMethod = "mining.subscribe" + StratumMethodExtranonceSubscribe StratumMethod = "mining.extranonce.subscribe" + StratumMethodAuthorize StratumMethod = "mining.authorize" + StratumMethodSubmit StratumMethod = "mining.submit" ) func DefaultLogger() *zap.Logger { @@ -25,7 +30,7 @@ func DefaultLogger() *zap.Logger { cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder return zap.New(zapcore.NewCore( zapcore.NewConsoleEncoder(cfg), - zapcore.AddSync(colorable.NewColorableStdout()), + &utils.BufferedWriteSyncer{WS: zapcore.AddSync(colorable.NewColorableStdout()), FlushInterval: 5 * time.Second}, zapcore.DebugLevel, )) } @@ -41,9 +46,10 @@ func DefaultConfig(logger *zap.Logger) StratumListenerConfig { func DefaultHandlers() StratumHandlerMap { return StratumHandlerMap{ - string(StratumMethodSubscribe): HandleSubscribe, - string(StratumMethodAuthorize): HandleAuthorize, - string(StratumMethodSubmit): HandleSubmit, + string(StratumMethodSubscribe): HandleSubscribe, + string(StratumMethodExtranonceSubscribe): HandleExtranonceSubscribe, + string(StratumMethodAuthorize): HandleAuthorize, + string(StratumMethodSubmit): HandleSubmit, } } @@ -83,21 +89,38 @@ func HandleAuthorize(ctx *StratumContext, event JsonRpcEvent) error { } func HandleSubscribe(ctx *StratumContext, event JsonRpcEvent) error { - if err := ctx.Reply(NewResponse(event, - []any{true, "EthereumStratum/1.0.0"}, nil)); err != nil { - return errors.Wrap(err, "failed to send response to subscribe") - } if len(event.Params) > 0 { app, ok := event.Params[0].(string) if ok { ctx.RemoteApp = app } } + var err error + if 
bitmainRegex.MatchString(ctx.RemoteApp) { + err = ctx.Reply(NewResponse(event, + []any{nil, ctx.Extranonce, 8 - (len(ctx.Extranonce) / 2)}, nil)) + } else { + err = ctx.Reply(NewResponse(event, + []any{true, "EthereumStratum/1.0.0"}, nil)) + } + if err != nil { + return errors.Wrap(err, "failed to send response to subscribe") + } ctx.Logger.Info("client subscribed ", zap.Any("context", ctx)) return nil } +func HandleExtranonceSubscribe(ctx *StratumContext, event JsonRpcEvent) error { + err := ctx.Reply(NewResponse(event, true, nil)) + if err != nil { + return errors.Wrap(err, "failed to send response to extranonce subscribe") + } + + ctx.Logger.Info("client subscribed to extranonce ", zap.Any("context", ctx)) + return nil +} + func HandleSubmit(ctx *StratumContext, event JsonRpcEvent) error { // stub ctx.Logger.Info("work submission") @@ -105,7 +128,13 @@ func HandleSubmit(ctx *StratumContext, event JsonRpcEvent) error { } func SendExtranonce(ctx *StratumContext) { - if err := ctx.Send(NewEvent("", "set_extranonce", []any{ctx.Extranonce})); err != nil { + var err error + if bitmainRegex.MatchString(ctx.RemoteApp) { + err = ctx.Send(NewEvent("", "mining.set_extranonce", []any{ctx.Extranonce, 8 - (len(ctx.Extranonce) / 2)})) + } else { + err = ctx.Send(NewEvent("", "set_extranonce", []any{ctx.Extranonce})) + } + if err != nil { // should we doing anything further on failure ctx.Logger.Error(errors.Wrap(err, "failed to set extranonce").Error(), zap.Any("context", ctx)) } diff --git a/src/gostratum/stratum_client.go b/src/gostratum/stratum_client.go index 6b4a1dd..78da4bc 100644 --- a/src/gostratum/stratum_client.go +++ b/src/gostratum/stratum_client.go @@ -17,6 +17,7 @@ func spawnClientListener(ctx *StratumContext, connection net.Conn, s *StratumLis for { err := readFromConnection(connection, func(line string) error { + // ctx.Logger.Info("client message: ", zap.String("raw", line)) event, err := UnmarshalEvent(line) if err != nil { ctx.Logger.Error("error unmarshalling event", zap.String("raw", line)) diff --git a/src/gostratum/stratum_context.go b/src/gostratum/stratum_context.go index 507d1b6..c21c776 100644 --- a/src/gostratum/stratum_context.go +++ b/src/gostratum/stratum_context.go @@ -16,6 +16,7 @@ import ( type StratumContext struct { parentContext context.Context RemoteAddr string + RemotePort int WalletAddr string WorkerName string RemoteApp string @@ -31,6 +32,7 @@ type StratumContext struct { type ContextSummary struct { RemoteAddr string + RemotePort int WalletAddr string WorkerName string RemoteApp string @@ -45,6 +47,7 @@ func (sc *StratumContext) Connected() bool { func (sc *StratumContext) Summary() ContextSummary { return ContextSummary{ RemoteAddr: sc.RemoteAddr, + RemotePort: sc.RemotePort, WalletAddr: sc.WalletAddr, WorkerName: sc.WorkerName, RemoteApp: sc.RemoteApp, @@ -57,6 +60,7 @@ func NewMockContext(ctx context.Context, logger *zap.Logger, state any) (*Stratu parentContext: ctx, State: state, RemoteAddr: "127.0.0.1", + RemotePort: 50001, WalletAddr: uuid.NewString(), WorkerName: uuid.NewString(), RemoteApp: "mock.context", diff --git a/src/gostratum/stratum_listener.go b/src/gostratum/stratum_listener.go index edb3cb5..89a8375 100644 --- a/src/gostratum/stratum_listener.go +++ b/src/gostratum/stratum_listener.go @@ -88,6 +88,7 @@ func (s *StratumListener) Listen(ctx context.Context) error { func (s *StratumListener) newClient(ctx context.Context, connection net.Conn) { addr := connection.RemoteAddr().String() + port := connection.RemoteAddr().(*net.TCPAddr).Port 
parts := strings.Split(addr, ":") if len(parts) > 0 { addr = parts[0] // trim off the port @@ -95,6 +96,7 @@ func (s *StratumListener) newClient(ctx context.Context, connection net.Conn) { clientContext := &StratumContext{ parentContext: ctx, RemoteAddr: addr, + RemotePort: port, Logger: s.Logger.With(zap.String("client", addr)), connection: connection, State: s.StateGenerator(), diff --git a/src/kaspastratum/client_handler.go b/src/kaspastratum/client_handler.go index 161f7c8..dd834c4 100644 --- a/src/kaspastratum/client_handler.go +++ b/src/kaspastratum/client_handler.go @@ -14,7 +14,7 @@ import ( "go.uber.org/zap" ) -var bigJobRegex = regexp.MustCompile(".*BzMiner.*") +var bigJobRegex = regexp.MustCompile(".*(BzMiner|IceRiverMiner).*") const balanceDelay = time.Minute @@ -125,17 +125,19 @@ func (c *clientListener) NewBlockAvailable(kapi *KaspaApi) { if !state.initialized { state.initialized = true state.useBigJob = bigJobRegex.MatchString(client.RemoteApp) - // first pass through send the difficulty since it's fixed + // first pass through send config/default difficulty state.stratumDiff = newKaspaDiff() state.stratumDiff.setDiffValue(c.minShareDiff) - if err := client.Send(gostratum.JsonRpcEvent{ - Version: "2.0", - Method: "mining.set_difficulty", - Params: []any{state.stratumDiff.diffValue}, - }); err != nil { - RecordWorkerError(client.WalletAddr, ErrFailedSetDiff) - client.Logger.Error(errors.Wrap(err, "failed sending difficulty").Error(), zap.Any("context", client)) - return + sendClientDiff(client, state) + c.shareHandler.setClientVardiff(client, c.minShareDiff) + } else { + varDiff := c.shareHandler.getClientVardiff(client) + if varDiff != state.stratumDiff.diffValue && varDiff != 0 { + // send updated vardiff + client.Logger.Info(fmt.Sprintf("changing diff from %f to %f", state.stratumDiff.diffValue, varDiff)) + state.stratumDiff.setDiffValue(varDiff) + sendClientDiff(client, state) + c.shareHandler.startClientVardiff(client) } } @@ -185,3 +187,16 @@ func (c *clientListener) NewBlockAvailable(kapi *KaspaApi) { } } } + +func sendClientDiff(client *gostratum.StratumContext, state *MiningState) { + if err := client.Send(gostratum.JsonRpcEvent{ + Version: "2.0", + Method: "mining.set_difficulty", + Params: []any{state.stratumDiff.diffValue}, + }); err != nil { + RecordWorkerError(client.WalletAddr, ErrFailedSetDiff) + client.Logger.Error(errors.Wrap(err, "failed sending difficulty").Error()) + return + } + client.Logger.Info(fmt.Sprintf("Setting client diff: %f", state.stratumDiff.diffValue)) +} diff --git a/src/kaspastratum/kaspaapi.go b/src/kaspastratum/kaspaapi.go index f5588e9..2f6ec9c 100644 --- a/src/kaspastratum/kaspaapi.go +++ b/src/kaspastratum/kaspaapi.go @@ -99,14 +99,8 @@ func (s *KaspaApi) waitForSync(verbose bool) error { } func (s *KaspaApi) startBlockTemplateListener(ctx context.Context, blockReadyCb func()) { - blockReadyChan := make(chan bool) - err := s.kaspad.RegisterForNewBlockTemplateNotifications(func(_ *appmessage.NewBlockTemplateNotificationMessage) { - blockReadyChan <- true - }) - if err != nil { - s.logger.Error("fatal: failed to register for block notifications from kaspa") - } - + var blockReadyChan chan bool + restartChannel := true ticker := time.NewTicker(s.blockWaitTime) for { if err := s.waitForSync(false); err != nil { @@ -115,6 +109,18 @@ func (s *KaspaApi) startBlockTemplateListener(ctx context.Context, blockReadyCb s.logger.Error("error reconnecting to kaspad, waiting before retry: ", err) time.Sleep(5 * time.Second) } + restartChannel = true 
+ } + if restartChannel { + blockReadyChan = make(chan bool) + err := s.kaspad.RegisterForNewBlockTemplateNotifications(func(_ *appmessage.NewBlockTemplateNotificationMessage) { + blockReadyChan <- true + }) + if err != nil { + s.logger.Error("fatal: failed to register for block notifications from kaspa") + } else { + restartChannel = false + } } select { case <-ctx.Done(): diff --git a/src/kaspastratum/mining_state.go b/src/kaspastratum/mining_state.go index e50e605..560a996 100644 --- a/src/kaspastratum/mining_state.go +++ b/src/kaspastratum/mining_state.go @@ -12,21 +12,23 @@ import ( const maxjobs = 32 type MiningState struct { - Jobs map[int]*appmessage.RPCBlock + Jobs map[uint64]*appmessage.RPCBlock JobLock sync.Mutex - jobCounter int + jobCounter uint64 bigDiff big.Int initialized bool useBigJob bool connectTime time.Time stratumDiff *kaspaDiff + maxJobs uint8 } func MiningStateGenerator() any { return &MiningState{ - Jobs: map[int]*appmessage.RPCBlock{}, + Jobs: make(map[uint64]*appmessage.RPCBlock, maxjobs), JobLock: sync.Mutex{}, connectTime: time.Now(), + maxJobs: maxjobs, } } @@ -34,16 +36,16 @@ func GetMiningState(ctx *gostratum.StratumContext) *MiningState { return ctx.State.(*MiningState) } -func (ms *MiningState) AddJob(job *appmessage.RPCBlock) int { +func (ms *MiningState) AddJob(job *appmessage.RPCBlock) uint64 { + ms.JobLock.Lock() ms.jobCounter++ idx := ms.jobCounter - ms.JobLock.Lock() ms.Jobs[idx%maxjobs] = job ms.JobLock.Unlock() return idx } -func (ms *MiningState) GetJob(id int) (*appmessage.RPCBlock, bool) { +func (ms *MiningState) GetJob(id uint64) (*appmessage.RPCBlock, bool) { ms.JobLock.Lock() job, exists := ms.Jobs[id%maxjobs] ms.JobLock.Unlock() diff --git a/src/kaspastratum/prom.go b/src/kaspastratum/prom.go index fb55375..4f22761 100644 --- a/src/kaspastratum/prom.go +++ b/src/kaspastratum/prom.go @@ -120,7 +120,7 @@ func RecordBlockFound(worker *gostratum.StratumContext, nonce, bluescore uint64, labels := commonLabels(worker) labels["nonce"] = fmt.Sprintf("%d", nonce) labels["bluescore"] = fmt.Sprintf("%d", bluescore) - labels["hash"] = fmt.Sprintf("%d", hash) + labels["hash"] = hash blockGauge.With(labels).Set(1) } @@ -152,15 +152,15 @@ func InitInvalidCounter(worker *gostratum.StratumContext, errorType string) { } func InitWorkerCounters(worker *gostratum.StratumContext) { - labels := commonLabels(worker) + labels := commonLabels(worker) shareCounter.With(labels).Add(0) shareDiffCounter.With(labels).Add(0) - errTypes := []string{"stale", "duplicate", "invalid", "weak"} - for _, e := range errTypes { - InitInvalidCounter(worker, e) - } + errTypes := []string{"stale", "duplicate", "invalid", "weak"} + for _, e := range errTypes { + InitInvalidCounter(worker, e) + } blockCounter.With(labels).Add(0) diff --git a/src/kaspastratum/prom_test.go b/src/kaspastratum/prom_test.go index aa95db1..0e557a7 100644 --- a/src/kaspastratum/prom_test.go +++ b/src/kaspastratum/prom_test.go @@ -12,7 +12,7 @@ func TestPromValid(t *testing.T) { // is valid to write to here ctx := gostratum.StratumContext{} - RecordShareFound(&ctx) + RecordShareFound(&ctx, 1000.1001) RecordStaleShare(&ctx) RecordDupeShare(&ctx) RecordInvalidShare(&ctx) diff --git a/src/kaspastratum/share_handler.go b/src/kaspastratum/share_handler.go index b1dba7b..1179364 100644 --- a/src/kaspastratum/share_handler.go +++ b/src/kaspastratum/share_handler.go @@ -3,6 +3,8 @@ package kaspastratum import ( "fmt" "log" + "math" + "os" "sort" "strconv" "strings" @@ -15,20 +17,27 @@ import ( 
"github.com/kaspanet/kaspad/domain/consensus/utils/pow" "github.com/kaspanet/kaspad/infrastructure/network/rpcclient" "github.com/onemorebsmith/kaspastratum/src/gostratum" + "github.com/onemorebsmith/kaspastratum/src/utils" "github.com/pkg/errors" "go.uber.org/atomic" "go.uber.org/zap" ) +const varDiffThreadSleep = 10 + type WorkStats struct { - BlocksFound atomic.Int64 - SharesFound atomic.Int64 - SharesDiff atomic.Float64 - StaleShares atomic.Int64 - InvalidShares atomic.Int64 - WorkerName string - StartTime time.Time - LastShare time.Time + BlocksFound atomic.Int64 + SharesFound atomic.Int64 + SharesDiff atomic.Float64 + StaleShares atomic.Int64 + InvalidShares atomic.Int64 + WorkerName string + StartTime time.Time + LastShare time.Time + VarDiffStartTime time.Time + VarDiffSharesFound atomic.Int64 + VarDiffWindow int + MinDiff atomic.Float64 } type shareHandler struct { @@ -54,12 +63,13 @@ func (sh *shareHandler) getCreateStats(ctx *gostratum.StratumContext) *WorkStats if ctx.WorkerName != "" { stats, found = sh.stats[ctx.WorkerName] } + workerId := fmt.Sprintf("%s:%d", ctx.RemoteAddr, ctx.RemotePort) if !found { // no worker name, check by remote address - stats, found = sh.stats[ctx.RemoteAddr] + stats, found = sh.stats[workerId] if found { // no worker name, but remote addr is there - // so replacet the remote addr with the worker names - delete(sh.stats, ctx.RemoteAddr) + // so replace the remote addr with the worker names + delete(sh.stats, workerId) stats.WorkerName = ctx.WorkerName sh.stats[ctx.WorkerName] = stats } @@ -67,9 +77,9 @@ func (sh *shareHandler) getCreateStats(ctx *gostratum.StratumContext) *WorkStats if !found { // legit doesn't exist, create it stats = &WorkStats{} stats.LastShare = time.Now() - stats.WorkerName = ctx.RemoteAddr + stats.WorkerName = workerId stats.StartTime = time.Now() - sh.stats[ctx.RemoteAddr] = stats + sh.stats[workerId] = stats // TODO: not sure this is the best place, nor whether we shouldn't be // resetting on disconnect @@ -81,13 +91,13 @@ func (sh *shareHandler) getCreateStats(ctx *gostratum.StratumContext) *WorkStats } type submitInfo struct { + jobId uint64 block *appmessage.RPCBlock - state *MiningState noncestr string nonceVal uint64 } -func validateSubmit(ctx *gostratum.StratumContext, event gostratum.JsonRpcEvent) (*submitInfo, error) { +func validateSubmit(ctx *gostratum.StratumContext, state *MiningState, event gostratum.JsonRpcEvent) (*submitInfo, error) { if len(event.Params) < 3 { RecordWorkerError(ctx.WalletAddr, ErrBadDataFromMiner) return nil, fmt.Errorf("malformed event, expected at least 2 params") @@ -97,13 +107,12 @@ func validateSubmit(ctx *gostratum.StratumContext, event gostratum.JsonRpcEvent) RecordWorkerError(ctx.WalletAddr, ErrBadDataFromMiner) return nil, fmt.Errorf("unexpected type for param 1: %+v", event.Params...) } - jobId, err := strconv.ParseInt(jobIdStr, 10, 0) + jobId, err := strconv.ParseUint(jobIdStr, 10, 0) if err != nil { RecordWorkerError(ctx.WalletAddr, ErrBadDataFromMiner) return nil, errors.Wrap(err, "job id is not parsable as an number") } - state := GetMiningState(ctx) - block, exists := state.GetJob(int(jobId)) + block, exists := state.GetJob(jobId) if !exists { RecordWorkerError(ctx.WalletAddr, ErrMissingJob) return nil, fmt.Errorf("job does not exist. stale?") @@ -114,7 +123,7 @@ func validateSubmit(ctx *gostratum.StratumContext, event gostratum.JsonRpcEvent) return nil, fmt.Errorf("unexpected type for param 2: %+v", event.Params...) 
 	}
 	return &submitInfo{
-		state:    state,
+		jobId:    jobId,
 		block:    block,
 		noncestr: strings.Replace(noncestr, "0x", "", 1),
 	}, nil
@@ -125,7 +134,7 @@ var (
 	ErrDupeShare = fmt.Errorf("duplicate share")
 )
 
-// the max difference between tip blue score and job blue score that we'll accept
+// max difference between tip blue score and job blue score that we'll accept
 // anything greater than this is considered stale
 const workWindow = 8
 
@@ -144,7 +153,10 @@ func (sh *shareHandler) checkStales(ctx *gostratum.StratumContext, si *submitInf
 }
 
 func (sh *shareHandler) HandleSubmit(ctx *gostratum.StratumContext, event gostratum.JsonRpcEvent) error {
-	submitInfo, err := validateSubmit(ctx, event)
+	state := GetMiningState(ctx)
+	maxJobs := uint64(state.maxJobs)
+
+	submitInfo, err := validateSubmit(ctx, state, event)
 	if err != nil {
 		return err
 	}
@@ -159,7 +171,6 @@ func (sh *shareHandler) HandleSubmit(ctx *gostratum.StratumContext, event gostra
 	}
 
 	//ctx.Logger.Debug(submitInfo.block.Header.BlueScore, " submit ", submitInfo.noncestr)
-	state := GetMiningState(ctx)
 	if state.useBigJob {
 		submitInfo.nonceVal, err = strconv.ParseUint(submitInfo.noncestr, 16, 64)
 		if err != nil {
@@ -173,7 +184,69 @@ func (sh *shareHandler) HandleSubmit(ctx *gostratum.StratumContext, event gostra
 			return errors.Wrap(err, "failed parsing noncestr")
 		}
 	}
+
+	jobId := submitInfo.jobId
+	block := submitInfo.block
+	var invalidShare bool
+	for {
+		converted, err := appmessage.RPCBlockToDomainBlock(block)
+		if err != nil {
+			return fmt.Errorf("failed to cast block to mutable block: %+v", err)
+		}
+		mutableHeader := converted.Header.ToMutable()
+		mutableHeader.SetNonce(submitInfo.nonceVal)
+		powState := pow.NewState(mutableHeader)
+		powValue := powState.CalculateProofOfWorkValue()
+
+		// The block hash must be less or equal than the claimed target.
+		if powValue.Cmp(&powState.Target) <= 0 {
+			// block?
+			if err := sh.submit(ctx, converted, submitInfo.nonceVal, event.Id); err != nil {
+				return err
+			}
+			invalidShare = false
+			break
+		} else if powValue.Cmp(state.stratumDiff.targetValue) >= 0 {
+			// invalid share, but don't record it yet
+			if jobId == submitInfo.jobId {
+				ctx.Logger.Warn(fmt.Sprintf("low diff share... checking for bad job ID (%d)", jobId))
+				invalidShare = true
+			}
+
+			// stupid hack for busted ass IceRiver/Bitmain ASICs. Need to loop
+			// through job history because they submit jobs with incorrect IDs
+			if jobId == 1 || jobId%maxJobs == submitInfo.jobId%maxJobs+1 {
+				// exhausted all previous blocks
+				break
+			} else {
+				var exists bool
+				jobId--
+				block, exists = state.GetJob(jobId)
+				if !exists {
+					// just exit loop - bad share will be recorded
+					break
+				}
+			}
+		} else {
+			// valid share
+			if invalidShare {
+				ctx.Logger.Warn(fmt.Sprintf("found correct job ID: %d", jobId))
+				invalidShare = false
+			}
+			break
+		}
+	}
+
 	stats := sh.getCreateStats(ctx)
+
+	if invalidShare {
+		ctx.Logger.Warn("low diff share confirmed")
+		stats.InvalidShares.Add(1)
+		sh.overall.InvalidShares.Add(1)
+		RecordWeakShare(ctx)
+		return ctx.ReplyLowDiffShare(event.Id)
+	}
+
 	// if err := sh.checkStales(ctx, submitInfo); err != nil {
 	// 	if err == ErrDupeShare {
 	// 		ctx.Logger.Info("dupe share "+submitInfo.noncestr, ctx.WorkerName, ctx.WalletAddr)
@@ -190,30 +263,8 @@ func (sh *shareHandler) HandleSubmit(ctx *gostratum.StratumContext, event gostra
 	// 		ctx.Logger.Error("unknown error during check stales: ", err.Error())
 	// 		return ctx.ReplyBadShare(event.Id)
 	// 	}
-
-	converted, err := appmessage.RPCBlockToDomainBlock(submitInfo.block)
-	if err != nil {
-		return fmt.Errorf("failed to cast block to mutable block: %+v", err)
-	}
-	mutableHeader := converted.Header.ToMutable()
-	mutableHeader.SetNonce(submitInfo.nonceVal)
-	powState := pow.NewState(mutableHeader)
-	powValue := powState.CalculateProofOfWorkValue()
-
-	// The block hash must be less or equal than the claimed target.
-	if powValue.Cmp(&powState.Target) <= 0 {
-		if err := sh.submit(ctx, converted, submitInfo.nonceVal, event.Id); err != nil {
-			return err
-		}
-	}
-	// remove for now until I can figure it out. No harm here as we're not
-	// } else if powValue.Cmp(state.stratumDiff.targetValue) >= 0 {
-	// 	ctx.Logger.Warn("weak block")
-	// 	RecordWeakShare(ctx)
-	// 	return ctx.ReplyLowDiffShare(event.Id)
-	// }
-	stats.SharesFound.Add(1)
+	stats.VarDiffSharesFound.Add(1)
 	stats.SharesDiff.Add(state.stratumDiff.hashValue)
 	stats.LastShare = time.Now()
 	sh.overall.SharesFound.Add(1)
@@ -263,8 +314,8 @@ func (sh *shareHandler) submit(ctx *gostratum.StratumContext,
 	sh.overall.BlocksFound.Add(1)
 	RecordBlockFound(ctx, block.Header.Nonce(), block.Header.BlueScore(), blockhash.String())
 
-	// nil return allows HandleSubmit to record share (blocks are shares too!) and
-	// handle the response to the client
+	// nil return allows HandleSubmit to record share (blocks are shares too!)
+	// and handle the response to the client
 	return nil
 }
 
@@ -273,29 +324,34 @@ func (sh *shareHandler) startStatsThread() error {
 	for {
 		// console formatting is terrible. Good luck whoever touches anything
 		time.Sleep(10 * time.Second)
-		sh.statsLock.Lock()
+
+		// don't like locking entire stats struct - risk should be negligible
+		// if mutex is ultimately needed, should move to one per client
+		// sh.statsLock.Lock()
+
 		str := "\n===============================================================================\n"
 		str += " worker name | avg hashrate | acc/stl/inv | blocks | uptime \n"
 		str += "-------------------------------------------------------------------------------\n"
 		var lines []string
 		totalRate := float64(0)
 		for _, v := range sh.stats {
+			// print stats
 			rate := GetAverageHashrateGHs(v)
 			totalRate += rate
-			rateStr := fmt.Sprintf("%0.2fGH/s", rate) // todo, fix units
+			rateStr := stringifyHashrate(rate)
 			ratioStr := fmt.Sprintf("%d/%d/%d", v.SharesFound.Load(), v.StaleShares.Load(), v.InvalidShares.Load())
 			lines = append(lines, fmt.Sprintf(" %-15s| %14.14s | %14.14s | %12d | %11s", v.WorkerName, rateStr, ratioStr, v.BlocksFound.Load(), time.Since(v.StartTime).Round(time.Second)))
 		}
 		sort.Strings(lines)
 		str += strings.Join(lines, "\n")
-		rateStr := fmt.Sprintf("%0.2fGH/s", totalRate) // todo, fix units
+		rateStr := stringifyHashrate(totalRate)
 		ratioStr := fmt.Sprintf("%d/%d/%d", sh.overall.SharesFound.Load(), sh.overall.StaleShares.Load(), sh.overall.InvalidShares.Load())
 		str += "\n-------------------------------------------------------------------------------\n"
 		str += fmt.Sprintf(" | %14.14s | %14.14s | %12d | %11s", rateStr, ratioStr, sh.overall.BlocksFound.Load(), time.Since(start).Round(time.Second))
 		str += "\n========================================================== ks_bridge_" + version + " ===\n"
-		sh.statsLock.Unlock()
+		// sh.statsLock.Unlock()
 		log.Println(str)
 	}
 }
@@ -303,3 +359,175 @@ func (sh *shareHandler) startStatsThread() error {
 func GetAverageHashrateGHs(stats *WorkStats) float64 {
 	return stats.SharesDiff.Load() / time.Since(stats.StartTime).Seconds()
 }
+
+func stringifyHashrate(ghs float64) string {
+	unitStrings := [...]string{"M", "G", "T", "P", "E", "Z", "Y"}
+	var unit string
+	var hr float64
+
+	if ghs < 1 {
+		hr = ghs * 1000
+		unit = unitStrings[0]
+	} else if ghs < 1000 {
+		hr = ghs
+		unit = unitStrings[1]
+	} else {
+		for i, u := range unitStrings[2:] {
+			hr = ghs / math.Pow(1000, float64(i+1))
+			unit = u
+			if hr < 1000 {
+				break
+			}
+		}
+	}
+
+	return fmt.Sprintf("%0.2f%sH/s", hr, unit)
+}
+
+func (sh *shareHandler) startVardiffThread(expectedShareRate uint, logStats bool, clamp bool) error {
+	// 20 shares/min allows a ~99% confidence assumption of:
+	// < 100% variation after 1m
+	// < 50% variation after 3m
+	// < 25% variation after 10m
+	// < 15% variation after 30m
+	// < 10% variation after 1h
+	// < 5% variation after 4h
+	var windows = [...]uint{1, 3, 10, 30, 60, 240, 0}
+	var tolerances = [...]float64{1, 0.5, 0.25, 0.15, 0.1, 0.05, 0.05}
+	var bws = &utils.BufferedWriteSyncer{WS: os.Stdout, FlushInterval: varDiffThreadSleep * time.Second}
+
+	for {
+		time.Sleep(varDiffThreadSleep * time.Second)
+
+		// don't like locking entire stats struct - risk should be negligible
+		// if mutex is ultimately needed, should move to one per client
+		// sh.statsLock.Lock()
+
+		stats := "\n=== vardiff ===================================================================\n\n"
+		stats += " worker name | diff | window | elapsed | shares | rate \n"
+		stats += "-------------------------------------------------------------------------------\n"
+
+		var statsLines []string
+		var toleranceErrs []string
+
+		for _, v := range sh.stats {
+			worker := v.WorkerName
+			if v.VarDiffStartTime.IsZero() {
+				// no vardiff sent to client
+				toleranceErrs = append(toleranceErrs, fmt.Sprintf("no diff sent to client %s", worker))
+				continue
+			}
+
+			diff := v.MinDiff.Load()
+			shares := v.VarDiffSharesFound.Load()
+			duration := time.Since(v.VarDiffStartTime).Minutes()
+			shareRate := float64(shares) / duration
+			shareRateRatio := shareRate / float64(expectedShareRate)
+			window := windows[v.VarDiffWindow]
+			tolerance := tolerances[v.VarDiffWindow]
+
+			statsLines = append(statsLines, fmt.Sprintf(" %-14s| %11.2f | %8d | %10.2f | %11d | %9.2f", worker, diff, window, duration, shares, shareRate))
+
+			// check final stage first, as this is where the majority of time is spent
+			if window == 0 {
+				if math.Abs(1-shareRateRatio) >= tolerance {
+					// final stage submission rate OOB
+					toleranceErrs = append(toleranceErrs, fmt.Sprintf("%s final share rate (%f) exceeded tolerance (+/- %d%%)", worker, shareRate, int(tolerance*100)))
+					updateVarDiff(v, diff*shareRateRatio, clamp)
+				}
+				continue
+			}
+
+			// check all previously cleared windows
+			i := 1
+			for i < v.VarDiffWindow {
+				if math.Abs(1-shareRateRatio) >= tolerances[i] {
+					// breached tolerance of previously cleared window
+					toleranceErrs = append(toleranceErrs, fmt.Sprintf("%s share rate (%f) exceeded tolerance (+/- %d%%) for %dm window", worker, shareRate, int(tolerances[i]*100), windows[i]))
+					updateVarDiff(v, diff*shareRateRatio, clamp)
+					break
+				}
+				i++
+			}
+			if i < v.VarDiffWindow {
+				// should only happen if we broke previous loop
+				continue
+			}
+
+			// check for current window max exception
+			if float64(shares) >= float64(window*expectedShareRate)*(1+tolerance) {
+				// submission rate > window max
+				toleranceErrs = append(toleranceErrs, fmt.Sprintf("%s share rate (%f) exceeded upper tolerance (+ %d%%) for %dm window", worker, shareRate, int(tolerance*100), window))
+				updateVarDiff(v, diff*shareRateRatio, clamp)
+				continue
+			}

+			// check whether we've exceeded window length
+			if duration >= float64(window) {
+				// check for current window min exception
+				if float64(shares) <= float64(window*expectedShareRate)*(1-tolerance) {
+					// submission rate < window min
+					toleranceErrs = append(toleranceErrs, fmt.Sprintf("%s share rate (%f) exceeded lower tolerance (- %d%%) for %dm window", worker, shareRate, int(tolerance*100), window))
+					updateVarDiff(v, diff*math.Max(shareRateRatio, 0.1), clamp)
+					continue
+				}
+
+				v.VarDiffWindow++
+			}
+		}
+		sort.Strings(statsLines)
+		stats += strings.Join(statsLines, "\n")
+		stats += "\n\n========================================================== ks_bridge_" + version + " ===\n"
+		stats += "\n" + strings.Join(toleranceErrs, "\n") + "\n\n"
+		if logStats {
+			bws.Write([]byte(stats))
+		}
+
+		// sh.statsLock.Unlock()
+	}
+}
+
+// update vardiff with new mindiff, reset counters, and disable tracker until
+// client handler restarts it when sending the new diff on the next block
+func updateVarDiff(stats *WorkStats, minDiff float64, clamp bool) float64 {
+	if clamp {
+		minDiff = math.Pow(2, math.Floor(math.Log2(minDiff)))
+	}
+
+	previousMinDiff := stats.MinDiff.Load()
+	newMinDiff := math.Max(0.125, minDiff)
+	if newMinDiff != previousMinDiff {
+		log.Printf("updating vardiff to %f for client %s", newMinDiff, stats.WorkerName)
+		stats.VarDiffStartTime = time.Time{}
+		stats.VarDiffWindow = 0
+		stats.MinDiff.Store(newMinDiff)
+	}
+	return previousMinDiff
+}
+
+// (re)start vardiff tracker
+func startVarDiff(stats *WorkStats) {
+	if stats.VarDiffStartTime.IsZero() {
+		stats.VarDiffSharesFound.Store(0)
+		stats.VarDiffStartTime = time.Now()
+	}
+}
+
+func (sh *shareHandler) startClientVardiff(ctx *gostratum.StratumContext) {
+	stats := sh.getCreateStats(ctx)
+	startVarDiff(stats)
+}
+
+func (sh *shareHandler) getClientVardiff(ctx *gostratum.StratumContext) float64 {
+	stats := sh.getCreateStats(ctx)
+	return stats.MinDiff.Load()
+}
+
+func (sh *shareHandler) setClientVardiff(ctx *gostratum.StratumContext, minDiff float64) float64 {
+	stats := sh.getCreateStats(ctx)
+	// only called for initial diff setting, and clamping is handled during
+	// config load
+	previousMinDiff := updateVarDiff(stats, minDiff, false)
+	startVarDiff(stats)
+	return previousMinDiff
+}
diff --git a/src/kaspastratum/stratum_server.go b/src/kaspastratum/stratum_server.go
index b4b11af..1d3965a 100644
--- a/src/kaspastratum/stratum_server.go
+++ b/src/kaspastratum/stratum_server.go
@@ -2,6 +2,7 @@ package kaspastratum
 
 import (
 	"context"
+	"math"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
@@ -9,12 +10,13 @@ import (
 
 	"github.com/mattn/go-colorable"
 	"github.com/onemorebsmith/kaspastratum/src/gostratum"
+	"github.com/onemorebsmith/kaspastratum/src/utils"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 )
 
-const version = "v1.1.6"
-const minBlockWaitTime = 500 * time.Millisecond
+const version = "v1.2.2"
+const minBlockWaitTime = 3 * time.Second
 
 type BridgeConfig struct {
 	StratumPort     string        `yaml:"stratum_port"`
@@ -25,7 +27,11 @@ type BridgeConfig struct {
 	HealthCheckPort string        `yaml:"health_check_port"`
 	BlockWaitTime   time.Duration `yaml:"block_wait_time"`
 	MinShareDiff    uint          `yaml:"min_share_diff"`
+	VarDiff         bool          `yaml:"var_diff"`
+	SharesPerMin    uint          `yaml:"shares_per_min"`
+	VarDiffStats    bool          `yaml:"var_diff_stats"`
 	ExtranonceSize  uint          `yaml:"extranonce_size"`
+	ClampPow2       bool          `yaml:"pow2_clamp"`
 }
 
 func configureZap(cfg BridgeConfig) (*zap.SugaredLogger, func()) {
@@ -34,9 +40,11 @@ func configureZap(cfg BridgeConfig) (*zap.SugaredLogger, func()) {
 	fileEncoder := zapcore.NewJSONEncoder(pe)
 	consoleEncoder := zapcore.NewConsoleEncoder(pe)
 
+	bws := &utils.BufferedWriteSyncer{WS: zapcore.AddSync(colorable.NewColorableStdout()), FlushInterval: 5 * time.Second}
+
 	if !cfg.UseLogFile {
 		return zap.New(zapcore.NewCore(consoleEncoder,
-			zapcore.AddSync(colorable.NewColorableStdout()), zap.InfoLevel)).Sugar(), func() {}
+			bws, zap.InfoLevel)).Sugar(), func() { bws.Stop() }
 	}
 
 	// log file fun
@@ -44,11 +52,12 @@ func configureZap(cfg BridgeConfig) (*zap.SugaredLogger, func()) {
 	if err != nil {
 		panic(err)
 	}
+	blws := &utils.BufferedWriteSyncer{WS: zapcore.AddSync(logFile), FlushInterval: 5 * time.Second}
 	core := zapcore.NewTee(
-		zapcore.NewCore(fileEncoder, zapcore.AddSync(logFile), zap.InfoLevel),
-		zapcore.NewCore(consoleEncoder, zapcore.AddSync(colorable.NewColorableStdout()), zap.InfoLevel),
+		zapcore.NewCore(fileEncoder, blws, zap.InfoLevel),
+		zapcore.NewCore(consoleEncoder, bws, zap.InfoLevel),
 	)
-	return zap.New(core).Sugar(), func() { logFile.Close() }
+	return zap.New(core).Sugar(), func() { bws.Stop(); blws.Stop(); logFile.Close() }
 }
 
 func ListenAndServe(cfg BridgeConfig) error {
@@ -60,7 +69,7 @@ func ListenAndServe(cfg BridgeConfig) error {
 	}
 
 	blockWaitTime := cfg.BlockWaitTime
-	if blockWaitTime < minBlockWaitTime {
+	if blockWaitTime == 0 {
 		blockWaitTime = minBlockWaitTime
 	}
 	ksApi, err := NewKaspaAPI(cfg.RPCServer, blockWaitTime, logger)
@@ -77,15 +86,18 @@ func ListenAndServe(cfg BridgeConfig) error {
 	}
 
 	shareHandler := newShareHandler(ksApi.kaspad)
-	minDiff := cfg.MinShareDiff
-	if minDiff < 1 {
-		minDiff = 1
+	minDiff := float64(cfg.MinShareDiff)
+	if minDiff == 0 {
+		minDiff = 4
+	}
+	if cfg.ClampPow2 {
+		minDiff = math.Pow(2, math.Floor(math.Log2(minDiff)))
 	}
 	extranonceSize := cfg.ExtranonceSize
 	if extranonceSize > 3 {
 		extranonceSize = 3
 	}
-	clientHandler := newClientListener(logger, shareHandler, float64(minDiff), int8(extranonceSize))
+	clientHandler := newClientListener(logger, shareHandler, minDiff, int8(extranonceSize))
 	handlers := gostratum.DefaultHandlers()
 	// override the submit handler with an actual useful handler
 	handlers[string(gostratum.StratumMethodSubmit)] =
@@ -110,6 +122,14 @@ func ListenAndServe(cfg BridgeConfig) error {
 		clientHandler.NewBlockAvailable(ksApi)
 	})
 
+	if cfg.VarDiff {
+		sharesPerMin := cfg.SharesPerMin
+		if sharesPerMin <= 0 {
+			sharesPerMin = 20
+		}
+		go shareHandler.startVardiffThread(sharesPerMin, cfg.VarDiffStats, cfg.ClampPow2)
+	}
+
 	if cfg.PrintStats {
 		go shareHandler.startStatsThread()
 	}
diff --git a/src/utils/buffered_write_syncer.go b/src/utils/buffered_write_syncer.go
new file mode 100644
index 0000000..dd008af
--- /dev/null
+++ b/src/utils/buffered_write_syncer.go
@@ -0,0 +1,218 @@
+package utils
+
+import (
+	"bufio"
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"go.uber.org/multierr"
+	"go.uber.org/zap/zapcore"
+)
+
+const (
+	// _defaultBufferSize specifies the default size used by Buffer.
+	_defaultBufferSize = 256 * 1024 // 256 kB
+
+	// _defaultFlushInterval specifies the default flush interval for
+	// Buffer.
+	_defaultFlushInterval = 30 * time.Second
+)
+
+var ErrWSFlush = errors.New("unable to flush write stream")
+
+// buffers writes in-memory before flushing them to a wrapped WriteSyncer after
+// reaching some limit, or at some fixed interval--whichever comes first.
+type BufferedWriteSyncer struct {
+	// WS is the WriteSyncer around which BufferedWriteSyncer will buffer
+	// writes.
+	//
+	// This field is required.
+	WS zapcore.WriteSyncer
+
+	// Size specifies the maximum amount of data the writer will buffer
+	// before flushing.
+	//
+	// Defaults to 256 kB if unspecified.
+	Size int
+
+	// FlushInterval specifies how often the writer should flush data if
+	// there have been no writes.
+	//
+	// Defaults to 30 seconds if unspecified.
+	FlushInterval time.Duration
+
+	// Clock, if specified, provides control of the source of time for the
+	// writer.
+	//
+	// Defaults to the system clock.
+	Clock zapcore.Clock
+
+	// unexported fields for state
+	mu          sync.Mutex
+	initialized bool // whether initialize() has run
+	stopped     bool // whether Stop() has run
+	writer      *bufio.Writer
+	ticker      *time.Ticker
+	stop        chan struct{} // closed when flushLoop should stop
+	done        chan struct{} // closed when flushLoop has stopped
+}
+
+func (s *BufferedWriteSyncer) initialize() {
+	size := s.Size
+	if size == 0 {
+		size = _defaultBufferSize
+	}
+
+	flushInterval := s.FlushInterval
+	if flushInterval == 0 {
+		flushInterval = _defaultFlushInterval
+	}
+
+	if s.Clock == nil {
+		s.Clock = zapcore.DefaultClock
+	}
+
+	s.ticker = s.Clock.NewTicker(flushInterval)
+	s.writer = bufio.NewWriterSize(s.WS, size)
+	s.stop = make(chan struct{})
+	s.done = make(chan struct{})
+	s.initialized = true
+	go s.flushLoop()
+}
+
+// Write writes log data into buffer syncer directly, multiple Write calls will
+// be batched, and log data will be flushed to disk when the buffer is full or
+// periodically.
+func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
+	locked := false
+	tryCount := 0
+	for {
+		locked = s.mu.TryLock()
+		if !locked {
+			if tryCount < 5 {
+				time.Sleep(100 * time.Millisecond)
+				tryCount++
+			} else {
+				// silently dropping log messages if we can't acquire lock
+				return len(bs), nil
+			}
+		} else {
+			break
+		}
+	}
+
+	defer s.mu.Unlock()
+
+	if !s.initialized {
+		s.initialize()
+	}
+
+	// To avoid partial writes from being flushed, we manually flush the
+	// existing buffer if:
+	// * The current write doesn't fit into the buffer fully, and
+	// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
+	if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
+		err := s.Flush()
+		if err != nil {
+			if errors.Is(err, ErrWSFlush) {
+				// silently dropping log messages if we can't flush buffer
+				return len(bs), nil
+			} else {
+				return 0, err
+			}
+		}
+	}
+
+	return s.writer.Write(bs)
+}
+
+// Flush with timeout
+func (s *BufferedWriteSyncer) Flush() error {
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+
+	c := make(chan error)
+
+	go func(ctx context.Context) {
+		err := s.writer.Flush()
+		select {
+		case <-ctx.Done():
+			c <- ErrWSFlush
+		default:
+			c <- err
+		}
+	}(ctx)
+
+	return <-c
+}
+
+// Sync flushes buffered log data into disk directly.
+func (s *BufferedWriteSyncer) Sync() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	var err error
+	if s.initialized {
+		err = s.Flush()
+	}
+
+	if errors.Is(err, ErrWSFlush) {
+		// unable to flush buffer, fail silently and skip WS sync
+		return nil
+	} else {
+		return multierr.Append(err, s.WS.Sync())
+	}
+}
+
+// flushLoop flushes the buffer at the configured interval until Stop is
+// called.
+func (s *BufferedWriteSyncer) flushLoop() {
+	defer close(s.done)
+
+	for {
+		select {
+		case <-s.ticker.C:
+			// we just simply ignore error here
+			// because the underlying bufio writer stores any errors
+			// and we return any error from Sync() as part of the close
+			_ = s.Sync()
+		case <-s.stop:
+			return
+		}
+	}
+}
+
+// Stop closes the buffer, cleans up background goroutines, and flushes
+// remaining unwritten data.
+func (s *BufferedWriteSyncer) Stop() (err error) {
+	var stopped bool
+
+	// Critical section.
+	func() {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+
+		if !s.initialized {
+			return
+		}
+
+		stopped = s.stopped
+		if stopped {
+			return
+		}
+		s.stopped = true
+
+		s.ticker.Stop()
+		close(s.stop) // tell flushLoop to stop
+		<-s.done      // and wait until it has
+	}()
+
+	// Don't call Sync on consecutive Stops.
+	if !stopped {
+		err = s.Sync()
+	}
+
+	return err
+}
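
For reference, a minimal sketch of how the new vardiff-related options added to `BridgeConfig` above might look in `cmd/kaspabridge/config.yaml`. The key names come from the yaml tags in this diff; the values shown are illustrative only, and the fallbacks match `ListenAndServe` (a zero `min_share_diff` becomes 4, a zero `shares_per_min` becomes 20, and `extranonce_size` is capped at 3):

```yaml
# Illustrative values only; key names taken from the BridgeConfig yaml tags above.
min_share_diff: 4      # starting share difficulty (0 falls back to 4)
var_diff: true         # enable the variable difficulty engine
shares_per_min: 20     # vardiff target share rate (0 falls back to 20)
var_diff_stats: false  # periodically print the per-worker vardiff table
pow2_clamp: false      # clamp difficulty to the nearest lower power of 2
extranonce_size: 2     # extranonce bytes, capped at 3
```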