Skip to content

Commit

Permalink
Implement progressive call results
Browse files Browse the repository at this point in the history
  • Loading branch information
muzzammilshahid committed Sep 5, 2024
1 parent 9db87f9 commit 765719e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
32 changes: 28 additions & 4 deletions lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Session {
}

final Map<int, Completer<Result>> _callRequests = {};
final Map<int, Function(Result result)> _progressHandlerByRequestID = {};
final Map<int, RegisterRequest> _registerRequests = {};
final Map<int, Result Function(Invocation)> _registrations = {};
final Map<int, UnregisterRequest> _unregisterRequests = {};
Expand All @@ -49,9 +50,18 @@ class Session {

void _processIncomingMessage(msg.Message message) {
if (message is msg.Result) {
var request = _callRequests.remove(message.requestID);
if (request != null) {
request.complete(Result(args: message.args, kwargs: message.kwargs, details: message.details));
var progress = message.details["progress"] ?? false;
if (progress) {
var progressHandler = _progressHandlerByRequestID[message.requestID];
if (progressHandler != null) {
progressHandler(Result(args: message.args, kwargs: message.kwargs));
}
} else {
var request = _callRequests.remove(message.requestID);
if (request != null) {
request.complete(Result(args: message.args, kwargs: message.kwargs, details: message.details));
}
_progressHandlerByRequestID.remove(message.requestID);
}
} else if (message is msg.Registered) {
var request = _registerRequests.remove(message.requestID);
Expand All @@ -62,9 +72,17 @@ class Session {
} else if (message is msg.Invocation) {
var endpoint = _registrations[message.registrationID];
if (endpoint != null) {
var invocation = Invocation(args: message.args, kwargs: message.kwargs, details: message.details);
if (message.details["receive_progress"] ?? false) {
invocation.sendProgress = (args, kwargs) {
var yield = msg.Yield(message.requestID, args: args, kwargs: kwargs, options: {"progress": true});
var data = _wampSession.sendMessage(yield);
_baseSession.send(data);
};
}
msg.Message msgToSend;
try {
var result = endpoint(Invocation(args: message.args, kwargs: message.kwargs, details: message.details));
var result = endpoint(invocation);
msgToSend = msg.Yield(message.requestID, args: result.args, kwargs: result.kwargs, options: result.details);
} on ApplicationError catch (e) {
msgToSend = msg.Error(message.messageType(), message.requestID, e.message, args: e.args, kwargs: e.kwargs);
Expand Down Expand Up @@ -163,9 +181,15 @@ class Session {
List<dynamic>? args,
Map<String, dynamic>? kwargs,
Map<String, dynamic>? options,
Function(Result result)? progressHandler,
}) {
var call = msg.Call(_nextID, procedure, args: args, kwargs: kwargs, options: options);

if (progressHandler != null) {
call.options["receive_progress"] = true;
_progressHandlerByRequestID[call.requestID] = progressHandler;
}

var completer = Completer<Result>();
_callRequests[call.requestID] = completer;

Expand Down
2 changes: 2 additions & 0 deletions lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class Invocation {
final List<dynamic> args;
final Map<String, dynamic> kwargs;
final Map<String, dynamic> details;

late Function(List<dynamic>? args, Map<String, dynamic>? kwargs) sendProgress;
}

class UnregisterRequest {
Expand Down
3 changes: 2 additions & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ dependencies:
args: ^2.5.0
wampproto:
# ignore: invalid_dependency
git: https://github.com/xconnio/wampproto-dart/
path: /home/muzzammil/scm/xconnio/wampproto-dart
# git: https://github.com/xconnio/wampproto-dart/
web_socket_channel: ^3.0.0
yaml: 3.1.2

Expand Down

0 comments on commit 765719e

Please sign in to comment.