diff --git a/Cargo.toml b/Cargo.toml index 4fb27af0f0..0aeb0ad3bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/api/actor","packages/api/auth","packages/api/cf-verification","packages/api/cloud","packages/api/games","packages/api/group","packages/api/identity","packages/api/job","packages/api/matchmaker","packages/api/monolith-edge","packages/api/monolith-public","packages/api/portal","packages/api/provision","packages/api/status","packages/api/traefik-provider","packages/api/ui","packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/fdb-util","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/hub-embed","packages/common/kv-str","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/server-cli","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-output/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-matchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/sqlite-util","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/infra/client/actor-kv","packages/infra/client/config","packages/infra/client/container-runner","packages/infra/client/echo","packages/infra/client/isolate-v8-runner","packages/infra/client/logs","packages/infra/client/manager","packages/infra/legacy/job-runner","packages/infra/schema-generator","packages/infra/server","packages/services/build","packages/services/build/ops/create","packages/services/build/ops/get","packages/services/build/ops/list-for-env","packages/services/build/ops/list-for-game","packages/services/build/standalone/default-create","packages/services/build/util","packages/services/captcha/ops/hcaptcha-config-get","packages/services/captcha/ops/hcaptcha-verify","packages/services/captcha/ops/request","packages/services/captcha/ops/turnstile-config-get","packages/services/captcha/ops/turnstile-verify","packages/services/captcha/ops/verify","packages/services/captcha/util","packages/services/cdn/ops/namespace-auth-user-remove","packages/services/cdn/ops/namespace-auth-user-update","packages/services/cdn/ops/namespace-create","packages/services/cdn/ops/namespace-domain-create","packages/services/cdn/ops/namespace-domain-remove","packages/services/cdn/ops/namespace-get","packages/services/cdn/ops/namespace-resolve-domain","packages/services/cdn/ops/ns-auth-type-set","packages/services/cdn/ops/ns-enable-domain-public-auth-set","packages/services/cdn/ops/site-create","packages/services/cdn/ops/site-get","packages/services/cdn/ops/site-list-for-game","packages/services/cdn/ops/version-get","packages/services/cdn/ops/version-prepare","packages/services/cdn/ops/version-publish","packages/services/cdn/util","packages/services/cdn/worker","packages/services/cf-custom-hostname/ops/get","packages/services/cf-custom-hostname/ops/list-for-namespace-id","packages/services/cf-custom-hostname/ops/resolve-hostname","packages/services/cf-custom-hostname/worker","packages/services/cloud/ops/device-link-create","packages/services/cloud/ops/game-config-create","packages/services/cloud/ops/game-config-get","packages/services/cloud/ops/game-token-create","packages/services/cloud/ops/namespace-create","packages/services/cloud/ops/namespace-get","packages/services/cloud/ops/namespace-token-development-create","packages/services/cloud/ops/namespace-token-public-create","packages/services/cloud/ops/version-get","packages/services/cloud/ops/version-publish","packages/services/cloud/standalone/default-create","packages/services/cloud/worker","packages/services/cluster","packages/services/cluster/standalone/datacenter-tls-renew","packages/services/cluster/standalone/default-update","packages/services/cluster/standalone/gc","packages/services/cluster/standalone/metrics-publish","packages/services/custom-user-avatar/ops/list-for-game","packages/services/custom-user-avatar/ops/upload-complete","packages/services/debug/ops/email-res","packages/services/ds","packages/services/ds-log/ops/export","packages/services/ds-log/ops/read","packages/services/dynamic-config","packages/services/edge/monolith/standalone/workflow-worker","packages/services/edge/pegboard","packages/services/email-verification/ops/complete","packages/services/email-verification/ops/create","packages/services/email/ops/send","packages/services/external/ops/request-validate","packages/services/external/worker","packages/services/faker/ops/build","packages/services/faker/ops/cdn-site","packages/services/faker/ops/game","packages/services/faker/ops/game-namespace","packages/services/faker/ops/game-version","packages/services/faker/ops/job-run","packages/services/faker/ops/job-template","packages/services/faker/ops/mm-lobby","packages/services/faker/ops/mm-lobby-row","packages/services/faker/ops/mm-player","packages/services/faker/ops/region","packages/services/faker/ops/team","packages/services/faker/ops/user","packages/services/game/ops/banner-upload-complete","packages/services/game/ops/create","packages/services/game/ops/get","packages/services/game/ops/list-all","packages/services/game/ops/list-for-team","packages/services/game/ops/logo-upload-complete","packages/services/game/ops/namespace-create","packages/services/game/ops/namespace-get","packages/services/game/ops/namespace-list","packages/services/game/ops/namespace-resolve-name-id","packages/services/game/ops/namespace-resolve-url","packages/services/game/ops/namespace-validate","packages/services/game/ops/namespace-version-history-list","packages/services/game/ops/namespace-version-set","packages/services/game/ops/recommend","packages/services/game/ops/resolve-name-id","packages/services/game/ops/resolve-namespace-id","packages/services/game/ops/token-development-validate","packages/services/game/ops/validate","packages/services/game/ops/version-create","packages/services/game/ops/version-get","packages/services/game/ops/version-list","packages/services/game/ops/version-validate","packages/services/ip/ops/info","packages/services/job-log/ops/read","packages/services/job-log/worker","packages/services/job-run","packages/services/job/standalone/gc","packages/services/job/util","packages/services/linode","packages/services/linode/standalone/gc","packages/services/load-test/standalone/api-cloud","packages/services/load-test/standalone/mm","packages/services/load-test/standalone/mm-sustain","packages/services/load-test/standalone/sqlx","packages/services/load-test/standalone/watch-requests","packages/services/mm-config/ops/game-get","packages/services/mm-config/ops/game-upsert","packages/services/mm-config/ops/lobby-group-get","packages/services/mm-config/ops/lobby-group-resolve-name-id","packages/services/mm-config/ops/lobby-group-resolve-version","packages/services/mm-config/ops/namespace-config-set","packages/services/mm-config/ops/namespace-config-validate","packages/services/mm-config/ops/namespace-create","packages/services/mm-config/ops/namespace-get","packages/services/mm-config/ops/version-get","packages/services/mm-config/ops/version-prepare","packages/services/mm-config/ops/version-publish","packages/services/mm/ops/dev-player-token-create","packages/services/mm/ops/lobby-find-fail","packages/services/mm/ops/lobby-find-lobby-query-list","packages/services/mm/ops/lobby-find-try-complete","packages/services/mm/ops/lobby-for-run-id","packages/services/mm/ops/lobby-get","packages/services/mm/ops/lobby-history","packages/services/mm/ops/lobby-idle-update","packages/services/mm/ops/lobby-list-for-namespace","packages/services/mm/ops/lobby-list-for-user-id","packages/services/mm/ops/lobby-player-count","packages/services/mm/ops/lobby-runtime-aggregate","packages/services/mm/ops/lobby-state-get","packages/services/mm/ops/player-count-for-namespace","packages/services/mm/ops/player-get","packages/services/mm/standalone/gc","packages/services/mm/util","packages/services/mm/worker","packages/services/monolith/standalone/worker","packages/services/monolith/standalone/workflow-worker","packages/services/nomad/standalone/monitor","packages/services/pegboard","packages/services/pegboard/standalone/dc-init","packages/services/pegboard/standalone/gc","packages/services/pegboard/standalone/metrics-publish","packages/services/pegboard/standalone/ws","packages/services/region/ops/get","packages/services/region/ops/list","packages/services/region/ops/list-for-game","packages/services/region/ops/recommend","packages/services/region/ops/resolve","packages/services/region/ops/resolve-for-game","packages/services/server-spec","packages/services/team-invite/ops/get","packages/services/team-invite/worker","packages/services/team/ops/avatar-upload-complete","packages/services/team/ops/get","packages/services/team/ops/join-request-list","packages/services/team/ops/member-count","packages/services/team/ops/member-get","packages/services/team/ops/member-list","packages/services/team/ops/member-relationship-get","packages/services/team/ops/profile-validate","packages/services/team/ops/recommend","packages/services/team/ops/resolve-display-name","packages/services/team/ops/user-ban-get","packages/services/team/ops/user-ban-list","packages/services/team/ops/validate","packages/services/team/util","packages/services/team/worker","packages/services/telemetry/standalone/beacon","packages/services/tier","packages/services/token/ops/create","packages/services/token/ops/exchange","packages/services/token/ops/get","packages/services/token/ops/revoke","packages/services/upload/ops/complete","packages/services/upload/ops/file-list","packages/services/upload/ops/get","packages/services/upload/ops/list-for-user","packages/services/upload/ops/prepare","packages/services/upload/worker","packages/services/user","packages/services/user-identity/ops/create","packages/services/user-identity/ops/delete","packages/services/user-identity/ops/get","packages/services/user/ops/avatar-upload-complete","packages/services/user/ops/get","packages/services/user/ops/pending-delete-toggle","packages/services/user/ops/profile-validate","packages/services/user/ops/resolve-email","packages/services/user/ops/team-list","packages/services/user/ops/token-create","packages/services/user/standalone/delete-pending","packages/services/user/worker","packages/services/workflow/standalone/gc","packages/services/workflow/standalone/metrics-publish","packages/toolchain/actors-sdk-embed","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] +members = ["packages/api/actor","packages/api/auth","packages/api/cf-verification","packages/api/cloud","packages/api/games","packages/api/group","packages/api/identity","packages/api/job","packages/api/matchmaker","packages/api/monolith-edge","packages/api/monolith-public","packages/api/portal","packages/api/provision","packages/api/status","packages/api/traefik-provider","packages/api/ui","packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/fdb-util","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/hub-embed","packages/common/kv-str","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/server-cli","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-output/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-matchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/sqlite-util","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/infra/client/actor-kv","packages/infra/client/config","packages/infra/client/container-runner","packages/infra/client/echo","packages/infra/client/isolate-v8-runner","packages/infra/client/logs","packages/infra/client/manager","packages/infra/edge-server","packages/infra/legacy/job-runner","packages/infra/schema-generator","packages/infra/server","packages/services/build","packages/services/build/ops/create","packages/services/build/ops/get","packages/services/build/ops/list-for-env","packages/services/build/ops/list-for-game","packages/services/build/standalone/default-create","packages/services/build/util","packages/services/captcha/ops/hcaptcha-config-get","packages/services/captcha/ops/hcaptcha-verify","packages/services/captcha/ops/request","packages/services/captcha/ops/turnstile-config-get","packages/services/captcha/ops/turnstile-verify","packages/services/captcha/ops/verify","packages/services/captcha/util","packages/services/cdn/ops/namespace-auth-user-remove","packages/services/cdn/ops/namespace-auth-user-update","packages/services/cdn/ops/namespace-create","packages/services/cdn/ops/namespace-domain-create","packages/services/cdn/ops/namespace-domain-remove","packages/services/cdn/ops/namespace-get","packages/services/cdn/ops/namespace-resolve-domain","packages/services/cdn/ops/ns-auth-type-set","packages/services/cdn/ops/ns-enable-domain-public-auth-set","packages/services/cdn/ops/site-create","packages/services/cdn/ops/site-get","packages/services/cdn/ops/site-list-for-game","packages/services/cdn/ops/version-get","packages/services/cdn/ops/version-prepare","packages/services/cdn/ops/version-publish","packages/services/cdn/util","packages/services/cdn/worker","packages/services/cf-custom-hostname/ops/get","packages/services/cf-custom-hostname/ops/list-for-namespace-id","packages/services/cf-custom-hostname/ops/resolve-hostname","packages/services/cf-custom-hostname/worker","packages/services/cloud/ops/device-link-create","packages/services/cloud/ops/game-config-create","packages/services/cloud/ops/game-config-get","packages/services/cloud/ops/game-token-create","packages/services/cloud/ops/namespace-create","packages/services/cloud/ops/namespace-get","packages/services/cloud/ops/namespace-token-development-create","packages/services/cloud/ops/namespace-token-public-create","packages/services/cloud/ops/version-get","packages/services/cloud/ops/version-publish","packages/services/cloud/standalone/default-create","packages/services/cloud/worker","packages/services/cluster","packages/services/cluster/standalone/datacenter-tls-renew","packages/services/cluster/standalone/default-update","packages/services/cluster/standalone/gc","packages/services/cluster/standalone/metrics-publish","packages/services/custom-user-avatar/ops/list-for-game","packages/services/custom-user-avatar/ops/upload-complete","packages/services/debug/ops/email-res","packages/services/ds","packages/services/ds-log/ops/export","packages/services/ds-log/ops/read","packages/services/dynamic-config","packages/services/edge/monolith/standalone/workflow-worker","packages/services/edge/pegboard","packages/services/email-verification/ops/complete","packages/services/email-verification/ops/create","packages/services/email/ops/send","packages/services/external/ops/request-validate","packages/services/external/worker","packages/services/faker/ops/build","packages/services/faker/ops/cdn-site","packages/services/faker/ops/game","packages/services/faker/ops/game-namespace","packages/services/faker/ops/game-version","packages/services/faker/ops/job-run","packages/services/faker/ops/job-template","packages/services/faker/ops/mm-lobby","packages/services/faker/ops/mm-lobby-row","packages/services/faker/ops/mm-player","packages/services/faker/ops/region","packages/services/faker/ops/team","packages/services/faker/ops/user","packages/services/game/ops/banner-upload-complete","packages/services/game/ops/create","packages/services/game/ops/get","packages/services/game/ops/list-all","packages/services/game/ops/list-for-team","packages/services/game/ops/logo-upload-complete","packages/services/game/ops/namespace-create","packages/services/game/ops/namespace-get","packages/services/game/ops/namespace-list","packages/services/game/ops/namespace-resolve-name-id","packages/services/game/ops/namespace-resolve-url","packages/services/game/ops/namespace-validate","packages/services/game/ops/namespace-version-history-list","packages/services/game/ops/namespace-version-set","packages/services/game/ops/recommend","packages/services/game/ops/resolve-name-id","packages/services/game/ops/resolve-namespace-id","packages/services/game/ops/token-development-validate","packages/services/game/ops/validate","packages/services/game/ops/version-create","packages/services/game/ops/version-get","packages/services/game/ops/version-list","packages/services/game/ops/version-validate","packages/services/ip/ops/info","packages/services/job-log/ops/read","packages/services/job-log/worker","packages/services/job-run","packages/services/job/standalone/gc","packages/services/job/util","packages/services/linode","packages/services/linode/standalone/gc","packages/services/load-test/standalone/api-cloud","packages/services/load-test/standalone/mm","packages/services/load-test/standalone/mm-sustain","packages/services/load-test/standalone/sqlx","packages/services/load-test/standalone/watch-requests","packages/services/mm-config/ops/game-get","packages/services/mm-config/ops/game-upsert","packages/services/mm-config/ops/lobby-group-get","packages/services/mm-config/ops/lobby-group-resolve-name-id","packages/services/mm-config/ops/lobby-group-resolve-version","packages/services/mm-config/ops/namespace-config-set","packages/services/mm-config/ops/namespace-config-validate","packages/services/mm-config/ops/namespace-create","packages/services/mm-config/ops/namespace-get","packages/services/mm-config/ops/version-get","packages/services/mm-config/ops/version-prepare","packages/services/mm-config/ops/version-publish","packages/services/mm/ops/dev-player-token-create","packages/services/mm/ops/lobby-find-fail","packages/services/mm/ops/lobby-find-lobby-query-list","packages/services/mm/ops/lobby-find-try-complete","packages/services/mm/ops/lobby-for-run-id","packages/services/mm/ops/lobby-get","packages/services/mm/ops/lobby-history","packages/services/mm/ops/lobby-idle-update","packages/services/mm/ops/lobby-list-for-namespace","packages/services/mm/ops/lobby-list-for-user-id","packages/services/mm/ops/lobby-player-count","packages/services/mm/ops/lobby-runtime-aggregate","packages/services/mm/ops/lobby-state-get","packages/services/mm/ops/player-count-for-namespace","packages/services/mm/ops/player-get","packages/services/mm/standalone/gc","packages/services/mm/util","packages/services/mm/worker","packages/services/monolith/standalone/worker","packages/services/monolith/standalone/workflow-worker","packages/services/nomad/standalone/monitor","packages/services/pegboard","packages/services/pegboard/standalone/dc-init","packages/services/pegboard/standalone/gc","packages/services/pegboard/standalone/metrics-publish","packages/services/pegboard/standalone/ws","packages/services/region/ops/get","packages/services/region/ops/list","packages/services/region/ops/list-for-game","packages/services/region/ops/recommend","packages/services/region/ops/resolve","packages/services/region/ops/resolve-for-game","packages/services/server-spec","packages/services/team-invite/ops/get","packages/services/team-invite/worker","packages/services/team/ops/avatar-upload-complete","packages/services/team/ops/get","packages/services/team/ops/join-request-list","packages/services/team/ops/member-count","packages/services/team/ops/member-get","packages/services/team/ops/member-list","packages/services/team/ops/member-relationship-get","packages/services/team/ops/profile-validate","packages/services/team/ops/recommend","packages/services/team/ops/resolve-display-name","packages/services/team/ops/user-ban-get","packages/services/team/ops/user-ban-list","packages/services/team/ops/validate","packages/services/team/util","packages/services/team/worker","packages/services/telemetry/standalone/beacon","packages/services/tier","packages/services/token/ops/create","packages/services/token/ops/exchange","packages/services/token/ops/get","packages/services/token/ops/revoke","packages/services/upload/ops/complete","packages/services/upload/ops/file-list","packages/services/upload/ops/get","packages/services/upload/ops/list-for-user","packages/services/upload/ops/prepare","packages/services/upload/worker","packages/services/user","packages/services/user-identity/ops/create","packages/services/user-identity/ops/delete","packages/services/user-identity/ops/get","packages/services/user/ops/avatar-upload-complete","packages/services/user/ops/get","packages/services/user/ops/pending-delete-toggle","packages/services/user/ops/profile-validate","packages/services/user/ops/resolve-email","packages/services/user/ops/team-list","packages/services/user/ops/token-create","packages/services/user/standalone/delete-pending","packages/services/user/worker","packages/toolchain/actors-sdk-embed","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] [workspace.package] version = "25.1.0-rc.1" @@ -360,6 +360,9 @@ path = "packages/infra/client/logs" [workspace.dependencies.pegboard-manager] path = "packages/infra/client/manager" +[workspace.dependencies.rivet-edge-server] +path = "packages/infra/edge-server" + [workspace.dependencies.rivet-job-runner] path = "packages/infra/legacy/job-runner" @@ -999,12 +1002,6 @@ path = "packages/services/user/standalone/delete-pending" [workspace.dependencies.user-worker] path = "packages/services/user/worker" -[workspace.dependencies.workflow-gc] -path = "packages/services/workflow/standalone/gc" - -[workspace.dependencies.workflow-metrics-publish] -path = "packages/services/workflow/standalone/metrics-publish" - [workspace.dependencies.rivet-actors-sdk-embed] path = "packages/toolchain/actors-sdk-embed" diff --git a/packages/common/chirp-workflow/core/src/db/crdb_nats.rs b/packages/common/chirp-workflow/core/src/db/crdb_nats.rs index 2074f64298..8cd0283d0a 100644 --- a/packages/common/chirp-workflow/core/src/db/crdb_nats.rs +++ b/packages/common/chirp-workflow/core/src/db/crdb_nats.rs @@ -1,6 +1,7 @@ //! Implementation of a workflow database driver with PostgreSQL (CockroachDB) and NATS. use std::{ + collections::HashSet, sync::Arc, time::{Duration, Instant}, }; @@ -34,6 +35,12 @@ const QUERY_RETRY_MS: usize = 500; const TXN_RETRY: Duration = Duration::from_millis(100); /// Maximum times a query ran by this database adapter is retried. const MAX_QUERY_RETRIES: usize = 16; +/// How long before considering the leases of a given worker instance "expired". +const WORKER_INSTANCE_EXPIRED_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30); +/// How long before overwriting an existing GC lock. +const GC_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30); +/// How long before overwriting an existing metrics lock. +const METRICS_LOCK_TIMEOUT_MS: i64 = GC_LOCK_TIMEOUT_MS; /// For SQL macros. const CONTEXT_NAME: &str = "chirp_workflow_crdb_nats_engine"; /// For NATS wake mechanism. @@ -159,6 +166,248 @@ impl Database for DatabaseCrdbNats { } } + async fn clear_expired_leases(&self, worker_instance_id: Uuid) -> WorkflowResult<()> { + let acquired_lock = sql_fetch_optional!( + [self, (i64,)] + " + UPDATE db_workflow.workflow_gc + SET + worker_instance_id = $1, + lock_ts = $2 + WHERE lock_ts IS NULL OR lock_ts < $2 - $3 + RETURNING 1 + ", + worker_instance_id, + rivet_util::timestamp::now(), + GC_LOCK_TIMEOUT_MS, + ) + .await? + .is_some(); + + if acquired_lock { + // Reset all workflows on worker instances that have not had a ping in the last 30 seconds + let rows = sql_fetch_all!( + [self, (Uuid, Uuid,)] + " + UPDATE db_workflow.workflows AS w + SET + worker_instance_id = NULL, + wake_immediate = true, + wake_deadline_ts = NULL, + wake_signals = ARRAY[], + wake_sub_workflow_id = NULL + FROM db_workflow.worker_instances AS wi + WHERE + wi.last_ping_ts < $1 AND + wi.worker_instance_id = w.worker_instance_id AND + w.output IS NULL AND + w.silence_ts IS NULL AND + -- Check for any wake condition so we don't restart a permanently dead workflow + ( + w.wake_immediate OR + w.wake_deadline_ts IS NOT NULL OR + cardinality(w.wake_signals) > 0 OR + w.wake_sub_workflow_id IS NOT NULL + ) + RETURNING w.workflow_id, wi.worker_instance_id + ", + rivet_util::timestamp::now() - WORKER_INSTANCE_EXPIRED_THRESHOLD_MS, + ) + .await?; + + if !rows.is_empty() { + let unique_worker_instance_ids = rows + .iter() + .map(|(_, worker_instance_id)| worker_instance_id) + .collect::>(); + + tracing::info!( + worker_instance_ids=?unique_worker_instance_ids, + total_workflows=%rows.len(), + "handled failover", + ); + } + + // Clear lock + sql_execute!( + [self] + " + UPDATE db_workflow.workflow_gc + SET + worker_instance_id = NULL, + lock_ts = NULL + WHERE worker_instance_id = $1 + ", + worker_instance_id, + ) + .await?; + } + + Ok(()) + } + + async fn publish_metrics(&self, worker_instance_id: Uuid) -> WorkflowResult<()> { + // Always update ping + metrics::WORKER_LAST_PING + .with_label_values(&[&worker_instance_id.to_string()]) + .set(rivet_util::timestamp::now()); + + let acquired_lock = sql_fetch_optional!( + [self, (i64,)] + " + UPDATE db_workflow.workflow_metrics + SET + worker_instance_id = $1, + lock_ts = $2 + WHERE lock_ts IS NULL OR lock_ts < $2 - $3 + RETURNING 1 + ", + worker_instance_id, + rivet_util::timestamp::now(), + METRICS_LOCK_TIMEOUT_MS, + ) + .await? + .is_some(); + + if acquired_lock { + let ( + total_workflow_count, + active_workflow_count, + dead_workflow_count, + sleeping_workflow_count, + pending_signal_count, + ) = tokio::try_join!( + sql_fetch_all!( + [self, (String, i64)] + " + SELECT workflow_name, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + GROUP BY workflow_name + ", + ), + sql_fetch_all!( + [self, (String, i64)] + " + SELECT workflow_name, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + WHERE + output IS NULL AND + worker_instance_id IS NOT NULL AND + silence_ts IS NULL + GROUP BY workflow_name + ", + ), + sql_fetch_all!( + [self, (String, String, i64)] + " + SELECT workflow_name, error, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + WHERE + error IS NOT NULL AND + output IS NULL AND + silence_ts IS NULL AND + wake_immediate = FALSE AND + wake_deadline_ts IS NULL AND + cardinality(wake_signals) = 0 AND + wake_sub_workflow_id IS NULL + GROUP BY workflow_name, error + ", + ), + sql_fetch_all!( + [self, (String, i64)] + " + SELECT workflow_name, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + WHERE + worker_instance_id IS NULL AND + output IS NULL AND + silence_ts IS NULL AND + ( + wake_immediate OR + wake_deadline_ts IS NOT NULL OR + cardinality(wake_signals) > 0 OR + wake_sub_workflow_id IS NOT NULL + ) + GROUP BY workflow_name + ", + ), + sql_fetch_all!( + [self, (String, i64)] + " + SELECT signal_name, COUNT(*) + FROM ( + SELECT signal_name + FROM db_workflow.signals + WHERE + ack_ts IS NULL AND + silence_ts IS NULL + UNION ALL + SELECT signal_name + FROM db_workflow.tagged_signals + WHERE + ack_ts IS NULL AND + silence_ts IS NULL + ) AS OF SYSTEM TIME '-1s' + GROUP BY signal_name + ", + ), + )?; + + // Get rid of metrics that don't exist in the db anymore (declarative) + metrics::WORKFLOW_TOTAL.reset(); + metrics::WORKFLOW_ACTIVE.reset(); + metrics::WORKFLOW_DEAD.reset(); + metrics::WORKFLOW_SLEEPING.reset(); + metrics::SIGNAL_PENDING.reset(); + + for (workflow_name, count) in total_workflow_count { + metrics::WORKFLOW_TOTAL + .with_label_values(&[&workflow_name]) + .set(count); + } + + for (workflow_name, count) in active_workflow_count { + metrics::WORKFLOW_ACTIVE + .with_label_values(&[&workflow_name]) + .set(count); + } + + for (workflow_name, error, count) in dead_workflow_count { + metrics::WORKFLOW_DEAD + .with_label_values(&[&workflow_name, &error]) + .set(count); + } + + for (workflow_name, count) in sleeping_workflow_count { + metrics::WORKFLOW_SLEEPING + .with_label_values(&[&workflow_name]) + .set(count); + } + + for (signal_name, count) in pending_signal_count { + metrics::SIGNAL_PENDING + .with_label_values(&[&signal_name]) + .set(count); + } + + // Clear lock + sql_execute!( + [self] + " + UPDATE db_workflow.workflow_metrics + SET + worker_instance_id = NULL, + lock_ts = NULL + WHERE worker_instance_id = $1 + ", + worker_instance_id, + ) + .await?; + } + + Ok(()) + } + async fn dispatch_workflow( &self, ray_id: Uuid, diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/mod.rs index cdcd2d7db2..90ef073a91 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/mod.rs @@ -4,6 +4,7 @@ use foundationdb::future::FdbValue; pub mod signal; pub mod wake; +pub mod worker_instance; pub mod workflow; pub trait FormalKey { diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/worker_instance.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/worker_instance.rs new file mode 100644 index 0000000000..072cb2ca04 --- /dev/null +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/worker_instance.rs @@ -0,0 +1,100 @@ +use std::{borrow::Cow, result::Result::Ok}; + +// TODO: Use concrete error types +use anyhow::*; +use foundationdb::tuple::{PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset}; +use uuid::Uuid; + +use super::FormalKey; + +#[derive(Debug)] +pub struct LastPingTsKey { + worker_instance_id: Uuid, +} + +impl LastPingTsKey { + pub fn new(worker_instance_id: Uuid) -> Self { + LastPingTsKey { worker_instance_id } + } +} + +impl FormalKey for LastPingTsKey { + // Timestamp. + type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for LastPingTsKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + "worker_instance", + "data", + self.worker_instance_id, + "last_ping_ts", + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for LastPingTsKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, worker_instance_id, _)) = + <(Cow, Cow, Uuid, Cow)>::unpack(input, tuple_depth)?; + let v = LastPingTsKey { worker_instance_id }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct MetricsLockKey {} + +impl MetricsLockKey { + pub fn new() -> Self { + MetricsLockKey {} + } +} + +impl FormalKey for MetricsLockKey { + // Timestamp. + type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for MetricsLockKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ("worker_instance", "metrics_lock"); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for MetricsLockKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _)) = <(Cow, Cow)>::unpack(input, tuple_depth)?; + let v = MetricsLockKey {}; + + Ok((input, v)) + } +} diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/workflow.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/workflow.rs index c270559717..1a58f8fe5b 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/workflow.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/keys/workflow.rs @@ -11,24 +11,29 @@ use uuid::Uuid; use super::{FormalChunkedKey, FormalKey}; pub struct LeaseKey { - workflow_id: Uuid, + pub workflow_id: Uuid, } impl LeaseKey { pub fn new(workflow_id: Uuid) -> Self { LeaseKey { workflow_id } } + + pub fn subspace() -> LeaseSubspaceKey { + LeaseSubspaceKey::new() + } } impl FormalKey for LeaseKey { - type Value = Uuid; + /// Workflow name, worker instance id. + type Value = (String, Uuid); fn deserialize(&self, raw: &[u8]) -> Result { - Ok(Uuid::from_slice(raw)?) + serde_json::from_slice(raw).map_err(Into::into) } fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.as_bytes().to_vec()) + serde_json::to_vec(&value).map_err(Into::into) } } @@ -53,6 +58,25 @@ impl<'de> TupleUnpack<'de> for LeaseKey { } } +pub struct LeaseSubspaceKey {} + +impl LeaseSubspaceKey { + pub fn new() -> Self { + LeaseSubspaceKey {} + } +} + +impl TuplePack for LeaseSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ("workflow", "lease"); + t.pack(w, tuple_depth) + } +} + pub struct TagKey { workflow_id: Uuid, pub k: String, diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs index 46f7fffed7..17a88a4a23 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs @@ -2,7 +2,7 @@ // TODO: Move code to smaller functions for readability use std::{ - collections::HashSet, + collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -44,6 +44,10 @@ type GlobalError = WorkflowError; const QUERY_RETRY_MS: usize = 500; /// Maximum times a query ran by this database adapter is retried. const MAX_QUERY_RETRIES: usize = 4; +/// How long before considering the leases of a given worker instance "expired". +const WORKER_INSTANCE_EXPIRED_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30); +/// How long before overwriting an existing metrics lock. +const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30); /// For SQL macros. const CONTEXT_NAME: &str = "chirp_workflow_fdb_sqlite_nats_engine"; /// For NATS wake mechanism. @@ -155,6 +159,225 @@ impl Database for DatabaseFdbSqliteNats { } } + async fn clear_expired_leases(&self, _worker_instance_id: Uuid) -> WorkflowResult<()> { + let (expired_worker_instance_ids, expired_workflow_count) = self + .pools + .fdb()? + .run(|tx, _mc| async move { + let now = rivet_util::timestamp::now(); + + let mut last_ping_cache: Vec<(Uuid, i64)> = Vec::new(); + let mut expired_workflow_count = 0; + let mut expired_worker_instance_ids = Vec::new(); + + let lease_subspace = self + .subspace + .subspace(&keys::workflow::LeaseKey::subspace()); + + // List all active leases + let mut stream = tx.get_ranges_keyvalues( + fdb::RangeOption { + mode: StreamingMode::WantAll, + ..(&lease_subspace).into() + }, + // Not SERIALIZABLE because we don't want this to conflict with other queries which write + // leases + SNAPSHOT, + ); + + while let Some(lease_key_entry) = stream.try_next().await? { + let lease_key = self + .subspace + .unpack::(lease_key_entry.key()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + let (workflow_name, worker_instance_id) = lease_key + .deserialize(lease_key_entry.value()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + let last_ping_key = + keys::worker_instance::LastPingTsKey::new(worker_instance_id); + + // Get last ping of worker instance for this lease + let last_ping_ts = if let Some((_, last_ping_ts)) = last_ping_cache + .iter() + .find(|(k, _)| k == &worker_instance_id) + { + *last_ping_ts + } else if let Some(last_ping_entry) = tx + .get( + &self.subspace.pack(&last_ping_key), + // Not SERIALIZABLE because we don't want this to conflict + SNAPSHOT, + ) + .await? + { + // Deserialize last ping value + let last_ping_ts = last_ping_key + .deserialize(&last_ping_entry) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + + // Update cache + last_ping_cache.push((worker_instance_id, last_ping_ts)); + + last_ping_ts + } else { + // Update cache + last_ping_cache.push((worker_instance_id, 0)); + + 0 + }; + + // Worker has not pinged within the threshold, meaning the lease is expired + if last_ping_ts < now - WORKER_INSTANCE_EXPIRED_THRESHOLD_MS { + // NOTE: We add a read conflict here so this query conflicts with any other + // `clear_expired_leases` queries running at the same time (will conflict with the + // following `tx.clear`). + tx.add_conflict_range( + lease_key_entry.key(), + lease_key_entry.key(), + ConflictRangeType::Read, + )?; + + // Clear lease + tx.clear(lease_key_entry.key()); + + // Add immediate wake for workflow + let wake_condition_key = keys::wake::WorkflowWakeConditionKey::new( + workflow_name.to_string(), + lease_key.workflow_id, + keys::wake::WakeCondition::Immediate, + ); + tx.set( + &self.subspace.pack(&wake_condition_key), + &wake_condition_key + .serialize(()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?, + ); + + expired_workflow_count += 1; + expired_worker_instance_ids.push(worker_instance_id); + } + } + + Ok((expired_worker_instance_ids, expired_workflow_count)) + }) + .await?; + + if expired_workflow_count != 0 { + tracing::info!( + worker_instance_ids=?expired_worker_instance_ids, + total_workflows=%expired_workflow_count, + "handled failover", + ); + } + + Ok(()) + } + + async fn publish_metrics(&self, worker_instance_id: Uuid) -> WorkflowResult<()> { + // Always update ping + metrics::WORKER_LAST_PING + .with_label_values(&[&worker_instance_id.to_string()]) + .set(rivet_util::timestamp::now()); + + // Attempt to be the only worker publishing metrics by writing to the lock key + let acquired_lock = self + .pools + .fdb()? + .run(|tx, _mc| async move { + let metrics_lock_key = keys::worker_instance::MetricsLockKey::new(); + + // Read existing lock + let lock_expired = if let Some(entry) = tx + .get(&self.subspace.pack(&metrics_lock_key), SERIALIZABLE) + .await? + { + let lock_ts = metrics_lock_key + .deserialize(&entry) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + + lock_ts < rivet_util::timestamp::now() - METRICS_LOCK_TIMEOUT_MS + } else { + true + }; + + if lock_expired { + // Write to lock key. FDB transactions guarantee that if multiple workers are running this + // query at the same time only one will succeed which means only one will have the lock. + tx.set( + &self.subspace.pack(&metrics_lock_key), + &metrics_lock_key + .serialize(rivet_util::timestamp::now()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?, + ); + } + + Ok(lock_expired) + }) + .await?; + + if acquired_lock { + // TODO: Add FDB indexes for these metric queries and implement + + // // Get rid of metrics that don't exist in the db anymore (declarative) + // metrics::WORKFLOW_TOTAL.reset(); + // metrics::WORKFLOW_ACTIVE.reset(); + // metrics::WORKFLOW_DEAD.reset(); + // metrics::WORKFLOW_SLEEPING.reset(); + // metrics::SIGNAL_PENDING.reset(); + + // let ( + // total_workflow_count, + // active_workflow_count, + // dead_workflow_count, + // sleeping_workflow_count, + // pending_signal_count, + // ) = ; + + // for (workflow_name, count) in total_workflow_count { + // metrics::WORKFLOW_TOTAL + // .with_label_values(&[&workflow_name]) + // .set(count); + // } + + // for (workflow_name, count) in active_workflow_count { + // metrics::WORKFLOW_ACTIVE + // .with_label_values(&[&workflow_name]) + // .set(count); + // } + + // for (workflow_name, error, count) in dead_workflow_count { + // metrics::WORKFLOW_DEAD + // .with_label_values(&[&workflow_name, &error]) + // .set(count); + // } + + // for (workflow_name, count) in sleeping_workflow_count { + // metrics::WORKFLOW_SLEEPING + // .with_label_values(&[&workflow_name]) + // .set(count); + // } + + // for (signal_name, count) in pending_signal_count { + // metrics::SIGNAL_PENDING + // .with_label_values(&[&signal_name]) + // .set(count); + // } + + // Clear lock + self.pools + .fdb()? + .run(|tx, _mc| async move { + let metrics_lock_key = keys::worker_instance::MetricsLockKey::new(); + tx.clear(&self.subspace.pack(&metrics_lock_key)); + + Ok(()) + }) + .await?; + } + + Ok(()) + } + async fn dispatch_workflow( &self, ray_id: Uuid, @@ -473,8 +696,20 @@ impl Database for DatabaseFdbSqliteNats { let owned_filter = owned_filter.clone(); async move { + let now = rivet_util::timestamp::now(); + + // Update worker instance ping + let last_ping_key = + keys::worker_instance::LastPingTsKey::new(worker_instance_id); + tx.set( + &self.subspace.pack(&last_ping_key), + &last_ping_key + .serialize(now) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?, + ); + // All wake conditions with a timestamp after this timestamp will be pulled - let pull_before = rivet_util::timestamp::now() + let pull_before = now + i64::try_from(worker::TICK_INTERVAL.as_millis()) .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; @@ -527,15 +762,15 @@ impl Database for DatabaseFdbSqliteNats { .await?; // Check leases - let wf_ids = entries + let wf_name_by_id = entries .iter() - .map(|(_, key)| key.workflow_id) - .collect::>(); - let leased_wf_ids = futures_util::stream::iter(wf_ids) - .map(|wf_id| { + .map(|(_, key)| (key.workflow_id, key.workflow_name.clone())) + .collect::>(); + let leased_wf_ids = futures_util::stream::iter(wf_name_by_id) + .map(|(workflow_id, workflow_name)| { let tx = tx.clone(); async move { - let lease_key = keys::workflow::LeaseKey::new(wf_id); + let lease_key = keys::workflow::LeaseKey::new(workflow_id); let lease_key_buf = self.subspace.pack(&lease_key); // Check lease @@ -544,11 +779,13 @@ impl Database for DatabaseFdbSqliteNats { } else { tx.set( &lease_key_buf, - &lease_key.serialize(worker_instance_id).map_err(|x| { + &lease_key + .serialize((workflow_name, worker_instance_id)) + .map_err(|x| { fdb::FdbBindingError::CustomError(x.into()) })?, ); - Ok(Some(wf_id)) + Ok(Some(workflow_id)) } } }) diff --git a/packages/common/chirp-workflow/core/src/db/mod.rs b/packages/common/chirp-workflow/core/src/db/mod.rs index e1952bc029..1e1648e5b2 100644 --- a/packages/common/chirp-workflow/core/src/db/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/mod.rs @@ -29,12 +29,23 @@ pub trait Database: Send { /// When using a wake worker instead of a polling worker, this function will return once the worker /// should fetch the database again. async fn wake(&self) -> WorkflowResult<()> { - unimplemented!( + tracing::debug!( "{} does not implement Database::wake", std::any::type_name::(), ); + + std::future::pending::<()>().await; + + Ok(()) } + /// Releases workflows that were leased by workers that have since expired (their last ping has passed + /// the expired threshold), making them eligible to be run again. Called periodically. + async fn clear_expired_leases(&self, worker_instance_id: Uuid) -> WorkflowResult<()>; + + /// Function to publish metrics. Called periodically. + async fn publish_metrics(&self, worker_instance_id: Uuid) -> WorkflowResult<()>; + /// Writes a new workflow to the database. If unique is set, this should return the existing workflow ID /// (if one exists) instead of the given workflow ID. async fn dispatch_workflow( @@ -58,6 +69,7 @@ pub trait Database: Send { ) -> WorkflowResult>; /// Pulls workflows for processing by the worker. Will only pull workflows with names matching the filter. + /// Should also update the ping of this worker instance. async fn pull_workflows( &self, worker_instance_id: Uuid, diff --git a/packages/common/chirp-workflow/core/src/metrics.rs b/packages/common/chirp-workflow/core/src/metrics.rs index cf16c20184..c3ba4996e3 100644 --- a/packages/common/chirp-workflow/core/src/metrics.rs +++ b/packages/common/chirp-workflow/core/src/metrics.rs @@ -1,10 +1,10 @@ use rivet_metrics::{prometheus::*, BUCKETS, REGISTRY}; lazy_static::lazy_static! { - pub static ref WORKER_ACTIVE: IntGaugeVec = register_int_gauge_vec_with_registry!( - "chirp_workflow_worker_active", - "Total active workers.", - &[], + pub static ref WORKER_LAST_PING: IntGaugeVec = register_int_gauge_vec_with_registry!( + "chirp_workflow_worker_last_ping", + "Last ping of a worker instance as a unix ts.", + &["worker_instance_id"], *REGISTRY, ).unwrap(); pub static ref PULL_WORKFLOWS_DURATION: GaugeVec = register_gauge_vec_with_registry!( diff --git a/packages/common/chirp-workflow/core/src/worker.rs b/packages/common/chirp-workflow/core/src/worker.rs index 4025972c9a..f483aa6c32 100644 --- a/packages/common/chirp-workflow/core/src/worker.rs +++ b/packages/common/chirp-workflow/core/src/worker.rs @@ -6,6 +6,7 @@ use uuid::Uuid; use crate::{ctx::WorkflowCtx, db::DatabaseHandle, metrics, registry::RegistryHandle, utils}; pub const TICK_INTERVAL: Duration = Duration::from_secs(10); +const GC_INTERVAL: Duration = Duration::from_secs(20); /// Used to spawn a new thread that indefinitely polls the database for new workflows. Only pulls workflows /// that are registered in its registry. After pulling, the workflows are ran and their state is written to @@ -29,33 +30,8 @@ impl Worker { } } - /// Polls the database periodically - pub async fn poll_start( - mut self, - config: rivet_config::Config, - pools: rivet_pools::Pools, - ) -> GlobalResult<()> { - tracing::debug!( - worker_instance_id = ?self.worker_instance_id, - registered_workflows = ?self.registry.size(), - "started worker instance", - ); - - let shared_client = chirp_client::SharedClient::from_env(pools.clone())?; - let cache = rivet_cache::CacheInner::from_env(pools.clone())?; - - // Regular tick interval to poll the database - let mut interval = tokio::time::interval(TICK_INTERVAL); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - interval.tick().await; - self.tick(&shared_client, &config, &pools, &cache).await?; - } - } - /// Polls the database periodically or wakes immediately when `Database::wake` finishes - pub async fn wake_start( + pub async fn start( mut self, config: rivet_config::Config, pools: rivet_pools::Pools, @@ -73,10 +49,22 @@ impl Worker { let mut interval = tokio::time::interval(TICK_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // Regular tick interval to clear expired leases + let mut gc_interval = tokio::time::interval(GC_INTERVAL); + gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { tokio::select! { _ = interval.tick() => {}, - res = self.db.wake() => res?, + _ = gc_interval.tick() => { + self.gc(); + self.publish_metrics(); + continue; + }, + res = self.db.wake() => { + res?; + interval.reset(); + }, } self.tick(&shared_client, &config, &pools, &cache).await?; @@ -136,4 +124,32 @@ impl Worker { Ok(()) } + + fn gc(&self) { + let db = self.db.clone(); + let worker_instance_id = self.worker_instance_id; + + tokio::task::spawn( + async move { + if let Err(err) = db.clear_expired_leases(worker_instance_id).await { + tracing::error!(?err, "unhandled gc error"); + } + } + .in_current_span(), + ); + } + + fn publish_metrics(&self) { + let db = self.db.clone(); + let worker_instance_id = self.worker_instance_id; + + tokio::task::spawn( + async move { + if let Err(err) = db.publish_metrics(worker_instance_id).await { + tracing::error!(?err, "unhandled metrics error"); + } + } + .in_current_span(), + ); + } } diff --git a/packages/infra/edge-server/Cargo.toml b/packages/infra/edge-server/Cargo.toml index 65b4200a5e..febd1d990b 100644 --- a/packages/infra/edge-server/Cargo.toml +++ b/packages/infra/edge-server/Cargo.toml @@ -25,8 +25,6 @@ s3-util.workspace = true # Standalone edge-monolith-workflow-worker.workspace = true pegboard-ws.workspace = true -workflow-gc.workspace = true -workflow-metrics-publish.workspace = true # API # api-monolith-edge.workspace = true diff --git a/packages/infra/edge-server/src/run_config.rs b/packages/infra/edge-server/src/run_config.rs index cba3c57995..82fe6695ce 100644 --- a/packages/infra/edge-server/src/run_config.rs +++ b/packages/infra/edge-server/src/run_config.rs @@ -22,9 +22,6 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { ServiceKind::Singleton, |config, pools| Box::pin(workflow_metrics_publish::start(config, pools)), ), - Service::new("workflow_gc", ServiceKind::Singleton, |config, pools| { - Box::pin(workflow_gc::start(config, pools)) - }), ]; Ok(RunConfigData { diff --git a/packages/infra/server/Cargo.toml b/packages/infra/server/Cargo.toml index f0354e2b03..93f29faeab 100644 --- a/packages/infra/server/Cargo.toml +++ b/packages/infra/server/Cargo.toml @@ -41,8 +41,6 @@ nomad-monitor.workspace = true pegboard-gc.workspace = true pegboard-metrics-publish.workspace = true pegboard-ws.workspace = true -workflow-gc.workspace = true -workflow-metrics-publish.workspace = true # Cron telemetry-beacon.workspace = true diff --git a/packages/infra/server/src/run_config.rs b/packages/infra/server/src/run_config.rs index fb28a933c3..33ea8bb964 100644 --- a/packages/infra/server/src/run_config.rs +++ b/packages/infra/server/src/run_config.rs @@ -29,14 +29,6 @@ pub fn config(rivet_config: rivet_config::Config) -> Result { ServiceKind::Standalone, |config, pools| Box::pin(monolith_workflow_worker::start(config, pools)), ), - Service::new( - "workflow_metrics_publish", - ServiceKind::Singleton, - |config, pools| Box::pin(workflow_metrics_publish::start(config, pools)), - ), - Service::new("workflow_gc", ServiceKind::Singleton, |config, pools| { - Box::pin(workflow_gc::start(config, pools)) - }), Service::new("mm_gc", ServiceKind::Singleton, |config, pools| { Box::pin(mm_gc::start(config, pools)) }), diff --git a/packages/services/workflow/standalone/gc/Cargo.toml b/packages/services/workflow/standalone/gc/Cargo.toml deleted file mode 100644 index d870591e0d..0000000000 --- a/packages/services/workflow/standalone/gc/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "workflow-gc" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -chirp-client.workspace = true -rivet-connection.workspace = true -rivet-health-checks.workspace = true -rivet-metrics.workspace = true -rivet-operation.workspace = true -rivet-runtime.workspace = true -tokio = { version = "1.40", features = ["full"] } -tracing = "0.1" -tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } -tracing-logfmt = "0.3" -rivet-config.workspace = true - -[dev-dependencies] -chirp-worker.workspace = true diff --git a/packages/services/workflow/standalone/gc/src/lib.rs b/packages/services/workflow/standalone/gc/src/lib.rs deleted file mode 100644 index 930b3f7715..0000000000 --- a/packages/services/workflow/standalone/gc/src/lib.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::{collections::HashSet, time::Duration}; - -use rivet_operation::prelude::*; - -const WORKER_INSTANCE_LOST_THRESHOLD: i64 = util::duration::seconds(30); - -pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> GlobalResult<()> { - let mut interval = tokio::time::interval(Duration::from_secs(15)); - loop { - interval.tick().await; - - let ts = util::timestamp::now(); - run_from_env(config.clone(), pools.clone(), ts).await?; - } -} - -#[tracing::instrument(skip_all)] -pub async fn run_from_env( - config: rivet_config::Config, - pools: rivet_pools::Pools, - ts: i64, -) -> GlobalResult<()> { - let client = chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("workflow-gc"); - let cache = rivet_cache::CacheInner::from_env(pools.clone())?; - let ctx = OperationContext::new( - "workflow-gc".into(), - Duration::from_secs(60), - config, - rivet_connection::Connection::new(client, pools, cache), - Uuid::new_v4(), - Uuid::new_v4(), - util::timestamp::now(), - util::timestamp::now(), - (), - ); - - // Reset all workflows on worker instances that have not had a ping in the last 30 seconds - let rows = sql_fetch_all!( - [ctx, (Uuid, Uuid,)] - " - UPDATE db_workflow.workflows AS w - SET - worker_instance_id = NULL, - wake_immediate = true, - wake_deadline_ts = NULL, - wake_signals = ARRAY[], - wake_sub_workflow_id = NULL - FROM db_workflow.worker_instances AS wi - WHERE - wi.last_ping_ts < $1 AND - wi.worker_instance_id = w.worker_instance_id AND - w.output IS NULL AND - w.silence_ts IS NULL AND - -- Check for any wake condition so we don't restart a permanently dead workflow - ( - w.wake_immediate OR - w.wake_deadline_ts IS NOT NULL OR - cardinality(w.wake_signals) > 0 OR - w.wake_sub_workflow_id IS NOT NULL - ) - RETURNING w.workflow_id, wi.worker_instance_id - ", - ts - WORKER_INSTANCE_LOST_THRESHOLD, - ) - .await?; - - if !rows.is_empty() { - let unique_worker_instance_ids = rows - .iter() - .map(|(_, worker_instance_id)| worker_instance_id) - .collect::>(); - - tracing::info!( - worker_instance_ids=?unique_worker_instance_ids, - total_workflows=%rows.len(), - "handled failover", - ); - } - - Ok(()) -} diff --git a/packages/services/workflow/standalone/gc/tests/integration.rs b/packages/services/workflow/standalone/gc/tests/integration.rs deleted file mode 100644 index 7ed1d1bb00..0000000000 --- a/packages/services/workflow/standalone/gc/tests/integration.rs +++ /dev/null @@ -1,20 +0,0 @@ -use chirp_worker::prelude::*; -use tracing_subscriber::prelude::*; - -use ::workflow_gc::run_from_env; - -#[tokio::test(flavor = "multi_thread")] -async fn basic() { - tracing_subscriber::registry() - .with( - tracing_logfmt::builder() - .layer() - .with_filter(tracing_subscriber::filter::LevelFilter::INFO), - ) - .init(); - - let pools = rivet_pools::Pools::new(config).await.unwrap(); - - // TODO: - run_from_env(util::timestamp::now(), pools).await.unwrap(); -} diff --git a/packages/services/workflow/standalone/metrics-publish/Cargo.toml b/packages/services/workflow/standalone/metrics-publish/Cargo.toml deleted file mode 100644 index 8246b0f410..0000000000 --- a/packages/services/workflow/standalone/metrics-publish/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -name = "workflow-metrics-publish" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -chirp-client.workspace = true -chirp-workflow.workspace = true -rivet-config.workspace = true -rivet-connection.workspace = true -rivet-health-checks.workspace = true -rivet-metrics.workspace = true -rivet-runtime.workspace = true -tokio = { version = "1.40", features = ["full"] } -tracing = "0.1" -tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } - -[dependencies.sqlx] -workspace = true - -[dev-dependencies] diff --git a/packages/services/workflow/standalone/metrics-publish/src/lib.rs b/packages/services/workflow/standalone/metrics-publish/src/lib.rs deleted file mode 100644 index 824c10d052..0000000000 --- a/packages/services/workflow/standalone/metrics-publish/src/lib.rs +++ /dev/null @@ -1,163 +0,0 @@ -use chirp_workflow::prelude::*; - -pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> GlobalResult<()> { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(15)); - loop { - interval.tick().await; - - run_from_env(config.clone(), pools.clone()).await?; - } -} - -#[tracing::instrument(skip_all)] -pub async fn run_from_env( - config: rivet_config::Config, - pools: rivet_pools::Pools, -) -> GlobalResult<()> { - let client = - chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("workflow-metrics-publish"); - let cache = rivet_cache::CacheInner::from_env(pools.clone())?; - let ctx = StandaloneCtx::new( - chirp_workflow::compat::db_from_pools(&pools).await?, - config, - rivet_connection::Connection::new(client, pools, cache), - "workflow-metrics-publish", - ) - .await?; - - let ( - (active_worker_count,), - total_workflow_count, - active_workflow_count, - dead_workflow_count, - sleeping_workflow_count, - pending_signal_count, - ) = tokio::try_join!( - sql_fetch_one!( - [ctx, (i64,)] - " - SELECT COUNT(*) - FROM db_workflow.worker_instances AS OF SYSTEM TIME '-1s' - WHERE last_ping_ts > $1 - ", - util::timestamp::now() - util::duration::seconds(30), - ), - sql_fetch_all!( - [ctx, (String, i64)] - " - SELECT workflow_name, COUNT(*) - FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' - GROUP BY workflow_name - ", - ), - sql_fetch_all!( - [ctx, (String, i64)] - " - SELECT workflow_name, COUNT(*) - FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' - WHERE - output IS NULL AND - worker_instance_id IS NOT NULL AND - silence_ts IS NULL - GROUP BY workflow_name - ", - ), - sql_fetch_all!( - [ctx, (String, String, i64)] - " - SELECT workflow_name, error, COUNT(*) - FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' - WHERE - error IS NOT NULL AND - output IS NULL AND - silence_ts IS NULL AND - wake_immediate = FALSE AND - wake_deadline_ts IS NULL AND - cardinality(wake_signals) = 0 AND - wake_sub_workflow_id IS NULL - GROUP BY workflow_name, error - ", - ), - sql_fetch_all!( - [ctx, (String, i64)] - " - SELECT workflow_name, COUNT(*) - FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' - WHERE - worker_instance_id IS NULL AND - output IS NULL AND - silence_ts IS NULL AND - ( - wake_immediate OR - wake_deadline_ts IS NOT NULL OR - cardinality(wake_signals) > 0 OR - wake_sub_workflow_id IS NOT NULL - ) - GROUP BY workflow_name - ", - ), - sql_fetch_all!( - [ctx, (String, i64)] - " - SELECT signal_name, COUNT(*) - FROM ( - SELECT signal_name - FROM db_workflow.signals - WHERE - ack_ts IS NULL AND - silence_ts IS NULL - UNION ALL - SELECT signal_name - FROM db_workflow.tagged_signals - WHERE - ack_ts IS NULL AND - silence_ts IS NULL - ) AS OF SYSTEM TIME '-1s' - GROUP BY signal_name - ", - ), - )?; - - // Get rid of metrics that don't exist in the db anymore (declarative) - chirp_workflow::metrics::WORKFLOW_TOTAL.reset(); - chirp_workflow::metrics::WORKFLOW_ACTIVE.reset(); - chirp_workflow::metrics::WORKFLOW_DEAD.reset(); - chirp_workflow::metrics::WORKFLOW_SLEEPING.reset(); - chirp_workflow::metrics::SIGNAL_PENDING.reset(); - - chirp_workflow::metrics::WORKER_ACTIVE - .with_label_values(&[]) - .set(active_worker_count); - - for (workflow_name, count) in total_workflow_count { - chirp_workflow::metrics::WORKFLOW_TOTAL - .with_label_values(&[&workflow_name]) - .set(count); - } - - for (workflow_name, count) in active_workflow_count { - chirp_workflow::metrics::WORKFLOW_ACTIVE - .with_label_values(&[&workflow_name]) - .set(count); - } - - for (workflow_name, error, count) in dead_workflow_count { - chirp_workflow::metrics::WORKFLOW_DEAD - .with_label_values(&[&workflow_name, &error]) - .set(count); - } - - for (workflow_name, count) in sleeping_workflow_count { - chirp_workflow::metrics::WORKFLOW_SLEEPING - .with_label_values(&[&workflow_name]) - .set(count); - } - - for (signal_name, count) in pending_signal_count { - chirp_workflow::metrics::SIGNAL_PENDING - .with_label_values(&[&signal_name]) - .set(count); - } - - Ok(()) -} diff --git a/packages/services/workflow/standalone/metrics-publish/tests/integration.rs b/packages/services/workflow/standalone/metrics-publish/tests/integration.rs deleted file mode 100644 index 6c8ea4d0f2..0000000000 --- a/packages/services/workflow/standalone/metrics-publish/tests/integration.rs +++ /dev/null @@ -1 +0,0 @@ -// TODO: