RabbitMQ - Work queues
阿新 • • 發佈:2017-10-06
print () rabbitmq exit con consumer def todo reat
Producer:
private static void newTask(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE,true, false, false, null); String message = getMessage(args); channel.basicPublish("", TASK_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println("[x] send ‘" + message + "‘"); } catch (Exception e) {// TODO: handle exception } } private static String getMessage(String[] strings) { if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) {int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
build.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar; javac -cp %CP% Sender.java -d .
run.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar; java -cp %CP% rm_producer.Sender hello,world
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar; java -cp %CP% rm_producer.Sender hello,world...
Consumer:
private static void taskWorker() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); try { channel.queueDeclare(TASK_QUEUE, true, false, false, null); System.out.println("[*] Waiting for message, to exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[x] Receive message ‘" + message + "‘"); try { doWork(message); } finally { System.out.print("[x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE, false, consumer); } catch (Exception e) { // TODO: handle exception } // System.in.read(); // channel.close(); // connection.close(); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == ‘.‘) { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } }
build.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar; javac -cp %CP% Receiver.java -d .
run.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar; java -cp %CP% rm_consumer.Receiver > log.txt
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar; java -cp %CP% rm_consumer.Receiver
RabbitMQ - Work queues