Skip to content

Commit

Permalink
Fix get events method to always stop with watch=true. (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
jankaspar authored Nov 29, 2019
1 parent 3d36241 commit 2b8ee37
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
12 changes: 12 additions & 0 deletions internal/armada/repository/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type EventRepository interface {
ReportEvent(message *api.EventMessage) error
ReportEvents(message []*api.EventMessage) error
ReadEvents(jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, error)
GetLastMessageId(jobSetId string) (string, error)
}

type RedisEventRepository struct {
Expand Down Expand Up @@ -97,3 +98,14 @@ func (repo *RedisEventRepository) ReadEvents(jobSetId string, lastId string, lim
}
return messages, nil
}

func (repo *RedisEventRepository) GetLastMessageId(jobSetId string) (string, error) {
msg, err := repo.db.XRevRangeN(eventStreamPrefix+jobSetId, "+", "-", 1).Result()
if err != nil {
return "", err
}
if len(msg) > 0 {
return msg[0].ID, nil
}
return "0", nil
}
20 changes: 16 additions & 4 deletions internal/armada/server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ func (s *EventServer) GetJobSetEvents(request *api.JobSetRequest, stream api.Eve
return e
}

lastId := request.FromMessageId
fromId := request.FromMessageId

var timeout time.Duration = -1
var stopAfter = ""
if request.Watch {
timeout = 5 * time.Second
} else {
lastId, e := s.eventRepository.GetLastMessageId(request.Id)
if e != nil {
return e
}
stopAfter = lastId
}

for {
Expand All @@ -47,20 +54,25 @@ func (s *EventServer) GetJobSetEvents(request *api.JobSetRequest, stream api.Eve
default:
}

messages, e := s.eventRepository.ReadEvents(request.Id, lastId, 500, timeout)
messages, e := s.eventRepository.ReadEvents(request.Id, fromId, 500, timeout)

if e != nil {
return e
}

stop := len(messages) == 0
for _, msg := range messages {
lastId = msg.Id
fromId = msg.Id
if fromId == stopAfter {
stop = true
}
e = stream.Send(msg)
if e != nil {
return e
}
}

if !request.Watch && len(messages) == 0 {
if !request.Watch && stop {
return nil
}
}
Expand Down
10 changes: 10 additions & 0 deletions internal/armada/server/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func TestEventServer_ReportUsage(t *testing.T) {
})
}

func TestEventServer_GetJobSetEvents_EmptyStreamShouldNotFail(t *testing.T) {
withEventServer(func(s *EventServer) {

stream := &eventStreamMock{}
e := s.GetJobSetEvents(&api.JobSetRequest{Id: "test", Watch: false}, stream)
assert.Nil(t, e)
assert.Equal(t, 0, len(stream.sendMessages))
})
}

func reportEvent(t *testing.T, s *EventServer, event api.Event) {
msg, _ := api.Wrap(event)
_, e := s.Report(context.Background(), msg)
Expand Down

0 comments on commit 2b8ee37

Please sign in to comment.