diff --git a/certexchange/client.go b/certexchange/client.go index cf4ac4be..d6eab30b 100644 --- a/certexchange/client.go +++ b/certexchange/client.go @@ -8,7 +8,6 @@ import ( "runtime/debug" "time" - "github.com/filecoin-project/go-f3" "github.com/filecoin-project/go-f3/certs" "github.com/filecoin-project/go-f3/gpbft" "github.com/libp2p/go-libp2p/core/host" @@ -27,8 +26,6 @@ type Client struct { Host host.Host NetworkName gpbft.NetworkName RequestTimeout time.Duration - - Log f3.Logger } func (c *Client) withDeadline(ctx context.Context) (context.Context, context.CancelFunc) { @@ -44,7 +41,7 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res defer func() { if perr := recover(); perr != nil { _err = fmt.Errorf("panicked requesting certificates from peer %s: %v\n%s", p, perr, string(debug.Stack())) - c.Log.Error(_err) + log.Error(_err) } }() @@ -75,7 +72,7 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res bw := bufio.NewWriter(stream) if err := req.MarshalCBOR(bw); err != nil { - c.Log.Debugf("failed to marshal certificate exchange request to peer %s: %w", p, err) + log.Debugf("failed to marshal certificate exchange request to peer %s: %w", p, err) return nil, nil, err } if err := bw.Flush(); err != nil { @@ -91,7 +88,7 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res } err = resp.UnmarshalCBOR(br) if err != nil { - c.Log.Debugf("failed to unmarshal certificate exchange response header from peer %s: %w", p, err) + log.Debugf("failed to unmarshal certificate exchange response header from peer %s: %w", p, err) return nil, nil, err } @@ -105,7 +102,7 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res go func() { defer func() { if perr := recover(); perr != nil { - c.Log.Errorf("panicked while receiving certificates from peer %s: %v\n%s", p, perr, string(debug.Stack())) + log.Errorf("panicked while receiving certificates from peer %s: %v\n%s", p, perr, string(debug.Stack())) } cancelReq() close(ch) @@ -122,12 +119,12 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res case io.EOF: return default: - c.Log.Debugf("failed to unmarshal certificate from peer %s: %w", p, err) + log.Debugf("failed to unmarshal certificate from peer %s: %w", p, err) return } // One quick sanity check. The rest will be validated by the caller. if cert.GPBFTInstance != request.FirstInstance+i { - c.Log.Warnf("received out-of-order certificate from peer %s", p) + log.Warnf("received out-of-order certificate from peer %s", p) return } diff --git a/certexchange/polling/common.go b/certexchange/polling/common.go index f05c922e..301aed68 100644 --- a/certexchange/polling/common.go +++ b/certexchange/polling/common.go @@ -2,6 +2,8 @@ package polling import ( "github.com/benbjohnson/clock" + logging "github.com/ipfs/go-log/v2" ) +var log = logging.Logger("f3/certexchange") var clk clock.Clock = clock.New() diff --git a/certexchange/polling/common_test.go b/certexchange/polling/common_test.go index fec20b6d..ee4b4393 100644 --- a/certexchange/polling/common_test.go +++ b/certexchange/polling/common_test.go @@ -10,16 +10,12 @@ import ( "github.com/filecoin-project/go-f3/sim/signing" "github.com/benbjohnson/clock" - logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" ) // The network name used in tests. const TestNetworkName gpbft.NetworkName = "testnet" -// The logger used in tests. -var TestLog = logging.Logger("f3-testing") - // The clock used in tests. Time doesn't pass in tests unless you add time to this clock. var MockClock = clock.NewMock() diff --git a/certexchange/polling/poller_test.go b/certexchange/polling/poller_test.go index 6b9a1c93..d92d668c 100644 --- a/certexchange/polling/poller_test.go +++ b/certexchange/polling/poller_test.go @@ -49,7 +49,6 @@ func TestPoller(t *testing.T) { NetworkName: polling.TestNetworkName, Host: serverHost, Store: serverCs, - Log: polling.TestLog, } require.NoError(t, server.Start()) @@ -62,7 +61,6 @@ func TestPoller(t *testing.T) { client := certexchange.Client{ Host: clientHost, NetworkName: polling.TestNetworkName, - Log: polling.TestLog, } poller, err := polling.NewPoller(ctx, &client, clientCs, backend) diff --git a/certexchange/polling/subscriber.go b/certexchange/polling/subscriber.go index 5a83e854..c16c990b 100644 --- a/certexchange/polling/subscriber.go +++ b/certexchange/polling/subscriber.go @@ -64,7 +64,7 @@ func (s *Subscriber) Start() error { }() if err := s.run(ctx); err != nil && ctx.Err() == nil { - s.Log.Errorf("polling certificate exchange subscriber exited early: %s", err) + log.Errorf("polling certificate exchange subscriber exited early: %s", err) } }() @@ -115,7 +115,7 @@ func (s *Subscriber) run(ctx context.Context) error { nextInterval := predictor.update(progress) nextPollTime := pollTime.Add(nextInterval) delay := max(clk.Until(nextPollTime), 0) - s.Log.Infof("predicted interval is %s (waiting %s)", nextInterval, delay) + log.Infof("predicted interval is %s (waiting %s)", nextInterval, delay) timer.Reset(delay) case <-ctx.Done(): return ctx.Err() @@ -132,14 +132,14 @@ func (s *Subscriber) poll(ctx context.Context) (uint64, error) { peers := s.peerTracker.suggestPeers() start := s.poller.NextInstance - s.Log.Debugf("polling %d peers for instance %d", len(peers), s.poller.NextInstance) + log.Debugf("polling %d peers for instance %d", len(peers), s.poller.NextInstance) for _, peer := range peers { oldInstance := s.poller.NextInstance res, err := s.poller.Poll(ctx, peer) if err != nil { return s.poller.NextInstance - start, err } - s.Log.Debugf("polled %s for instance %d, got %+v", peer, s.poller.NextInstance, res) + log.Debugf("polled %s for instance %d, got %+v", peer, s.poller.NextInstance, res) // If we manage to advance, all old "hits" are actually misses. if oldInstance < s.poller.NextInstance { misses = append(misses, hits...) diff --git a/certexchange/polling/subscriber_test.go b/certexchange/polling/subscriber_test.go index 4e87832d..5f5f8289 100644 --- a/certexchange/polling/subscriber_test.go +++ b/certexchange/polling/subscriber_test.go @@ -45,7 +45,6 @@ func TestSubscriber(t *testing.T) { NetworkName: polling.TestNetworkName, Host: h, Store: cs, - Log: polling.TestLog, } } @@ -63,7 +62,6 @@ func TestSubscriber(t *testing.T) { client := certexchange.Client{ Host: clientHost, NetworkName: polling.TestNetworkName, - Log: polling.TestLog, } subscriber := polling.Subscriber{ diff --git a/certexchange/protocol_test.go b/certexchange/protocol_test.go index fee9d269..11e3c9da 100644 --- a/certexchange/protocol_test.go +++ b/certexchange/protocol_test.go @@ -11,15 +11,12 @@ import ( "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" - logging "github.com/ipfs/go-log/v2" mocknetwork "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" ) const testNetworkName gpbft.NetworkName = "testnet" -var log = logging.Logger("certexchange-test") - func testPowerTable(entries int64) (gpbft.PowerEntries, gpbft.CID) { powerTable := make(gpbft.PowerEntries, entries) @@ -68,13 +65,11 @@ func TestClientServer(t *testing.T) { NetworkName: testNetworkName, Host: h1, Store: cs, - Log: log, } client := certexchange.Client{ Host: h2, NetworkName: testNetworkName, - Log: log, } require.NoError(t, server.Start()) diff --git a/certexchange/server.go b/certexchange/server.go index e9b22d6f..23c24d4d 100644 --- a/certexchange/server.go +++ b/certexchange/server.go @@ -8,13 +8,16 @@ import ( "runtime/debug" "time" - "github.com/filecoin-project/go-f3" "github.com/filecoin-project/go-f3/certstore" "github.com/filecoin-project/go-f3/gpbft" + + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" ) +var log = logging.Logger("f3/certexchange") + const maxResponseLen = 256 // Server is libp2p a certificate exchange server. @@ -24,7 +27,6 @@ type Server struct { NetworkName gpbft.NetworkName Host host.Host Store *certstore.Store - Log f3.Logger cancel context.CancelFunc } @@ -40,7 +42,7 @@ func (s *Server) handleRequest(ctx context.Context, stream network.Stream) (_err defer func() { if perr := recover(); perr != nil { _err = fmt.Errorf("panicked in server response: %v", perr) - s.Log.Errorf("%s\n%s", string(debug.Stack())) + log.Errorf("%s\n%s", string(debug.Stack())) } }() @@ -56,7 +58,7 @@ func (s *Server) handleRequest(ctx context.Context, stream network.Stream) (_err // Request has no variable-length fields, so we don't need a limited reader. var req Request if err := req.UnmarshalCBOR(br); err != nil { - s.Log.Debugf("failed to read request from stream: %w", err) + log.Debugf("failed to read request from stream: %w", err) return err } @@ -72,14 +74,14 @@ func (s *Server) handleRequest(ctx context.Context, stream network.Stream) (_err if resp.PendingInstance >= req.FirstInstance && req.IncludePowerTable { pt, err := s.Store.GetPowerTable(ctx, req.FirstInstance) if err != nil { - s.Log.Errorf("failed to load power table: %w", err) + log.Errorf("failed to load power table: %w", err) return err } resp.PowerTable = pt } if err := resp.MarshalCBOR(bw); err != nil { - s.Log.Debugf("failed to write header to stream: %w", err) + log.Debugf("failed to write header to stream: %w", err) return err } @@ -96,12 +98,12 @@ func (s *Server) handleRequest(ctx context.Context, stream network.Stream) (_err if err == nil || errors.Is(err, certstore.ErrCertNotFound) { for i := range certs { if err := certs[i].MarshalCBOR(bw); err != nil { - s.Log.Debugf("failed to write certificate to stream: %w", err) + log.Debugf("failed to write certificate to stream: %w", err) return err } } } else { - s.Log.Errorf("failed to load finality certificates: %w", err) + log.Errorf("failed to load finality certificates: %w", err) } } return bw.Flush() diff --git a/cmd/f3/run.go b/cmd/f3/run.go index 42804e28..0eab9bb7 100644 --- a/cmd/f3/run.go +++ b/cmd/f3/run.go @@ -21,9 +21,9 @@ import ( "golang.org/x/xerrors" ) -const DiscoveryTag = "f3-standalone" +var log = logging.Logger("f3/cli") -var log = logging.Logger("f3") +const DiscoveryTag = "f3-standalone" var runCmd = cli.Command{ Name: "run", @@ -115,8 +115,7 @@ var runCmd = cli.Command{ ec := ec.NewFakeEC(1, m.BootstrapEpoch, m.ECPeriod, initialPowerTable, true) - module, err := f3.New(ctx, mprovider, ds, h, ps, - signingBackend, ec, log, nil) + module, err := f3.New(ctx, mprovider, ds, h, ps, signingBackend, ec, nil) if err != nil { return xerrors.Errorf("creating module: %w", err) } diff --git a/f3.go b/f3.go index 5db0f388..42560bb5 100644 --- a/f3.go +++ b/f3.go @@ -31,7 +31,6 @@ type F3 struct { host host.Host ds datastore.Datastore ec ec.Backend - log Logger pubsub *pubsub.PubSub runningCtx context.Context @@ -48,7 +47,7 @@ type F3 struct { // The context is used for initialization not runtime. // signingMarshaller can be nil for default SigningMarshaler func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore.Datastore, h host.Host, - ps *pubsub.PubSub, verif gpbft.Verifier, ec ec.Backend, log Logger, signingMarshaller gpbft.SigningMarshaler) (*F3, error) { + ps *pubsub.PubSub, verif gpbft.Verifier, ec ec.Backend, signingMarshaller gpbft.SigningMarshaler) (*F3, error) { runningCtx, cancel := context.WithCancel(context.Background()) errgrp, runningCtx := errgroup.WithContext(runningCtx) @@ -63,7 +62,6 @@ func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore. host: h, ds: ds, ec: ec, - log: log, pubsub: ps, runningCtx: runningCtx, cancelCtx: cancel, @@ -96,13 +94,13 @@ func (m *F3) Broadcast(ctx context.Context, signatureBuilder *gpbft.SignatureBui m.mu.Unlock() if runner == nil { - m.log.Error("attempted to broadcast message while F3 wasn't running") + log.Error("attempted to broadcast message while F3 wasn't running") return } err := runner.BroadcastMessage(msg) if err != nil { - m.log.Errorf("failed to broadcast message: %+v", err) + log.Errorf("failed to broadcast message: %+v", err) } } @@ -258,7 +256,7 @@ func (m *F3) reconfigure(ctx context.Context, manifest *manifest.Manifest) error m.cs = cs m.manifest = manifest m.runner, err = newRunner( - ctx, m.cs, runnerEc, m.log, m.pubsub, + ctx, m.cs, runnerEc, m.pubsub, m.signingMarshaller, m.verifier, m.busBroadcast.Publish, m.manifest, ) diff --git a/host.go b/host.go index 8e334028..15f2c287 100644 --- a/host.go +++ b/host.go @@ -12,7 +12,6 @@ import ( "github.com/filecoin-project/go-f3/ec" "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/manifest" - logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" peer "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/multierr" @@ -32,7 +31,6 @@ type gpbftRunner struct { signingMarshaller gpbft.SigningMarshaler verifier gpbft.Verifier broadcastCb BroadcastMessage - log, logWithSkip Logger participant *gpbft.Participant topic *pubsub.Topic @@ -48,7 +46,6 @@ func newRunner( _ context.Context, cs *certstore.Store, ec ec.Backend, - log Logger, ps *pubsub.PubSub, signer gpbft.SigningMarshaler, verifier gpbft.Verifier, @@ -66,25 +63,19 @@ func newRunner( signingMarshaller: signer, verifier: verifier, broadcastCb: broadcastCb, - log: log, - logWithSkip: log, runningCtx: runningCtx, errgrp: errgrp, ctxCancel: ctxCancel, } - if zapLogger, ok := runner.log.(*logging.ZapEventLogger); ok { - runner.logWithSkip = logging.WithSkip(zapLogger, 1) - } - // create a stopped timer to facilitate alerts requested from gpbft runner.alertTimer = time.NewTimer(100 * time.Hour) if !runner.alertTimer.Stop() { <-runner.alertTimer.C } - runner.log.Infof("Starting gpbft runner") - opts := append(m.GpbftOptions(), gpbft.WithTracer((*gpbftTracer)(runner))) + log.Infof("Starting gpbft runner") + opts := append(m.GpbftOptions(), gpbft.WithTracer(tracer)) p, err := gpbft.NewParticipant((*gpbftHost)(runner), opts...) if err != nil { return nil, xerrors.Errorf("creating participant: %w", err) @@ -122,7 +113,7 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) { h.errgrp.Go(func() (_err error) { defer func() { if _err != nil && h.runningCtx.Err() == nil { - h.log.Errorf("exited GPBFT runner early: %+v", _err) + log.Errorf("exited GPBFT runner early: %+v", _err) } }() for h.runningCtx.Err() == nil { @@ -171,7 +162,7 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) { // TODO: we need to distinguish between "fatal" and // "non-fatal" errors here. Ideally only returning "real" // errors. - h.log.Errorf("error when processing message: %+v", err) + log.Errorf("error when processing message: %+v", err) } case <-h.runningCtx.Done(): return nil @@ -199,7 +190,7 @@ func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) ts, err := h.ec.GetTipset(h.runningCtx, cert.ECChain.Head().Key) if err != nil { // this should not happen - h.log.Errorf("could not get timestamp of just finalized tipset: %+v", err) + log.Errorf("could not get timestamp of just finalized tipset: %+v", err) return time.Now().Add(ecDelay) } @@ -218,7 +209,7 @@ func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) for instance := cert.GPBFTInstance - 1; instance > h.manifest.InitialInstance; instance-- { cert, err := h.certStore.Get(h.runningCtx, instance) if err != nil { - h.log.Errorf("error while getting instance %d from certstore: %+v", instance, err) + log.Errorf("error while getting instance %d from certstore: %+v", instance, err) break } if !cert.ECChain.HasSuffix() { @@ -233,7 +224,7 @@ func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) } backoff := time.Duration(float64(ecDelay) * backoffMultipler) - h.log.Infof("backing off for: %v", backoff) + log.Infof("backing off for: %v", backoff) return ts.Timestamp().Add(backoff) } @@ -269,11 +260,11 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, pID peer.ID, validatedMessage, err := h.participant.ValidateMessage(&gmsg) if errors.Is(err, gpbft.ErrValidationInvalid) { - h.log.Debugf("validation error during validation: %+v", err) + log.Debugf("validation error during validation: %+v", err) return pubsub.ValidationReject } if err != nil { - h.log.Warnf("unknown error during validation: %+v", err) + log.Warnf("unknown error during validation: %+v", err) return pubsub.ValidationIgnore } msg.ValidatorData = validatedMessage @@ -338,7 +329,7 @@ func (h *gpbftRunner) startPubsub() (<-chan gpbft.ValidatedMessage, error) { } gmsg, ok := msg.ValidatorData.(gpbft.ValidatedMessage) if !ok { - h.log.Errorf("invalid msgValidatorData: %+v", msg.ValidatorData) + log.Errorf("invalid msgValidatorData: %+v", msg.ValidatorData) continue } select { @@ -352,15 +343,6 @@ func (h *gpbftRunner) startPubsub() (<-chan gpbft.ValidatedMessage, error) { return messageQueue, nil } -type gpbftTracer gpbftRunner - -// Log fulfills the gpbft.Tracer interface -func (h *gpbftTracer) Log(fmt string, args ...any) { - h.logWithSkip.Debugf(fmt, args...) -} - -var _ gpbft.Tracer = (*gpbftTracer)(nil) - // gpbftHost is a newtype of gpbftRunner exposing APIs required by the gpbft.Participant type gpbftHost gpbftRunner @@ -513,7 +495,7 @@ func (h *gpbftHost) GetCommitteeForInstance(instance uint64) (*gpbft.PowerTable, powerEntries, err = h.certStore.GetPowerTable(h.runningCtx, instance) if err != nil { - h.log.Debugf("failed getting power table from certstore: %v, falling back to EC", err) + log.Debugf("failed getting power table from certstore: %v, falling back to EC", err) powerEntries, err = h.ec.GetPowerTable(h.runningCtx, powerTsk) if err != nil { @@ -561,7 +543,7 @@ func (h *gpbftHost) Time() time.Time { // The timestamp may be in the past, in which case the alarm will fire as soon as possible // (but not synchronously). func (h *gpbftHost) SetAlarm(at time.Time) { - h.log.Debugf("set alarm for %v", at) + log.Debugf("set alarm for %v", at) // we cannot reuse the timer because we don't know if it was read or not h.alertTimer.Stop() h.alertTimer = time.NewTimer(time.Until(at)) @@ -574,11 +556,11 @@ func (h *gpbftHost) SetAlarm(at time.Time) { // based on the decision received (which may be in the past). // E.g. this might be: finalised tipset timestamp + epoch duration + stabilisation delay. func (h *gpbftHost) ReceiveDecision(decision *gpbft.Justification) time.Time { - h.log.Infof("got decision at instance %d, finalized head at epoch: %d", + log.Infof("got decision at instance %d, finalized head at epoch: %d", decision.Vote.Instance, decision.Vote.Value.Head().Epoch) cert, err := h.saveDecision(decision) if err != nil { - h.log.Errorf("error while saving decision: %+v", err) + log.Errorf("error while saving decision: %+v", err) } return (*gpbftRunner)(h).computeNextInstanceStart(cert) } diff --git a/logging.go b/logging.go new file mode 100644 index 00000000..8d42513e --- /dev/null +++ b/logging.go @@ -0,0 +1,17 @@ +package f3 + +import ( + "github.com/filecoin-project/go-f3/gpbft" + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("f3") +var tracer gpbft.Tracer = (*gpbftTracer)(logging.WithSkip(logging.Logger("gpbft"), 1)) + +// Tracer used by GPBFT, backed by a Zap logger. +type gpbftTracer logging.ZapEventLogger + +// Log fulfills the gpbft.Tracer interface +func (h *gpbftTracer) Log(fmt string, args ...any) { + (*logging.ZapEventLogger)(h).Debugf(fmt, args...) +} diff --git a/test/f3_test.go b/test/f3_test.go index a6209870..4012fee0 100644 --- a/test/f3_test.go +++ b/test/f3_test.go @@ -457,8 +457,7 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro e.signingBackend.Allow(int(id)) - module, err := f3.New(e.testCtx, mprovider, ds, h, ps, - e.signingBackend, e.ec, log, nil) + module, err := f3.New(e.testCtx, mprovider, ds, h, ps, e.signingBackend, e.ec, nil) if err != nil { return nil, xerrors.Errorf("creating module: %w", err) }