1. 程式人生 > >CDH5.7.1上Kafka+SparkStream+Hive的實戰

CDH5.7.1上Kafka+SparkStream+Hive的實戰

    目前的專案中需要將kafka佇列的資料實時存到hive表中。應為之前工作中用到的是CDH5.11,而且spark等用的基本是最新版(2.2),語言也一直是Scala,所以這次要求Java語言以及低版本的spark,在寫程式的時候還是遇到了一些頭疼的事情。

環境:Centos6.5 Spark1.6.0 Kafka0.9.x Hive1.1.0 Zookeeper3.4.5 都是基於CDH5.7.1的

閱讀本文,你將瞭解到:

  1. 採用Direct方式消費Kafka資料到hive,並將offset提交到Zookeeper。Spark一些引數的使用
  2. 使用Kafka Old Hight-Level API消費Kafka資料,手動提交offset到zookeeper
  3. Hive JDBC的注意事項

##############################首先來看第一條 

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.zookeeper.ZooDefs;
import kafka.common.TopicAndPartition;
import kafka.serializer.StringDecoder;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;

/**
 * 採用Direct 方式拉取kafka資料到hive表,可以用hiveSql寫,也可以直接將資料寫入對應的hive目錄,然後執行任意重新整理語句
 * ALTER TABLE xxx ADD IF NOT EXISTS PARTITION (yue='2018-05',ri='2018-05-20')
 * offset手動提交到zookeeper
 */
public class SparkStreamKafka2HiveDirect {
    public static void main(String[] args) {
        String topic = "";
        String group = "";
        SparkConf conf = new SparkConf().setAppName("待儲存佇列到hive");
        //削峰,在任務積壓時,會減少每秒的拉取量
        conf.set("spark.streaming.backpressure.enabled", "true");
        // maxRetries預設就是1 接受資料相關的一共就只有兩個配置
        conf.set("spark.streaming.kafka.maxRetries", "1");
        //每秒最多拉取partition * 2 的資料
        conf.set("spark.streaming.kafka.maxRatePerPartition", "2");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
        // 如果在這裡初始化hivecontext,在下面的運算元內使用hivecontext會報一個空指標異常,原因貌似是用的時候Hivecontext未初始化成功(請知道的大佬普及一下)
        // HiveContext hiveContext = new HiveContext(jsc);
        // kafka 引數
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "");
        kafkaParams.put("group.id", group);
        // kafkaParams.put("auto.offset.reset", "smallest");
        Set<String> topicSet = new HashSet<>();
        topicSet.add(topic);
        // 賦值操作不是執行緒安全的。若想不用鎖來實現,可以用AtomicReference<V>這個類,實現物件引用的原子更新
        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
        // 讀取zookeeper中消費組的偏移量
        ZKGroupTopicDirs zgt = new ZKGroupTopicDirs(group, topic);
        final String zkTopicPath = zgt.consumerOffsetDir();
        // System.out.println(zkTopicPath);
        // 會寫在zookeeper根目錄下consumers下!!!
        ZkClient zkClient = new ZkClient("");
        int countChildren = zkClient.countChildren(zkTopicPath);

        Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();

