Skip to content
This repository has been archived by the owner on Aug 12, 2021. It is now read-only.

delete entries from the cache when the TTL expires #6

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ This package requires **Go 1.7** (context in std lib) or later.
## Browse for services in your local network

```go
// Discover all services on the network (e.g. _workstation._tcp)
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
log.Fatalln("Failed to initialize resolver:", err.Error())
}

entries := make(chan *zeroconf.ServiceEntry)
go func(results <-chan *zeroconf.ServiceEntry) {
for entry := range results {
Expand All @@ -45,6 +39,7 @@ go func(results <-chan *zeroconf.ServiceEntry) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
// Discover all services on the network (e.g. _workstation._tcp)
err = resolver.Browse(ctx, "_workstation._tcp", "local.", entries)
if err != nil {
log.Fatalln("Failed to browse:", err.Error())
Expand Down Expand Up @@ -94,7 +89,7 @@ See what needs to be done and submit a pull request :)
* [x] Browse / Lookup / Register services
* [x] Multiple IPv6 / IPv4 addresses support
* [x] Send multiple probes (exp. back-off) if no service answers (*)
* [ ] Timestamp entries for TTL checks
* [x] Timestamp entries for TTL checks
* [ ] Compare new multicasts with already received services

_Notes:_
Expand Down
150 changes: 71 additions & 79 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ const (
IPv4AndIPv6 = (IPv4 | IPv6) //< Default option.
)

// Client structure encapsulates both IPv4/IPv6 UDP connections.
type client struct {
ipv4conn *ipv4.PacketConn
ipv6conn *ipv6.PacketConn
ifaces []net.Interface
}

type clientOpts struct {
listenOn IPType
ifaces []net.Interface
Expand All @@ -52,99 +59,70 @@ func SelectIfaces(ifaces []net.Interface) ClientOption {
}
}

// Resolver acts as entry point for service lookups and to browse the DNS-SD.
type Resolver struct {
c *client
}

// NewResolver creates a new resolver and joins the UDP multicast groups to
// listen for mDNS messages.
func NewResolver(options ...ClientOption) (*Resolver, error) {
// Apply default configuration and load supplied options.
var conf = clientOpts{
listenOn: IPv4AndIPv6,
}
for _, o := range options {
if o != nil {
o(&conf)
}
}

c, err := newClient(conf)
// Browse for all services of a given type in a given domain.
func Browse(ctx context.Context, service, domain string, entries chan<- *ServiceEntry, opts ...ClientOption) error {
cl, err := newClient(applyOpts(opts...))
if err != nil {
return nil, err
return err
}
return &Resolver{
c: c,
}, nil
}

// Browse for all services of a given type in a given domain.
func (r *Resolver) Browse(ctx context.Context, service, domain string, entries chan<- *ServiceEntry) error {
params := defaultParams(service)
if domain != "" {
params.Domain = domain
}
params.Entries = entries
params.isBrowsing = true
ctx, cancel := context.WithCancel(ctx)
go r.c.mainloop(ctx, params)
return cl.run(ctx, params)
}

err := r.c.query(params)
// Lookup a specific service by its name and type in a given domain.
func Lookup(ctx context.Context, instance, service, domain string, entries chan<- *ServiceEntry, opts ...ClientOption) error {
cl, err := newClient(applyOpts(opts...))
if err != nil {
cancel()
return err
}
// If previous probe was ok, it should be fine now. In case of an error later on,
// the entries' queue is closed.
go func() {
if err := r.c.periodicQuery(ctx, params); err != nil {
cancel()
}
}()

return nil
}

// Lookup a specific service by its name and type in a given domain.
func (r *Resolver) Lookup(ctx context.Context, instance, service, domain string, entries chan<- *ServiceEntry) error {
params := defaultParams(service)
params.Instance = instance
if domain != "" {
params.Domain = domain
}
params.Entries = entries
ctx, cancel := context.WithCancel(ctx)
go r.c.mainloop(ctx, params)
err := r.c.query(params)
if err != nil {
// cancel mainloop
cancel()
return err
return cl.run(ctx, params)
}

func applyOpts(options ...ClientOption) clientOpts {
// Apply default configuration and load supplied options.
var conf = clientOpts{
listenOn: IPv4AndIPv6,
}
// If previous probe was ok, it should be fine now. In case of an error later on,
// the entries' queue is closed.
go func() {
if err := r.c.periodicQuery(ctx, params); err != nil {
cancel()
for _, o := range options {
if o != nil {
o(&conf)
}
}
return conf
}

func (c *client) run(ctx context.Context, params *lookupParams) error {
ctx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
defer close(done)
c.mainloop(ctx, params)
}()

return nil
// If previous probe was ok, it should be fine now. In case of an error later on,
// the entries' queue is closed.
err := c.periodicQuery(ctx, params)
cancel()
<-done
return err
}

// defaultParams returns a default set of QueryParams.
func defaultParams(service string) *lookupParams {
return newLookupParams("", service, "local", false, make(chan *ServiceEntry))
}

// Client structure encapsulates both IPv4/IPv6 UDP connections.
type client struct {
ipv4conn *ipv4.PacketConn
ipv6conn *ipv6.PacketConn
ifaces []net.Interface
}

// Client structure constructor
func newClient(opts clientOpts) (*client, error) {
ifaces := opts.ifaces
Expand Down Expand Up @@ -177,6 +155,8 @@ func newClient(opts clientOpts) (*client, error) {
}, nil
}

var cleanupFreq = 10 * time.Second
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can probably be on the order of minutes. But not really an issue any which way.


// Start listeners and waits for the shutdown signal from exit channel
func (c *client) mainloop(ctx context.Context, params *lookupParams) {
// start listening for responses
Expand All @@ -189,16 +169,28 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
}

// Iterate through channels from listeners goroutines
var entries, sentEntries map[string]*ServiceEntry
sentEntries = make(map[string]*ServiceEntry)
var entries map[string]*ServiceEntry
sentEntries := make(map[string]*ServiceEntry)

ticker := time.NewTicker(cleanupFreq)
defer ticker.Stop()
for {
var now time.Time
select {
case <-ctx.Done():
// Context expired. Notify subscriber that we are done here.
params.done()
c.shutdown()
return
case t := <-ticker.C:
for k, e := range sentEntries {
if t.After(e.Expiry) {
delete(sentEntries, k)
}
}
continue
case msg := <-msgCh:
now = time.Now()
entries = make(map[string]*ServiceEntry)
sections := append(msg.Answer, msg.Ns...)
sections = append(sections, msg.Extra...)
Expand All @@ -218,7 +210,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
params.Service,
params.Domain)
}
entries[rr.Ptr].TTL = rr.Hdr.Ttl
entries[rr.Ptr].Expiry = now.Add(time.Duration(rr.Hdr.Ttl) * time.Second)
case *dns.SRV:
if params.ServiceInstanceName() != "" && params.ServiceInstanceName() != rr.Hdr.Name {
continue
Expand All @@ -233,7 +225,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
}
entries[rr.Hdr.Name].HostName = rr.Target
entries[rr.Hdr.Name].Port = int(rr.Port)
entries[rr.Hdr.Name].TTL = rr.Hdr.Ttl
entries[rr.Hdr.Name].Expiry = now.Add(time.Duration(rr.Hdr.Ttl) * time.Second)
case *dns.TXT:
if params.ServiceInstanceName() != "" && params.ServiceInstanceName() != rr.Hdr.Name {
continue
Expand All @@ -247,7 +239,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
params.Domain)
}
entries[rr.Hdr.Name].Text = rr.Txt
entries[rr.Hdr.Name].TTL = rr.Hdr.Ttl
entries[rr.Hdr.Name].Expiry = now.Add(time.Duration(rr.Hdr.Ttl) * time.Second)
}
}
// Associate IPs in a second round as other fields should be filled by now.
Expand All @@ -271,7 +263,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {

if len(entries) > 0 {
for k, e := range entries {
if e.TTL == 0 {
if !e.Expiry.After(now) {
delete(entries, k)
delete(sentEntries, k)
continue
Expand Down Expand Up @@ -381,6 +373,10 @@ func (c *client) periodicQuery(ctx context.Context, params *lookupParams) error
}
}()
for {
// Do periodic query.
if err := c.query(params); err != nil {
return err
}
// Backoff and cancel logic.
wait := bo.NextBackOff()
if wait == backoff.Stop {
Expand All @@ -391,6 +387,7 @@ func (c *client) periodicQuery(ctx context.Context, params *lookupParams) error
} else {
timer.Reset(wait)
}

select {
case <-timer.C:
// Wait for next iteration.
Expand All @@ -399,12 +396,11 @@ func (c *client) periodicQuery(ctx context.Context, params *lookupParams) error
// Done here. Received a matching mDNS entry.
return nil
case <-ctx.Done():
if params.isBrowsing {
return nil
}
return ctx.Err()
}
// Do periodic query.
if err := c.query(params); err != nil {
return err
}
}
}

Expand All @@ -428,11 +424,7 @@ func (c *client) query(params *lookupParams) error {
m.SetQuestion(serviceName, dns.TypePTR)
}
m.RecursionDesired = false
if err := c.sendQuery(m); err != nil {
return err
}

return nil
return c.sendQuery(m)
}

// Pack the dns.Msg and write to available connections (multicast)
Expand Down
9 changes: 2 additions & 7 deletions examples/resolv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ var (
func main() {
flag.Parse()

// Discover all services on the network (e.g. _workstation._tcp)
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
log.Fatalln("Failed to initialize resolver:", err.Error())
}

entries := make(chan *zeroconf.ServiceEntry)
go func(results <-chan *zeroconf.ServiceEntry) {
for entry := range results {
Expand All @@ -34,7 +28,8 @@ func main() {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(*waitTime))
defer cancel()
err = resolver.Browse(ctx, *service, *domain, entries)
// Discover all services on the network (e.g. _workstation._tcp)
err := zeroconf.Browse(ctx, *service, *domain, entries)
if err != nil {
log.Fatalln("Failed to browse:", err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const (
multicastRepetitions = 2
)

var defaultTTL uint32 = 3200

// Register a service by given arguments. This call will take the system's hostname
// and lookup IP by that hostname.
func Register(instance, service, domain string, port int, text []string, ifaces []net.Interface) (*Server, error) {
Expand Down Expand Up @@ -173,7 +175,7 @@ func newServer(ifaces []net.Interface) (*Server, error) {
ipv4conn: ipv4conn,
ipv6conn: ipv6conn,
ifaces: ifaces,
ttl: 3200,
ttl: defaultTTL,
shouldShutdown: make(chan struct{}),
}

Expand Down
13 changes: 7 additions & 6 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net"
"sync"
"time"
)

// ServiceRecord contains the basic description of a service, which contains instance name, service type & domain
Expand Down Expand Up @@ -103,12 +104,12 @@ func (l *lookupParams) disableProbing() {
// used to answer multicast queries.
type ServiceEntry struct {
ServiceRecord
HostName string `json:"hostname"` // Host machine DNS name
Port int `json:"port"` // Service Port
Text []string `json:"text"` // Service info served as a TXT record
TTL uint32 `json:"ttl"` // TTL of the service record
AddrIPv4 []net.IP `json:"-"` // Host machine IPv4 address
AddrIPv6 []net.IP `json:"-"` // Host machine IPv6 address
HostName string `json:"hostname"` // Host machine DNS name
Port int `json:"port"` // Service Port
Text []string `json:"text"` // Service info served as a TXT record
Expiry time.Time `json:"expiry"` // Expiry of the service entry, will be converted to a TTL value
AddrIPv4 []net.IP `json:"-"` // Host machine IPv4 address
AddrIPv6 []net.IP `json:"-"` // Host machine IPv6 address
}

// NewServiceEntry constructs a ServiceEntry.
Expand Down
Loading