Skip to content

Commit

Permalink
Prevent anymore messages from being received after ctx is cancelled.
Browse files Browse the repository at this point in the history
  • Loading branch information
John Newman committed Sep 9, 2024
1 parent fca8333 commit 1af0c31
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,32 @@ func (c *MCSClient) readMessages(ctx context.Context, mcs *mcs) error {
return errors.Wrap(err, "receive version failed")
}

for ctx.Err() == nil {
for {
// receive tag
data, err := mcs.PerformReadTag()
if err != nil {
return errors.Wrap(err, "receive tag failed")
} else if data == nil {
return ErrFcmNotEnoughData
} else if ctx.Err() != nil {
break
dataCh := make(chan any)
errCh := make(chan error)
go func() {
data, err := mcs.PerformReadTag()
if err != nil {
errCh <- err
return
}

if data == nil {
errCh <- ErrFcmNotEnoughData
} else {
dataCh <- data
}
}()

var data any
select {
case <-ctx.Done():
return ctx.Err()
case d := <-dataCh:
data = d
case err := <-errCh:
return err
}

err = c.onDataMessage(data)
Expand All @@ -169,8 +186,6 @@ func (c *MCSClient) readMessages(ctx context.Context, mcs *mcs) error {
}
}
}

return nil
}

func (c *MCSClient) AckStreamPosition(mcs *mcs) error {
Expand Down

0 comments on commit 1af0c31

Please sign in to comment.