Skip to content

Commit

Permalink
refactor: accept blob id from caller
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Dec 25, 2023
1 parent 49a6b7a commit 971a932
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 60 deletions.
3 changes: 2 additions & 1 deletion apps/keck/src/server/api/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ pub async fn set_blob(
})
.filter_map(|data| future::ready(data.ok()));

let (id, blob) = stream_to_blob(body).await;
if let Ok(id) = context
.storage
.blobs()
.put_blob_stream(Some(workspace.clone()), body)
.put_blob(Some(workspace.clone()), id, blob)
.await
{
if has_error {
Expand Down
2 changes: 1 addition & 1 deletion apps/keck/src/server/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use axum::{
};
use doc::doc_apis;
use jwst_rpc::{BroadcastChannels, RpcContextImpl};
use jwst_storage::{BlobStorageType, JwstStorage, JwstStorageResult};
use jwst_storage::{stream_to_blob, BlobStorageType, JwstStorage, JwstStorageResult};
use tokio::sync::RwLock;

use super::*;
Expand Down
9 changes: 3 additions & 6 deletions apps/keck/src/server/sync/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,9 @@ impl Context {
})
.filter_map(|data| future::ready(data.ok()));

if let Ok(id) = self
.get_storage()
.blobs()
.put_blob_stream(workspace.clone(), stream)
.await
{
let (id, blob) = stream_to_blob(stream).await;

Check warning on line 97 in apps/keck/src/server/sync/blobs.rs

View check run for this annotation

Codecov / codecov/patch

apps/keck/src/server/sync/blobs.rs#L97

Added line #L97 was not covered by tests

if let Ok(id) = self.get_storage().blobs().put_blob(workspace.clone(), id, blob).await {

Check warning on line 99 in apps/keck/src/server/sync/blobs.rs

View check run for this annotation

Codecov / codecov/patch

apps/keck/src/server/sync/blobs.rs#L99

Added line #L99 was not covered by tests
if has_error {
let _ = self.get_storage().blobs().delete_blob(workspace, id).await;
StatusCode::INTERNAL_SERVER_ERROR.into_response()
Expand Down
1 change: 1 addition & 0 deletions apps/keck/src/server/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub use jwst_logger::{debug, error, info, warn};
pub use jwst_storage::stream_to_blob;
pub use nanoid::nanoid;
pub use serde::{Deserialize, Serialize};
9 changes: 1 addition & 8 deletions libs/jwst-core/src/types/blob.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::collections::HashMap;

use bytes::Bytes;
use chrono::NaiveDateTime;
use futures::Stream;

use super::*;

Expand All @@ -29,12 +27,7 @@ pub trait BlobStorage<E = JwstError> {
id: String,
params: Option<HashMap<String, String>>,
) -> JwstResult<BlobMetadata, E>;
async fn put_blob_stream(
&self,
workspace: Option<String>,
stream: impl Stream<Item = Bytes> + Send,
) -> JwstResult<String, E>;
async fn put_blob(&self, workspace: Option<String>, blob: Vec<u8>) -> JwstResult<String, E>;
async fn put_blob(&self, workspace: Option<String>, id: String, blob: Vec<u8>) -> JwstResult<String, E>;
async fn delete_blob(&self, workspace: Option<String>, id: String) -> JwstResult<bool, E>;
async fn delete_workspace(&self, workspace_id: String) -> JwstResult<(), E>;
async fn get_blobs_size(&self, workspaces: Vec<String>) -> JwstResult<i64, E>;
Expand Down
7 changes: 5 additions & 2 deletions libs/jwst-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ use std::{path::PathBuf, sync::Arc, time::Duration};

use async_trait::async_trait;
use chrono::Utc;
use futures::{Future, Stream};
use futures::Future;
use jwst_core::{DocStorage, JwstResult, Workspace};
use jwst_logger::{debug, error, info, trace, warn};
use path_ext::PathExt;
use rate_limiter::{get_bucket, is_sqlite, Bucket};
use sea_orm::{prelude::*, ConnectOptions, Database, DbErr, QuerySelect, Set};
#[cfg(feature = "bucket")]
pub use storage::blobs::MixedBucketDBParam;
pub use storage::{blobs::BlobStorageType, JwstStorage};
pub use storage::{
blobs::{stream_to_blob, BlobStorageType},
JwstStorage,
};
pub use types::{JwstStorageError, JwstStorageResult};

#[inline]
Expand Down
29 changes: 3 additions & 26 deletions libs/jwst-storage/src/storage/blobs/blob_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use jwst_core::{Base64Engine, URL_SAFE_ENGINE};
use sea_orm::FromQueryResult;
use sha2::{Digest, Sha256};

use super::*;

Expand Down Expand Up @@ -208,33 +206,12 @@ impl BlobStorage<JwstStorageError> for BlobDBStorage {
}
}

async fn put_blob_stream(
&self,
workspace: Option<String>,
stream: impl Stream<Item = Bytes> + Send,
) -> JwstStorageResult<String> {
let _lock = self.bucket.write().await;
let workspace = workspace.unwrap_or("__default__".into());

let (hash, blob) = get_hash(stream).await;

if self.insert(&workspace, &hash, &blob).await.is_ok() {
Ok(hash)
} else {
Err(JwstStorageError::WorkspaceNotFound(workspace))
}
}

async fn put_blob(&self, workspace: Option<String>, blob: Vec<u8>) -> JwstStorageResult<String> {
async fn put_blob(&self, workspace: Option<String>, id: String, blob: Vec<u8>) -> JwstStorageResult<String> {
let _lock = self.bucket.write().await;
let workspace = workspace.unwrap_or("__default__".into());
let mut hasher = Sha256::new();

hasher.update(&blob);
let hash = URL_SAFE_ENGINE.encode(hasher.finalize());

if self.insert(&workspace, &hash, &blob).await.is_ok() {
Ok(hash)
if self.insert(&workspace, &id, &blob).await.is_ok() {
Ok(id)
} else {
Err(JwstStorageError::WorkspaceNotFound(workspace))
}
Expand Down
21 changes: 6 additions & 15 deletions libs/jwst-storage/src/storage/blobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ mod utils;
#[cfg(test)]
pub use blob_storage::blobs_storage_test;
pub use blob_storage::BlobDBStorage;
use bytes::Bytes;
use jwst_core::{BlobMetadata, BlobStorage};
use jwst_storage_migration::Alias;
use thiserror::Error;
use tokio::task::JoinError;
use utils::{get_hash, InternalBlobMetadata};
pub use utils::stream_to_blob;
use utils::InternalBlobMetadata;

use super::{entities::prelude::*, *};

Expand Down Expand Up @@ -105,23 +105,14 @@ impl BlobStorage<JwstStorageError> for JwstBlobStorage {
}
}

async fn put_blob_stream(
async fn put_blob(
&self,
workspace: Option<String>,
stream: impl Stream<Item = Bytes> + Send,
id: String,
blob: Vec<u8>,
) -> JwstResult<String, JwstStorageError> {
match self {
JwstBlobStorage::Raw(db) => db.put_blob_stream(workspace, stream).await,
#[cfg(feature = "image")]
JwstBlobStorage::Auto(db) => db.put_blob_stream(workspace, stream).await,
#[cfg(feature = "bucket")]
JwstBlobStorage::Bucket(db) => db.put_blob_stream(workspace, stream).await,
}
}

async fn put_blob(&self, workspace: Option<String>, blob: Vec<u8>) -> JwstResult<String, JwstStorageError> {
match self {
JwstBlobStorage::Raw(db) => db.put_blob(workspace, blob).await,
JwstBlobStorage::Raw(db) => db.put_blob(workspace, id, blob).await,
#[cfg(feature = "image")]
JwstBlobStorage::Auto(db) => db.put_blob(workspace, blob).await,
#[cfg(feature = "bucket")]
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/storage/blobs/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use jwst_core::{Base64Engine, BlobMetadata, URL_SAFE_ENGINE};
use sea_orm::FromQueryResult;
use sha2::{Digest, Sha256};

pub async fn get_hash(stream: impl Stream<Item = Bytes> + Send) -> (String, Vec<u8>) {
pub async fn stream_to_blob(stream: impl Stream<Item = Bytes> + Send) -> (String, Vec<u8>) {
let mut hasher = Sha256::new();

let buffer = stream
Expand Down

0 comments on commit 971a932

Please sign in to comment.