diff --git a/package-lock.json b/package-lock.json index fcefb63be..5e30ab7cd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2247,7 +2247,8 @@ }, "node_modules/debug": { "version": "4.3.4", - "license": "MIT", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", "dependencies": { "ms": "2.1.2" }, diff --git a/src/core-manager/index.js b/src/core-manager/index.js index e548f2d94..3f085738d 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -8,6 +8,7 @@ import { HaveExtension, ProjectExtension } from '../generated/extensions.js' import { CoreIndex } from './core-index.js' import { ReplicationStateMachine } from './replication-state-machine.js' import * as rle from './bitfield-rle.js' +import { Logger } from '../logger.js' // WARNING: Changing these will break things for existing apps, since namespaces // are used for key derivation @@ -54,6 +55,8 @@ export class CoreManager extends TypedEmitter { #state = 'opened' #ready #haveExtension + #deviceId + #l static get namespaces() { return NAMESPACES @@ -67,6 +70,7 @@ export class CoreManager extends TypedEmitter { * @param {Buffer} [options.projectSecretKey] 32-byte secret key of the project creator core * @param {Partial>} [options.encryptionKeys] Encryption keys for each namespace * @param {import('hypercore').HypercoreStorage} options.storage Folder to store all hypercore data + * @param {Logger} [options.logger] */ constructor({ sqlite, @@ -75,6 +79,7 @@ export class CoreManager extends TypedEmitter { projectSecretKey, encryptionKeys = {}, storage, + logger, }) { super() assert( @@ -85,7 +90,9 @@ export class CoreManager extends TypedEmitter { !projectSecretKey || projectSecretKey.length === 64, 'project owner core secret key must be 64-byte buffer' ) + this.#l = Logger.create('coreManager', logger) const primaryKey = keyManager.getDerivedKey('primaryKey', projectKey) + this.#deviceId = keyManager.getIdentityKeypair().publicKey.toString('hex') this.#projectKey = projectKey this.#encryptionKeys = encryptionKeys @@ -157,7 +164,15 @@ export class CoreManager extends TypedEmitter { this.#ready = Promise.all( [...this.#coreIndex].map(({ core }) => core.ready()) - ).catch(() => {}) + ) + .then(() => { + this.#l.log('ready') + }) + .catch(() => {}) + } + + get deviceId() { + return this.#deviceId } get creatorCore() { @@ -304,6 +319,12 @@ export class CoreManager extends TypedEmitter { this.#addCoreSqlStmt.run({ publicKey: key, namespace }) } + this.#l.log( + 'Added %s %s core %k', + persist ? 'remote' : writer ? 'local' : 'creator', + namespace, + key + ) this.emit('add-core', { core, key, namespace }) return { core, key, namespace } diff --git a/src/discovery/dns-sd.js b/src/discovery/dns-sd.js index 7f68531c2..677006db3 100644 --- a/src/discovery/dns-sd.js +++ b/src/discovery/dns-sd.js @@ -1,10 +1,9 @@ import { TypedEmitter } from 'tiny-typed-emitter' import { Bonjour } from 'bonjour-service' -// @ts-ignore -import debug from 'debug' import pTimeout from 'p-timeout' import { randomBytes } from 'node:crypto' import { once } from 'node:events' +import { Logger } from '../logger.js' const SERVICE_NAME = 'mapeo' @@ -48,7 +47,7 @@ export class DnsSd extends TypedEmitter { } /* c8 ignore stop */ const { name, port } = service - this.#log(`service up`, [name, address, port]) + this.#log('serviceUp', name.slice(0, 7), address, port) this.emit('up', { port, name, address }) } /** @param {import('bonjour-service').Service} service */ @@ -74,18 +73,21 @@ export class DnsSd extends TypedEmitter { /** @type {Promise | null} */ #advertisingStopping = null #log + #l /** * * @param {object} [opts] * @param {string} [opts.name] * @param {boolean} [opts.disableIpv6] + * @param {Logger} [opts.logger] */ - constructor({ name, disableIpv6 = true } = {}) { + constructor({ name, disableIpv6 = true, logger } = {}) { super() + this.#l = Logger.create('dnssd', logger) this.#name = name || randomBytes(8).toString('hex') this.#disableIpv6 = disableIpv6 - this.#log = debug('mapeo:dnssd:' + this.#name) + this.#log = this.#l.log.bind(this.#l) } get name() { diff --git a/src/discovery/local-discovery.js b/src/discovery/local-discovery.js index 6c1ade7bb..6d3d59eed 100644 --- a/src/discovery/local-discovery.js +++ b/src/discovery/local-discovery.js @@ -3,11 +3,11 @@ import net from 'node:net' import NoiseSecretStream from '@hyperswarm/secret-stream' import { once } from 'node:events' import { DnsSd } from './dns-sd.js' -import debug from 'debug' import { isPrivate } from 'bogon' import StartStopStateMachine from 'start-stop-state-machine' import pTimeout from 'p-timeout' import { keyToPublicId } from '@mapeo/crypto' +import { Logger } from '../logger.js' /** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */ /** @typedef {import('../utils.js').OpenedNoiseStream} OpenedNoiseStream */ @@ -32,21 +32,25 @@ export class LocalDiscovery extends TypedEmitter { #log /** @type {(e: Error) => void} */ #handleSocketError + #l /** * @param {Object} opts * @param {Keypair} opts.identityKeypair * @param {DnsSd} [opts.dnssd] Optional DnsSd instance, used for testing + * @param {Logger} [opts.logger] */ - constructor({ identityKeypair, dnssd }) { + constructor({ identityKeypair, dnssd, logger }) { super() + this.#l = Logger.create('mdns', logger) + this.#log = this.#l.log.bind(this.#l) this.#dnssd = dnssd || new DnsSd({ name: keyToPublicId(identityKeypair.publicKey), + logger: this.#l, }) this.#dnssd.on('up', this.#handleServiceUp.bind(this)) - this.#log = debug('mapeo:mdns:' + keyShortname(identityKeypair.publicKey)) this.#sm = new StartStopStateMachine({ start: this.#start.bind(this), stop: this.#stop.bind(this), @@ -195,7 +199,8 @@ export class LocalDiscovery extends TypedEmitter { this.#log( `${isInitiator ? 'outgoing' : 'incoming'} secretSteam connection ${ isInitiator ? 'to' : 'from' - } ${keyShortname(remotePublicKey)}` + } %h`, + remotePublicKey ) const existing = this.#noiseConnections.get(remoteId) @@ -231,7 +236,7 @@ export class LocalDiscovery extends TypedEmitter { this.#noiseConnections.set(remoteId, conn) conn.on('close', () => { - this.#log(`closed connection with ${keyShortname(remotePublicKey)}`) + this.#log('closed connection with %h', remotePublicKey) this.#noiseConnections.delete(remoteId) }) @@ -295,13 +300,4 @@ function getAddress(server) { return addr } -/** - * - * @param {Buffer} key - * @returns - */ -function keyShortname(key) { - return keyToPublicId(key).slice(0, 7) -} - function noop() {} diff --git a/src/index-writer/index.js b/src/index-writer/index.js index b131e2468..a48c5fde1 100644 --- a/src/index-writer/index.js +++ b/src/index-writer/index.js @@ -3,6 +3,7 @@ import SqliteIndexer from '@mapeo/sqlite-indexer' import { getTableConfig } from 'drizzle-orm/sqlite-core' import { getBacklinkTableName } from '../schema/utils.js' import { discoveryKey } from 'hypercore-crypto' +import { Logger } from '../logger.js' /** * @typedef {import('../datatype/index.js').MapeoDocTables} MapeoDocTables @@ -21,6 +22,7 @@ export class IndexWriter { /** @type {Map} */ #indexers = new Map() #mapDoc + #l /** * * @param {object} opts @@ -28,8 +30,10 @@ export class IndexWriter { * @param {TTables[]} opts.tables * @param {(doc: MapeoDocInternal, version: import('@mapeo/schema').VersionIdObject) => MapeoDoc} [opts.mapDoc] optionally transform a document prior to indexing. Can also validate, if an error is thrown then the document will not be indexed * @param {typeof import('@mapeo/sqlite-indexer').defaultGetWinner} [opts.getWinner] custom function to determine the "winner" of two forked documents. Defaults to choosing the document with the most recent `updatedAt` + * @param {Logger} [opts.logger] */ - constructor({ tables, sqlite, mapDoc = (d) => d, getWinner }) { + constructor({ tables, sqlite, mapDoc = (d) => d, getWinner, logger }) { + this.#l = Logger.create('indexWriter', logger) this.#mapDoc = mapDoc for (const table of tables) { const config = getTableConfig(table) @@ -63,6 +67,7 @@ export class IndexWriter { const version = { coreDiscoveryKey: discoveryKey(key), index } var doc = this.#mapDoc(decode(block, version), version) } catch (e) { + this.#l.log('Could not decode entry %d of %h', index, key) // Unknown or invalid entry - silently ignore continue } @@ -80,6 +85,16 @@ export class IndexWriter { continue } indexer.batch(docs) + if (this.#l.enabled) { + for (const doc of docs) { + this.#l.log( + 'Indexed %s %S @ %S', + doc.schemaName, + doc.docId, + doc.versionId + ) + } + } } } } diff --git a/src/local-peers.js b/src/local-peers.js index 32c0d2305..1a38360d0 100644 --- a/src/local-peers.js +++ b/src/local-peers.js @@ -10,6 +10,7 @@ import { InviteResponse_Decision, } from './generated/rpc.js' import pDefer from 'p-defer' +import { Logger } from './logger.js' const PROTOCOL_NAME = 'mapeo/rpc' @@ -59,16 +60,23 @@ class Peer { #disconnectedAt = 0 /** @type {Protomux} */ #protomux + #log /** * @param {object} options * @param {Buffer} options.publicKey * @param {ReturnType} options.channel + * @param {Logger} [options.logger] */ - constructor({ publicKey, channel }) { + constructor({ publicKey, channel, logger }) { this.#publicKey = publicKey this.#channel = channel this.#connected = pDefer() + // @ts-ignore + this.#log = (formatter, ...args) => { + const log = Logger.create('peer', logger).log + return log.apply(null, [`[%h] ${formatter}`, publicKey, ...args]) + } } /** @returns {PeerInfoInternal} */ get info() { @@ -108,26 +116,34 @@ class Peer { this.#protomux = protomux /* c8 ignore next 3 */ if (this.#state !== 'connecting') { + this.#log('ERROR: tried to connect but state was %s', this.#state) return // TODO: report error - this should not happen } this.#state = 'connected' this.#connectedAt = Date.now() this.#connected.resolve() + this.#log('connected') } disconnect() { // @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances this.#protomux = undefined /* c8 ignore next */ - if (this.#state === 'disconnected') return + if (this.#state === 'disconnected') { + this.#log('ERROR: tried to disconnect but was already disconnected') + return + } this.#state = 'disconnected' this.#disconnectedAt = Date.now() // Can just resolve this rather than reject, because #assertConnected will throw the error this.#connected.resolve() + let rejectCount = 0 for (const pending of this.pendingInvites.values()) { for (const { reject } of pending) { reject(new PeerDisconnectedError()) + rejectCount++ } } + this.#log('disconnected and rejected %d pending invites', rejectCount) this.pendingInvites.clear() } /** @param {InviteWithKeys} invite */ @@ -136,6 +152,7 @@ class Peer { const buf = Buffer.from(Invite.encode(invite).finish()) const messageType = MESSAGE_TYPES.Invite this.#channel.messages[messageType].send(buf) + this.#log('sent invite for %h', invite.projectKey) } /** @param {InviteResponse} response */ async sendInviteResponse(response) { @@ -143,6 +160,11 @@ class Peer { const buf = Buffer.from(InviteResponse.encode(response).finish()) const messageType = MESSAGE_TYPES.InviteResponse this.#channel.messages[messageType].send(buf) + this.#log( + 'sent response for %h: %s', + response.projectKey, + response.decision + ) } /** @param {DeviceInfo} deviceInfo */ async sendDeviceInfo(deviceInfo) { @@ -150,10 +172,12 @@ class Peer { const buf = Buffer.from(DeviceInfo.encode(deviceInfo).finish()) const messageType = MESSAGE_TYPES.DeviceInfo this.#channel.messages[messageType].send(buf) + this.#log('sent deviceInfo %o', deviceInfo) } /** @param {DeviceInfo} deviceInfo */ receiveDeviceInfo(deviceInfo) { this.#name = deviceInfo.name + this.#log('received deviceInfo %o', deviceInfo) } async #assertConnected() { await this.#connected.promise @@ -179,6 +203,17 @@ export class LocalPeers extends TypedEmitter { #opening = new Set() static InviteResponse = InviteResponse_Decision + #l + + /** + * + * @param {object} [opts] + * @param {Logger} [opts.logger] + */ + constructor({ logger } = {}) { + super() + this.#l = Logger.create('localPeers', logger) + } /** * Invite a peer to a project. Resolves with the response from the invitee: @@ -195,7 +230,6 @@ export class LocalPeers extends TypedEmitter { async invite(peerId, { timeout, ...invite }) { await Promise.all(this.#opening) const peer = this.#peers.get(peerId) - if (!peer) console.log([...this.#peers.keys()]) if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId) /** @type {Promise} */ return new Promise((origResolve, origReject) => { @@ -277,6 +311,11 @@ export class LocalPeers extends TypedEmitter { protomux.pair( { protocol: 'hypercore/alpha' }, /** @param {Buffer} discoveryKey */ async (discoveryKey) => { + this.#l.log( + 'Received discovery key %h from %h', + discoveryKey, + stream.noiseStream.remotePublicKey + ) this.emit('discovery-key', discoveryKey, stream.rawStream) } ) @@ -288,7 +327,13 @@ export class LocalPeers extends TypedEmitter { // opened, so this helped awaits the open openedNoiseSecretStream(stream).then((stream) => { this.#opening.delete(stream.opened) - if (stream.destroyed) return + if (stream.destroyed) { + this.#l.log( + 'Opened connection to %h but was already destroyed', + stream.remotePublicKey + ) + return + } const { remotePublicKey } = stream // This is written like this because the protomux uses the index within @@ -318,7 +363,11 @@ export class LocalPeers extends TypedEmitter { if (existingPeer && existingPeer.info.status !== 'disconnected') { existingPeer.disconnect() // Should not happen, but in case } - const peer = new Peer({ publicKey: remotePublicKey, channel }) + const peer = new Peer({ + publicKey: remotePublicKey, + channel, + logger: this.#l, + }) this.#peers.set(peerId, peer) // Do not emit peers now - will emit when connected }) @@ -345,7 +394,10 @@ export class LocalPeers extends TypedEmitter { const peerId = publicKey.toString('hex') const peer = this.#peers.get(peerId) /* c8 ignore next */ - if (!peer) return // TODO: report error - this should not happen + if (!peer) { + this.#l.log('ERROR: Could not close peer %h', publicKey) + return // TODO: report error - this should not happen + } // No-op if no change in state /* c8 ignore next */ if (peer.info.status === 'disconnected') return @@ -384,6 +436,7 @@ export class LocalPeers extends TypedEmitter { const invite = Invite.decode(value) assertInviteHasKeys(invite) this.emit('invite', peerId, invite) + this.#l.log('Invite from %h for %h', peerPublicKey, invite.projectKey) break } case 'InviteResponse': { @@ -397,6 +450,12 @@ export class LocalPeers extends TypedEmitter { for (const deferredPromise of pending) { deferredPromise.resolve(response.decision) } + this.#l.log( + 'Invite response from %h for %h: %s', + peerPublicKey, + response.projectKey, + response.decision + ) peer.pendingInvites.set(projectId, []) break } diff --git a/src/logger.js b/src/logger.js new file mode 100644 index 000000000..356e6cc35 --- /dev/null +++ b/src/logger.js @@ -0,0 +1,69 @@ +import createDebug from 'debug' +import { discoveryKey } from 'hypercore-crypto' + +const TRIM = 7 + +createDebug.formatters.h = (v) => { + if (!Buffer.isBuffer(v)) return '[undefined]' + return v.toString('hex').slice(0, TRIM) +} + +createDebug.formatters.S = (v) => { + if (typeof v !== 'string') return '[undefined]' + return v.slice(0, 7) +} + +createDebug.formatters.k = (v) => { + if (!Buffer.isBuffer(v)) return '[undefined]' + return discoveryKey(v).toString('hex').slice(0, TRIM) +} + +const counts = new Map() + +export class Logger { + #baseLogger + #log + + /** + * @param {string} ns + * @param {Logger} [logger] + */ + static create(ns, logger) { + if (logger) return logger.extend(ns) + const i = (counts.get(ns) || 0) + 1 + const deviceId = String(i).padStart(TRIM, '0') + return new Logger({ deviceId, ns }) + } + + /** + * @param {object} opts + * @param {string} opts.deviceId + * @param {createDebug.Debugger} [opts.baseLogger] + * @param {string} [opts.ns] + */ + constructor({ deviceId, baseLogger, ns }) { + this.deviceId = deviceId + this.#baseLogger = baseLogger || createDebug('mapeo' + (ns ? `:${ns}` : '')) + this.#log = this.#baseLogger.extend(this.deviceId.slice(0, TRIM)) + } + get enabled() { + return this.#log.enabled + } + + /** + * @param {Parameters} args + */ + log = (...args) => { + this.#log.apply(this, args) + } + /** + * + * @param {string} ns + */ + extend(ns) { + return new Logger({ + deviceId: this.deviceId, + baseLogger: this.#baseLogger.extend(ns), + }) + } +} diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index a21adb6e2..d138f5679 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -30,6 +30,7 @@ import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { MediaServer } from './media-server.js' import { LocalDiscovery } from './discovery/local-discovery.js' +import { Logger } from './logger.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -71,6 +72,7 @@ export class MapeoManager extends TypedEmitter { #invite #mediaServer #localDiscovery + #l /** * @param {Object} opts @@ -81,7 +83,9 @@ export class MapeoManager extends TypedEmitter { */ constructor({ rootKey, dbFolder, coreStorage, mediaServerOpts }) { super() - + this.#keyManager = new KeyManager(rootKey) + this.#deviceId = getDeviceId(this.#keyManager) + this.#l = new Logger({ deviceId: this.#deviceId, ns: 'manager' }) this.#dbFolder = dbFolder const sqlite = new Database( dbFolder === ':memory:' @@ -93,16 +97,15 @@ export class MapeoManager extends TypedEmitter { migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname, }) - this.#localPeers = new LocalPeers() + this.#localPeers = new LocalPeers({ logger: this.#l }) this.#localPeers.on('peers', (peers) => { this.emit('local-peers', omitPeerProtomux(peers)) }) - this.#keyManager = new KeyManager(rootKey) - this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ tables: [projectSettingsTable], sqlite, + logger: this.#l, }) this.#activeProjects = new Map() @@ -139,6 +142,7 @@ export class MapeoManager extends TypedEmitter { this.#localDiscovery = new LocalDiscovery({ identityKeypair: this.#keyManager.getIdentityKeypair(), + logger: this.#l, }) this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this)) } @@ -150,6 +154,10 @@ export class MapeoManager extends TypedEmitter { return this.#localPeers } + get deviceId() { + return this.#deviceId + } + /** * Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects * the Mapeo RPC channel and allows invites. All active projects will sync @@ -169,7 +177,11 @@ export class MapeoManager extends TypedEmitter { }) .catch((e) => { // Ignore error but log - console.error('Failed to send device info to peer', e) + this.#l.log( + 'Failed to send device info to peer %h', + noiseStream.remotePublicKey, + e + ) }) return replicationStream } @@ -285,6 +297,12 @@ export class MapeoManager extends TypedEmitter { // TODO: Close the project instance instead of keeping it around this.#activeProjects.set(projectPublicId, project) + this.#l.log( + 'created project %h, public id: %S', + projectKeypair.publicKey, + projectPublicId + ) + // 7. Return project public id return projectPublicId } @@ -338,6 +356,7 @@ export class MapeoManager extends TypedEmitter { sharedDb: this.#db, sharedIndexWriter: this.#projectSettingsIndexWriter, localPeers: this.#localPeers, + logger: this.#l, getMediaBaseUrl: this.#mediaServer.getMediaAddress.bind( this.#mediaServer ), diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 11ede268c..ba06a1e87 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -39,6 +39,7 @@ import { MemberApi } from './member-api.js' import { IconApi } from './icon-api.js' import { SyncApi, kSyncReplicate } from './sync/sync-api.js' import Hypercore from 'hypercore' +import { Logger } from './logger.js' /** @typedef {Omit} EditableProjectSettings */ @@ -64,6 +65,7 @@ export class MapeoProject { #projectPublicId #iconApi #syncApi + #l /** * @param {Object} opts @@ -77,6 +79,7 @@ export class MapeoProject { * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data * @param {(mediaType: 'blobs' | 'icons') => Promise} opts.getMediaBaseUrl * @param {import('./local-peers.js').LocalPeers} opts.localPeers + * @param {Logger} [opts.logger] * */ constructor({ @@ -90,7 +93,9 @@ export class MapeoProject { encryptionKeys, getMediaBaseUrl, localPeers, + logger, }) { + this.#l = Logger.create('project', logger) this.#deviceId = getDeviceId(keyManager) this.#projectId = projectKeyToId(projectKey) this.#projectPublicId = projectKeyToPublicId(projectKey) @@ -121,6 +126,7 @@ export class MapeoProject { keyManager, storage: coreManagerStorage, sqlite, + logger: this.#l, }) const indexWriter = new IndexWriter({ @@ -144,6 +150,7 @@ export class MapeoProject { return doc } }, + logger: this.#l, }) this.#dataStores = { auth: new DataStore({ @@ -260,6 +267,7 @@ export class MapeoProject { this.#syncApi = new SyncApi({ coreManager: this.#coreManager, capabilities: this.#capabilities, + logger: this.#l, }) ///////// 4. Wire up sync @@ -305,6 +313,7 @@ export class MapeoProject { .then(deferred.resolve) .catch(deferred.reject) }) + this.#l.log('Created project instance %h', projectKey) } /** @@ -428,7 +437,8 @@ export class MapeoProject { return extractEditableProjectSettings( await this.#dataTypes.projectSettings.getByDocId(this.#projectId) ) - } catch { + } catch (e) { + this.#l.log('No project settings') return /** @type {EditableProjectSettings} */ ({}) } } diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js index 01cf76ff7..306525b8e 100644 --- a/src/sync/peer-sync-controller.js +++ b/src/sync/peer-sync-controller.js @@ -1,5 +1,6 @@ import mapObject from 'map-obj' import { NAMESPACES } from '../core-manager/index.js' +import { Logger } from '../logger.js' /** * @typedef {import('../core-manager/index.js').Namespace} Namespace @@ -29,6 +30,7 @@ export class PeerSyncController { #downloadingRanges = new Map() /** @type {SyncStatus} */ #prevSyncStatus = createNamespaceMap('unknown') + #log /** * @param {object} opts @@ -36,8 +38,18 @@ export class PeerSyncController { * @param {import("../core-manager/index.js").CoreManager} opts.coreManager * @param {import("./sync-state.js").SyncState} opts.syncState * @param {import("../capabilities.js").Capabilities} opts.capabilities + * @param {Logger} [opts.logger] */ - constructor({ protomux, coreManager, syncState, capabilities }) { + constructor({ protomux, coreManager, syncState, capabilities, logger }) { + // @ts-ignore + this.#log = (formatter, ...args) => { + const log = Logger.create('peer', logger).log + return log.apply(null, [ + `[%h] ${formatter}`, + protomux.stream.remotePublicKey, + ...args, + ]) + } this.#coreManager = coreManager this.#protomux = protomux this.#capabilities = capabilities @@ -52,6 +64,14 @@ export class PeerSyncController { this.#updateEnabledNamespaces() } + get peerKey() { + return this.#protomux.stream.remotePublicKey + } + + get peerId() { + return this.peerKey?.toString('hex') + } + /** * Enable syncing of data (in the data and blob namespaces) */ @@ -99,6 +119,7 @@ export class PeerSyncController { const localState = mapObject(state, (ns, nsState) => { return [ns, nsState.localState] }) + this.#log('state %O', state) // Map of which namespaces have received new data since last sync change const didUpdate = mapObject(state, (ns) => { @@ -121,13 +142,12 @@ export class PeerSyncController { const cap = await this.#capabilities.getCapabilities(peerId) this.#syncCapability = cap.sync } catch (e) { + this.#log('Error reading capability', e) // Any error, consider sync blocked this.#syncCapability = createNamespaceMap('blocked') } } - // console.log(peerId.slice(0, 7), this.#syncCapability) - // console.log(peerId.slice(0, 7), didUpdate) - // console.dir(state, { depth: null, colors: true }) + this.#log('capability %o', this.#syncCapability) // If any namespace has new data, update what is enabled if (Object.values(didUpdate).indexOf(true) > -1) { @@ -190,6 +210,7 @@ export class PeerSyncController { (peer) => peer.protomux === this.#protomux ) if (!peerToUnreplicate) return + this.#log('unreplicating core %k', core.key) peerToUnreplicate.channel.close() this.#replicatingCores.delete(core) } @@ -222,6 +243,7 @@ export class PeerSyncController { this.#downloadCore(core) } this.#enabledNamespaces.add(namespace) + this.#log('enabled namespace %s', namespace) } /** @@ -233,6 +255,7 @@ export class PeerSyncController { this.#undownloadCore(core) } this.#enabledNamespaces.delete(namespace) + this.#log('disabled namespace %s', namespace) } } diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index 9508adc13..5eaf685db 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -1,6 +1,7 @@ import { TypedEmitter } from 'tiny-typed-emitter' import { SyncState } from './sync-state.js' import { PeerSyncController } from './peer-sync-controller.js' +import { Logger } from '../logger.js' export const kSyncReplicate = Symbol('replicate sync') @@ -20,6 +21,7 @@ export class SyncApi extends TypedEmitter { #peerSyncControllers = new Map() /** @type {Set<'local' | 'remote'>} */ #dataSyncEnabled = new Set() + #l /** * @@ -27,9 +29,11 @@ export class SyncApi extends TypedEmitter { * @param {import('../core-manager/index.js').CoreManager} opts.coreManager * @param {import("../capabilities.js").Capabilities} opts.capabilities * @param {number} [opts.throttleMs] + * @param {Logger} [opts.logger] */ - constructor({ coreManager, throttleMs = 200, capabilities }) { + constructor({ coreManager, throttleMs = 200, capabilities, logger }) { super() + this.#l = Logger.create('syncApi', logger) this.#coreManager = coreManager this.#capabilities = capabilities this.syncState = new SyncState({ coreManager, throttleMs }) @@ -46,6 +50,7 @@ export class SyncApi extends TypedEmitter { start() { if (this.#dataSyncEnabled.has('local')) return this.#dataSyncEnabled.add('local') + this.#l.log('Starting data sync') for (const peerSyncController of this.#peerSyncControllers.values()) { peerSyncController.enableDataSync() } @@ -57,6 +62,7 @@ export class SyncApi extends TypedEmitter { stop() { if (!this.#dataSyncEnabled.has('local')) return this.#dataSyncEnabled.delete('local') + this.#l.log('Stopping data sync') for (const peerSyncController of this.#peerSyncControllers.values()) { peerSyncController.disableDataSync() } @@ -66,13 +72,20 @@ export class SyncApi extends TypedEmitter { * @param {import('protomux')} protomux A protomux instance */ [kSyncReplicate](protomux) { - if (this.#peerSyncControllers.has(protomux)) return + if (this.#peerSyncControllers.has(protomux)) { + this.#l.log( + 'Unexpected existing peer sync controller for peer %h', + protomux.stream.remotePublicKey + ) + return + } const peerSyncController = new PeerSyncController({ protomux, coreManager: this.#coreManager, syncState: this.syncState, capabilities: this.#capabilities, + logger: this.#l, }) if (this.#dataSyncEnabled.has('local')) { peerSyncController.enableDataSync()