Skip to content

Commit

Permalink
chore: Add debug logging
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Nov 10, 2023
1 parent 9486d8a commit 8c6a081
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 21 deletions.
3 changes: 2 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import { CoreIndex } from './core-index.js'
import { ReplicationStateMachine } from './replication-state-machine.js'
import * as rle from './bitfield-rle.js'
import { Logger } from '../logger.js'

// WARNING: Changing these will break things for existing apps, since namespaces
// are used for key derivation
Expand Down Expand Up @@ -54,6 +55,8 @@ export class CoreManager extends TypedEmitter {
#state = 'opened'
#ready
#haveExtension
#deviceId
#l

static get namespaces() {
return NAMESPACES
Expand All @@ -67,6 +70,7 @@ export class CoreManager extends TypedEmitter {
* @param {Buffer} [options.projectSecretKey] 32-byte secret key of the project creator core
* @param {Partial<Record<Namespace, Buffer>>} [options.encryptionKeys] Encryption keys for each namespace
* @param {import('hypercore').HypercoreStorage} options.storage Folder to store all hypercore data
* @param {Logger} [options.logger]
*/
constructor({
sqlite,
Expand All @@ -75,6 +79,7 @@ export class CoreManager extends TypedEmitter {
projectSecretKey,
encryptionKeys = {},
storage,
logger,
}) {
super()
assert(
Expand All @@ -85,7 +90,9 @@ export class CoreManager extends TypedEmitter {
!projectSecretKey || projectSecretKey.length === 64,
'project owner core secret key must be 64-byte buffer'
)
this.#l = Logger.create('coreManager', logger)
const primaryKey = keyManager.getDerivedKey('primaryKey', projectKey)
this.#deviceId = keyManager.getIdentityKeypair().publicKey.toString('hex')
this.#projectKey = projectKey
this.#encryptionKeys = encryptionKeys

Expand Down Expand Up @@ -157,7 +164,15 @@ export class CoreManager extends TypedEmitter {

this.#ready = Promise.all(
[...this.#coreIndex].map(({ core }) => core.ready())
).catch(() => {})
)
.then(() => {
this.#l.log('ready')
})
.catch(() => {})
}

get deviceId() {
return this.#deviceId
}

get creatorCore() {
Expand Down Expand Up @@ -304,6 +319,12 @@ export class CoreManager extends TypedEmitter {
this.#addCoreSqlStmt.run({ publicKey: key, namespace })
}

this.#l.log(
'Added %s %s core %k',
persist ? 'remote' : writer ? 'local' : 'creator',
namespace,
key
)
this.emit('add-core', { core, key, namespace })

return { core, key, namespace }
Expand Down
17 changes: 16 additions & 1 deletion src/index-writer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import SqliteIndexer from '@mapeo/sqlite-indexer'
import { getTableConfig } from 'drizzle-orm/sqlite-core'
import { getBacklinkTableName } from '../schema/utils.js'
import { discoveryKey } from 'hypercore-crypto'
import { Logger } from '../logger.js'

/**
* @typedef {import('../datatype/index.js').MapeoDocTables} MapeoDocTables
Expand All @@ -21,15 +22,18 @@ export class IndexWriter {
/** @type {Map<TTables['_']['name'], SqliteIndexer>} */
#indexers = new Map()
#mapDoc
#l
/**
*
* @param {object} opts
* @param {import('better-sqlite3').Database} opts.sqlite
* @param {TTables[]} opts.tables
* @param {(doc: MapeoDocInternal, version: import('@mapeo/schema').VersionIdObject) => MapeoDoc} [opts.mapDoc] optionally transform a document prior to indexing. Can also validate, if an error is thrown then the document will not be indexed
* @param {typeof import('@mapeo/sqlite-indexer').defaultGetWinner} [opts.getWinner] custom function to determine the "winner" of two forked documents. Defaults to choosing the document with the most recent `updatedAt`
* @param {Logger} [opts.logger]
*/
constructor({ tables, sqlite, mapDoc = (d) => d, getWinner }) {
constructor({ tables, sqlite, mapDoc = (d) => d, getWinner, logger }) {
this.#l = Logger.create('indexWriter', logger)
this.#mapDoc = mapDoc
for (const table of tables) {
const config = getTableConfig(table)
Expand Down Expand Up @@ -63,6 +67,7 @@ export class IndexWriter {
const version = { coreDiscoveryKey: discoveryKey(key), index }
var doc = this.#mapDoc(decode(block, version), version)
} catch (e) {
this.#l.log('Could not decode entry %d of %h', index, key)
// Unknown or invalid entry - silently ignore
continue
}
Expand All @@ -80,6 +85,16 @@ export class IndexWriter {
continue
}
indexer.batch(docs)
if (this.#l.enabled) {
for (const doc of docs) {
this.#l.log(
'Indexed %s %S @ %S',
doc.schemaName,
doc.docId,
doc.versionId
)
}
}
}
}
}
71 changes: 65 additions & 6 deletions src/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
InviteResponse_Decision,
} from './generated/rpc.js'
import pDefer from 'p-defer'
import { Logger } from './logger.js'

const PROTOCOL_NAME = 'mapeo/rpc'

Expand Down Expand Up @@ -59,16 +60,23 @@ class Peer {
#disconnectedAt = 0
/** @type {Protomux<import('@hyperswarm/secret-stream')>} */
#protomux
#log

/**
* @param {object} options
* @param {Buffer} options.publicKey
* @param {ReturnType<typeof Protomux.prototype.createChannel>} options.channel
* @param {Logger} [options.logger]
*/
constructor({ publicKey, channel }) {
constructor({ publicKey, channel, logger }) {
this.#publicKey = publicKey
this.#channel = channel
this.#connected = pDefer()
// @ts-ignore
this.#log = (formatter, ...args) => {
const log = Logger.create('peer', logger).log
return log.apply(null, [`[%h] ${formatter}`, publicKey, ...args])
}
}
/** @returns {PeerInfoInternal} */
get info() {
Expand Down Expand Up @@ -108,26 +116,34 @@ class Peer {
this.#protomux = protomux
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
this.#log('ERROR: tried to connect but state was %s', this.#state)
return // TODO: report error - this should not happen
}
this.#state = 'connected'
this.#connectedAt = Date.now()
this.#connected.resolve()
this.#log('connected')
}
disconnect() {
// @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances
this.#protomux = undefined
/* c8 ignore next */
if (this.#state === 'disconnected') return
if (this.#state === 'disconnected') {
this.#log('ERROR: tried to disconnect but was already disconnected')
return
}
this.#state = 'disconnected'
this.#disconnectedAt = Date.now()
// Can just resolve this rather than reject, because #assertConnected will throw the error
this.#connected.resolve()
let rejectCount = 0
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError())
rejectCount++
}
}
this.#log('disconnected and rejected %d pending invites', rejectCount)
this.pendingInvites.clear()
}
/** @param {InviteWithKeys} invite */
Expand All @@ -136,24 +152,32 @@ class Peer {
const buf = Buffer.from(Invite.encode(invite).finish())
const messageType = MESSAGE_TYPES.Invite
this.#channel.messages[messageType].send(buf)
this.#log('sent invite for %h', invite.projectKey)
}
/** @param {InviteResponse} response */
async sendInviteResponse(response) {
await this.#assertConnected()
const buf = Buffer.from(InviteResponse.encode(response).finish())
const messageType = MESSAGE_TYPES.InviteResponse
this.#channel.messages[messageType].send(buf)
this.#log(
'sent response for %h: %s',
response.projectKey,
response.decision
)
}
/** @param {DeviceInfo} deviceInfo */
async sendDeviceInfo(deviceInfo) {
await this.#assertConnected()
const buf = Buffer.from(DeviceInfo.encode(deviceInfo).finish())
const messageType = MESSAGE_TYPES.DeviceInfo
this.#channel.messages[messageType].send(buf)
this.#log('sent deviceInfo %o', deviceInfo)
}
/** @param {DeviceInfo} deviceInfo */
receiveDeviceInfo(deviceInfo) {
this.#name = deviceInfo.name
this.#log('received deviceInfo %o', deviceInfo)
}
async #assertConnected() {
await this.#connected.promise
Expand All @@ -179,6 +203,17 @@ export class LocalPeers extends TypedEmitter {
#opening = new Set()

static InviteResponse = InviteResponse_Decision
#l

/**
*
* @param {object} [opts]
* @param {Logger} [opts.logger]
*/
constructor({ logger } = {}) {
super()
this.#l = Logger.create('localPeers', logger)
}

/**
* Invite a peer to a project. Resolves with the response from the invitee:
Expand All @@ -195,7 +230,6 @@ export class LocalPeers extends TypedEmitter {
async invite(peerId, { timeout, ...invite }) {
await Promise.all(this.#opening)
const peer = this.#peers.get(peerId)
if (!peer) console.log([...this.#peers.keys()])
if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId)
/** @type {Promise<InviteResponse['decision']>} */
return new Promise((origResolve, origReject) => {
Expand Down Expand Up @@ -277,6 +311,11 @@ export class LocalPeers extends TypedEmitter {
protomux.pair(
{ protocol: 'hypercore/alpha' },
/** @param {Buffer} discoveryKey */ async (discoveryKey) => {
this.#l.log(
'Received dk %h from %h',
discoveryKey,
stream.noiseStream.remotePublicKey
)
this.emit('discovery-key', discoveryKey, stream.rawStream)
}
)
Expand All @@ -288,7 +327,13 @@ export class LocalPeers extends TypedEmitter {
// opened, so this helped awaits the open
openedNoiseSecretStream(stream).then((stream) => {
this.#opening.delete(stream.opened)
if (stream.destroyed) return
if (stream.destroyed) {
this.#l.log(
'Opened connection to %h but was already destroyed',
stream.remotePublicKey
)
return
}
const { remotePublicKey } = stream

// This is written like this because the protomux uses the index within
Expand Down Expand Up @@ -318,7 +363,11 @@ export class LocalPeers extends TypedEmitter {
if (existingPeer && existingPeer.info.status !== 'disconnected') {
existingPeer.disconnect() // Should not happen, but in case
}
const peer = new Peer({ publicKey: remotePublicKey, channel })
const peer = new Peer({
publicKey: remotePublicKey,
channel,
logger: this.#l,
})
this.#peers.set(peerId, peer)
// Do not emit peers now - will emit when connected
})
Expand All @@ -345,7 +394,10 @@ export class LocalPeers extends TypedEmitter {
const peerId = publicKey.toString('hex')
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
if (!peer) {
this.#l.log('ERROR: Could not close peer %h', publicKey)
return // TODO: report error - this should not happen
}
// No-op if no change in state
/* c8 ignore next */
if (peer.info.status === 'disconnected') return
Expand Down Expand Up @@ -384,6 +436,7 @@ export class LocalPeers extends TypedEmitter {
const invite = Invite.decode(value)
assertInviteHasKeys(invite)
this.emit('invite', peerId, invite)
this.#l.log('Invite from %h for %h', peerPublicKey, invite.projectKey)
break
}
case 'InviteResponse': {
Expand All @@ -397,6 +450,12 @@ export class LocalPeers extends TypedEmitter {
for (const deferredPromise of pending) {
deferredPromise.resolve(response.decision)
}
this.#l.log(
'Invite response from %h for %h: %s',
peerPublicKey,
response.projectKey,
response.decision
)
peer.pendingInvites.set(projectId, [])
break
}
Expand Down
Loading

0 comments on commit 8c6a081

Please sign in to comment.