Skip to content

Commit

Permalink
feat: add traceFunction call to metrics (#2898)
Browse files Browse the repository at this point in the history
Allow tracing method calls using a metrics implementation.

```js
const libp2p = await createLibp2p()

for await (const foo of libp2p.contentRouting.findProviders(cid, {
  trace: libp2p.metrics?.createTrace(),
  signal: AbortSignal.timeout(20_000)
}) {
  //...
}
```

Adds tracing support to `libp2p.contentRouting.*` and `libp2p.peerRouting.*` to start with, other methods can have it added when necessary.

Traces can have attributes set on them to give context to a call (input args, output values, yielded results, etc).
  • Loading branch information
achingbrain authored Jan 7, 2025
1 parent 7701438 commit 20d9ba7
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 13 deletions.
4 changes: 2 additions & 2 deletions packages/interface/src/content-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { AbortOptions, RoutingOptions } from '../index.js'
import type { RoutingOptions } from '../index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { CID } from 'multiformats/cid'

Expand Down Expand Up @@ -50,7 +50,7 @@ export interface ContentRouting {
* provide content corresponding to the passed CID, call this function to no
* longer remind them.
*/
cancelReprovide (key: CID, options?: AbortOptions): Promise<void>
cancelReprovide (key: CID, options?: RoutingOptions): Promise<void>

/**
* Find the providers of the passed CID.
Expand Down
11 changes: 10 additions & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,13 +753,22 @@ export interface LoggerOptions {
log: Logger
}

/**
* An object that includes a trace object that is passed onwards.
*
* This is used by metrics method tracing to link function calls together.
*/
export interface TraceOptions {
trace?: any
}

/**
* When a routing operation involves reading values, these options allow
* controlling where the values are read from. By default libp2p will check
* local caches but may not use the network if a valid local value is found,
* these options allow tuning that behaviour.
*/
export interface RoutingOptions extends AbortOptions, ProgressOptions {
export interface RoutingOptions extends AbortOptions, ProgressOptions, TraceOptions {
/**
* Pass `false` to not use the network
*
Expand Down
57 changes: 57 additions & 0 deletions packages/interface/src/metrics/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,61 @@ export interface Metrics {
* method on the returned summary group object
*/
registerSummaryGroup: ((name: string, options?: SummaryOptions) => SummaryGroup) & ((name: string, options: CalculatedSummaryOptions<Record<string, number>>) => void)

/**
* Wrap a function for tracing purposes.
*
* All functions wrapped like this should accept a final optional options arg.
*
* In order to pass an execution context along to create a multi-layered
* trace, the index of the options arg must be specified.
*/
traceFunction <F extends (...args: any[]) => AsyncIterator<any>> (name: string, fn: F, options?: TraceGeneratorFunctionOptions<Parameters<F>, ReturnType<F>, YieldType<ReturnType<F>>>): F
traceFunction <F extends (...args: any[]) => Iterator<any>> (name: string, fn: F, options?: TraceGeneratorFunctionOptions<Parameters<F>, ReturnType<F>, YieldType<ReturnType<F>>>): F
traceFunction <F extends (...args: any[]) => any = (...args: any[]) => any> (name: string, fn: F, options?: TraceFunctionOptions<Parameters<F>, ReturnType<F>>): F

/**
* Creates a tracing context that can be used to trace a method call
*/
createTrace(): any
}

/**
* Infer the yielded type of an (async)iterable
*/
type YieldType<T extends AsyncIterator<any> | Iterator<any>> = T extends AsyncIterator<infer Y> ? Y : T extends Iterator<infer Y, any, any> ? Y : never

export type TraceAttributes = Record<string, number | string | boolean | number[] | string[] | boolean[]>

export interface TraceFunctionOptions<A, B> {
/**
* To construct a trace that spans multiple method invocations, it's necessary
* to pass the trace context onwards as part of the options object.
*
* Specify the index of the options object in the args array here.
*
* @default 0
*/
optionsIndex?: number

/**
* Set attributes on the trace by modifying the passed attributes object.
*/
getAttributesFromArgs?(args: A, attributes: TraceAttributes): TraceAttributes

/**
* Set attributes on the trace by modifying the passed attributes object. The
* object will have previously been passed to `appendAttributesFromArgs`
* and/or `appendAttributesFromYieldedValue` (if defined)
*/
getAttributesFromReturnValue?(value: B, attributes: TraceAttributes): TraceAttributes
}

export interface TraceGeneratorFunctionOptions<A, B, C = any> extends TraceFunctionOptions<A, B> {
/**
* Set attributes on the trace by modifying the passed attributes object. The
* object will have previously been passed to `appendAttributesFromArgs` (if
* defined)
*/
getAttributesFromYieldedValue? (value: C, attributes: TraceAttributes, index: number): TraceAttributes
}
17 changes: 15 additions & 2 deletions packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export class ContentFetching {
this.peerRouting = peerRouting
this.queryManager = queryManager
this.network = network

this.get = components.metrics?.traceFunction('libp2p.kadDHT.get', this.get.bind(this), {
optionsIndex: 1
}) ?? this.get
this.put = components.metrics?.traceFunction('libp2p.kadDHT.put', this.put.bind(this), {
optionsIndex: 2
}) ?? this.put
}

/**
Expand Down Expand Up @@ -145,7 +152,10 @@ export class ContentFetching {

// put record to the closest peers
yield * pipe(
this.peerRouting.getClosestPeers(key, { signal: options.signal }),
this.peerRouting.getClosestPeers(key, {
...options,
signal: options.signal
}),
(source) => map(source, (event) => {
return async () => {
if (event.name !== 'FINAL_PEER') {
Expand Down Expand Up @@ -252,7 +262,10 @@ export class ContentFetching {
const self = this // eslint-disable-line @typescript-eslint/no-this-alias

const getValueQuery: QueryFunc = async function * ({ peer, signal }) {
for await (const event of self.peerRouting.getValueOrPeers(peer, key, { signal })) {
for await (const event of self.peerRouting.getValueOrPeers(peer, key, {
...options,
signal
})) {
yield event

if (event.name === 'PEER_RESPONSE' && (event.record != null)) {
Expand Down
23 changes: 23 additions & 0 deletions packages/kad-dht/src/content-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@ export class ContentRouting {
this.queryManager = queryManager
this.routingTable = routingTable
this.providers = providers

this.findProviders = components.metrics?.traceFunction('libp2p.kadDHT.findProviders', this.findProviders.bind(this), {
optionsIndex: 1,
getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => {
if (event.name === 'PROVIDER') {
attrs.providers ??= []
attrs.providers.push(...event.providers.map(info => info.id.toString()))
}

return attrs
}
}) ?? this.findProviders
this.provide = components.metrics?.traceFunction('libp2p.kadDHT.provide', this.provide.bind(this), {
optionsIndex: 1,
getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => {
if (event.name === 'PEER_RESPONSE' && event.messageName === 'ADD_PROVIDER') {
attrs.providers ??= []
attrs.providers.push(event.from.toString())
}

return attrs
}
}) ?? this.provide
}

/**
Expand Down
55 changes: 55 additions & 0 deletions packages/kad-dht/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,61 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
operations: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_requests_total`),
errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_errors_total`)
}

this.sendRequest = components.metrics?.traceFunction('libp2p.kadDHT.sendRequest', this.sendRequest.bind(this), {
optionsIndex: 2,
getAttributesFromArgs ([to, message], attrs) {
return {
...attrs,
to: to.toString(),
'message type': `${message.type}`
}
},
getAttributesFromYieldedValue: (event, attrs) => {
if (event.name === 'PEER_RESPONSE') {
if (event.providers.length > 0) {
event.providers.forEach((value, index) => {
attrs[`providers-${index}`] = value.id.toString()
})
}

if (event.closer.length > 0) {
event.closer.forEach((value, index) => {
attrs[`closer-${index}`] = value.id.toString()
})
}
}

return attrs
}
}) ?? this.sendRequest
this.sendMessage = components.metrics?.traceFunction('libp2p.kadDHT.sendMessage', this.sendMessage.bind(this), {
optionsIndex: 2,
getAttributesFromArgs ([to, message], attrs) {
return {
...attrs,
to: to.toString(),
'message type': `${message.type}`
}
},
getAttributesFromYieldedValue: (event, attrs) => {
if (event.name === 'PEER_RESPONSE') {
if (event.providers.length > 0) {
event.providers.forEach((value, index) => {
attrs[`providers-${index}`] = value.id.toString()
})
}

if (event.closer.length > 0) {
event.closer.forEach((value, index) => {
attrs[`closer-${index}`] = value.id.toString()
})
}
}

return attrs
}
}) ?? this.sendMessage
}

/**
Expand Down
10 changes: 9 additions & 1 deletion packages/kad-dht/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import type { Network } from '../network.js'
import type { QueryManager, QueryOptions } from '../query/manager.js'
import type { QueryFunc } from '../query/types.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { ComponentLogger, Logger, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'
import type { ComponentLogger, Logger, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'

export interface PeerRoutingComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
metrics?: Metrics
}

export interface PeerRoutingInit {
Expand Down Expand Up @@ -55,6 +56,13 @@ export class PeerRouting {
this.peerStore = components.peerStore
this.peerId = components.peerId
this.log = components.logger.forComponent(`${init.logPrefix}:peer-routing`)

this.findPeer = components.metrics?.traceFunction('libp2p.kadDHT.findPeer', this.findPeer.bind(this), {
optionsIndex: 1
}) ?? this.findPeer
this.getClosestPeers = components.metrics?.traceFunction('libp2p.kadDHT.getClosestPeers', this.getClosestPeers.bind(this), {
optionsIndex: 1
}) ?? this.getClosestPeers
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/query-self.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import { timeOperationMethod } from './utils.js'
import type { OperationMetrics } from './kad-dht.js'
import type { PeerRouting } from './peer-routing/index.js'
import type { RoutingTable } from './routing-table/index.js'
import type { ComponentLogger, Logger, PeerId, Startable } from '@libp2p/interface'
import type { ComponentLogger, Logger, Metrics, PeerId, Startable } from '@libp2p/interface'
import type { DeferredPromise } from 'p-defer'

export interface QuerySelfInit {
logPrefix: string
peerRouting: PeerRouting
Expand All @@ -28,6 +27,7 @@ export interface QuerySelfInit {
export interface QuerySelfComponents {
peerId: PeerId
logger: ComponentLogger
metrics?: Metrics
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export class QueryManager implements Startable {
// Create query paths from the starting peers
const paths = peersToQuery.map((peer, index) => {
return queryPath({
...options,
key,
startingPeer: peer,
ourPeerId: this.peerId,
Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu

try {
for await (const event of query({
...options,
key,
peer,
signal: compoundSignal,
Expand Down
56 changes: 54 additions & 2 deletions packages/libp2p/src/content-routing.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { NotStartedError } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import merge from 'it-merge'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { NoContentRoutersError } from './errors.js'
import type { AbortOptions, ComponentLogger, ContentRouting, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, ContentRouting, Metrics, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export interface CompoundContentRoutingInit {
Expand All @@ -13,6 +14,7 @@ export interface CompoundContentRoutingComponents {
peerStore: PeerStore
peerRouting: PeerRouting
logger: ComponentLogger
metrics?: Metrics
}

export class CompoundContentRouting implements ContentRouting, Startable {
Expand All @@ -24,6 +26,56 @@ export class CompoundContentRouting implements ContentRouting, Startable {
this.routers = init.routers ?? []
this.started = false
this.components = components

this.findProviders = components.metrics?.traceFunction('libp2p.contentRouting.findProviders', this.findProviders.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([cid], attrs) => {
return {
...attrs,
cid: cid.toString()
}
},
getAttributesFromYieldedValue: (value, attrs: { providers?: string[] }) => {
return {
...attrs,
providers: [...(Array.isArray(attrs.providers) ? attrs.providers : []), value.id.toString()]
}
}
}) ?? this.findProviders
this.provide = components.metrics?.traceFunction('libp2p.contentRouting.provide', this.provide.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([cid], attrs) => {
return {
...attrs,
cid: cid.toString()
}
}
}) ?? this.provide
this.cancelReprovide = components.metrics?.traceFunction('libp2p.contentRouting.cancelReprovide', this.cancelReprovide.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([cid], attrs) => {
return {
...attrs,
cid: cid.toString()
}
}
}) ?? this.cancelReprovide
this.put = components.metrics?.traceFunction('libp2p.contentRouting.put', this.put.bind(this), {
optionsIndex: 2,
getAttributesFromArgs: ([key]) => {
return {
key: uint8ArrayToString(key, 'base36')
}
}
}) ?? this.put
this.get = components.metrics?.traceFunction('libp2p.contentRouting.get', this.get.bind(this), {
optionsIndex: 1,
getAttributesFromArgs: ([key]) => {
return {
key: uint8ArrayToString(key, 'base36')
}
}
}) ?? this.get
}

readonly [Symbol.toStringTag] = '@libp2p/content-routing'
Expand All @@ -43,7 +95,7 @@ export class CompoundContentRouting implements ContentRouting, Startable {
/**
* Iterates over all content routers in parallel to find providers of the given key
*/
async * findProviders (key: CID, options: RoutingOptions = {}): AsyncIterable<PeerInfo> {
async * findProviders (key: CID, options: RoutingOptions = {}): AsyncGenerator<PeerInfo> {
if (this.routers.length === 0) {
throw new NoContentRoutersError('No content routers available')
}
Expand Down
Loading

0 comments on commit 20d9ba7

Please sign in to comment.