Skip to content

Commit

Permalink
Merge pull request #105 from ripienaar/104
Browse files Browse the repository at this point in the history
(#104) avoid duplicates in task list
  • Loading branch information
ripienaar authored Dec 26, 2022
2 parents 8f37499 + 68926d7 commit 61f2296
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
6 changes: 4 additions & 2 deletions ajc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ func main() {

default:
fmt.Fprintf(os.Stderr, "ajc runtime error: %v\n", err)
fmt.Fprintln(os.Stderr)
ajc.Usage(os.Args[1:])
if err != asyncjobs.ErrNoTasks {
fmt.Fprintln(os.Stderr)
ajc.Usage(os.Args[1:])
}
}

os.Exit(1)
Expand Down
41 changes: 25 additions & 16 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,11 @@ func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task,
return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady)
}

if limit <= 0 {
s.log.Debugf("Defaulting to task list limit of 10000")
limit = 10000
}

nfo, err := s.tasks.stream.State()
if err != nil {
return nil, err
Expand All @@ -934,29 +939,33 @@ func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task,
timeout, cancel := context.WithTimeout(ctx, time.Minute)

sub, err := s.nc.Subscribe(s.nc.NewRespInbox(), func(msg *nats.Msg) {
if len(msg.Data) > 0 {
task := &Task{}
err := json.Unmarshal(msg.Data, task)
if len(msg.Data) == 0 {
return
}

task := &Task{}
err := json.Unmarshal(msg.Data, task)
if err != nil {
return
}

select {
case out <- task:
md, err := msg.Metadata()
if err != nil {
return
}

select {
case out <- task:
md, err := msg.Metadata()
if err != nil {
return
}
done := atomic.AddInt32(&cnt, 1)
if md.NumPending == 0 || done == limit {
msg.Sub.Unsubscribe()
cancel()
}
default:
done := atomic.AddInt32(&cnt, 1)
if md.NumPending == 0 || done == limit {
msg.Sub.Unsubscribe()
cancel()
}
default:
msg.Sub.Unsubscribe()
cancel()
}

msg.Ack()
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 61f2296

Please sign in to comment.