        if (countChildren > 0) {
            for (int i = 0; i < countChildren; i++) {
                String path = zkTopicPath + "/" + i;
                String offset = zkClient.readData(path);
                TopicAndPartition topicAndPartition = new TopicAndPartition("", i);
                fromOffsets.put(topicAndPartition, Long.parseLong(offset));
            }
            /**
             * createDirectStream(JavaStreamingContext jssc, java.lang.Class<K> keyClass,
             * java.lang.Class<V> valueClass, java.lang.Class<KD> keyDecoderClass,
             * java.lang.Class<VD> valueDecoderClass, java.lang.Class<R> recordClass,
             * java.util.Map<java.lang.String,java.lang.String> kafkaParams,
             * java.util.Map<kafka.common.TopicAndPartition,java.lang.Long> fromOffsets,
             * Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler) Create an
             * input stream that directly pulls messages from Kafka Brokers without using
             * any receiver.
             */
            //幸虧java8支援lambda表示式呀,要不然寫慣了Scala的人簡直沒法活了~~~
            KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    String.class, kafkaParams, fromOffsets, v -> v.message()).foreachRDD(rdd -> {
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                // 邏輯處理
                HiveContext hiveContext = new HiveContext(jsc);
                try {
                    //1-採用hivecontext執行"insert into table" 插入資料到hive
                    //2-將DF以Hive的儲存格式存到Hive目錄下
                    //更新zookeeper
                    ZkClient zkClient1 = new ZkClient("");
                    OffsetRange[] offsets1 = offsetRanges.get();
                    if (null != offsets1) {
                        for (OffsetRange o : offsets1) {
                            String zkPath = zkTopicPath + "/" + o.partition();
                            // System.out.println(zkPath + o.untilOffset());
                            new ZkUtils(zkClient1,
                                    new ZkConnection(""), false)
                                    .updatePersistentPath(zkPath, o.untilOffset() + "",
                                            ZooDefs.Ids.OPEN_ACL_UNSAFE);
                        }
                    }
                    zkClient.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        } else {
            KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, topicSet).foreachRDD(rdd -> {
                if (!rdd.isEmpty()) {
                    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    offsetRanges.set(offsets);

                    HiveContext hiveContext = new HiveContext(jsc);
                    try {
                        //處理邏輯程式碼
                        // 更新zookeeper
                        ZkClient zkClient1 = new ZkClient("");
                        OffsetRange[] offsets1 = offsetRanges.get();
                        if (null != offsets1) {
                            for (OffsetRange o : offsets1) {
                                String zkPath = zkTopicPath + "/" + o.partition();
                                new ZkUtils(zkClient1,
                                        new ZkConnection(""), false)
                                        .updatePersistentPath(zkPath, o.untilOffset() + "",
                                                ZooDefs.Ids.OPEN_ACL_UNSAFE);
                            }
                        }
                        zkClient.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        jssc.start();
        jssc.awaitTermination();
    }
}

說說上面程式碼需要注意的地方,這種使用kafkaApi提交offset,會提交到zookeeper'/consumers/xxx'下,而我們專案需要提交到'/kafka/xxx'。除非改原始碼,否則貌似實現不了,我下載原始碼看了看,貌似路徑是寫死的,所以這種方案被pass掉了。

kafka原始碼裡是這樣的,第一個就聲明瞭根目錄

object ZkUtils {
  val ConsumersPath = "/consumers"
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"
  val ControllerPath = "/controller"
  val ControllerEpochPath = "/controller_epoch"
  val ReassignPartitionsPath = "/admin/reassign_partitions"
  val DeleteTopicsPath = "/admin/delete_topics"
  val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
  val BrokerSequenceIdPath = "/brokers/seqid"
  val IsrChangeNotificationPath = "/isr_change_notification"
  val EntityConfigPath = "/config"
  val EntityConfigChangesPath = "/config/changes"

所以嘗試用kafka自己的消費者API,這種方式和spark Receive方式一樣,會把offset提交到 zkBrokerList的路徑下,例如xxxx:2181;xxxx:2181;xxxx:2181/kafka,就不會走上面/consumers路徑了。

##############################第二條


import com.alibaba.fastjson.JSONObject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.time.LocalDate;
import java.util.*;

public class Save2Hive {

    public static void main(String[] args) throws IOException {

        ConsumerConnector consum = new Save2Hive().createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(Configs.Glob_KafkaTopic_save, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consum.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStream.get(Configs.Glob_KafkaTopic_save).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // 定義一個list存放message的value,我覺得這裡放一個數組資源佔用會少點,但是會加大程式碼量
        ArrayList<String> list = new ArrayList<String>(3000);
        HdfsFileUtil fileUtil = new HdfsFileUtil();
        String tableName1 = "source_feature_hive";
        String tableName2 = "source_detection_vehicle_all";

        while (it.hasNext()) {
            if (list.size() < 3000) {
                list.add(new String(it.next().message()));
            } else {
                String ri = LocalDate.now().toString();
                String yue = ri.substring(0, 7);
                //這裡是包裝了hdfs的append方法,追加寫一個檔案,直接寫到Hive表的資料目錄下,按照月和日分組
                FSDataOutputStream out1 = fileUtil.getOut(yue, ri, tableName1);
                FSDataOutputStream out2 = fileUtil.getOut(yue, ri, tableName2);
                list.forEach(x -> {
                    try {
                        out1.write("\n".getBytes("UTF-8"));
                        out1.hflush();
                        out2.write("\n".getBytes("UTF-8"));
                        out2.hflush();
                    } catch (UnsupportedEncodingException e) {
//							LogSet.WriteLog("Error", "4", "06", "600", false, e,"encodingException");
                    } catch (IOException e) {
//							LogSet.WriteLog("Error", "4", "06", "600", false, e,"kafka2HDFS failed");
                    }
                });
                list.clear();
                if (null != out1 && null != out2) {
                    try {
                        out1.close();
                        out2.close();
                        consum.commitOffsets();
                        //log 記錄寫入成功
                    } catch (Exception e) {
                        // LogSet.WriteLog("Error", "4", "06", "600", false, e,"FSoutput close fail");
                    } finally {
                        //TODO
                    }
                }
            }
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", Configs.Glob_KafkaZkQuorum);// 宣告zk
        properties.put("group.id", Configs.SaveHive_KafkaConsumerGroup);// 指定消費組
        properties.put("rebalance.max.retries", "5");
        properties.put("refresh.leader.backoff.ms", "10000");
        properties.put("zookeeper.session.timeout.ms", "40000");
        properties.put("auto.commit.enable", "false");
        // properties.put("auto.offset.reset", "smallest");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
}

上面的程式碼還是有一個致命的缺點,對是致命的缺點,那就是隻能單節點部署,因為append方法不可以同時操作一個檔案。改進:部署到三臺的機器上,配置檔案也需要三個,分別配置每臺機器的檔名。

##############################第三條

這種方式被改進後的第二種方式取代。要克服第二種方式只能單節點部署的缺點,最先想到的是將資料寫到本地,然後利用HiveJDBC'load data local inpath' 的方式寫到hive表,來看下面的錯誤程式碼:

getStream方法返回一個stmt,然後在main函式裡執行execute()load資料
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import ..kafkaConsumerAPI.Configs;

/**
 * 寫資料到臨時表 source_feature_hive 和 source_detection_vehicle_all
 */
public class SaveHiveService {

    public static void main(String[] args) {
        ConsumerConnector consum = new SaveHiveService().createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(Configs.Glob_KafkaTopic_save, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consum.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStream.get(Configs.Glob_KafkaTopic_save).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // 定義一個list存放message的value
        ArrayList<String> list = new ArrayList<String>();

        String tableName1 = "";
        String tableName10 = "";
        String tableName2 = "";

        while (it.hasNext()) {

            if (list.size() < Integer.parseInt(Configs.Save_SyncBatchCount)) {
                list.add(new String(it.next().message()));
            } else {
                String ri = LocalDate.now().toString();
                String yue = ri.substring(0, 7);
                FileWriter stream1 = getStream(tableName1);
                FileWriter stream2 = getStream(tableName2);
                list.forEach(x -> {
                    try {
                        JSONObject value = JSONObject.parseObject(x);
                        stream1.write("" + "\n");
                        stream1.flush();

                    } catch (UnsupportedEncodingException e) {
//						LogSet.WriteLog("Error", "4", "06", "600", false, e, "encodingException");
                    } catch (IOException e) {
//						LogSet.WriteLog("Error", "4", "06", "600", false, e, "kafka2local failed");
                    }
                });

                if (null != stream1 && null != stream2) {
                    try {
                        stream1.close();
                        stream2.close();
                    } catch (Exception e) {
//						LogSet.WriteLog("Error", "4", "06", "600", false, e, "關閉流失敗");
                    }
                }
                try {
                    SaveHiveService.getStmt()
                            .execute("load data local inpath '/h/" + tableName1 + ".txt'"
                                    + " into table " + tableName10 + " partition(yue='" + yue + "',ri='" + ri + "')");
                    SaveHiveService.getStmt()
                            .execute("load data local inpath '/home/dispatch/hivedata/" + tableName2 + ".txt'"
                                    + " into table " + tableName2 + " partition(yue='" + yue + "',ri='" + ri + "')");
                    consum.commitOffsets();
//					LogSet.WriteLog("info", "4", "06", "600", true, "data write success");
                    list.clear();
                } catch (Exception e) {
//					LogSet.WriteLog("Error", "4", "06", "600", false, e, "hiveSql failed");
                }

            }

        }

    }

    private static Statement getStmt() {
        String driverName = "org.apache.hive.jdbc.HiveDriver";

        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
//			LogSet.WriteLog("Error", "4", "06", "600", false, e, "hiveJDBC反射失敗");
        }
        Connection con = null;
        try {
            con = DriverManager.getConnection(Configs.HiveUrl, Configs.HiveUserName, Configs.HiveUserPwd);
        } catch (SQLException e) {
//			LogSet.WriteLog("Error", "4", "06", "600", false, e, "hiveJDBC建立連線失敗");
        }
        Statement stmt = null;
        try {
            stmt = con.createStatement();
        } catch (SQLException e) {
//			LogSet.WriteLog("Error", "4", "06", "600", false, e, "con.createStatement失敗");
        }
        return stmt;
    }

    private static FileWriter getStream(String tableName) {
        FileWriter fw = null;
        File f = null;
        try {
            f = new File("/h/" + tableName + ".txt");
            if (!f.exists())
                f.createNewFile();
            fw = new FileWriter("/h/" + tableName + ".txt");
        } catch (IOException e) {
//			LogSet.WriteLog("error", "4", "06", "600", false, e, "FileWriter 建立流失敗");
        }
        return fw;
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", Configs.Glob_KafkaZkQuorum);// 宣告zk
        properties.put("group.id", Configs.SaveHive_KafkaConsumerGroup);// 指定消費組
        properties.put("rebalance.max.retries", "5");
        properties.put("refresh.leader.backoff.ms", "10000");
        properties.put("zookeeper.session.timeout.ms", "40000");
        properties.put("auto.commit.enable", "false");
        properties.put("auto.offset.reset", "smallest");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
}
 // load data into table
 // NOTE: filepath has to be local to the hive server emmm..wtf...檔案路徑必須在server本地才行!!!!OK,這種方案pass!!!
 // NOTE: /tmp/a.txt is a ctrl-A separated file with two fields per line

OK,kafka寫hive到這裡就結束了。

補充:hive官網:



時間過了一週,發現這個程式還是有個bug,我們叢集的datamanage與namenode通訊發生過故障,導致檔案副本數不足的情況下關閉流,會報close execption。想到的就是在關閉流之前,呼叫fs的getFilestatus(),判斷副本的數量,如果不夠,就等1秒。

相關推薦

CDH5.7.1Kafka+SparkStream+Hive實戰

    目前的專案中需要將kafka佇列的資料實時存到hive表中。應為之前工作中用到的是CDH5.11,而且spark等用的基本是最新版(2.2),語言也一直是Scala,所以這次要求Java語言以及低版本的spark,在寫程式的時候還是遇到了一些頭疼的事情。環境:Cent

CDH5.7.1 Hadoop2.6 HDFS Encryption KMS 實戰之功能測試

CDH KMS 測試 0、使用者說明 [x] keyAdminUser使用者是key admin user [x] hdfs 用 戶是 hdfs super user [x] user_a 、 user_b 是HDFS普通使用者 1、建立keytab

在centos 7.1部署weibbix

webbix安裝pipyum install -y openssl epel-release python-pip安裝Django 1.8.18pip install Django==1.8.18安裝MariaDByum install -y mariadb mariadb-server mariadb-de

7.1 python拉勾網實戰並儲存到mongodb

拉鉤網實戰 爬取拉勾網有關“爬蟲”的職位資訊,並把爬取的資料儲存在MongoDB資料庫中 確定網頁的載入方式是JavaScript載入通過谷歌瀏覽器開發者工具分析和尋找網頁的真實請求,確定真實資料在position.Ajax開頭的連結裡,請求方式是POST使用request

阿里雲虛擬機器搭建Hadoop-2.6.0-cdh5.7.1安裝詳解(偽分散式環境)

首先先搭配安全組 開啟映象後輸入以下內容:             重要:  yum -y install lrzsz HOSTNAME=(自己的主機名字) hostname $HOSTNAME echo "$(grep -E '127|::1' /etc/host

【Linux】教你一步一步在CentOS Linux release 7.1安裝vsftpd FTP伺服器

1.執行yum安裝vsftpd sudo yum install vsftpd 2.安裝完成後,修改vsftpd ftp登入使用者配置檔案 如果你希望可以使用root使用者登入ftp的話,需要做以下修改 /etc/vsftpd/ftpusers   該檔案中定義的使

iOS 7.0.4 和 7.1 計算文字高度有誤差的解決方法

   最近寫專案計算文字高度那裡在 7.0.4 和 7.1 上遇見 文字高度 有誤差的問題,一般寫我就直接用系統提供的方法,計算高度,如下: //得到文字的高度 +(NSValue*)getStringWithRect:(NSString*)aString

hadoop2.6.0-cdh5.7.1偽分散式編譯安裝

環境相關: OS:CentOS release 6.9 IP:192.168.1.10 MEM:10G(推薦4G以上) DISK:50G 1. 主機克隆,基礎環境部署 參照《CentOS6實驗機模板搭建部署》 克隆一臺實驗機,調整記憶體為10G,並

【Linux】教你一步一步在CentOS Linux release 7.1解除安裝vsftpd FTP伺服器

1.首先檢視系統中是否安裝了vsftpd rpm -qa | grep vsftpd  例如:如果顯示結果如下的話,說明是安裝了vsfptd3.0的ftp伺服器的 [[email protected] sbin]# rpm -qa | grep vsftpd v

hive-1.1.0-cdh5.7.0 的編譯安裝並修改元資料儲存資料庫為MySQL

1 準備工作 1.1 配置jdk1.7 1.2 部署MySQL 1.3 安裝maven 1.4編譯安裝 hadoop-2.6.0-cdh5.7.0.src.tar.gz 1.5建立hadoop使用者 安裝的目錄結構:/opt/s

在CentOS 7.2編譯安裝Nginx 1.13.6

創建 ont uname group with pen 還需 www stub 第一個裏程碑 --- 檢查軟件安裝的系統環境 [root@rainjin ~]# cat /etc/redhat-release CentOS Linux release 7.2.1511 (

webgoat 7.1 實戰指南 - 下

阻止 rec bsp 圖片 pda pps simditor 分享圖片 輸入驗證 webgoat 7.1 實戰指南 - 下 Injection Flaws Command Injection(命令註入) 對任何一個參數驅動的網站,命令註入攻擊代表一個嚴重的威脅。在攻擊後

CDH5.15.1 hive 連接mongodb配置及增刪改查

uri str prope upd info with oot ODB ble 1. 下載 wget http://repo1.maven.org/maven2/org/mongodb/mongo-hadoop/mongo-hadoop-hive/2.0.2/mongo-h

Nginx實戰 1.7-1.11 Nginx架構分析,虛擬機器配置

1.7-1.9 Nginx架構分析 Nginx模組化結構 Nginx涉及到的模組分為核心模組、標準HTTP模組、可選HTTP模組、郵件服務模組以及第三方模組等五大類。 核心模組 核心模組是指Nginx伺服器正常執行時必不可少的模組,它們提供了Nginx最基本最核心的服務,如程序管理、許可

spark2.1.0編譯 cdh5.7.0版本

一、實現目標 從spark官網下載2.1.0的原始碼,然後編譯對應hadoop版本的spark,從而可以解決很多相容性問題,使程式執行環境更加優越,順暢。 二、環境準備 1.硬體 無論雲主機還是虛擬機器,記憶體一定要4G以上,最好8G+。 2.軟體 (1)java:spark

cdh5.7.0偽分散式叢集之hive安裝

基本環境及軟體: 軟體版本 軟體包 centos-6.4   JDK-1.8 jdk-8u191-linux-x64.tar.gz hadoo

Ubuntu 16.04搭建CDH5.16.1叢集

本文參考自:《Ubuntu16.04上搭建CDH5.14叢集》    1.準備三臺安裝Ubuntu 16.04.4 LTS系統的伺服器,假設ip地址分佈為 192.168.100.19 192.168.100.20 192.168.100.21 (如果是虛擬機器,建議記憶體配置

Netty實戰開發(7):Netty結合kafka實現分散式訊息佇列

在分散式遊戲伺服器系統中,訊息處理佇列主要解決問題就是解耦系統中的業務,使得每個系統看起來功能比較單一,而且解決一些全服資料共享等問題。 通常我們知道kafka是作為訊息佇列比較火的一種方式,其實還有(Active MQ,Rabbit MQ,Zero MQ)個人

[Hadoop] CentOS7 安裝flume-ng-1.6.0-cdh5.7.0

1. Flume 安裝部署 根據官方文件描述,市面上的Flume主流版本有兩個:0.9.x and 1.x。這兩個版本差異非常非常大,舊版本已經被淘汰了,要用的話就使用新版本。當然本文中既定版本為cd

CDH5.16.1叢集增加新節點 Ubuntu 16.04搭建CDH5.16.1叢集 Ubuntu 16.04搭建CDH5.16.1叢集

如果是全新安裝叢集的話,可以參考《Ubuntu 16.04上搭建CDH5.16.1叢集》 下面是叢集新增節點步驟: 1.已經存在一個叢集,有兩個節點 192.168.100.19 hadoop-master 192.168.100.20 hadoop-slave1 新增節點ip為