diff --git a/client.go b/client.go index 1c42514..eb9d162 100644 --- a/client.go +++ b/client.go @@ -146,15 +146,32 @@ func (c *MCSClient) readMessages(ctx context.Context, mcs *mcs) error { return errors.Wrap(err, "receive version failed") } - for ctx.Err() == nil { + for { // receive tag - data, err := mcs.PerformReadTag() - if err != nil { - return errors.Wrap(err, "receive tag failed") - } else if data == nil { - return ErrFcmNotEnoughData - } else if ctx.Err() != nil { - break + dataCh := make(chan any) + errCh := make(chan error) + go func() { + data, err := mcs.PerformReadTag() + if err != nil { + errCh <- err + return + } + + if data == nil { + errCh <- ErrFcmNotEnoughData + } else { + dataCh <- data + } + }() + + var data any + select { + case <-ctx.Done(): + return ctx.Err() + case d := <-dataCh: + data = d + case err := <-errCh: + return err } err = c.onDataMessage(data) @@ -169,8 +186,6 @@ func (c *MCSClient) readMessages(ctx context.Context, mcs *mcs) error { } } } - - return nil } func (c *MCSClient) AckStreamPosition(mcs *mcs) error {