From 909ff4b12ff8dd2f2b282b0c913b14a27ebf7baa Mon Sep 17 00:00:00 2001 From: Tom Lienard Date: Fri, 24 Feb 2023 17:49:05 +0100 Subject: [PATCH] fix(serverless): reconnect Redis Pub/Sub on error (#614) --- .changeset/light-parents-lay.md | 5 + crates/serverless/src/deployments/pubsub.rs | 353 ++++++++++---------- 2 files changed, 190 insertions(+), 168 deletions(-) create mode 100644 .changeset/light-parents-lay.md diff --git a/.changeset/light-parents-lay.md b/.changeset/light-parents-lay.md new file mode 100644 index 000000000..850482740 --- /dev/null +++ b/.changeset/light-parents-lay.md @@ -0,0 +1,5 @@ +--- +'@lagon/serverless': patch +--- + +Reconnect Redis Pub/Sub on error diff --git a/crates/serverless/src/deployments/pubsub.rs b/crates/serverless/src/deployments/pubsub.rs index 2f49f667f..0655a31dd 100644 --- a/crates/serverless/src/deployments/pubsub.rs +++ b/crates/serverless/src/deployments/pubsub.rs @@ -65,201 +65,218 @@ pub async fn clear_deployments_cache( } } -pub fn listen_pub_sub( - bucket: Bucket, - deployments: Arc>>>, - pool: LocalPoolHandle, - cronjob: Arc>, -) -> JoinHandle> { - tokio::spawn(async move { - let url = env::var("REDIS_URL").expect("REDIS_URL must be set"); - let client = redis::Client::open(url)?; - let mut conn = client.get_connection()?; - let mut pub_sub = conn.as_pubsub(); +async fn run( + bucket: &Bucket, + deployments: &Arc>>>, + pool: &LocalPoolHandle, + cronjob: &Arc>, + client: &redis::Client, +) -> Result<()> { + let mut conn = client.get_connection()?; + let mut pub_sub = conn.as_pubsub(); - pub_sub.subscribe("deploy")?; - pub_sub.subscribe("undeploy")?; - pub_sub.subscribe("promote")?; + info!("Redis Pub/Sub connected"); - loop { - let msg = pub_sub.get_message()?; - let channel = msg.get_channel_name(); - let payload: String = msg.get_payload()?; + pub_sub.subscribe("deploy")?; + pub_sub.subscribe("undeploy")?; + pub_sub.subscribe("promote")?; - let value: Value = serde_json::from_str(&payload)?; + loop { + let msg = pub_sub.get_message()?; + let channel = msg.get_channel_name(); + let payload: String = msg.get_payload()?; - let cron = value["cron"].as_str(); - let cron_region = value["cronRegion"].as_str().unwrap().to_string(); + let value: Value = serde_json::from_str(&payload)?; - // Ignore deployments that have a cron set but where - // the region isn't this node' region - if cron.is_some() && cron_region != REGION.to_string() { - continue; - } - - let cron = cron.map(|cron| cron.to_string()); - - let deployment = Deployment { - id: value["deploymentId"].as_str().unwrap().to_string(), - function_id: value["functionId"].as_str().unwrap().to_string(), - function_name: value["functionName"].as_str().unwrap().to_string(), - assets: value["assets"] - .as_array() - .unwrap() - .iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect(), - domains: value["domains"] - .as_array() - .unwrap() - .iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect(), - environment_variables: value["env"] - .as_object() - .unwrap() - .iter() - .map(|(k, v)| (k.to_owned(), v.as_str().unwrap().to_string())) - .collect::>(), - memory: value["memory"].as_u64().unwrap() as usize, - timeout: value["timeout"].as_u64().unwrap() as usize, - startup_timeout: value["startupTimeout"].as_u64().unwrap() as usize, - is_production: value["isProduction"].as_bool().unwrap(), - cron, - }; - - match channel { - "deploy" => { - match download_deployment(&deployment, &bucket).await { - Ok(_) => { - increment_counter!( - "lagon_deployments", - "status" => "success", - "deployment" => deployment.id.clone(), - "function" => deployment.function_id.clone(), - "region" => REGION.clone(), - ); - - let mut deployments = deployments.write().await; - let domains = deployment.get_domains(); - let deployment = Arc::new(deployment); - - for domain in &domains { - deployments.insert(domain.clone(), Arc::clone(&deployment)); - } + let cron = value["cron"].as_str(); + let cron_region = value["cronRegion"].as_str().unwrap().to_string(); - clear_deployments_cache(domains, &pool, "deployment").await; + // Ignore deployments that have a cron set but where + // the region isn't this node' region + if cron.is_some() && cron_region != REGION.to_string() { + continue; + } - if deployment.should_run_cron() { - let mut cronjob = cronjob.lock().await; - let id = deployment.id.clone(); + let cron = cron.map(|cron| cron.to_string()); + + let deployment = Deployment { + id: value["deploymentId"].as_str().unwrap().to_string(), + function_id: value["functionId"].as_str().unwrap().to_string(), + function_name: value["functionName"].as_str().unwrap().to_string(), + assets: value["assets"] + .as_array() + .unwrap() + .iter() + .map(|v| v.as_str().unwrap().to_string()) + .collect(), + domains: value["domains"] + .as_array() + .unwrap() + .iter() + .map(|v| v.as_str().unwrap().to_string()) + .collect(), + environment_variables: value["env"] + .as_object() + .unwrap() + .iter() + .map(|(k, v)| (k.to_owned(), v.as_str().unwrap().to_string())) + .collect::>(), + memory: value["memory"].as_u64().unwrap() as usize, + timeout: value["timeout"].as_u64().unwrap() as usize, + startup_timeout: value["startupTimeout"].as_u64().unwrap() as usize, + is_production: value["isProduction"].as_bool().unwrap(), + cron, + }; - if let Err(error) = cronjob.add(deployment).await { - error!(deployment = id; "Failed to register cron: {}", error); - } - } + match channel { + "deploy" => { + match download_deployment(&deployment, bucket).await { + Ok(_) => { + increment_counter!( + "lagon_deployments", + "status" => "success", + "deployment" => deployment.id.clone(), + "function" => deployment.function_id.clone(), + "region" => REGION.clone(), + ); + + let mut deployments = deployments.write().await; + let domains = deployment.get_domains(); + let deployment = Arc::new(deployment); + + for domain in &domains { + deployments.insert(domain.clone(), Arc::clone(&deployment)); } - Err(error) => { - increment_counter!( - "lagon_deployments", - "status" => "error", - "deployment" => deployment.id.clone(), - "function" => deployment.function_id.clone(), - "region" => REGION.clone(), - ); - error!( - deployment = deployment.id; - "Failed to download deployment: {}", error - ); - } - }; - } - "undeploy" => { - match rm_deployment(&deployment.id) { - Ok(_) => { - increment_counter!( - "lagon_undeployments", - "status" => "success", - "deployment" => deployment.id.clone(), - "function" => deployment.function_id.clone(), - "region" => REGION.clone(), - ); - - let mut deployments = deployments.write().await; - let domains = deployment.get_domains(); - - for domain in &domains { - deployments.remove(domain); - } - clear_deployments_cache(domains, &pool, "undeployment").await; + clear_deployments_cache(domains, pool, "deployment").await; - if deployment.should_run_cron() { - let mut cronjob = cronjob.lock().await; + if deployment.should_run_cron() { + let mut cronjob = cronjob.lock().await; + let id = deployment.id.clone(); - if let Err(error) = cronjob.remove(&deployment.id).await { - error!(deployment = deployment.id; "Failed to remove cron: {}", error); - } + if let Err(error) = cronjob.add(deployment).await { + error!(deployment = id; "Failed to register cron: {}", error); } } - Err(error) => { - increment_counter!( - "lagon_undeployments", - "status" => "error", - "deployment" => deployment.id.clone(), - "function" => deployment.function_id.clone(), - "region" => REGION.clone(), - ); - error!(deployment = deployment.id; "Failed to delete deployment: {}", error); - } - }; - } - "promote" => { - increment_counter!( - "lagon_promotion", - "deployment" => deployment.id.clone(), - "function" => deployment.function_id.clone(), - "region" => REGION.clone(), - ); - - let previous_id = value["previousDeploymentId"].as_str().unwrap(); - let mut deployments = deployments.write().await; - - if let Some(deployment) = deployments.get(previous_id) { - let mut unpromoted_deployment = deployment.as_ref().clone(); - unpromoted_deployment.is_production = false; - - for domain in deployment.get_domains() { - deployments.remove(&domain); + } + Err(error) => { + increment_counter!( + "lagon_deployments", + "status" => "error", + "deployment" => deployment.id.clone(), + "function" => deployment.function_id.clone(), + "region" => REGION.clone(), + ); + error!( + deployment = deployment.id; + "Failed to download deployment: {}", error + ); + } + }; + } + "undeploy" => { + match rm_deployment(&deployment.id) { + Ok(_) => { + increment_counter!( + "lagon_undeployments", + "status" => "success", + "deployment" => deployment.id.clone(), + "function" => deployment.function_id.clone(), + "region" => REGION.clone(), + ); + + let mut deployments = deployments.write().await; + let domains = deployment.get_domains(); + + for domain in &domains { + deployments.remove(domain); } - let unpromoted_deployment = Arc::new(unpromoted_deployment); + clear_deployments_cache(domains, pool, "undeployment").await; - for domain in unpromoted_deployment.get_domains() { - deployments.insert(domain, Arc::clone(&unpromoted_deployment)); + if deployment.should_run_cron() { + let mut cronjob = cronjob.lock().await; + + if let Err(error) = cronjob.remove(&deployment.id).await { + error!(deployment = deployment.id; "Failed to remove cron: {}", error); + } } } + Err(error) => { + increment_counter!( + "lagon_undeployments", + "status" => "error", + "deployment" => deployment.id.clone(), + "function" => deployment.function_id.clone(), + "region" => REGION.clone(), + ); + error!(deployment = deployment.id; "Failed to delete deployment: {}", error); + } + }; + } + "promote" => { + increment_counter!( + "lagon_promotion", + "deployment" => deployment.id.clone(), + "function" => deployment.function_id.clone(), + "region" => REGION.clone(), + ); + + let previous_id = value["previousDeploymentId"].as_str().unwrap(); + let mut deployments = deployments.write().await; + + if let Some(deployment) = deployments.get(previous_id) { + let mut unpromoted_deployment = deployment.as_ref().clone(); + unpromoted_deployment.is_production = false; + + for domain in deployment.get_domains() { + deployments.remove(&domain); + } - let deployment = Arc::new(deployment); - let domains = deployment.get_domains(); + let unpromoted_deployment = Arc::new(unpromoted_deployment); - for domain in &domains { - deployments.insert(domain.clone(), Arc::clone(&deployment)); + for domain in unpromoted_deployment.get_domains() { + deployments.insert(domain, Arc::clone(&unpromoted_deployment)); } + } + + let deployment = Arc::new(deployment); + let domains = deployment.get_domains(); - clear_deployments_cache(domains, &pool, "promotion").await; + for domain in &domains { + deployments.insert(domain.clone(), Arc::clone(&deployment)); + } - if deployment.should_run_cron() { - let mut cronjob = cronjob.lock().await; - let id = deployment.id.clone(); + clear_deployments_cache(domains, pool, "promotion").await; - if let Err(error) = cronjob.add(deployment).await { - error!(deployment = id; "Failed to register cron: {}", error); - } + if deployment.should_run_cron() { + let mut cronjob = cronjob.lock().await; + let id = deployment.id.clone(); + + if let Err(error) = cronjob.add(deployment).await { + error!(deployment = id; "Failed to register cron: {}", error); } } - _ => warn!("Unknown channel: {}", channel), - }; + } + _ => warn!("Unknown channel: {}", channel), + }; + } +} + +pub fn listen_pub_sub( + bucket: Bucket, + deployments: Arc>>>, + pool: LocalPoolHandle, + cronjob: Arc>, +) -> JoinHandle> { + tokio::spawn(async move { + let url = env::var("REDIS_URL").expect("REDIS_URL must be set"); + let client = redis::Client::open(url)?; + + loop { + if let Err(error) = run(&bucket, &deployments, &pool, &cronjob, &client).await { + error!("Pub/sub error: {}", error); + } } }) }