spring---訊息訂閱釋出之RabbiteMQ
阿新 • • 發佈:2019-01-06
上一篇文章我們使用spring + redis實現訊息的訂閱釋出,但是redis作為訊息元件僅適合輕量級的任務處理,例如:秒殺計數器、快取等,對於重量級,高併發的處理redis就稍顯劣勢。接下來我們spring+RabbiteMQ的實現。
準備
首先安裝ErLang環境,因為RabbiteMQ依賴ErLang,這裡我們使用windows環境,因為都有exe。
Erlang下載地址
RabbiteMQ下載地址
編碼
首先新增依賴jar包:
<dependency>
<groupId>org.springframework.amqp</groupId >
<artifactId>spring-amqp</artifactId>
<version>1.5.4.RELEASE </version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId> spring-rabbit</artifactId>
<version>1.5.4.RELEASE</version >
</dependency>
接下來同上一篇文章一樣新增訊息接收類Receiver:
import java.util.concurrent.CountDownLatch;
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
最後來看一下訊息的傳送Application:
package rabbitmq;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import ch.qos.logback.core.Context;
@SpringBootApplication
public class Application implements CommandLineRunner {
final static String queueName = "spring-boot";
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
AnnotationConfigApplicationContext context(){
return new AnnotationConfigApplicationContext();
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Waiting five seconds...");
Thread.sleep(5000);
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
receiver().getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
最後一起看一下執行的結果:
Sending message...
Received <Hello from RabbitMQ!>