diff options
Diffstat (limited to 'learn-you-some-erlang/ppool-1.0/src')
-rw-r--r-- | learn-you-some-erlang/ppool-1.0/src/ppool.erl | 26 | ||||
-rw-r--r-- | learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl | 112 | ||||
-rw-r--r-- | learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl | 17 | ||||
-rw-r--r-- | learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl | 22 | ||||
-rw-r--r-- | learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl | 15 |
5 files changed, 192 insertions, 0 deletions
diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool.erl b/learn-you-some-erlang/ppool-1.0/src/ppool.erl new file mode 100644 index 0000000..5723f98 --- /dev/null +++ b/learn-you-some-erlang/ppool-1.0/src/ppool.erl @@ -0,0 +1,26 @@ +%%% API module for the pool +-module(ppool). +-behaviour(application). +-export([start/2, stop/1, start_pool/3, + run/2, sync_queue/2, async_queue/2, stop_pool/1]). + +start(normal, _Args) -> + ppool_supersup:start_link(). + +stop(_State) -> + ok. + +start_pool(Name, Limit, {M,F,A}) -> + ppool_supersup:start_pool(Name, Limit, {M,F,A}). + +stop_pool(Name) -> + ppool_supersup:stop_pool(Name). + +run(Name, Args) -> + ppool_serv:run(Name, Args). + +async_queue(Name, Args) -> + ppool_serv:async_queue(Name, Args). + +sync_queue(Name, Args) -> + ppool_serv:sync_queue(Name, Args). diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl new file mode 100644 index 0000000..a16130c --- /dev/null +++ b/learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl @@ -0,0 +1,112 @@ +-module(ppool_serv). +-behaviour(gen_server). +-export([start/4, start_link/4, run/2, sync_queue/2, async_queue/2, stop/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2]). + +%% The friendly supervisor is started dynamically! +-define(SPEC(MFA), + {worker_sup, + {ppool_worker_sup, start_link, [MFA]}, + temporary, + 10000, + supervisor, + [ppool_worker_sup]}). + +-record(state, {limit=0, + sup, + refs, + queue=queue:new()}). + +start(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) -> + gen_server:start({local, Name}, ?MODULE, {Limit, MFA, Sup}, []). + +start_link(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) -> + gen_server:start_link({local, Name}, ?MODULE, {Limit, MFA, Sup}, []). + +run(Name, Args) -> + gen_server:call(Name, {run, Args}). + +sync_queue(Name, Args) -> + gen_server:call(Name, {sync, Args}, infinity). + +async_queue(Name, Args) -> + gen_server:cast(Name, {async, Args}). + +stop(Name) -> + gen_server:call(Name, stop). + +%% Gen server +init({Limit, MFA, Sup}) -> + %% We need to find the Pid of the worker supervisor from here, + %% but alas, this would be calling the supervisor while it waits for us! + self() ! {start_worker_supervisor, Sup, MFA}, + {ok, #state{limit=Limit, refs=gb_sets:empty()}}. + +handle_call({run, Args}, _From, S = #state{limit=N, sup=Sup, refs=R}) when N > 0 -> + {ok, Pid} = supervisor:start_child(Sup, Args), + Ref = erlang:monitor(process, Pid), + {reply, {ok,Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}}; +handle_call({run, _Args}, _From, S=#state{limit=N}) when N =< 0 -> + {reply, noalloc, S}; + +handle_call({sync, Args}, _From, S = #state{limit=N, sup=Sup, refs=R}) when N > 0 -> + {ok, Pid} = supervisor:start_child(Sup, Args), + Ref = erlang:monitor(process, Pid), + {reply, {ok,Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}}; +handle_call({sync, Args}, From, S = #state{queue=Q}) -> + {noreply, S#state{queue=queue:in({From, Args}, Q)}}; + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; +handle_call(_Msg, _From, State) -> + {noreply, State}. + + +handle_cast({async, Args}, S=#state{limit=N, sup=Sup, refs=R}) when N > 0 -> + {ok, Pid} = supervisor:start_child(Sup, Args), + Ref = erlang:monitor(process, Pid), + {noreply, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}}; +handle_cast({async, Args}, S=#state{limit=N, queue=Q}) when N =< 0 -> + {noreply, S#state{queue=queue:in(Args,Q)}}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', Ref, process, _Pid, _}, S = #state{refs=Refs}) -> + case gb_sets:is_element(Ref, Refs) of + true -> + handle_down_worker(Ref, S); + false -> %% Not our responsibility + {noreply, S} + end; +handle_info({start_worker_supervisor, Sup, MFA}, S = #state{}) -> + {ok, Pid} = supervisor:start_child(Sup, ?SPEC(MFA)), + link(Pid), + {noreply, S#state{sup=Pid}}; +handle_info(Msg, State) -> + io:format("Unknown msg: ~p~n", [Msg]), + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +handle_down_worker(Ref, S = #state{limit=L, sup=Sup, refs=Refs}) -> + case queue:out(S#state.queue) of + {{value, {From, Args}}, Q} -> + {ok, Pid} = supervisor:start_child(Sup, Args), + NewRef = erlang:monitor(process, Pid), + NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref,Refs)), + gen_server:reply(From, {ok, Pid}), + {noreply, S#state{refs=NewRefs, queue=Q}}; + {{value, Args}, Q} -> + {ok, Pid} = supervisor:start_child(Sup, Args), + NewRef = erlang:monitor(process, Pid), + NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref,Refs)), + {noreply, S#state{refs=NewRefs, queue=Q}}; + {empty, _} -> + {noreply, S#state{limit=L+1, refs=gb_sets:delete(Ref,Refs)}} + end. diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl new file mode 100644 index 0000000..71ec31d --- /dev/null +++ b/learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl @@ -0,0 +1,17 @@ +-module(ppool_sup). +-export([start_link/3, init/1]). +-behaviour(supervisor). + +start_link(Name, Limit, MFA) -> + supervisor:start_link(?MODULE, {Name, Limit, MFA}). + +init({Name, Limit, MFA}) -> + MaxRestart = 1, + MaxTime = 3000, + {ok, {{one_for_all, MaxRestart, MaxTime}, + [{serv, + {ppool_serv, start_link, [Name, Limit, self(), MFA]}, + permanent, + 5000, + worker, + [ppool_serv]}]}}. diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl new file mode 100644 index 0000000..06fa0af --- /dev/null +++ b/learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl @@ -0,0 +1,22 @@ +-module(ppool_supersup). +-behaviour(supervisor). +-export([start_link/0, start_pool/3, stop_pool/1]). +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ppool}, ?MODULE, []). + +start_pool(Name, Limit, MFA) -> + ChildSpec = {Name, + {ppool_sup, start_link, [Name, Limit, MFA]}, + permanent, 10500, supervisor, [ppool_sup]}, + supervisor:start_child(ppool, ChildSpec). + +stop_pool(Name) -> + supervisor:terminate_child(ppool, Name), + supervisor:delete_child(ppool, Name). + +init([]) -> + MaxRestart = 6, + MaxTime = 3000, + {ok, {{one_for_one, MaxRestart, MaxTime}, []}}. diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl new file mode 100644 index 0000000..2467c47 --- /dev/null +++ b/learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl @@ -0,0 +1,15 @@ +-module(ppool_worker_sup). +-export([start_link/1, init/1]). +-behaviour(supervisor). + +start_link(MFA = {_,_,_}) -> + supervisor:start_link(?MODULE, MFA). + +init({M,F,A}) -> + MaxRestart = 5, + MaxTime = 3600, + {ok, {{simple_one_for_one, MaxRestart, MaxTime}, + [{ppool_worker, + {M,F,A}, + temporary, 5000, worker, [M]}]}}. + |