1. 程式人生 > tranquilizer實現BeamFactory流式資料寫入到Druid

tranquilizer實現BeamFactory流式資料寫入到Druid

package com.icsoc.report.druid;

import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.*;
import com.metamx.tranquility.flink.BeamFactory
; import com.metamx.tranquility.typeclass.Timestamper; import io.druid.granularity.QueryGranularities; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.indexing.granularity.GranularitySpec; import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.framework
.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.joda.time.DateTime; import org.joda.time.Period; import scala.collection.IndexedSeq; import java.util.List; import java.util.Map; /******************************************************************************* * 版權資訊:北京中通天鴻武漢分公司 * @author xuchang * Copyright: Copyright (c) 2007北京中通天鴻武漢分公司,Inc.All Rights Reserved. * Description: ******************************************************************************/
/**
 * Beam factory that builds a Tranquility {@link Beam} accepting {@code Map<String, Object>}
 * events and sending them to Druid.
 *
 * <p>Every setting (ZooKeeper address, overlord service name, discovery path, data source,
 * dimensions, granularities and tuning) is a fixed value defined in this class.
 */
public abstract class CtiDruidEventBeamFactory implements BeamFactory<Map<String, Object>> {

    /** ZooKeeper connect string passed to Curator. */
    private static final String ZOOKEEPER_CONNECT = "localhost:2181";

    /** Service name used to locate the Druid indexing service. */
    private static final String INDEX_SERVICE = "druid/overlord";

    /** Discovery path handed to the beam builder. */
    private static final String DISCOVERY_PATH = "foo";

    /** Name of the Druid data source that receives the events. */
    private static final String DATA_SOURCE = "foo";

    /** Key of the event map entry holding the event time. */
    private static final String TIMESTAMP_COLUMN = "timestamp";

    /**
     * Creates a started Curator client and builds a Druid beam on top of it.
     *
     * <p>NOTE(review): each call creates and starts a new Curator client that this method never
     * closes. Confirm that callers invoke this once per JVM, or cache the returned beam.
     *
     * @return a beam that writes events to the {@code foo} data source with minute query
     *         granularity, hourly segments and a ten-minute window period
     */
    @Override
    public Beam<Map<String, Object>> makeBeam() {
        // Retry arguments: base sleep 100 ms, max sleep 3000 ms, at most 5 retries.
        CuratorFramework curator = CuratorFrameworkFactory.newClient(
                ZOOKEEPER_CONNECT, new BoundedExponentialBackoffRetry(100, 3000, 5));
        curator.start();

        List<String> dimensions = ImmutableList.of("bar");
        List<AggregatorFactory> aggregators = ImmutableList.of();

        // Passing a primitive long selects DateTime(long) directly instead of the
        // DateTime(Object) overload that a boxed Long would pick.
        Timestamper<Map<String, Object>> timestamper =
                event -> new DateTime(timestampMillis(event));

        // NOTE(review): the previous code declared an unused local "isRollup = true". Some
        // Tranquility versions accept it as a fourth argument to DruidRollup.create; confirm
        // against the version in use before adding it. The call is kept exactly as it was.
        return DruidBeams.builder(timestamper)
                .curator(curator)
                .discoveryPath(DISCOVERY_PATH)
                .location(DruidLocation.create(INDEX_SERVICE, DATA_SOURCE))
                .rollup(DruidRollup.create(
                        DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
                .tuning(
                        ClusteredBeamTuning.builder()
                                .segmentGranularity(Granularity.HOUR)
                                .windowPeriod(new Period("PT10M"))
                                .partitions(1)
                                .replicants(1)
                                .build())
                .buildBeam();
    }

    /**
     * Reads the {@code timestamp} entry of an event as a {@code long}.
     *
     * @param event the event map; must not be {@code null}
     * @return the numeric value stored under {@code timestamp}
     * @throws NumberFormatException if the entry is missing, {@code null}, or its string form
     *         is not a valid {@code long}
     */
    private static long timestampMillis(Map<String, Object> event) {
        Object raw = event.get(TIMESTAMP_COLUMN);
        if (raw == null) {
            // Same exception type the string round-trip produced for a missing value
            // ("For input string: \"null\""), but with a message that names the real problem.
            throw new NumberFormatException(
                    "Event has no '" + TIMESTAMP_COLUMN + "' value; cannot determine its timestamp");
        }
        if (raw instanceof Long || raw instanceof Integer) {
            // Already an integral number: no need to format and re-parse it.
            return ((Number) raw).longValue();
        }
        // Any other type is parsed from its string form, exactly as before.
        return Long.parseLong(String.valueOf(raw));
    }
}