Kafka學習(一)生產者producer(個人規範用法)
阿新 • • 發佈:2018-11-08
生產者:
@Slf4j @Component public class KafkaProducerTest { //配置類 public Map<String, Object> init() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "*********"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return properties; } //測試解鎖 @Test public void send() throws JsonProcessingException { KafkaProducer<String, String> producer = new KafkaProducer<>(init()); String topic = KafkaMessageConfig.BUSINESS_SERVICE_TOPIC; Map<String, Object> params = new HashMap<>(); params.put("OperationType", KafkaMessageConfig.START_CHARGE_REQUEST); params.put("StartChargeSeq", "1354367"); params.put("UserId", 123); params.put("StartType", "THRAPP"); params.put("Port", "8137"); String msg = JsonUtil.map2Json(params); for (int i = 0; i < 5; i++) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "1" + i, msg); producer.send(record); log.info("kafka send message:[{}]", record); } producer.close(); }
單例方式:
@Component @EnableKafka @Configuration public class KafkaProducer { @Autowired KafkaTemplate kafkaTemplate; private static final KafkaProducer kafkaProducer = new KafkaProducer(); public synchronized static KafkaProducer getInstance() { return kafkaProducer; } public Future<RecordMetadata> send(String topic, String key, String message) { final ProducerRecord <String, String> record = new ProducerRecord<String, String>(topic, key, message); Future<RecordMetadata> result = kafkaTemplate.send(topic,key,message); return result; } }
配置:
@Configuration @EnableKafka public class KafkaProducersConfig { @Value("${spring.kafka.bootstrap-servers}") private String brokers; @Value("${spring.kafka.producer.key-serializer}") private String keyType; @Value("${spring.kafka.producer.value-serializer}") private String valueType; @Bean("kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory()); return kafkaTemplate; } public ProducerFactory<String, String> producerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyType); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueType); return new DefaultKafkaProducerFactory<String, String>(properties); } }
還是自己寫的程式碼適合自己用,網上的很多程式碼雜而無用!