spring boot整合用docker構建的rabbitmq
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
或者不設定密碼
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 rabbitmq:management
使用 docker inspect rabbit檢視容器的ip地址,
這個留著待會兒用spring boot連線的時候進行配置。
開啟瀏覽器 localhost:15672 來到rabbitmq的web管理介面,剛剛我們的rabbit容器是以預設的方式開啟的,所以賬號密碼都是 guest。
登入進去之後就設計佇列和交換機(類比計算機網路)
我設計了兩個佇列,分別是C.Queue 用來接收spider服務傳過來給web端的,spider_queue spider元件來接收web傳過來的專案。
編號 | Exchange | RoutingKey | Queue | 描述 |
---|---|---|---|---|
1 | C.Exchange | C.* | C.Queue | web服務端監聽,訊息來自於spider服務 |
2 | topicExchange | spider.* | spider_queue | spider服務監聽,訊息來自於web服務端 |
字首最好設定成一樣的,這樣一旦佇列多了,方便維護,以免眼花
建好之後:
然後用spring boot來呼叫
在pom.xml加入:
<dependency >
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
並在配置檔案中加入配置:
這是web服務的配置
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
com.shengqian.demo.rabbitmq.ToSpiderQueueName=C.Queue
com.shengqian.demo.rabbitmq.exchangename=C.Exchange
com.shengqian.demo.rabbitmq.routekey=C.*
com.shengqian.demo.rabbitmq.recv=recvMessage
下面來看看spider服務的配置
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
com.shengqian.demo.rabbitmq.ToSpiderQueueName=spider_queue
com.shengqian.demo.rabbitmq.exchangename=topicExchange
com.shengqian.demo.rabbitmq.routekey=spider.*
com.shengqian.demo.rabbitmq.recv=recvMessage
實際上沒什麼區別,唯一的區別就是監聽的佇列引數不一樣,詳細的都在那個表裡面了。
這裡的host就是剛剛我們檢視的docker 容器的ip。之所以能這麼做是因為我在我的路由表中加了一條路由,使得本機發出的172.17.0.0/16的ip能丟到docker的預設network中,有關詳情請參見我的 用springboot連線redis叢集一文,裡面有較為詳細的操作方法。
然後加入rabbitmq配置:
@Configuration
public class RabbitMQConfig {
@Autowired
private QueueService queueService;
@Value("${com.shengqian.demo.rabbitmq.ToSpiderQueueName}")
private String queueName;
@Value("${com.shengqian.demo.rabbitmq.exchangename}")
private String exchangeName;
@Value("${com.shengqian.demo.rabbitmq.routekey}")
private String routeKey;
@Value("${com.shengqian.demo.rabbitmq.recv}")
private String messageHandler;
@Bean
Queue getQueue() {
return new Queue(queueName);
}
@Bean
TopicExchange getTopicExchange() {
return new TopicExchange(exchangeName);
}
@Bean
Binding getBinding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routeKey);
}
@Bean
MessageListenerAdapter listenerAdapter(AmqpMessageService receiver) {
return new MessageListenerAdapter(receiver, messageHandler);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setPrefetchCount(1);
container.setExposeListenerChannel(true);
return container;
}
}
現在就可以使用了,實現一個簡單的兩個服務之間用訊息佇列進行對話的demo:
在web端
public interface AmqpMessageService {
/**
* 接收訊息佇列訊息
*
* @param message 訊息內容
*/
void recvMessage(String message);
/**
* 傳送訊息到訊息佇列
*
* @param message 訊息內容
*/
void sendMessage(String routingKey, String message);
}
public interface QueueService extends AmqpMessageService {
void receive(String message);
}
@Service
public class QueueServiceImpl implements QueueService, InitializingBean {
@Autowired
public RabbitTemplate rabbitTemplate;
private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);
public void receive(String message) {
LOGGER.info("接收到來自C.Queue佇列的訊息:" + message);
}
@Override
public void afterPropertiesSet() throws Exception {
//執行一些初始化操作,如程式重新啟動,需要載入掃描引數
}
/**
* 接收訊息佇列訊息
*
* @param message 訊息內容
*/
@Override
public void recvMessage(String message) {
receive(message);
}
/**
* 傳送訊息到訊息佇列
*
* @param message 訊息內容
*/
@Override
public void sendMessage(String routingKey,String message) {
rabbitTemplate.convertAndSend("topicExchange", routingKey, JSON.toString(message));
}
}
//最後定義一個controller
@Controller
public class TestController {
@Autowired
private QueueService queueService;
@ApiOperation(value = "測試rabbit", notes = "測試rabbitmq")
@RequestMapping(value = Path.TEST_RABBITMQ, method = RequestMethod.GET)
@ResponseBody
public Integer send(){
Map<String, Object> ans = new HashMap<>();
ans.put("hah", "hello spider");
queueService.sendMessage("spider.*", JSON.toString(ans));
return 1;
}
}
然後在spider服務端:
public interface AmqpMessageService {
/**
* 接收訊息佇列訊息
*
* @param message 訊息內容
*/
void recvMessage(String message);
/**
* 傳送訊息到訊息佇列
*
* @param message 訊息內容
*/
void sendMessage(String routingKey, String message);
}
public interface QueueService extends AmqpMessageService {
void receive(String message);
}
@Service
public class QueueServiceImpl implements QueueService, InitializingBean {
@Autowired
public RabbitTemplate rabbitTemplate;
private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);
public void receive(String message) {
LOGGER.info("接收到來自spider_queue佇列的訊息:" + message);
//處理一段時間然後,返回
Map<String, Object> ans = new HashMap<String, Object>();
ans.put("hello",message);
sendMessage("C.*",JSON.toString(ans));
}
@Override
public void afterPropertiesSet() throws Exception {
//執行一些初始化操作,如程式重新啟動,需要載入掃描引數
}
/**
* 接收訊息佇列訊息
*
* @param message 訊息內容
*/
@Override
public void recvMessage(String message) {
receive(message);
}
/**
* 傳送訊息到訊息佇列
*
* @param message 訊息內容
*/
@Override
public void sendMessage(String routingKey, String message) {
rabbitTemplate.convertAndSend("C.Exchange", routingKey, JSON.toString(message));
}
}
最後分別開啟兩個spring boot服務即可,我的spider專案也是基於spring boot構建的服務,當訂閱了訊息佇列,有訊息來的時候會驅動整個服務繼續執行,所以說該服務是訊息驅動的服務。
開啟兩個服務後,我在web服務中構建了swagger2,
直接點開第三個介面執行就可以了,有關如何在springboot環境下整合swagger請看我前面的部落格。執行結果如下:
可以正常使用。接下來計劃完成spider專案與docker環境下的cockroachDB叢集的互動問題,cockroachDB是新一代newSql。