Commit

WIP refactor and missing/bad proofs bug fixes
JamesPiechota committed Jan 6, 2025
1 parent 94e4013 commit 8f8d258
Showing 8 changed files with 590 additions and 481 deletions.
431 changes: 62 additions & 369 deletions apps/arweave/src/ar_chunk_storage.erl

Large diffs are not rendered by default.

159 changes: 97 additions & 62 deletions apps/arweave/src/ar_data_sync.erl

Large diffs are not rendered by default.

380 changes: 380 additions & 0 deletions apps/arweave/src/ar_entropy_storage.erl

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions apps/arweave/src/ar_metrics.erl
@@ -475,10 +475,8 @@ register() ->
 		{help, "The number of chunks in the packing server queue."}]),
 	prometheus_gauge:new([{name, chunk_cache_size},
 		{help, "The number of chunks scheduled for downloading."}]),
-	prometheus_counter:new([{name, chunks_without_entropy_stored},
-		{help, "The counter is incremented every time a 2.9 unpacked chunk is written to "
-			"chunk_storage to be enciphered later."}]),
 	prometheus_counter:new([{name, chunks_stored},
+		{labels, [packing]},
 		{help, "The counter is incremented every time a chunk is written to "
			"chunk_storage."}]),

2 changes: 2 additions & 0 deletions apps/arweave/src/ar_node_worker.erl
@@ -2132,6 +2132,8 @@ handle_found_solution(Args, PrevB, State) ->
 			{solution, ar_util:encode(SolutionH)}, {height, Height},
 			{step_number, StepNumber}, {steps, length(Steps)},
 			{txs, length(B#block.txs)},
+			{recall_byte1, B#block.recall_byte},
+			{recall_byte2, B#block.recall_byte2},
 			{chunks,
 				case B#block.recall_byte2 of
 					undefined -> 1;
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_storage_module.erl
@@ -219,7 +219,7 @@ get_strict(Offset, Packing) ->
 	RepackInPlaceModulesStoreIDs = [
 		{{BucketSize, Bucket, TargetPacking}, ar_storage_module:id(Module)}
 		|| {{BucketSize, Bucket, _Packing} = Module, TargetPacking} <- Config#config.repack_in_place_storage_modules],
-	ModuleStoreIDs = [ar_storage_module:id(Module)
+	ModuleStoreIDs = [{Module, ar_storage_module:id(Module)}
 		|| Module <- Config#config.storage_modules],
 	get_strict(Offset, Packing, ModuleStoreIDs ++ RepackInPlaceModulesStoreIDs).
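Note: the repack-in-place comprehension above already produced {ModuleConfig, StoreID} pairs; this change gives the plain storage-module list the same element shape, so get_strict/3 can see each module's packing next to its store ID. A rough sketch of the combined list's element shapes (illustrative only, not taken from a real config):

    %% Both lists now share one element shape:
    %%   {{BucketSize, Bucket, Packing}, StoreID},        %% storage_modules
    %%   {{BucketSize, Bucket, TargetPacking}, StoreID}   %% repack in place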

1 change: 1 addition & 0 deletions apps/arweave/src/ar_sup.erl
@@ -47,6 +47,7 @@ init([]) ->
 	ets:new(ar_data_sync_worker_master, [set, public, named_table]),
 	ets:new(ar_data_sync_state, [set, public, named_table, {read_concurrency, true}]),
 	ets:new(ar_chunk_storage, [set, public, named_table]),
+	ets:new(ar_entropy_storage, [set, public, named_table]),
 	ets:new(ar_mining_stats, [set, public, named_table]),
 	ets:new(ar_global_sync_record, [set, public, named_table]),
 	ets:new(ar_disk_pool_data_roots, [set, public, named_table, {read_concurrency, true}]),
92 changes: 46 additions & 46 deletions apps/arweave/src/ar_sync_record.erl
@@ -40,7 +40,7 @@
 	%% this record can remain very small compared to the space taken by
 	%% chunk identifiers, whose number grows unlimited with time.
 	sync_record_by_id,
-	%% A map {ID, Type} => Intervals.
+	%% A map {ID, Packing} => Intervals.
 	sync_record_by_id_type,
 	%% The name of the WAL store.
 	state_db,
@@ -75,9 +75,9 @@ get(ID, StoreID) ->
 	end.

 %% @doc Return the set of intervals.
-get(ID, Type, StoreID) ->
+get(ID, Packing, StoreID) ->
 	GenServerID = list_to_atom("ar_sync_record_" ++ ar_storage_module:label_by_id(StoreID)),
-	case catch gen_server:call(GenServerID, {get, Type, ID}, 20000) of
+	case catch gen_server:call(GenServerID, {get, Packing, ID}, 20000) of
 		{'EXIT', {timeout, {gen_server, call, _}}} ->
 			{error, timeout};
 		Reply ->
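Note: call sites now pass a packing term where they previously passed a type. A minimal usage sketch — the ar_data_sync ID and the "default" store ID appear elsewhere in this module; the unpacked packing value is illustrative:

    %% Returns the intervals recorded for {ar_data_sync, unpacked},
    %% or {error, timeout} if the 20-second gen_server call times out.
    1> ar_sync_record:get(ar_data_sync, unpacked, "default").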
@@ -96,11 +96,11 @@ add(End, Start, ID, StoreID) ->
 	end.

 %% @doc Add the given interval to the record with the
-%% given ID and Type. Store the changes on disk before
+%% given ID and Packing. Store the changes on disk before
 %% returning ok.
-add(End, Start, Type, ID, StoreID) ->
+add(End, Start, Packing, ID, StoreID) ->
 	GenServerID = list_to_atom("ar_sync_record_" ++ ar_storage_module:label_by_id(StoreID)),
-	case catch gen_server:call(GenServerID, {add, End, Start, Type, ID}, 120000) of
+	case catch gen_server:call(GenServerID, {add, End, Start, Packing, ID}, 120000) of
 		{'EXIT', {timeout, {gen_server, call, _}}} ->
 			{error, timeout};
 		Reply ->
@@ -114,9 +114,9 @@ add_async(Event, End, Start, ID, StoreID) ->

 %% @doc Special case of add/5 for repacked chunks. When repacking the ar_sync_record add
 %% happens at the end so we don't need to block on it to complete.
-add_async(Event, End, Start, Type, ID, StoreID) ->
+add_async(Event, End, Start, Packing, ID, StoreID) ->
 	GenServerID = list_to_atom("ar_sync_record_" ++ ar_storage_module:label_by_id(StoreID)),
-	gen_server:cast(GenServerID, {add_async, Event, End, Start, Type, ID}).
+	gen_server:cast(GenServerID, {add_async, Event, End, Start, Packing, ID}).

 %% @doc Remove the given interval from the record
 %% with the given ID. Store the changes on disk before
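Note: a caller-side sketch of the renamed add_async/6 — the event atom store_repacked_chunk is hypothetical; End, Start, Packing, and StoreID are as bound by the caller:

    %% Fire-and-forget: gen_server:cast/2 returns immediately and the
    %% interval is registered under {ID, Packing} when the cast is handled.
    ar_sync_record:add_async(store_repacked_chunk, End, Start, Packing,
        ar_data_sync, StoreID)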
@@ -142,7 +142,7 @@ cut(Offset, ID, StoreID) ->
 			Reply
 	end.

-%% @doc Return {true, StoreID} or {{true, Type}, StoreID} if a chunk containing
+%% @doc Return {true, StoreID} or {{true, Packing}, StoreID} if a chunk containing
 %% the given Offset is found in the record with the given ID, false otherwise.
 %% If several types are recorded for the chunk, only one of them is returned,
 %% the choice is not defined. If the chunk is stored in the default storage module,
@@ -152,14 +152,14 @@ cut(Offset, ID, StoreID) ->
 %% The offset is 1-based - if a chunk consists of a single
 %% byte that is the first byte of the weave, is_recorded(0, ID)
 %% returns false and is_recorded(1, ID) returns true.
-is_recorded(Offset, {ID, Type}) ->
-	case is_recorded(Offset, Type, ID, "default") of
+is_recorded(Offset, {ID, Packing}) ->
+	case is_recorded(Offset, Packing, ID, "default") of
 		true ->
-			{{true, Type}, "default"};
+			{{true, Packing}, "default"};
 		false ->
 			StorageModules = [Module
-				|| {_, _, Packing} = Module <- ar_storage_module:get_all(Offset),
-					Packing == Type],
+				|| {_, _, ModulePacking} = Module <- ar_storage_module:get_all(Offset),
+					ModulePacking == Packing],
 			is_recorded_any_by_type(Offset, ID, StorageModules)
 	end;
 is_recorded(Offset, ID) ->
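Note: a sketch of the 1-based offset semantics described in the doc comment above (return shapes follow the comment; illustrative, not taken from tests):

    %% The first weave byte lives at offset 1, so offset 0 is never recorded.
    1> ar_sync_record:is_recorded(0, ar_data_sync).
    false
    %% A hit pairs the result with the store that holds the chunk, e.g.
    %% {true, "default"} or {{true, Packing}, "default"}.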
@@ -171,7 +171,7 @@ is_recorded(Offset, ID) ->
 			{Reply, "default"}
 	end.

-%% @doc Return true or {true, Type} if a chunk containing
+%% @doc Return true or {true, Packing} if a chunk containing
 %% the given Offset is found in the record with the given ID
 %% in the storage module identified by StoreID, false otherwise.
 is_recorded(Offset, ID, StoreID) ->
@@ -186,17 +186,17 @@ is_recorded(Offset, ID, StoreID) ->
 			case is_recorded2(Offset, ets:first(sync_records), ID, StoreID) of
 				false ->
 					true;
-				{true, Type} ->
-					{true, Type}
+				{true, Packing} ->
+					{true, Packing}
 			end
 	end
 end.

-%% @doc Return true if a chunk containing the given Offset and Type
+%% @doc Return true if a chunk containing the given Offset and Packing
 %% is found in the record in the storage module identified by StoreID,
 %% false otherwise.
-is_recorded(Offset, Type, ID, StoreID) ->
-	case ets:lookup(sync_records, {ID, Type, StoreID}) of
+is_recorded(Offset, Packing, ID, StoreID) ->
+	case ets:lookup(sync_records, {ID, Packing, StoreID}) of
 		[] ->
 			false;
 		[{_, TID}] ->
@@ -228,8 +228,8 @@ get_next_unsynced_interval(Offset, EndOffsetUpperBound, ID, StoreID) ->
 %% @doc Return the lowest synced interval with the end offset strictly above the given Offset
 %% and at most EndOffsetUpperBound.
 %% Return not_found if there are no such intervals.
-get_next_synced_interval(Offset, EndOffsetUpperBound, Type, ID, StoreID) ->
-	case ets:lookup(sync_records, {ID, Type, StoreID}) of
+get_next_synced_interval(Offset, EndOffsetUpperBound, Packing, ID, StoreID) ->
+	case ets:lookup(sync_records, {ID, Packing, StoreID}) of
 		[] ->
 			not_found;
 		[{_, TID}] ->
@@ -296,16 +296,16 @@ handle_call({get, ID}, _From, State) ->
 	#state{ sync_record_by_id = SyncRecordByID } = State,
 	{reply, maps:get(ID, SyncRecordByID, ar_intervals:new()), State};

-handle_call({get, Type, ID}, _From, State) ->
+handle_call({get, Packing, ID}, _From, State) ->
 	#state{ sync_record_by_id_type = SyncRecordByIDType } = State,
-	{reply, maps:get({ID, Type}, SyncRecordByIDType, ar_intervals:new()), State};
+	{reply, maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()), State};

 handle_call({add, End, Start, ID}, _From, State) ->
 	{Reply, State2} = add2(End, Start, ID, State),
 	{reply, Reply, State2};

-handle_call({add, End, Start, Type, ID}, _From, State) ->
-	{Reply, State2} = add2(End, Start, Type, ID, State),
+handle_call({add, End, Start, Packing, ID}, _From, State) ->
+	{Reply, State2} = add2(End, Start, Packing, ID, State),
 	{reply, Reply, State2};

 handle_call({delete, End, Start, ID}, _From, State) ->
@@ -413,21 +413,21 @@ handle_cast({add_async, Event, End, Start, ID}, State) ->
 	end,
 	{noreply, State2};

-handle_cast({add_async, Event, End, Start, Type, ID}, State) ->
-	{Reply, State2} = add2(End, Start, Type, ID, State),
+handle_cast({add_async, Event, End, Start, Packing, ID}, State) ->
+	{Reply, State2} = add2(End, Start, Packing, ID, State),
 	case Reply of
 		ok ->
 			?LOG_DEBUG([{event, Event},
 				{status, success},
 				{sync_record_id, ID},
 				{offset, End},
-				{packing, ar_serialize:encode_packing(Type, true)}]);
+				{packing, ar_serialize:encode_packing(Packing, true)}]);
 		Error ->
 			?LOG_ERROR([{event, Event},
 				{status, failed},
 				{sync_record_id, ID},
 				{offset, End},
-				{packing, ar_serialize:encode_packing(Type, true)},
+				{packing, ar_serialize:encode_packing(Packing, true)},
 				{error, io_lib:format("~p", [Error])}])
 	end,
 	{noreply, State2};
@@ -466,13 +466,13 @@ add2(End, Start, ID, State) ->
 	end,
 	{Reply, State3}.

-add2(End, Start, Type, ID, State) ->
+add2(End, Start, Packing, ID, State) ->
 	#state{ sync_record_by_id = SyncRecordByID, sync_record_by_id_type = SyncRecordByIDType,
 			state_db = StateDB, store_id = StoreID } = State,
-	ByType = maps:get({ID, Type}, SyncRecordByIDType, ar_intervals:new()),
+	ByType = maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()),
 	ByType2 = ar_intervals:add(ByType, End, Start),
-	SyncRecordByIDType2 = maps:put({ID, Type}, ByType2, SyncRecordByIDType),
-	TypeTID = get_or_create_type_tid({ID, Type, StoreID}),
+	SyncRecordByIDType2 = maps:put({ID, Packing}, ByType2, SyncRecordByIDType),
+	TypeTID = get_or_create_type_tid({ID, Packing, StoreID}),
 	ar_ets_intervals:add(TypeTID, End, Start),
 	SyncRecord = maps:get(ID, SyncRecordByID, ar_intervals:new()),
 	SyncRecord2 = ar_intervals:add(SyncRecord, End, Start),
@@ -481,7 +481,7 @@ add2(End, Start, Type, ID, State) ->
 	ar_ets_intervals:add(TID, End, Start),
 	State2 = State#state{ sync_record_by_id = SyncRecordByID2,
 			sync_record_by_id_type = SyncRecordByIDType2 },
-	{Reply, State3} = update_write_ahead_log({{add, Type}, {End, Start, ID}}, StateDB, State2),
+	{Reply, State3} = update_write_ahead_log({{add, Packing}, {End, Start, ID}}, StateDB, State2),
 	case Reply of
 		ok ->
 			emit_add_range(Start, End, ID, StoreID);
@@ -515,14 +515,14 @@ is_recorded_any(_Offset, _ID, []) ->

 is_recorded2(_Offset, '$end_of_table', _ID, _StoreID) ->
 	false;
-is_recorded2(Offset, {ID, Type, StoreID}, ID, StoreID) ->
-	case ets:lookup(sync_records, {ID, Type, StoreID}) of
+is_recorded2(Offset, {ID, Packing, StoreID}, ID, StoreID) ->
+	case ets:lookup(sync_records, {ID, Packing, StoreID}) of
 		[{_, TID}] ->
 			case ar_ets_intervals:is_inside(TID, Offset) of
 				true ->
-					{true, Type};
+					{true, Packing};
 				false ->
-					is_recorded2(Offset, ets:next(sync_records, {ID, Type, StoreID}), ID,
+					is_recorded2(Offset, ets:next(sync_records, {ID, Packing, StoreID}), ID,
 							StoreID)
 			end;
 		[] ->
@@ -576,15 +576,15 @@ replay_write_ahead_log(SyncRecordByID, SyncRecordByIDType, N, WAL, StateDB, StoreID) ->
 			SyncRecordByID2 = maps:put(ID, SyncRecord2, SyncRecordByID),
 			replay_write_ahead_log(
 					SyncRecordByID2, SyncRecordByIDType, N + 1, WAL, StateDB, StoreID);
-		{add, Type} ->
+		{add, Packing} ->
 			{End, Start, ID} = Params,
 			SyncRecord = maps:get(ID, SyncRecordByID, ar_intervals:new()),
 			SyncRecord2 = ar_intervals:add(SyncRecord, End, Start),
 			SyncRecordByID2 = maps:put(ID, SyncRecord2, SyncRecordByID),
-			ByType = maps:get({ID, Type}, SyncRecordByIDType, ar_intervals:new()),
+			ByType = maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()),
 			ByType2 = ar_intervals:add(ByType, End, Start),
 			emit_add_range(Start, End, ID, StoreID),
-			SyncRecordByIDType2 = maps:put({ID, Type}, ByType2, SyncRecordByIDType),
+			SyncRecordByIDType2 = maps:put({ID, Packing}, ByType2, SyncRecordByIDType),
 			replay_write_ahead_log(
 					SyncRecordByID2, SyncRecordByIDType2, N + 1, WAL, StateDB, StoreID);
 		delete ->
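Note: for orientation, the add-shaped records replayed by this clause mirror what add2 writes via update_write_ahead_log; a sketch of the two shapes as they appear in this diff:

    %% {Operation, Params} pairs read back from the WAL:
    %%   {add, {End, Start, ID}}            - interval add without a packing
    %%   {{add, Packing}, {End, Start, ID}} - interval add for one packing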
@@ -653,10 +653,10 @@ initialize_sync_record_by_id_type_ets(SyncRecordByIDType, StoreID) ->

 initialize_sync_record_by_id_type_ets2(none, _StoreID) ->
 	ok;
-initialize_sync_record_by_id_type_ets2({{ID, Type}, SyncRecord, Iterator}, StoreID) ->
+initialize_sync_record_by_id_type_ets2({{ID, Packing}, SyncRecord, Iterator}, StoreID) ->
 	TID = ets:new(sync_record_type, [ordered_set, public, {read_concurrency, true}]),
 	ar_ets_intervals:init_from_gb_set(TID, SyncRecord),
-	ets:insert(sync_records, {{ID, Type, StoreID}, TID}),
+	ets:insert(sync_records, {{ID, Packing, StoreID}, TID}),
 	initialize_sync_record_by_id_type_ets2(maps:next(Iterator), StoreID).

 store_state(State) ->
@@ -687,9 +687,9 @@ store_state(State) ->
 			{Error2, State};
 		ok ->
 			maps:map(
-				fun ({ar_data_sync, Type}, TypeRecord) ->
+				fun ({ar_data_sync, Packing}, TypeRecord) ->
 					ar_mining_stats:set_storage_module_data_size(
-							StoreID, Type, PartitionNumber, StorageModuleSize,
+							StoreID, Packing, PartitionNumber, StorageModuleSize,
 							StorageModuleIndex,
 							ar_intervals:sum(TypeRecord));
 					(_, _) ->
