From: <slf...@us...> - 2003-05-20 06:48:42
|
Update of /cvsroot/jungerl/jungerl/lib/spread_drv/src In directory sc8-pr-cvs1:/tmp/cvs-serv20860/lib/spread_drv/src Added Files: Makefile spread.erl spread_drv.erl spread_drv.hrl spread_floodrec.erl Log Message: Initial check-in of spread_drv, a driver for the Spread reliable multicast library -- http://www.spread.org/ --- NEW FILE: Makefile --- include ../../../support/include.mk # Some modules are automatically generated, so we won't use the # include makefile's auto-detected list. MODULES := spread_drv spread spread_floodrec OBJECTS := $(MODULES:%=../ebin/%.$(EMULATOR)) all: $(OBJECTS) clean: rm -f $(OBJECTS) --- NEW FILE: spread.erl --- %%%---------------------------------------------------------------------- %%% File : spread.erl %%% Author : Scott Lystig Fritchie <slf...@sn...> %%% Purpose : Erlang-friendly interface to the spread_drv driver. %%% %%% See the file "UserGuide.txt" to answer your many questions. :-) %%% %%%---------------------------------------------------------------------- -module(spread). -behaviour(gen_server). -compile(debug_info). -define(NAME, ?MODULE). -define(DRV, spread_drv). -define(Timeout, infinity). -include("spread_drv.hrl"). %% External exports -export([start_link/0, start_link/4, stop/1]). -export([subscribe/2, subscribe/3, unsubscribe/2, multicast/5, mg_multicast/5]). %% Debugging -export([dump_state/1, rec/1, set_active/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -record(state, { port, % Spread port subtab, % ETS table of subscribers membtab, % ETS table of group members privgrp % Name of Spread private group }). -record(sub, { pid, % Subscriber's pid gnotify, % Flag: want group notify monref % monitor ref }). %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- start_link() -> start_link("4803@localhost", "", 0, 1). 
%% Start a Spread client server linked to the caller.  The four arguments
%% are passed through init/1 to spread_drv:sp_connect/5.
start_link(SpreadName, PrivateName, Priority, GroupMembership) ->
    InitArgs = [SpreadName, PrivateName, Priority, GroupMembership],
    gen_server:start_link(?MODULE, InitArgs, []).

%% Stop the server; the call is answered with ok and the server exits
%% with reason normal.
stop(Pid) ->
    gen_server:call(Pid, {stop}, ?Timeout).

%% Subscribe the calling process to Group without membership notifications.
subscribe(Pid, Group) ->
    subscribe(Pid, Group, false).

%% Subscribe the calling process to Group.  With WantMemberChanges = true
%% the caller is also sent {spread, ServerPid, membership, Group, Members}
%% messages.
subscribe(Pid, Group, WantMemberChanges) ->
    Request = {subscribe, self(), Group, WantMemberChanges},
    gen_server:call(Pid, Request, ?Timeout).

%% Remove the calling process's subscription to Group.
unsubscribe(Pid, Group) ->
    Request = {unsubscribe, self(), Group},
    gen_server:call(Pid, Request, ?Timeout).

%% Multicast Mess to a single group; shorthand for mg_multicast/5.
multicast(Pid, ServiceType, Group, MessType, Mess) ->
    mg_multicast(Pid, ServiceType, [Group], MessType, Mess).

%% Multicast Mess to every group in GroupList.  ServiceType is one of the
%% atoms accepted by convert_servicetype/1.
mg_multicast(Pid, ServiceType, GroupList, MessType, Mess) ->
    Request = {mg_multicast, ServiceType, GroupList, MessType, Mess},
    gen_server:call(Pid, Request, ?Timeout).

%%% Debugging....

%% Print and return a textual dump of the server state and both ETS tables.
dump_state(Pid) ->
    gen_server:call(Pid, {dump_state}, ?Timeout).

%% Issue one explicit sp_receive on the server's port and return its result.
rec(Pid) ->
    gen_server:call(Pid, {rec}, ?Timeout).

%% Forward Value to the driver's set_active command.
set_active(Pid, Value) ->
    gen_server:call(Pid, {set_active, Value}, ?Timeout).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------
%% Opens the driver port, connects to the Spread daemon, switches the port
%% to active mode and creates the two private ETS tables.  Every step is
%% asserted by pattern matching, so any failure crashes init/1.
init([SpreadName, PrivateName, Priority, GroupMembership]) ->
    {ok, DrvPort} = ?DRV:start(),
    ?DRV:debug(DrvPort, 0),
    {ok, PrivateGroup} =
        ?DRV:sp_connect(DrvPort, SpreadName, PrivateName, Priority,
                        GroupMembership),
    {ok, 1} = ?DRV:set_active(DrvPort, 1),
    Subscribers = ets:new(subscribers, [private, set]),
    Members = ets:new(members, [private, set]),
    InitialState = #state{port = DrvPort,
                          subtab = Subscribers,
                          membtab = Members,
                          privgrp = PrivateGroup},
    {ok, InitialState}.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
%% Every clause that obtains a new state from a helper returns that new
%% state, so a helper that starts changing the state record is not
%% silently ignored.
handle_call({stop}, _From, State) ->
    {stop, normal, ok, State};
