aboutsummaryrefslogtreecommitdiff
path: root/learn-you-some-erlang/ppool-1.0/src
diff options
context:
space:
mode:
Diffstat (limited to 'learn-you-some-erlang/ppool-1.0/src')
-rw-r--r--learn-you-some-erlang/ppool-1.0/src/ppool.erl26
-rw-r--r--learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl112
-rw-r--r--learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl17
-rw-r--r--learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl22
-rw-r--r--learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl15
5 files changed, 192 insertions, 0 deletions
diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool.erl b/learn-you-some-erlang/ppool-1.0/src/ppool.erl
new file mode 100644
index 0000000..5723f98
--- /dev/null
+++ b/learn-you-some-erlang/ppool-1.0/src/ppool.erl
@@ -0,0 +1,26 @@
+%%% API module for the pool
+-module(ppool).
+-behaviour(application).
+-export([start/2, stop/1, start_pool/3,
+ run/2, sync_queue/2, async_queue/2, stop_pool/1]).
+
+start(normal, _Args) ->
+ ppool_supersup:start_link().
+
+stop(_State) ->
+ ok.
+
+start_pool(Name, Limit, {M,F,A}) ->
+ ppool_supersup:start_pool(Name, Limit, {M,F,A}).
+
+stop_pool(Name) ->
+ ppool_supersup:stop_pool(Name).
+
+run(Name, Args) ->
+ ppool_serv:run(Name, Args).
+
+async_queue(Name, Args) ->
+ ppool_serv:async_queue(Name, Args).
+
+sync_queue(Name, Args) ->
+ ppool_serv:sync_queue(Name, Args).
diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl
new file mode 100644
index 0000000..a16130c
--- /dev/null
+++ b/learn-you-some-erlang/ppool-1.0/src/ppool_serv.erl
@@ -0,0 +1,112 @@
+-module(ppool_serv).
+-behaviour(gen_server).
+-export([start/4, start_link/4, run/2, sync_queue/2, async_queue/2, stop/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2]).
+
+%% The friendly supervisor is started dynamically!
+-define(SPEC(MFA),
+ {worker_sup,
+ {ppool_worker_sup, start_link, [MFA]},
+ temporary,
+ 10000,
+ supervisor,
+ [ppool_worker_sup]}).
+
+-record(state, {limit=0,
+ sup,
+ refs,
+ queue=queue:new()}).
+
+start(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) ->
+ gen_server:start({local, Name}, ?MODULE, {Limit, MFA, Sup}, []).
+
+start_link(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) ->
+ gen_server:start_link({local, Name}, ?MODULE, {Limit, MFA, Sup}, []).
+
+run(Name, Args) ->
+ gen_server:call(Name, {run, Args}).
+
+sync_queue(Name, Args) ->
+ gen_server:call(Name, {sync, Args}, infinity).
+
+async_queue(Name, Args) ->
+ gen_server:cast(Name, {async, Args}).
+
+stop(Name) ->
+ gen_server:call(Name, stop).
+
+%% Gen server
+init({Limit, MFA, Sup}) ->
+ %% We need to find the Pid of the worker supervisor from here,
+ %% but alas, this would be calling the supervisor while it waits for us!
+ self() ! {start_worker_supervisor, Sup, MFA},
+ {ok, #state{limit=Limit, refs=gb_sets:empty()}}.
+
+handle_call({run, Args}, _From, S = #state{limit=N, sup=Sup, refs=R}) when N > 0 ->
+ {ok, Pid} = supervisor:start_child(Sup, Args),
+ Ref = erlang:monitor(process, Pid),
+ {reply, {ok,Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}};
+handle_call({run, _Args}, _From, S=#state{limit=N}) when N =< 0 ->
+ {reply, noalloc, S};
+
+handle_call({sync, Args}, _From, S = #state{limit=N, sup=Sup, refs=R}) when N > 0 ->
+ {ok, Pid} = supervisor:start_child(Sup, Args),
+ Ref = erlang:monitor(process, Pid),
+ {reply, {ok,Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}};
+handle_call({sync, Args}, From, S = #state{queue=Q}) ->
+ {noreply, S#state{queue=queue:in({From, Args}, Q)}};
+
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State};
+handle_call(_Msg, _From, State) ->
+ {noreply, State}.
+
+
+handle_cast({async, Args}, S=#state{limit=N, sup=Sup, refs=R}) when N > 0 ->
+ {ok, Pid} = supervisor:start_child(Sup, Args),
+ Ref = erlang:monitor(process, Pid),
+ {noreply, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}};
+handle_cast({async, Args}, S=#state{limit=N, queue=Q}) when N =< 0 ->
+ {noreply, S#state{queue=queue:in(Args,Q)}};
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', Ref, process, _Pid, _}, S = #state{refs=Refs}) ->
+ case gb_sets:is_element(Ref, Refs) of
+ true ->
+ handle_down_worker(Ref, S);
+ false -> %% Not our responsibility
+ {noreply, S}
+ end;
+handle_info({start_worker_supervisor, Sup, MFA}, S = #state{}) ->
+ {ok, Pid} = supervisor:start_child(Sup, ?SPEC(MFA)),
+ link(Pid),
+ {noreply, S#state{sup=Pid}};
+handle_info(Msg, State) ->
+ io:format("Unknown msg: ~p~n", [Msg]),
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_down_worker(Ref, S = #state{limit=L, sup=Sup, refs=Refs}) ->
+ case queue:out(S#state.queue) of
+ {{value, {From, Args}}, Q} ->
+ {ok, Pid} = supervisor:start_child(Sup, Args),
+ NewRef = erlang:monitor(process, Pid),
+ NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref,Refs)),
+ gen_server:reply(From, {ok, Pid}),
+ {noreply, S#state{refs=NewRefs, queue=Q}};
+ {{value, Args}, Q} ->
+ {ok, Pid} = supervisor:start_child(Sup, Args),
+ NewRef = erlang:monitor(process, Pid),
+ NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref,Refs)),
+ {noreply, S#state{refs=NewRefs, queue=Q}};
+ {empty, _} ->
+ {noreply, S#state{limit=L+1, refs=gb_sets:delete(Ref,Refs)}}
+ end.
diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl
new file mode 100644
index 0000000..71ec31d
--- /dev/null
+++ b/learn-you-some-erlang/ppool-1.0/src/ppool_sup.erl
@@ -0,0 +1,17 @@
+-module(ppool_sup).
+-export([start_link/3, init/1]).
+-behaviour(supervisor).
+
+start_link(Name, Limit, MFA) ->
+ supervisor:start_link(?MODULE, {Name, Limit, MFA}).
+
+init({Name, Limit, MFA}) ->
+ MaxRestart = 1,
+ MaxTime = 3000,
+ {ok, {{one_for_all, MaxRestart, MaxTime},
+ [{serv,
+ {ppool_serv, start_link, [Name, Limit, self(), MFA]},
+ permanent,
+ 5000,
+ worker,
+ [ppool_serv]}]}}.
diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl
new file mode 100644
index 0000000..06fa0af
--- /dev/null
+++ b/learn-you-some-erlang/ppool-1.0/src/ppool_supersup.erl
@@ -0,0 +1,22 @@
+-module(ppool_supersup).
+-behaviour(supervisor).
+-export([start_link/0, start_pool/3, stop_pool/1]).
+-export([init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ppool}, ?MODULE, []).
+
+start_pool(Name, Limit, MFA) ->
+ ChildSpec = {Name,
+ {ppool_sup, start_link, [Name, Limit, MFA]},
+ permanent, 10500, supervisor, [ppool_sup]},
+ supervisor:start_child(ppool, ChildSpec).
+
+stop_pool(Name) ->
+ supervisor:terminate_child(ppool, Name),
+ supervisor:delete_child(ppool, Name).
+
+init([]) ->
+ MaxRestart = 6,
+ MaxTime = 3000,
+ {ok, {{one_for_one, MaxRestart, MaxTime}, []}}.
diff --git a/learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl b/learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl
new file mode 100644
index 0000000..2467c47
--- /dev/null
+++ b/learn-you-some-erlang/ppool-1.0/src/ppool_worker_sup.erl
@@ -0,0 +1,15 @@
+-module(ppool_worker_sup).
+-export([start_link/1, init/1]).
+-behaviour(supervisor).
+
+start_link(MFA = {_,_,_}) ->
+ supervisor:start_link(?MODULE, MFA).
+
+init({M,F,A}) ->
+ MaxRestart = 5,
+ MaxTime = 3600,
+ {ok, {{simple_one_for_one, MaxRestart, MaxTime},
+ [{ppool_worker,
+ {M,F,A},
+ temporary, 5000, worker, [M]}]}}.
+