diff --git a/testutils/utils.go b/testutils/utils.go index e22b5301..e63013b8 100644 --- a/testutils/utils.go +++ b/testutils/utils.go @@ -3,6 +3,7 @@ package testutils import ( "fmt" "os" + "strconv" "testing" "time" @@ -146,56 +147,54 @@ func testPutGetDeleteExists(t *testing.T, kv store.Store) { func testWatch(t *testing.T, kv store.Store) { key := "testWatch" - value := []byte("world") - newValue := []byte("world!") - // Put the key - err := kv.Put(key, value, nil) + initialValue := []byte("-1") + err := kv.Put(key, initialValue, nil) assert.NoError(t, err) + v, err := kv.Get(key) + assert.NoError(t, err) + assert.Equal(t, v.Value, initialValue) stopCh := make(<-chan struct{}) events, err := kv.Watch(key, stopCh) assert.NoError(t, err) assert.NotNil(t, events) + inputs := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"} // Update loop go func() { - timeout := time.After(1 * time.Second) - tick := time.Tick(250 * time.Millisecond) - for { - select { - case <-timeout: - return - case <-tick: - err := kv.Put(key, newValue, nil) - if assert.NoError(t, err) { - continue - } + for _, i := range inputs { + err := kv.Put(key, []byte(i), nil) + if !assert.NoError(t, err) { return } + time.Sleep(500 * time.Microsecond) } }() // Check for updates - eventCount := 1 + results := []string{} for { select { case event := <-events: assert.NotNil(t, event) - if eventCount == 1 { - assert.Equal(t, event.Key, key) - assert.Equal(t, event.Value, value) - } else { - assert.Equal(t, event.Key, key) - assert.Equal(t, event.Value, newValue) + assert.Equal(t, event.Key, key) + value := string(event.Value) + if value == string(initialValue) { + // skip it initial value as only zookeper catches it + continue } - eventCount++ - // We received all the events we wanted to check - if eventCount >= 4 { + if v, _ := strconv.Atoi(value); v < 4 { + // on etcd initial events gets missed, test only tail events + continue + } + results = append(results, value) + if len(results) == len(inputs[4:]) { + assert.Equal(t, results, inputs[4:]) return } case <-time.After(getShortTimeout()): - t.Fatal("Timeout reached") + t.Fatalf("Timeout reached (results=%v)", results) return } }