handle_call({set_active, Value}, _From, State) ->
    %% The driver must echo back the value that was requested.
    {ok, Value} = ?DRV:set_active(State#state.port, Value),
    {reply, {ok, Value}, State};
handle_call({subscribe, FromPid, Group, WantMemberChanges}, _From, State) ->
    {Reply, NewState} = do_subscribe(State, FromPid, Group, WantMemberChanges),
    {reply, Reply, NewState};
handle_call({unsubscribe, FromPid, Group}, _From, State) ->
    {Reply, NewState} = do_unsubscribe(State, FromPid, Group),
    {reply, Reply, NewState};
handle_call({mg_multicast, ServiceType, GroupList, MessType, Mess},
            _From, State) ->
    {Reply, NewState} =
        do_mg_multicast(State, ServiceType, GroupList, MessType, Mess),
    {reply, Reply, NewState};
handle_call({dump_state}, _From, State) ->
    {reply, do_dumpstate(State), State};
handle_call({rec}, _From, State) ->
    %% Debugging aid: receive with room for 3 groups and a 200 byte message.
    Reply = ?DRV:sp_receive(State#state.port, 3, 200),
    {reply, Reply, State}.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}       (terminate/2 is called)
%%----------------------------------------------------------------------
%% No casts are part of the protocol; anything received is logged and dropped.
handle_cast(Msg, State) ->
    io:format("XXXYYYXXX ~w: ~s:handle_cast got ~w\n", [self(), ?MODULE, Msg]),
    {noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}       (terminate/2 is called)
%%----------------------------------------------------------------------
%% Three kinds of message are handled: active-mode deliveries from our own
%% driver port, 'DOWN' notifications for monitored subscribers, and
%% anything else (logged and dropped).
handle_info({Port, active, Msg}, #state{port = OurPort} = State)
  when Port == OurPort ->
    {noreply, process_active_msg(State, Msg)};
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State) ->
    {noreply, unsubscribe_from_all(State, Pid)};
handle_info(Info, State) ->
    io:format("XXXYYYXXX ~w: ~s:handle_info got ~w\n", [self(), ?MODULE, Info]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(_Reason, _State) ->
    ok.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Subscribe From to Group (a string is converted to a binary first).
%% Returns {Reply, State} where Reply is ok or {error, already_subscribed}.
%% An existing subscription with a different notify flag is updated in
%% place; turning notifications on, or subscribing afresh with them on,
%% also sends the currently known membership to From.
do_subscribe(State, From, Group, WantMemberChanges) when is_list(Group) ->
    do_subscribe(State, From, list_to_binary(Group), WantMemberChanges);
do_subscribe(State, From, Group, WantMemberChanges) ->
    case already_subscribed(State, From, Group) of
        {true, WantMemberChanges} ->
            %% Same flag as before: nothing to change.
            {{error, already_subscribed}, State};
        {true, true} ->
            %% Notifications were on and are being switched off.
            {ok, set_memberchanges(State, From, Group, WantMemberChanges)};
        {true, false} ->
            %% Notifications were off and are being switched on.
            Updated = set_memberchanges(State, From, Group, WantMemberChanges),
            send_current_membership(State, From, Group),
            {ok, Updated};
        false ->
            Added = add_subscriber(State, From, Group, WantMemberChanges),
            case WantMemberChanges of
                true -> send_current_membership(State, From, Group);
                _ -> ok
            end,
            {ok, Added}
    end.
%% Unsubscribe From from Group (a string is converted to a binary first).
%% Returns {ok, State} or {{error, not_subscribed}, State}.
do_unsubscribe(State, From, Group) when is_list(Group) ->
    do_unsubscribe(State, From, list_to_binary(Group));
do_unsubscribe(State, From, Group) ->
    case already_subscribed(State, From, Group) of
        {true, _} ->
            {ok, del_subscriber(State, From, Group)};
        false ->
            {{error, not_subscribed}, State}
    end.

%% Translate the service-type atom and hand the multicast to the driver.
%% Any exception (including a bad ServiceType raised by the driver call) is
%% caught and returned to the caller as the reply.
%% NOTE(review): spread_drv:make_grouplist/1 documents a NUL-terminated
%% encoding for the group list, but GroupList is passed through unchanged
%% here -- confirm whether callers are expected to pre-format it.
do_mg_multicast(State, ServiceType, GroupList, MessType, Mess) ->
    ServiceTypeNum = convert_servicetype(ServiceType),
    Res = (catch ?DRV:sp_multigroup_multicast(State#state.port,
                                              ServiceTypeNum, GroupList,
                                              MessType, Mess)),
    {Res, State}.

%% Returns false, or {true, NotifyFlag} when From is subscribed to Group.
already_subscribed(State, From, Group) ->
    case ets:lookup(State#state.subtab, Group) of
        [] ->
            false;
        [{Group, SubList}] ->
            case lists:keysearch(From, #sub.pid, SubList) of
                false -> false;
                {value, Sub} -> {true, Sub#sub.gnotify}
            end
    end.

%% We assume that the caller knows that From _is_ subscribed to Group.
%% Replaces the subscriber's notify flag, keeping its pid and monitor ref.
set_memberchanges(State, From, Group, WantMemberChanges) ->
    [{Group, SubList}] = ets:lookup(State#state.subtab, Group),
    %% lists:keysearch/3 wraps its hit in {value, _}; unwrap it before
    %% touching record fields.
    {value, Sub} = lists:keysearch(From, #sub.pid, SubList),
    NewSub = Sub#sub{gnotify = WantMemberChanges},
    NewSubList = [NewSub | lists:keydelete(From, #sub.pid, SubList)],
    ets:insert(State#state.subtab, {Group, NewSubList}),
    State.

%% We assume that the caller knows that From is _not_ subscribed to Group.
%% The first subscriber of a group makes the server join that group.  The
%% subscriber is monitored only once it is actually going to be stored, so
%% a failed join does not leave a stray monitor behind.
add_subscriber(State, From, Group, WantMemberChanges) ->
    Tab = State#state.subtab,
    case ets:lookup(Tab, Group) of
        [] ->
            case catch ?DRV:sp_join(State#state.port, Group) of
                {ok, _} ->
                    ets:insert(Tab, {Group, [new_sub(From, WantMemberChanges)]});
                _ ->
                    io:format("XXX add_subscriber: what to do here!?\n")
            end;
        [{Group, SubList}] ->
            NewSubList = [new_sub(From, WantMemberChanges) | SubList],
            ets:insert(Tab, {Group, NewSubList})
    end,
    State.

%% Build the subscriber record and start monitoring the subscriber.
new_sub(From, WantMemberChanges) ->
    #sub{pid = From,
         gnotify = WantMemberChanges,
         monref = erlang:monitor(process, From)}.

%% We assume that the caller knows that From _is_ subscribed to Group.
%% Remove From from Group's subscriber list and release the monitor that
%% was taken for that subscription.  When the last subscriber goes away
%% the group entry is deleted and the server leaves the Spread group.
%% A group without an entry, or a list that does not contain From, is
%% tolerated.  Always returns State.
del_subscriber(State, From, Group) ->
    Tab = State#state.subtab,
    case ets:lookup(Tab, Group) of
        [{Group, SubList}] ->
            release_monitor(From, SubList),
            case lists:keydelete(From, #sub.pid, SubList) of
                [] ->
                    ets:delete(Tab, Group),
                    catch ?DRV:sp_leave(State#state.port, Group);
                NewSubList ->
                    ets:insert(Tab, {Group, NewSubList})
            end;
        [] ->
            ok
    end,
    State.

%% Drop the monitor stored with From's subscription, if there is one.  The
%% flush option also discards a 'DOWN' message for that monitor that may
%% already be queued.
release_monitor(From, SubList) ->
    case lists:keysearch(From, #sub.pid, SubList) of
        {value, #sub{monref = Ref}} when is_reference(Ref) ->
            erlang:demonitor(Ref, [flush]),
            ok;
        _ ->
            ok
    end.

%% Send ToPid the membership currently recorded for Group, if any.
send_current_membership(State, ToPid, Group) ->
    case ets:lookup(State#state.membtab, Group) of
        [{Group, Members}] ->
            ToPid ! {spread, self(), membership, Group, Members};
        _ ->
            ok
    end.

%% Dispatch one active-mode message from the driver.  Regular membership
%% messages update the member table and notify interested subscribers;
%% regular data messages are forwarded to the subscribers of each
%% destination group.  Anything else is rejected with a throw.
%% NOTE(review): this runs inside handle_info/2, so the throw reaches
%% gen_server -- confirm that stopping the server on e.g. a transition
%% message is intended.
process_active_msg(State, Msg) ->
    {ServiceType, Sender, GroupList, MessType, _EndianMismatch, Mess} = Msg,
    case make_servicelist(ServiceType) of
        {reg_memb_mess, _Reason} ->
            %% Sender = group name, GroupList = new membership list
            NewState = update_membership(State, Sender, GroupList),
            notify_member_change(State, Sender, GroupList),
            NewState;
        {regular_mess, Type} ->
            forward_message(State, GroupList, Type, Sender, MessType, Mess);
        _ ->
            throw({err, "XXX Unknown message type, cannot process"})
    end.

%% XXX NOTE: This does not take REJECT_MESS into account, nor does it
%%           pay attention to some other service bits that may or may
%%           not be important.  {shrug}
%% Splits the service-type bit field into {Kind, Detail}: Kind comes from
%% the membership bits, Detail is the cause for membership messages and
%% the delivery flavor otherwise.
make_servicelist(ServiceType) ->
    Memb = ServiceType band 16#3000,
    Reason = ServiceType band 16#0f00,
    MessFlavor = ServiceType band ?REGULAR_MESS,
    First = decode_memb(Memb, Reason),
    Second = if
                 Memb > 0 -> decode_reason(Reason, Memb);
                 true -> decode_messflavor(MessFlavor)
             end,
    {First, Second}.

decode_memb(0, 0) ->
    regular_mess;
decode_memb(0, ?CAUSED_BY_LEAVE) ->
    reg_memb_mess;                      % XXX new type for self leave??
decode_memb(?REG_MEMB_MESS, _Reason) ->
    reg_memb_mess;
decode_memb(?TRANSITION_MESS, _Reason) ->
    transition_mess.
%% Map the "caused by" bits of a membership message to an atom.  The second
%% argument is the membership kind; it only matters when no cause bit is set.
decode_reason(?CAUSED_BY_JOIN, _) ->
    caused_by_join;
decode_reason(?CAUSED_BY_LEAVE, _) ->
    caused_by_leave;
decode_reason(?CAUSED_BY_DISCONNECT, _) ->
    caused_by_disconnect;
decode_reason(?CAUSED_BY_NETWORK, _) ->
    caused_by_network;
decode_reason(0, ?TRANSITION_MESS) ->
    caused_by_network.                  % XXX is this wise?

%% Map the delivery-flavor bits of a regular message to an atom.
decode_messflavor(?UNRELIABLE_MESS) ->
    unreliable_mess;
decode_messflavor(?RELIABLE_MESS) ->
    reliable_mess;
decode_messflavor(?FIFO_MESS) ->
    fifo_mess;
decode_messflavor(?CAUSAL_MESS) ->
    causal_mess;
decode_messflavor(?AGREED_MESS) ->
    agreed_mess;
decode_messflavor(?SAFE_MESS) ->
    safe_mess;
decode_messflavor(0) ->
    self_leave.                         % XXX is this wise?

%% Record the new member list of Group in the member table.
update_membership(State, Group, []) ->
    %% This is the self-leave case: we left the group, so we don't know
    %% who remains in the group.
    ets:delete(State#state.membtab, Group),
    State;
update_membership(State, Group, Members) ->
    ets:insert(State#state.membtab, {Group, Members}),
    State.

%% Tell every subscriber of Group that asked for it about the new members.
notify_member_change(State, Group, Members) ->
    case ets:lookup(State#state.subtab, Group) of
        [] ->
            ok;
        [{Group, SubList}] ->
            notify_member_change2(SubList, Group, Members)
    end.

%% Walk the subscriber list, messaging only entries whose gnotify flag is
%% true; every other entry is skipped.
notify_member_change2(SubList, Group, Members) ->
    Notification = {spread, self(), membership, Group, Members},
    Notify = fun(#sub{pid = Pid, gnotify = Flag}) when Flag == true ->
                     Pid ! Notification;
                (_Other) ->
                     ok
             end,
    lists:foreach(Notify, SubList).

%% Deliver one regular message to the subscribers of each destination group.
forward_message(State, GroupList, Type, Sender, MessType, Mess) ->
    lists:foreach(fun(Group) ->
                          forward_msg(State, Group, Type, Sender, MessType, Mess)
                  end, GroupList),
    State.

%% Deliver one regular message to every subscriber of a single group.
forward_msg(State, Group, Type, Sender, MessType, Mess) ->
    case ets:lookup(State#state.subtab, Group) of
        [{Group, SubList}] ->
            Delivery = {spread, self(), msg, Group, Type, Sender, MessType, Mess},
            [Pid ! Delivery || #sub{pid = Pid} <- SubList];
        _ ->
            ok                          % XXX really ok?
    end.
%% Remove Pid from the subscriber list of every known group.  Called when a
%% monitored subscriber dies.  Returns the state produced by the last
%% del_subscriber/3 call.
unsubscribe_from_all(State, Pid) ->
    Groups = [G || [G] <- ets:match(State#state.subtab, {'$1', '_'})],
    lists:foldl(fun(Group, AccState) ->
                        del_subscriber(AccState, Pid, Group)
                end, State, Groups).

%% Map the service-type atoms of the public API to Spread's bit values.
convert_servicetype(unreliable_mess) -> ?UNRELIABLE_MESS;
convert_servicetype(reliable_mess) -> ?RELIABLE_MESS;
convert_servicetype(fifo_mess) -> ?FIFO_MESS;
convert_servicetype(causal_mess) -> ?CAUSAL_MESS;
convert_servicetype(agreed_mess) -> ?AGREED_MESS;
convert_servicetype(safe_mess) -> ?SAFE_MESS.

%% Print the state record and both ETS tables, and return the same text as
%% a flat string.  The text is printed through "~s" rather than being used
%% as the format string itself, because a "~" inside a group name or member
%% name would otherwise be interpreted as a control sequence.
do_dumpstate(State) ->
    M = lists:flatten(
          [io_lib:format("State = ~p\n", [State]),
           io_lib:format("subtab = ~p\n",
                         [ets:tab2list(State#state.subtab)]),
           io_lib:format("membtab = ~p\n",
                         [ets:tab2list(State#state.membtab)])]),
    io:format("~s", [M]),
    M.

--- NEW FILE: spread_drv.erl ---
%%%----------------------------------------------------------------------
%%% File    : spread_drv.erl
%%% Summary : Spread toolkit version 3.17.0 driver
%%%
%%%
%%% NOTICE: This file was generated by the tools of the Erlang Driver
%%%         toolkit.  Do not edit this file by hand unless you know
%%%         what you're doing!
%%%
%%% Copyright (c) 2002, Scott Lystig Fritchie.  All rights reserved.
%%% See the file "LICENSE" at the top of the source distribution for
%%% full license terms.
%%%
%%%----------------------------------------------------------------------

-module(spread_drv).
-include("spread_drv.hrl").

%% Xref with erl_driver_tk.h's PIPE_DRIVER_TERM_* values
-define(T_NIL, 0).
-define(T_ATOM, 1).
-define(T_PORT, 2).
-define(T_INT, 3).
-define(T_TUPLE, 4).
-define(T_BINARY, 5).
-define(T_STRING, 6).
-define(T_LIST, 7).

%% External exports
-export([shutdown/1]).
-export([make_grouplist/1]).
-export([start/0, start_pipe/0]).
-export([debug/2]).
-export([
         sp_connect/5,
         sp_disconnect/1,
         sp_join/2,
         sp_leave/2,
         sp_multicast/5,
         sp_multigroup_multicast/5,
         sp_poll/1,
         sp_receive/3,
         sp_get_gid_offset_memb_mess/1,
         sp_get_num_vs_offset_memb_mess/1,
         sp_get_vs_set_offset_memb_mess/1,
         set_active/2
        ]).

%% Load the linked-in driver from the first code-path directory that holds
%% its shared object and open a port to it.  Returns {ok, Port}.
start() ->
    {ok, Path} = load_path(?DRV_NAME ++ ".so"),
    erl_ddll:start(),
    ok = erl_ddll:load_driver(Path, ?DRV_NAME),
    case open_port({spawn, ?DRV_NAME}, []) of
        P when is_port(P) -> {ok, P};
        Err -> Err
    end.

%% Start the driver as an external "pipe-main" program that loads the
%% shared object itself; replies then arrive as {packet, 4} binaries.
start_pipe() ->
    {ok, PipeMain} = load_path("pipe-main"),
    {ok, ShLib} = load_path("./spread_drv.so"),
    Cmd = PipeMain ++ "/pipe-main " ++ ShLib ++ "/spread_drv.so",
    case open_port({spawn, Cmd}, [exit_status, binary, use_stdio, {packet, 4}]) of
        P when is_port(P) -> {ok, P};
        Err -> Err
    end.

%% Send the driver's debug command with a 32-bit Flags value.
debug(Port, Flags) when is_port(Port), is_integer(Flags) ->
    case catch erlang:port_command(Port, <<?_DEBUG, Flags:32>>) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX too drastic?
    end.

%% Connect to a Spread daemon.  Both names are sent NUL-terminated, each
%% preceded by its 32-bit length.
sp_connect(Port, Spread_Name, Private_Name, Priority, Group_Membership)
  when is_port(Port) ->                 % TODO: Add additional constraints here
    {Spread_NameBinOrList, Spread_NameLen} =
        serialize_contiguously(Spread_Name, 1),
    {Private_NameBinOrList, Private_NameLen} =
        serialize_contiguously(Private_Name, 1),
    IOList_____ = [<<?_SP_CONNECT,
                     Spread_NameLen:32/integer>>, % I/O list length
                   Spread_NameBinOrList,
                   <<Private_NameLen:32/integer>>, % I/O list length
                   Private_NameBinOrList,
                   <<Priority:32/integer,
                     Group_Membership:32/integer>>],
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

%% Send the driver's disconnect command.
sp_disconnect(Port) when is_port(Port) -> % TODO: Add additional constraints here
    IOList_____ = <<?_SP_DISCONNECT>>,
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.
%% Join Group; the name is sent NUL-terminated, preceded by its length.
sp_join(Port, Group) when is_port(Port) -> % TODO: Add additional constraints here
    {GroupBinOrList, GroupLen} = serialize_contiguously(Group, 1),
    IOList_____ = [<<?_SP_JOIN,
                     GroupLen:32/integer>>, % I/O list length
                   GroupBinOrList,
                   <<>>],
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

%% Leave Group; same wire layout as sp_join/2.
sp_leave(Port, Group) when is_port(Port) -> % TODO: Add additional constraints here
    {GroupBinOrList, GroupLen} = serialize_contiguously(Group, 1),
    IOList_____ = [<<?_SP_LEAVE,
                     GroupLen:32/integer>>, % I/O list length
                   GroupBinOrList,
                   <<>>],
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

%% Multicast Mess to a single group.  The group name is NUL-terminated,
%% the message body is sent as-is; both are length-prefixed.
sp_multicast(Port, Service_Type, Group, Mess_Type, Mess)
  when is_port(Port) ->                 % TODO: Add additional constraints here
    {GroupBinOrList, GroupLen} = serialize_contiguously(Group, 1),
    {MessBinOrList, MessLen} = serialize_contiguously(Mess, 0),
    IOList_____ = [<<?_SP_MULTICAST,
                     Service_Type:32/unsigned-integer,
                     GroupLen:32/integer>>, % I/O list length
                   GroupBinOrList,
                   <<Mess_Type:16/unsigned-integer,
                     MessLen:32/integer>>, % I/O list length
                   MessBinOrList,
                   <<>>],
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

%% Multicast Mess to several groups.  Groups is serialized without adding
%% a terminator here; make_grouplist/1 exists to build the expected
%% NUL-separated encoding.
sp_multigroup_multicast(Port, Service_Type, Groups, Mess_Type, Mess)
  when is_port(Port) ->                 % TODO: Add additional constraints here
    {GroupsBinOrList, GroupsLen} = serialize_contiguously(Groups, 0),
    {MessBinOrList, MessLen} = serialize_contiguously(Mess, 0),
    IOList_____ = [<<?_SP_MULTIGROUP_MULTICAST,
                     Service_Type:32/unsigned-integer,
                     GroupsLen:32/integer>>, % I/O list length
                   GroupsBinOrList,
                   <<Mess_Type:16/unsigned-integer,
                     MessLen:32/integer>>, % I/O list length
                   MessBinOrList,
                   <<>>],
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.
%% Each command below sends a one-byte opcode (plus 32-bit arguments where
%% present) to the driver and waits for its reply via get_port_reply/1.

sp_poll(Port) when is_port(Port) ->     % TODO: Add additional constraints here
    IOList_____ = <<?_SP_POLL>>,
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

sp_receive(Port, Max_Groups, Max_Mess_Len)
  when is_port(Port) ->                 % TODO: Add additional constraints here
    IOList_____ = <<?_SP_RECEIVE,
                    Max_Groups:32/integer,
                    Max_Mess_Len:32/integer>>,
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

sp_get_gid_offset_memb_mess(Port)
  when is_port(Port) ->                 % TODO: Add additional constraints here
    IOList_____ = <<?_SP_GET_GID_OFFSET_MEMB_MESS>>,
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

sp_get_num_vs_offset_memb_mess(Port)
  when is_port(Port) ->                 % TODO: Add additional constraints here
    IOList_____ = <<?_SP_GET_NUM_VS_OFFSET_MEMB_MESS>>,
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

sp_get_vs_set_offset_memb_mess(Port)
  when is_port(Port) ->                 % TODO: Add additional constraints here
    IOList_____ = <<?_SP_GET_VS_SET_OFFSET_MEMB_MESS>>,
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

set_active(Port, Value) when is_port(Port) -> % TODO: Add additional constraints here
    IOList_____ = <<?_SET_ACTIVE,
                    Value:32/integer>>,
    case catch erlang:port_command(Port, IOList_____) of
        true -> get_port_reply(Port);
        Err -> throw(Err)               % XXX Is this too drastic?
    end.

%%%
%%% Internal functions.
%%%

%% Find the first directory of the code path that contains File.
%% Returns {ok, Dir}, or prints a diagnostic and returns {error, enoent}.
load_path(File) ->
    HasFile = fun(D) ->
                      case file:read_file_info(D ++ "/" ++ File) of
                          {ok, _} -> true;
                          _ -> false
                      end
              end,
    case lists:filter(HasFile, code:get_path()) of
        [Dir | _] ->
            {ok, Dir};
        [] ->
            io:format("Error: ~s not found in code path\n", [File]),
            {error, enoent}
    end.
%%%
%%% Note that an 'xtra_return' that only returns one item in its
%%% tuple will return {Port, ok, {Thingie}}, so we'll return
%%% {ok, {Thingie}}, which is *sooooooo* maddening because I keep
%%% forgetting the extra tuple wrapper.  So, if there's only one
%%% thingie in the return tuple, strip it off: {ok, Thingie}
%%%

%% Wait (without timeout) for the driver's answer to the command just sent
%% on Port and normalize it with proc_reply/1.  A port exit, or any other
%% two-element message from the port, is thrown as {port_error, Reason}.
get_port_reply(Port) when is_port(Port) ->
    receive
        {Port, ok} = T -> proc_reply(T);
        {Port, ok, {_}} = T -> proc_reply(T);
        {Port, ok, _} = T -> proc_reply(T);
        {Port, error, {_}} = T -> proc_reply(T);
        {Port, error, _} = T -> proc_reply(T);
        %% Pipe driver messages
        {Port, {data, Bytes}} -> proc_reply(pipedrv_deser(Port, Bytes));
        {'EXIT', Port, Reason} -> throw({port_error, Reason}); % XXX too drastic?
        {Port, Reason} -> throw({port_error, Reason}) % XXX too drastic?
    end.

%% This function exists to provide consistency of replies
%% given by linked-in and pipe drivers.  The "receive" statement
%% in get_port_reply/1 is specific because we want it to be
%% very selective about what it will grab out of the mailbox.
proc_reply({Port, ok}) when is_port(Port) ->
    ok;
proc_reply({Port, ok, {M}}) when is_port(Port) ->
    {ok, M};
proc_reply({Port, ok, M}) when is_port(Port) ->
    {ok, M};
proc_reply({Port, error, {Reason}}) when is_port(Port) ->
    {error, Reason};
proc_reply({Port, error, Reason}) when is_port(Port) ->
    {error, Reason}.

%%% io_list_len() is an extremely useful function.  BEAM has got this
%%% implemented quite efficiently in C.  It would be *fabulous* to be able
%%% to use it from Erlang via a BIF.

%% Number of bytes in an I/O list.  A bare binary yields its byte size, so
%% both clauses return an integer.
io_list_len(B) when is_binary(B) -> byte_size(B);
io_list_len(L) when is_list(L) -> io_list_len(L, 0).

%% Accumulating worker: N is the byte count so far.  Throws
%% {error, partial_len, N} on an element that is neither a byte, a list
%% nor a binary.
io_list_len([H | T], N) ->
    if
        H >= 0, H =< 255 -> io_list_len(T, N + 1);
        is_list(H) -> io_list_len(T, io_list_len(H, N));
        is_binary(H) -> io_list_len(T, byte_size(H) + N);
        true -> throw({error, partial_len, N})
    end;
io_list_len(H, N) when is_binary(H) ->
    %% Binary in the tail position of an improper list.
    byte_size(H) + N;
io_list_len([], N) ->
    N.
%%% We need to make the binary thing we're passing in contiguous
%%% because the C function we're calling is expecting a single
%%% contiguous buffer.  If IOList is ["Hello, ", <<"World">>, "!"],
%%% that binary in the middle element will end up with the argument
%%% spanning three parts of an ErlIOVec.  If that happens, then we'd
%%% have to have the driver do the dirty work of putting the argument
%%% into a single contiguous buffer.
%%%
%%% Frankly, we're lazy, and this code is short and won't be much
%%% slower than doing it in C.

%%% 2nd arg: if 1, NUL-terminate the IOList
%% Returns {Binary, ByteSize}.
serialize_contiguously(B, 0) when is_binary(B) ->
    {B, byte_size(B)};
serialize_contiguously([B], 0) when is_binary(B) ->
    {B, byte_size(B)};
serialize_contiguously(IOList, 1) ->
    serialize_contiguously([IOList, 0], 0);
serialize_contiguously(IOList, 0) ->
    B = list_to_binary(IOList),
    {B, byte_size(B)}.

%% pipedrv_deser/2 -- Deserialize the term that the pipe driver is
%% returning to Erlang.  The pipe driver doesn't know it's a pipe
%% driver, it thinks it's a linked-in driver, so it tries to return
%% an arbitrary Erlang term to us.  The pipe-main program is sneaky:
%% it has a driver_output_term() function that serializes the term
%% that the driver built.  With the help of a list-as-stack, we
%% deserialize that term.
pipedrv_deser(Port, B) ->
    pipedrv_deser(Port, B, []).

%% Each tag either pushes a value on the stack or pops N values to build a
%% tuple or list.  Decoding ends when the input is empty and exactly one
%% term is left on the stack.
pipedrv_deser(_Port, <<>>, []) ->
    throw(icky_i_think);
pipedrv_deser(_Port, <<>>, [T]) ->
    T;
pipedrv_deser(Port, <<?T_NIL:8, Rest/binary>>, Stack) ->
    pipedrv_deser(Port, Rest, [foo___foo_nil___ | Stack]);
pipedrv_deser(Port, <<?T_ATOM:8, Len:8, Rest/binary>>, Stack) ->
    <<A:Len/binary, Rest2/binary>> = Rest,
    pipedrv_deser(Port, Rest2, [list_to_atom(binary_to_list(A)) | Stack]);
pipedrv_deser(Port, <<?T_PORT:8, _P:32/unsigned, Rest/binary>>, Stack) ->
    %% The pipe driver tried sending us a port, but it cannot know what
    %% port ID was assigned to this port, so we'll assume it is Port.
    pipedrv_deser(Port, Rest, [Port | Stack]);
pipedrv_deser(Port, <<?T_INT:8, I:32/signed, Rest/binary>>, Stack) ->
    pipedrv_deser(Port, Rest, [I | Stack]);
pipedrv_deser(Port, <<?T_TUPLE:8, N:8, Rest/binary>>, Stack) ->
    {L, NewStack} = popN(N, Stack),
    pipedrv_deser(Port, Rest, [list_to_tuple(L) | NewStack]);
pipedrv_deser(Port, <<?T_LIST:8, N:32, Rest/binary>>, Stack) ->
    {L, NewStack} = popN(N, Stack),
    pipedrv_deser(Port, Rest, [L | NewStack]);
pipedrv_deser(Port, <<?T_BINARY:8, Len:32/signed, Rest/binary>>, Stack) ->
    <<Bin:Len/binary, Rest2/binary>> = Rest,
    pipedrv_deser(Port, Rest2, [Bin | Stack]);
pipedrv_deser(Port, <<?T_STRING:8, Len:32/signed, Rest/binary>>, Stack) ->
    <<Bin:Len/binary, Rest2/binary>> = Rest,
    pipedrv_deser(Port, Rest2, [binary_to_list(Bin) | Stack]);
pipedrv_deser(_Port, X, Y) ->
    throw({bah, X, Y}).

%% Pop N entries off Stack.  Returns {Popped, RestOfStack} with Popped in
%% the order the entries were pushed.
popN(N, Stack) ->
    popN(N, Stack, []).

popN(0, Stack, Acc) ->
    {Acc, Stack};
popN(N, [foo___foo_nil___ | T], Acc) ->
    %% This is the nonsense we put on the stack to represent NIL.  Ignore it.
    popN(N - 1, T, Acc);
popN(N, [H | T], Acc) ->
    popN(N - 1, T, [H | Acc]).

%%%
%%% Begin code included via <custom_erl> tags
%%%

%% Close the port.  Always returns ok, whether or not closing succeeded.
shutdown(Port) ->
    catch port_close(Port),
    ok.

%%
%% make_grouplist() -- Use to format the group list for
%% sp_multigroup_multicast().  The members of the list are NUL-terminated
%% and concatenated.  If the first byte of a member name is NUL, then
%% you've reached the end of the list.
%%
%% QQQ We are very trusting about our input: no empty lists or
%% binaries as list members, no list members with NULs in them, etc.
%%
make_grouplist(L) ->
    list_to_binary(make_grouplist2(L)).

make_grouplist2([]) ->
    [0];
make_grouplist2([H | T]) ->
    [H | [0 | make_grouplist2(T)]].

--- NEW FILE: spread_drv.hrl --- %%%---------------------------------------------------------------------- %%% File : spread_drv.hrl %%% Summary : Spread toolkit version 3.17.0 driver %%% %%% %%% NOTICE: This file was generated by the tools of the Erlang Driver %%% toolkit. 
Do not edit this file by hand unless you know %%% what you're doing! %%% %%% Copyright (c) 2002, Scott Lystig Fritchie. All rights reserved. %%% See the file "LICENSE" at the top of the source distribution for %%% full license terms. %%% %%%---------------------------------------------------------------------- -define(DRV_NAME, "spread_drv"). -define(MAX_UINT32, 16#FFFFFFFF). -define(LEN_NUL_TERM, ?MAX_UINT32). %%% %%% Driver<->emulator communication codes (xref with top of spread_drv.h) %%% -define(_DEBUG, 0). -define(_SP_CONNECT, 1). -define(_SP_DISCONNECT, 2). -define(_SP_JOIN, 3). -define(_SP_LEAVE, 4). -define(_SP_MULTICAST, 5). -define(_SP_MULTIGROUP_MULTICAST, 6). -define(_SP_POLL, 7). -define(_SP_RECEIVE, 8). -define(_SP_GET_GID_OFFSET_MEMB_MESS, 9). -define(_SP_GET_NUM_VS_OFFSET_MEMB_MESS, 10). -define(_SP_GET_VS_SET_OFFSET_MEMB_MESS, 11). -define(_SET_ACTIVE, 12). %%% %%% Constants %%% -ifndef(_MAX_MULTIGROUPS). -define(_MAX_MULTIGROUPS, 64). % Max number of groups in a multi-group multicast message -endif. -ifndef(_MAX_MSGLEN). -define(_MAX_MSGLEN, (99*1024)). % Max length of a multicast message -endif. %%% %%% Verbatim stuff %%% -define(LOW_PRIORITY, 0). -define(MEDIUM_PRIORITY, 1). -define(HIGH_PRIORITY, 2). -define(DEFAULT_SPREAD_PORT, 4803). -define(UNRELIABLE_MESS, 16#00000001). -define(RELIABLE_MESS, 16#00000002). -define(FIFO_MESS, 16#00000004). -define(CAUSAL_MESS, 16#00000008). -define(AGREED_MESS, 16#00000010). -define(SAFE_MESS, 16#00000020). -define(REGULAR_MESS, 16#0000003f). -define(SELF_DISCARD, 16#00000040). -define(DROP_RECV, 16#01000000). -define(REG_MEMB_MESS, 16#00001000). -define(TRANSITION_MESS, 16#00002000). -define(CAUSED_BY_JOIN, 16#00000100). -define(CAUSED_BY_LEAVE, 16#00000200). -define(CAUSED_BY_DISCONNECT, 16#00000400). -define(CAUSED_BY_NETWORK, 16#00000800). -define(MEMBERSHIP_MESS, 16#00003f00). -define(ENDIAN_RESERVED, 16#80000080). -define(RESERVED, 16#003fc000). -define(REJECT_MESS, 16#00400000). 
-define(ACCEPT_SESSION, 1). -define(ILLEGAL_SPREAD, -1). -define(COULD_NOT_CONNECT, -2). -define(REJECT_QUOTA, -3). -define(REJECT_NO_NAME, -4). -define(REJECT_ILLEGAL_NAME, -5). -define(REJECT_NOT_UNIQUE, -6). -define(REJECT_VERSION, -7). -define(CONNECTION_CLOSED, -8). -define(REJECT_AUTH, -9). -define(ILLEGAL_SESSION, -11). -define(ILLEGAL_SERVICE, -12). -define(ILLEGAL_MESSAGE, -13). -define(ILLEGAL_GROUP, -14). -define(BUFFER_TOO_SHORT, -15). -define(GROUPS_TOO_SHORT, -16). -define(MESSAGE_TOO_LONG, -17). -define(MAX_GROUP_NAME, 32). -define(MAX_PRIVATE_NAME, 10).%largest possible size of private_name field of SP_connect() -define(MAX_PROC_NAME, 20).%largest possible size of process name of daemon %%% %%% End of autogenerated code %%% script = ../../edtk/hrl_template.gsl %%% filename = spread.xml %%% gslgen version = 2.000 Beta 1 %%% date = 2003/05/20 %%% time = 0:49:28 %%% --- NEW FILE: spread_floodrec.erl --- %%%---------------------------------------------------------------------- %%% File : spread_floodrec.erl %%% Author : Scott Lystig Fritchie <slf...@sn...> %%% Purpose : Spread flooder receiver %%% %%% usage: {ok, Pid} = spread_floodrec:start_link("flooder"). %%% %%% Then run the "spflooder" application, which is distributed with %%% the Spread toolkit. The spread_floodrec process will spit out %%% a message every time it receives 1,000 messages from the "flooder" %%% group. At any time, spread_floodrec:dump_state(Pid) can be used %%% to dump the (simple) state of the gen_server. Currently it just %%% keeps track of how many regular messages and how many membership %%% messages have been received. %%% %%%---------------------------------------------------------------------- -module(spread_floodrec). %%-compile(export_all). %%-export([Function/Arity, ...]). -behaviour(gen_server). -define(NAME, ?MODULE). -define(Timeout, infinity). %% External exports -export([start_link/1]). -export([dump_state/1]). 
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-record(state, {
          pid,                          % Pid of spread
          membs = 0,                    % Membership messages received
          msgs = 0                      % Regular messages received
         }).

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------

%% Start the receiver, registered locally under the module name, and have
%% it listen to Group.
start_link(Group) ->
    gen_server:start_link({local, ?NAME}, ?MODULE, [Group], []).

%% Return the receiver's state record (spread pid and both counters).
dump_state(Pid) ->
    gen_server:call(Pid, {dump_state}, ?Timeout).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------
%% Starts a private spread server and subscribes to Group with membership
%% notifications.  The subscription result is asserted so that a refused
%% subscription fails start-up instead of leaving a receiver that will
%% never see a message.
init([Group]) ->
    {ok, Pid} = spread:start_link(),
    ok = spread:subscribe(Pid, Group, true),
    {ok, #state{pid = Pid}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call({dump_state}, _From, State) ->
    {reply, State, State}.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}       (terminate/2 is called)
%%----------------------------------------------------------------------
%% No casts are part of the protocol; anything received is logged and dropped.
handle_cast(Msg, State) ->
    io:format("XXXYYYXXX ~w: ~s:handle_cast got ~w\n", [self(), ?MODULE, Msg]),
    {noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}       (terminate/2 is called)
%%----------------------------------------------------------------------
%% Counts membership and regular messages coming from our own spread
%% server, printing the regular-message count at every multiple of 1000.
%% Every other message is logged and dropped.
handle_info({spread, Pid, membership, _, _},
            #state{pid = SpreadPid, membs = Seen} = State)
  when Pid == SpreadPid ->
    {noreply, State#state{membs = Seen + 1}};
handle_info({spread, Pid, msg, _, _, _, _, _},
            #state{pid = SpreadPid, msgs = Seen} = State)
  when Pid == SpreadPid ->
    Count = Seen + 1,
    case Count rem 1000 of
        0 -> io:format("Msg count = ~w\n", [Count]);
        _ -> ok
    end,
    {noreply, State#state{msgs = Count}};
handle_info(Info, State) ->
    io:format("XXXYYYXXX ~w: ~s:handle_info got ~w\n", [self(), ?MODULE, Info]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(_Reason, _State) ->
    ok.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
|