Skip to content

Commit

Permalink
APPLIES TO MASTER
Browse files Browse the repository at this point in the history
- docs(MAINTAIN.md): mention libmpack-lua
- tests: get stdout from clear() XXX
- client:
  - rename over-verbose things
  • Loading branch information
justinmk committed Dec 20, 2024
1 parent b97ffe6 commit 0b94290
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 57 deletions.
2 changes: 2 additions & 0 deletions MAINTAIN.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ These dependencies are "vendored" (inlined), we must update the sources manually

* `src/mpack/`: [libmpack](https://github.com/libmpack/libmpack)
* send improvements upstream!
* `src/mpack/lmpack.c`: [libmpack-lua](https://github.com/libmpack/libmpack-lua)
* send improvements upstream!
* `src/xdiff/`: [xdiff](https://github.com/git/git/tree/master/xdiff)
* `src/cjson/`: [lua-cjson](https://github.com/openresty/lua-cjson)
* `src/klib/`: [Klib](https://github.com/attractivechaos/klib)
Expand Down
4 changes: 4 additions & 0 deletions runtime/lua/coxpcall.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
-------------------------------------------------------------------------------
-- (Not needed for LuaJIT or Lua 5.2+)
--
-- Coroutine safe xpcall and pcall versions
--
-- https://keplerproject.github.io/coxpcall/
--
-- Encapsulates the protected calls with a coroutine based loop, so errors can
-- be dealed without the usual Lua 5.x pcall/xpcall issues with coroutines
-- yielding inside the call to pcall or xpcall.
Expand Down
49 changes: 28 additions & 21 deletions test/client/msgpack_rpc_stream.lua → test/client/rpc_stream.lua
Original file line number Diff line number Diff line change
@@ -1,34 +1,41 @@
---
--- Reading/writing of msgpack over any of the stream types from `uv_stream.lua`.
--- Does not implement the RPC protocol, see `session.lua` for that.
---

local mpack = vim.mpack

local Response = {}
Response.__index = Response

function Response.new(msgpack_rpc_stream, request_id)
function Response.new(rpc_stream, request_id)
return setmetatable({
_msgpack_rpc_stream = msgpack_rpc_stream,
_rpc_stream = rpc_stream,
_request_id = request_id,
}, Response)
end

function Response:send(value, is_error)
local data = self._msgpack_rpc_stream._session:reply(self._request_id)
local data = self._rpc_stream._session:reply(self._request_id)
if is_error then
data = data .. self._msgpack_rpc_stream._pack(value)
data = data .. self._msgpack_rpc_stream._pack(mpack.NIL)
data = data .. self._rpc_stream._pack(value)
data = data .. self._rpc_stream._pack(mpack.NIL)
else
data = data .. self._msgpack_rpc_stream._pack(mpack.NIL)
data = data .. self._msgpack_rpc_stream._pack(value)
data = data .. self._rpc_stream._pack(mpack.NIL)
data = data .. self._rpc_stream._pack(value)
end
self._msgpack_rpc_stream._stream:write(data)
self._rpc_stream._stream:write(data)
end

--- @class test.MsgpackRpcStream
--- Nvim msgpack RPC stream.
---
--- @class test.RpcStream
--- @field private _stream test.Stream
--- @field private __pack table
local MsgpackRpcStream = {}
MsgpackRpcStream.__index = MsgpackRpcStream
local RpcStream = {}
RpcStream.__index = RpcStream

function MsgpackRpcStream.new(stream)
function RpcStream.new(stream)
return setmetatable({
_stream = stream,
_pack = mpack.Packer(),
Expand All @@ -50,10 +57,10 @@ function MsgpackRpcStream.new(stream)
},
}),
}),
}, MsgpackRpcStream)
}, RpcStream)
end

function MsgpackRpcStream:write(method, args, response_cb)
function RpcStream:write(method, args, response_cb)
local data
if response_cb then
assert(type(response_cb) == 'function')
Expand All @@ -66,10 +73,10 @@ function MsgpackRpcStream:write(method, args, response_cb)
self._stream:write(data)
end

function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
function RpcStream:read_start(on_request, on_notification, on_eof)
self._stream:read_start(function(data)
if not data then
return eof_cb()
return on_eof()
end
local type, id_or_cb, method_or_error, args_or_result
local pos = 1
Expand All @@ -78,9 +85,9 @@ function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
type, id_or_cb, method_or_error, args_or_result, pos = self._session:receive(data, pos)
if type == 'request' or type == 'notification' then
if type == 'request' then
request_cb(method_or_error, args_or_result, Response.new(self, id_or_cb))
on_request(method_or_error, args_or_result, Response.new(self, id_or_cb))
else
notification_cb(method_or_error, args_or_result)
on_notification(method_or_error, args_or_result)
end
elseif type == 'response' then
if method_or_error == mpack.NIL then
Expand All @@ -94,12 +101,12 @@ function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
end)
end

function MsgpackRpcStream:read_stop()
function RpcStream:read_stop()
self._stream:read_stop()
end

function MsgpackRpcStream:close(signal)
function RpcStream:close(signal)
self._stream:close(signal)
end

