Skip to content

Commit

Permalink
fix(serverless): reconnect Redis Pub/Sub on error (#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuiiBz authored Feb 24, 2023
1 parent 56df58c commit 909ff4b
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 168 deletions.
5 changes: 5 additions & 0 deletions .changeset/light-parents-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Reconnect Redis Pub/Sub on error
353 changes: 185 additions & 168 deletions crates/serverless/src/deployments/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,201 +65,218 @@ pub async fn clear_deployments_cache(
}
}

pub fn listen_pub_sub(
bucket: Bucket,
deployments: Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
pool: LocalPoolHandle,
cronjob: Arc<Mutex<Cronjob>>,
) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let client = redis::Client::open(url)?;
let mut conn = client.get_connection()?;
let mut pub_sub = conn.as_pubsub();
async fn run(
bucket: &Bucket,
deployments: &Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
pool: &LocalPoolHandle,
cronjob: &Arc<Mutex<Cronjob>>,
client: &redis::Client,
) -> Result<()> {
let mut conn = client.get_connection()?;
let mut pub_sub = conn.as_pubsub();

pub_sub.subscribe("deploy")?;
pub_sub.subscribe("undeploy")?;
pub_sub.subscribe("promote")?;
info!("Redis Pub/Sub connected");

loop {
let msg = pub_sub.get_message()?;
let channel = msg.get_channel_name();
let payload: String = msg.get_payload()?;
pub_sub.subscribe("deploy")?;
pub_sub.subscribe("undeploy")?;
pub_sub.subscribe("promote")?;

let value: Value = serde_json::from_str(&payload)?;
loop {
let msg = pub_sub.get_message()?;
let channel = msg.get_channel_name();
let payload: String = msg.get_payload()?;

let cron = value["cron"].as_str();
let cron_region = value["cronRegion"].as_str().unwrap().to_string();
let value: Value = serde_json::from_str(&payload)?;

// Ignore deployments that have a cron set but where
// the region isn't this node' region
if cron.is_some() && cron_region != REGION.to_string() {
continue;
}

let cron = cron.map(|cron| cron.to_string());

let deployment = Deployment {
id: value["deploymentId"].as_str().unwrap().to_string(),
function_id: value["functionId"].as_str().unwrap().to_string(),
function_name: value["functionName"].as_str().unwrap().to_string(),
assets: value["assets"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect(),
domains: value["domains"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect(),
environment_variables: value["env"]
.as_object()
.unwrap()
.iter()
.map(|(k, v)| (k.to_owned(), v.as_str().unwrap().to_string()))
.collect::<HashMap<_, _>>(),
memory: value["memory"].as_u64().unwrap() as usize,
timeout: value["timeout"].as_u64().unwrap() as usize,
startup_timeout: value["startupTimeout"].as_u64().unwrap() as usize,
is_production: value["isProduction"].as_bool().unwrap(),
cron,
};

match channel {
"deploy" => {
match download_deployment(&deployment, &bucket).await {
Ok(_) => {
increment_counter!(
"lagon_deployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();
let deployment = Arc::new(deployment);

for domain in &domains {
deployments.insert(domain.clone(), Arc::clone(&deployment));
}
let cron = value["cron"].as_str();
let cron_region = value["cronRegion"].as_str().unwrap().to_string();

clear_deployments_cache(domains, &pool, "deployment").await;
// Ignore deployments that have a cron set but where
// the region isn't this node' region
if cron.is_some() && cron_region != REGION.to_string() {
continue;
}

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
let id = deployment.id.clone();
let cron = cron.map(|cron| cron.to_string());

let deployment = Deployment {
id: value["deploymentId"].as_str().unwrap().to_string(),
function_id: value["functionId"].as_str().unwrap().to_string(),
function_name: value["functionName"].as_str().unwrap().to_string(),
assets: value["assets"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect(),
domains: value["domains"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect(),
environment_variables: value["env"]
.as_object()
.unwrap()
.iter()
.map(|(k, v)| (k.to_owned(), v.as_str().unwrap().to_string()))
.collect::<HashMap<_, _>>(),
memory: value["memory"].as_u64().unwrap() as usize,
timeout: value["timeout"].as_u64().unwrap() as usize,
startup_timeout: value["startupTimeout"].as_u64().unwrap() as usize,
is_production: value["isProduction"].as_bool().unwrap(),
cron,
};

if let Err(error) = cronjob.add(deployment).await {
error!(deployment = id; "Failed to register cron: {}", error);
}
}
match channel {
"deploy" => {
match download_deployment(&deployment, bucket).await {
Ok(_) => {
increment_counter!(
"lagon_deployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();
let deployment = Arc::new(deployment);

for domain in &domains {
deployments.insert(domain.clone(), Arc::clone(&deployment));
}
Err(error) => {
increment_counter!(
"lagon_deployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(
deployment = deployment.id;
"Failed to download deployment: {}", error
);
}
};
}
"undeploy" => {
match rm_deployment(&deployment.id) {
Ok(_) => {
increment_counter!(
"lagon_undeployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();

for domain in &domains {
deployments.remove(domain);
}

clear_deployments_cache(domains, &pool, "undeployment").await;
clear_deployments_cache(domains, pool, "deployment").await;

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
let id = deployment.id.clone();

if let Err(error) = cronjob.remove(&deployment.id).await {
error!(deployment = deployment.id; "Failed to remove cron: {}", error);
}
if let Err(error) = cronjob.add(deployment).await {
error!(deployment = id; "Failed to register cron: {}", error);
}
}
Err(error) => {
increment_counter!(
"lagon_undeployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(deployment = deployment.id; "Failed to delete deployment: {}", error);
}
};
}
"promote" => {
increment_counter!(
"lagon_promotion",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let previous_id = value["previousDeploymentId"].as_str().unwrap();
let mut deployments = deployments.write().await;

if let Some(deployment) = deployments.get(previous_id) {
let mut unpromoted_deployment = deployment.as_ref().clone();
unpromoted_deployment.is_production = false;

for domain in deployment.get_domains() {
deployments.remove(&domain);
}
Err(error) => {
increment_counter!(
"lagon_deployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(
deployment = deployment.id;
"Failed to download deployment: {}", error
);
}
};
}
"undeploy" => {
match rm_deployment(&deployment.id) {
Ok(_) => {
increment_counter!(
"lagon_undeployments",
"status" => "success",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let mut deployments = deployments.write().await;
let domains = deployment.get_domains();

for domain in &domains {
deployments.remove(domain);
}

let unpromoted_deployment = Arc::new(unpromoted_deployment);
clear_deployments_cache(domains, pool, "undeployment").await;

for domain in unpromoted_deployment.get_domains() {
deployments.insert(domain, Arc::clone(&unpromoted_deployment));
if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;

if let Err(error) = cronjob.remove(&deployment.id).await {
error!(deployment = deployment.id; "Failed to remove cron: {}", error);
}
}
}
Err(error) => {
increment_counter!(
"lagon_undeployments",
"status" => "error",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);
error!(deployment = deployment.id; "Failed to delete deployment: {}", error);
}
};
}
"promote" => {
increment_counter!(
"lagon_promotion",
"deployment" => deployment.id.clone(),
"function" => deployment.function_id.clone(),
"region" => REGION.clone(),
);

let previous_id = value["previousDeploymentId"].as_str().unwrap();
let mut deployments = deployments.write().await;

if let Some(deployment) = deployments.get(previous_id) {
let mut unpromoted_deployment = deployment.as_ref().clone();
unpromoted_deployment.is_production = false;

for domain in deployment.get_domains() {
deployments.remove(&domain);
}

let deployment = Arc::new(deployment);
let domains = deployment.get_domains();
let unpromoted_deployment = Arc::new(unpromoted_deployment);

for domain in &domains {
deployments.insert(domain.clone(), Arc::clone(&deployment));
for domain in unpromoted_deployment.get_domains() {
deployments.insert(domain, Arc::clone(&unpromoted_deployment));
}
}

let deployment = Arc::new(deployment);
let domains = deployment.get_domains();

clear_deployments_cache(domains, &pool, "promotion").await;
for domain in &domains {
deployments.insert(domain.clone(), Arc::clone(&deployment));
}

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
let id = deployment.id.clone();
clear_deployments_cache(domains, pool, "promotion").await;

if let Err(error) = cronjob.add(deployment).await {
error!(deployment = id; "Failed to register cron: {}", error);
}
if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
let id = deployment.id.clone();

if let Err(error) = cronjob.add(deployment).await {
error!(deployment = id; "Failed to register cron: {}", error);
}
}
_ => warn!("Unknown channel: {}", channel),
};
}
_ => warn!("Unknown channel: {}", channel),
};
}
}

/// Spawns a background task that keeps the Redis Pub/Sub subscription alive.
///
/// `run` only returns on error (lost connection, bad payload, ...); when it
/// does, we log the error and loop to open a fresh connection so deployment
/// events keep flowing after transient Redis outages. A short pause between
/// attempts prevents a persistent outage from turning this loop into a busy
/// spin that hammers Redis and floods the logs.
///
/// # Panics
///
/// Panics at startup if the `REDIS_URL` environment variable is not set.
pub fn listen_pub_sub(
    bucket: Bucket,
    deployments: Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
    pool: LocalPoolHandle,
    cronjob: Arc<Mutex<Cronjob>>,
) -> JoinHandle<Result<()>> {
    tokio::spawn(async move {
        let url = env::var("REDIS_URL").expect("REDIS_URL must be set");
        let client = redis::Client::open(url)?;

        loop {
            if let Err(error) = run(&bucket, &deployments, &pool, &cronjob, &client).await {
                error!("Pub/sub error: {}", error);
            }

            // Back off briefly before reconnecting.
            // NOTE(review): requires tokio's `time` feature — confirm it is
            // enabled in Cargo.toml (it usually is for server binaries).
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    })
}

0 comments on commit 909ff4b

Please sign in to comment.