spring 整合 kafka
使用spring-integration-kafka傳送訊息
Outbound Channel Adapter用來發送訊息到Kafka。 訊息從Spring Integration Channel中讀取。 你可以在Spring application context指定這個channel。
一旦配置好這個Channel,就可以利用這個Channel往Kafka發訊息。 明顯地,Spring Integration特定的訊息傳送給這個Adaptor,然後傳送前在內部被轉為Kafka訊息。當前的版本要求你必須指定訊息key和topic作為頭部資料 (header),訊息作為有載荷(payload)。
例如
123456 |
final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);channel.send( MessageBuilder.withPayload(payload) //設定有效載荷 .setHeader("messageKey", "key") //指定key .setHeader("topic", "test").build()); /指定topic/ |
實際程式碼如下:
12345678910111213141516171819202122232425262728 |
import java.util.Random;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.integration.support.MessageBuilder;import org.springframework.messaging.MessageChannel;public class Producer { private static final String CONFIG = "/context.xml"; private |
Spring 配置檔案:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 |
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" |