RabbitMQ四種Exchange型別之Direct (Erlang)
阿新 • • 發佈:2019-01-28
%% Consumer side of the RabbitMQ "direct" exchange example.
%%
%% Each instance is a gen_server that opens its own AMQP connection and
%% channel, declares a server-named auto-delete queue, binds it to the
%% <<"direct">> exchange with one routing key and prints every message
%% delivered to that queue.
%%
%% NOTE(review): "common.hrl" is assumed to pull in the amqp_client records
%% (#amqp_params_network{}, #amqp_msg{}, the 'basic.*'/'queue.*'/'exchange.*'
%% framing records) and to define ?HOST, ?USER_NAME, ?PASSWORD and
%% ?EXCHANGE_TYPE_DIRECT -- confirm against the header.
-module(mod_direct_receive).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-include("common.hrl").

%% routing_key: binding key this consumer listens on.
%% connection:  pid returned by amqp_connection:start/1, closed in terminate/2.
-record(state, {routing_key = <<"">>, connection}).

%% @doc Starts a consumer for RoutingKey (a binary).
%% The process is registered locally under the atom made of the module name
%% followed by the routing key, e.g. mod_direct_receiveinfo for <<"info">>.
%% Because the name is built with list_to_atom/1, only pass routing keys from
%% a trusted, bounded set (atoms are never garbage collected).
-spec start_link(binary()) -> {ok, pid()} | ignore | {error, term()}.
start_link(RoutingKey) ->
    Name = erlang:list_to_atom(
             lists:concat([?MODULE, erlang:binary_to_list(RoutingKey)])),
    gen_server:start_link({local, Name}, ?MODULE, [RoutingKey], []).

%% @doc Connects to the broker and subscribes to the routing key.
%% Returns {stop, Reason} when the connection or the channel setup fails, so
%% start_link/1 reports the failure instead of leaving an idle server behind.
init([RoutingKey]) ->
    %% Trap exits so terminate/2 runs (and closes the connection) when the
    %% parent shuts this process down.
    process_flag(trap_exit, true),
    case start(RoutingKey) of
        {ok, Connection} ->
            {ok, #state{routing_key = RoutingKey, connection = Connection}};
        {error, Reason} ->
            {stop, Reason}
    end.

%% @doc No synchronous requests are defined; every call is answered with ok.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% @doc No asynchronous requests are defined; casts are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.

%% @doc Handles messages sent to this process by the AMQP channel.
%% The subscription confirmation is ignored, deliveries are printed together
%% with the routing key, and anything else is printed as unknown.
handle_info(#'basic.consume_ok'{}, State) ->
    {noreply, State};
handle_info({#'basic.deliver'{}, #amqp_msg{payload = Payload}}, State) ->
    io:format(" [routing_key = ~p] receive messages is ~p~n",
              [State#state.routing_key, Payload]),
    {noreply, State};
handle_info(Info, State) ->
    io:format("[routing_key = ~p] unknown messages is ~p~n",
              [State#state.routing_key, Info]),
    {noreply, State}.

%% @doc Closes the AMQP connection opened in init/1, if there is one.
terminate(_Reason, #state{connection = Connection}) when is_pid(Connection) ->
    _ = amqp_connection:close(Connection),
    ok;
terminate(_Reason, _State) ->
    ok.

%% @doc No state migration is needed between code versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Opens a connection to the broker and sets up the consumer on it.
%% Returns {ok, Connection} on success and {error, Reason} otherwise; the
%% failure is also printed. A connection that was opened is closed again when
%% the channel setup fails, so it is not leaked.
start(RoutingKey) ->
    Params = #amqp_params_network{host = ?HOST,
                                  username = ?USER_NAME,
                                  password = ?PASSWORD},
    case amqp_connection:start(Params) of
        {ok, Connection} ->
            try setup_consumer(Connection, RoutingKey) of
                ok -> {ok, Connection}
            catch
                Class:Error ->
                    _ = amqp_connection:close(Connection),
                    io:format("[routing_key = ~p] consumer setup error: ~p~n",
                              [RoutingKey, {Class, Error}]),
                    {error, {Class, Error}}
            end;
        {error, Reason} ->
            io:format("[routing_key = ~p] connection rabbit error: ~p~n",
                      [RoutingKey, Reason]),
            {error, Reason}
    end.

%% Opens a channel on Connection, declares the queue and the exchange, binds
%% them with RoutingKey and subscribes the calling process to the queue.
%% Every broker reply is matched against its *_ok record, so an unexpected
%% reply raises and is handled by start/1.
setup_consumer(Connection, RoutingKey) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),
    %% Leave the queue name empty so the broker generates one. A name derived
    %% from a one-second timestamp made consumers started within the same
    %% second share a single queue.
    #'queue.declare_ok'{queue = QueueName} =
        amqp_channel:call(Channel, #'queue.declare'{auto_delete = true}),
    #'exchange.declare_ok'{} =
        amqp_channel:call(Channel,
                          #'exchange.declare'{auto_delete = true,
                                              exchange = <<"direct">>,
                                              type = ?EXCHANGE_TYPE_DIRECT}),
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.bind'{queue = QueueName,
                                        exchange = <<"direct">>,
                                        routing_key = RoutingKey}),
    io:format(" [routing_key = ~p] Waiting for messages......~n", [RoutingKey]),
    %% no_ack = true: deliveries are not acknowledged by this consumer.
    #'basic.consume_ok'{} =
        amqp_channel:subscribe(Channel,
                               #'basic.consume'{queue = QueueName,
                                                no_ack = true},
                               self()),
    ok.