-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker_pool.erl
51 lines (44 loc) · 1.36 KB
/
worker_pool.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
-module(worker_pool).
%% Public API
-export([
new/1,
take/1,
return/2
]).
% [
% Size, NextTake, NextReturn,
% WorkerId1, WorkerId2, ...
% ]
-define(SIZE_POS, 1).
-define(TAKE_POS, 2).
-define(RETURN_POS, 3).
-define(HEADER_SIZE, 3).
-spec new(pos_integer()) -> atomics:atomics_ref().
new(Size) ->
Ref = atomics:new(Size + ?HEADER_SIZE, []),
atomics:put(Ref, ?SIZE_POS, Size),
[atomics:put(Ref, Id + ?HEADER_SIZE, Id) || Id <- lists:seq(1, Size)],
Ref.
-spec take(atomics:atomics_ref()) -> pos_integer() | empty.
take(Ref) ->
NextTake = atomics:get(Ref, ?TAKE_POS),
case atomics:exchange(Ref, NextTake + ?HEADER_SIZE + 1, 0) of
0 ->
empty;
Id ->
case NextTake + 1 < atomics:get(Ref, ?SIZE_POS) of
true ->
atomics:put(Ref, ?TAKE_POS, NextTake + 1);
_ ->
atomics:put(Ref, ?TAKE_POS, 0)
end,
Id
end.
-spec return(atomics:atomics_ref(), pos_integer()) -> NeedNotify :: boolean().
return(Ref, Id) ->
Size = atomics:get(Ref, ?SIZE_POS),
NextReturn = atomics:add_get(Ref, ?RETURN_POS, 1),
NextReturn =:= Size andalso atomics:sub(Ref, ?RETURN_POS, Size),
ReturnPos = ((NextReturn - 1) rem Size),
atomics:put(Ref, ReturnPos + ?HEADER_SIZE + 1, Id),
atomics:get(Ref, ?TAKE_POS) =:= ReturnPos.