Skip to content

Commit

Permalink
Merge branch 'release/v1.0.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Yang committed Jan 2, 2019
2 parents a221b43 + 6eed72e commit 2f65dea
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 1 deletion.
37 changes: 36 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,41 @@
# distributed-lock
# Distributed Lock

## Preface
After considered many of implementations of distributed lock with Golang, I prefered like to write it by myself.

This is an ETCD-based implementation and used v3 protocol for performance enhancement. Simple & Easy, enjoy it!

## How to use it?

#### 1. Start an ETCD instance using Docker container
```shell
docker run -d --name etcd-server \
--publish 2379:2379 \
--publish 2380:2380 \
--env ALLOW_NONE_AUTHENTICATION=yes \
--env ETCD_ADVERTISE_CLIENT_URLS=http://etcd-server:2379 \
bitnami/etcd:latest
```

#### 2. Code Sample
```go
dl, err := dlock.NewDistributedLock(DistributedLockOptions{
Key: "/KEY-SPEC-100",
ETCDAddress: "http://127.0.0.1:2379", //accessible ETCD instance.
TTL: 5,
//hook function to receive an event while acquiring the lock.
HoldingLockFunc: func() {
fmt.Println("You are master now...!")

},
//hook function to receive an event while losing the lock.
LosingLockFunc: func() {
fmt.Println("You've lost master role, waiting...")
},
})
if err != nil {
panic(err)
}

dl.TryGetLock()
```
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/g0194776/distributed-lock

require (
github.com/coreos/etcd v3.3.10+incompatible
github.com/gogo/protobuf v1.2.0 // indirect
github.com/pkg/errors v0.8.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/sirupsen/logrus v1.2.0
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 // indirect
google.golang.org/grpc v1.17.0 // indirect
)
44 changes: 44 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/etcd v3.3.10+incompatible h1:KjVWqrZ5U0wa3CxY2AxlH6/UcB+PK2td1DcsYhA+HRs=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
139 changes: 139 additions & 0 deletions lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package dlock

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
"os"
"sync/atomic"
"time"
)

type distributedLock struct {
options *DistributedLockOptions
stub *DistributedLockStub
}

func (dl *distributedLock) TryGetLock() error {
err := dl.doInitialize()
if err != nil {
return err
}
rsp, err := dl.getKeyInformation()
if err != nil {
return errors.WithStack(fmt.Errorf("Failed to execute ETCD transaction: %s", err.Error()))
}
if dl.isLockMaster(rsp) {
log.Info("You are holding the lock!")
if atomic.CompareAndSwapInt32(dl.stub.isMaster, 0, 1) {
go dl.options.HoldingLockFunc()
}
return nil
} else {
log.Info("You are not holding the lock, waiting...")
}
return nil
}

func (dl *distributedLock) doInitialize() error {
var err error
if dl.options.etcdClient == nil {
dl.options.etcdClient, err = clientv3.New(clientv3.Config{
Endpoints: []string{dl.options.ETCDAddress},
DialTimeout: 5 * time.Second,
})
}
if err != nil {
return errors.WithStack(fmt.Errorf("Failed to initialize ETCD client: %s", err.Error()))
}
if dl.stub == nil {
var x int32
id := uuid.Must(uuid.NewV4())
dl.stub = &DistributedLockStub{isMaster: &x, Owner: id.String(), msgChan: make(chan string)}
dl.stub.lease, err = dl.options.etcdClient.Grant(context.TODO(), int64(dl.options.TTL))
if err != nil {
return errors.WithStack(fmt.Errorf("Failed to grant lease: %s", err.Error()))
}
//keep lease alive forever until the process closed.
dl.options.etcdClient.KeepAlive(context.TODO(), dl.stub.lease.ID)
//asynchronously run new go-routines for monitoring changes.
go dl.monitorLock()
go dl.doSyncState()
}
return nil
}

func (dl *distributedLock) getKeyInformation() (*clientv3.TxnResponse, error) {
cmp := clientv3.Compare(clientv3.CreateRevision(dl.options.Key), "=", 0)
put := clientv3.OpPut(dl.options.Key, dl.stub.Owner, clientv3.WithLease(dl.stub.lease.ID))
get := clientv3.OpGet(dl.options.Key)
getOwner := clientv3.OpGet(dl.options.Key /*"/master-role-spec"*/, clientv3.WithFirstCreate()...)
return dl.options.etcdClient.Txn(dl.options.etcdClient.Ctx()).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
}

