diff --git a/packages/elixir-client/lib/electric/client/offset.ex b/packages/elixir-client/lib/electric/client/offset.ex index d93772dee4..b296548a2a 100644 --- a/packages/elixir-client/lib/electric/client/offset.ex +++ b/packages/elixir-client/lib/electric/client/offset.ex @@ -78,7 +78,7 @@ defmodule Electric.Client.Offset do else with [tx_offset_str, op_offset_str] <- :binary.split(str, "_"), {tx_offset, ""} <- Integer.parse(tx_offset_str), - {op_offset, ""} <- Integer.parse(op_offset_str) do + {op_offset, ""} <- parse_int_or_inf(op_offset_str) do {:ok, %__MODULE__{tx: tx_offset, op: op_offset}} else _ -> {:error, "has invalid format"} @@ -86,6 +86,9 @@ defmodule Electric.Client.Offset do end end + defp parse_int_or_inf("inf"), do: {:infinity, ""} + defp parse_int_or_inf(int), do: Integer.parse(int) + @doc """ Create a new #{__MODULE__} struct from the given LSN and operation offsets. @@ -115,7 +118,7 @@ defmodule Electric.Client.Offset do end def to_string(%__MODULE__{tx: tx, op: op}) do - "#{Integer.to_string(tx)}_#{Integer.to_string(op)}" + "#{Integer.to_string(tx)}_#{if op == :infinity, do: "inf", else: Integer.to_string(op)}" end @spec to_tuple(t()) :: {tx_offset(), op_offset()} diff --git a/packages/elixir-client/test/electric/client/mock_test.exs b/packages/elixir-client/test/electric/client/mock_test.exs index 8d213b803e..efa8b08b26 100644 --- a/packages/elixir-client/test/electric/client/mock_test.exs +++ b/packages/elixir-client/test/electric/client/mock_test.exs @@ -42,7 +42,7 @@ defmodule Electric.Client.MockTest do Client.Mock.response(client, status: 200, schema: %{id: %{type: "int8"}}, - last_offset: Offset.new(0, 0), + last_offset: Offset.new(0, 1), shape_handle: "my-shape", body: [ Client.Mock.change(value: %{id: "4444"}), @@ -60,7 +60,7 @@ defmodule Electric.Client.MockTest do %ChangeMessage{value: %{"id" => 2222}}, %ChangeMessage{value: %{"id" => 3333}}, %ChangeMessage{value: %{"id" => 4444}}, - up_to_date0() + up_to_date() ] = events end end diff --git a/packages/elixir-client/test/support/client_helpers.ex b/packages/elixir-client/test/support/client_helpers.ex index 2145e69644..7ef142aec8 100644 --- a/packages/elixir-client/test/support/client_helpers.ex +++ b/packages/elixir-client/test/support/client_helpers.ex @@ -4,7 +4,7 @@ defmodule Support.ClientHelpers do defmacro offset(tx, op), do: quote(do: %Offset{tx: unquote(tx), op: unquote(op)}) - defmacro offset0, do: quote(do: offset(0, 0)) + defmacro offset0, do: quote(do: offset(0, :infinity)) defmacro up_to_date() do quote(do: %ControlMessage{control: :up_to_date, offset: %Offset{tx: _, op: _}}) @@ -19,5 +19,5 @@ defmodule Support.ClientHelpers do ) end - defmacro up_to_date0(), do: quote(do: up_to_date(0, 0)) + defmacro up_to_date0(), do: quote(do: up_to_date(0, :infinity)) end diff --git a/packages/react-hooks/test/react-hooks.test.tsx b/packages/react-hooks/test/react-hooks.test.tsx index ac8c559b95..5a7a26755e 100644 --- a/packages/react-hooks/test/react-hooks.test.tsx +++ b/packages/react-hooks/test/react-hooks.test.tsx @@ -3,7 +3,7 @@ import { describe, expect, inject, it as bareIt } from 'vitest' import { setTimeout as sleep } from 'node:timers/promises' import { testWithIssuesTable as it } from './support/test-context' import { useShape, sortedOptionsHash, UseShapeResult } from '../src/react-hooks' -import { Shape, Message } from '@electric-sql/client' +import { Shape, ShapeStream } from '@electric-sql/client' const BASE_URL = inject(`baseUrl`) @@ -352,13 +352,19 @@ describe(`useShape`, () => { // Add another row to shape const [newId] = await insertIssues({ title: `other row` }) + + const parallelWaiterStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + subscribe: true, + }) + // And wait until it's definitely seen await waitFor(async () => { - const res = await fetch( - `${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1` - ) - const body = (await res.json()) as Message[] - expect(body).toMatchObject([{}, { value: { id: newId } }]) + return parallelWaiterStream.isUpToDate || (await sleep(50)) }) await sleep(50) diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 90542388a9..a0def01e02 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -308,11 +308,8 @@ describe(`HTTP Sync`, () => { ) await vi.waitFor(async () => { - const res = await fetch( - `${BASE_URL}/v1/shape?table=${tableUrl}&offset=-1` - ) - const body = (await res.json()) as Message[] - expect(body.length).greaterThan(1) + await sleep(100) + return client.isUpToDate }) const updatedData = await client.rows @@ -532,11 +529,8 @@ describe(`HTTP Sync`, () => { // And wait until it's definitely seen await vi.waitFor(async () => { - const res = await fetch( - `${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1` - ) - const body = (await res.json()) as Message[] - expect(body).toHaveLength(12) + await sleep(50) + return issueStream.isUpToDate }) let catchupOpsCount = 0 @@ -563,7 +557,9 @@ describe(`HTTP Sync`, () => { expect(catchupOpsCount).toBe(9) }) - it(`should return correct caching headers`, async ({ + // This test doesn't work anymore because server sends initial snapshot as a complete + // stable chunk, so e-tag is the same between requests. + it.skip(`should return correct caching headers`, async ({ issuesTableUrl, insertIssues, }) => { @@ -603,7 +599,12 @@ describe(`HTTP Sync`, () => { expect(etagHeader).not.toEqual(etag2Header) }) - it(`should revalidate etags`, async ({ issuesTableUrl, insertIssues }) => { + // This test doesn't work anymore because server sends initial snapshot as a complete + // stable chunk, so e-tag is the same between requests. + it.skip(`should revalidate etags`, async ({ + issuesTableUrl, + insertIssues, + }) => { // Start the shape await fetch(`${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1`, {}) // Fill it up in separate transactions @@ -794,11 +795,8 @@ describe(`HTTP Sync`, () => { // And wait until it's definitely seen await vi.waitFor(async () => { - const res = await fetch( - `${BASE_URL}/v1/shape?table=${issuesTableUrl}&offset=-1` - ) - const body = (await res.json()) as Message[] - expect(body.length).greaterThan(2) + await sleep(50) + return issueStream.isUpToDate }) const responseSizes: number[] = []