1. 程式人生 > >kafka資料匯入hbase

kafka資料匯入hbase

我們在使用kafka處理資料的過程中會使用kafka跟一下資料庫進行互動,Hbase就是其中的一種。下面給大家介紹一下kafka中的資料是如何匯入Hbase的。

本文的思路是通過consumers把資料消費到Hbase中。

首先在Hbase中建立表,建立表可以在Hbase客戶端建立也可以通過API建立,這裡介紹通過API建立表的方法:

建立CreatTableTest類

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.HColumnDescriptor; 
import org.apache.hadoop.hbase.HTableDescriptor; 
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class CreatTableTest {
	public static void main(String[] args) throws IOException  { 
		//設定HBase據庫的連線配置引數
		Configuration conf = HBaseConfiguration.create(); 
		conf.set("hbase.zookeeper.quorum",  "192.168.5.128");  //  Zookeeper的地址
		conf.set("hbase.zookeeper.property.clientPort", "42182"); 
		String tableName = "emp";
		String[] family = { "basicinfo","deptinfo"}; 
		HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); 
		//建立表物件
		HTableDescriptor hbaseTableDesc = new HTableDescriptor(tableName); 
		for(int i = 0; i < family.length; i++) { 
		//設定表字段
		hbaseTableDesc.addFamily(new HColumnDescriptor(family[i])); 

		} 
		//判斷表是否存在,不存在則建立,存在則列印提示資訊
		if(hbaseAdmin.tableExists(tableName)) { 
		System.out.println("TableExists!"); 
		System.exit(0); 
		} else{ 
		hbaseAdmin.createTable(hbaseTableDesc); 
		System.out.println("Create table Success!"); 
		} 
	} 
}

建立表之後我們建立一個consumer來消費資料到Hbase中

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.teamsun.kafka.m001.KafkaProperties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer3 extends Thread {
	private final ConsumerConnector consumer;
	private final String topic;

	public KafkaConsumer3(String topic) {
		consumer = kafka.consumer.Consumer
				.createJavaConsumerConnector(createConsumerConfig());
		this.topic = topic;
	}

	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", KafkaProperties.zkConnect);
		props.put("group.id", KafkaProperties.groupId1);
		props.put("zookeeper.session.timeout.ms", "40000");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);
	}

	@Override
	public void run() {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(1));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
				.createMessageStreams(topicCountMap);
		KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
	    HBaseUtils hbase = new HBaseUtils();  
		while (it.hasNext()) {
			System.out.println("3receive:" + new String(it.next().message()));
			try {
				hbase.put(new String(it.next().message()));
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
//			try {
//				sleep(300);    // 每條訊息延遲300ms
//			} catch (InterruptedException e) {
//				e.printStackTrace();
//			}
		}
	}
}

再建立一個HBaseUtils來指定要連線的Hbase資料庫

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseUtils {
	public  void put(String string) throws IOException { 
	//設定HBase據庫的連線配置引數
	Configuration conf = HBaseConfiguration.create(); 
	conf.set("hbase.zookeeper.quorum",  "192.168.5.128");  //  Zookeeper的地址
	conf.set("hbase.zookeeper.property.clientPort", "42182"); 
	Random random = new Random();
	long a = random.nextInt(1000000000);           
	String tableName = "emp"; 
	String rowkey = "rowkey"+a ;
	String columnFamily = "basicinfo"; 
	String column = "empname"; 
	//String value = string; 
	HTable table=new HTable(conf, tableName); 
	Put put=new Put(Bytes.toBytes(rowkey)); 
	put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(string)); 
	table.put(put);//放入表
	table.close();//釋放資源
	}
}

最後再加上consumer的配置檔案就大功告成了。

public interface KafkaProperties {

