From c294f14f8911ed2c0037aa68fc0a9b165721f881 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 4 Jul 2021 13:05:12 -0700 Subject: [PATCH 1/4] remove the Resolver --- README.md | 7 +-- client.go | 93 ++++++++++++++++----------------------- examples/resolv/client.go | 9 +--- service_test.go | 25 ++--------- 4 files changed, 44 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index f639e8cd..9733a262 100644 --- a/README.md +++ b/README.md @@ -29,12 +29,6 @@ This package requires **Go 1.7** (context in std lib) or later. ## Browse for services in your local network ```go -// Discover all services on the network (e.g. _workstation._tcp) -resolver, err := zeroconf.NewResolver(nil) -if err != nil { - log.Fatalln("Failed to initialize resolver:", err.Error()) -} - entries := make(chan *zeroconf.ServiceEntry) go func(results <-chan *zeroconf.ServiceEntry) { for entry := range results { @@ -45,6 +39,7 @@ go func(results <-chan *zeroconf.ServiceEntry) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) defer cancel() +// Discover all services on the network (e.g. _workstation._tcp) err = resolver.Browse(ctx, "_workstation._tcp", "local.", entries) if err != nil { log.Fatalln("Failed to browse:", err.Error()) diff --git a/client.go b/client.go index 270394ab..34abd57f 100644 --- a/client.go +++ b/client.go @@ -26,6 +26,13 @@ const ( IPv4AndIPv6 = (IPv4 | IPv6) //< Default option. ) +// Client structure encapsulates both IPv4/IPv6 UDP connections. +type client struct { + ipv4conn *ipv4.PacketConn + ipv6conn *ipv6.PacketConn + ifaces []net.Interface +} + type clientOpts struct { listenOn IPType ifaces []net.Interface @@ -52,80 +59,61 @@ func SelectIfaces(ifaces []net.Interface) ClientOption { } } -// Resolver acts as entry point for service lookups and to browse the DNS-SD. -type Resolver struct { - c *client -} - -// NewResolver creates a new resolver and joins the UDP multicast groups to -// listen for mDNS messages. -func NewResolver(options ...ClientOption) (*Resolver, error) { - // Apply default configuration and load supplied options. - var conf = clientOpts{ - listenOn: IPv4AndIPv6, - } - for _, o := range options { - if o != nil { - o(&conf) - } - } - - c, err := newClient(conf) +// Browse for all services of a given type in a given domain. +func Browse(ctx context.Context, service, domain string, entries chan<- *ServiceEntry, opts ...ClientOption) error { + cl, err := newClient(applyOpts(opts...)) if err != nil { - return nil, err + return err } - return &Resolver{ - c: c, - }, nil -} - -// Browse for all services of a given type in a given domain. -func (r *Resolver) Browse(ctx context.Context, service, domain string, entries chan<- *ServiceEntry) error { params := defaultParams(service) if domain != "" { params.Domain = domain } params.Entries = entries params.isBrowsing = true - ctx, cancel := context.WithCancel(ctx) - go r.c.mainloop(ctx, params) + return cl.run(ctx, params) +} - err := r.c.query(params) +// Lookup a specific service by its name and type in a given domain. +func Lookup(ctx context.Context, instance, service, domain string, entries chan<- *ServiceEntry, opts ...ClientOption) error { + cl, err := newClient(applyOpts(opts...)) if err != nil { - cancel() return err } - // If previous probe was ok, it should be fine now. In case of an error later on, - // the entries' queue is closed. - go func() { - if err := r.c.periodicQuery(ctx, params); err != nil { - cancel() - } - }() - - return nil -} - -// Lookup a specific service by its name and type in a given domain. -func (r *Resolver) Lookup(ctx context.Context, instance, service, domain string, entries chan<- *ServiceEntry) error { params := defaultParams(service) params.Instance = instance if domain != "" { params.Domain = domain } params.Entries = entries + return cl.run(ctx, params) +} + +func applyOpts(options ...ClientOption) clientOpts { + // Apply default configuration and load supplied options. + var conf = clientOpts{ + listenOn: IPv4AndIPv6, + } + for _, o := range options { + if o != nil { + o(&conf) + } + } + return conf +} + +func (c *client) run(ctx context.Context, params *lookupParams) error { ctx, cancel := context.WithCancel(ctx) - go r.c.mainloop(ctx, params) - err := r.c.query(params) - if err != nil { - // cancel mainloop + go c.mainloop(ctx, params) + + if err := c.query(params); err != nil { cancel() return err } // If previous probe was ok, it should be fine now. In case of an error later on, // the entries' queue is closed. go func() { - if err := r.c.periodicQuery(ctx, params); err != nil { + if err := c.periodicQuery(ctx, params); err != nil { cancel() } }() @@ -138,13 +126,6 @@ func defaultParams(service string) *lookupParams { return newLookupParams("", service, "local", false, make(chan *ServiceEntry)) } -// Client structure encapsulates both IPv4/IPv6 UDP connections. -type client struct { - ipv4conn *ipv4.PacketConn - ipv6conn *ipv6.PacketConn - ifaces []net.Interface -} - // Client structure constructor func newClient(opts clientOpts) (*client, error) { ifaces := opts.ifaces diff --git a/examples/resolv/client.go b/examples/resolv/client.go index 83186bc4..45332306 100644 --- a/examples/resolv/client.go +++ b/examples/resolv/client.go @@ -18,12 +18,6 @@ var ( func main() { flag.Parse() - // Discover all services on the network (e.g. _workstation._tcp) - resolver, err := zeroconf.NewResolver(nil) - if err != nil { - log.Fatalln("Failed to initialize resolver:", err.Error()) - } - entries := make(chan *zeroconf.ServiceEntry) go func(results <-chan *zeroconf.ServiceEntry) { for entry := range results { @@ -34,7 +28,8 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(*waitTime)) defer cancel() - err = resolver.Browse(ctx, *service, *domain, entries) + // Discover all services on the network (e.g. _workstation._tcp) + err := zeroconf.Browse(ctx, *service, *domain, entries) if err != nil { log.Fatalln("Failed to browse:", err.Error()) } diff --git a/service_test.go b/service_test.go index 2c5a23ed..f45d83de 100644 --- a/service_test.go +++ b/service_test.go @@ -40,12 +40,8 @@ func TestBasic(t *testing.T) { time.Sleep(time.Second) - resolver, err := NewResolver(nil) - if err != nil { - t.Fatalf("Expected create resolver success, but got %v", err) - } entries := make(chan *ServiceEntry, 100) - if err := resolver.Browse(ctx, mdnsService, mdnsDomain, entries); err != nil { + if err := Browse(ctx, mdnsService, mdnsDomain, entries); err != nil { t.Fatalf("Expected browse success, but got %v", err) } <-ctx.Done() @@ -69,11 +65,6 @@ func TestBasic(t *testing.T) { } func TestNoRegister(t *testing.T) { - resolver, err := NewResolver(nil) - if err != nil { - t.Fatalf("Expected create resolver success, but got %v", err) - } - // before register, mdns resolve shuold not have any entry entries := make(chan *ServiceEntry) go func(results <-chan *ServiceEntry) { @@ -84,7 +75,7 @@ func TestNoRegister(t *testing.T) { }(entries) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if err := resolver.Browse(ctx, mdnsService, mdnsDomain, entries); err != nil { + if err := Browse(ctx, mdnsService, mdnsDomain, entries); err != nil { t.Fatalf("Expected browse success, but got %v", err) } <-ctx.Done() @@ -100,12 +91,8 @@ func TestSubtype(t *testing.T) { time.Sleep(time.Second) - resolver, err := NewResolver(nil) - if err != nil { - t.Fatalf("Expected create resolver success, but got %v", err) - } entries := make(chan *ServiceEntry, 100) - if err := resolver.Browse(ctx, mdnsSubtype, mdnsDomain, entries); err != nil { + if err := Browse(ctx, mdnsSubtype, mdnsDomain, entries); err != nil { t.Fatalf("Expected browse success, but got %v", err) } <-ctx.Done() @@ -136,12 +123,8 @@ func TestSubtype(t *testing.T) { time.Sleep(time.Second) - resolver, err := NewResolver(nil) - if err != nil { - t.Fatalf("Expected create resolver success, but got %v", err) - } entries := make(chan *ServiceEntry, 100) - if err := resolver.Browse(ctx, mdnsService, mdnsDomain, entries); err != nil { + if err := Browse(ctx, mdnsService, mdnsDomain, entries); err != nil { t.Fatalf("Expected browse success, but got %v", err) } <-ctx.Done() From 434efb7910861c03dc840f8cb9398e68de237ef9 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 4 Jul 2021 14:28:19 -0700 Subject: [PATCH 2/4] make Lookup and Resolve blocking calls --- client.go | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/client.go b/client.go index 34abd57f..d6085b93 100644 --- a/client.go +++ b/client.go @@ -104,21 +104,12 @@ func applyOpts(options ...ClientOption) clientOpts { func (c *client) run(ctx context.Context, params *lookupParams) error { ctx, cancel := context.WithCancel(ctx) + defer cancel() go c.mainloop(ctx, params) - if err := c.query(params); err != nil { - cancel() - return err - } // If previous probe was ok, it should be fine now. In case of an error later on, // the entries' queue is closed. - go func() { - if err := c.periodicQuery(ctx, params); err != nil { - cancel() - } - }() - - return nil + return c.periodicQuery(ctx, params) } // defaultParams returns a default set of QueryParams. @@ -362,6 +353,10 @@ func (c *client) periodicQuery(ctx context.Context, params *lookupParams) error } }() for { + // Do periodic query. + if err := c.query(params); err != nil { + return err + } // Backoff and cancel logic. wait := bo.NextBackOff() if wait == backoff.Stop { @@ -372,6 +367,7 @@ func (c *client) periodicQuery(ctx context.Context, params *lookupParams) error } else { timer.Reset(wait) } + select { case <-timer.C: // Wait for next iteration. @@ -380,12 +376,11 @@ func (c *client) periodicQuery(ctx context.Context, params *lookupParams) error // Done here. Received a matching mDNS entry. return nil case <-ctx.Done(): + if params.isBrowsing { + return nil + } return ctx.Err() } - // Do periodic query. - if err := c.query(params); err != nil { - return err - } } } @@ -409,11 +404,7 @@ func (c *client) query(params *lookupParams) error { m.SetQuestion(serviceName, dns.TypePTR) } m.RecursionDesired = false - if err := c.sendQuery(m); err != nil { - return err - } - - return nil + return c.sendQuery(m) } // Pack the dns.Msg and write to available connections (multicast) From 5bca855e128012e78209ec6f214841ee63f9deda Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 4 Jul 2021 14:35:09 -0700 Subject: [PATCH 3/4] wait for the mainloop to complete before return Lookup and Browse --- client.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index d6085b93..a6787cd3 100644 --- a/client.go +++ b/client.go @@ -104,12 +104,18 @@ func applyOpts(options ...ClientOption) clientOpts { func (c *client) run(ctx context.Context, params *lookupParams) error { ctx, cancel := context.WithCancel(ctx) - defer cancel() - go c.mainloop(ctx, params) + done := make(chan struct{}) + go func() { + defer close(done) + c.mainloop(ctx, params) + }() // If previous probe was ok, it should be fine now. In case of an error later on, // the entries' queue is closed. - return c.periodicQuery(ctx, params) + err := c.periodicQuery(ctx, params) + cancel() + <-done + return err } // defaultParams returns a default set of QueryParams. From 72b80e62aafa5b7ec943265972c8c11017da366a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 5 Jun 2021 21:25:01 -0700 Subject: [PATCH 4/4] delete entries from the cache when the TTL expires --- README.md | 2 +- client.go | 26 ++++++++++++++++++++------ server.go | 4 +++- service.go | 13 +++++++------ service_test.go | 30 ++++++++++++++++++++++++++++++ 5 files changed, 61 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 9733a262..28d157d1 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ See what needs to be done and submit a pull request :) * [x] Browse / Lookup / Register services * [x] Multiple IPv6 / IPv4 addresses support * [x] Send multiple probes (exp. back-off) if no service answers (*) -* [ ] Timestamp entries for TTL checks +* [x] Timestamp entries for TTL checks * [ ] Compare new multicasts with already received services _Notes:_ diff --git a/client.go b/client.go index a6787cd3..8b527bb8 100644 --- a/client.go +++ b/client.go @@ -155,6 +155,8 @@ func newClient(opts clientOpts) (*client, error) { }, nil } +var cleanupFreq = 10 * time.Second + // Start listeners and waits for the shutdown signal from exit channel func (c *client) mainloop(ctx context.Context, params *lookupParams) { // start listening for responses @@ -167,16 +169,28 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) { } // Iterate through channels from listeners goroutines - var entries, sentEntries map[string]*ServiceEntry - sentEntries = make(map[string]*ServiceEntry) + var entries map[string]*ServiceEntry + sentEntries := make(map[string]*ServiceEntry) + + ticker := time.NewTicker(cleanupFreq) + defer ticker.Stop() for { + var now time.Time select { case <-ctx.Done(): // Context expired. Notify subscriber that we are done here. params.done() c.shutdown() return + case t := <-ticker.C: + for k, e := range sentEntries { + if t.After(e.Expiry) { + delete(sentEntries, k) + } + } + continue case msg := <-msgCh: + now = time.Now() entries = make(map[string]*ServiceEntry) sections := append(msg.Answer, msg.Ns...) sections = append(sections, msg.Extra...) @@ -196,7 +210,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) { params.Service, params.Domain) } - entries[rr.Ptr].TTL = rr.Hdr.Ttl + entries[rr.Ptr].Expiry = now.Add(time.Duration(rr.Hdr.Ttl) * time.Second) case *dns.SRV: if params.ServiceInstanceName() != "" && params.ServiceInstanceName() != rr.Hdr.Name { continue @@ -211,7 +225,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) { } entries[rr.Hdr.Name].HostName = rr.Target entries[rr.Hdr.Name].Port = int(rr.Port) - entries[rr.Hdr.Name].TTL = rr.Hdr.Ttl + entries[rr.Hdr.Name].Expiry = now.Add(time.Duration(rr.Hdr.Ttl) * time.Second) case *dns.TXT: if params.ServiceInstanceName() != "" && params.ServiceInstanceName() != rr.Hdr.Name { continue @@ -225,7 +239,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) { params.Domain) } entries[rr.Hdr.Name].Text = rr.Txt - entries[rr.Hdr.Name].TTL = rr.Hdr.Ttl + entries[rr.Hdr.Name].Expiry = now.Add(time.Duration(rr.Hdr.Ttl) * time.Second) } } // Associate IPs in a second round as other fields should be filled by now. @@ -249,7 +263,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) { if len(entries) > 0 { for k, e := range entries { - if e.TTL == 0 { + if !e.Expiry.After(now) { delete(entries, k) delete(sentEntries, k) continue diff --git a/server.go b/server.go index 70fd11ac..a78c4d65 100644 --- a/server.go +++ b/server.go @@ -21,6 +21,8 @@ const ( multicastRepetitions = 2 ) +var defaultTTL uint32 = 3200 + // Register a service by given arguments. This call will take the system's hostname // and lookup IP by that hostname. func Register(instance, service, domain string, port int, text []string, ifaces []net.Interface) (*Server, error) { @@ -173,7 +175,7 @@ func newServer(ifaces []net.Interface) (*Server, error) { ipv4conn: ipv4conn, ipv6conn: ipv6conn, ifaces: ifaces, - ttl: 3200, + ttl: defaultTTL, shouldShutdown: make(chan struct{}), } diff --git a/service.go b/service.go index 6253c543..43bbf8aa 100644 --- a/service.go +++ b/service.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "sync" + "time" ) // ServiceRecord contains the basic description of a service, which contains instance name, service type & domain @@ -103,12 +104,12 @@ func (l *lookupParams) disableProbing() { // used to answer multicast queries. type ServiceEntry struct { ServiceRecord - HostName string `json:"hostname"` // Host machine DNS name - Port int `json:"port"` // Service Port - Text []string `json:"text"` // Service info served as a TXT record - TTL uint32 `json:"ttl"` // TTL of the service record - AddrIPv4 []net.IP `json:"-"` // Host machine IPv4 address - AddrIPv6 []net.IP `json:"-"` // Host machine IPv6 address + HostName string `json:"hostname"` // Host machine DNS name + Port int `json:"port"` // Service Port + Text []string `json:"text"` // Service info served as a TXT record + Expiry time.Time `json:"expiry"` // Expiry of the service entry, will be converted to a TTL value + AddrIPv4 []net.IP `json:"-"` // Host machine IPv4 address + AddrIPv6 []net.IP `json:"-"` // Host machine IPv6 address } // NewServiceEntry constructs a ServiceEntry. diff --git a/service_test.go b/service_test.go index f45d83de..a4d82c45 100644 --- a/service_test.go +++ b/service_test.go @@ -146,4 +146,34 @@ func TestSubtype(t *testing.T) { t.Fatalf("Expected port is %d, but got %d", mdnsPort, result.Port) } }) + + t.Run("ttl", func(t *testing.T) { + origTTL := defaultTTL + origCleanupFreq := cleanupFreq + defer func() { + defaultTTL = origTTL + cleanupFreq = origCleanupFreq + }() + defaultTTL = 2 // 2 seconds + cleanupFreq = 100 * time.Millisecond + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go startMDNS(ctx, mdnsPort, mdnsName, mdnsSubtype, mdnsDomain) + + entries := make(chan *ServiceEntry, 100) + if err := Browse(ctx, mdnsService, mdnsDomain, entries); err != nil { + t.Fatalf("Expected browse success, but got %v", err) + } + + <-ctx.Done() + if len(entries) != 2 { + t.Fatalf("Expected to have received 2 entries, but got %d", len(entries)) + } + res1 := <-entries + res2 := <-entries + if res1.ServiceInstanceName() != res2.ServiceInstanceName() { + t.Fatalf("expected the two entries to be identical") + } + }) }