1. 程式人生 > >storm中一個Bolt發emit多次相同型別訊息

storm中一個Bolt發emit多次相同型別訊息

在storm中的Bolt中可以處理完成邏輯後,向後面的Blot繼續傳送訊息。

可以傳送多個不同的訊息,如:

collector.emit("update-delivered-status",new Values(emailDeliverStatus));

collector.emit("save-request",new Values(udsn));


也可以同一個型別的訊息傳送多個不同內容如;

for (int i = 0; i < emailParamVo.getReceiverNum(); i++) 
			{
				EmailDeliverStatus emailDeliverStatus = new EmailDeliverStatus();
				emailDeliverStatus.setCategoryId(emailParamVo.getCategoryId());
				emailDeliverStatus.setUpdateTime(emailParamVo.getUpdateTime());
				emailDeliverStatus.setStatus(emailParamVo.getEventType());
				emailDeliverStatus.setUserId(emailParamVo.getUserId());
				emailDeliverStatus.setMessageDetail(emailParamVo.getMessageDetail());
				
				StringBuilder receiverBuilder = new StringBuilder(emailParamVo.getReceivers());
				receiverBuilder = receiverBuilder.deleteCharAt(0);
				receiverBuilder = receiverBuilder.deleteCharAt(receiverBuilder.length()-1);
				String[] receivers = receiverBuilder.toString().split(" ");
				String receiver = receivers[i];
				emailDeliverStatus.setEmailId(emailParamVo.getEmailIdPre() + i + "$" + receiver);
				emailDeliverStatus.setReceiver(receiver);
				collector.emit("update-delivered-status",new Values(emailDeliverStatus));
			}


上面的寫法是沒有問題的,因為for迴圈裡面每次傳送的物件都是一個新的例項,但是如果把建立例項的動作放到外面,如:

EmailDeliverStatus emailDeliverStatus = new EmailDeliverStatus();
			emailDeliverStatus.setCategoryId(emailParamVo.getCategoryId());
			emailDeliverStatus.setUpdateTime(emailParamVo.getUpdateTime());
			emailDeliverStatus.setStatus(emailParamVo.getEventType());
			emailDeliverStatus.setUserId(emailParamVo.getUserId());
			emailDeliverStatus.setMessageDetail(emailParamVo.getMessageDetail());
			
			for (int i = 0; i < emailParamVo.getReceiverNum(); i++) 
			{
				StringBuilder receiverBuilder = new StringBuilder(emailParamVo.getReceivers());
				receiverBuilder = receiverBuilder.deleteCharAt(0);
				receiverBuilder = receiverBuilder.deleteCharAt(receiverBuilder.length()-1);
				String[] receivers = receiverBuilder.toString().split(" ");
				String receiver = receivers[i];
				emailDeliverStatus.setEmailId(emailParamVo.getEmailIdPre() + i + "$" + receiver);
				emailDeliverStatus.setReceiver(receiver);
				collector.emit("update-delivered-status",new Values(emailDeliverStatus));
			}

這樣就有問題,按照邏輯,我們是想讓其emit多個不同emailDeliverStatus物件的訊息,但是實際上這樣不不行的,因為storm的emit操作並不是立即執行的,

上面的程式碼就是建立在假設呼叫emit後,storm就會立即去傳送訊息。如果按照上面寫法,會發現接收訊息的bolt收到的for迴圈中的多個訊息都是最後一個訊息的重複多次。

因為storm並不是立即執行emit,而是在這個bolt執行的一個固定時間去emit的,所以emailDeliverStatus例項的初始化必須放到for迴圈的外面執行。