Skip to content

Commit

Permalink
feat(serverless,dashboard): add Cron Functions (#920)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuiiBz authored Jun 3, 2023
1 parent f05147b commit 10d4086
Show file tree
Hide file tree
Showing 33 changed files with 630 additions and 313 deletions.
5 changes: 5 additions & 0 deletions .changeset/friendly-readers-unite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/dashboard': patch
---

Add links to documentation in Function settings
7 changes: 7 additions & 0 deletions .changeset/olive-trains-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@lagon/dashboard': patch
'@lagon/serverless': patch
'@lagon/docs': patch
---

Add Cron Functions
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/serverless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ futures = "0.3.28"
clickhouse = "0.11.4"
bytes = "1.4.0"
serde = { version = "1.0", features = ["derive"] }
uuid = "1.3.3"

[build-dependencies]
lagon-runtime = { path = "../runtime" }
Expand Down
227 changes: 163 additions & 64 deletions crates/serverless/src/cronjob.rs

Large diffs are not rendered by default.

13 changes: 1 addition & 12 deletions crates/serverless/src/deployments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,11 @@ type QueryResult = (
Option<String>,
);

pub async fn get_deployments<D>(
mut conn: PooledConn,
downloader: Arc<D>,
// cronjob: Arc<Mutex<Cronjob>>,
) -> Result<Deployments>
pub async fn get_deployments<D>(mut conn: PooledConn, downloader: Arc<D>) -> Result<Deployments>
where
D: Downloader,
{
let deployments = Arc::new(DashMap::new());

let mut deployments_list: HashMap<String, Deployment> = HashMap::new();

conn.query_map(
Expand Down Expand Up @@ -187,12 +182,6 @@ OR
for domain in deployment.get_domains() {
deployments.insert(domain, Arc::clone(&deployment));
}

// if deployment.should_run_cron() {
// if let Err(error) = cronjob.add(deployment).await {
// error!("Failed to register cron: {}", error);
// }
// }
}))
.await;

Expand Down
61 changes: 35 additions & 26 deletions crates/serverless/src/deployments/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{download_deployment, filesystem::rm_deployment, Deployment, Deployments};
use crate::{serverless::Workers, REGION};
use crate::{cronjob::Cronjob, serverless::Workers, REGION};
use anyhow::Result;
use futures::StreamExt;
use lagon_runtime_isolate::IsolateEvent;
Expand All @@ -23,7 +23,7 @@ async fn run<D, P>(
downloader: Arc<D>,
deployments: Deployments,
workers: Workers,
// cronjob: Arc<Mutex<Cronjob>>,
cronjob: Arc<Mutex<Cronjob>>,
pubsub: Arc<Mutex<P>>,
) -> Result<()>
where
Expand All @@ -42,8 +42,12 @@ where
let cron_region = value["cronRegion"].as_str().unwrap().to_string();

// Ignore deployments that have a cron set but where
// the region isn't this node' region
if cron.is_some() && cron_region != REGION.to_string() {
// the region isn't this node' region, except for undeploys
// because we might remove the cron from the old region
if cron.is_some()
&& cron_region != REGION.to_string()
&& kind != PubSubMessageKind::Undeploy
{
continue;
}

Expand Down Expand Up @@ -99,14 +103,14 @@ where
deployments.insert(domain.clone(), Arc::clone(&deployment));
}

// if deployment.should_run_cron() {
// let mut cronjob = cronjob.lock().await;
// let id = deployment.id.clone();
if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
let id = deployment.id.clone();

