diff --git a/app/app.go b/app/app.go index 41c1a4e68..6e001d087 100644 --- a/app/app.go +++ b/app/app.go @@ -68,11 +68,15 @@ func RunWarp(ctx context.Context, opts WarpOptions) error { if err != nil { return err } + + log.Printf("scan results: %+v", res) + endpoints = make([]string, len(res)) for i := 0; i < len(res); i++ { - endpoints[i] = res[i].String() + endpoints[i] = res[i].AddrPort.String() } } + log.Printf("using warp endpoints: %+v", endpoints) var warpErr error switch { diff --git a/go.mod b/go.mod index a39cfd83c..ab91e1b1a 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/MakeNowJust/heredoc/v2 v2.0.1 github.com/Psiphon-Labs/psiphon-tunnel-core v2.0.28+incompatible github.com/bepass-org/proxy v0.0.0-20240201095508-c86216dd0aea - github.com/davecgh/go-spew v1.1.1 + github.com/fatih/color v1.16.0 github.com/flynn/noise v1.1.0 github.com/go-ini/ini v1.67.0 github.com/hashicorp/golang-lru v1.0.2 @@ -16,6 +16,7 @@ require ( github.com/quic-go/quic-go v0.40.1 github.com/refraction-networking/conjure v0.7.11-0.20240130155008-c8df96195ab2 github.com/refraction-networking/utls v1.3.3 + github.com/rodaine/table v1.1.1 golang.org/x/crypto v0.19.0 golang.org/x/net v0.21.0 golang.org/x/sys v0.17.0 @@ -48,6 +49,8 @@ require ( github.com/juju/ratelimit v1.0.2 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/libp2p/go-reuseport v0.4.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/miekg/dns v1.1.44-0.20210804161652-ab67aa642300 // indirect github.com/mroth/weightedrand v1.0.0 // indirect github.com/onsi/ginkgo/v2 v2.9.5 // indirect diff --git a/go.sum b/go.sum index bf67c3222..5769cd374 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d h1:rtM8HsT3NG37YPj github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= github.com/elazarl/goproxy/ext v0.0.0-20200809112317-0581fc3aee2d h1:st1tmvy+4duoRj+RaeeJoECWCWM015fBtf/4aR+hhqk= github.com/elazarl/goproxy/ext v0.0.0-20200809112317-0581fc3aee2d/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/florianl/go-nfqueue v1.1.1-0.20200829120558-a2f196e98ab0 h1:7ZJyJV4KiWBijCCzUPvVaqxsDxO36+KD0XKBdEN3I+8= github.com/florianl/go-nfqueue v1.1.1-0.20200829120558-a2f196e98ab0/go.mod h1:2z3Tfqwv2ueuK6h563xUHRcCh1mv38wS9EjiWiesk84= github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= @@ -78,8 +80,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y= @@ -105,6 +107,13 @@ github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQsc github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU= github.com/marusama/semaphore v0.0.0-20171214154724-565ffd8e868a h1:6SRny9FLB1eWasPyDUqBQnMi9NhXU01XIlB0ao89YoI= github.com/marusama/semaphore v0.0.0-20171214154724-565ffd8e868a/go.mod h1:TmeOqAKoDinfPfSohs14CO3VcEf7o+Bem6JiNe05yrQ= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mdlayher/netlink v1.4.2-0.20210930205308-a81a8c23d40a h1:yk5OmRew64lWdeNanQ3l0hDgUt1E8MfipPhh/GO9Tuw= github.com/mdlayher/netlink v1.4.2-0.20210930205308-a81a8c23d40a/go.mod h1:qw8F9IVzxa0GpqhVAfOw8DNyo7ec/jxI6bPWPEg1MV4= github.com/mdlayher/socket v0.0.0-20210624160740-9dbe287ded84 h1:L1jnQ6o+K3M574eez7eTxbsia6H1SfJaVpaXY33L37Q= @@ -164,6 +173,10 @@ github.com/refraction-networking/obfs4 v0.1.2 h1:J842O4fGSkd2W8ogYj0KN6gqVVY+Cpq github.com/refraction-networking/obfs4 v0.1.2/go.mod h1:wAl/+gWiLsrcykJA3nKJHx89f5/gXGM8UKvty7+mvbM= github.com/refraction-networking/utls v1.3.3 h1:f/TBLX7KBciRyFH3bwupp+CE4fzoYKCirhdRcC490sw= github.com/refraction-networking/utls v1.3.3/go.mod h1:DlecWW1LMlMJu+9qpzzQqdHDT/C2LAe03EdpLUz/RL8= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rodaine/table v1.1.1 h1:zBliy3b4Oj6JRmncse2Z85WmoQvDrXOYuy0JXCt8Qz8= +github.com/rodaine/table v1.1.1/go.mod h1:iqTRptjn+EVcrVBYtNMlJ2wrJZa3MpULUmcXFpfcziA= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 h1:7YvPJVmEeFHR1Tj9sZEYsmarJEQfMVYpd/Vyy/A8dqE= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/sergeyfrolov/bsbuffer v0.0.0-20180903213811-94e85abb8507 h1:ML7ZNtcln5UBo5Wv7RIv9Xg3Pr5VuRCWLFXEwda54Y4= @@ -228,7 +241,9 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/ipscanner/example/cfscanner/main.go b/ipscanner/example/cfscanner/main.go index cacaa8de8..a87796135 100644 --- a/ipscanner/example/cfscanner/main.go +++ b/ipscanner/example/cfscanner/main.go @@ -1,6 +1,10 @@ package main -import "github.com/bepass-org/wireguard-go/ipscanner" +import ( + "context" + + "github.com/bepass-org/wireguard-go/ipscanner" +) func main() { // new scanner @@ -8,6 +12,10 @@ func main() { ipscanner.WithHTTPPing(), ipscanner.WithUseIPv6(true), ) - go scanner.Run() - select {} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go scanner.Run(ctx) + + <-ctx.Done() } diff --git a/ipscanner/example/warpscanner/main.go b/ipscanner/example/warpscanner/main.go index f6fb1264c..cb3d266f5 100644 --- a/ipscanner/example/warpscanner/main.go +++ b/ipscanner/example/warpscanner/main.go @@ -1,13 +1,18 @@ package main import ( - "fmt" + "context" + "log/slog" "net" "net/netip" + "os" "time" "github.com/bepass-org/wireguard-go/ipscanner" + "github.com/bepass-org/wireguard-go/ipscanner/internal/statute" "github.com/bepass-org/wireguard-go/warp" + "github.com/fatih/color" + "github.com/rodaine/table" ) var ( @@ -30,34 +35,59 @@ func canConnectIPv6(remoteAddr netip.AddrPort) bool { return true } -func RunScan(privKey, pubKey string) (result []netip.AddrPort) { +func RunScan(privKey, pubKey string) (result []statute.IPInfo) { // new scanner scanner := ipscanner.NewScanner( + ipscanner.WithLogger(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))), ipscanner.WithWarpPing(), ipscanner.WithWarpPrivateKey(privKey), ipscanner.WithWarpPeerPublicKey(pubKey), ipscanner.WithUseIPv6(canConnectIPv6(googlev6DNSAddr80)), ipscanner.WithUseIPv4(true), - ipscanner.WithMaxDesirableRTT(500), + ipscanner.WithMaxDesirableRTT(500*time.Millisecond), ipscanner.WithCidrList(warp.WarpPrefixes()), ) - scanner.Run() - var ipList []netip.Addr + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + scanner.Run(ctx) + + t := time.NewTicker(1 * time.Second) + defer t.Stop() + for { - ipList = scanner.GetAvailableIPs() - if len(ipList) > 2 { - scanner.Stop() - break + ipList := scanner.GetAvailableIPs() + if len(ipList) > 1 { + for i := 0; i < 2; i++ { + result = append(result, ipList[i]) + } + return + } + + select { + case <-ctx.Done(): + // Context is done + return + case <-t.C: + // Prevent the loop from spinning too fast + continue } - time.Sleep(1 * time.Second) - } - for i := 0; i < 2; i++ { - result = append(result, netip.AddrPortFrom(ipList[i], warp.RandomWarpPort())) } - return } func main() { - fmt.Println(RunScan(privKey, pubKey)) - time.Sleep(10 * time.Second) + result := RunScan(privKey, pubKey) + + headerFmt := color.New(color.FgGreen, color.Underline).SprintfFunc() + columnFmt := color.New(color.FgYellow).SprintfFunc() + + tbl := table.New("Address", "RTT (ping)", "Time") + tbl.WithHeaderFormatter(headerFmt).WithFirstColumnFormatter(columnFmt) + + for _, info := range result { + tbl.AddRow(info.AddrPort, info.RTT, info.CreatedAt) + } + + tbl.Print() } diff --git a/ipscanner/internal/engine/engine.go b/ipscanner/internal/engine/engine.go index cf16749d8..e6f9aea66 100644 --- a/ipscanner/internal/engine/engine.go +++ b/ipscanner/internal/engine/engine.go @@ -2,9 +2,8 @@ package engine import ( "context" - "fmt" + "log/slog" "net/netip" - "strings" "time" "github.com/bepass-org/wireguard-go/ipscanner/internal/iterator" @@ -13,92 +12,65 @@ import ( ) type Engine struct { - generator *iterator.IpGenerator - ipQueue *IPQueue - ctx context.Context - cancelFunc context.CancelFunc - ping func(netip.Addr) (int, error) - statute.Logger + generator *iterator.IpGenerator + ipQueue *IPQueue + ping func(netip.Addr) (statute.IPInfo, error) + log *slog.Logger } -func NewScannerEngine(opts *statute.ScannerOptions, ctx ...context.Context) *Engine { +func NewScannerEngine(opts *statute.ScannerOptions) *Engine { queue := NewIPQueue(opts) - var contextToUse context.Context - var cancel context.CancelFunc - if len(ctx) > 0 { - contextToUse = ctx[0] - } else { - contextToUse, cancel = context.WithCancel(context.Background()) - } p := ping.Ping{ Options: opts, } return &Engine{ - ipQueue: queue, - ctx: contextToUse, - cancelFunc: cancel, - ping: p.DoPing, - generator: iterator.NewIterator(opts), - Logger: opts.Logger, + ipQueue: queue, + ping: p.DoPing, + generator: iterator.NewIterator(opts), + log: opts.Logger.With(slog.String("subsystem", "engine")), } } -func (e *Engine) GetAvailableIPs(desc bool) []netip.Addr { +func (e *Engine) GetAvailableIPs(desc bool) []statute.IPInfo { if e.ipQueue != nil { return e.ipQueue.AvailableIPs(desc) } return nil } -func (e *Engine) Run() { +func (e *Engine) Run(ctx context.Context) { for { select { - case <-e.ctx.Done(): - fmt.Println("Context Done!") + case <-ctx.Done(): return case <-e.ipQueue.available: - e.Logger.Debug("New Scanning Round Started") + e.log.Debug("Started new scanning round") batch, err := e.generator.NextBatch() if err != nil { - e.Logger.Error("Error while generating IP: %v", err) + e.log.Error("Error while generating IP: %v", err) // in case of disastrous error, to prevent resource draining wait for 2 seconds and try again time.Sleep(2 * time.Second) continue } for _, ip := range batch { select { - case <-e.ctx.Done(): - fmt.Println("Context Done!") + case <-ctx.Done(): return default: - e.Logger.Debug("Pinging IP: %s", ip) - if rtt, err := e.ping(ip); err == nil { - ipInfo := statute.IPInfo{ - IP: ip, - RTT: rtt, - CreatedAt: time.Now(), - } - e.Logger.Debug("IP: %s, RTT: %d", ip, rtt) + e.log.Debug("pinging IP", "addr", ip) + if ipInfo, err := e.ping(ip); err == nil { + e.log.Debug("ping success", "addr", ipInfo.AddrPort, "rtt", ipInfo.RTT) e.ipQueue.Enqueue(ipInfo) } else { - // if timeout error - if strings.Contains(err.Error(), ": i/o timeout") { - e.Logger.Debug("Timeout Error: %s", ip) - continue - } - e.Logger.Error("Error while pinging IP: %s, Error: %v", ip, err) + e.log.Error("ping error", "addr", ip, "error", err) } } } default: - e.Logger.Debug("Engine: call the expire function") + e.log.Debug("calling expire") e.ipQueue.Expire() time.Sleep(200 * time.Millisecond) } } } - -func (e *Engine) Cancel() { - e.cancelFunc() -} diff --git a/ipscanner/internal/engine/queue.go b/ipscanner/internal/engine/queue.go index 87b4ba0a2..6436fec91 100644 --- a/ipscanner/internal/engine/queue.go +++ b/ipscanner/internal/engine/queue.go @@ -1,7 +1,7 @@ package engine import ( - "net/netip" + "log/slog" "sort" "sync" "time" @@ -10,29 +10,27 @@ import ( ) type IPQueue struct { - queue []statute.IPInfo - maxQueueSize int - mu sync.Mutex - available chan struct{} - maxTTL time.Duration - rttThreshold int - inIdealMode bool - onChangeCallback statute.TIPQueueChangeCallback - logger statute.Logger - reserved statute.IPInfQueue + queue []statute.IPInfo + maxQueueSize int + mu sync.Mutex + available chan struct{} + maxTTL time.Duration + rttThreshold time.Duration + inIdealMode bool + log *slog.Logger + reserved statute.IPInfQueue } func NewIPQueue(opts *statute.ScannerOptions) *IPQueue { var reserved statute.IPInfQueue return &IPQueue{ - queue: make([]statute.IPInfo, 0), - maxQueueSize: opts.IPQueueSize, - maxTTL: opts.IPQueueTTL, - rttThreshold: opts.MaxDesirableRTT, - available: make(chan struct{}, opts.IPQueueSize), - onChangeCallback: opts.IPQueueChangeCallback, - logger: opts.Logger, - reserved: reserved, + queue: make([]statute.IPInfo, 0), + maxQueueSize: opts.IPQueueSize, + maxTTL: opts.IPQueueTTL, + rttThreshold: opts.MaxDesirableRTT, + available: make(chan struct{}, opts.IPQueueSize), + log: opts.Logger.With(slog.String("subsystem", "engine/queue")), + reserved: reserved, } } @@ -41,43 +39,51 @@ func (q *IPQueue) Enqueue(info statute.IPInfo) bool { defer q.mu.Unlock() defer func() { - q.onChangeCallback(q.queue) + q.log.Debug("queue change", "len", len(q.queue)) + for _, ipInfo := range q.queue { + q.log.Debug( + "queue change", + "created", ipInfo.CreatedAt, + "addr", ipInfo.AddrPort, + "rtt", ipInfo.RTT, + ) + } }() - q.logger.Debug("Enqueue: Sorting queue by RTT") + q.log.Debug("Enqueue: Sorting queue by RTT") sort.Slice(q.queue, func(i, j int) bool { return q.queue[i].RTT < q.queue[j].RTT }) if len(q.queue) == 0 { - q.logger.Debug("Enqueue: empty queue adding first available item") + q.log.Debug("Enqueue: empty queue adding first available item") q.queue = append(q.queue, info) return false } if info.RTT <= q.rttThreshold { - q.logger.Debug("Enqueue: the new item's RTT is less than at least one of the members.") + q.log.Debug("Enqueue: the new item's RTT is less than at least one of the members.") if len(q.queue) >= q.maxQueueSize && info.RTT < q.queue[len(q.queue)-1].RTT { - q.logger.Debug("Enqueue: the queue is full, remove the item with the highest RTT.") + q.log.Debug("Enqueue: the queue is full, remove the item with the highest RTT.") q.queue = q.queue[:len(q.queue)-1] } else if len(q.queue) < q.maxQueueSize { - q.logger.Debug("Enqueue: Insert the new item in a sorted position.") + q.log.Debug("Enqueue: Insert the new item in a sorted position.") index := sort.Search(len(q.queue), func(i int) bool { return q.queue[i].RTT > info.RTT }) q.queue = append(q.queue[:index], append([]statute.IPInfo{info}, q.queue[index:]...)...) } else { - q.logger.Debug("Enqueue: The Queue is full but we keep the new item in the reserved queue.") + q.log.Debug("Enqueue: The Queue is full but we keep the new item in the reserved queue.") q.reserved.Enqueue(info) } } - q.logger.Debug("Enqueue: Checking if any member has a higher RTT than the threshold.") + q.log.Debug("Enqueue: Checking if any member has a higher RTT than the threshold.") for _, member := range q.queue { if member.RTT > q.rttThreshold { return false // If any member has a higher RTT than the threshold, return false. } } - q.logger.Debug("Enqueue: All members have an RTT lower than the threshold.") + q.log.Debug("Enqueue: All members have an RTT lower than the threshold.") if len(q.queue) < q.maxQueueSize { // the queue isn't full dont wait return false @@ -85,13 +91,21 @@ func (q *IPQueue) Enqueue(info statute.IPInfo) bool { q.inIdealMode = true // ok wait for expiration signal - q.logger.Debug("Enqueue: All members have an RTT lower than the threshold. Waiting for expiration signal.") + q.log.Debug("Enqueue: All members have an RTT lower than the threshold. Waiting for expiration signal.") return true } func (q *IPQueue) Dequeue() (statute.IPInfo, bool) { defer func() { - go q.onChangeCallback(q.queue) + q.log.Debug("queue change", "len", len(q.queue)) + for _, ipInfo := range q.queue { + q.log.Debug( + "queue change", + "created", ipInfo.CreatedAt, + "addr", ipInfo.AddrPort, + "rtt", ipInfo.RTT, + ) + } }() q.mu.Lock() defer q.mu.Unlock() @@ -113,28 +127,36 @@ func (q *IPQueue) Expire() { defer q.mu.Unlock() if !q.inIdealMode { - q.logger.Debug("Expire: Not in ideal mode") + q.log.Debug("Expire: Not in ideal mode") q.available <- struct{}{} return } - q.logger.Debug("Expire: In ideal mode") + q.log.Debug("Expire: In ideal mode") defer func() { - q.onChangeCallback(q.queue) + q.log.Debug("queue change", "len", len(q.queue)) + for _, ipInfo := range q.queue { + q.log.Debug( + "queue change", + "created", ipInfo.CreatedAt, + "addr", ipInfo.AddrPort, + "rtt", ipInfo.RTT, + ) + } }() shouldStartNewScan := false resQ := make([]statute.IPInfo, 0) for i := 0; i < len(q.queue); i++ { if time.Since(q.queue[i].CreatedAt) > q.maxTTL { - q.logger.Debug("Expire: Removing expired item from queue") + q.log.Debug("Expire: Removing expired item from queue") shouldStartNewScan = true } else { resQ = append(resQ, q.queue[i]) } } q.queue = resQ - q.logger.Debug("Expire: Adding reserved items to queue") + q.log.Debug("Expire: Adding reserved items to queue") for i := 0; i < q.maxQueueSize && i < q.reserved.Size(); i++ { q.queue = append(q.queue, q.reserved.Dequeue()) } @@ -143,7 +165,7 @@ func (q *IPQueue) Expire() { } } -func (q *IPQueue) AvailableIPs(desc bool) []netip.Addr { +func (q *IPQueue) AvailableIPs(desc bool) []statute.IPInfo { q.mu.Lock() defer q.mu.Unlock() @@ -159,10 +181,5 @@ func (q *IPQueue) AvailableIPs(desc bool) []netip.Addr { return sortedQueue[i].RTT < sortedQueue[j].RTT }) - ips := make([]netip.Addr, len(sortedQueue)) - for i, info := range sortedQueue { - ips[i] = info.IP - } - - return ips + return sortedQueue } diff --git a/ipscanner/internal/ping/http.go b/ipscanner/internal/ping/http.go index 90ac553c4..295d8c467 100644 --- a/ipscanner/internal/ping/http.go +++ b/ipscanner/internal/ping/http.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "net" "net/http" "net/netip" "net/url" @@ -14,16 +13,16 @@ import ( ) type HttpPingResult struct { - Time int - Proto string - Status int - Length int - Err error - IP netip.Addr + AddrPort netip.AddrPort + Proto string + Status int + Length int + RTT time.Duration + Err error } -func (h *HttpPingResult) Result() int { - return h.Time +func (h *HttpPingResult) Result() statute.IPInfo { + return statute.IPInfo{AddrPort: h.AddrPort, RTT: h.RTT, CreatedAt: time.Now()} } func (h *HttpPingResult) Error() error { @@ -33,9 +32,9 @@ func (h *HttpPingResult) Error() error { func (h *HttpPingResult) String() string { if h.Err != nil { return fmt.Sprintf("%s", h.Err) - } else { - return fmt.Sprintf("%s: protocol=%s, status=%d, length=%d, time=%d ms", h.IP.String(), h.Proto, h.Status, h.Length, h.Time) } + + return fmt.Sprintf("%s: protocol=%s, status=%d, length=%d, time=%d ms", h.AddrPort, h.Proto, h.Status, h.Length, h.RTT) } type HttpPing struct { @@ -56,16 +55,10 @@ func (h *HttpPing) PingContext(ctx context.Context) statute.IPingResult { return h.errorResult(err) } orighost := u.Host - port := u.Port() - ip := statute.CloneIP(h.IP) - if !ip.IsValid() { + + if !h.IP.IsValid() { return h.errorResult(fmt.Errorf("no IP specified")) } - ipstr := ip.String() - if statute.IsIPv6(ip) { - ipstr = fmt.Sprintf("[%s]", ipstr) - } - targetAddr := net.JoinHostPort(ipstr, port) req, err := http.NewRequestWithContext(ctx, h.Method, h.URL, nil) if err != nil { @@ -81,22 +74,35 @@ func (h *HttpPing) PingContext(ctx context.Context) statute.IPingResult { } req.Host = orighost - client := h.opts.HttpClientFunc(h.opts.RawDialerFunc, h.opts.TLSDialerFunc, h.opts.QuicDialerFunc, targetAddr) + addr := netip.AddrPortFrom(h.IP, h.opts.Port) + client := h.opts.HttpClientFunc(h.opts.RawDialerFunc, h.opts.TLSDialerFunc, h.opts.QuicDialerFunc, addr.String()) client.CheckRedirect = func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse } + t0 := time.Now() resp, err := client.Do(req) if err != nil { return h.errorResult(err) } + defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return h.errorResult(err) } - return &HttpPingResult{int(time.Since(t0).Milliseconds()), resp.Proto, resp.StatusCode, len(body), nil, ip} + + res := HttpPingResult{ + AddrPort: addr, + Proto: resp.Proto, + Status: resp.StatusCode, + Length: len(body), + RTT: time.Since(t0), + Err: nil, + } + + return &res } func (h *HttpPing) errorResult(err error) *HttpPingResult { diff --git a/ipscanner/internal/ping/ping.go b/ipscanner/internal/ping/ping.go index 1ea02a5c5..794c2e732 100644 --- a/ipscanner/internal/ping/ping.go +++ b/ipscanner/internal/ping/ping.go @@ -1,6 +1,7 @@ package ping import ( + "errors" "fmt" "net/netip" @@ -12,56 +13,52 @@ type Ping struct { } // DoPing performs a ping on the given IP address. -func (p *Ping) DoPing(ip netip.Addr) (int, error) { - var sum, ops, hp, tp int - var err error +func (p *Ping) DoPing(ip netip.Addr) (statute.IPInfo, error) { if p.Options.SelectedOps&statute.HTTPPing > 0 { - hp, err = p.httpPing(ip) + res, err := p.httpPing(ip) if err != nil { - return 0, err + return statute.IPInfo{}, err } - ops++ - sum += hp + + return res, nil } if p.Options.SelectedOps&statute.TLSPing > 0 { - tp, err = p.tlsPing(ip) + res, err := p.tlsPing(ip) if err != nil { - return 0, err + return statute.IPInfo{}, err } - ops++ - sum += tp + + return res, nil } if p.Options.SelectedOps&statute.TCPPing > 0 { - tp, err = p.tcpPing(ip) + res, err := p.tcpPing(ip) if err != nil { - return 0, err + return statute.IPInfo{}, err } - ops++ - sum += tp + + return res, nil } if p.Options.SelectedOps&statute.QUICPing > 0 { - tp, err = p.quicPing(ip) + res, err := p.quicPing(ip) if err != nil { - return 0, err + return statute.IPInfo{}, err } - ops++ - sum += tp + + return res, nil } if p.Options.SelectedOps&statute.WARPPing > 0 { - tp, err = p.warpPing(ip) + res, err := p.warpPing(ip) if err != nil { - return 0, err + return statute.IPInfo{}, err } - ops++ - sum += tp - } - if ops == 0 { - return 99, nil + + return res, nil } - return sum / ops, nil + + return statute.IPInfo{}, errors.New("no ping operation selected") } -func (p *Ping) httpPing(ip netip.Addr) (int, error) { +func (p *Ping) httpPing(ip netip.Addr) (statute.IPInfo, error) { return p.calc( NewHttpPing( ip, @@ -77,38 +74,33 @@ func (p *Ping) httpPing(ip netip.Addr) (int, error) { ) } -func (p *Ping) warpPing(ip netip.Addr) (int, error) { - return p.calc( - NewWarpPing( - ip, - p.Options, - ), - ) +func (p *Ping) warpPing(ip netip.Addr) (statute.IPInfo, error) { + return p.calc(NewWarpPing(ip, p.Options)) } -func (p *Ping) tlsPing(ip netip.Addr) (int, error) { +func (p *Ping) tlsPing(ip netip.Addr) (statute.IPInfo, error) { return p.calc( NewTlsPing(ip, p.Options.Hostname, p.Options.Port, p.Options), ) } -func (p *Ping) tcpPing(ip netip.Addr) (int, error) { +func (p *Ping) tcpPing(ip netip.Addr) (statute.IPInfo, error) { return p.calc( NewTcpPing(ip, p.Options.Hostname, p.Options.Port, p.Options), ) } -func (p *Ping) quicPing(ip netip.Addr) (int, error) { +func (p *Ping) quicPing(ip netip.Addr) (statute.IPInfo, error) { return p.calc( NewQuicPing(ip, p.Options.Hostname, p.Options.Port, p.Options), ) } -func (p *Ping) calc(tp statute.IPing) (int, error) { +func (p *Ping) calc(tp statute.IPing) (statute.IPInfo, error) { pr := tp.Ping() err := pr.Error() if err != nil { - return 0, err + return statute.IPInfo{}, err } return pr.Result(), nil } diff --git a/ipscanner/internal/ping/quic.go b/ipscanner/internal/ping/quic.go index 38674785a..a01439c66 100644 --- a/ipscanner/internal/ping/quic.go +++ b/ipscanner/internal/ping/quic.go @@ -3,7 +3,6 @@ package ping import ( "context" "fmt" - "net" "net/netip" "time" @@ -14,15 +13,15 @@ import ( ) type QuicPingResult struct { - Time int - Err error - IP netip.Addr - QUICVersion uint32 + AddrPort netip.AddrPort + QUICVersion quic.VersionNumber TLSVersion uint16 + RTT time.Duration + Err error } -func (h *QuicPingResult) Result() int { - return h.Time +func (h *QuicPingResult) Result() statute.IPInfo { + return statute.IPInfo{AddrPort: h.AddrPort, RTT: h.RTT, CreatedAt: time.Now()} } func (h *QuicPingResult) Error() error { @@ -32,9 +31,9 @@ func (h *QuicPingResult) Error() error { func (h *QuicPingResult) String() string { if h.Err != nil { return fmt.Sprintf("%s", h.Err) - } else { - return fmt.Sprintf("%s: quic=%s, tls=%s, time=%d ms", h.IP.String(), quic.VersionNumber(h.QUICVersion).String(), statute.TlsVersionToString(h.TLSVersion), h.Time) } + + return fmt.Sprintf("%s: quic=%s, tls=%s, time=%d ms", h.AddrPort, quic.VersionNumber(h.QUICVersion), statute.TlsVersionToString(h.TLSVersion), h.RTT) } type QuicPing struct { @@ -50,20 +49,28 @@ func (h *QuicPing) Ping() statute.IPingResult { } func (h *QuicPing) PingContext(ctx context.Context) statute.IPingResult { - ip := statute.CloneIP(h.IP) - if !ip.IsValid() { + if !h.IP.IsValid() { return h.errorResult(fmt.Errorf("no IP specified")) } - addr := net.JoinHostPort(ip.String(), fmt.Sprint(h.Port)) + + addr := netip.AddrPortFrom(h.IP, h.Port) t0 := time.Now() - conn, err := h.opts.QuicDialerFunc(ctx, addr, nil, nil) + conn, err := h.opts.QuicDialerFunc(ctx, addr.String(), nil, nil) if err != nil { return h.errorResult(err) } + res := QuicPingResult{ + AddrPort: addr, + RTT: time.Since(t0), + QUICVersion: conn.ConnectionState().Version, + TLSVersion: conn.ConnectionState().TLS.Version, + Err: nil, + } + defer conn.CloseWithError(quic.ApplicationErrorCode(uint64(http3.ErrCodeNoError)), "") - return &QuicPingResult{int(time.Since(t0).Milliseconds()), nil, ip, uint32(conn.ConnectionState().Version), conn.ConnectionState().TLS.Version} + return &res } func NewQuicPing(ip netip.Addr, host string, port uint16, opts *statute.ScannerOptions) *QuicPing { @@ -71,7 +78,6 @@ func NewQuicPing(ip netip.Addr, host string, port uint16, opts *statute.ScannerO IP: ip, Host: host, Port: port, - opts: *opts, } } diff --git a/ipscanner/internal/ping/tcp.go b/ipscanner/internal/ping/tcp.go index 1215adbff..2ef29150d 100644 --- a/ipscanner/internal/ping/tcp.go +++ b/ipscanner/internal/ping/tcp.go @@ -2,23 +2,22 @@ package ping import ( "context" + "errors" "fmt" - "net" "net/netip" - "strconv" "time" "github.com/bepass-org/wireguard-go/ipscanner/internal/statute" ) type TcpPingResult struct { - Time int - Err error - IP netip.Addr + AddrPort netip.AddrPort + RTT time.Duration + Err error } -func (tp *TcpPingResult) Result() int { - return tp.Time +func (tp *TcpPingResult) Result() statute.IPInfo { + return statute.IPInfo{AddrPort: tp.AddrPort, RTT: tp.RTT, CreatedAt: time.Now()} } func (tp *TcpPingResult) Error() error { @@ -29,13 +28,13 @@ func (tp *TcpPingResult) String() string { if tp.Err != nil { return fmt.Sprintf("%s", tp.Err) } else { - return fmt.Sprintf("%s: time=%d ms", tp.IP.String(), tp.Time) + return fmt.Sprintf("%s: time=%d ms", tp.AddrPort, tp.RTT) } } type TcpPing struct { host string - Port uint16 + port uint16 ip netip.Addr opts statute.ScannerOptions @@ -55,25 +54,26 @@ func (tp *TcpPing) Ping() statute.IPingResult { } func (tp *TcpPing) PingContext(ctx context.Context) statute.IPingResult { - ip := statute.CloneIP(tp.ip) - if !ip.IsValid() { - return &TcpPingResult{0, fmt.Errorf("no IP specified"), netip.Addr{}} + if !tp.ip.IsValid() { + return &TcpPingResult{AddrPort: netip.AddrPort{}, RTT: 0, Err: errors.New("no IP specified")} } + + addr := netip.AddrPortFrom(tp.ip, tp.port) t0 := time.Now() - conn, err := tp.opts.RawDialerFunc(ctx, "tcp", net.JoinHostPort(ip.String(), strconv.FormatUint(uint64(tp.Port), 10))) + conn, err := tp.opts.RawDialerFunc(ctx, "tcp", addr.String()) if err != nil { - return &TcpPingResult{0, err, netip.Addr{}} + return &TcpPingResult{AddrPort: addr, RTT: 0, Err: err} } defer conn.Close() - return &TcpPingResult{int(time.Since(t0).Milliseconds()), nil, ip} + + return &TcpPingResult{AddrPort: addr, RTT: time.Since(t0), Err: nil} } func NewTcpPing(ip netip.Addr, host string, port uint16, opts *statute.ScannerOptions) *TcpPing { return &TcpPing{ host: host, - Port: port, + port: port, ip: ip, - opts: *opts, } } diff --git a/ipscanner/internal/ping/tls.go b/ipscanner/internal/ping/tls.go index 3b1ab4602..b0aad1f4b 100644 --- a/ipscanner/internal/ping/tls.go +++ b/ipscanner/internal/ping/tls.go @@ -3,23 +3,21 @@ package ping import ( "context" "fmt" - "net" "net/netip" - "strconv" "time" "github.com/bepass-org/wireguard-go/ipscanner/internal/statute" ) type TlsPingResult struct { - Time int + AddrPort netip.AddrPort TLSVersion uint16 + RTT time.Duration Err error - IP netip.Addr } -func (t *TlsPingResult) Result() int { - return t.Time +func (t *TlsPingResult) Result() statute.IPInfo { + return statute.IPInfo{AddrPort: t.AddrPort, RTT: t.RTT, CreatedAt: time.Now()} } func (t *TlsPingResult) Error() error { @@ -29,9 +27,9 @@ func (t *TlsPingResult) Error() error { func (t *TlsPingResult) String() string { if t.Err != nil { return fmt.Sprintf("%s", t.Err) - } else { - return fmt.Sprintf("%s: protocol=%s, time=%d ms", t.IP.String(), statute.TlsVersionToString(t.TLSVersion), t.Result()) } + + return fmt.Sprintf("%s: protocol=%s, time=%d ms", t.AddrPort, statute.TlsVersionToString(t.TLSVersion), t.RTT) } type TlsPing struct { @@ -47,19 +45,17 @@ func (t *TlsPing) Ping() statute.IPingResult { } func (t *TlsPing) PingContext(ctx context.Context) statute.IPingResult { - ip := statute.CloneIP(t.IP) - - if !ip.IsValid() { + if !t.IP.IsValid() { return t.errorResult(fmt.Errorf("no IP specified")) } - + addr := netip.AddrPortFrom(t.IP, t.Port) t0 := time.Now() - client, err := t.opts.TLSDialerFunc(ctx, "tcp", net.JoinHostPort(ip.String(), strconv.FormatUint(uint64(t.Port), 10))) + client, err := t.opts.TLSDialerFunc(ctx, "tcp", addr.String()) if err != nil { return t.errorResult(err) } defer client.Close() - return &TlsPingResult{int(time.Since(t0).Milliseconds()), t.opts.TlsVersion, nil, ip} + return &TlsPingResult{AddrPort: addr, TLSVersion: t.opts.TlsVersion, RTT: time.Since(t0), Err: nil} } func NewTlsPing(ip netip.Addr, host string, port uint16, opts *statute.ScannerOptions) *TlsPing { diff --git a/ipscanner/internal/ping/warp.go b/ipscanner/internal/ping/warp.go index e290d9abb..fb165c156 100644 --- a/ipscanner/internal/ping/warp.go +++ b/ipscanner/internal/ping/warp.go @@ -15,20 +15,19 @@ import ( "github.com/bepass-org/wireguard-go/ipscanner/internal/statute" "github.com/bepass-org/wireguard-go/warp" - "github.com/davecgh/go-spew/spew" "github.com/flynn/noise" "golang.org/x/crypto/blake2s" "golang.org/x/crypto/curve25519" ) type WarpPingResult struct { - Time int - Err error - IP netip.Addr + AddrPort netip.AddrPort + RTT time.Duration + Err error } -func (h *WarpPingResult) Result() int { - return h.Time +func (h *WarpPingResult) Result() statute.IPInfo { + return statute.IPInfo{AddrPort: h.AddrPort, RTT: h.RTT, CreatedAt: time.Now()} } func (h *WarpPingResult) Error() error { @@ -39,7 +38,7 @@ func (h *WarpPingResult) String() string { if h.Err != nil { return fmt.Sprintf("%s", h.Err) } else { - return fmt.Sprintf("%s: protocol=%s, time=%d ms", h.IP.String(), "warp", h.Time) + return fmt.Sprintf("%s: protocol=%s, time=%d ms", h.AddrPort, "warp", h.RTT) } } @@ -57,10 +56,9 @@ func (h *WarpPing) Ping() statute.IPingResult { } func (h *WarpPing) PingContext(_ context.Context) statute.IPingResult { - t0 := time.Now() - - err := initiateHandshake( - netip.AddrPortFrom(h.IP, warp.RandomWarpPort()), + addr := netip.AddrPortFrom(h.IP, warp.RandomWarpPort()) + rtt, err := initiateHandshake( + addr, h.PrivateKey, h.PeerPublicKey, h.PresharedKey, @@ -68,7 +66,8 @@ func (h *WarpPing) PingContext(_ context.Context) statute.IPingResult { if err != nil { return h.errorResult(err) } - return &WarpPingResult{int(time.Since(t0).Milliseconds()), nil, h.IP} + + return &WarpPingResult{AddrPort: addr, RTT: rtt, Err: nil} } func (h *WarpPing) errorResult(err error) *WarpPingResult { @@ -126,20 +125,20 @@ func randomInt(min, max int) int { return int(nBig.Int64()) + min } -func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKeyBase64, presharedKeyBase64 string) error { +func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKeyBase64, presharedKeyBase64 string) (time.Duration, error) { staticKeyPair, err := staticKeypair(privateKeyBase64) if err != nil { - return err + return 0, err } peerPublicKey, err := base64.StdEncoding.DecodeString(peerPublicKeyBase64) if err != nil { - return err + return 0, err } presharedKey, err := base64.StdEncoding.DecodeString(presharedKeyBase64) if err != nil { - return err + return 0, err } if presharedKeyBase64 == "" { @@ -148,7 +147,7 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe ephemeral, err := ephemeralKeypair() if err != nil { - return err + return 0, err } cs := noise.NewCipherSuite(noise.DH25519, noise.CipherChaChaPoly, noise.HashBLAKE2s) @@ -165,7 +164,7 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe Random: rand.Reader, }) if err != nil { - return err + return 0, err } // Prepare handshake initiation packet @@ -179,7 +178,7 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe tai64nTimestampBuf = binary.BigEndian.AppendUint32(tai64nTimestampBuf, uint32(now.Nanosecond())) msg, _, _, err := hs.WriteMessage(nil, tai64nTimestampBuf) if err != nil { - return err + return 0, err } initiationPacket := new(bytes.Buffer) @@ -190,11 +189,11 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe macKey := blake2s.Sum256(append([]byte("mac1----"), peerPublicKey...)) hasher, err := blake2s.New128(macKey[:]) // using macKey as the key if err != nil { - return err + return 0, err } _, err = hasher.Write(initiationPacket.Bytes()) if err != nil { - return err + return 0, err } initiationPacketMAC := hasher.Sum(nil) @@ -204,7 +203,7 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe conn, err := net.Dial("udp", serverAddr.String()) if err != nil { - return err + return 0, err } defer conn.Close() @@ -216,36 +215,40 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe randomPacket := make([]byte, packetSize) _, err := rand.Read(randomPacket) if err != nil { - return fmt.Errorf("error generating random packet: %w", err) + return 0, fmt.Errorf("error generating random packet: %w", err) } // Send the random packet _, err = conn.Write(randomPacket) if err != nil { - return fmt.Errorf("error sending random packet: %w", err) + return 0, fmt.Errorf("error sending random packet: %w", err) } // Wait for a random duration between 200 and 500 milliseconds time.Sleep(time.Duration(randomInt(200, 500)) * time.Millisecond) } - // spew.Dump(initiationPacket) _, err = initiationPacket.WriteTo(conn) if err != nil { - return err + return 0, err } + t0 := time.Now() response := make([]byte, 92) conn.SetReadDeadline(time.Now().Add(5 * time.Second)) i, err := conn.Read(response) - fmt.Println("server response, len+packet: ", i, response[12:60]) if err != nil { - return err + return 0, err + } + rtt := time.Since(t0) + + if i < 60 { + return 0, fmt.Errorf("invalid handshake response length %d bytes", i) } // Check the response type if response[0] != 2 { // 2 is the message type for response - return errors.New("invalid response type") + return 0, errors.New("invalid response type") } // Extract sender and receiver index from the response @@ -254,23 +257,20 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe // our index(we set it to 28) ourIndex := binary.LittleEndian.Uint32(response[8:12]) if ourIndex != 28 { // Check if the response corresponds to our sender index - return errors.New("invalid sender index in response") + return 0, errors.New("invalid sender index in response") } payload, _, _, err := hs.ReadMessage(nil, response[12:60]) - spew.Dump(payload) if err != nil { - spew.Dump(err) - return err + return 0, err } // Check if the payload is empty (as expected in WireGuard handshake) if len(payload) != 0 { - return errors.New("unexpected payload in response") + return 0, errors.New("unexpected payload in response") } - fmt.Println("Handshake completed successfully") - return nil + return rtt, nil } func NewWarpPing(ip netip.Addr, opts *statute.ScannerOptions) *WarpPing { diff --git a/ipscanner/internal/statute/default.go b/ipscanner/internal/statute/default.go index 2c71910aa..24265a4a4 100644 --- a/ipscanner/internal/statute/default.go +++ b/ipscanner/internal/statute/default.go @@ -168,30 +168,6 @@ func DefaultQuicDialerFunc(ctx context.Context, addr string, _ *tls.Config, _ *q return quic.DialAddrEarly(ctx, addr, defaultTLSConfig(addr), quicConfig) } -// default logger - -type Logger interface { - Debug(s string, v ...interface{}) - Error(s string, v ...interface{}) -} - -type DefaultLogger struct{} - -func (l DefaultLogger) Debug(s string, v ...interface{}) { - fmt.Printf(fmt.Sprintf("%s\r\n", s), v...) -} - -func (l DefaultLogger) Error(s string, v ...interface{}) { - fmt.Printf(fmt.Sprintf("%s\r\n", s), v...) -} - -func DefaultIPQueueChangeCallback(ips []IPInfo) { - fmt.Printf("queue change: %d\r\n", len(ips)) - for _, ip := range ips { - fmt.Printf("IP:%s\tRTT:%d\tTS:%s\r\n", ip.IP.String(), ip.RTT, ip.CreatedAt.String()) - } -} - func DefaultCFRanges() []netip.Prefix { return []netip.Prefix{ netip.MustParsePrefix("103.21.244.0/22"), diff --git a/ipscanner/internal/statute/ping.go b/ipscanner/internal/statute/ping.go index 42702141d..816a26fc6 100644 --- a/ipscanner/internal/statute/ping.go +++ b/ipscanner/internal/statute/ping.go @@ -4,11 +4,10 @@ import ( "context" "crypto/tls" "fmt" - "net/netip" ) type IPingResult interface { - Result() int + Result() IPInfo Error() error fmt.Stringer } @@ -34,15 +33,3 @@ func TlsVersionToString(ver uint16) string { return "unknown" } } - -func IsIPv4(ip netip.Addr) bool { - return ip.Is4() -} - -func IsIPv6(ip netip.Addr) bool { - return ip.Is6() -} - -func CloneIP(ip netip.Addr) netip.Addr { - return ip -} diff --git a/ipscanner/internal/statute/statute.go b/ipscanner/internal/statute/statute.go index 19fe6bea7..4efe9ceeb 100644 --- a/ipscanner/internal/statute/statute.go +++ b/ipscanner/internal/statute/statute.go @@ -3,6 +3,7 @@ package statute import ( "context" "crypto/tls" + "log/slog" "net" "net/http" "net/netip" @@ -28,8 +29,8 @@ var ( ) type IPInfo struct { - IP netip.Addr - RTT int + AddrPort netip.AddrPort + RTT time.Duration CreatedAt time.Time } @@ -38,7 +39,7 @@ type ScannerOptions struct { UseIPv6 bool CidrList []netip.Prefix // CIDR ranges to scan SelectedOps int - Logger Logger + Logger *slog.Logger InsecureSkipVerify bool RawDialerFunc TDialerFunc TLSDialerFunc TDialerFunc @@ -57,7 +58,7 @@ type ScannerOptions struct { Port uint16 IPQueueSize int IPQueueTTL time.Duration - MaxDesirableRTT int + MaxDesirableRTT time.Duration IPQueueChangeCallback TIPQueueChangeCallback ConnectionTimeout time.Duration HandshakeTimeout time.Duration diff --git a/ipscanner/scanner.go b/ipscanner/scanner.go index 3da7343c3..04700fde4 100644 --- a/ipscanner/scanner.go +++ b/ipscanner/scanner.go @@ -1,7 +1,9 @@ package ipscanner import ( + "context" "crypto/tls" + "log/slog" "net/netip" "time" @@ -11,44 +13,42 @@ import ( type IPScanner struct { options statute.ScannerOptions - logger statute.Logger + log *slog.Logger engine *engine.Engine - // onChange func([]netip.Addr) } func NewScanner(options ...Option) *IPScanner { p := &IPScanner{ options: statute.ScannerOptions{ - UseIPv4: true, - UseIPv6: true, - CidrList: statute.DefaultCFRanges(), - SelectedOps: 0, - Logger: statute.DefaultLogger{}, - InsecureSkipVerify: true, - RawDialerFunc: statute.DefaultDialerFunc, - TLSDialerFunc: statute.DefaultTLSDialerFunc, - QuicDialerFunc: statute.DefaultQuicDialerFunc, - HttpClientFunc: statute.DefaultHTTPClientFunc, - UseHTTP3: false, - UseHTTP2: false, - DisableCompression: false, - HTTPPath: "/", - Referrer: "", - UserAgent: "Chrome/80.0.3987.149", - Hostname: "www.cloudflare.com", - WarpPresharedKey: "", - WarpPeerPublicKey: "", - WarpPrivateKey: "", - Port: 443, - IPQueueSize: 8, - MaxDesirableRTT: 400, - IPQueueTTL: 30 * time.Second, - IPQueueChangeCallback: statute.DefaultIPQueueChangeCallback, - ConnectionTimeout: 1 * time.Second, - HandshakeTimeout: 1 * time.Second, - TlsVersion: tls.VersionTLS13, + UseIPv4: true, + UseIPv6: true, + CidrList: statute.DefaultCFRanges(), + SelectedOps: 0, + Logger: slog.Default(), + InsecureSkipVerify: true, + RawDialerFunc: statute.DefaultDialerFunc, + TLSDialerFunc: statute.DefaultTLSDialerFunc, + QuicDialerFunc: statute.DefaultQuicDialerFunc, + HttpClientFunc: statute.DefaultHTTPClientFunc, + UseHTTP3: false, + UseHTTP2: false, + DisableCompression: false, + HTTPPath: "/", + Referrer: "", + UserAgent: "Chrome/80.0.3987.149", + Hostname: "www.cloudflare.com", + WarpPresharedKey: "", + WarpPeerPublicKey: "", + WarpPrivateKey: "", + Port: 443, + IPQueueSize: 8, + MaxDesirableRTT: 400 * time.Millisecond, + IPQueueTTL: 30 * time.Second, + ConnectionTimeout: 1 * time.Second, + HandshakeTimeout: 1 * time.Second, + TlsVersion: tls.VersionTLS13, }, - logger: statute.DefaultLogger{}, + log: slog.Default(), } for _, option := range options { @@ -132,8 +132,9 @@ func WithUserAgent(userAgent string) Option { } } -func WithLogger(logger statute.Logger) Option { +func WithLogger(logger *slog.Logger) Option { return func(i *IPScanner) { + i.log = logger i.options.Logger = logger } } @@ -198,7 +199,7 @@ func WithIPQueueSize(size int) Option { } } -func WithMaxDesirableRTT(threshold int) Option { +func WithMaxDesirableRTT(threshold time.Duration) Option { return func(i *IPScanner) { i.options.MaxDesirableRTT = threshold } @@ -210,12 +211,6 @@ func WithIPQueueTTL(ttl time.Duration) Option { } } -func WithIPQueueChangeCallback(callback statute.TIPQueueChangeCallback) Option { - return func(i *IPScanner) { - i.options.IPQueueChangeCallback = callback - } -} - func WithConnectionTimeout(timeout time.Duration) Option { return func(i *IPScanner) { i.options.ConnectionTimeout = timeout @@ -252,28 +247,20 @@ func WithWarpPreSharedKey(presharedKey string) Option { } } -func (i *IPScanner) SetIPQueueChangeCallback(callback statute.TIPQueueChangeCallback) { - i.options.IPQueueChangeCallback = callback -} - // run engine and in case of new event call onChange callback also if it gets canceled with context // cancel all operations -func (i *IPScanner) Run() { +func (i *IPScanner) Run(ctx context.Context) { statute.FinalOptions = &i.options if !i.options.UseIPv4 && !i.options.UseIPv6 { - i.logger.Error("Fatal: both IPv4 and IPv6 are disabled, nothing to do") + i.log.Error("Fatal: both IPv4 and IPv6 are disabled, nothing to do") return } i.engine = engine.NewScannerEngine(&i.options) - go i.engine.Run() -} - -func (i *IPScanner) Stop() { - i.engine.Cancel() + go i.engine.Run(ctx) } -func (i *IPScanner) GetAvailableIPs() []netip.Addr { +func (i *IPScanner) GetAvailableIPs() []statute.IPInfo { if i.engine != nil { return i.engine.GetAvailableIPs(false) } diff --git a/wiresocks/scanner.go b/wiresocks/scanner.go index cbe58fba8..64f947555 100644 --- a/wiresocks/scanner.go +++ b/wiresocks/scanner.go @@ -25,7 +25,7 @@ func canConnectIPv6(remoteAddr netip.AddrPort) bool { return true } -func RunScan(ctx context.Context, rtt time.Duration) (result []netip.AddrPort, err error) { +func RunScan(ctx context.Context, rtt time.Duration) (result []ipscanner.IPInfo, err error) { cfg, err := ini.Load("./primary/wgcf-profile.ini") if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) @@ -44,34 +44,34 @@ func RunScan(ctx context.Context, rtt time.Duration) (result []netip.AddrPort, e ipscanner.WithWarpPeerPublicKey(publicKey), ipscanner.WithUseIPv6(canConnectIPv6(netip.MustParseAddrPort("[2001:4860:4860::8888]:80"))), ipscanner.WithUseIPv4(true), - ipscanner.WithMaxDesirableRTT(int(rtt.Milliseconds())), + ipscanner.WithMaxDesirableRTT(rtt), ipscanner.WithCidrList(warp.WarpPrefixes()), ) - scanner.Run() - timeoutTimer := time.NewTimer(2 * time.Minute) - defer timeoutTimer.Stop() + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + scanner.Run(ctx) + + t := time.NewTicker(1 * time.Second) + defer t.Stop() for { + ipList := scanner.GetAvailableIPs() + if len(ipList) > 1 { + for i := 0; i < 2; i++ { + result = append(result, ipList[i]) + } + return result, nil + } + select { case <-ctx.Done(): // Context is done - canceled externally - scanner.Stop() return nil, fmt.Errorf("user canceled the operation") - case <-timeoutTimer.C: - // Handle the internal timeout - scanner.Stop() - return nil, fmt.Errorf("scanner maximum time exceeded") - default: - ipList := scanner.GetAvailableIPs() - if len(ipList) > 1 { - scanner.Stop() - for i := 0; i < 2; i++ { - result = append(result, netip.AddrPortFrom(ipList[i], warp.RandomWarpPort())) - } - return result, nil - } - time.Sleep(1 * time.Second) // Prevent the loop from spinning too fast + case <-t.C: + // Prevent the loop from spinning too fast + continue } } }