func (dl *distributedLock) isLockMaster(rsp *clientv3.TxnResponse) bool {
if rsp.Succeeded {
return true
}
v := string(rsp.Responses[0].GetResponseRange().Kvs[0].Value)
revision := rsp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
ownerKey := rsp.Responses[1].GetResponseRange().Kvs
host, _ := os.Hostname()
if (len(ownerKey) == 0 || ownerKey[0].CreateRevision == revision) && v == host {
return true
}
return false
}

func (dl *distributedLock) monitorLock() {
watcher := clientv3.NewWatcher(dl.options.etcdClient)
defer watcher.Close()
wc := watcher.Watch(context.Background(), dl.options.Key)
for {
wr, isOK := <-wc
if !isOK {
break
}
if wr.Events != nil && len(wr.Events) > 0 {
for i := 0; i < len(wr.Events); i++ {
if wr.Events[i].Type == clientv3.EventTypeDelete {
dl.stub.msgChan <- "deleted"
}
}
}
}
}

func (dl *distributedLock) doSyncState() {
var msg string
var isOK bool
for {
msg, isOK = <-dl.stub.msgChan
if !isOK {
break
}
//ignored other events except DELETED.
if msg == "deleted" {
if atomic.CompareAndSwapInt32(dl.stub.isMaster, 1, 0) {
go dl.options.LosingLockFunc()
}
//repeatedly get the lock.
dl.TryGetLock()
}
}
}

func (dl *distributedLock) Close() {
defer func() {
recover()
}()
if dl.stub != nil {
if dl.stub.msgChan != nil {
close(dl.stub.msgChan)
}
dl.stub = nil
}
}
71 changes: 71 additions & 0 deletions lock_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package dlock

import (
"errors"
"sync"

"github.com/coreos/etcd/clientv3"
)

var (
lockObj *sync.Mutex
lockMap = make(map[string]DistributedLocker)
)

type DistributedLocker interface {
TryGetLock() error
Close()
}

//This is a simple object which holds by the real lock.
//Add more contextual information if needed.
type DistributedLockStub struct {
Owner string
isMaster *int32
msgChan chan string
lease *clientv3.LeaseGrantResponse
}

type DistributedLockOptions struct {
etcdClient *clientv3.Client
Key string
ETCDAddress string
TTL int
//It'll be trigger when get lock.
HoldingLockFunc func()
//It'll be trigger when lost lock.
LosingLockFunc func()
}

func init() {
if lockObj == nil {
lockObj = &sync.Mutex{}
}
}

//Immediately create a new distributed lock instance when there is no any instance with the same Key being created before.
func NewDistributedLock(option DistributedLockOptions) (DistributedLocker, error) {
if option.Key == "" {
return nil, errors.New("option.Key must be set.")
}
if option.HoldingLockFunc == nil {
return nil, errors.New("option.HoldingLockFunc is required for receiving notification.")
}
if option.LosingLockFunc == nil {
return nil, errors.New("option.LosingLockFunc is required for receiving notification.")
}
//set default value.
if option.TTL == 0 {
option.TTL = 5
}
lockObj.Lock()
defer lockObj.Unlock()
var isOK bool
var l DistributedLocker
if l, isOK = lockMap[option.Key]; isOK {
return l, nil
}
l = &distributedLock{options: &option}
lockMap[option.Key] = l
return l, nil
}
26 changes: 26 additions & 0 deletions lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package dlock

import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func Test_SpecifiedKeyNotExists(t *testing.T) {
dl, err := NewDistributedLock(DistributedLockOptions{
Key: "/KEY-SPEC-100",
ETCDAddress: "http://127.0.0.1:2379",
TTL: 5,
HoldingLockFunc: func() {
fmt.Println("You are master now...!")
},
LosingLockFunc: func() {
fmt.Println("You've lost master role, waiting...")
},
})
assert.Nil(t, err)
assert.Nil(t, dl.TryGetLock())
fmt.Printf("%#v\n", dl)
time.Sleep(30 * time.Second)
}

0 comments on commit 2f65dea

Please sign in to comment.