Skip to content

Commit

Permalink
Merge pull request #61 from permaweb/feat/converge-http
Browse files Browse the repository at this point in the history
feat: Converge via HTTP interface
  • Loading branch information
samcamwilliams authored Jan 4, 2025
2 parents 89f24cb + f8484d3 commit e988a29
Show file tree
Hide file tree
Showing 10 changed files with 490 additions and 141 deletions.
10 changes: 5 additions & 5 deletions src/dev_dedup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ handle(keys, M1, _M2, _Opts) ->
handle(set, M1, M2, Opts) ->
dev_message:set(M1, M2, Opts);
handle(Key, M1, M2, Opts) ->
?event(debug, {dedup_handle, {key, Key}, {msg1, M1}, {msg2, M2}}),
?event({dedup_handle, {key, Key}, {msg1, M1}, {msg2, M2}}),
case hb_converge:get(<<"Pass">>, {as, dev_message, M1}, 1, Opts) of
1 ->
Msg2ID = hb_converge:get(<<"id">>, M2, Opts),
Dedup = hb_converge:get(<<"Dedup">>, {as, dev_message, M1}, [], Opts),
?event(debug, {dedup_checking, {existing, Dedup}}),
?event({dedup_checking, {existing, Dedup}}),
case lists:member(Msg2ID, Dedup) of
true ->
?event(debug, {already_seen, Msg2ID}),
?event({already_seen, Msg2ID}),
{skip, M1};
false ->
?event(debug, {not_seen, Msg2ID}),
?event({not_seen, Msg2ID}),
M3 = hb_converge:set(
M1,
#{ <<"Dedup">> => [Msg2ID|Dedup] }
Expand All @@ -40,7 +40,7 @@ handle(Key, M1, M2, Opts) ->
{ok, M3}
end;
Pass ->
?event(debug, {multipass_detected, skipping_dedup, {pass, Pass}}),
?event({multipass_detected, skipping_dedup, {pass, Pass}}),
{ok, M1}
end.

Expand Down
13 changes: 11 additions & 2 deletions src/dev_scheduler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
%%% Converge API functions:
-export([info/0]).
%%% Local scheduling functions:
-export([schedule/3]).
-export([schedule/3, append/3]).
%%% CU-flow functions:
-export([slot/3, status/3, next/3]).
-export([start/0, init/3, end_of_schedule/3, checkpoint/1]).
%%% Test helper exports:
-export([test_process/0]).
-include("include/hb.hrl").
-include_lib("eunit/include/eunit.hrl").

Expand All @@ -44,6 +46,7 @@ info() ->
status,
next,
schedule,
append,
slot,
init,
end_of_schedule,
Expand Down Expand Up @@ -150,6 +153,11 @@ schedule(Msg1, Msg2, Opts) ->
<<"GET">> -> get_schedule(Msg1, Msg2, Opts)
end.

%% @doc Alternate access path for scheduling a message, for situations in which
%% the user cannot modify the `Method`.
append(Msg1, Msg2, Opts) ->
post_schedule(Msg1, Msg2, Opts).

%% @doc Schedules a new message on the SU.
post_schedule(Msg1, Msg2, Opts) ->
?event(scheduling_message),
Expand All @@ -159,7 +167,8 @@ post_schedule(Msg1, Msg2, Opts) ->
?no_prod("Once we have GQL, get the scheduler location record. "
"For now, we'll just use the address of the wallet."),
SchedulerLocation =
hb_converge:get(<<"Process/Scheduler-Location">>, Msg1, Opts#{ hashpath => ignore }),
hb_converge:get(<<"Process/Scheduler-Location">>,
Msg1, Opts#{ hashpath => ignore }),
ProcID = hb_converge:get(id, Proc),
PID = dev_scheduler_registry:find(ProcID, true),
#{ wallet := Wallet } = dev_scheduler_server:info(PID),
Expand Down
39 changes: 27 additions & 12 deletions src/hb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
%%% 1. The `hb' and `hb_opts' modules manage the node's configuration,
%%% environment variables, and debugging tools.
%%%
%%% 2. The `hb_http' and `hb_http_router' modules manage all HTTP-related
%%% functionality. `hb_http_router' handles turning received HTTP requests
%%% 2. The `hb_http' and `hb_http_server' modules manage all HTTP-related
%%% functionality. `hb_http_server' handles turning received HTTP requests
%%% into messages and applying those messages with the appropriate devices.
%%% `hb_http' handles making requests and responding with messages. `cowboy'
%%% is used to implement the underlying HTTP server.
Expand Down Expand Up @@ -203,28 +203,43 @@ debug_wait(T, Mod, Func, Line) ->

%% @doc Run a function as many times as possible in a given amount of time.
benchmark(Fun, TLen) ->
benchmark(Fun, TLen, 1).
benchmark(Fun, TLen, ReportEvery) ->
T0 = erlang:system_time(millisecond),
until(
fun() -> erlang:system_time(millisecond) - T0 > (TLen * 1000) end,
Fun,
ReportEvery,
0
).

until(Condition, Fun, ReportEvery, Count) ->
case Count rem ReportEvery of
0 -> ?event(benchmark, {iteration, Count});
_ -> ok
end,
%% @doc Run multiple instances of a function in parallel for a given amount of time.
benchmark(Fun, TLen, Procs) ->
Parent = self(),
StartWorker =
fun(_) ->
Ref = make_ref(),
link(spawn(fun() ->
Count = benchmark(Fun, TLen),
Parent ! {work_complete, Ref, Count}
end)),
Ref
end,
CollectRes =
fun(R) ->
receive
{work_complete, R, Count} ->
Count
end
end,
Refs = lists:map(StartWorker, lists:seq(1, Procs)),
lists:sum(lists:map(CollectRes, Refs)).

until(Condition, Fun, Count) ->
case Condition() of
false ->
case apply(Fun, hb_converge:truncate_args(Fun, [Count])) of
{count, AddToCount} ->
until(Condition, Fun, ReportEvery, Count + AddToCount);
until(Condition, Fun, Count + AddToCount);
_ ->
until(Condition, Fun, ReportEvery, Count + 1)
until(Condition, Fun, Count + 1)
end;
true -> Count
end.
2 changes: 1 addition & 1 deletion src/hb_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ start(_StartType, _StartArgs) ->
hb_sup:start_link(),
ok = dev_scheduler_registry:start(),
_TimestampServer = ar_timestamp:start(),
{ok, _} = hb_http_router:start().
{ok, _} = hb_http_server:start().

stop(_State) ->
ok.
2 changes: 1 addition & 1 deletion src/hb_converge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ resolve(Msg, Opts) ->
case Path of
[ Msg1ID | _Rest ] when ?IS_ID(Msg1ID) ->
?event({normalizing_single_message_message_path, Msg}),
{ok, Msg1} = hb_cache:read(Msg1ID, Opts),
{ok, Msg1} = hb_cache:read(<<"Messages/", Msg1ID/binary>>, Opts),
resolve(
Msg1,
hb_path:tl(Msg, Opts),
Expand Down
10 changes: 9 additions & 1 deletion src/hb_converge_test_vectors.erl
Original file line number Diff line number Diff line change
Expand Up @@ -577,4 +577,12 @@ list_transform_test(Opts) ->
?assertEqual(<<"B">>, hb_converge:get(2, Msg, Opts)),
?assertEqual(<<"C">>, hb_converge:get(3, Msg, Opts)),
?assertEqual(<<"D">>, hb_converge:get(4, Msg, Opts)),
?assertEqual(<<"E">>, hb_converge:get(5, Msg, Opts)).
?assertEqual(<<"E">>, hb_converge:get(5, Msg, Opts)).

singleton_resolve_test() ->
Msg1 = #{
% Should be parsed out and used alone as Msg2:
path => <<"Key1">>,
<<"Key1">> => <<"Value1">>
},
?assertEqual({ok, <<"Value1">>}, hb_converge:resolve(Msg1, #{})).
147 changes: 110 additions & 37 deletions src/hb_http.erl
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
%%% Hyperbeam's core HTTP request/reply functionality. The functions in this
%%% module generally take a message request from their caller and return a
%%% response in message form, as granted by the peer. This module is mostly
%%% used by hb_client, but can also be used by other modules that need to make
%%% HTTP requests.
-module(hb_http).
-export([start/0]).
-export([get/1, get/2, get_binary/1]).
-export([post/2, post/3, post_binary/2]).
-export([reply/2, reply/3]).
-export([message_to_status/1, req_to_message/1]).
-export([message_to_status/1, req_to_message/2]).
-include("include/hb.hrl").
-hb_debug(print).

%%% Hyperbeam's core HTTP request/reply functionality. The functions in this
%%% module generally take a message in request and return a response in message
%%% form. This module is mostly used by hb_client, but can also be used by other
%%% modules that need to make HTTP requests.
-include_lib("eunit/include/eunit.hrl").

start() ->
httpc:set_options([{max_keep_alive_length, 0}]).
httpc:set_options([{max_keep_alive_length, 0}]),
ok.

%% @doc Gets a URL via HTTP and returns the resulting message in deserialized
%% form.
get(Host, Path) -> ?MODULE:get(Host ++ Path).
get(URL) ->
case get_binary(URL) of
{ok, Res} -> {ok, ar_bundles:deserialize(Res)};
{ok, Res} ->
{ok, hb_message:convert(ar_bundles:deserialize(Res), converge, tx, #{})};
Error -> Error
end.

Expand Down Expand Up @@ -50,10 +52,11 @@ post(URL, Message) when not is_binary(Message) ->
URL
}
),
post(URL, ar_bundles:serialize(ar_bundles:normalize(Message)));
post(URL, ar_bundles:serialize(hb_message:convert(Message, tx, #{})));
post(URL, Message) ->
case post_binary(URL, Message) of
{ok, Res} -> {ok, ar_bundles:deserialize(Res)};
{ok, Res} ->
{ok, hb_message:convert(ar_bundles:deserialize(Res), converge, tx, #{})};
Error -> Error
end.

Expand All @@ -63,7 +66,7 @@ post_binary(URL, Message) ->
case httpc:request(
post,
{iolist_to_binary(URL), [], "application/octet-stream", Message},
[],
[{timeout, 100}, {connect_timeout, 100}],
[{body_format, binary}]
) of
{ok, {{_, Status, _}, _, Body}} when Status == 200; Status == 201 ->
Expand All @@ -83,55 +86,125 @@ post_binary(URL, Message) ->
reply(Req, Message) ->
reply(Req, message_to_status(Message), Message).
reply(Req, Status, Message) ->
TX = hb_message:convert(Message, tx, converge, #{}),
?event(
{
replying,
Status,
maps:get(path, Req, undefined_path),
case is_record(Message, tx) of
true -> hb_util:id(Message);
false -> data_body
end
{replying,
{status, Status},
{path, maps:get(path, Req, undefined_path)},
{tx, TX}
}
),
Req2 = cowboy_req:reply(
Status,
#{<<"Content-Type">> => <<"application/octet-stream">>},
hb_message:serialize(Message),
ar_bundles:serialize(TX),
Req
),
{ok, Req2, no_state}.

%% @doc Get the HTTP status code from a transaction (if it exists).
message_to_status(Item) ->
case lists:keyfind(<<"Status">>, 1, Item#tx.tags) of
{_, RawStatus} ->
case dev_message:get(<<"Status">>, Item) of
{ok, RawStatus} ->
case is_integer(RawStatus) of
true -> RawStatus;
false -> binary_to_integer(RawStatus)
end;
false -> 200
_ -> 200
end.

%% @doc Convert a cowboy request to a normalized message.
req_to_message(Req) ->
Method = cowboy_req:method(Req),
Path = cowboy_req:path(Req),
req_to_message(Req, Opts) ->
{ok, Body} = read_body(Req),
QueryTags = cowboy_req:parse_qs(Req),
hb_converge:set(
#{
<<"Method">> => Method,
<<"Path">> => Path,
<<"Body">> => Body
},
maps:from_list(QueryTags)
).
hb_message:convert(ar_bundles:deserialize(Body), converge, tx, Opts).

%% @doc Helper to grab the full body of a HTTP request, even if it's chunked.
read_body(Req) -> read_body(Req, <<>>).
read_body(Req0, Acc) ->
case cowboy_req:read_body(Req0) of
{ok, Data, _Req} -> {ok, << Acc/binary, Data/binary >>};
{more, Data, Req} -> read_body(Req, << Acc/binary, Data/binary >>)
end.
end.

%%% Tests

simple_converge_resolve_test() ->
URL = hb_http_server:start_test_node(),
{ok, Res} =
post(
URL,
#{
path => <<"Key1">>,
<<"Key1">> =>
#{<<"Key2">> =>
#{
<<"Key3">> => <<"Value2">>
}
}
}
),
?assertEqual(<<"Value2">>, hb_converge:get(<<"Key2/Key3">>, Res, #{})).

wasm_compute_request(ImageFile, Func, Params) ->
{ok, Bin} = file:read_file(ImageFile),
#{
path => <<"Init/Compute/Results">>,
device => <<"WASM-64/1.0">>,
<<"WASM-Function">> => Func,
<<"WASM-Params">> => Params,
<<"Image">> => Bin
}.

run_wasm_unsigned_test() ->
URL = hb_http_server:start_test_node(#{force_signed => false}),
Msg = wasm_compute_request(<<"test/test-64.wasm">>, <<"fac">>, [10]),
{ok, Res} = post(URL, Msg),
?assertEqual(ok, hb_converge:get(<<"Type">>, Res, #{})).

run_wasm_signed_test() ->
URL = hb_http_server:start_test_node(#{force_signed => true}),
Msg = wasm_compute_request(<<"test/test-64.wasm">>, <<"fac">>, [10]),
{ok, Res} = post(URL, Msg),
?assertEqual(ok, hb_converge:get(<<"Type">>, Res, #{})).

% http_scheduling_test() ->
% % We need the rocksdb backend to run for hb_cache module to work
% application:ensure_all_started(hb),
% pg:start(pg),
% <<I1:32/unsigned-integer, I2:32/unsigned-integer, I3:32/unsigned-integer>>
% = crypto:strong_rand_bytes(12),
% rand:seed(exsplus, {I1, I2, I3}),
% URL = hb_http_server:start_test_node(#{force_signed => true}),
% Msg1 = dev_scheduler:test_process(),
% Proc = hb_converge:get(process, Msg1, #{ hashpath => ignore }),
% ProcID = hb_util:id(Proc),
% {ok, Res} =
% hb_converge:resolve(
% Msg1,
% #{
% path => <<"Append">>,
% <<"Method">> => <<"POST">>,
% <<"Message">> => Proc
% },
% #{}
% ),
% MsgX = #{
% device => <<"Scheduler/1.0">>,
% path => <<"Append">>,
% <<"Process">> => Proc,
% <<"Message">> =>
% #{
% <<"Target">> => ProcID,
% <<"Type">> => <<"Message">>,
% <<"Test-Val">> => 1
% }
% },
% Res = post(URL, MsgX),
% ?event(debug, {post_result, Res}),
% Msg3 = #{
% path => <<"Slot">>,
% <<"Method">> => <<"GET">>,
% <<"Process">> => ProcID
% },
% SlotRes = post(URL, Msg3),
% ?event(debug, {slot_result, SlotRes}).
Loading

0 comments on commit e988a29

Please sign in to comment.