Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

declare a PG application name visible in PG stats #62

Merged
merged 2 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hook-api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub struct Config {

#[envconfig(default = "default")]
pub queue_name: String,

#[envconfig(default = "100")]
pub max_pg_connections: u32,
}

impl Config {
Expand Down
2 changes: 2 additions & 0 deletions hook-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ async fn main() {
// side, but we don't need more than one queue for now.
&config.queue_name,
&config.database_url,
config.max_pg_connections,
"hook-api",
)
.await
.expect("failed to initialize queue");
Expand Down
16 changes: 12 additions & 4 deletions hook-common/src/pgqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time;
use async_trait::async_trait;
use chrono;
use serde;
use sqlx::postgres::{PgPool, PgPoolOptions};
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
use thiserror::Error;

/// Enumeration of errors for operations with PgQueue.
Expand Down Expand Up @@ -524,11 +524,19 @@ impl PgQueue {
///
/// * `queue_name`: A name for the queue we are going to initialize.
/// * `url`: A URL pointing to where the PostgreSQL database is hosted.
pub async fn new(queue_name: &str, url: &str) -> PgQueueResult<Self> {
pub async fn new(
queue_name: &str,
url: &str,
max_connections: u32,
app_name: &'static str,
) -> PgQueueResult<Self> {
let name = queue_name.to_owned();
let options = PgConnectOptions::from_str(url)
.map_err(|error| PgQueueError::PoolCreationError { error })?
.application_name(app_name);
let pool = PgPoolOptions::new()
.connect_lazy(url)
.map_err(|error| PgQueueError::PoolCreationError { error })?;
.max_connections(max_connections)
.connect_lazy_with(options);

Ok(Self { name, pool })
}
Expand Down
9 changes: 6 additions & 3 deletions hook-janitor/src/webhooks.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::str::FromStr;
use std::time::{Duration, Instant};

use async_trait::async_trait;
Expand All @@ -7,7 +8,7 @@ use hook_common::webhook::WebhookJobError;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde_json::error::Error as SerdeError;
use sqlx::postgres::{PgPool, PgPoolOptions, Postgres};
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions, Postgres};
use sqlx::types::{chrono, Uuid};
use sqlx::{Row, Transaction};
use thiserror::Error;
Expand Down Expand Up @@ -158,10 +159,12 @@ impl WebhookCleaner {
kafka_producer: FutureProducer<KafkaContext>,
app_metrics_topic: String,
) -> Result<Self> {
let options = PgConnectOptions::from_str(database_url)
.map_err(|error| WebhookCleanerError::PoolCreationError { error })?
.application_name("hook-janitor");
let pg_pool = PgPoolOptions::new()
.acquire_timeout(Duration::from_secs(10))
.connect_lazy(database_url)
.map_err(|error| WebhookCleanerError::PoolCreationError { error })?;
.connect_lazy_with(options);

Ok(Self {
pg_pool,
Expand Down
3 changes: 3 additions & 0 deletions hook-worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub struct Config {
#[envconfig(default = "1024")]
pub max_concurrent_jobs: usize,

#[envconfig(default = "100")]
pub max_pg_connections: u32,

#[envconfig(nested = true)]
pub retry_policy: RetryPolicyConfig,

Expand Down
11 changes: 8 additions & 3 deletions hook-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ async fn main() -> Result<(), WorkerError> {
retry_policy_builder
};

let queue = PgQueue::new(config.queue_name.as_str(), &config.database_url)
.await
.expect("failed to initialize queue");
let queue = PgQueue::new(
config.queue_name.as_str(),
&config.database_url,
config.max_pg_connections,
"hook-worker",
)
.await
.expect("failed to initialize queue");

let worker = WebhookWorker::new(
&config.worker_name,
Expand Down
Loading