Skip to content

Commit

Permalink
More flume purge
Browse files Browse the repository at this point in the history
put in a horrible unsafe hack to figure out if 2 senders are the same channel. To be replaced by a PR on async_channel.
  • Loading branch information
rklaehn committed Jul 23, 2024
1 parent c4d002a commit 115d425
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 58 deletions.
16 changes: 14 additions & 2 deletions iroh-blobs/src/util/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,19 @@ impl<T> Clone for FlumeProgressSender<T> {
}
}

fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
assert!(std::mem::size_of::<async_channel::Sender<T>>() == std::mem::size_of::<usize>());
fn get_arc_reference<T>(x: &async_channel::Sender<T>) -> &Arc<()> {
unsafe {
// Transmute the reference to MyNewType to a reference to Arc<()>
std::mem::transmute::<_, &Arc<()>>(x)
}
}
let a = get_arc_reference(a);
let b = get_arc_reference(b);
Arc::ptr_eq(a, b)
}

impl<T> FlumeProgressSender<T> {
/// Create a new progress sender from a flume sender.
pub fn new(sender: async_channel::Sender<T>) -> Self {
Expand All @@ -482,8 +495,7 @@ impl<T> FlumeProgressSender<T> {

/// Returns true if `other` sends on the same `flume` channel as `self`.
pub fn same_channel(&self, other: &FlumeProgressSender<T>) -> bool {
self.id.load(std::sync::atomic::Ordering::SeqCst)
== other.id.load(std::sync::atomic::Ordering::SeqCst)
same_channel(&self.sender, &other.sender)
}
}

Expand Down
51 changes: 24 additions & 27 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ enum Action {
#[display("ListAuthors")]
ListAuthors {
#[debug("reply")]
reply: flume::Sender<Result<AuthorId>>,
reply: async_channel::Sender<Result<AuthorId>>,
},
#[display("ListReplicas")]
ListReplicas {
#[debug("reply")]
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
},
#[display("ContentHashes")]
ContentHashes {
Expand Down Expand Up @@ -108,12 +108,12 @@ enum ReplicaAction {
reply: oneshot::Sender<Result<()>>,
},
Subscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Unsubscribe {
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
Expand Down Expand Up @@ -166,7 +166,7 @@ enum ReplicaAction {
},
GetMany {
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
},
DropReplica {
reply: oneshot::Sender<Result<()>>,
Expand Down Expand Up @@ -222,7 +222,7 @@ struct OpenReplica {
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
pub struct SyncHandle {
tx: flume::Sender<Action>,
tx: async_channel::Sender<Action>,
join_handle: Arc<Option<JoinHandle<()>>>,
}

Expand All @@ -232,7 +232,7 @@ pub struct OpenOpts {
/// Set to true to set sync state to true.
pub sync: bool,
/// Optionally subscribe to replica events.
pub subscribe: Option<flume::Sender<Event>>,
pub subscribe: Option<async_channel::Sender<Event>>,
}
impl OpenOpts {
/// Set sync state to true.
Expand All @@ -241,7 +241,7 @@ impl OpenOpts {
self
}
/// Subscribe to replica events.
pub fn subscribe(mut self, subscribe: flume::Sender<Event>) -> Self {
pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
self.subscribe = Some(subscribe);
self
}
Expand All @@ -255,7 +255,7 @@ impl SyncHandle {
content_status_callback: Option<ContentStatusCallback>,
me: String,
) -> SyncHandle {
let (action_tx, action_rx) = flume::bounded(ACTION_CAP);
let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
let actor = Actor {
store,
states: Default::default(),
Expand Down Expand Up @@ -298,7 +298,7 @@ impl SyncHandle {
pub async fn subscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
Expand All @@ -309,7 +309,7 @@ impl SyncHandle {
pub async fn unsubscribe(
&self,
namespace: NamespaceId,
sender: flume::Sender<Event>,
sender: async_channel::Sender<Event>,
) -> Result<()> {
let (reply, rx) = oneshot::channel();
self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
Expand Down Expand Up @@ -435,7 +435,7 @@ impl SyncHandle {
&self,
namespace: NamespaceId,
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
reply: async_channel::Sender<Result<SignedEntry>>,
) -> Result<()> {
let action = ReplicaAction::GetMany { query, reply };
self.send_replica(namespace, action).await?;
Expand Down Expand Up @@ -489,13 +489,13 @@ impl SyncHandle {
Ok(store)
}

pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
pub async fn list_authors(&self, reply: async_channel::Sender<Result<AuthorId>>) -> Result<()> {
self.send(Action::ListAuthors { reply }).await
}

pub async fn list_replicas(
&self,
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
) -> Result<()> {
self.send(Action::ListReplicas { reply }).await
}
Expand Down Expand Up @@ -566,7 +566,7 @@ impl SyncHandle {

async fn send(&self, action: Action) -> Result<()> {
self.tx
.send_async(action)
.send(action)
.await
.context("sending to iroh_docs actor failed")?;
Ok(())
Expand All @@ -581,7 +581,7 @@ impl Drop for SyncHandle {
fn drop(&mut self) {
// this means we're dropping the last reference
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
self.tx.send(Action::Shutdown { reply: None }).ok();
self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
let handle = handle.take().expect("this can only run once");
if let Err(err) = handle.join() {
warn!(?err, "Failed to join sync actor");
Expand All @@ -593,7 +593,7 @@ impl Drop for SyncHandle {
struct Actor {
store: Store,
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
action_rx: async_channel::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
}
Expand All @@ -619,10 +619,10 @@ impl Actor {
}
continue;
}
action = self.action_rx.recv_async() => {
action = self.action_rx.recv() => {
match action {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
Err(async_channel::RecvError) => {
debug!("action channel disconnected");
break None;
}
Expand Down Expand Up @@ -979,17 +979,14 @@ impl OpenReplicas {
}

async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
channel: async_channel::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
Err(err) => channel
.send_async(Err(err))
.await
.map_err(send_reply_error)?,
Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
channel.send_async(item).await.map_err(send_reply_error)?;
channel.send(item).await.map_err(send_reply_error)?;
}
}
}
Expand Down Expand Up @@ -1032,10 +1029,10 @@ mod tests {
let id = namespace.id();
sync.import_namespace(namespace.into()).await?;
sync.open(id, Default::default()).await?;
let (tx, rx) = flume::bounded(10);
let (tx, rx) = async_channel::bounded(10);
sync.subscribe(id, tx).await?;
sync.close(id).await?;
assert!(rx.recv_async().await.is_err());
assert!(rx.recv().await.is_err());
Ok(())
}
}
5 changes: 2 additions & 3 deletions iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,9 @@ impl Engine {

// Subscribe to insert events from the replica.
let a = {
let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP);
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
this.sync.subscribe(namespace, s).await?;
r.into_stream()
.map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
};

// Subscribe to events from the [`live::Actor`].
Expand Down
8 changes: 4 additions & 4 deletions iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
gossip: Gossip,
bao_store: B,
downloader: Downloader,
replica_events_tx: flume::Sender<crate::Event>,
replica_events_rx: flume::Receiver<crate::Event>,
replica_events_tx: async_channel::Sender<crate::Event>,
replica_events_rx: async_channel::Receiver<crate::Event>,

/// Send messages to self.
/// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
Expand Down Expand Up @@ -192,7 +192,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
sync_actor_tx: mpsc::Sender<ToLiveActor>,
gossip_actor_tx: mpsc::Sender<ToGossipActor>,
) -> Self {
let (replica_events_tx, replica_events_rx) = flume::bounded(1024);
let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
Self {
inbox,
sync,
Expand Down Expand Up @@ -262,7 +262,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
}
}
event = self.replica_events_rx.recv_async() => {
event = self.replica_events_rx.recv() => {
trace!(?i, "tick: replica_event");
inc!(Metrics, doc_live_tick_replica_event);
let event = event.context("replica_events closed")?;
Expand Down
46 changes: 34 additions & 12 deletions iroh-docs/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,31 @@ pub struct SyncOutcome {
pub num_sent: usize,
}

fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
assert!(std::mem::size_of::<async_channel::Sender<T>>() == std::mem::size_of::<usize>());
fn get_arc_reference<T>(x: &async_channel::Sender<T>) -> &Arc<()> {
unsafe {
// Transmute the reference to MyNewType to a reference to Arc<()>
std::mem::transmute::<_, &Arc<()>>(x)
}
}
let a = get_arc_reference(a);
let b = get_arc_reference(b);
Arc::ptr_eq(a, b)
}

#[derive(Debug, Default)]
struct Subscribers(Vec<flume::Sender<Event>>);
struct Subscribers(Vec<async_channel::Sender<Event>>);
impl Subscribers {
pub fn subscribe(&mut self, sender: flume::Sender<Event>) {
pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
self.0.push(sender)
}
pub fn unsubscribe(&mut self, sender: &flume::Sender<Event>) {
self.0.retain(|s| !s.same_channel(sender));
pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
self.0.retain(|s| !same_channel(s, &sender));
}
pub fn send(&mut self, event: Event) {
self.0.retain(|sender| sender.send(event.clone()).is_ok())
self.0
.retain(|sender| sender.send_blocking(event.clone()).is_ok())
}
pub fn len(&self) -> usize {
self.0.len()
Expand Down Expand Up @@ -263,10 +277,10 @@ impl ReplicaInfo {

/// Subscribe to insert events.
///
/// When subscribing to a replica, you must ensure that the corresponding [`flume::Receiver`] is
/// When subscribing to a replica, you must ensure that the corresponding [`async_channel::Receiver`] is
/// received from in a loop. If not receiving, local and remote inserts will hang waiting for
/// the receiver to be received from.
pub fn subscribe(&mut self, sender: flume::Sender<Event>) {
pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
self.subscribers.subscribe(sender)
}

Expand All @@ -275,7 +289,7 @@ impl ReplicaInfo {
/// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
/// multiple replicas, you can use this method to explicitly unsubscribe the sender from
/// this replica without having to drop the receiver.
pub fn unsubscribe(&mut self, sender: &flume::Sender<Event>) {
pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
self.subscribers.unsubscribe(sender)
}

Expand Down Expand Up @@ -2156,6 +2170,14 @@ mod tests {
Ok(())
}

fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
let mut res = vec![];
while let Ok(ev) = events.try_recv() {
res.push(ev);
}
res
}

/// This tests that no events are emitted for entries received during sync which are obsolete
/// (too old) by the time they are actually inserted in the store.
#[test]
Expand All @@ -2173,8 +2195,8 @@ mod tests {
let mut replica1 = store1.new_replica(namespace.clone())?;
let mut replica2 = store2.new_replica(namespace.clone())?;

let (events1_sender, events1) = flume::bounded(32);
let (events2_sender, events2) = flume::bounded(32);
let (events1_sender, events1) = async_channel::bounded(32);
let (events2_sender, events2) = async_channel::bounded(32);

replica1.info.subscribe(events1_sender);
replica2.info.subscribe(events2_sender);
Expand All @@ -2198,8 +2220,8 @@ mod tests {
.sync_process_message(from1, peer1, &mut state2)
.unwrap();
assert!(from2.is_none());
let events1 = events1.drain().collect::<Vec<_>>();
let events2 = events2.drain().collect::<Vec<_>>();
let events1 = drain(events1);
let events2 = drain(events2);
assert_eq!(events1.len(), 1);
assert_eq!(events2.len(), 1);
assert!(matches!(events1[0], Event::LocalInsert { .. }));
Expand Down
Loading

0 comments on commit 115d425

Please sign in to comment.