	final static String zkConnect = "hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182";
	final static String groupId1= "group1";
	final static String topic = "test3";
	final static String kafkaServerURL = "hadoop0,hadoop1";
	final static int kafkaServerPort = 9092;
	final static int kafkaProducerBufferSize = 64 * 1024;
	final static int connectionTimeOut = 20000;
	final static int reconnectInterval = 10000;
	final static String clientId = "SimpleConsumerDemoClient";
}

然後執行consumer就可以了,注意要保證topic中有訊息才可以消費。

public class KafkaConsumerProducerTest {

	public static void main(String[] args) {
//		 KafkaProducer1 producerThread1 = new KafkaProducer1(KafkaProperties.topic);
//       producerThread1.start();
//       KafkaProducer2 producerThread2 = new KafkaProducer2(KafkaProperties.topic);
//       producerThread2.start();
//       KafkaProducer3 producerThread3 = new KafkaProducer3(KafkaProperties.topic);
//       producerThread3.start();
        
//       KafkaConsumer1 consumerThread1 = new KafkaConsumer1(KafkaProperties.topic);
//       consumerThread1.start();
//       KafkaConsumer2 consumerThread2 = new KafkaConsumer2(KafkaProperties.topic);
//       consumerThread2.start();
         KafkaConsumer3 consumerThread3 = new KafkaConsumer3(KafkaProperties.topic);
         consumerThread3.start();
//       KafkaConsumer4 consumerThread4 = new KafkaConsumer4(KafkaProperties.topic);
//       consumerThread4.start();
	}
}

在HBase客戶端執行

            hbase(main):063:0> scan  'emp'  

就可以檢視到資料了。

以上就是kafka資料進入Hbase的一個例子,當然上訴只是保證資料走通了,大家在具體專案中什麼需求,還需要自行修改和完善。

——————————————————————————————————

作者:桃花惜春風

轉載請標明出處,原文地址:  

如果感覺本文對您有幫助,請留下您的贊,您的支援是我堅持寫作最大的動力,謝謝!

相關推薦

kafka資料匯入hbase

我們在使用kafka處理資料的過程中會使用kafka跟一下資料庫進行互動,Hbase就是其中的一種。下面給大家介紹一下kafka中的資料是如何匯入Hbase的。 本文的思路是通過consumers把資料消費到Hbase中。 首先在Hbase中建立表,建立表可以在H

kafka資料hbase遷移到hdfs,並按天載入到hive表(hbase與hadoop為不同叢集)

需求:由於我們用的阿里雲Hbase,按儲存收費,現在需要把kafka的資料直接同步到自己搭建的hadoop叢集上,(kafka和hadoop叢集在同一個區域網),然後對接到hive表中去,表按每天做分割槽 一、首先檢視kafka最小偏移量(offset) /usr/local/kafka/bin/k

簡單實現kafka資料寫入hbase

測試資料格式 19392963501,17816115082,2018-09-18 16:19:44,1431 19392963501,17816115082,2018-09-18 16:19:44,1431 14081946321,13094566759,2018-05-23

Atlas kafka資料匯入失敗問題與zkUtils中Zookeeper連線解析

Atlas kafka資料匯入失敗 atlas版本:apache-atlas-1.0.0 Atlas安裝完後需要匯入hive和kafka的資料 呼叫Atlas的import-kafka.sh匯入kafka資料 結果顯示Kafka Data Model impor

Mysql 資料匯入 Hbase

目錄 一、前言 一、前言 在大資料專案中需要做資料遷移時,我們第一時間總會想到sqoop。sqoop是apache 旗下一款“Hadoop 和關係資料庫伺服器之間傳送資料”的工具,

將sqlserver的資料匯入hbase

將sqlserver的資料匯入hbase中 1.解壓sqoop-sqlserver-1.0.tar.gz,並改名(可以不改)          tar  -zxvf  sqoop- sql

flume將資料匯入hbase

1 將hbase的lib目錄下jar拷貝到flume的lib目錄下;2 在hbase中建立儲存資料的表hbase(main):002:0> create 'test_idoall_org','uid','name'3 建立flume配置檔案 vi.confa1.sour

