1. 程式人生 > >kafka消費者實時消費數據存入hdfs java scala 代碼

kafka消費者實時消費數據存入hdfs java scala 代碼

cto tel exec prope println support asn 我們 main



hadoop-client 的依賴比較亂，調試了很多次：CDH 版本缺少很多 jar，改用 hadoop 2.7.3 可以正常運行。

使用自定義的輸出流池（HDFSOutputStreamPool）對 HDFS 輸出流進行統一管理：
 public void writeLog2HDFS(String path, byte[] log) {
        try {
            //得到我們的裝飾流
            FSDataOutputStream out = HDFSOutputStreamPool.getInstance().takeOutputStream(path);
            out.write(log);
            out.write(
"\r\n".getBytes()); out.hsync(); out.close(); } catch (Exception e) { e.printStackTrace(); } } }
/**
  * @created by imp ON 2019/3/1
  */
object KafkaScalaConsumer {

  val  write=new HDFSWriter()

  def ZK_CONN     = "192.168.121.12:2181
" def GROUP_ID = "1test-consumer-group109" def TOPIC = "eshop" def main(args: Array[String]): Unit = { //println(" 開始了 ") val connector = Consumer.create(createConfig()) val topicCountMap = new HashMap[String, Int]() topicCountMap.put(TOPIC, 3) // TOPIC在創建時就指定了它有3個partition val msgStreams: Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]]
= connector.createMessageStreams(topicCountMap) println("# of streams is " + msgStreams.get(TOPIC).get.size) val threadPool:ExecutorService=Executors.newFixedThreadPool(3) var index = 0; for (stream <- msgStreams.get(TOPIC).get) { threadPool.execute(new ThreadDemo("consumer_"+index,stream)) index+=1; } } class ThreadDemo(threadName:String,stream:KafkaStream[Array[Byte], Array[Byte]]) extends Runnable{ override def run(): Unit = { val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator(); while(it.hasNext()){ val data : MessageAndMetadata[Array[Byte], Array[Byte]] = it.next() val msg=data.message() val log = new String(msg) val arr = StringUtil.splitLog(log) if (arr == null || arr.length < 1) return //todo: continue is not supported //主機名 val hostname = StringUtil.getHostname(arr) //日期串 val dateStr = StringUtil.formatYyyyMmDdHhMi(arr) //path val rawPath = "/spark/eshop/" + dateStr + "/" + hostname + ".log" //寫入數據到hdfs System.out.println(log) write .writeLog2HDFS(rawPath, msg) } } } def createConfig(): ConsumerConfig = { val props = new Properties() props.put("zookeeper.connect", ZK_CONN) // props.put("bootstrap.servers","localhost:9092") props.put("group.id", GROUP_ID) props.put("zookeeper.session.timeout.ms", "5000") props.put("zookeeper.connection.timeout.ms","10000") props.put("auto.offset.reset", "smallest") props.put("auto.commit.interval.ms", "300") props.put("rebalance.backoff.ms","2000") props.put("rebalance.max.retries","10") props.put("auto.offset.reset", "smallest") new ConsumerConfig(props) } }

kafka消費者實時消費數據存入hdfs java scala 代碼