Skip to content

Commit

Permalink
fix: make blocks propagate over hops
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Jan 2, 2024
1 parent 9935618 commit 1474397
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 29 deletions.
11 changes: 9 additions & 2 deletions packages/transport/blocks/src/__tests__/libp2p.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,27 @@ describe("transport", function () {
}); */

it("rw", async () => {
session = await TestSession.connected(2, {
session = await TestSession.disconnected(3, {
services: { blocks: (c) => new DirectBlock(c) }
});

await store(session, 0).start();
await store(session, 1).start();
await store(session, 2).start();

await session.connect([
[session.peers[0], session.peers[1]],
[session.peers[1], session.peers[2]]
]);

await waitForPeers(store(session, 0), store(session, 1));
await waitForPeers(store(session, 1), store(session, 2));

const data = new Uint8Array([5, 4, 3]);
const cid = await store(session, 0).put(data);

expect(cid).toEqual("zb2rhbnwihVzMMEGAPf9EwTZBsQz9fszCnM4Y8mJmBFgiyN7J");
const readData = await store(session, 1).get(cid);
const readData = await store(session, 2).get(cid);
expect(new Uint8Array(readData!)).toEqual(data);
});

Expand Down
5 changes: 3 additions & 2 deletions packages/transport/blocks/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { GetOptions } from "@peerbit/blocks-interface";
import { createStore } from "@peerbit/any-store";
import { BlockMessage, RemoteBlocks } from "./remote.js";
import { PublicSignKey } from "@peerbit/crypto";
import { DataMessage } from "@peerbit/stream-interface";
import { AnyWhere, DataMessage } from "@peerbit/stream-interface";
import { deserialize, serialize } from "@dao-xyz/borsh";

export type DirectBlockComponents = DirectStreamComponents;
Expand Down Expand Up @@ -37,7 +37,8 @@ export class DirectBlock extends DirectStream implements IBlocks {
});
this.remoteBlocks = new RemoteBlocks({
local: new AnyBlockStore(createStore(options?.directory)),
publish: (message) => this.publish(serialize(message)),
publish: (message) =>
this.publish(serialize(message), { mode: new AnyWhere() }),
localTimeout: options?.localTimeout || 1000,
messageProcessingConcurrency: options?.messageProcessingConcurrency || 10,
waitFor: this.waitFor.bind(this)
Expand Down
25 changes: 0 additions & 25 deletions packages/transport/blocks/src/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,6 @@ export class RemoteBlocks implements IBlocks {
this.localStore = options?.local;
this._resolvers = new Map();
this._readFromPeersPromises = new Map();
/* this._responseHandler = (async (evt: CustomEvent<DataMessage>) => {
if (!evt) {
return;
}
const message = evt.detail;
if (!message.data) {
return;
}
try {
const decoded = deserialize(message.data, BlockMessage);
if (decoded instanceof BlockRequest && this._localStore) {
this._loadFetchQueue.add(() =>
this.handleFetchRequest(decoded, localTimeout)
);
} else if (decoded instanceof BlockResponse) {
// TODO make sure we are not storing too much bytes in ram (like filter large blocks)
this._resolvers.get(decoded.cid)?.(decoded.bytes);
}
} catch (error) {
console.error("Got error for libp2p block transport: ", error);
return; // timeout o r invalid cid
}
}) */

this._responseHandler = async (message: BlockMessage) => {
try {
Expand Down

0 comments on commit 1474397

Please sign in to comment.