tranquilizer實現BeamFactory流式資料寫入到Druid
阿新 • • 發佈:2018-12-10
package com.icsoc.report.druid;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.*;
import com.metamx.tranquility.flink.BeamFactory ;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework .CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.joda.time.DateTime;
import org.joda.time.Period;
import scala.collection.IndexedSeq;
import java.util.List;
import java.util.Map;
/*******************************************************************************
* 版權資訊:北京中通天鴻武漢分公司
* @author xuchang
* Copyright: Copyright (c) 2007北京中通天鴻武漢分公司,Inc.All Rights Reserved.
* Description:
******************************************************************************/
public abstract class CtiDruidEventBeamFactory implements BeamFactory<Map<String, Object>> {
@Override
public Beam<Map<String, Object>> makeBeam() {
CuratorFramework curator = CuratorFrameworkFactory.newClient("localhost:2181"
, new BoundedExponentialBackoffRetry(100, 3000, 5));
curator.start();
String indexService = "druid/overlord";
String discoveryPath = "foo";
String dataSource = "foo";
final String timestampColumnName = "timestamp";
List<String> dimensions = ImmutableList.of("bar");
List<AggregatorFactory> aggregators = ImmutableList.of();
boolean isRollup = true;
final Timestamper<Map<String, Object>> timestamper = (Timestamper<Map<String, Object>>) theMap -> new DateTime(Long.valueOf(String.valueOf(theMap.get(timestampColumnName))));
return DruidBeams.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation.create(indexService, dataSource))
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
.tuning(
ClusteredBeamTuning.builder()
.segmentGranularity(Granularity.HOUR)
.windowPeriod(new Period("PT10M"))
.partitions(1)
.replicants(1)
.build()
)
.buildBeam();
}
}