Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parked: Tombstone services before sending updates #50

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion catalog/services_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func (state *ServicesState) ExpireServer(hostname string) {

for _, svc := range state.Servers[hostname].Services {
previousStatus := svc.Status
state.ServiceChanged(svc, previousStatus, svc.Updated)
svc.Tombstone()
state.ServiceChanged(svc, previousStatus, svc.Updated)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know it occurs to me that maybe we ought to be just doing this outside the loop by capturing the initial state and the final state. No one can process those intermediate changes that fast anyway.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I guess if anything relied on svc.Updated that would break them. Maybe there is some middle ground.

tombstones = append(tombstones, *svc)
}

Expand Down
12 changes: 12 additions & 0 deletions catalog/services_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,13 +670,25 @@ func Test_ClusterMembershipManagement(t *testing.T) {
state.AddServiceEntry(service1)
state.AddServiceEntry(service2)

dummyListener := mockListener{
events: make(chan ChangeEvent, len(state.Servers[hostname].Services)),
}
state.AddListener(&dummyListener)

go state.ExpireServer(hostname)
expired := <-state.Broadcasts

So(len(expired), ShouldEqual, 2)
// Timestamps chagne when tombstoning, so regex match
So(expired[0], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$")
So(expired[1], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$")

Convey("and sends the tombstones to any listener", func() {
for i := 0; i < len(state.Servers[hostname].Services); i++ {
changeEvent := <-dummyListener.Chan()
So(changeEvent.Service.Status, ShouldEqual, service.TOMBSTONE)
}
})
})

Convey("does not announce services for hosts with none", func() {
Expand Down
77 changes: 61 additions & 16 deletions catalog/url_listener_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package catalog

import (
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"

"github.com/Nitro/sidecar/service"
"github.com/relistan/go-director"
Expand Down Expand Up @@ -47,36 +50,57 @@ func Test_prepareCookieJar(t *testing.T) {
}

func Test_Listen(t *testing.T) {
Convey("Listen()", t, func() {
Convey("Listen()", t, func(c C) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not know this was a thing. Interesting.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's handy for testing inside goroutines, although I still haven't gotten around to fixing this bug: smartystreets/goconvey#477

url := "http://beowulf.example.com"

httpmock.RegisterResponder(
"POST", url,
func(req *http.Request) (*http.Response, error) {
return httpmock.NewStringResponse(500, "so bad!"), nil
},
)

httpmock.Activate()
listener := NewUrlListener(url, false)
errors := make(chan error)
listener.looper = director.NewFreeLooper(1, errors)

hostname := "grendel"

svcId1 := "deadbeef123"
service1 := service.Service{ID: svcId1, Hostname: hostname}
svcId2 := "ecgtheow"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

service2 := service.Service{ID: svcId2, Hostname: hostname}

state := NewServicesState()
state.Hostname = hostname
state.AddServiceEntry(service1)
state.Servers[hostname].Services[service1.ID].Tombstone()

postShouldErr := false
var changeEventTime time.Time
httpmock.RegisterResponder(
"POST", url,
func(req *http.Request) (*http.Response, error) {
if postShouldErr {
return httpmock.NewStringResponse(500, "so bad!"), nil
}

bodyBytes, err := ioutil.ReadAll(req.Body)
c.So(err, ShouldBeNil)

var evt StateChangedEvent
err = json.Unmarshal(bodyBytes, &evt)
c.So(err, ShouldBeNil)
c.So(evt.ChangeEvent.PreviousStatus, ShouldEqual, service.ALIVE)

// Make sure each new event comes in with a different timestamp
c.So(evt.ChangeEvent.Time, ShouldNotEqual, changeEventTime)
changeEventTime = evt.ChangeEvent.Time

return httpmock.NewBytesResponse(200, nil), nil
},
)
httpmock.Activate()
Reset(func() {
httpmock.DeactivateAndReset()
})

Convey("handles a bad post", func() {
postShouldErr = true

state.AddServiceEntry(service1)
state.Servers[hostname].Services[service1.ID].Tombstone()

listener := NewUrlListener(url, false)
errors := make(chan error)
listener.looper = director.NewFreeLooper(1, errors)

listener.eventChannel <- ChangeEvent{}
listener.Retries = 0
listener.Watch(state)
Expand All @@ -85,5 +109,26 @@ func Test_Listen(t *testing.T) {
So(err, ShouldBeNil)
So(len(errors), ShouldEqual, 0)
})

Convey("gets all updates when a server expires", func() {
state.AddServiceEntry(service1)
state.AddServiceEntry(service2)

listener := NewUrlListener(url, false)
errors := make(chan error)
// Do two iterations: One for each service from the expired server
listener.looper = director.NewFreeLooper(
len(state.Servers[hostname].Services), errors)
listener.Retries = 0

listener.Watch(state)

state.ExpireServer(hostname)

// Block until both iterations are done
err := listener.looper.Wait()
So(err, ShouldBeNil)
So(len(errors), ShouldEqual, 0)
})
})
}
2 changes: 1 addition & 1 deletion receiver/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func UpdateHandler(response http.ResponseWriter, req *http.Request, rcvr *Receiv
rcvr.StateLock.Lock()
defer rcvr.StateLock.Unlock()

if rcvr.CurrentState == nil || rcvr.CurrentState.LastChanged.Before(evt.State.LastChanged) {
if rcvr.CurrentState == nil || rcvr.CurrentState.LastChanged.Before(evt.ChangeEvent.Time) {
rcvr.CurrentState = evt.State
rcvr.LastSvcChanged = &evt.ChangeEvent.Service

Expand Down
5 changes: 5 additions & 0 deletions receiver/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func Test_updateHandler(t *testing.T) {
Status: service.ALIVE,
},
PreviousStatus: service.TOMBSTONE,
Time: evtState.LastChanged,
},
}

Expand Down Expand Up @@ -120,6 +121,7 @@ func Test_updateHandler(t *testing.T) {
Status: service.ALIVE,
},
PreviousStatus: service.TOMBSTONE,
Time: evtState.LastChanged,
},
}

Expand Down Expand Up @@ -178,6 +180,7 @@ func Test_updateHandler(t *testing.T) {
Status: service.ALIVE,
},
PreviousStatus: service.TOMBSTONE,
Time: evtState.LastChanged,
},
}

Expand Down Expand Up @@ -207,6 +210,7 @@ func Test_updateHandler(t *testing.T) {
Status: service.ALIVE,
},
PreviousStatus: service.TOMBSTONE,
Time: evtState.LastChanged,
},
}

Expand Down Expand Up @@ -239,6 +243,7 @@ func Test_updateHandler(t *testing.T) {
Status: service.DRAINING,
},
PreviousStatus: service.ALIVE,
Time: evtState.LastChanged,
},
}

Expand Down