diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index ee8bd7384..313524266 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -4,11 +4,12 @@ -behaviour(gen_server). -export([start_link/2, name/1, is_storage_supported/3, put/2, put/3, - open_files/1, get/1, get/2, get/5, read_chunk2/5, get_range/2, get_range/3, - close_file/2, close_files/1, cut/2, delete/1, delete/2, + open_files/1, get/1, get/2, get/3,get/5, locate_chunk_on_disk/2, get_range/2, get_range/3, + get_handle_by_filepath/1, close_file/2, close_files/1, cut/2, delete/1, delete/2, list_files/2, run_defragmentation/0, get_storage_module_path/2, get_chunk_storage_path/2, is_prepared/1, - get_chunk_bucket_start/1]). + get_chunk_bucket_start/1, get_chunk_bucket_end/1, + sync_record_id/1, write_chunk/4, write_chunk2/6, record_chunk/5]). -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). @@ -116,7 +117,7 @@ get(Byte) -> %% @doc Return {AbsoluteEndOffset, Chunk} for the chunk containing the given byte. get(Byte, StoreID) -> - case ar_sync_record:get_interval(Byte + 1, ?MODULE, StoreID) of + case ar_sync_record:get_interval(Byte + 1, ar_chunk_storage, StoreID) of not_found -> not_found; {_End, IntervalStart} -> @@ -135,6 +136,16 @@ get(Byte, IntervalStart, StoreID) -> {EndOffset, Chunk} end. +locate_chunk_on_disk(PaddedEndOffset, StoreID) -> + locate_chunk_on_disk(PaddedEndOffset, StoreID, #{}). + +locate_chunk_on_disk(PaddedEndOffset, StoreID, FileIndex) -> + ChunkFileStart = get_chunk_file_start(PaddedEndOffset), + Filepath = filepath(ChunkFileStart, FileIndex, StoreID), + {Position, ChunkOffset} = + get_position_and_relative_chunk_offset(ChunkFileStart, PaddedEndOffset), + {ChunkFileStart, Filepath, Position, ChunkOffset}. + %% @doc Return a list of {AbsoluteEndOffset, Chunk} pairs for the stored chunks %% inside the given range. The given interval does not have to cover every chunk %% completely - we return all chunks at the intersection with the range. @@ -148,7 +159,7 @@ get_range(Start, Size) -> %% at most Start + Size + ?DATA_CHUNK_SIZE - 1. get_range(Start, Size, StoreID) -> ?assert(Size < get_chunk_group_size()), - case ar_sync_record:get_next_synced_interval(Start, infinity, ?MODULE, StoreID) of + case ar_sync_record:get_next_synced_interval(Start, infinity, ar_chunk_storage, StoreID) of {_End, IntervalStart} when Start + Size > IntervalStart -> Start2 = max(Start, IntervalStart), Size2 = Start + Size - Start2, @@ -191,7 +202,7 @@ close_files(StoreID) -> %% @doc Soft-delete everything above the given end offset. cut(Offset, StoreID) -> - ar_sync_record:cut(Offset, ?MODULE, StoreID). + ar_sync_record:cut(Offset, ar_chunk_storage, StoreID). %% @doc Remove the chunk with the given end offset. delete(Offset) -> @@ -430,7 +441,7 @@ handle_cast(do_prepare_replica_2_9, State) -> waiting_for_repack -> waiting_for_repack; false -> - is_replica_2_9_entropy_sub_chunk_recorded( + ar_entropy_storage:is_sub_chunk_recorded( PaddedEndOffset, SubChunkStart, StoreID) end, StoreEntropy = @@ -443,8 +454,8 @@ handle_cast(do_prepare_replica_2_9, State) -> is_recorded; false -> %% Get all the entropies needed to encipher the chunk at PaddedEndOffset. 
- Entropies = generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart), - EntropyKeys = generate_entropy_keys( + Entropies = ar_entropy_storage:generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart), + EntropyKeys = ar_entropy_storage:generate_entropy_keys( RewardAddr, PaddedEndOffset, SubChunkStart), SliceIndex = ar_replica_2_9:get_slice_index(PaddedEndOffset), %% If we are not at the beginning of the entropy, shift the offset to @@ -453,14 +464,14 @@ handle_cast(do_prepare_replica_2_9, State) -> %% to the neighbouring storage module(s) on the left or on the right %% since the storage module may be configured to be smaller than the %% partition. - PaddedEndOffset2 = shift_replica_2_9_entropy_offset( + PaddedEndOffset2 = ar_entropy_storage:shift_entropy_offset( PaddedEndOffset, -SliceIndex), %% The end of a recall partition (3.6TB) may fall in the middle of a chunk, so %% we'll use the padded offset to end the store_entropy iteration. PartitionEnd = (Partition + 1) * ?PARTITION_SIZE, PaddedPartitionEnd = get_chunk_bucket_end(ar_block:get_chunk_padded_offset(PartitionEnd)), - store_entropy(Entropies, PaddedEndOffset2, SubChunkStart, PaddedPartitionEnd, + ar_entropy_storage:store_entropy(Entropies, PaddedEndOffset2, SubChunkStart, PaddedPartitionEnd, EntropyKeys, RewardAddr, 0, 0) end, ?LOG_DEBUG([{event, do_prepare_replica_2_9}, {store_id, StoreID}, @@ -557,22 +568,20 @@ handle_call(is_prepared, _From, #state{ is_prepared = IsPrepared } = State) -> handle_call({put, PaddedEndOffset, Chunk}, _From, State) when byte_size(Chunk) == ?DATA_CHUNK_SIZE -> case handle_store_chunk(PaddedEndOffset, Chunk, State) of - {ok, FileIndex2} -> - {reply, ok, State#state{ file_index = FileIndex2 }}; + {ok, FileIndex2, Packing} -> + {reply, {ok, Packing}, State#state{ file_index = FileIndex2 }}; Error -> {reply, Error, State} end; handle_call({delete, PaddedEndOffset}, _From, State) -> #state{ file_index = FileIndex, store_id = StoreID } = State, - ChunkFileStart = get_chunk_file_start(PaddedEndOffset), - Filepath = filepath(ChunkFileStart, FileIndex, StoreID), StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE, - case ar_sync_record:delete(PaddedEndOffset, StartOffset, ?MODULE, StoreID) of + case ar_sync_record:delete(PaddedEndOffset, StartOffset, ar_chunk_storage, StoreID) of ok -> - case delete_replica_2_9_entropy_record(PaddedEndOffset, StoreID) of + case ar_entropy_storage:delete_record(PaddedEndOffset, StoreID) of ok -> - case delete_chunk(PaddedEndOffset, ChunkFileStart, Filepath) of + case delete_chunk(PaddedEndOffset, StoreID) of ok -> {reply, ok, State}; Error -> @@ -592,7 +601,7 @@ handle_call(reset, _, #state{ store_id = StoreID, file_index = FileIndex } = Sta end, FileIndex ), - ok = ar_sync_record:cut(0, ?MODULE, StoreID), + ok = ar_sync_record:cut(0, ar_chunk_storage, StoreID), erlang:erase(), {reply, ok, State#state{ file_index = #{} }}; @@ -631,7 +640,7 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, %% send the chunk to the corresponding ar_data_sync %% module to store it in RocksDB. 
ar_sync_record:delete(PaddedEndOffset, StartOffset, - ?MODULE, StoreID); + ar_chunk_storage, StoreID); Error2 -> Error2 end, @@ -644,20 +653,13 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, prev_repack_cursor = PrevCursor }}; store -> case handle_store_chunk(PaddedEndOffset, Chunk, State2) of - {ok, FileIndex2} -> + {ok, FileIndex2, NewPacking} -> ar_sync_record:add_async(repacked_chunk, PaddedEndOffset, StartOffset, - Packing, ar_data_sync, StoreID), + NewPacking, ar_data_sync, StoreID), {noreply, State2#state{ file_index = FileIndex2, repack_cursor = PaddedEndOffset, prev_repack_cursor = PrevCursor }}; - {error, stored_without_entropy} -> - ar_sync_record:add_async(unpacked_padded, - PaddedEndOffset, PaddedEndOffset - ?DATA_CHUNK_SIZE, - Packing, ar_data_sync, StoreID), - {noreply, State2#state{ - repack_cursor = PaddedEndOffset, - prev_repack_cursor = PrevCursor }}; Error3 -> PackingStr = ar_serialize:encode_packing(Packing, true), ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, @@ -751,82 +753,27 @@ get_filepath(Name, StoreID) -> filename:join([ChunkDir, Name]). handle_store_chunk(PaddedEndOffset, Chunk, State) -> - #state{ store_id = StoreID } = State, - case ar_storage_module:get_packing(StoreID) of + #state{ store_id = StoreID, is_prepared = IsPrepared, file_index = FileIndex } = State, + Packing = ar_storage_module:get_packing(StoreID), + case Packing of {replica_2_9, Addr} -> - handle_store_chunk_replica_2_9(PaddedEndOffset, Chunk, Addr, State); + ar_entropy_storage:record_chunk(PaddedEndOffset, Chunk, Addr, StoreID, FileIndex, IsPrepared); _ -> - handle_store_chunk2(PaddedEndOffset, Chunk, State) - end. - -handle_store_chunk_replica_2_9(PaddedEndOffset, Chunk, RewardAddr, State) -> - #state{ store_id = StoreID, is_prepared = IsPrepared } = State, - StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE, - ChunkFileStart = get_chunk_file_start(PaddedEndOffset), - Filepath = filepath(ChunkFileStart, StoreID), - acquire_replica_2_9_semaphore(Filepath), - CheckIsStoredAlready = - ar_sync_record:is_recorded(PaddedEndOffset, ?MODULE, StoreID), - CheckIsEntropyRecorded = - case CheckIsStoredAlready of - true -> - {error, already_stored}; - false -> - is_replica_2_9_entropy_recorded(PaddedEndOffset, StoreID) - end, - ReadEntropy = - case CheckIsEntropyRecorded of - {error, _} = Error -> - Error; - false -> - case IsPrepared of - false -> - no_entropy_yet; - true -> - missing_entropy - end; - true -> - get(StartOffset, StartOffset, StoreID) - end, - case ReadEntropy of - {error, _} = Error2 -> - release_replica_2_9_semaphore(Filepath), - Error2; - not_found -> - release_replica_2_9_semaphore(Filepath), - {error, not_prepared_yet2}; - missing_entropy -> - Entropy = generate_missing_replica_2_9_entropy(PaddedEndOffset, RewardAddr), - PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy), - Result = handle_store_chunk2(PaddedEndOffset, PackedChunk, State), - release_replica_2_9_semaphore(Filepath), - Result; - no_entropy_yet -> - Result = handle_store_chunk_no_entropy(PaddedEndOffset, Chunk, State), - release_replica_2_9_semaphore(Filepath), - Result; - {_EndOffset, Entropy} -> - release_replica_2_9_semaphore(Filepath), - PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy), - handle_store_chunk2(PaddedEndOffset, PackedChunk, State) + record_chunk(PaddedEndOffset, Chunk, Packing, StoreID, FileIndex) end. 
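Reviewer note (not part of the patch): with handle_store_chunk/3 now returning {ok, FileIndex2, Packing}, the gen_server reply to {put, ...} carries the packing that was actually written, so ar_chunk_storage:put/2,3 callers no longer get a bare ok. A minimal caller sketch; the function name is hypothetical:

store_chunk_example(PaddedEndOffset, Chunk, StoreID) ->
    case ar_chunk_storage:put(PaddedEndOffset, Chunk, StoreID) of
        {ok, Packing} ->
            %% Packing is what actually reached the disk: e.g. {replica_2_9, Addr}
            %% when the entropy was already present, or unpacked_padded when the
            %% chunk is parked until its entropy is written.
            {stored, Packing};
        Error ->
            Error
    end.
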
-handle_store_chunk_no_entropy(PaddedEndOffset, Chunk, State) -> - #state{ file_index = FileIndex, store_id = StoreID } = State, - ChunkFileStart = get_chunk_file_start(PaddedEndOffset), - case store_chunk(ChunkFileStart, PaddedEndOffset, Chunk, FileIndex, StoreID) of +record_chunk(PaddedEndOffset, Chunk, Packing, StoreID, FileIndex) -> + case write_chunk(PaddedEndOffset, Chunk, FileIndex, StoreID) of {ok, Filepath} -> - prometheus_counter:inc(chunks_without_entropy_stored), - %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new - %% sync_record id (ar_chunk_storage_replica_2_9_1_unpacked) going forward. - %% The old id (ar_chunk_storage_replica_2_9_unpacked) should not be used. - ID = ar_chunk_storage_replica_2_9_1_unpacked, + prometheus_counter:inc(chunks_stored, [Packing]), case ar_sync_record:add( - PaddedEndOffset, PaddedEndOffset - ?DATA_CHUNK_SIZE, ID, StoreID) of + PaddedEndOffset, PaddedEndOffset - ?DATA_CHUNK_SIZE, + sync_record_id(Packing), StoreID) of ok -> + ChunkFileStart = get_chunk_file_start(PaddedEndOffset), ets:insert(chunk_storage_file_index, - {{ChunkFileStart, StoreID}, Filepath}), - {error, stored_without_entropy}; + {{ChunkFileStart, StoreID}, Filepath}), + {ok, maps:put(ChunkFileStart, Filepath, FileIndex), Packing}; Error -> Error end; @@ -834,23 +781,13 @@ handle_store_chunk_no_entropy(PaddedEndOffset, Chunk, State) -> Error2 end. -handle_store_chunk2(PaddedEndOffset, Chunk, State) -> - #state{ file_index = FileIndex, store_id = StoreID } = State, - ChunkFileStart = get_chunk_file_start(PaddedEndOffset), - case store_chunk(ChunkFileStart, PaddedEndOffset, Chunk, FileIndex, StoreID) of - {ok, Filepath} -> - prometheus_counter:inc(chunks_stored), - case ar_sync_record:add( - PaddedEndOffset, PaddedEndOffset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of - ok -> - ets:insert(chunk_storage_file_index, {{ChunkFileStart, StoreID}, Filepath}), - {ok, maps:put(ChunkFileStart, Filepath, FileIndex)}; - Error -> - Error - end; - Error2 -> - Error2 - end. +sync_record_id(unpacked_padded) -> + %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new + %% sync_record id (ar_chunk_storage_replica_2_9_1_unpacked) going forward. + %% The old id (ar_chunk_storage_replica_2_9_unpacked) should not be used. + ar_chunk_storage_replica_2_9_1_unpacked; +sync_record_id(_Packing) -> + ar_chunk_storage. get_chunk_file_start(EndOffset) -> StartOffset = EndOffset - ?DATA_CHUNK_SIZE, @@ -862,311 +799,16 @@ get_chunk_file_start_by_start_offset(StartOffset) -> get_chunk_bucket_end(PaddedEndOffset) -> get_chunk_bucket_start(PaddedEndOffset) + ?DATA_CHUNK_SIZE. -%% @doc Return true if the given sub-chunk bucket contains the 2.9 entropy. -is_replica_2_9_entropy_sub_chunk_recorded( - PaddedEndOffset, SubChunkBucketStartOffset, StoreID) -> - %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new - %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. - %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. - ID = ar_chunk_storage_replica_2_9_1_entropy, - ChunkBucketStart = get_chunk_bucket_start(PaddedEndOffset), - SubChunkBucketStart = ChunkBucketStart + SubChunkBucketStartOffset, - ar_sync_record:is_recorded(SubChunkBucketStart + 1, ID, StoreID). - -%% @doc Return true if the 2.9 entropy for every sub-chunk of the chunk with the -%% given offset (> start offset, =< end offset) is recorded. -%% We check every sub-chunk because the entropy is written on the sub-chunk level. 
-is_replica_2_9_entropy_recorded(PaddedEndOffset, StoreID) -> - ChunkBucketStart = get_chunk_bucket_start(PaddedEndOffset), - is_replica_2_9_entropy_recorded2(ChunkBucketStart, - ChunkBucketStart + ?DATA_CHUNK_SIZE, StoreID). - -is_replica_2_9_entropy_recorded2(Cursor, BucketEnd, _StoreID) - when Cursor >= BucketEnd -> - true; -is_replica_2_9_entropy_recorded2(Cursor, BucketEnd, StoreID) -> - %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new - %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. - %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. - ID = ar_chunk_storage_replica_2_9_1_entropy, - case ar_sync_record:is_recorded(Cursor + 1, ID, StoreID) of - false -> - false; - true -> - SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, - is_replica_2_9_entropy_recorded2(Cursor + SubChunkSize, BucketEnd, StoreID) - end. - -update_replica_2_9_entropy_record(PaddedEndOffset, StoreID) -> - %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new - %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. - %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. - ID = ar_chunk_storage_replica_2_9_1_entropy, - BucketEnd = get_chunk_bucket_end(PaddedEndOffset), - BucketStart = get_chunk_bucket_start(PaddedEndOffset), - ar_sync_record:add_async(replica_2_9_entropy, BucketEnd, BucketStart, ID, StoreID). - -delete_replica_2_9_entropy_record(PaddedEndOffset, StoreID) -> - %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new - %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. - %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. - ID = ar_chunk_storage_replica_2_9_1_entropy, - BucketStart = get_chunk_bucket_start(PaddedEndOffset), - ar_sync_record:delete(BucketStart + ?DATA_CHUNK_SIZE, BucketStart, ID, StoreID). - -generate_missing_replica_2_9_entropy(PaddedEndOffset, RewardAddr) -> - Entropies = generate_entropies(RewardAddr, PaddedEndOffset, 0), - EntropyIndex = ar_replica_2_9:get_slice_index(PaddedEndOffset), - take_combined_entropy_by_index(Entropies, EntropyIndex). - -%% @doc Returns all the entropies needed to encipher the chunk at PaddedEndOffset. -%% ar_packing_server:get_replica_2_9_entropy/3 will query a cached entropy, or generate it -%% if it is not cached. -generate_entropies(_RewardAddr, _PaddedEndOffset, SubChunkStart) - when SubChunkStart == ?DATA_CHUNK_SIZE -> - []; -generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart) -> - SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, - [ar_packing_server:get_replica_2_9_entropy(RewardAddr, PaddedEndOffset, SubChunkStart) - | generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart + SubChunkSize)]. - -generate_entropy_keys(_RewardAddr, _Offset, SubChunkStart) - when SubChunkStart == ?DATA_CHUNK_SIZE -> - []; -generate_entropy_keys(RewardAddr, Offset, SubChunkStart) -> - SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, - [ar_replica_2_9:get_entropy_key(RewardAddr, Offset, SubChunkStart) - | generate_entropy_keys(RewardAddr, Offset, SubChunkStart + SubChunkSize)]. - -store_entropy( - _Entropies, PaddedEndOffset, _SubChunkStartOffset, - RangeEnd, _Keys, _RewardAddr, N, WaitN) when PaddedEndOffset > RangeEnd -> - %% The amount of entropy generated per partition is slightly more than the amount needed. - %% So at the end of a partition we will have finished processing chunks, but still have - %% some entropy left. 
In this case we stop the recursion early and wait for the writes - %% to complete. - wait_store_entropy_processes(WaitN), - {ok, N}; -store_entropy( - Entropies, PaddedEndOffset, SubChunkStartOffset, - RangeEnd, Keys, RewardAddr, N, WaitN) -> - case take_and_combine_entropy_slices(Entropies) of - {<<>>, []} -> - %% We've finished processing all the entropies, wait for the writes to complete. - wait_store_entropy_processes(WaitN), - {ok, N}; - {ChunkEntropy, Rest} -> - true = (ar_replica_2_9:get_entropy_partition(PaddedEndOffset) == - ar_replica_2_9:get_entropy_partition(RangeEnd)), - sanity_check_replica_2_9_entropy_keys(PaddedEndOffset, RewardAddr, - SubChunkStartOffset, Keys), - FindModule = - case ar_storage_module:get_strict( - PaddedEndOffset, {replica_2_9, RewardAddr}) of - not_found -> - ?LOG_WARNING([ - {event, failed_to_find_storage_module_for_2_9_entropy}, - {padded_end_offset, PaddedEndOffset}]), - not_found; - {ok, StoreID} -> - {ok, StoreID} - end, - case FindModule of - not_found -> - PaddedEndOffset2 = shift_replica_2_9_entropy_offset(PaddedEndOffset, 1), - store_entropy(Rest, PaddedEndOffset2, SubChunkStartOffset, RangeEnd, - Keys, RewardAddr, N, WaitN); - {ok, StoreID2} -> - From = self(), - spawn_link(fun() -> - StartTime = erlang:monotonic_time(), - - store_entropy2(ChunkEntropy, PaddedEndOffset, StoreID2, RewardAddr), - - EndTime = erlang:monotonic_time(), - ElapsedTime = erlang:convert_time_unit( - EndTime-StartTime, native, microsecond), - %% bytes per second - WriteRate = case ElapsedTime > 0 of - true -> 1000000 * byte_size(ChunkEntropy) div ElapsedTime; - false -> 0 - end, - prometheus_gauge:set(replica_2_9_entropy_store_rate, - [StoreID2], WriteRate), - From ! {store_entropy_sub_chunk_written, WaitN + 1} - end), - PaddedEndOffset2 = shift_replica_2_9_entropy_offset(PaddedEndOffset, 1), - store_entropy(Rest, PaddedEndOffset2, SubChunkStartOffset, RangeEnd, - Keys, RewardAddr, N + length(Keys), WaitN + 1) - end - end. - -%% @doc Take the first slice of each entropy and combine into a single binary. This binary -%% can be used to encipher a single chunk. --spec take_and_combine_entropy_slices(Entropies :: [binary()]) -> - {ChunkEntropy :: binary(), RemainingSlicesOfEachEntropy :: [binary()]}. -take_and_combine_entropy_slices(Entropies) -> - true = ?COMPOSITE_PACKING_SUB_CHUNK_COUNT == length(Entropies), - take_and_combine_entropy_slices(Entropies, [], []). - -take_and_combine_entropy_slices([], Acc, RestAcc) -> - {iolist_to_binary(Acc), lists:reverse(RestAcc)}; -take_and_combine_entropy_slices([<<>> | Entropies], _Acc, _RestAcc) -> - true = lists:all(fun(Entropy) -> Entropy == <<>> end, Entropies), - {<<>>, []}; -take_and_combine_entropy_slices([ - << EntropySlice:(?COMPOSITE_PACKING_SUB_CHUNK_SIZE)/binary, Rest/binary >> - | Entropies], Acc, RestAcc) -> - take_and_combine_entropy_slices(Entropies, [Acc | [EntropySlice]], [Rest | RestAcc]). - -take_combined_entropy_by_index(Entropies, Index) -> - take_combined_entropy_by_index(Entropies, Index, []). - -take_combined_entropy_by_index([], _Index, Acc) -> - iolist_to_binary(Acc); -take_combined_entropy_by_index([Entropy | Entropies], Index, Acc) -> - SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, - take_combined_entropy_by_index(Entropies, Index, - [Acc | [binary:part(Entropy, Index * SubChunkSize, SubChunkSize)]]). 
- -sanity_check_replica_2_9_entropy_keys( - _PaddedEndOffset, _RewardAddr, _SubChunkStartOffset, []) -> - ok; -sanity_check_replica_2_9_entropy_keys( - PaddedEndOffset, RewardAddr, SubChunkStartOffset, [Key | Keys]) -> - Key = ar_replica_2_9:get_entropy_key(RewardAddr, PaddedEndOffset, SubChunkStartOffset), - SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, - sanity_check_replica_2_9_entropy_keys(PaddedEndOffset, RewardAddr, - SubChunkStartOffset + SubChunkSize, Keys). - -wait_store_entropy_processes(0) -> - ok; -wait_store_entropy_processes(N) -> - receive {store_entropy_sub_chunk_written, N} -> - wait_store_entropy_processes(N - 1) - end. - -shift_replica_2_9_entropy_offset(Offset, SectorCount) -> - SectorSize = ar_replica_2_9:get_sector_size(), - get_chunk_bucket_end(ar_block:get_chunk_padded_offset(Offset + SectorSize * SectorCount)). - -store_entropy2(ChunkEntropy, PaddedEndOffset, StoreID, RewardAddr) -> - ChunkFileStart = get_chunk_file_start(PaddedEndOffset), - Filepath = filepath(ChunkFileStart, StoreID), - Size = byte_size(ChunkEntropy), - true = Size == ?DATA_CHUNK_SIZE, - {Position, _ChunkOffset} = get_position_and_relative_chunk_offset( - ChunkFileStart, PaddedEndOffset), - - %% We allow generating and filling it the 2.9 entropy and storing unpacked chunks (to - %% be enciphered later) asynchronously. Whatever comes first, is stored. - %% If the other counterpart is stored already, we read it, encipher and store the - %% packed chunk. - acquire_replica_2_9_semaphore(Filepath), - %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new - %% sync_record id (ar_chunk_storage_replica_2_9_1_unpacked) going forward. - %% The old id (ar_chunk_storage_replica_2_9_unpacked) should not be used. - ID = ar_chunk_storage_replica_2_9_1_unpacked, - IsUnpackedChunkRecorded = ar_sync_record:is_recorded(PaddedEndOffset, ID, StoreID), - SourceArgs = - case get_handle_by_filepath(Filepath) of - {error, _} = Error -> - Error; - F -> - case IsUnpackedChunkRecorded of - false -> - %% The entropy for the first sub-chunk of the chunk. - %% The zero-offset does not have a real meaning, it is set - %% to make sure we pass offset validation on read. 
- EntropyWithOffset = - << (get_special_zero_offset()):?OFFSET_BIT_SIZE, - ChunkEntropy/binary >>, - {no_unpacked_chunk, EntropyWithOffset, F}; - true -> - UnpackedChunk = file:pread(F, Position, Size), - {unpacked_chunk, UnpackedChunk, F} - end - end, - FinalArgs = - case SourceArgs of - {error, _} = Error2 -> - Error2; - {no_unpacked_chunk, Bin2, F2} -> - %% Write the entropy to disk - {Bin2, F2, false}; - {unpacked_chunk, {error, _} = Error2, F2} -> - {Error2, F2}; - {unpacked_chunk, {ok, Bin2}, F2} -> - %% Encipher the unpacked chunk with the entropy and write it back to disk - EncipheredChunk = ar_packing_server:encipher_replica_2_9_chunk( - iolist_to_binary(Bin2), ChunkEntropy), - {EncipheredChunk, F2, true} - end, - WriteResult = - case FinalArgs of - {error, _} = Error3 -> - Error3; - {{error, _}, F3} = Error3 -> - {Error3, F3}; - {Bin3, F3, IsComplete} -> - {file:pwrite(F3, Position, Bin3), F3, IsComplete} - end, - case WriteResult of - {error, Reason} = Error4 -> - ?LOG_ERROR([{event, failed_to_store_replica_2_9_sub_chunk_entropy}, - {file, Filepath}, {position, Position}, - {reason, io_lib:format("~p", [Reason])}]), - release_replica_2_9_semaphore(Filepath), - Error4; - {{error, Reason}, F4} = Error4 -> - file:close(F4), - ?LOG_ERROR([{event, failed_to_store_replica_2_9_sub_chunk_entropy}, - {file, Filepath}, {position, Position}, - {reason, io_lib:format("~p", [Reason])}]), - release_replica_2_9_semaphore(Filepath), - Error4; - {ok, F4, IsComplete2} -> - file:close(F4), - ets:insert(chunk_storage_file_index, - {{ChunkFileStart, StoreID}, Filepath}), - case update_replica_2_9_entropy_record(PaddedEndOffset, StoreID) of - ok -> - prometheus_counter:inc(replica_2_9_entropy_stored, - [StoreID], Size), - release_replica_2_9_semaphore(Filepath), - case IsComplete2 of - false -> - ok; - true -> - StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE, - ar_sync_record:add_async(replica_2_9_entropy_with_chunk, - PaddedEndOffset, StartOffset, ?MODULE, StoreID), - ar_sync_record:add_async(replica_2_9_entropy_with_chunk, - PaddedEndOffset, StartOffset, {replica_2_9, RewardAddr}, - ar_data_sync, StoreID) - end; - Error5 -> - release_replica_2_9_semaphore(Filepath), - Error5 - end - end. - -acquire_replica_2_9_semaphore(Filepath) -> - case ets:insert_new(ar_chunk_storage, {{replica_2_9_semaphore, Filepath}}) of - false -> - timer:sleep(20), - acquire_replica_2_9_semaphore(Filepath); - true -> - ok +write_chunk(PaddedOffset, Chunk, FileIndex, StoreID) -> + {_ChunkFileStart, Filepath, Position, ChunkOffset} = + locate_chunk_on_disk(PaddedOffset, StoreID, FileIndex), + case get_handle_by_filepath(Filepath) of + {error, _} = Error -> + Error; + F -> + write_chunk2(PaddedOffset, ChunkOffset, Chunk, Filepath, F, Position) end. -release_replica_2_9_semaphore(Filepath) -> - ets:delete(ar_chunk_storage, {replica_2_9_semaphore, Filepath}). - -store_chunk(ChunkFileStart, PaddedOffset, Chunk, FileIndex, StoreID) -> - Filepath = filepath(ChunkFileStart, FileIndex, StoreID), - store_chunk2(ChunkFileStart, PaddedOffset, Chunk, Filepath). - filepath(ChunkFileStart, FileIndex, StoreID) -> case maps:get(ChunkFileStart, FileIndex, not_found) of not_found -> @@ -1197,17 +839,7 @@ get_handle_by_filepath(Filepath) -> F end. -store_chunk2(ChunkFileStart, PaddedOffset, Chunk, Filepath) -> - case get_handle_by_filepath(Filepath) of - {error, _} = Error -> - Error; - F -> - store_chunk3(ChunkFileStart, PaddedOffset, Chunk, Filepath, F) - end. 
- -store_chunk3(ChunkFileStart, PaddedOffset, Chunk, Filepath, F) -> - {Position, ChunkOffset} = get_position_and_relative_chunk_offset(ChunkFileStart, - PaddedOffset), +write_chunk2(PaddedOffset, ChunkOffset, Chunk, Filepath, F, Position) -> ChunkOffsetBinary = case ChunkOffset of 0 -> @@ -1247,11 +879,11 @@ get_position_and_relative_chunk_offset_by_start_offset(ChunkFileStart, BucketPic Position = RelativeOffset + ?OFFSET_SIZE * (RelativeOffset div ?DATA_CHUNK_SIZE), {Position, ChunkOffset}. -delete_chunk(PaddedOffset, ChunkFileStart, Filepath) -> +delete_chunk(PaddedOffset, StoreID) -> + {_ChunkFileStart, Filepath, Position, _ChunkOffset} = + locate_chunk_on_disk(PaddedOffset, StoreID), case file:open(Filepath, [read, write, raw]) of {ok, F} -> - {Position, _ChunkOffset} = - get_position_and_relative_chunk_offset(ChunkFileStart, PaddedOffset), ZeroChunk = case erlang:get(zero_chunk) of undefined -> @@ -1265,9 +897,9 @@ delete_chunk(PaddedOffset, ChunkFileStart, Filepath) -> Chunk -> Chunk end, - acquire_replica_2_9_semaphore(Filepath), + ar_entropy_storage:acquire_semaphore(Filepath), Result = file:pwrite(F, Position, ZeroChunk), - release_replica_2_9_semaphore(Filepath), + ar_entropy_storage:release_semaphore(Filepath), Result; {error, enoent} -> ok; diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 9308c1f1b..e7a5ab2b5 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -332,11 +332,11 @@ get_chunk(Offset, #{ packing := Packing } = Options) -> case {RequestOrigin, Pack} of {miner, _} -> StorageModules = ar_storage_module:get_all(Offset), - ar_sync_record:is_recorded_any(Offset, ?MODULE, StorageModules); + ar_sync_record:is_recorded_any(Offset, ar_data_sync, StorageModules); {_, false} -> - ar_sync_record:is_recorded(Offset, {?MODULE, Packing}); + ar_sync_record:is_recorded(Offset, {ar_data_sync, Packing}); {_, true} -> - ar_sync_record:is_recorded(Offset, ?MODULE) + ar_sync_record:is_recorded(Offset, ar_data_sync) end, SeekOffset = case maps:get(bucket_based_offset, Options, true) of @@ -350,16 +350,16 @@ get_chunk(Offset, #{ packing := Packing } = Options) -> get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID, RequestOrigin); {true, StoreID} -> - UnpackedReply = ar_sync_record:is_recorded(Offset, {?MODULE, unpacked}), + UnpackedReply = ar_sync_record:is_recorded(Offset, {ar_data_sync, unpacked}), log_chunk_error(RequestOrigin, chunk_record_not_associated_with_packing, [{store_id, StoreID}, {seek_offset, SeekOffset}, {is_recorded_unpacked, io_lib:format("~p", [UnpackedReply])}]), {error, chunk_not_found}; Reply -> - UnpackedReply = ar_sync_record:is_recorded(Offset, {?MODULE, unpacked}), + UnpackedReply = ar_sync_record:is_recorded(Offset, {ar_data_sync, unpacked}), Modules = ar_storage_module:get_all(Offset), ModuleIDs = [ar_storage_module:id(Module) || Module <- Modules], - RootRecords = [ets:lookup(sync_records, {?MODULE, ID}) + RootRecords = [ets:lookup(sync_records, {ar_data_sync, ID}) || ID <- ModuleIDs], log_chunk_error(RequestOrigin, chunk_record_not_found, [{modules_covering_offset, ModuleIDs}, @@ -372,7 +372,8 @@ get_chunk(Offset, #{ packing := Packing } = Options) -> %% @doc Fetch the merkle proofs for the chunk corresponding to Offset. 
get_chunk_proof(Offset, Options) -> - IsRecorded = ar_sync_record:is_recorded(Offset, ?MODULE), + RequestOrigin = maps:get(origin, Options, unknown), + IsRecorded = ar_sync_record:is_recorded(Offset, ar_data_sync), SeekOffset = case maps:get(bucket_based_offset, Options, true) of true -> @@ -382,7 +383,7 @@ get_chunk_proof(Offset, Options) -> end, case IsRecorded of {{true, StoredPacking}, StoreID} -> - get_chunk_proof(Offset, SeekOffset, StoredPacking, StoreID); + get_chunk_proof(Offset, SeekOffset, StoredPacking, StoreID, RequestOrigin); _ -> {error, chunk_not_found} end. @@ -730,7 +731,7 @@ handle_cast({join, RecentBI}, State) -> [gen_server:cast(name(ar_storage_module:id(Module)), {cut, Offset}) || Module <- Config#config.storage_modules], ok = ar_chunk_storage:cut(Offset, StoreID), - ok = ar_sync_record:cut(Offset, ?MODULE, StoreID), + ok = ar_sync_record:cut(Offset, ar_data_sync, StoreID), ar_events:send(sync_record, {global_cut, Offset}), reset_orphaned_data_roots_disk_pool_timestamps(OrphanedDataRoots) end, @@ -749,7 +750,7 @@ handle_cast({join, RecentBI}, State) -> handle_cast({cut, Start}, #sync_data_state{ store_id = StoreID, range_end = End } = State) -> - case ar_sync_record:get_next_synced_interval(Start, End, ?MODULE, StoreID) of + case ar_sync_record:get_next_synced_interval(Start, End, ar_data_sync, StoreID) of not_found -> ok; _Interval -> @@ -766,7 +767,7 @@ handle_cast({cut, Start}, #sync_data_state{ store_id = StoreID, true -> ok = remove_chunks_index_range(Start, End, State), ok = ar_chunk_storage:cut(Start, StoreID), - ok = ar_sync_record:cut(Start, ?MODULE, StoreID) + ok = ar_sync_record:cut(Start, ar_data_sync, StoreID) end end, {noreply, State}; @@ -791,7 +792,7 @@ handle_cast({add_tip_block, BlockTXPairs, BI}, State) -> add_block_data_roots_to_disk_pool(AddedDataRoots), reset_orphaned_data_roots_disk_pool_timestamps(OrphanedDataRoots), ok = ar_chunk_storage:cut(BlockStartOffset, StoreID), - ok = ar_sync_record:cut(BlockStartOffset, ?MODULE, StoreID), + ok = ar_sync_record:cut(BlockStartOffset, ar_data_sync, StoreID), ar_events:send(sync_record, {global_cut, BlockStartOffset}), DiskPoolThreshold = get_disk_pool_threshold(BI), ets:insert(ar_data_sync_state, {disk_pool_threshold, DiskPoolThreshold}), @@ -1249,7 +1250,7 @@ handle_cast({remove_range, End, Cursor, Ref, PID}, State) -> %% we will not report false positives to peers, %% and the chunk can still be removed upon retry. RemoveFromSyncRecord = ar_sync_record:delete(PaddedOffset, - PaddedStartOffset, ?MODULE, StoreID), + PaddedStartOffset, ar_data_sync, StoreID), RemoveFromChunkStorage = case RemoveFromSyncRecord of ok -> @@ -1321,7 +1322,7 @@ handle_cast({remove_recently_processed_disk_pool_offset, Offset, ChunkDataKey}, {noreply, remove_recently_processed_disk_pool_offset(Offset, ChunkDataKey, State)}; handle_cast({request_default_unpacked_packing, Cursor, RightBound}, State) -> - case ar_sync_record:get_next_synced_interval(Cursor, RightBound, unpacked, ?MODULE, + case ar_sync_record:get_next_synced_interval(Cursor, RightBound, unpacked, ar_data_sync, "default") of not_found -> ok; @@ -1609,8 +1610,9 @@ get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID, RequestOrig end end. 
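Reviewer note (not part of the patch): get_chunk_proof/2 now reads the request origin from Options and threads it down to read_chunk_with_metadata/6 so proof-validation failures are attributed in the logs. A usage sketch, assuming get_chunk_proof/2 stays exported; the http origin tag is illustrative only:

get_proof_example(Offset) ->
    %% origin is only used for log attribution; bucket_based_offset keeps the default.
    ar_data_sync:get_chunk_proof(Offset, #{ origin => http, bucket_based_offset => true }).
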
-get_chunk_proof(Offset, SeekOffset, StoredPacking, StoreID) -> - case read_chunk_with_metadata(Offset, SeekOffset, StoredPacking, StoreID, false, false) of +get_chunk_proof(Offset, SeekOffset, StoredPacking, StoreID, RequestOrigin) -> + case read_chunk_with_metadata( + Offset, SeekOffset, StoredPacking, StoreID, false, RequestOrigin) of {error, Reason} -> {error, Reason}; {ok, DataPath, AbsoluteOffset, TXRoot, ChunkSize, TXPath} -> @@ -1625,6 +1627,11 @@ get_chunk_proof(Offset, SeekOffset, StoredPacking, StoreID) -> case CheckProof of error -> %% Proof was read but could not be validated. + log_chunk_error(RequestOrigin, chunk_proof_failed_validation, + [{offset, Offset}, + {seek_offset, SeekOffset}, + {stored_packing, ar_serialize:encode_packing(StoredPacking, true)}, + {store_id, StoreID}]), {error, chunk_not_found}; _ -> Proof = #{ data_path => DataPath, tx_path => TXPath }, @@ -1654,6 +1661,11 @@ read_chunk_with_metadata( {error, chunk_not_found}; {ok, _, {AbsoluteOffset, _, _, _, _, _, ChunkSize}} when AbsoluteOffset - SeekOffset >= ChunkSize -> + log_chunk_error(RequestOrigin, chunk_offset_mismatch, + [{absolute_offset, AbsoluteOffset}, + {seek_offset, SeekOffset}, + {store_id, StoreID}, + {stored_packing, ar_serialize:encode_packing(StoredPacking, true)}]), {error, chunk_not_found}; {ok, _, {AbsoluteOffset, ChunkDataKey, TXRoot, _, TXPath, _, ChunkSize}} -> ReadFun = @@ -1669,11 +1681,13 @@ read_chunk_with_metadata( ModuleIDs = [ar_storage_module:id(Module) || Module <- Modules], log_chunk_error(RequestOrigin, failed_to_read_chunk_data_path, [{seek_offset, SeekOffset}, + {absolute_offset, AbsoluteOffset}, {store_id, StoreID}, {stored_packing, ar_serialize:encode_packing(StoredPacking, true)}, {modules_covering_seek_offset, ModuleIDs}, - {chunk_data_key, ar_util:encode(ChunkDataKey)}]), + {chunk_data_key, ar_util:encode(ChunkDataKey)}, + {read_fun, ReadFun}]), invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, StoreID, 1}), {error, chunk_not_found}; {error, Error} -> @@ -1683,12 +1697,12 @@ read_chunk_with_metadata( {absolute_end_offset, Offset}]), {error, failed_to_read_chunk}; {ok, {Chunk, DataPath}} -> - case ar_sync_record:is_recorded(Offset, StoredPacking, ?MODULE, + case ar_sync_record:is_recorded(Offset, StoredPacking, ar_data_sync, StoreID) of false -> Modules = ar_storage_module:get_all(SeekOffset), ModuleIDs = [ar_storage_module:id(Module) || Module <- Modules], - RootRecords = [ets:lookup(sync_records, {?MODULE, ID}) + RootRecords = [ets:lookup(sync_records, {ar_data_sync, ID}) || ID <- ModuleIDs], log_chunk_error(chunk_metadata_read_sync_record_race_condition, [{seek_offset, SeekOffset}, @@ -1726,7 +1740,7 @@ invalidate_bad_data_record({Start, End, StoreID, Case}) -> end, ?LOG_WARNING([{event, invalidating_bad_data_record}, {type, Case}, {range_start, PaddedStart2}, {range_end, PaddedEnd}]), - case ar_sync_record:delete(PaddedEnd, PaddedStart2, ?MODULE, StoreID) of + case ar_sync_record:delete(PaddedEnd, PaddedStart2, ar_data_sync, StoreID) of ok -> ar_sync_record:add(PaddedEnd, PaddedStart2, invalid_chunks, StoreID), case ar_kv:delete({chunks_index, StoreID}, << End:?OFFSET_KEY_BITSIZE >>) of @@ -1941,7 +1955,7 @@ init_kv(StoreID) -> filename:join(["storage_modules", StoreID, ?ROCKS_DB_DIR]) end, ok = ar_kv:open(filename:join(Dir, "ar_data_sync_db"), ColumnFamilyDescriptors, [], - [{?MODULE, StoreID}, {chunks_index, StoreID}, {data_root_index_old, StoreID}, + [{ar_data_sync, StoreID}, {chunks_index, StoreID}, {data_root_index_old, StoreID}, 
{data_root_offset_index, StoreID}, {tx_index, StoreID}, {tx_offset_index, StoreID}, {disk_pool_chunks_index_old, StoreID}, {migrations_index, StoreID}]), ok = ar_kv:open(filename:join(Dir, "ar_data_sync_chunk_db"), [{max_open_files, 10000}, @@ -2456,7 +2470,7 @@ get_unsynced_intervals_from_other_storage_modules(_TargetStoreID, _StoreID, Rang get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeStart, RangeEnd, Intervals) -> FindNextMissing = - case ar_sync_record:get_next_synced_interval(RangeStart, RangeEnd, ?MODULE, + case ar_sync_record:get_next_synced_interval(RangeStart, RangeEnd, ar_data_sync, TargetStoreID) of not_found -> {request, {RangeStart, RangeEnd}}; @@ -2470,7 +2484,7 @@ get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeS get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, End2, RangeEnd, Intervals); {request, {Cursor, RightBound}} -> - case ar_sync_record:get_next_synced_interval(Cursor, RightBound, ?MODULE, + case ar_sync_record:get_next_synced_interval(Cursor, RightBound, ar_data_sync, StoreID) of not_found -> get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, @@ -2682,7 +2696,7 @@ validate_data_path(DataRoot, Offset, TXSize, DataPath, Chunk) -> chunk_offsets_synced(_, _, _, _, N) when N == 0 -> true; chunk_offsets_synced(DataRootIndex, DataRootKey, ChunkOffset, TXStartOffset, N) -> - case ar_sync_record:is_recorded(TXStartOffset + ChunkOffset, ?MODULE) of + case ar_sync_record:is_recorded(TXStartOffset + ChunkOffset, ar_data_sync) of {{true, _}, _StoreID} -> case TXStartOffset of 0 -> @@ -2716,7 +2730,7 @@ get_chunk_data_key(DataPathHash) -> write_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> case ar_tx_blacklist:is_byte_blacklisted(Offset) of true -> - ok; + {ok, Packing}; false -> write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) @@ -2726,26 +2740,35 @@ write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Pa State) -> #sync_data_state{ chunk_data_db = ChunkDataDB, store_id = StoreID } = State, ShouldStoreInChunkStorage = ar_chunk_storage:is_storage_supported(Offset, ChunkSize, Packing), - Result = - case ShouldStoreInChunkStorage of - true -> - PaddedOffset = ar_block:get_chunk_padded_offset(Offset), - ar_chunk_storage:put(PaddedOffset, Chunk, StoreID); - false -> - ok - end, - case Result of - ok -> - case ShouldStoreInChunkStorage of - false -> - ar_kv:put(ChunkDataDB, ChunkDataKey, term_to_binary({Chunk, DataPath})); - true -> - ar_kv:put(ChunkDataDB, ChunkDataKey, term_to_binary(DataPath)) + case ShouldStoreInChunkStorage of + true -> + PaddedOffset = ar_block:get_chunk_padded_offset(Offset), + Result = ar_chunk_storage:put(PaddedOffset, Chunk, StoreID), + case Result of + {ok, NewPacking} -> + case ar_kv:put(ChunkDataDB, ChunkDataKey, term_to_binary(DataPath)) of + ok -> + {ok, NewPacking}; + Error -> + Error + end; + _ -> + ?LOG_ERROR([{event, failed_to_write_not_blacklisted_chunk_to_chunk_data_db}, + {offset, Offset}, {chunk_size, ChunkSize}, + {packing, ar_serialize:encode_packing(Packing, true)}, + {store_id, StoreID}]), + Result end; - _ -> - Result + false -> + case ar_kv:put(ChunkDataDB, ChunkDataKey, term_to_binary({Chunk, DataPath})) of + ok -> + {ok, Packing}; + Error -> + Error + end end. 
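Reviewer note (not part of the patch): write_chunk/7 and write_not_blacklisted_chunk/7 now report the packing that was actually stored, and the unpacked_padded-to-module-packing decision has moved down into ar_chunk_storage / ar_entropy_storage. A sketch of the consumer side, mirroring the store_chunk2 change below; the wrapper name is hypothetical:

record_stored_packing(AbsoluteOffset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) ->
    case write_chunk(AbsoluteOffset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) of
        {ok, StoredPacking} ->
            %% StoredPacking may differ from the requested Packing, e.g. a chunk
            %% submitted as unpacked_padded comes back as {replica_2_9, Addr}
            %% once it was enciphered with already-written entropy.
            {true, StoredPacking};
        Error ->
            Error
    end.
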
+ update_chunks_index(Args, State) -> AbsoluteChunkOffset = element(1, Args), case ar_tx_blacklist:is_byte_blacklisted(AbsoluteChunkOffset) of @@ -2765,7 +2788,7 @@ update_chunks_index2(Args, State) -> ok -> StartOffset = ar_block:get_chunk_padded_offset(AbsoluteOffset - ChunkSize), PaddedOffset = ar_block:get_chunk_padded_offset(AbsoluteOffset), - case ar_sync_record:add(PaddedOffset, StartOffset, Packing, ?MODULE, StoreID) of + case ar_sync_record:add(PaddedOffset, StartOffset, Packing, ar_data_sync, StoreID) of ok -> ok; {error, Reason} -> @@ -2816,7 +2839,7 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) -> {noreply, State}; true -> #sync_data_state{ store_id = StoreID } = State, - case ar_sync_record:is_recorded(Byte + 1, ?MODULE, StoreID) of + case ar_sync_record:is_recorded(Byte + 1, ar_data_sync, StoreID) of {true, _} -> %% The chunk has been synced by another job already. decrement_chunk_cache_size(), @@ -2950,7 +2973,7 @@ store_chunk2(ChunkArgs, Args, State) -> %% The 2.9 chunk storage is write-once. ok; _ -> - ar_sync_record:delete(PaddedOffset, StartOffset, ?MODULE, StoreID) + ar_sync_record:delete(PaddedOffset, StartOffset, ar_data_sync, StoreID) end, case CleanRecord of {error, Reason} -> @@ -2969,17 +2992,8 @@ store_chunk2(ChunkArgs, Args, State) -> StoreIndex = case write_chunk(AbsoluteOffset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) of - ok -> - Packing2 = - case Packing of - unpacked_padded -> - ar_storage_module:get_packing(StoreID); - _ -> - Packing - end, - {true, Packing2}; - {error, stored_without_entropy} -> - {true, unpacked_padded}; + {ok, NewPacking} -> + {true, NewPacking}; Error -> Error end, @@ -3021,7 +3035,7 @@ log_failed_to_store_chunk(already_stored, {store_id, StoreID}]), PaddedEndOffset = ar_block:get_chunk_padded_offset(AbsoluteOffset), ar_sync_record:add_async(already_store_chunk_ensure_sync_record, - PaddedEndOffset, PaddedEndOffset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID); + PaddedEndOffset, PaddedEndOffset - ?DATA_CHUNK_SIZE, ar_data_sync, StoreID); log_failed_to_store_chunk(Reason, AbsoluteOffset, Offset, DataRoot, DataPathHash, StoreID) -> ?LOG_ERROR([{event, failed_to_store_chunk}, {reason, io_lib:format("~p", [Reason])}, @@ -3162,9 +3176,9 @@ delete_disk_pool_chunk(Iterator, Args, State) -> PaddedOffset = ar_block:get_chunk_padded_offset(AbsoluteOffset), StartOffset = ar_block:get_chunk_padded_offset( AbsoluteOffset - ChunkSize), - ok = ar_sync_record:delete(PaddedOffset, StartOffset, ?MODULE, + ok = ar_sync_record:delete(PaddedOffset, StartOffset, ar_data_sync, StoreID), - case ar_sync_record:is_recorded(PaddedOffset, ?MODULE) of + case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync) of false -> ar_events:send(sync_record, {global_remove_range, StartOffset, PaddedOffset}); @@ -3260,7 +3274,7 @@ process_disk_pool_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, MayConc process_disk_pool_immature_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, Args, State) -> #sync_data_state{ store_id = StoreID } = State, - case ar_sync_record:is_recorded(AbsoluteOffset, ?MODULE, StoreID) of + case ar_sync_record:is_recorded(AbsoluteOffset, ar_data_sync, StoreID) of {true, unpacked} -> %% Pass MayConclude as false because we have encountered an offset %% above the disk pool threshold => we need to keep the chunk in the @@ -3334,7 +3348,7 @@ process_disk_pool_matured_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, {noreply, State3} -> {noreply, State3}; StoreID3 -> - case ar_sync_record:is_recorded(AbsoluteOffset, 
?MODULE, StoreID3) of + case ar_sync_record:is_recorded(AbsoluteOffset, ar_data_sync, StoreID3) of {true, _Packing} -> gen_server:cast(self(), {process_disk_pool_chunk_offsets, Iterator, MayConclude, Args}), diff --git a/apps/arweave/src/ar_entropy_storage.erl b/apps/arweave/src/ar_entropy_storage.erl new file mode 100644 index 000000000..59f071efe --- /dev/null +++ b/apps/arweave/src/ar_entropy_storage.erl @@ -0,0 +1,381 @@ +-module(ar_entropy_storage). + +-export([acquire_semaphore/1, release_semaphore/1, is_recorded/2, is_sub_chunk_recorded/3, + delete_record/2, generate_entropies/3, generate_missing_entropy/2, generate_entropy_keys/3, + shift_entropy_offset/2, store_entropy/8, record_chunk/6]). + +-include_lib("arweave/include/ar.hrl"). +-include_lib("arweave/include/ar_chunk_storage.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%% @doc Return true if the given sub-chunk bucket contains the 2.9 entropy. +is_sub_chunk_recorded(PaddedEndOffset, SubChunkBucketStartOffset, StoreID) -> + %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new + %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. + %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. + ID = ar_chunk_storage_replica_2_9_1_entropy, + ChunkBucketStart = ar_chunk_storage:get_chunk_bucket_start(PaddedEndOffset), + SubChunkBucketStart = ChunkBucketStart + SubChunkBucketStartOffset, + ar_sync_record:is_recorded(SubChunkBucketStart + 1, ID, StoreID). + +%% @doc Return true if the 2.9 entropy for every sub-chunk of the chunk with the +%% given offset (> start offset, =< end offset) is recorded. +%% We check every sub-chunk because the entropy is written on the sub-chunk level. +is_recorded(PaddedEndOffset, StoreID) -> + ChunkBucketStart = ar_chunk_storage:get_chunk_bucket_start(PaddedEndOffset), + is_recorded2(ChunkBucketStart, + ChunkBucketStart + ?DATA_CHUNK_SIZE, + StoreID). + +is_recorded2(Cursor, BucketEnd, _StoreID) when Cursor >= BucketEnd -> + true; +is_recorded2(Cursor, BucketEnd, StoreID) -> + %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new + %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. + %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. + ID = ar_chunk_storage_replica_2_9_1_entropy, + case ar_sync_record:is_recorded(Cursor + 1, ID, StoreID) of + false -> + false; + true -> + SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, + is_recorded2(Cursor + SubChunkSize, BucketEnd, StoreID) + end. + +update_sync_records(IsComplete, PaddedEndOffset, StoreID, RewardAddr) -> + %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new + %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. + %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. 
+ ID = ar_chunk_storage_replica_2_9_1_entropy, + BucketEnd = ar_chunk_storage:get_chunk_bucket_end(PaddedEndOffset), + BucketStart = ar_chunk_storage:get_chunk_bucket_start(PaddedEndOffset), + ar_sync_record:add_async(replica_2_9_entropy, BucketEnd, BucketStart, ID, StoreID), + prometheus_counter:inc(replica_2_9_entropy_stored, [StoreID], ?DATA_CHUNK_SIZE), + case IsComplete of + true -> + StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE, + ar_sync_record:add_async(replica_2_9_entropy_with_chunk, + PaddedEndOffset, + StartOffset, + ar_chunk_storage, + StoreID), + ar_sync_record:add_async(replica_2_9_entropy_with_chunk, + PaddedEndOffset, + StartOffset, + {replica_2_9, RewardAddr}, + ar_data_sync, + StoreID); + false -> + ok + end. + + + +delete_record(PaddedEndOffset, StoreID) -> + %% Entropy indexing changed between 2.9.0 and 2.9.1. So we'll use a new + %% sync_record id (ar_chunk_storage_replica_2_9_1_entropy) going forward. + %% The old id (ar_chunk_storage_replica_2_9_entropy) should not be used. + ID = ar_chunk_storage_replica_2_9_1_entropy, + BucketStart = ar_chunk_storage:get_chunk_bucket_start(PaddedEndOffset), + ar_sync_record:delete(BucketStart + ?DATA_CHUNK_SIZE, BucketStart, ID, StoreID). + +generate_missing_entropy(PaddedEndOffset, RewardAddr) -> + Entropies = generate_entropies(RewardAddr, PaddedEndOffset, 0), + EntropyIndex = ar_replica_2_9:get_slice_index(PaddedEndOffset), + take_combined_entropy_by_index(Entropies, EntropyIndex). + +%% @doc Returns all the entropies needed to encipher the chunk at PaddedEndOffset. +%% ar_packing_server:get_replica_2_9_entropy/3 will query a cached entropy, or generate it +%% if it is not cached. +generate_entropies(_RewardAddr, _PaddedEndOffset, SubChunkStart) + when SubChunkStart == ?DATA_CHUNK_SIZE -> + []; +generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart) -> + SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, + [ar_packing_server:get_replica_2_9_entropy(RewardAddr, PaddedEndOffset, SubChunkStart) + | generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart + SubChunkSize)]. + +generate_entropy_keys(_RewardAddr, _Offset, SubChunkStart) + when SubChunkStart == ?DATA_CHUNK_SIZE -> + []; +generate_entropy_keys(RewardAddr, Offset, SubChunkStart) -> + SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, + [ar_replica_2_9:get_entropy_key(RewardAddr, Offset, SubChunkStart) + | generate_entropy_keys(RewardAddr, Offset, SubChunkStart + SubChunkSize)]. + +store_entropy(_Entropies, + PaddedEndOffset, + _SubChunkStartOffset, + RangeEnd, + _Keys, + _RewardAddr, + N, + WaitN) + when PaddedEndOffset > RangeEnd -> + %% The amount of entropy generated per partition is slightly more than the amount needed. + %% So at the end of a partition we will have finished processing chunks, but still have + %% some entropy left. In this case we stop the recursion early and wait for the writes + %% to complete. + wait_store_entropy_processes(WaitN), + {ok, N}; +store_entropy(Entropies, + PaddedEndOffset, + SubChunkStartOffset, + RangeEnd, + Keys, + RewardAddr, + N, + WaitN) -> + case take_and_combine_entropy_slices(Entropies) of + {<<>>, []} -> + %% We've finished processing all the entropies, wait for the writes to complete. 
+ wait_store_entropy_processes(WaitN), + {ok, N}; + {ChunkEntropy, Rest} -> + true = + ar_replica_2_9:get_entropy_partition(PaddedEndOffset) + == ar_replica_2_9:get_entropy_partition(RangeEnd), + sanity_check_replica_2_9_entropy_keys(PaddedEndOffset, + RewardAddr, + SubChunkStartOffset, + Keys), + FindModule = + case ar_storage_module:get_strict(PaddedEndOffset, {replica_2_9, RewardAddr}) of + not_found -> + ?LOG_WARNING([{event, failed_to_find_storage_module_for_2_9_entropy}, + {padded_end_offset, PaddedEndOffset}]), + not_found; + {ok, StoreID} -> + {ok, StoreID} + end, + case FindModule of + not_found -> + PaddedEndOffset2 = shift_entropy_offset(PaddedEndOffset, 1), + store_entropy(Rest, + PaddedEndOffset2, + SubChunkStartOffset, + RangeEnd, + Keys, + RewardAddr, + N, + WaitN); + {ok, StoreID2} -> + From = self(), + spawn_link(fun() -> + StartTime = erlang:monotonic_time(), + + record_entropy(ChunkEntropy, + PaddedEndOffset, + StoreID2, + RewardAddr), + + EndTime = erlang:monotonic_time(), + ElapsedTime = + erlang:convert_time_unit(EndTime - StartTime, + native, + microsecond), + %% bytes per second + WriteRate = + case ElapsedTime > 0 of + true -> 1000000 * byte_size(ChunkEntropy) div ElapsedTime; + false -> 0 + end, + prometheus_gauge:set(replica_2_9_entropy_store_rate, + [StoreID2], + WriteRate), + From ! {store_entropy_sub_chunk_written, WaitN + 1} + end), + PaddedEndOffset2 = shift_entropy_offset(PaddedEndOffset, 1), + store_entropy(Rest, + PaddedEndOffset2, + SubChunkStartOffset, + RangeEnd, + Keys, + RewardAddr, + N + length(Keys), + WaitN + 1) + end + end. + +record_chunk(PaddedEndOffset, Chunk, RewardAddr, StoreID, FileIndex, IsPrepared) -> + StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE, + {_ChunkFileStart, Filepath, _Position, _ChunkOffset} = + ar_chunk_storage:locate_chunk_on_disk(PaddedEndOffset, StoreID), + acquire_semaphore(Filepath), + CheckIsStoredAlready = + ar_sync_record:is_recorded(PaddedEndOffset, ar_chunk_storage, StoreID), + CheckIsEntropyRecorded = + case CheckIsStoredAlready of + true -> + {error, already_stored}; + false -> + is_recorded(PaddedEndOffset, StoreID) + end, + ReadEntropy = + case CheckIsEntropyRecorded of + {error, _} = Error -> + Error; + false -> + case IsPrepared of + false -> + no_entropy_yet; + true -> + missing_entropy + end; + true -> + ar_chunk_storage:get(StartOffset, StartOffset, StoreID) + end, + case ReadEntropy of + {error, _} = Error2 -> + release_semaphore(Filepath), + Error2; + not_found -> + release_semaphore(Filepath), + {error, not_prepared_yet2}; + missing_entropy -> + Packing = {replica_2_9, RewardAddr}, + Entropy = generate_missing_entropy(PaddedEndOffset, RewardAddr), + PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy), + Result = ar_chunk_storage:record_chunk(PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex), + release_semaphore(Filepath), + Result; + no_entropy_yet -> + Result = ar_chunk_storage:record_chunk(PaddedEndOffset, Chunk, unpacked_padded, StoreID, FileIndex), + release_semaphore(Filepath), + Result; + {_EndOffset, Entropy} -> + Packing = {replica_2_9, RewardAddr}, + release_semaphore(Filepath), + PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy), + ar_chunk_storage:record_chunk(PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex) + end. 
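Reviewer note (not part of the patch): record_chunk/6 above resolves one of four situations under the per-file semaphore. A read-only sketch of the same decision as a hypothetical helper; it ignores the not_prepared_yet2 race where the entropy record exists but the disk read returns not_found:

expected_outcome(PaddedEndOffset, StoreID, IsPrepared) ->
    case ar_sync_record:is_recorded(PaddedEndOffset, ar_chunk_storage, StoreID) of
        false ->
            case ar_entropy_storage:is_recorded(PaddedEndOffset, StoreID) of
                true ->
                    %% Entropy already on disk: encipher the incoming chunk with it.
                    encipher_with_stored_entropy;
                false when IsPrepared =:= true ->
                    %% Preparation finished but this bucket was skipped: generate
                    %% the missing entropy, then encipher.
                    encipher_with_generated_entropy;
                false ->
                    %% No entropy yet: store the chunk unpacked_padded and let
                    %% record_entropy/4 encipher it later.
                    store_unpacked_padded
            end;
        _ ->
            %% true or {true, Packing}.
            {error, already_stored}
    end.
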
+ +record_entropy(ChunkEntropy, PaddedEndOffset, StoreID, RewardAddr) -> + + true = byte_size(ChunkEntropy) == ?DATA_CHUNK_SIZE, + + IsUnpackedChunkRecorded = ar_sync_record:is_recorded( + PaddedEndOffset, ar_chunk_storage:sync_record_id(unpacked_padded), StoreID), + + {ChunkFileStart, Filepath, _Position, _ChunkOffset} = + ar_chunk_storage:locate_chunk_on_disk(PaddedEndOffset, StoreID), + + %% We allow generating and filling it the 2.9 entropy and storing unpacked chunks (to + %% be enciphered later) asynchronously. Whatever comes first, is stored. + %% If the other counterpart is stored already, we read it, encipher and store the + %% packed chunk. + acquire_semaphore(Filepath), + + Chunk = case IsUnpackedChunkRecorded of + true -> + StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE, + case ar_chunk_storage:get(StartOffset, StartOffset, StoreID) of + {error, _} = Error -> + Error; + {_, UnpackedChunk} -> + ar_packing_server:encipher_replica_2_9_chunk(UnpackedChunk, ChunkEntropy) + end; + false -> + %% The entropy for the first sub-chunk of the chunk. + %% The zero-offset does not have a real meaning, it is set + %% to make sure we pass offset validation on read. + ChunkEntropy + end, + Result = case Chunk of + {error, _} = Error2 -> + Error2; + _ -> + case ar_chunk_storage:write_chunk(PaddedEndOffset, Chunk, #{}, StoreID) of + {ok, Filepath} -> + ets:insert(chunk_storage_file_index, {{ChunkFileStart, StoreID}, Filepath}), + update_sync_records(IsUnpackedChunkRecorded, PaddedEndOffset, StoreID, RewardAddr); + Error2 -> + Error2 + end + end, + + case Result of + {error, Reason} -> + ?LOG_ERROR([{event, failed_to_store_replica_2_9_sub_chunk_entropy}, + {filepath, Filepath}, + {padded_end_offset, PaddedEndOffset}, + {store_id, StoreID}, + {reason, io_lib:format("~p", [Reason])}]); + _ -> + ok + end, + + release_semaphore(Filepath). + + +%% @doc Take the first slice of each entropy and combine into a single binary. This binary +%% can be used to encipher a single chunk. +-spec take_and_combine_entropy_slices(Entropies :: [binary()]) -> + {ChunkEntropy :: binary(), + RemainingSlicesOfEachEntropy :: [binary()]}. +take_and_combine_entropy_slices(Entropies) -> + true = ?COMPOSITE_PACKING_SUB_CHUNK_COUNT == length(Entropies), + take_and_combine_entropy_slices(Entropies, [], []). + +take_and_combine_entropy_slices([], Acc, RestAcc) -> + {iolist_to_binary(Acc), lists:reverse(RestAcc)}; +take_and_combine_entropy_slices([<<>> | Entropies], _Acc, _RestAcc) -> + true = lists:all(fun(Entropy) -> Entropy == <<>> end, Entropies), + {<<>>, []}; +take_and_combine_entropy_slices([<> + | Entropies], + Acc, + RestAcc) -> + take_and_combine_entropy_slices(Entropies, [Acc, EntropySlice], [Rest | RestAcc]). + +take_combined_entropy_by_index(Entropies, Index) -> + take_combined_entropy_by_index(Entropies, Index, []). + +take_combined_entropy_by_index([], _Index, Acc) -> + iolist_to_binary(Acc); +take_combined_entropy_by_index([Entropy | Entropies], Index, Acc) -> + SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, + take_combined_entropy_by_index(Entropies, + Index, + [Acc, binary:part(Entropy, Index * SubChunkSize, SubChunkSize)]). 
+ +sanity_check_replica_2_9_entropy_keys(_PaddedEndOffset, + _RewardAddr, + _SubChunkStartOffset, + []) -> + ok; +sanity_check_replica_2_9_entropy_keys(PaddedEndOffset, + RewardAddr, + SubChunkStartOffset, + [Key | Keys]) -> + Key = ar_replica_2_9:get_entropy_key(RewardAddr, PaddedEndOffset, SubChunkStartOffset), + SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, + sanity_check_replica_2_9_entropy_keys(PaddedEndOffset, + RewardAddr, + SubChunkStartOffset + SubChunkSize, + Keys). + +wait_store_entropy_processes(0) -> + ok; +wait_store_entropy_processes(N) -> + receive + {store_entropy_sub_chunk_written, N} -> + wait_store_entropy_processes(N - 1) + end. + +shift_entropy_offset(Offset, SectorCount) -> + SectorSize = ar_replica_2_9:get_sector_size(), + ar_chunk_storage:get_chunk_bucket_end(ar_block:get_chunk_padded_offset(Offset + SectorSize * SectorCount)). + +acquire_semaphore(Filepath) -> + case ets:insert_new(ar_entropy_storage, {{semaphore, Filepath}}) of + false -> + timer:sleep(20), + acquire_semaphore(Filepath); + true -> + ok + end. + +release_semaphore(Filepath) -> + ets:delete(ar_entropy_storage, {semaphore, Filepath}). diff --git a/apps/arweave/src/ar_metrics.erl b/apps/arweave/src/ar_metrics.erl index 5e3730c1d..a2b9459ad 100644 --- a/apps/arweave/src/ar_metrics.erl +++ b/apps/arweave/src/ar_metrics.erl @@ -475,10 +475,8 @@ register() -> {help, "The number of chunks in the packing server queue."}]), prometheus_gauge:new([{name, chunk_cache_size}, {help, "The number of chunks scheduled for downloading."}]), - prometheus_counter:new([{name, chunks_without_entropy_stored}, - {help, "The counter is incremented every time a 2.9 unpacked chunk is written to " - "chunk_storage to be enciphered later."}]), prometheus_counter:new([{name, chunks_stored}, + {labels, [packing]}, {help, "The counter is incremented every time a chunk is written to " "chunk_storage."}]), diff --git a/apps/arweave/src/ar_node_worker.erl b/apps/arweave/src/ar_node_worker.erl index 65f65e7ee..6f785d53f 100644 --- a/apps/arweave/src/ar_node_worker.erl +++ b/apps/arweave/src/ar_node_worker.erl @@ -2132,6 +2132,8 @@ handle_found_solution(Args, PrevB, State) -> {solution, ar_util:encode(SolutionH)}, {height, Height}, {step_number, StepNumber}, {steps, length(Steps)}, {txs, length(B#block.txs)}, + {recall_byte1, B#block.recall_byte}, + {recall_byte2, B#block.recall_byte2}, {chunks, case B#block.recall_byte2 of undefined -> 1; diff --git a/apps/arweave/src/ar_storage_module.erl b/apps/arweave/src/ar_storage_module.erl index fc53dd868..03bbebf9d 100644 --- a/apps/arweave/src/ar_storage_module.erl +++ b/apps/arweave/src/ar_storage_module.erl @@ -219,7 +219,7 @@ get_strict(Offset, Packing) -> RepackInPlaceModulesStoreIDs = [ {{BucketSize, Bucket, TargetPacking}, ar_storage_module:id(Module)} || {{BucketSize, Bucket, _Packing} = Module, TargetPacking} <- Config#config.repack_in_place_storage_modules], - ModuleStoreIDs = [ar_storage_module:id(Module) + ModuleStoreIDs = [{Module, ar_storage_module:id(Module)} || Module <- Config#config.storage_modules], get_strict(Offset, Packing, ModuleStoreIDs ++ RepackInPlaceModulesStoreIDs). 
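Reviewer note before the ar_sup.erl hunk (not part of the patch): the per-filepath lock moved with the entropy code and now spins on ets:insert_new/2 against the new ar_entropy_storage table created in ar_sup below. A hypothetical wrapper showing the acquire/release discipline; the patch itself releases explicitly on each branch rather than via try/after:

with_chunk_file_lock(Filepath, Fun) ->
    ar_entropy_storage:acquire_semaphore(Filepath),
    try
        Fun()
    after
        ar_entropy_storage:release_semaphore(Filepath)
    end.
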
diff --git a/apps/arweave/src/ar_sup.erl b/apps/arweave/src/ar_sup.erl index 0f77ad26f..fb13295a2 100644 --- a/apps/arweave/src/ar_sup.erl +++ b/apps/arweave/src/ar_sup.erl @@ -47,6 +47,7 @@ init([]) -> ets:new(ar_data_sync_worker_master, [set, public, named_table]), ets:new(ar_data_sync_state, [set, public, named_table, {read_concurrency, true}]), ets:new(ar_chunk_storage, [set, public, named_table]), + ets:new(ar_entropy_storage, [set, public, named_table]), ets:new(ar_mining_stats, [set, public, named_table]), ets:new(ar_global_sync_record, [set, public, named_table]), ets:new(ar_disk_pool_data_roots, [set, public, named_table, {read_concurrency, true}]), diff --git a/apps/arweave/src/ar_sync_record.erl b/apps/arweave/src/ar_sync_record.erl index 0bf56517a..7218968e7 100644 --- a/apps/arweave/src/ar_sync_record.erl +++ b/apps/arweave/src/ar_sync_record.erl @@ -40,7 +40,7 @@ %% this record can remain very small compared to the space taken by %% chunk identifiers, whose number grows unlimited with time. sync_record_by_id, - %% A map {ID, Type} => Intervals. + %% A map {ID, Packing} => Intervals. sync_record_by_id_type, %% The name of the WAL store. state_db, @@ -75,9 +75,9 @@ get(ID, StoreID) -> end. %% @doc Return the set of intervals. -get(ID, Type, StoreID) -> +get(ID, Packing, StoreID) -> GenServerID = list_to_atom("ar_sync_record_" ++ ar_storage_module:label_by_id(StoreID)), - case catch gen_server:call(GenServerID, {get, Type, ID}, 20000) of + case catch gen_server:call(GenServerID, {get, Packing, ID}, 20000) of {'EXIT', {timeout, {gen_server, call, _}}} -> {error, timeout}; Reply -> @@ -96,11 +96,11 @@ add(End, Start, ID, StoreID) -> end. %% @doc Add the given interval to the record with the -%% given ID and Type. Store the changes on disk before +%% given ID and Packing. Store the changes on disk before %% returning ok. -add(End, Start, Type, ID, StoreID) -> +add(End, Start, Packing, ID, StoreID) -> GenServerID = list_to_atom("ar_sync_record_" ++ ar_storage_module:label_by_id(StoreID)), - case catch gen_server:call(GenServerID, {add, End, Start, Type, ID}, 120000) of + case catch gen_server:call(GenServerID, {add, End, Start, Packing, ID}, 120000) of {'EXIT', {timeout, {gen_server, call, _}}} -> {error, timeout}; Reply -> @@ -114,9 +114,9 @@ add_async(Event, End, Start, ID, StoreID) -> %% @doc Special case of add/5 for repacked chunks. When repacking the ar_sync_record add %% happens at the end so we don't need to block on it to complete. -add_async(Event, End, Start, Type, ID, StoreID) -> +add_async(Event, End, Start, Packing, ID, StoreID) -> GenServerID = list_to_atom("ar_sync_record_" ++ ar_storage_module:label_by_id(StoreID)), - gen_server:cast(GenServerID, {add_async, Event, End, Start, Type, ID}). + gen_server:cast(GenServerID, {add_async, Event, End, Start, Packing, ID}). %% @doc Remove the given interval from the record %% with the given ID. Store the changes on disk before @@ -142,7 +142,7 @@ cut(Offset, ID, StoreID) -> Reply end. -%% @doc Return {true, StoreID} or {{true, Type}, StoreID} if a chunk containing +%% @doc Return {true, StoreID} or {{true, Packing}, StoreID} if a chunk containing %% the given Offset is found in the record with the given ID, false otherwise. %% If several types are recorded for the chunk, only one of them is returned, %% the choice is not defined. 
If the chunk is stored in the default storage module, @@ -152,14 +152,14 @@ cut(Offset, ID, StoreID) -> %% The offset is 1-based - if a chunk consists of a single %% byte that is the first byte of the weave, is_recorded(0, ID) %% returns false and is_recorded(1, ID) returns true. -is_recorded(Offset, {ID, Type}) -> - case is_recorded(Offset, Type, ID, "default") of +is_recorded(Offset, {ID, Packing}) -> + case is_recorded(Offset, Packing, ID, "default") of true -> - {{true, Type}, "default"}; + {{true, Packing}, "default"}; false -> StorageModules = [Module - || {_, _, Packing} = Module <- ar_storage_module:get_all(Offset), - Packing == Type], + || {_, _, ModulePacking} = Module <- ar_storage_module:get_all(Offset), + ModulePacking == Packing], is_recorded_any_by_type(Offset, ID, StorageModules) end; is_recorded(Offset, ID) -> @@ -171,7 +171,7 @@ is_recorded(Offset, ID) -> {Reply, "default"} end. -%% @doc Return true or {true, Type} if a chunk containing +%% @doc Return true or {true, Packing} if a chunk containing %% the given Offset is found in the record with the given ID %% in the storage module identified by StoreID, false otherwise. is_recorded(Offset, ID, StoreID) -> @@ -186,17 +186,17 @@ is_recorded(Offset, ID, StoreID) -> case is_recorded2(Offset, ets:first(sync_records), ID, StoreID) of false -> true; - {true, Type} -> - {true, Type} + {true, Packing} -> + {true, Packing} end end end. -%% @doc Return true if a chunk containing the given Offset and Type +%% @doc Return true if a chunk containing the given Offset and Packing %% is found in the record in the storage module identified by StoreID, %% false otherwise. -is_recorded(Offset, Type, ID, StoreID) -> - case ets:lookup(sync_records, {ID, Type, StoreID}) of +is_recorded(Offset, Packing, ID, StoreID) -> + case ets:lookup(sync_records, {ID, Packing, StoreID}) of [] -> false; [{_, TID}] -> @@ -228,8 +228,8 @@ get_next_unsynced_interval(Offset, EndOffsetUpperBound, ID, StoreID) -> %% @doc Return the lowest synced interval with the end offset strictly above the given Offset %% and at most EndOffsetUpperBound. %% Return not_found if there are no such intervals. 
-get_next_synced_interval(Offset, EndOffsetUpperBound, Type, ID, StoreID) -> - case ets:lookup(sync_records, {ID, Type, StoreID}) of +get_next_synced_interval(Offset, EndOffsetUpperBound, Packing, ID, StoreID) -> + case ets:lookup(sync_records, {ID, Packing, StoreID}) of [] -> not_found; [{_, TID}] -> @@ -296,16 +296,16 @@ handle_call({get, ID}, _From, State) -> #state{ sync_record_by_id = SyncRecordByID } = State, {reply, maps:get(ID, SyncRecordByID, ar_intervals:new()), State}; -handle_call({get, Type, ID}, _From, State) -> +handle_call({get, Packing, ID}, _From, State) -> #state{ sync_record_by_id_type = SyncRecordByIDType } = State, - {reply, maps:get({ID, Type}, SyncRecordByIDType, ar_intervals:new()), State}; + {reply, maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()), State}; handle_call({add, End, Start, ID}, _From, State) -> {Reply, State2} = add2(End, Start, ID, State), {reply, Reply, State2}; -handle_call({add, End, Start, Type, ID}, _From, State) -> - {Reply, State2} = add2(End, Start, Type, ID, State), +handle_call({add, End, Start, Packing, ID}, _From, State) -> + {Reply, State2} = add2(End, Start, Packing, ID, State), {reply, Reply, State2}; handle_call({delete, End, Start, ID}, _From, State) -> @@ -413,21 +413,21 @@ handle_cast({add_async, Event, End, Start, ID}, State) -> end, {noreply, State2}; -handle_cast({add_async, Event, End, Start, Type, ID}, State) -> - {Reply, State2} = add2(End, Start, Type, ID, State), +handle_cast({add_async, Event, End, Start, Packing, ID}, State) -> + {Reply, State2} = add2(End, Start, Packing, ID, State), case Reply of ok -> ?LOG_DEBUG([{event, Event}, {status, success}, {sync_record_id, ID}, {offset, End}, - {packing, ar_serialize:encode_packing(Type, true)}]); + {packing, ar_serialize:encode_packing(Packing, true)}]); Error -> ?LOG_ERROR([{event, Event}, {status, failed}, {sync_record_id, ID}, {offset, End}, - {packing, ar_serialize:encode_packing(Type, true)}, + {packing, ar_serialize:encode_packing(Packing, true)}, {error, io_lib:format("~p", [Error])}]) end, {noreply, State2}; @@ -466,13 +466,13 @@ add2(End, Start, ID, State) -> end, {Reply, State3}. 
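With the Type -> Packing rename, the typed sync records are keyed by {ID, Packing} throughout, and is_recorded/3 reports which packing (if any) is recorded for an offset. A hedged usage sketch of the three possible results, following the doc comments above; describe_offset/2 and the atoms it returns are illustrative only.

describe_offset(Offset, StoreID) ->
    case ar_sync_record:is_recorded(Offset, ar_data_sync, StoreID) of
        false ->
            %% No chunk containing Offset is recorded in this storage module.
            not_synced;
        true ->
            %% Recorded, but no per-packing record matched the offset.
            {synced, unknown_packing};
        {true, Packing} ->
            %% Recorded with a known packing.
            {synced, Packing}
    end.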
-add2(End, Start, Type, ID, State) -> +add2(End, Start, Packing, ID, State) -> #state{ sync_record_by_id = SyncRecordByID, sync_record_by_id_type = SyncRecordByIDType, state_db = StateDB, store_id = StoreID } = State, - ByType = maps:get({ID, Type}, SyncRecordByIDType, ar_intervals:new()), + ByType = maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()), ByType2 = ar_intervals:add(ByType, End, Start), - SyncRecordByIDType2 = maps:put({ID, Type}, ByType2, SyncRecordByIDType), - TypeTID = get_or_create_type_tid({ID, Type, StoreID}), + SyncRecordByIDType2 = maps:put({ID, Packing}, ByType2, SyncRecordByIDType), + TypeTID = get_or_create_type_tid({ID, Packing, StoreID}), ar_ets_intervals:add(TypeTID, End, Start), SyncRecord = maps:get(ID, SyncRecordByID, ar_intervals:new()), SyncRecord2 = ar_intervals:add(SyncRecord, End, Start), @@ -481,7 +481,7 @@ add2(End, Start, Type, ID, State) -> ar_ets_intervals:add(TID, End, Start), State2 = State#state{ sync_record_by_id = SyncRecordByID2, sync_record_by_id_type = SyncRecordByIDType2 }, - {Reply, State3} = update_write_ahead_log({{add, Type}, {End, Start, ID}}, StateDB, State2), + {Reply, State3} = update_write_ahead_log({{add, Packing}, {End, Start, ID}}, StateDB, State2), case Reply of ok -> emit_add_range(Start, End, ID, StoreID); @@ -515,14 +515,14 @@ is_recorded_any(_Offset, _ID, []) -> is_recorded2(_Offset, '$end_of_table', _ID, _StoreID) -> false; -is_recorded2(Offset, {ID, Type, StoreID}, ID, StoreID) -> - case ets:lookup(sync_records, {ID, Type, StoreID}) of +is_recorded2(Offset, {ID, Packing, StoreID}, ID, StoreID) -> + case ets:lookup(sync_records, {ID, Packing, StoreID}) of [{_, TID}] -> case ar_ets_intervals:is_inside(TID, Offset) of true -> - {true, Type}; + {true, Packing}; false -> - is_recorded2(Offset, ets:next(sync_records, {ID, Type, StoreID}), ID, + is_recorded2(Offset, ets:next(sync_records, {ID, Packing, StoreID}), ID, StoreID) end; [] -> @@ -576,15 +576,15 @@ replay_write_ahead_log(SyncRecordByID, SyncRecordByIDType, N, WAL, StateDB, Stor SyncRecordByID2 = maps:put(ID, SyncRecord2, SyncRecordByID), replay_write_ahead_log( SyncRecordByID2, SyncRecordByIDType, N + 1, WAL, StateDB, StoreID); - {add, Type} -> + {add, Packing} -> {End, Start, ID} = Params, SyncRecord = maps:get(ID, SyncRecordByID, ar_intervals:new()), SyncRecord2 = ar_intervals:add(SyncRecord, End, Start), SyncRecordByID2 = maps:put(ID, SyncRecord2, SyncRecordByID), - ByType = maps:get({ID, Type}, SyncRecordByIDType, ar_intervals:new()), + ByType = maps:get({ID, Packing}, SyncRecordByIDType, ar_intervals:new()), ByType2 = ar_intervals:add(ByType, End, Start), emit_add_range(Start, End, ID, StoreID), - SyncRecordByIDType2 = maps:put({ID, Type}, ByType2, SyncRecordByIDType), + SyncRecordByIDType2 = maps:put({ID, Packing}, ByType2, SyncRecordByIDType), replay_write_ahead_log( SyncRecordByID2, SyncRecordByIDType2, N + 1, WAL, StateDB, StoreID); delete -> @@ -653,10 +653,10 @@ initialize_sync_record_by_id_type_ets(SyncRecordByIDType, StoreID) -> initialize_sync_record_by_id_type_ets2(none, _StoreID) -> ok; -initialize_sync_record_by_id_type_ets2({{ID, Type}, SyncRecord, Iterator}, StoreID) -> +initialize_sync_record_by_id_type_ets2({{ID, Packing}, SyncRecord, Iterator}, StoreID) -> TID = ets:new(sync_record_type, [ordered_set, public, {read_concurrency, true}]), ar_ets_intervals:init_from_gb_set(TID, SyncRecord), - ets:insert(sync_records, {{ID, Type, StoreID}, TID}), + ets:insert(sync_records, {{ID, Packing, StoreID}, TID}), 
initialize_sync_record_by_id_type_ets2(maps:next(Iterator), StoreID). store_state(State) -> @@ -687,9 +687,9 @@ store_state(State) -> {Error2, State}; ok -> maps:map( - fun ({ar_data_sync, Type}, TypeRecord) -> + fun ({ar_data_sync, Packing}, TypeRecord) -> ar_mining_stats:set_storage_module_data_size( - StoreID, Type, PartitionNumber, StorageModuleSize, + StoreID, Packing, PartitionNumber, StorageModuleSize, StorageModuleIndex, ar_intervals:sum(TypeRecord)); (_, _) ->