Skip to content

Commit

Permalink
fix: add boostrap_ipfs function, replace Lazy with OnceCell and updat…
Browse files Browse the repository at this point in the history
…e code
  • Loading branch information
saibatizoku committed Aug 8, 2024
1 parent 922543f commit da3e140
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 64 deletions.
2 changes: 2 additions & 0 deletions hermes/bin/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use console::Emoji;

use crate::{
cli::Cli,
ipfs::bootstrap_ipfs,
packaging::{
app::{build_app, ApplicationPackage},
sign::certificate::{self, Certificate},
Expand Down Expand Up @@ -41,6 +42,7 @@ impl Run {
package.validate(self.untrusted)?;

let hermes_home_dir = Cli::hermes_home()?;
bootstrap_ipfs(hermes_home_dir.as_path())?;
let app = build_app(&package, hermes_home_dir)?;

reactor::init()?;
Expand Down
50 changes: 29 additions & 21 deletions hermes/bin/src/ipfs/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ pub(crate) fn hermes_ipfs_add_file(
app_name: &ApplicationName, contents: IpfsFile,
) -> Result<IpfsPath, Errno> {
tracing::debug!(app_name = %app_name, "adding IPFS file");
let ipfs_path = HERMES_IPFS.file_add(contents)?.to_string();
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
let ipfs_path = ipfs.file_add(contents)?.to_string();
tracing::debug!(app_name = %app_name, path = %ipfs_path, "added IPFS file");
HERMES_IPFS.apps.pinned_file(app_name.clone(), &ipfs_path)?;
ipfs.apps.pinned_file(app_name.clone(), &ipfs_path)?;
Ok(ipfs_path)
}

Expand All @@ -42,8 +43,9 @@ pub(crate) fn hermes_ipfs_content_validate(
pub(crate) fn hermes_ipfs_get_file(
app_name: &ApplicationName, path: &IpfsPath,
) -> Result<IpfsFile, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
tracing::debug!(app_name = %app_name, path = %path, "get IPFS file");
let content = HERMES_IPFS.file_get(path)?;
let content = ipfs.file_get(path)?;
tracing::debug!(app_name = %app_name, path = %path, "got IPFS file");
Ok(content)
}
Expand All @@ -52,31 +54,34 @@ pub(crate) fn hermes_ipfs_get_file(
pub(crate) fn hermes_ipfs_pin_file(
app_name: &ApplicationName, path: &IpfsPath,
) -> Result<bool, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
tracing::debug!(app_name = %app_name, path = %path, "pin IPFS file");
let status = HERMES_IPFS.file_pin(path)?;
let status = ipfs.file_pin(path)?;
tracing::debug!(app_name = %app_name, path = %path, "pinned IPFS file");
HERMES_IPFS.apps.pinned_file(app_name.clone(), path)?;
ipfs.apps.pinned_file(app_name.clone(), path)?;
Ok(status)
}

/// Un-pin IPFS File
pub(crate) fn hermes_ipfs_unpin_file(
app_name: &ApplicationName, path: &IpfsPath,
) -> Result<bool, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
tracing::debug!(app_name = %app_name, path = %path, "un-pin IPFS file");
let status = HERMES_IPFS.file_unpin(path)?;
let status = ipfs.file_unpin(path)?;
tracing::debug!(app_name = %app_name, path = %path, "un-pinned IPFS file");
HERMES_IPFS.apps.unpinned_file(app_name, path)?;
ipfs.apps.unpinned_file(app_name, path)?;
Ok(status)
}

/// Get DHT Value
pub(crate) fn hermes_ipfs_get_dht_value(
app_name: &ApplicationName, key: DhtKey,
) -> Result<DhtValue, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
let key_str = format!("{key:x?}");
tracing::debug!(app_name = %app_name, dht_key = %key_str, "get DHT value");
let value = HERMES_IPFS.dht_get(key)?;
let value = ipfs.dht_get(key)?;
tracing::debug!(app_name = %app_name, dht_key = %key_str, "got DHT value");
Ok(value)
}
Expand All @@ -85,28 +90,29 @@ pub(crate) fn hermes_ipfs_get_dht_value(
pub(crate) fn hermes_ipfs_put_dht_value(
app_name: &ApplicationName, key: DhtKey, value: DhtValue,
) -> Result<bool, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
let key_str = format!("{key:x?}");
tracing::debug!(app_name = %app_name, dht_key = %key_str, "putting DHT value");
let status = HERMES_IPFS.dht_put(key.clone(), value)?;
let status = ipfs.dht_put(key.clone(), value)?;
tracing::debug!(app_name = %app_name, dht_key = %key_str, "have put DHT value");
HERMES_IPFS.apps.added_dht_key(app_name.clone(), key);
ipfs.apps.added_dht_key(app_name.clone(), key);
Ok(status)
}

/// Subscribe to a topic
pub(crate) fn hermes_ipfs_subscribe(
app_name: &ApplicationName, topic: PubsubTopic,
) -> Result<bool, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "subscribing to PubSub topic");
if HERMES_IPFS.apps.topic_subscriptions_contains(&topic) {
if ipfs.apps.topic_subscriptions_contains(&topic) {
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "topic subscription stream already exists");
} else {
let handle = HERMES_IPFS.pubsub_subscribe(&topic)?;
HERMES_IPFS.apps.added_topic_stream(topic.clone(), handle);
let handle = ipfs.pubsub_subscribe(&topic)?;
ipfs.apps.added_topic_stream(topic.clone(), handle);
tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "added subscription topic stream");
}
HERMES_IPFS
.apps
ipfs.apps
.added_app_topic_subscription(app_name.clone(), topic);
Ok(true)
}
Expand All @@ -115,24 +121,26 @@ pub(crate) fn hermes_ipfs_subscribe(
pub(crate) fn hermes_ipfs_publish(
_app_name: &ApplicationName, topic: &PubsubTopic, message: MessageData,
) -> Result<MessageId, Errno> {
HERMES_IPFS
.pubsub_publish(topic.to_string(), message)
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
ipfs.pubsub_publish(topic.to_string(), message)
.map(|m| m.0 .0)
}

/// Evict Peer from node
pub(crate) fn hermes_ipfs_evict_peer(
app_name: &ApplicationName, peer: PeerId,
) -> Result<bool, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
tracing::debug!(app_name = %app_name, peer_id = %peer, "evicting peer");
let status = HERMES_IPFS.peer_evict(&peer.to_string())?;
let status = ipfs.peer_evict(&peer.to_string())?;
tracing::debug!(app_name = %app_name, peer_id = %peer, "evicted peer");
HERMES_IPFS.apps.evicted_peer(app_name.clone(), peer);
ipfs.apps.evicted_peer(app_name.clone(), peer);
Ok(status)
}

