Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
declare a PG application name visible in PG stats
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Feb 2, 2024
1 parent da6250b commit 0426a81
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
1 change: 1 addition & 0 deletions hook-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ async fn main() {
// side, but we don't need more than one queue for now.
&config.queue_name,
&config.database_url,
"hook-api",
)
.await
.expect("failed to initialize queue");
Expand Down
11 changes: 6 additions & 5 deletions hook-common/src/pgqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time;
use async_trait::async_trait;
use chrono;
use serde;
use sqlx::postgres::{PgPool, PgPoolOptions};
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
use thiserror::Error;

/// Enumeration of errors for operations with PgQueue.
Expand Down Expand Up @@ -524,11 +524,12 @@ impl PgQueue {
///
/// * `queue_name`: A name for the queue we are going to initialize.
/// * `url`: A URL pointing to where the PostgreSQL database is hosted.
pub async fn new(queue_name: &str, url: &str) -> PgQueueResult<Self> {
pub async fn new(queue_name: &str, url: &str, app_name: &'static str) -> PgQueueResult<Self> {
let name = queue_name.to_owned();
let pool = PgPoolOptions::new()
.connect_lazy(url)
.map_err(|error| PgQueueError::PoolCreationError { error })?;
let options = PgConnectOptions::from_str(url)
.map_err(|error| PgQueueError::PoolCreationError { error })?
.application_name(app_name);
let pool = PgPoolOptions::new().connect_lazy_with(options);

Ok(Self { name, pool })
}
Expand Down
9 changes: 6 additions & 3 deletions hook-janitor/src/webhooks.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::str::FromStr;
use std::time::{Duration, Instant};

use async_trait::async_trait;
Expand All @@ -7,7 +8,7 @@ use hook_common::webhook::WebhookJobError;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde_json::error::Error as SerdeError;
use sqlx::postgres::{PgPool, PgPoolOptions, Postgres};
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions, Postgres};
use sqlx::types::{chrono, Uuid};
use sqlx::{Row, Transaction};
use thiserror::Error;
Expand Down Expand Up @@ -158,10 +159,12 @@ impl WebhookCleaner {
kafka_producer: FutureProducer<KafkaContext>,
app_metrics_topic: String,
) -> Result<Self> {
let options = PgConnectOptions::from_str(database_url)
.map_err(|error| WebhookCleanerError::PoolCreationError { error })?
.application_name("hook-janitor");
let pg_pool = PgPoolOptions::new()
.acquire_timeout(Duration::from_secs(10))
.connect_lazy(database_url)
.map_err(|error| WebhookCleanerError::PoolCreationError { error })?;
.connect_lazy_with(options);

Ok(Self {
pg_pool,
Expand Down
10 changes: 7 additions & 3 deletions hook-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ async fn main() -> Result<(), WorkerError> {
retry_policy_builder
};

let queue = PgQueue::new(config.queue_name.as_str(), &config.database_url)
.await
.expect("failed to initialize queue");
let queue = PgQueue::new(
config.queue_name.as_str(),
&config.database_url,
"hook-worker",
)
.await
.expect("failed to initialize queue");

let worker = WebhookWorker::new(
&config.worker_name,
Expand Down

0 comments on commit 0426a81

Please sign in to comment.