// if let Err(error) = cronjob.add(deployment).await {
// error!(deployment = id; "Failed to register cron: {}", error);
// }
// }
if let Err(error) = cronjob.add(deployment).await {
error!(deployment = id; "Failed to register cron: {}", error);
}
}
}
Err(error) => {
increment_counter!(
Expand Down Expand Up @@ -147,13 +151,13 @@ where
)
.await;

// if deployment.should_run_cron() {
// let mut cronjob = cronjob.lock().await;
if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;

// if let Err(error) = cronjob.remove(&deployment.id).await {
// error!(deployment = deployment.id; "Failed to remove cron: {}", error);
// }
// }
if let Err(error) = cronjob.remove(&deployment.id).await {
error!(deployment = deployment.id; "Failed to remove cron: {}", error);
}
}
}
Err(error) => {
increment_counter!(
Expand Down Expand Up @@ -202,14 +206,19 @@ where
clear_deployment_cache(previous_id.to_string(), workers, String::from("promotion"))
.await;

// if deployment.should_run_cron() {
// let mut cronjob = cronjob.lock().await;
// let id = deployment.id.clone();
let mut cronjob = cronjob.lock().await;

if let Err(error) = cronjob.remove(&previous_id.to_string()).await {
error!(deployment = deployment.id; "Failed to remove cron: {}", error);
}

// if let Err(error) = cronjob.add(deployment).await {
// error!(deployment = id; "Failed to register cron: {}", error);
// }
// }
if deployment.should_run_cron() {
let id = deployment.id.clone();

if let Err(error) = cronjob.add(deployment).await {
error!(deployment = id; "Failed to register cron: {}", error);
}
}
}
_ => warn!("Unknown message kind: {:?}, {}", kind, payload),
};
Expand All @@ -222,7 +231,7 @@ pub fn listen_pub_sub<D, P>(
downloader: Arc<D>,
deployments: Deployments,
workers: Workers,
// cronjob: Arc<Mutex<Cronjob>>,
cronjob: Arc<Mutex<Cronjob>>,
pubsub: Arc<Mutex<P>>,
) where
D: Downloader + Send + Sync + 'static,
Expand All @@ -236,7 +245,7 @@ pub fn listen_pub_sub<D, P>(
Arc::clone(&downloader),
Arc::clone(&deployments),
Arc::clone(&workers),
// Arc::clone(&cronjob),
Arc::clone(&cronjob),
Arc::clone(&pubsub),
)
.await
Expand Down
3 changes: 1 addition & 2 deletions crates/serverless/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use once_cell::sync::Lazy;
use std::env;

// TODO add back cron jobs
// pub mod cronjob;
pub mod clickhouse;
pub mod cronjob;
pub mod deployments;
pub mod serverless;

