diff --git a/src/postgres_scanner.cpp b/src/postgres_scanner.cpp index dfee871..c03f7c2 100644 --- a/src/postgres_scanner.cpp +++ b/src/postgres_scanner.cpp @@ -62,40 +62,21 @@ struct PostgresGlobalState : public GlobalTableFunctionState { PostgresConnection connection; }; -static void PostgresGetSnapshot(PostgresVersion version, const PostgresBindData &bind_data, - PostgresGlobalState &gstate) { +static void PostgresGetSnapshot(const PostgresBindData &bind_data, PostgresGlobalState &gstate) { unique_ptr result; // by default disable snapshotting gstate.snapshot = string(); if (gstate.max_threads <= 1) { return; } - if (version.type_v == PostgresInstanceType::AURORA) { - return; - } + // reader threads can use the same snapshot auto &con = gstate.GetConnection(); - // pg_stat_wal_receiver was introduced in PostgreSQL 9.6 - if (version < PostgresVersion(9, 6, 0)) { - result = con.TryQuery("SELECT pg_is_in_recovery(), pg_export_snapshot()"); - if (result) { - auto in_recovery = result->GetBool(0, 0); - if (!in_recovery) { - gstate.snapshot = result->GetString(0, 1); - } - } - return; - } result = - con.TryQuery("SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)"); + con.TryQuery("SELECT pg_export_snapshot()"); if (result) { - auto in_recovery = result->GetBool(0, 0) || result->GetInt64(0, 2) > 0; - gstate.snapshot = ""; - if (!in_recovery) { - gstate.snapshot = result->GetString(0, 1); - } - return; + gstate.snapshot = result->GetString(0, 0); } } @@ -324,7 +305,7 @@ static unique_ptr PostgresInitGlobalState(ClientContex result->collection->InitializeScan(result->scan_state); } else { // we create a transaction here, and get the snapshot id to enable transaction-safe parallelism - PostgresGetSnapshot(bind_data.version, bind_data, *result); + PostgresGetSnapshot(bind_data, *result); } return std::move(result); }