Skip to content

Commit

Permalink
feat: etcd PrefixWatch
Browse files Browse the repository at this point in the history
  • Loading branch information
illyaks committed Apr 30, 2024
1 parent 859fb7a commit 3b257ad
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 1 deletion.
33 changes: 33 additions & 0 deletions pkg/etcd/etcd_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"minik8s/pkg/util"
"minik8s/util/log"
"sync"
)

type Store struct {
Expand Down Expand Up @@ -63,3 +64,35 @@ func (store Store) DeleteEtcdPair(key string) bool {

return true
}

func (store Store) PrefixWatch(wg *sync.WaitGroup, ctx context.Context, prefix string, handler func(key string, value string)) {
/* usage (not pretty sure):
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
PrefixWatch(...)
...
cancel() (when you want to terminate it)
wg.Wait()
*/
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
rch := store.etcdClient.Watch(ctx, prefix, clientv3.WithPrefix())
for resp := range rch {
err := resp.Err()
if err != nil {
log.Fatal("Failed to watch prefix-watch: %s", err.Error())
}
for _, ev := range resp.Events {
handler(string(ev.Kv.Key), string(ev.Kv.Value))
}
}
}
}
}()
}
78 changes: 78 additions & 0 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
package etcd

import (
"context"
"fmt"
"minik8s/pkg/api"
"sync"
"testing"
)

type TestEtcdHandler struct {
firstEnter bool
done chan bool
}

func (h TestEtcdHandler) WatchHandler(key string, value string) {
fmt.Println("key: ", key, "value: ", value)
select {
case <-h.done:
fmt.Println("it's done but enter again")
default:
close(h.done)
}
}

func TestEtcd(t *testing.T) {
newPod := &api.Pod{
Metadata: api.ObjectMeta{
Expand Down Expand Up @@ -45,3 +63,63 @@ func TestEtcd(t *testing.T) {
t.Errorf("etcd delete pod fail")
}
}

func TestEtcdWatch(t *testing.T) {
newPod := &api.Pod{
Metadata: api.ObjectMeta{
Name: "test-pod",
NameSpace: "default",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "test-container1",
Image: "docker.io/library/nginx:latest",
ImagePullPolicy: api.PullPolicyIfNotPresent,
},
{
Name: "test-container2",
Image: "docker.io/library/nginx:latest",
ImagePullPolicy: api.PullPolicyIfNotPresent,
},
},
},
}

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
handler := TestEtcdHandler{done: make(chan bool), firstEnter: true}
EtcdStore.PrefixWatch(wg, ctx, "/registry/pods/", handler.WatchHandler)
time := 0
for {
select {
case <-handler.done:
fmt.Println("it's done")
cancel()
wg.Wait()
_, res := EtcdStore.GetPod("default", "test-pod")
if res == true {
res = EtcdStore.DeletePod("default", "test-pod")
if res != true {
t.Errorf("etcd delete pod fail")
}
}
return
default:
if time%2 == 0 {
fmt.Println("start put")
res := EtcdStore.PutPod(*newPod)
if res != true {
t.Errorf("etcd put pod fail")
}
} else {
fmt.Println("start delete")
res := EtcdStore.DeletePod("default", "test-pod")
if res != true {
t.Errorf("etcd delete pod fail")
}
}
time = time + 1
}
}
}
2 changes: 1 addition & 1 deletion pkg/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (s *Subscriber) Subscribe(wg *sync.WaitGroup, ctx context.Context, topics [
wg := &sync.WaitGroup{}
Subscribe(...)
<- handler.ready (handler.setup should close ready)
cancel() (when you want to terminate it)
wg.Wait()
cancel()
*/
// use go func() to run it async
// maybe we should give a way to terminate it
Expand Down

0 comments on commit 3b257ad

Please sign in to comment.