1. 程式人生 > 其它 >java批量新增資料到elasticsearch

java批量新增資料到elasticsearch

ship物件類

public class Ship {
    private String mmsi;
    private String utc;
   


    public String getMmsi() {
        return mmsi;
    }

    public void setMmsi(String mmsi) {
        this.mmsi = mmsi;
    }

    public String getUtc() {
        return utc;
    }

    public void setUtc(String utc) {
        
this.utc = utc; } }

批量新增

public static void insertAllBulk(List<Ship> dataList,String indexName) {
        TransportClient client = null;
        try {
            client = connectionPool.getConnection();
            BulkRequestBuilder builder = client.prepareBulk();
            //請求命令數量
            int
z = 0; long l2 = System.currentTimeMillis(); int n = dataList.size(); if (n != 0) { //遍歷列表,每條資料生成一條請求命令 for (int i = 0; i < n; i++) { Ship data = dataList.remove(0); if (data != null) { String utc
= data.getUtc(); if (utc == null || utc.equals("")) { continue; } else { IndexRequestBuilder request = client.prepareIndex(indexName, "_doc").setSource( XContentFactory.jsonBuilder() .startObject() .field("mmsi", data.getMmsi()) .field("utc", utc) .endObject() ); //新增到批量請求 builder.add(request); z++; } } } //命令數不為零 if (z != 0) { long l3 = System.currentTimeMillis(); logger2.info("生成請求耗時:" + (l3 - l2)); //執行批量請求,獲取結果 BulkResponse bulkItemResponses = builder.get(); if (bulkItemResponses.hasFailures()) { logger2.error(bulkItemResponses.buildFailureMessage()); } long l4 = System.currentTimeMillis(); logger2.info("執行並返回結果耗時:" + (l4 - l3)); long millis = bulkItemResponses.getTook().getMillis(); dataList.clear(); logger2.info("船完成批量匯入ES" + n + "個,耗時" + millis); } } } catch (Exception e) { e.printStackTrace(); logger2.error("insertBulk 錯誤", e); } finally { connectionPool.releaseConnection(client); } }