diff --git a/hook-api/src/config.rs b/hook-api/src/config.rs index 3fe88b3..55fa404 100644 --- a/hook-api/src/config.rs +++ b/hook-api/src/config.rs @@ -13,6 +13,9 @@ pub struct Config { #[envconfig(default = "default")] pub queue_name: String, + + #[envconfig(default = "100")] + pub max_pg_connections: u32, } impl Config { diff --git a/hook-api/src/main.rs b/hook-api/src/main.rs index 4fbbdfb..9a9a9fd 100644 --- a/hook-api/src/main.rs +++ b/hook-api/src/main.rs @@ -28,6 +28,8 @@ async fn main() { // side, but we don't need more than one queue for now. &config.queue_name, &config.database_url, + config.max_pg_connections, + "hook-api", ) .await .expect("failed to initialize queue"); diff --git a/hook-common/src/pgqueue.rs b/hook-common/src/pgqueue.rs index fa2b5eb..4dab918 100644 --- a/hook-common/src/pgqueue.rs +++ b/hook-common/src/pgqueue.rs @@ -7,7 +7,7 @@ use std::time; use async_trait::async_trait; use chrono; use serde; -use sqlx::postgres::{PgPool, PgPoolOptions}; +use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; use thiserror::Error; /// Enumeration of errors for operations with PgQueue. @@ -524,11 +524,19 @@ impl PgQueue { /// /// * `queue_name`: A name for the queue we are going to initialize. /// * `url`: A URL pointing to where the PostgreSQL database is hosted. - pub async fn new(queue_name: &str, url: &str) -> PgQueueResult { + pub async fn new( + queue_name: &str, + url: &str, + max_connections: u32, + app_name: &'static str, + ) -> PgQueueResult { let name = queue_name.to_owned(); + let options = PgConnectOptions::from_str(url) + .map_err(|error| PgQueueError::PoolCreationError { error })? + .application_name(app_name); let pool = PgPoolOptions::new() - .connect_lazy(url) - .map_err(|error| PgQueueError::PoolCreationError { error })?; + .max_connections(max_connections) + .connect_lazy_with(options); Ok(Self { name, pool }) } diff --git a/hook-janitor/src/webhooks.rs b/hook-janitor/src/webhooks.rs index ee8ff43..5cdf431 100644 --- a/hook-janitor/src/webhooks.rs +++ b/hook-janitor/src/webhooks.rs @@ -1,3 +1,4 @@ +use std::str::FromStr; use std::time::{Duration, Instant}; use async_trait::async_trait; @@ -7,7 +8,7 @@ use hook_common::webhook::WebhookJobError; use rdkafka::error::KafkaError; use rdkafka::producer::{FutureProducer, FutureRecord}; use serde_json::error::Error as SerdeError; -use sqlx::postgres::{PgPool, PgPoolOptions, Postgres}; +use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions, Postgres}; use sqlx::types::{chrono, Uuid}; use sqlx::{Row, Transaction}; use thiserror::Error; @@ -158,10 +159,12 @@ impl WebhookCleaner { kafka_producer: FutureProducer, app_metrics_topic: String, ) -> Result { + let options = PgConnectOptions::from_str(database_url) + .map_err(|error| WebhookCleanerError::PoolCreationError { error })? + .application_name("hook-janitor"); let pg_pool = PgPoolOptions::new() .acquire_timeout(Duration::from_secs(10)) - .connect_lazy(database_url) - .map_err(|error| WebhookCleanerError::PoolCreationError { error })?; + .connect_lazy_with(options); Ok(Self { pg_pool, diff --git a/hook-worker/src/config.rs b/hook-worker/src/config.rs index 74342f7..477ff74 100644 --- a/hook-worker/src/config.rs +++ b/hook-worker/src/config.rs @@ -29,6 +29,9 @@ pub struct Config { #[envconfig(default = "1024")] pub max_concurrent_jobs: usize, + #[envconfig(default = "100")] + pub max_pg_connections: u32, + #[envconfig(nested = true)] pub retry_policy: RetryPolicyConfig, diff --git a/hook-worker/src/main.rs b/hook-worker/src/main.rs index 345fa3d..6cad3fd 100644 --- a/hook-worker/src/main.rs +++ b/hook-worker/src/main.rs @@ -35,9 +35,14 @@ async fn main() -> Result<(), WorkerError> { retry_policy_builder }; - let queue = PgQueue::new(config.queue_name.as_str(), &config.database_url) - .await - .expect("failed to initialize queue"); + let queue = PgQueue::new( + config.queue_name.as_str(), + &config.database_url, + config.max_pg_connections, + "hook-worker", + ) + .await + .expect("failed to initialize queue"); let worker = WebhookWorker::new( &config.worker_name,