Expand Down
16 changes: 2 additions & 14 deletions crates/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ async fn main() -> Result<()> {
let pool = Pool::new(opts)?;
let conn = pool.get_conn()?;

// let cronjob = Arc::new(Mutex::new(Cronjob::new().await));
let bucket = get_bucket()?;
let downloader = Arc::new(S3BucketDownloader::new(bucket));

Expand All @@ -68,19 +67,8 @@ async fn main() -> Result<()> {
let client = create_client();
run_migrations(&client).await?;

let deployments = get_deployments(
conn,
Arc::clone(&downloader), /*, Arc::clone(&cronjob)*/
)
.await?;
let serverless = start(
deployments,
addr,
downloader,
pubsub,
client, /*, cronjob*/
)
.await?;
let deployments = get_deployments(conn, Arc::clone(&downloader)).await?;
let serverless = start(deployments, addr, downloader, pubsub, client).await?;
tokio::spawn(serverless).await?;

runtime.dispose();
Expand Down
52 changes: 40 additions & 12 deletions crates/serverless/src/serverless.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
clickhouse::{LogRow, RequestRow},
cronjob::Cronjob,
deployments::{cache::run_cache_clear_task, pubsub::listen_pub_sub, Deployments},
REGION, SNAPSHOT_BLOB,
};
Expand Down Expand Up @@ -29,6 +30,7 @@ use lagon_serverless_pubsub::PubSubListener;
use log::{as_debug, error, info, warn};
use metrics::{decrement_gauge, histogram, increment_counter, increment_gauge};
use std::{
collections::HashSet,
convert::Infallible,
env,
future::Future,
Expand Down Expand Up @@ -357,7 +359,6 @@ pub async fn start<D, P>(
downloader: Arc<D>,
pubsub: P,
client: Client,
// cronjob: Arc<Mutex<Cronjob>>,
) -> Result<impl Future<Output = ()> + Send>
where
D: Downloader + Send + Sync + 'static,
Expand All @@ -366,15 +367,6 @@ where
let workers = Arc::new(DashMap::new());
let pubsub = Arc::new(TokioMutex::new(pubsub));

listen_pub_sub(
Arc::clone(&downloader),
Arc::clone(&deployments),
Arc::clone(&workers),
// Arc::clone(&cronjob),
pubsub,
);
run_cache_clear_task(&client, Arc::clone(&deployments), Arc::clone(&workers));

let insertion_interval = Duration::from_secs(1);
let inserters = Arc::new(Mutex::new((
client
Expand All @@ -385,6 +377,44 @@ where
.with_period(Some(insertion_interval)),
)));

let (log_sender, log_receiver) = flume::unbounded::<(String, String, Metadata)>();
let cronjob = Arc::new(TokioMutex::new(
Cronjob::new(log_sender.clone(), Arc::clone(&inserters)).await,
));

let mut cron_deployments = HashSet::new();

for deployment in deployments.iter() {
let deployment = deployment.value();

// Make sure we only register the cron once, since each
// deployment can have multiple domains
if cron_deployments.contains(&deployment.id) {
continue;
}

cron_deployments.insert(deployment.id.clone());

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;

if let Err(error) = cronjob.add(deployment.clone()).await {
error!("Failed to register cron: {}", error);
}
}
}

drop(cron_deployments);

listen_pub_sub(
Arc::clone(&downloader),
Arc::clone(&deployments),
Arc::clone(&workers),
Arc::clone(&cronjob),
pubsub,
);
run_cache_clear_task(&client, Arc::clone(&deployments), Arc::clone(&workers));

let inserters_handle = Arc::clone(&inserters);
tokio::spawn(async move {
loop {
Expand All @@ -402,12 +432,10 @@ where
}
});

let (log_sender, log_receiver) = flume::unbounded::<(String, String, Metadata)>();
let inserters_handle = Arc::clone(&inserters);
tokio::spawn(async move {
while let Ok(log) = log_receiver.recv_async().await {
let mut inserters = inserters_handle.lock().await;

if let Err(error) = inserters
.1
.write(&LogRow {
Expand Down
13 changes: 6 additions & 7 deletions crates/serverless/tests/assets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ async fn html_assets() -> Result<()> {
Arc::new(FakeDownloader),
FakePubSub::default(),
client,
// Arc::new(Mutex::new(Cronjob::new().await)),
)
.await?;
tokio::spawn(serverless);
Expand Down Expand Up @@ -95,7 +94,6 @@ async fn assets_nested() -> Result<()> {
Arc::new(FakeDownloader),
FakePubSub::default(),
client,
// Arc::new(Mutex::new(Cronjob::new().await)),
)
.await?;
tokio::spawn(serverless);
Expand Down Expand Up @@ -148,15 +146,16 @@ async fn set_content_type() -> Result<()> {
Arc::new(FakeDownloader),
FakePubSub::default(),
client,
// Arc::new(Mutex::new(Cronjob::new().await)),
)
.await?;
tokio::spawn(serverless);

// TODO: set default content type?
// let response = reqwest::get("http://127.0.0.1:4000").await?;
// assert_eq!(response.status(), 200);
// assert_eq!(response.headers()["content-type"], "");
let response = reqwest::get("http://127.0.0.1:4000").await?;
assert_eq!(response.status(), 200);
assert_eq!(
response.headers()["content-type"],
"text/plain;charset=UTF-8"
);

let response = reqwest::get("http://127.0.0.1:4000/hello").await?;
assert_eq!(response.status(), 200);
Expand Down
4 changes: 0 additions & 4 deletions crates/serverless/tests/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ async fn simple() -> Result<()> {
Arc::new(FakeDownloader),
FakePubSub::default(),
client,
// Arc::new(Mutex::new(Cronjob::new().await)),
)
.await?;
tokio::spawn(serverless);
Expand Down Expand Up @@ -77,7 +76,6 @@ async fn custom_domains() -> Result<()> {
Arc::new(FakeDownloader),
FakePubSub::default(),
client,
// Arc::new(Mutex::new(Cronjob::new().await)),
)
.await?;
tokio::spawn(serverless);
Expand Down Expand Up @@ -125,7 +123,6 @@ async fn reuse_isolate() -> Result<()> {
Arc::new(FakeDownloader),
FakePubSub::default(),
client,
// Arc::new(Mutex::new(Cronjob::new().await)),
)
.await?;
tokio::spawn(serverless);
Expand Down Expand Up @@ -167,7 +164,6 @@ async fn reuse_isolate_across_domains() -> Result<()> {
Arc::new(FakeDownloader),
FakePubSub::default(),
client,
// Arc::new(Mutex::new(Cronjob::new().await)),
)
.await?;
tokio::spawn(serverless);
Expand Down
Loading

4 comments on commit 10d4086

@vercel
Copy link

@vercel vercel bot commented on 10d4086 Jun 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

dashboard – ./packages/dashboard

dashboard-lagon.vercel.app
dash.lagon.app
dashboard-git-main-lagon.vercel.app

@vercel
Copy link

@vercel vercel bot commented on 10d4086 Jun 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

www – ./www

www-lagon.vercel.app
lagon.app
www-git-main-lagon.vercel.app
lagon.dev

@vercel
Copy link

@vercel vercel bot commented on 10d4086 Jun 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

docs – ./packages/docs

docs-git-main-lagon.vercel.app
lagon-docs.vercel.app
docs.lagon.app
docs-lagon.vercel.app

@vercel
Copy link

@vercel vercel bot commented on 10d4086 Jun 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

storybook – ./packages/ui

storybook-swart-eight.vercel.app
storybook-git-main-lagon.vercel.app
storybook-lagon.vercel.app
ui.lagon.app

Please sign in to comment.