Skip to content

Commit

Permalink
Merge pull request #78 from permaweb/fix/key-normalization
Browse files Browse the repository at this point in the history
feat: key normalization and hsig API
  • Loading branch information
samcamwilliams authored Jan 18, 2025
2 parents 58e65a6 + e1b93df commit d7d4a77
Show file tree
Hide file tree
Showing 48 changed files with 1,955 additions and 1,916 deletions.
21 changes: 1 addition & 20 deletions c_src/hb_beamr.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ void send_error(Proc* proc, const char* message_fmt, ...) {

int msg_res = erl_drv_output_term(proc->port_term, msg, msg_index);
DRV_DEBUG("Sent error message. Res: %d", msg_res);
driver_free(message);
driver_free(msg);
va_end(args);
}

Expand Down Expand Up @@ -367,7 +365,6 @@ wasm_trap_t* generic_import_handler(void* env, const wasm_val_vec_t* args, wasm_
DRV_DEBUG("Sending %d terms...", msg_index);
// Send the message to the caller process
int msg_res = erl_drv_output_term(proc->port_term, msg, msg_index);
driver_free(msg);
// Wait for the response (we set this directly after the message was sent
// so we have the lock, before Erlang sends us data back)
drv_wait(proc->current_import->response_ready, proc->current_import->cond, &proc->current_import->ready);
Expand All @@ -380,8 +377,6 @@ wasm_trap_t* generic_import_handler(void* env, const wasm_val_vec_t* args, wasm_
wasm_name_t message;
wasm_name_new_from_string_nt(&message, proc->current_import->error_message);
wasm_trap_t* trap = wasm_trap_new(proc->store, &message);
// TODO: check where the error_message is allocated
driver_free(proc->current_import->error_message);
driver_free(proc->current_import);
proc->current_import = NULL;
return trap;
Expand All @@ -404,9 +399,6 @@ wasm_trap_t* generic_import_handler(void* env, const wasm_val_vec_t* args, wasm_
DRV_DEBUG("Cleaning up import response");
erl_drv_cond_destroy(proc->current_import->cond);
erl_drv_mutex_destroy(proc->current_import->response_ready);
if (proc->current_import->result_terms) {
driver_free(proc->current_import->result_terms);
}
driver_free(proc->current_import);

proc->current_import = NULL;
Expand Down Expand Up @@ -570,8 +562,6 @@ static void async_init(void* raw) {

int send_res = erl_drv_output_term(proc->port_term, init_msg, msg_i);
DRV_DEBUG("Send result: %d", send_res);
// TODO: init_msg is not freed up, but probably should live as long as the driver
// What happens during stop?

proc->current_import = NULL;
proc->is_initialized = 1;
Expand Down Expand Up @@ -680,12 +670,8 @@ static void async_call(void* raw) {
DRV_DEBUG("Sending %d terms", msg_index);
int response_msg_res = erl_drv_output_term(proc->port_term, msg, msg_index);
driver_free(msg);

DRV_DEBUG("Msg: %d", response_msg_res);

driver_free(proc->current_args);
driver_free(proc->current_function);

wasm_val_vec_delete(&results);
proc->current_import = NULL;

Expand Down Expand Up @@ -728,7 +714,6 @@ static void wasm_driver_stop(ErlDrvData raw) {
Proc* proc = (Proc*)raw;
DRV_DEBUG("Stopping WASM driver");

// TODO: We should probably lock a mutex here and in the import_response function.
if(proc->current_import) {
DRV_DEBUG("Shutting down during import response...");
proc->current_import->error_message = "WASM driver unloaded during import response";
Expand Down Expand Up @@ -861,7 +846,6 @@ static void wasm_driver_output(ErlDrvData raw, char *buff, ErlDrvSizeT bufflen)
msg[0] = ERL_DRV_ATOM;
msg[1] = atom_ok;
erl_drv_output_term(proc->port_term, msg, 2);
driver_free(msg);
}
else if (strcmp(command, "read") == 0) {
DRV_DEBUG("Read received");
Expand All @@ -877,7 +861,6 @@ static void wasm_driver_output(ErlDrvData raw, char *buff, ErlDrvSizeT bufflen)
send_error(proc, "Read request out of bounds");
return;
}
DRV_DEBUG("Read received. Ptr: %ld. Size: %ld", ptr, size_l);
byte_t* memory_data = wasm_memory_data(get_memory(proc));
DRV_DEBUG("Memory location to read from: %p", memory_data + ptr);

Expand All @@ -898,8 +881,6 @@ static void wasm_driver_output(ErlDrvData raw, char *buff, ErlDrvSizeT bufflen)

int msg_res = erl_drv_output_term(proc->port_term, msg, msg_index);
DRV_DEBUG("Read response sent: %d", msg_res);
driver_free(out_binary);
driver_free(msg);
}
else if (strcmp(command, "size") == 0) {
DRV_DEBUG("Size received");
Expand Down Expand Up @@ -954,4 +935,4 @@ DRIVER_INIT(wasm_driver) {
atom_import = driver_mk_atom("import");
atom_execution_result = driver_mk_atom("execution_result");
return &wasm_driver_entry;
}
}
55 changes: 28 additions & 27 deletions src/ar_bundles.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
%%% @doc Module for creating, signing, and verifying Arweave data items and bundles.

-define(BUNDLE_TAGS, [
{<<"Bundle-Format">>, <<"Binary">>},
{<<"Bundle-Version">>, <<"2.0.0">>}
{<<"bundle-format">>, <<"binary">>},
{<<"bundle-version">>, <<"2.0.0">>}
]).

-define(LIST_TAGS, [
{<<"Map-Format">>, <<"List">>}
{<<"map-format">>, <<"list">>}
]).

% How many bytes of a binary to print with `print/1'.
Expand Down Expand Up @@ -87,7 +87,7 @@ format(Item, Indent) ->
format_line("INCORRECT ITEM: ~p", [Item], Indent).

format_data(Item, Indent) when is_binary(Item#tx.data) ->
case lists:keyfind(<<"Bundle-Format">>, 1, Item#tx.tags) of
case lists:keyfind(<<"bundle-format">>, 1, Item#tx.tags) of
{_, _} ->
format_data(deserialize(serialize(Item)), Indent);
false ->
Expand Down Expand Up @@ -248,11 +248,11 @@ verify_item(DataItem) ->
ValidID andalso ValidSignature andalso ValidTags.

type(Item) when is_record(Item, tx) ->
lists:keyfind(<<"Bundle-Map">>, 1, Item#tx.tags),
case lists:keyfind(<<"Bundle-Map">>, 1, Item#tx.tags) of
{<<"Bundle-Map">>, _} ->
case lists:keyfind(<<"Map-Format">>, 1, Item#tx.tags) of
{<<"Map-Format">>, <<"List">>} -> list;
lists:keyfind(<<"bundle-map">>, 1, Item#tx.tags),
case lists:keyfind(<<"bundle-map">>, 1, Item#tx.tags) of
{<<"bundle-map">>, _} ->
case lists:keyfind(<<"map-format">>, 1, Item#tx.tags) of
{<<"map-format">>, <<"list">>} -> list;
_ -> map
end;
_ ->
Expand Down Expand Up @@ -525,11 +525,11 @@ add_list_tags(Tags) ->
add_manifest_tags(Tags, ManifestID) ->
lists:filter(
fun
({<<"Bundle-Map">>, _}) -> false;
({<<"bundle-map">>, _}) -> false;
(_) -> true
end,
Tags
) ++ [{<<"Bundle-Map">>, hb_util:encode(ManifestID)}].
) ++ [{<<"bundle-map">>, hb_util:encode(ManifestID)}].

finalize_bundle_data(Processed) ->
Length = <<(length(Processed)):256/integer>>,
Expand Down Expand Up @@ -562,8 +562,8 @@ new_manifest(Index) ->
TX = normalize(#tx{
format = ans104,
tags = [
{<<"Data-Protocol">>, <<"Bundle-Map">>},
{<<"Variant">>, <<"0.0.1">>}
{<<"data-protocol">>, <<"bundle-map">>},
{<<"variant">>, <<"0.0.1">>}
],
data = jiffy:encode(Index)
}),
Expand Down Expand Up @@ -691,18 +691,18 @@ deserialize(Bin, json) ->
end.

maybe_unbundle(Item) ->
Format = lists:keyfind(<<"Bundle-Format">>, 1, Item#tx.tags),
Version = lists:keyfind(<<"Bundle-Version">>, 1, Item#tx.tags),
Format = lists:keyfind(<<"bundle-format">>, 1, Item#tx.tags),
Version = lists:keyfind(<<"bundle-version">>, 1, Item#tx.tags),
case {Format, Version} of
{{<<"Bundle-Format">>, <<"Binary">>}, {<<"Bundle-Version">>, <<"2.0.0">>}} ->
{{<<"bundle-format">>, <<"binary">>}, {<<"bundle-version">>, <<"2.0.0">>}} ->
maybe_map_to_list(maybe_unbundle_map(Item));
_ ->
Item
end.

maybe_map_to_list(Item) ->
case lists:keyfind(<<"Map-Format">>, 1, Item#tx.tags) of
{<<"Map-Format">>, <<"List">>} ->
case lists:keyfind(<<"map-format">>, 1, Item#tx.tags) of
{<<"map-format">>, <<"List">>} ->
unbundle_list(Item);
_ ->
Item
Expand All @@ -720,8 +720,8 @@ unbundle_list(Item) ->
}.

maybe_unbundle_map(Bundle) ->
case lists:keyfind(<<"Bundle-Map">>, 1, Bundle#tx.tags) of
{<<"Bundle-Map">>, MapTXID} ->
case lists:keyfind(<<"bundle-map">>, 1, Bundle#tx.tags) of
{<<"bundle-map">>, MapTXID} ->
case unbundle(Bundle) of
detached -> Bundle#tx { data = detached };
Items ->
Expand Down Expand Up @@ -793,6 +793,7 @@ item_to_json_struct(
}
) ->
% Set "From" if From-Process is Tag or set with "Owner" address
?event({invoked_item_to_json_struct, {tags, Tags}, {owner, Owner}, {data, Data}}),
From =
case lists:filter(fun({Name, _}) -> Name =:= <<"From-Process">> end, Tags) of
[{_, FromProcess}] -> FromProcess;
Expand Down Expand Up @@ -1022,7 +1023,7 @@ assert_data_item(KeyType, Owner, Target, Anchor, Tags, Data, DataItem) ->
test_empty_bundle() ->
Bundle = serialize([]),
BundleItem = deserialize(Bundle),
?assertEqual([], BundleItem#tx.data).
?assertEqual(#{}, BundleItem#tx.data).

test_bundle_with_one_item() ->
Item = new_item(
Expand All @@ -1033,7 +1034,7 @@ test_bundle_with_one_item() ->
),
Bundle = serialize([Item]),
BundleItem = deserialize(Bundle),
?assertEqual(ItemData, (erlang:hd(BundleItem#tx.data))#tx.data).
?assertEqual(ItemData, (maps:get(<<"1">>, BundleItem#tx.data))#tx.data).

test_bundle_with_two_items() ->
Item1 = new_item(
Expand All @@ -1050,8 +1051,8 @@ test_bundle_with_two_items() ->
),
Bundle = serialize([Item1, Item2]),
BundleItem = deserialize(Bundle),
?assertEqual(ItemData1, (erlang:hd(BundleItem#tx.data))#tx.data),
?assertEqual(ItemData2, (erlang:hd(tl(BundleItem#tx.data)))#tx.data).
?assertEqual(ItemData1, (maps:get(<<"1">>, BundleItem#tx.data))#tx.data),
?assertEqual(ItemData2, (maps:get(<<"2">>, BundleItem#tx.data))#tx.data).

test_recursive_bundle() ->
W = ar_wallet:new(),
Expand All @@ -1072,9 +1073,9 @@ test_recursive_bundle() ->
}, W),
Bundle = serialize([Item3]),
BundleItem = deserialize(Bundle),
[UnbundledItem3] = BundleItem#tx.data,
[UnbundledItem2] = UnbundledItem3#tx.data,
[UnbundledItem1] = UnbundledItem2#tx.data,
#{<<"1">> := UnbundledItem3} = BundleItem#tx.data,
#{<<"1">> := UnbundledItem2} = UnbundledItem3#tx.data,
#{<<"1">> := UnbundledItem1} = UnbundledItem2#tx.data,
?assert(verify_item(UnbundledItem1)),
% TODO: Verify bundled lists...
?assertEqual(Item1#tx.data, UnbundledItem1#tx.data).
Expand Down
6 changes: 3 additions & 3 deletions src/ar_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ init(Opts) ->
{help, "The total amount of bytes posted via HTTP, per remote endpoint"},
{labels, [route]}
]),
?event(debug, started),
?event(started),
{ok, #state{ opts = Opts }}.

handle_call({get_connection, Args}, From,
Expand Down Expand Up @@ -400,8 +400,8 @@ await_response(Args, Opts) ->
Opts
);
false ->
log(err, http_fetched_too_much_data, Args,
<<"Fetched too much data">>, Opts),
?event(error, {http_fetched_too_much_data, Args,
<<"Fetched too much data">>, Opts}),
{error, too_much_data}
end
end;
Expand Down
2 changes: 1 addition & 1 deletion src/ar_rate_limiter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ handle_cast({throttle, Peer, Path, From}, State) ->
%% Try to approach but not hit the limit.
case N2 + 1 > max(1, HalfLimit * 80 / 100) of
true ->
?event(debug,
?event(
{approaching_peer_rpm_limit,
{peer, Peer},
{path, Path},
Expand Down
8 changes: 4 additions & 4 deletions src/dev_cron.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
}).

init(State = #{ process := ProcM }, Params) ->
case lists:keyfind(<<"Time">>, 1, Params) of
{<<"Time">>, CronTime} ->
case lists:keyfind(<<"time">>, 1, Params) of
{<<"time">>, CronTime} ->
MilliSecs = parse_time(CronTime),
%% TODO: What's the most sensible way to initialize the last_run?
%% Current behavior: Timer starts after _first_ message.
Expand Down Expand Up @@ -57,8 +57,8 @@ execute(_, S) ->

timestamp(M) ->
% TODO: Process this properly
case lists:keyfind(<<"Timestamp">>, 1, M#tx.tags) of
{<<"Timestamp">>, TSBin} ->
case lists:keyfind(<<"timestamp">>, 1, M#tx.tags) of
{<<"timestamp">>, TSBin} ->
list_to_integer(binary_to_list(TSBin));
false ->
0
Expand Down
16 changes: 8 additions & 8 deletions src/dev_cu.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ execute(CarrierMsg, S) ->
Wallet = hb:wallet(),
{ok, Results} =
case MaybeBundle of
#tx{data = #{ <<"Message">> := _Msg, <<"Assignment">> := Assignment }} ->
#tx{data = #{ <<"message">> := _Msg, <<"assignment">> := Assignment }} ->
% TODO: Execute without needing to call the SU unnecessarily.
{_, ProcID} = lists:keyfind(<<"Process">>, 1, Assignment#tx.tags),
{_, ProcID} = lists:keyfind(<<"process">>, 1, Assignment#tx.tags),
?event({dev_cu_computing_from_full_assignment, {process, ProcID}, {slot, hb_util:id(Assignment, signed)}}),
hb_process:result(ProcID, hb_util:id(Assignment, signed), Store, Wallet);
_ ->
case lists:keyfind(<<"Process">>, 1, CarrierMsg#tx.tags) of
case lists:keyfind(<<"process">>, 1, CarrierMsg#tx.tags) of
{_, Process} ->
{_, Slot} = lists:keyfind(<<"Slot">>, 1, CarrierMsg#tx.tags),
{_, Slot} = lists:keyfind(<<"slot">>, 1, CarrierMsg#tx.tags),
?event({dev_cu_computing_from_slot_ref, {process, Process}, {slot, Slot}}),
hb_process:result(Process, Slot, Store, Wallet);
false ->
{error, no_viable_computation}
end
end,
{ResType, ModState = #{ results := _ModResults }} =
case lists:keyfind(<<"Attest-To">>, 1, CarrierMsg#tx.tags) of
case lists:keyfind(<<"attest-to">>, 1, CarrierMsg#tx.tags) of
{_, RawAttestTo} ->
AttestTo = hb_util:decode(RawAttestTo),
?event({attest_to_only_message, RawAttestTo}),
Expand All @@ -52,7 +52,7 @@ execute(CarrierMsg, S) ->
S#{
results =>
#tx {
tags = [{<<"Status">>, <<"404">>}],
tags = [{<<"status-code">>, 404}],
data = <<"Requested message to attest to not in results bundle.">>
}
}
Expand All @@ -63,8 +63,8 @@ execute(CarrierMsg, S) ->
results => ar_bundles:sign_item(
#tx {
tags = [
{<<"Status">>, <<"200">>},
{<<"Attestation-For">>, RawAttestTo}
{<<"status-code">>, 200},
{<<"attestation-for">>, RawAttestTo}
],
data = <<>>
},
Expand Down
Loading

0 comments on commit d7d4a77

Please sign in to comment.