kafka推送訊息(Producer)
阿新 • • 發佈:2018-12-29
// 儲存MAP
public static Map<String, IsliCodeLog> logMap = Collections
.synchronizedMap(new HashMap<String, IsliCodeLog>());
// 主題
public static String myTopic = SystemConfig.getString("kafka_log_topic");
// 服務編碼
public static String scCode = SystemConfig.getString("code");
/**
* 1.日誌直接推送給kafka伺服器;2.若kafka宕機,則日誌儲存到資料庫裡
*
* @param log
* 日誌實體
*/
public void pushLogToKafka() {
if (logLock.tryLock()) {
Producer<String, String> producer = null;
try {
if (logMap.size() > 0) {
producer = new KafkaUtil().getProducer();
// 使用Iterator遍歷Map
for (Map.Entry<String, IsliCodeLog> entry : logMap
.entrySet()) {
if (null == entry) {
break;
}
LOGGER.info(" logMap.size=======>>>>>" + logMap.size());
IsliCodeLog log = entry.getValue();
if (null != log) {
log.setServiceCode(scCode);// 服務編碼
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
myTopic, log.getIsliCode(),
JsonUtils.toJson(log));
/**
* 釋出訊息(非同步)
*
* 1.kafka伺服器正常,日誌直接傳送給kafka
*
* 2.存在的問題是:kafka宕機,send()不拋異常,這種情況則日誌插入到資料庫
*
*/
producer.send(record,
new PushLogToKafkaCallback(entry));
}
}
}
} catch (Exception e) {
LOGGER.error("[ParseIsliCodeService.pushLogToKafka] error:"
+ e.getMessage());
} finally {
if (null != producer) {
producer.close();
}
logLock.unlock();
}
}
}
/**
* 日誌訊息推送給kafka回撥函式類
*
* @author wuJH
*
*/
class PushLogToKafkaCallback implements Callback {
private Map.Entry<String, IsliCodeLog> entry;
PushLogToKafkaCallback(Map.Entry<String, IsliCodeLog> entry) {
this.entry = entry;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
try {
if (null != e) {
parseIsliCodeLogDAO.insertIsliCodeLog(entry.getValue());
LOGGER.error("PushLogToKafkaCallback[onCompletion] error:"
+ e.getMessage());
}
} catch (Exception ex) {
LOGGER.error("PushLogToKafkaCallback[onCompletion] catch_error:"
+ ex.getMessage());
} finally {
String key = entry.getKey();
if (logMap.containsKey(key)) {
logMap.remove(entry.getKey());// 刪除MAP
}
}
}
}
public static Map<String, IsliCodeLog> logMap = Collections
.synchronizedMap(new HashMap<String, IsliCodeLog>());
// 主題
public static String myTopic = SystemConfig.getString("kafka_log_topic");
// 服務編碼
public static String scCode = SystemConfig.getString("code");
/**
* 1.日誌直接推送給kafka伺服器;2.若kafka宕機,則日誌儲存到資料庫裡
*
* @param log
* 日誌實體
*/
public void pushLogToKafka() {
if (logLock.tryLock()) {
Producer<String, String> producer = null;
try {
if (logMap.size() > 0) {
producer = new KafkaUtil().getProducer();
// 使用Iterator遍歷Map
for (Map.Entry<String, IsliCodeLog> entry : logMap
.entrySet()) {
if (null == entry) {
break;
}
LOGGER.info(" logMap.size=======>>>>>" + logMap.size());
IsliCodeLog log = entry.getValue();
if (null != log) {
log.setServiceCode(scCode);// 服務編碼
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
myTopic, log.getIsliCode(),
JsonUtils.toJson(log));
/**
* 釋出訊息(非同步)
*
* 1.kafka伺服器正常,日誌直接傳送給kafka
*
* 2.存在的問題是:kafka宕機,send()不拋異常,這種情況則日誌插入到資料庫
*
*/
producer.send(record,
new PushLogToKafkaCallback(entry));
}
}
}
} catch (Exception e) {
LOGGER.error("[ParseIsliCodeService.pushLogToKafka] error:"
+ e.getMessage());
} finally {
if (null != producer) {
producer.close();
}
logLock.unlock();
}
}
}
/**
* 日誌訊息推送給kafka回撥函式類
*
* @author wuJH
*
*/
class PushLogToKafkaCallback implements Callback {
private Map.Entry<String, IsliCodeLog> entry;
PushLogToKafkaCallback(Map.Entry<String, IsliCodeLog> entry) {
this.entry = entry;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
try {
if (null != e) {
parseIsliCodeLogDAO.insertIsliCodeLog(entry.getValue());
LOGGER.error("PushLogToKafkaCallback[onCompletion] error:"
+ e.getMessage());
}
} catch (Exception ex) {
LOGGER.error("PushLogToKafkaCallback[onCompletion] catch_error:"
+ ex.getMessage());
} finally {
String key = entry.getKey();
if (logMap.containsKey(key)) {
logMap.remove(entry.getKey());// 刪除MAP
}
}
}
}