通過sqoop將MySQL資料庫中的資料匯入Hbase

從接觸到大資料到成功的實現一個功能期間走了不少彎路也踩了不少坑,這裡作為我的學習筆記也可以作為小白們的前車之鑑,少走彎路,有不正確之處,望指出 環境準備: hadoop、hbase、sqoop、mys

用sqoop將oracle資料匯入Hbase 使用筆記

網上已經有很多關於這方面的資料,但是我在使用過程中也遇見了不少問題 1. sqoop 的環境我沒有自己搭建  直接用的公司的 2. oracle 小白怕把公司環境弄壞了,自己用容器搭建了一個 docker pull docker.io/wnameless/oracle-xe

Kafka資料匯入匯出

這個命令執行的時候建立了一個獨立模式的 Kafka 連線程序,程序中建立了兩個連線:一個是源連線(對應 connect-file-source.properties 的配置資訊),它從輸入檔案中逐行讀取資料釋出到 Kafka 主題上;另一個是讀取連線(對應 connect-file-sink.properti

MapReduce將HDFS文字資料匯入HBase

HBase本身提供了很多種資料匯入的方式,通常有兩種常用方式: 使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將資料匯入HBase 另一種方式就是使用HBase原生Client API 本文就是示範如何通過M

Kettle 將Oracle資料匯入HBase的注意事項

      使用Kettle採集Oracle資料,匯入到HBase。 Kettle是一個比較好用的ETL工具,個人感覺Kettle比Sqoop還要好用,主要是因為Kettle通過視覺化,元件式拖拉配置

hive over hbase方式將文字庫資料匯入hbase

1,建立hbase表Corpus >> create 'Corpus','CF' 2,建立hive->hbase外表logic_Corpus,並對應hbase中的Corpus表 >> CREATE EXTERNAL TABLE logic_Co

spark讀取kafka資料寫入hbase

package com.prince.demo.test import java.util.UUID import com.typesafe.config.{Config, ConfigFactory} import org.apache.hadoop.hbase.HBa

資料匯入HBase常用方法

【編者按】要使用Hadoop,資料合併至關重要,HBase應用甚廣。一般而言,需要 針對不同情景模式將現有的各種型別的資料庫或資料檔案中的資料轉入至HBase 中。常見方式為:使用HBase的API中的Put方法; 使用HBase 的bulk load 工具;使用定製的MapReduce Job方式。《H

HBase Shell 操作命令&&使用Sqoop將資料匯入HBase

一、HBase Shell 操作命令實驗 要求: HBase叢集正常啟動,且可以執行正常 進入客戶端 [[email protected] ~]$ cd /home/zkpk/hbase-0

文字資料匯入HBASE

在將有定界符文字檔案匯入HBASE庫中,需要將後面的定界符去掉,否則將匯入失敗。如下所示:[[email protected] bin]$ cat /tmp/emp.txt1,A,201304,2,B,201305,3,C,201306,4,D,201307,這個

資料匯入終章:如何將HBase資料匯入HDFS?

我們的最終目標是將資料匯入Hadoop,在之前的章節中,我們介紹瞭如何將傳統關係資料庫的資料匯入Hadoop,本節涉及到了HBase。HBase是一種實時分散式資料儲存系統,通常位於與Hadoop叢集相同的硬體上,或者與Hadoop叢集緊密相連,能夠直接在MapReduce中使用HBase資料,或將

hbase資料匯入匯出

hbase資料匯入 將本地檔案(test.csv)上傳到hdfs的根目錄下,然後匯入資料到hbase 1.本地寫一個檔案進行測試,檔名為test.csv,內容如下: 2.將檔案上傳到Hadoop 3.檢視是否上傳成功(檔案存在,表示成功) 4.進入hbase s

flume將kafka中topic資料匯入hive中

一、首先更加資料的表結構在hive中進行表的建立。          create table AREA1(unid string,area_punid string,area_no string,area_name s