Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: properly update branch version #61

Merged
merged 2 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/buffer/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) struct BufferControllerInner {
pub(crate) ops_in: mpsc::UnboundedSender<TextChange>,
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<Option<BufferUpdate>>)>,
pub(crate) delta_request: mpsc::Sender<oneshot::Sender<Option<BufferUpdate>>>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
pub(crate) ack_tx: mpsc::UnboundedSender<LocalVersion>,
}
Expand Down Expand Up @@ -93,7 +93,7 @@ impl AsyncReceiver<BufferUpdate> for BufferController {
}

let (tx, rx) = oneshot::channel();
self.0.delta_request.send((last_update, tx)).await?;
self.0.delta_request.send(tx).await?;
Ok(rx.await?)
}

Expand Down
42 changes: 20 additions & 22 deletions src/buffer/worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use diamond_types::list::encoding::EncodeOptions;
use diamond_types::list::{Branch, OpLog};
use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch};
Expand All @@ -25,7 +26,7 @@ struct BufferWorker {
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<Option<BufferUpdate>>)>,
delta_req: mpsc::Receiver<oneshot::Sender<Option<BufferUpdate>>>,
controller: std::sync::Weak<BufferControllerInner>,
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
oplog: OpLog,
Expand Down Expand Up @@ -82,7 +83,7 @@ impl BufferController {
content_checkout: req_rx,
delta_req: recv_rx,
callback: cb_rx,
oplog: OpLog::new(),
oplog,
branch: Branch::new(),
timer: Timer::new(10), // TODO configurable!
};
Expand All @@ -107,20 +108,22 @@ impl BufferController {
tokio::select! {
biased;

// received a new poller, add it to collection
res = worker.poller.recv() => match res {
None => break tracing::error!("poller channel closed"),
Some(tx) => worker.pollers.push(tx),
},

// received new change ack, merge editor branch up to that version
res = worker.ack_rx.recv() => match res {
None => break tracing::error!("ack channel closed"),
Some(v) => {
worker.branch.merge(&worker.oplog, &v)
worker.branch.merge(&worker.oplog, &v);
worker.local_version.send(worker.branch.local_version())
.unwrap_or_warn("could not ack local version");
},
},

// received a new poller, add it to collection
res = worker.poller.recv() => match res {
None => break tracing::error!("poller channel closed"),
Some(tx) => worker.pollers.push(tx),
},

// received a text change from editor
res = worker.ops_in.recv() => match res {
None => break tracing::debug!("stopping: editor closed channel"),
Expand All @@ -137,16 +140,19 @@ impl BufferController {
// controller is ready to apply change and recv(), calculate it and send it back
res = worker.delta_req.recv() => match res {
None => break tracing::error!("no more active controllers: can't send changes"),
Some((last_ver, tx)) => worker.handle_delta_request(last_ver, tx).await,
Some(tx) => worker.handle_delta_request(tx).await,
},

// received a request for full CRDT content
res = worker.content_checkout.recv() => match res {
None => break tracing::error!("no more active controllers: can't update content"),
Some(tx) => {
worker.branch.merge(&worker.oplog, worker.oplog.local_version_ref());
worker.local_version.send(worker.branch.local_version())
.unwrap_or_warn("could not checkout local version");
let content = worker.branch.content().to_string();
tx.send(content).unwrap_or_warn("checkout request dropped");
tx.send(content)
.unwrap_or_warn("checkout request dropped");
},
}
}
Expand Down Expand Up @@ -184,7 +190,7 @@ impl BufferWorker {

if change.is_delete() || change.is_insert() {
tx.send(Operation {
data: self.oplog.encode_from(Default::default(), &last_ver),
data: self.oplog.encode_from(EncodeOptions::default(), &last_ver),
})
.await
.unwrap_or_warn("failed to send change!");
Expand Down Expand Up @@ -221,11 +227,8 @@ impl BufferWorker {
}
}

async fn handle_delta_request(
&mut self,
last_ver: LocalVersion,
tx: oneshot::Sender<Option<BufferUpdate>>,
) {
async fn handle_delta_request(&mut self, tx: oneshot::Sender<Option<BufferUpdate>>) {
let last_ver = self.branch.local_version();
if let Some((lv, Some(dtop))) = self
.oplog
.iter_xf_operations_from(&last_ver, self.oplog.local_version_ref())
Expand All @@ -235,8 +238,6 @@ impl BufferWorker {
// this step_ver will be the version after we apply the operation
// we give it to the controller so that he knows where it's at.
let step_ver = self.oplog.version_union(&[lv.end - 1], &last_ver);
self.branch.merge(&self.oplog, &step_ver);
let new_local_v = self.branch.local_version();

let hash = if self.timer.step() {
Some(crate::ext::hash(self.branch.content().to_string()))
Expand Down Expand Up @@ -282,9 +283,6 @@ impl BufferWorker {
},
},
};
self.local_version
.send(new_local_v)
.unwrap_or_warn("could not update local version");
tx.send(Some(tc))
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
Expand Down