-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcoordination.go
156 lines (138 loc) · 4.3 KB
/
coordination.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package tableroll
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"time"
"github.com/euank/filelock"
"github.com/inconshreveable/log15"
"github.com/pkg/errors"
"k8s.io/utils/clock"
)
var (
	// errNoOwner is the sentinel returned when no process currently owns the
	// upgradeable file descriptors. That covers both the very first startup,
	// where nobody has ever claimed ownership, and the case where an owner is
	// recorded but that process has died (e.g. it crashed).
	errNoOwner = errors.New("no owner process exists")
)
// coordinator is used to coordinate between N processes, one of which is the
// current owner.
// It must provide means of getting the owner, updating the owner, and
// ensuring it has unique ownership of that information for the duration
// between a read and update.
// It is implemented in this case with unix locks on a file.
type coordinator struct {
	// lock is the file lock taken on the id file by Lock. Unlock treats a nil
	// lock as "not locked".
	lock *filelock.FileLock
	// dir is the coordination directory; it holds the id file and each
	// process's upgrade socket.
	dir string
	// id is this process's opaque identifier. BecomeOwner writes it to the id
	// file and Listen uses it to name this process's upgrade socket.
	id string
	// l is the logger; newCoordinator scopes it with the directory.
	l log15.Logger
	// mocks
	// clock drives the sleep between lock attempts in Lock; presumably
	// injected so tests can substitute a fake clock — confirm with callers.
	clock clock.Clock
}
// newCoordinator constructs a coordinator for the coordination directory dir
// that identifies itself as id. The supplied logger is scoped with the
// directory before being stored, and clock is kept for Lock's retry sleeps.
func newCoordinator(clock clock.Clock, l log15.Logger, dir string, id string) *coordinator {
	return &coordinator{
		dir:   dir,
		l:     l.New("dir", dir),
		clock: clock,
		id:    id,
	}
}
// Listen opens this process's upgrade socket: a unix socket inside the
// coordination directory whose name is derived from c.id. The returned
// listener is the caller's to close.
func (c *coordinator) Listen(ctx context.Context) (*net.UnixListener, error) {
	var cfg net.ListenConfig
	ln, err := cfg.Listen(ctx, "unix", upgradeSockPath(c.dir, c.id))
	if err != nil {
		return nil, err
	}
	// Listening on the "unix" network always yields a *net.UnixListener.
	return ln.(*net.UnixListener), nil
}
// touchFile makes sure a file exists at path, creating it empty if needed.
// An existing file's contents are left untouched (no O_TRUNC). It returns
// the error from opening the file, or otherwise the error from closing it.
func touchFile(path string) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDONLY, 0o755)
	if err != nil {
		// f is nil here; there is nothing to close.
		return err
	}
	return f.Close()
}
// Lock takes an exclusive lock on the coordination directory by flock-ing its
// id file (created empty first if it does not exist). If another process holds
// the lock, Lock retries every 100ms until the lock is acquired or ctx is done.
//
// On success c.lock is set and nil is returned. If ctx ends before the lock is
// acquired, ctx.Err() is returned and c.lock is left untouched. Any other
// locking failure is returned wrapped.
func (c *coordinator) Lock(ctx context.Context) error {
	idPath := c.idFile()
	if err := touchFile(idPath); err != nil {
		return err
	}
	c.l.Info("taking lock on coordination dir")
	flock, err := filelock.NewLock(idPath, filelock.RegFile)
	if err != nil {
		return err
	}
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			// We never acquired the lock: release its descriptor (best effort)
			// and don't record it as held.
			_ = flock.Close()
			return ctxErr
		}
		lockErr := flock.TryExclusiveLock()
		if lockErr == nil {
			break
		}
		if lockErr != filelock.ErrLocked {
			_ = flock.Close()
			return errors.Wrap(lockErr, "error trying to lock coordination directory")
		}
		// Lock busy; wait (via the injected clock) and try again.
		c.clock.Sleep(100 * time.Millisecond)
	}
	c.l.Info("took lock on coordination dir")
	c.lock = flock
	// The lock is held, so report success even if ctx was cancelled in the
	// meantime; otherwise the caller would never Unlock it.
	return nil
}
// idFile returns the path of the file that records the current owner's id.
// Lock also takes its flock on this same file.
func (c *coordinator) idFile() string {
	// The file is called "pid" for historical reasons: the opaque owner id
	// used to always be a process id.
	const name = "pid"
	return filepath.Join(c.dir, name)
}
// BecomeOwner records this coordinator's id in the id file, which marks this
// process as the owner of the coordination directory. Call it only while
// holding the lock (see Lock) so the write cannot race another process.
func (c *coordinator) BecomeOwner() error {
	const idFileMode = 0o755
	ownerID := c.id
	c.l.Info("writing id to become owner", "id", ownerID)
	return os.WriteFile(c.idFile(), []byte(ownerID), idFileMode)
}
// Unlock releases the lock on the coordination id file and closes the lock's
// file descriptor. It returns nil without doing anything if no lock is held,
// so calling it more than once is safe.
func (c *coordinator) Unlock() error {
	if c.lock == nil {
		c.l.Info("not unlocking coordination dir; not locked")
		return nil
	}
	c.l.Info("unlocking coordination dir")
	flock := c.lock
	c.lock = nil
	if err := flock.Unlock(); err != nil {
		// Still release the descriptor. Per flock(2), closing the last
		// descriptor for the open file drops the lock too.
		_ = flock.Close()
		return err
	}
	// Lock opens a fresh descriptor on every call, so close this one to avoid
	// leaking an fd per Lock/Unlock cycle.
	return flock.Close()
}
// GetOwnerID returns the id recorded by the current owner of this coordination
// directory. It returns errNoOwner when no owner is recorded, meaning the id
// file is missing or empty. Any other read error is returned as-is.
//
// A returned id does not guarantee that the owner is still alive; ConnectOwner
// treats a recorded owner that is not listening as errNoOwner.
func (c *coordinator) GetOwnerID() (string, error) {
	c.l.Info("discovering current owner")
	data, err := os.ReadFile(c.idFile())
	if err != nil {
		if os.IsNotExist(err) {
			// The id file has never been created, so nobody has claimed ownership.
			return "", errNoOwner
		}
		return "", err
	}
	if len(data) == 0 {
		// empty file, that means no owner
		return "", errNoOwner
	}
	owner := string(data)
	c.l.Info("found owner", "owner", owner)
	return owner, nil
}
// ConnectOwner dials the upgrade socket of the owner recorded in the
// coordination directory.
//
// Any error from GetOwnerID is returned as-is. That includes errNoOwner when
// no owner is recorded. If an owner is recorded but nothing accepts
// connections on its socket (e.g. that process crashed), ConnectOwner returns
// errNoOwner. If the dial fails because ctx was cancelled or expired, the dial
// error is returned instead, so cancellation is not mistaken for a dead owner.
func (c *coordinator) ConnectOwner(ctx context.Context) (*net.UnixConn, error) {
	oid, err := c.GetOwnerID()
	if err != nil {
		return nil, err
	}
	c.l.Info("connecting to owner", "owner", oid)
	sockPath := upgradeSockPath(c.dir, oid)
	conn, err := (&net.Dialer{}).DialContext(ctx, "unix", sockPath)
	if err != nil {
		// net reports a cancelled/expired dial using its own internal error
		// values rather than the context errors, so inspect ctx directly in
		// addition to classifying the error.
		if ctx.Err() != nil || isContextDialErr(err) {
			return nil, err
		}
		c.l.Warn("found an owner ID, but it wasn't listening; possibly a stale process that crashed?", "oid", oid, "dialErr", err)
		return nil, errNoOwner
	}
	return conn.(*net.UnixConn), nil
}
// isContextDialErr reports whether err, as returned by a net.Dialer, was
// caused by the dial's context being cancelled or reaching its deadline.
//
// net wraps dial failures in *net.OpError. It also replaces context.Canceled
// and context.DeadlineExceeded with its own internal error values, so a plain
// == comparison against the context errors misses them. Recent Go releases
// give those internal values an Is method that matches the context errors.
// Both that form and the bare context errors are accepted here.
func isContextDialErr(err error) bool {
	if opErr, ok := err.(*net.OpError); ok {
		err = opErr.Err
	}
	matches := func(target error) bool {
		if err == target {
			return true
		}
		is, ok := err.(interface{ Is(error) bool })
		return ok && is.Is(target)
	}
	return matches(context.Canceled) || matches(context.DeadlineExceeded)
}
// upgradeSockPath returns the path of the upgrade socket for the process with
// id oid: "<oid>.sock" inside coordinationDir.
func upgradeSockPath(coordinationDir string, oid string) string {
	sockName := fmt.Sprintf("%s.sock", oid)
	return filepath.Join(coordinationDir, sockName)
}