Skip to content

Commit

Permalink
fix: waitFor behaviour, only wait for reachable
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Jan 2, 2024
1 parent 93ca0a3 commit 9935618
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 41 deletions.
62 changes: 46 additions & 16 deletions packages/transport/stream/src/__tests__/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,6 @@ const getWritesCount = (writes: Map<string, DataMessage[]>) => {
return sum;
};

const addDelay = (stream: DirectStream, ms: () => number) => {
const fn = stream.processMessage.bind(stream);
stream.processMessage = async (a, b, c) => {
await delay(ms());
return fn(a, b, c);
};
};
const getUniqueMessages = async (messages: Message[]) => {
const map: Map<string, Message> = new Map();
for (const message of messages) {
const id = await getMsgId(message.bytes());
map.set(id, message);
}
return [...map.values()];
};

const createMetrics = (stream: DirectStream) => {
const s: {
stream: TestDirectStream;
Expand Down Expand Up @@ -2063,6 +2047,52 @@ describe("start/stop", () => {
await session.peers[0].services.directstream.start();
await waitForPeerStreams(stream(session, 0), stream(session, 1));
});

it("wait for only waits for reachable", async () => {
session = await disconnected(3, {
transports: [tcp()],
services: {
directstream: (c) =>
new TestDirectStream(c, {
connectionManager: { dialer: false, pruner: false }
})
}
});

await session.connect([
// behaviour seems to be more predictable if we connect after start (TODO improve startup to use existing connections in a better way)
[session.peers[0], session.peers[1]],
[session.peers[1], session.peers[2]]
]);
await waitForPeerStreams(stream(session, 0), stream(session, 1));
await waitForPeerStreams(stream(session, 1), stream(session, 2));

expect(
session.peers[0].services.directstream.routes.isReachable(
session.peers[0].services.directstream.publicKey.hashcode(),
session.peers[2].services.directstream.publicKey.hashcode()
)
).toBeFalse();
await session.peers[0].services.directstream.publish(new Uint8Array([0]), {
mode: new SeekDelivery({ redundancy: 1 })
});
await session.peers[0].services.directstream.waitFor(
session.peers[2].peerId
);
await expect(
session.peers[0].services.directstream.waitFor(session.peers[2].peerId, {
neighbour: true,
timeout: 1000
})
).rejects.toThrow();

await expect(
session.peers[0].services.directstream.waitFor(session.peers[1].peerId, {
neighbour: true,
timeout: 1000
})
);
});
});

describe("multistream", () => {
Expand Down
48 changes: 26 additions & 22 deletions packages/transport/stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,8 @@ export abstract class DirectStream<
if (
this.signaturePolicy === "StictSign" ||
mode instanceof SeekDelivery ||
mode instanceof AcknowledgeDelivery
mode instanceof AcknowledgeDelivery ||
mode instanceof AnyWhere
) {
await message.sign(this.sign);
}
Expand Down Expand Up @@ -1715,26 +1716,27 @@ export abstract class DirectStream<

async waitFor(
peer: PeerId | PublicSignKey,
options?: { signal: AbortSignal }
options?: { timeout?: number; signal?: AbortSignal; neighbour?: boolean }
) {
const hash = (
peer instanceof PublicSignKey ? peer : getPublicKeyFromPeerId(peer)
).hashcode();
try {
await waitFor(
() => {
if (!this.peers.has(hash)) {
if (options?.neighbour && !this.peers.has(hash)) {
return false;
}
if (!this.routes.isReachable(this.publicKeyHash, hash)) {

if (!this.routes.isReachable(this.publicKeyHash, hash, 0)) {
return false;
}

return true;
},
{
signal: options?.signal,
timeout: 10 * 1000
timeout: options?.timeout ?? 10 * 1000
}
);
} catch (error) {
Expand All @@ -1744,25 +1746,27 @@ export abstract class DirectStream<
" does not exist. Connection exist: " +
this.peers.has(hash) +
". Route exist: " +
this.routes.isReachable(this.publicKeyHash, hash)
this.routes.isReachable(this.publicKeyHash, hash, 0)
);
}
const stream = this.peers.get(hash)!;
try {
// Dontwait for readlable https://github.com/libp2p/js-libp2p/issues/2321
await waitFor(() => /* stream.isReadable && */ stream.isWritable, {
signal: options?.signal,
timeout: 10 * 1000
});
} catch (error) {
throw new Error(
"Stream to " +
stream.publicKey.hashcode() +
" not ready. Readable: " +
stream.isReadable +
". Writable " +
stream.isWritable
);
if (options?.neighbour) {
const stream = this.peers.get(hash)!;
try {
// Dontwait for readlable https://github.com/libp2p/js-libp2p/issues/2321
await waitFor(() => /* stream.isReadable && */ stream.isWritable, {
signal: options?.signal,
timeout: options?.timeout ?? 10 * 1000
});
} catch (error) {
throw new Error(
"Stream to " +
stream.publicKey.hashcode() +
" not ready. Readable: " +
stream.isReadable +
". Writable " +
stream.isWritable
);
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/transport/stream/src/routes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PublicSignKey } from "@peerbit/crypto";

export const MAX_ROUTE_DISTANCE = Number.MAX_SAFE_INTEGER;
export const MAX_ROUTE_DISTANCE = Number.MAX_SAFE_INTEGER - 1;
export class Routes {
// END receiver -> Neighbour

Expand Down Expand Up @@ -117,10 +117,10 @@ export class Routes {
return this.routes.get(from)?.get(target);
}

isReachable(from: string, target: string) {
isReachable(from: string, target: string, maxDistance = MAX_ROUTE_DISTANCE) {
return (
(this.routes.get(from)?.get(target)?.list[0]?.distance ??
Number.MAX_SAFE_INTEGER) < MAX_ROUTE_DISTANCE
Number.MAX_SAFE_INTEGER) <= maxDistance
);
}

Expand Down

0 comments on commit 9935618

Please sign in to comment.