#[allow(dead_code)]
/// List pinned files
pub(crate) fn hermes_ipfs_ls(app_name: &ApplicationName) -> Vec<String> {
HERMES_IPFS.apps.list_pinned_files(app_name)
pub(crate) fn hermes_ipfs_ls(app_name: &ApplicationName) -> Result<Vec<String>, Errno> {
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
Ok(ipfs.apps.list_pinned_files(app_name))
}
51 changes: 27 additions & 24 deletions hermes/bin/src/ipfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod api;
mod task;

use std::{collections::HashSet, str::FromStr};
use std::{collections::HashSet, path::Path, str::FromStr};

pub(crate) use api::{
hermes_ipfs_add_file, hermes_ipfs_content_validate, hermes_ipfs_evict_peer,
Expand All @@ -14,7 +14,7 @@ use hermes_ipfs::{
AddIpfsFile, Cid, HermesIpfs, IpfsBuilder, IpfsPath as BaseIpfsPath,
MessageId as PubsubMessageId,
};
use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;
use task::{ipfs_command_handler, IpfsCommand};
use tokio::{
runtime::Builder,
Expand All @@ -40,24 +40,23 @@ use crate::{
///
/// The IPFS Node is initialized in a separate thread and the sender channel is stored in
/// the `HermesIpfsNode`.
pub(crate) static HERMES_IPFS: Lazy<HermesIpfsNode> = Lazy::new(|| {
let hermes_home_dir = crate::cli::Cli::hermes_home().unwrap_or_else(|err| {
tracing::error!("Failed to get Hermes home directory: {}", err);
".hermes".into()
});
let ipfs_data_path = hermes_home_dir.as_path().join("ipfs");
HermesIpfsNode::bootstrap(
pub(crate) static HERMES_IPFS: OnceCell<HermesIpfsNode> = OnceCell::new();

/// Bootstrap `HERMES_IPFS` node.
pub(crate) fn bootstrap_ipfs(base_dir: &Path) -> anyhow::Result<()> {
let ipfs_data_path = base_dir.join("ipfs");
let ipfs_node = HermesIpfsNode::init(
IpfsBuilder::new()
.with_default()
.set_default_listener()
.disable_tls()
.set_disk_storage(ipfs_data_path.clone()),
)
.unwrap_or_else(|err| {
tracing::error!("Failed to bootstrap IPFS node: {}", err);
HermesIpfsNode::default()
})
});
)?;
HERMES_IPFS
.set(ipfs_node)
.map_err(|_| anyhow::anyhow!("failed to start IPFS node"))?;
Ok(())
}

/// Hermes IPFS Internal Node
pub(crate) struct HermesIpfsNode {
Expand All @@ -68,24 +67,28 @@ pub(crate) struct HermesIpfsNode {
}

impl HermesIpfsNode {
/// Create and bootstrap a new `HermesIpfsNode`
pub(crate) fn bootstrap(builder: IpfsBuilder) -> anyhow::Result<Self> {
/// Create, initialize, and bootstrap a new `HermesIpfsNode`
pub(crate) fn init(builder: IpfsBuilder) -> anyhow::Result<Self> {
tracing::info!("{} Bootstrapping IPFS node", console::Emoji::new("🖧", ""),);
let runtime = Builder::new_current_thread().enable_all().build()?;
let (sender, receiver) = mpsc::channel(1);
let _handle = std::thread::spawn(move || {
drop(runtime.block_on(async move {
// Build and start IPFS node
// Build and start IPFS node
let _unused = runtime.block_on(async move {
let node = builder.start().await?;
// Bootstrap the node with default addresses
let addrs = node.default_bootstrap().await?;
// Add default addresses for bootstrapping
let addresses = node.default_bootstrap().await?;
// Connect to bootstrap nodes.
node.bootstrap().await?;
tracing::debug!("Bootstrapped IPFS node with default addresses: {:?}", addrs);
tracing::debug!(
"Bootstrapped IPFS node with default addresses: {:?}",
addresses
);
let hermes_node: HermesIpfs = node.into();
let h = tokio::spawn(ipfs_command_handler(hermes_node, receiver));
drop(tokio::join!(h));
let (..) = tokio::join!(h);
Ok::<(), anyhow::Error>(())
}));
});
std::process::exit(0);
});
Ok(Self {
Expand Down
36 changes: 20 additions & 16 deletions hermes/bin/src/ipfs/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,26 @@ pub(crate) async fn ipfs_command_handler(

/// Handler function for topic message streams.
fn topic_stream_app_handler(msg: hermes_ipfs::rust_ipfs::libp2p::gossipsub::Message) {
let msg_topic = msg.topic.into_string();
let on_topic_event = OnTopicEvent {
message: PubsubMessage {
topic: msg_topic.clone(),
message: msg.data,
publisher: msg.source.map(|p| p.to_string()),
},
};
let app_names = HERMES_IPFS.apps.subscribed_apps(&msg_topic);
// Dispatch Hermes Event
if let Err(err) = send(HermesEvent::new(
on_topic_event.clone(),
crate::event::TargetApp::List(app_names),
crate::event::TargetModule::All,
)) {
tracing::error!(on_topic_event = ?on_topic_event, "failed to send on_topic_event {err:?}");
if let Some(ipfs) = HERMES_IPFS.get() {
let msg_topic = msg.topic.into_string();
let on_topic_event = OnTopicEvent {
message: PubsubMessage {
topic: msg_topic.clone(),
message: msg.data,
publisher: msg.source.map(|p| p.to_string()),
},
};
let app_names = ipfs.apps.subscribed_apps(&msg_topic);
// Dispatch Hermes Event
if let Err(err) = send(HermesEvent::new(
on_topic_event.clone(),
crate::event::TargetApp::List(app_names),
crate::event::TargetModule::All,
)) {
tracing::error!(on_topic_event = ?on_topic_event, "failed to send on_topic_event {err:?}");
}
} else {
tracing::error!("failed to send on_topic_event. IPFS is uninitialized");
}
}

Expand Down
9 changes: 6 additions & 3 deletions hermes/bin/src/vfs/ipfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ use hermes_ipfs::Cid;

use crate::{ipfs::HERMES_IPFS, runtime_extensions::bindings::hermes::ipfs::api::Errno};

#[allow(dead_code)]
/// IPFS virtual file.
#[allow(dead_code)]
struct IpfsVirtualFile(Cid);

impl Read for IpfsVirtualFile {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let ipfs = HERMES_IPFS.get().ok_or_else(|| {
tracing::error!("IPFS service is uninitialized");
Error::from(ErrorKind::Other)
})?;
// Read data from IPFS and store it in `buf`.
let mut slice = &mut buf[..];
slice.write_all(
HERMES_IPFS
.file_get(&self.0.into())
ipfs.file_get(&self.0.into())
.map_err(|e| {
if e == Errno::InvalidCid {
Error::from(ErrorKind::NotFound)
Expand Down
2 changes: 2 additions & 0 deletions wasm/wasi/wit/deps/hermes-ipfs/api.wit
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ interface api {
pubsub-publish-error,
/// Unable to subscribe to IPFS topic.
pubsub-subscribe-error,
/// IPFS service is unavailable.
service-unavailable,
}

/// Puts a DHT key-value into IPFS.
Expand Down

0 comments on commit da3e140

Please sign in to comment.