return MsgpackRpcStream
return RpcStream
52 changes: 40 additions & 12 deletions test/client/session.lua
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
---
--- Nvim msgpack RPC protocol session. Manages requests/notifications/responses.
---

local uv = vim.uv
local MsgpackRpcStream = require('test.client.msgpack_rpc_stream')
local RpcStream = require('test.client.rpc_stream')

--- Nvim msgpack RPC protocol session. Manages requests/notifications/responses.
---
--- @class test.Session
--- @field private _pending_messages string[]
--- @field private _msgpack_rpc_stream test.MsgpackRpcStream
--- @field private _pending_messages string[] Requests/notifications received from the remote end.
--- @field private _rpc_stream test.RpcStream
--- @field private _prepare uv.uv_prepare_t
--- @field private _timer uv.uv_timer_t
--- @field private _is_running boolean
--- @field exec_lua_setup boolean
--- @field private _is_running boolean true during `Session:run()` scope.
--- @field private _stdout_buffer string[] Stores stdout chunks
--- @field public stdout string Full stdout after the process exits
local Session = {}
Session.__index = Session
if package.loaded['jit'] then
Expand Down Expand Up @@ -51,9 +59,10 @@ local function coroutine_exec(func, ...)
end))
end

--- Creates a new msgpack RPC session.
function Session.new(stream)
return setmetatable({
_msgpack_rpc_stream = MsgpackRpcStream.new(stream),
_rpc_stream = RpcStream.new(stream),
_pending_messages = {},
_prepare = uv.new_prepare(),
_timer = uv.new_timer(),
Expand Down Expand Up @@ -91,10 +100,13 @@ function Session:next_message(timeout)
return table.remove(self._pending_messages, 1)
end

--- Sends a notification to the RPC endpoint.
function Session:notify(method, ...)
self._msgpack_rpc_stream:write(method, { ... })
self._rpc_stream:write(method, { ... })
end

--- Sends a request to the RPC endpoint.
---
--- @param method string
--- @param ... any
--- @return boolean, table
Expand All @@ -114,8 +126,16 @@ function Session:request(method, ...)
return true, result
end

--- Runs the event loop.
--- Processes incoming RPC requests/notifications until exhausted.
---
--- TODO(justinmk): luaclient2 avoids this via uvutil.cb_wait() + uvutil.add_idle_call()?
---
--- @param request_cb function Handles requests from the sever to the local end.
--- @param notification_cb function Handles notifications from the sever to the local end.
--- @param setup_cb function
--- @param timeout number
function Session:run(request_cb, notification_cb, setup_cb, timeout)
--- Handles an incoming request.
local function on_request(method, args, response)
coroutine_exec(request_cb, method, args, function(status, result, flag)
if status then
Expand All @@ -126,6 +146,7 @@ function Session:run(request_cb, notification_cb, setup_cb, timeout)
end)
end

--- Handles an incoming notification.
local function on_notification(method, args)
coroutine_exec(notification_cb, method, args)
end
Expand Down Expand Up @@ -160,39 +181,45 @@ function Session:close(signal)
if not self._prepare:is_closing() then
self._prepare:close()
end
self._msgpack_rpc_stream:close(signal)
self._rpc_stream:close(signal)
self.closed = true
end

--- Sends a request to the RPC endpoint, without blocking (schedules a coroutine).
function Session:_yielding_request(method, args)
return coroutine.yield(function(co)
self._msgpack_rpc_stream:write(method, args, function(err, result)
self._rpc_stream:write(method, args, function(err, result)
resume(co, err, result)
end)
end)
end

--- Sends a request to the RPC endpoint, and blocks (polls event loop) until a response is received.
function Session:_blocking_request(method, args)
local err, result

-- Invoked when a request is received from the remote end.
local function on_request(method_, args_, response)
table.insert(self._pending_messages, { 'request', method_, args_, response })
end

-- Invoked when a notification is received from the remote end.
local function on_notification(method_, args_)
table.insert(self._pending_messages, { 'notification', method_, args_ })
end

self._msgpack_rpc_stream:write(method, args, function(e, r)
self._rpc_stream:write(method, args, function(e, r)
err = e
result = r
uv.stop()
end)

-- Poll for incoming requests/notifications received from the remote end.
self:_run(on_request, on_notification)
return (err or self.eof_err), result
end

--- Polls for incoming requests/notifications received from the remote end.
function Session:_run(request_cb, notification_cb, timeout)
if type(timeout) == 'number' then
self._prepare:start(function()
Expand All @@ -202,14 +229,15 @@ function Session:_run(request_cb, notification_cb, timeout)
self._prepare:stop()
end)
end
self._msgpack_rpc_stream:read_start(request_cb, notification_cb, function()
self._rpc_stream:read_start(request_cb, notification_cb, function()
uv.stop()
self.eof_err = { 1, 'EOF was received from Nvim. Likely the Nvim process crashed.' }
end)
uv.run()
self._prepare:stop()
self._timer:stop()
self._msgpack_rpc_stream:read_stop()
self._rpc_stream:read_stop()
end

--- Nvim msgpack RPC session.
return Session
Loading

0 comments on commit 0b94290

Please sign in to comment.