1. 程式人生 > Kafka學習(一)生產者producer(個人規範用法)

Kafka學習(一)生產者producer(個人規範用法)

生產者:

@Slf4j
@Component
public class KafkaProducerTest {

   //配置類
    /**
     * Builds the settings map handed to the Kafka producer constructor.
     *
     * <p>Both keys and values are serialized with Kafka's {@code StringSerializer}.
     * The bootstrap server address is a masked placeholder and must be replaced
     * with a real broker list before this can connect to anything.
     *
     * @return a fresh, mutable map of producer settings
     */
    public Map<String, Object> init() {
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        Map<String, Object> settings = new HashMap<>();
        // Connection
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "*********");
        // Serialization
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
        // Batching and buffering
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        return settings;
    }


    /**
     * Sends the same start-charge request five times to the business-service topic.
     *
     * <p>The JSON payload is built once, before the producer is opened, so a
     * serialization failure cannot leave an open producer behind. Record keys are
     * {@code "10"} through {@code "14"} ({@code "1"} followed by the loop index).
     *
     * <p>The producer lives in a try-with-resources block, so it is closed on every
     * exit path, including when a send throws.
     *
     * @throws JsonProcessingException declared for the JSON conversion of the
     *         parameter map (presumably raised by {@code JsonUtil.map2Json})
     */
    @Test
    public void send() throws JsonProcessingException {
        String topic = KafkaMessageConfig.BUSINESS_SERVICE_TOPIC;

        Map<String, Object> params = new HashMap<>();
        params.put("OperationType", KafkaMessageConfig.START_CHARGE_REQUEST);
        params.put("StartChargeSeq", "1354367");
        params.put("UserId", 123);
        params.put("StartType", "THRAPP");
        params.put("Port", "8137");
        String msg = JsonUtil.map2Json(params);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(init())) {
            for (int i = 0; i < 5; i++) {
                final ProducerRecord<String, String> record = new ProducerRecord<>(topic, "1" + i, msg);
                // send() is asynchronous: without a callback a failed delivery would go unnoticed.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        log.error("kafka send message failed:[{}]", record, exception);
                    }
                });
                log.info("kafka send message:[{}]", record);
            }
        }
    }

 

 

單例方式:

/**
 * Thin wrapper around {@link KafkaTemplate} that can be obtained either by
 * dependency injection or, for code outside the container, through
 * {@link #getInstance()}.
 *
 * <p>The static accessor returns the Spring-managed bean itself. A separately
 * constructed object would never receive the {@code @Autowired} template, and
 * every {@link #send} on it would fail with a {@link NullPointerException}.
 */
@Component
@EnableKafka
@Configuration
public class KafkaProducer {

    /**
     * The container-managed instance, published when the template is injected.
     * Volatile so that threads calling {@link #getInstance()} see the reference
     * written by the thread that initialised the application context.
     */
    private static volatile KafkaProducer kafkaProducer;

    // NOTE(review): the raw type is kept on purpose. Parameterising it makes the
    // assignment in send() a compile error, because the template's send() result is
    // not a Future<RecordMetadata> -- see the note in send().
    KafkaTemplate kafkaTemplate;

    /**
     * Receives the template from Spring and registers this bean as the singleton
     * returned by {@link #getInstance()}.
     *
     * @param kafkaTemplate the template used for all sends
     */
    @Autowired
    void setKafkaTemplate(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        // Deliberate write to a static from an instance method: this bridges the
        // container-managed bean to the static accessor.
        kafkaProducer = this;
    }

    /**
     * Returns the Spring-managed producer wrapper.
     *
     * @return the initialised singleton
     * @throws IllegalStateException if called before Spring has created and
     *         injected this bean
     */
    public static KafkaProducer getInstance() {
        KafkaProducer instance = kafkaProducer;
        if (instance == null) {
            throw new IllegalStateException(
                    "KafkaProducer is not initialised yet; the Spring context must be started first");
        }
        return instance;
    }

    /**
     * Sends a keyed message to a topic through the injected template.
     *
     * @param topic   destination topic
     * @param key     record key
     * @param message record value
     * @return the future handed back by {@code KafkaTemplate.send}
     */
    public Future<RecordMetadata> send(String topic, String key, String message) {
        // NOTE(review): this is an unchecked conversion through the raw template type.
        // In spring-kafka the future completes with a SendResult, not a RecordMetadata,
        // so callers should expect a ClassCastException when they use get() as
        // RecordMetadata -- confirm against the spring-kafka version in use.
        Future<RecordMetadata> result = kafkaTemplate.send(topic, key, message);
        return result;
    }

}

配置:

/**
 * Spring configuration that exposes a {@code kafkaTemplate} bean for
 * String-keyed, String-valued messages.
 *
 * <p>Broker addresses and serializer class names come from the
 * {@code spring.kafka.*} properties; the batching and buffer values are fixed here.
 */
@Configuration
@EnableKafka
public class KafkaProducersConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keyType;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueType;


    /**
     * Creates the template bean on top of a newly built producer factory.
     *
     * @return the template registered under the bean name {@code kafkaTemplate}
     */
    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Assembles the producer settings and wraps them in a default producer factory.
     *
     * <p>NOTE(review): this method is not annotated as a bean, so the factory is
     * presumably outside the container's lifecycle management -- confirm whether
     * that is intended.
     *
     * @return a new factory built from the injected properties
     */
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = new HashMap<>();
        // Values injected from application properties
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyType);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueType);
        // Fixed batching and buffering values
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);

        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(settings);
        return factory;
    }
}

還是自己寫的程式碼適合自己用,網上的很多程式碼雜而無用!