
Hadoop Ecosystem Component Configuration Notes

Environment variables

cat /etc/profile.d/my_env.sh

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export HADOOP_HOME=/opt/module/hadoop-3.2.2

#export SCALA_HOME=/opt/module/scala-2.13.0
export SCALA_HOME=/opt/module/scala-2.12.11

export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin

export ZOOKEEPER_HOME=/opt/module/apache-zookeeper-3.6.3-bin

# export SPARK_HOME=/opt/module/spark-3.2.0-bin-hadoop3.2-scala2.13
export SPARK_HOME=/opt/module/spark-3.2.0-bin-hadoop3.2

export KAFKA_HOME=/opt/module/kafka_2.11-2.4.1

export MYBIN_HOME=/opt/module/bin

export FLUME_HOME=/opt/module/flume-1.9.0

export SQOOP_HOME=/opt/module/sqoop-1.4.6

export PATH=$SQOOP_HOME/bin:$FLUME_HOME/bin:$KAFKA_HOME/bin:$MYBIN_HOME:$SPARK_HOME/sbin:$SCALA_HOME/bin:$HIVE_HOME/bin:$ZOOKEEPER_HOME/bin:$SPARK_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$PATH
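
Reload the profile (or log in again) so the variables take effect, then run a quick sanity check:

source /etc/profile.d/my_env.sh
hadoop version
java -version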

Component start/stop script

#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi
case $1 in
"start")
        echo " =================== 啟動 zookeeper  ==================="
        ssh hadoop001 "zkServer.sh start"
        ssh hadoop002 "zkServer.sh start"
        ssh hadoop003 "zkServer.sh start"

        echo " =================== 啟動 hadoop叢集 ==================="

        echo " --------------- 啟動 hdfs ---------------"
        ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/start-dfs.sh"
        echo " --------------- 啟動 yarn ---------------"
        ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/start-yarn.sh"
        echo " --------------- 啟動 historyserver ---------------"
        ssh hadoop001 "/opt/module/hadoop-3.2.2/bin/mapred --daemon start historyserver"
        
        echo " =================== 啟動 kafka叢集 ==================="
        ssh hadoop001 "kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"
        ssh hadoop002 "kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"
        ssh hadoop003 "kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"

;;
"stop")
        echo " =================== 關閉 kafka叢集 ==================="
        ssh hadoop001 "kafka-server-stop.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"
        ssh hadoop002 "kafka-server-stop.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"
        ssh hadoop003 "kafka-server-stop.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"

        echo " =================== 關閉 hadoop叢集 ==================="

        echo " --------------- 關閉 historyserver ---------------"
        ssh hadoop001 "/opt/module/hadoop-3.2.2/bin/mapred --daemon stop historyserver"
        echo " --------------- 關閉 yarn ---------------"
        ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/stop-yarn.sh"
        echo " --------------- 關閉 hdfs ---------------"
        ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/stop-dfs.sh"

        echo " =================== 關閉 zookeeper  ==================="
        ssh hadoop001 "zkServer.sh stop"
        ssh hadoop002 "zkServer.sh stop"
        ssh hadoop003 "zkServer.sh stop"
;;
*)
    echo "Input Args Error..."
;;
esac
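
Saved under /opt/module/bin (already on PATH via MYBIN_HOME) as, say, cluster.sh (the name is illustrative), the script is used like this:

chmod +x /opt/module/bin/cluster.sh
cluster.sh start
cluster.sh stop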

Hadoop configuration files

Hadoop version: 3.2.2
core-site.xml

<configuration>
<!-- HDFS entry point. "mycluster" is only the logical name of the cluster; it can be
     changed freely but must match the dfs.nameservices value in hdfs-site.xml -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
    </property>

<!-- The default hadoop.tmp.dir points to /tmp, which would put all NameNode and DataNode
     data in a volatile directory, so it is overridden here -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.2.2/tmp</value>
    </property>

<!-- Static web user; without this the web UI reports permission errors -->
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>root</value>
    </property>

<!-- ZooKeeper quorum; a single node works, for a cluster separate the hosts with commas -->
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>hadoop001:2181,hadoop002:2181,hadoop003:2181</value>
    </property>
    <!-- Timeout for Hadoop's ZooKeeper sessions -->
    <property>
        <name>ha.zookeeper.session-timeout.ms</name>
        <value>1000</value>
        <description>ms</description>
    </property>
    <property>
        <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>

    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>
    <property>
        <name>io.compression.codecs</name>
        <value>
            org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.DefaultCodec,
            org.apache.hadoop.io.compress.BZip2Codec,
            org.apache.hadoop.io.compress.SnappyCodec,
            com.hadoop.compression.lzo.LzoCodec,
            com.hadoop.compression.lzo.LzopCodec
        </value>
    </property>

    <property>
        <name>io.compression.codec.lzo.class</name>
        <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>

</configuration>

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/module/hadoop-3.2.2/data</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/opt/module/hadoop-3.2.2/nn</value>
    </property>
    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>

    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>


    <!-- Logical nameservice of HDFS; must match fs.defaultFS in core-site.xml.
         dfs.ha.namenodes.[nameservice id] assigns a unique identifier to every NameNode
         in the nameservice, as a comma-separated list of NameNode IDs that DataNodes use
         to recognize all NameNodes. Here the nameservice ID is "mycluster" and the
         NameNode IDs are "nn1" and "nn2".
    -->
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>

    <!-- RPC address of nn1 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>hadoop001:9000</value>
    </property>

    <!-- HTTP address of nn1 -->
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>hadoop001:9870</value>
    </property>

    <!-- RPC address of nn2 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>hadoop002:9000</value>
    </property>

    <!-- HTTP address of nn2 -->
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>hadoop002:9870</value>
    </property>

    <!-- Shared storage for the NameNode edit log, i.e. the JournalNode list.
         URL format: qjournal://host1:port1;host2:port2;host3:port3/journalId
         Using the nameservice as the journalId is recommended; the default port is 8485. -->
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://hadoop001:8485;hadoop002:8485;hadoop003:8485/mycluster</value>
    </property>

    <!-- Local directory where the JournalNodes store edits -->
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/opt/module/hadoop-3.2.2/jn</value>
    </property>

    <!-- Enable automatic NameNode failover -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>

    <!-- Client-side failover proxy provider -->
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>

    <!-- Fencing methods; list multiple methods on separate lines, one per line -->
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>
            sshfence
            shell(/bin/true)
        </value>
    </property>

    <!-- sshfence requires passwordless SSH; location of the private key -->
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/root/.ssh/id_rsa</value>
    </property>

    <!-- Connect timeout for the sshfence mechanism -->
    <property>
        <name>dfs.ha.fencing.ssh.connect-timeout</name>
        <value>30000</value>
    </property>

    <property>
        <name>ha.failover-controller.cli-check.rpc-timeout.ms</name>
        <value>60000</value>
    </property>
    
    
    <!-- Secondary NameNode address (note: a SecondaryNameNode is normally not run when NameNode HA is enabled) -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop003:9868</value>
    </property>
    
    <property>
        <name>dfs.namenode.handler.count</name>
        <value>10</value>
    </property>

</configuration>

hadoop-env.sh

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_ZKFC_USER=root
export HDFS_JOURNALNODE_USER=root

yarn-env.sh

YARN_RESOURCEMANAGER_USER=root
YARN_NODEMANAGER_USER=root

mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <!-- MapReduce JobHistory Server address; default port 10020 -->
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hadoop001:10020</value>
    </property>

    <!-- MapReduce JobHistory Server web UI address; default port 19888 -->
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>hadoop001:19888</value>
    </property>
</configuration>

yarn-site.xml

<configuration>
<!-- Enable ResourceManager HA -->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>

    <!-- ResourceManager cluster id -->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>yrc</value>
    </property>

    <!-- Logical ids of the ResourceManagers -->
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>

    <!-- Hostname of each ResourceManager -->
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>hadoop001</value>
    </property>

    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>hadoop002</value>
    </property>

    <property>
        <!-- RM web UI address; default: ${yarn.resourcemanager.hostname}:8088 -->
        <name>yarn.resourcemanager.webapp.address.rm1</name>
        <value>hadoop001:8088</value>
    </property>
    <property>
        <!-- RM web UI address; default: ${yarn.resourcemanager.hostname}:8088 -->
        <name>yarn.resourcemanager.webapp.address.rm2</name>
        <value>hadoop002:8088</value>
    </property>


    <!-- ZooKeeper quorum address -->
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>hadoop001:2181,hadoop002:2181,hadoop003:2181</value>
    </property>
<!-- Shuffle service reducers use to fetch data -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
<!-- Enable log aggregation -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
<!-- Retain aggregated logs for 1 day (86400 seconds) -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>86400</value>
    </property>

    <!-- Enable ResourceManager recovery -->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>

    <!-- Store ResourceManager state in the ZooKeeper cluster -->
    <property>
        <name>yarn.resourcemanager.store.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>

<!-- Environment variables inherited by containers -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
    
    <!-- Minimum and maximum memory a YARN container may be allocated -->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>512</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
    </property>
    
    <!-- Physical memory each NodeManager may manage -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
    </property>
    
    <!-- Disable YARN's physical and virtual memory limit checks -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>

ZooKeeper configuration files

zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/apache-zookeeper-3.6.3-bin/data
dataLogDir=/opt/module/apache-zookeeper-3.6.3-bin/dataLog
clientPort=2181
autopurge.snapRetainCount=20
autopurge.purgeInterval=48


server.1=hadoop001:2888:3888
server.2=hadoop002:2888:3888
server.3=hadoop003:2888:3888

Configure myid on each host

/opt/module/apache-zookeeper-3.6.3-bin/data/myid
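
The myid value must be different on every host and match the server.N entries in zoo.cfg, for example:

echo 1 > /opt/module/apache-zookeeper-3.6.3-bin/data/myid   # on hadoop001
echo 2 > /opt/module/apache-zookeeper-3.6.3-bin/data/myid   # on hadoop002
echo 3 > /opt/module/apache-zookeeper-3.6.3-bin/data/myid   # on hadoop003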

Kafka configuration files

Version: kafka_2.11-2.4.1
server.properties




# broker.id must be unique on each broker (e.g. 1, 2, 3 on hadoop001/002/003)
broker.id=1

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/opt/module/kafka_2.11-2.4.1/datas

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

Start/stop

kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties
kafka-server-stop.sh

Usage

kafka-topics.sh --create --topic test --bootstrap-server hadoop001:9092 --partitions 2 --replication-factor 3
kafka-topics.sh --list --bootstrap-server hadoop001:9092
kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic test
Producer:
kafka-console-producer.sh --topic test --broker-list hadoop001:9092
Consumer:
kafka-console-consumer.sh --topic test --bootstrap-server hadoop001:9092
Consumer group (start several consumers with the same group name and they divide the partitions among themselves):
kafka-console-consumer.sh --topic test --bootstrap-server hadoop001:9092 --group g1

Flume configuration files

Version: 1.9.0
Remove guava-11.0.2.jar from Flume's lib directory; it conflicts with the newer Guava shipped with Hadoop 3 (see the sketch below).
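
A minimal sketch of the removal, moving the jar aside rather than deleting it outright; with the old jar gone the agent picks up the newer Guava from the Hadoop classpath:

mv $FLUME_HOME/lib/guava-11.0.2.jar $FLUME_HOME/lib/guava-11.0.2.jar.bak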

Flume config: TAILDIR source, Kafka channel

/opt/module/flume-1.9.0/jobs/gmall/logserver-flume-kafka.conf

a1.sources = r1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.EtlLogInterceptor$MyBuilder

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

a1.sources.r1.channels = c1

Interceptor
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>atguigu-spark-211223</artifactId>
        <groupId>com.atguigu.bigdata</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.gmall</groupId>
    <artifactId>collect0110</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Interceptor class

package com.atguigu.gmall.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

// ETL filter: drops events whose body is not valid JSON.
public class EtlLogInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        try {
            JSON.parseObject(body);
        } catch (JSONException e) {
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class MyBuilder implements Builder {

        @Override
        public Interceptor build() {
            return new EtlLogInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
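
Before starting the agent, the packaged interceptor has to be on Flume's classpath. A minimal sketch, assuming the assembly jar keeps the default collect0110-1.0-jar-with-dependencies.jar name:

mvn clean package
cp target/collect0110-1.0-jar-with-dependencies.jar $FLUME_HOME/lib/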

Start the agent

flume-ng agent -c $FLUME_HOME/conf -f /opt/module/flume-1.9.0/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

Check with a Kafka console consumer

kafka-console-consumer.sh --topic topic_log --bootstrap-server hadoop001:9092,hadoop002:9092,hadoop003:9092 --from-beginning

Start/stop script

#!/bin/bash
if [ $# -lt 1 ]
then 
  echo "USAGE: f1.sh {start|stop}"
  exit
fi

case $1 in
start)
        for i in hadoop001 hadoop002
        do
          ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f /opt/module/flume-1.9.0/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
        done  
;;

stop)
        for i in hadoop001 hadoop002
        do
          ssh $i "ps -ef | grep logserver-flume-kafka.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
        done 
;;

*)
  echo "USAGE: f1.sh {start|stop}"
  exit
;;
esac 

Flume config: Kafka source, file channel, HDFS sink

Interceptor

package com.atguigu.gmall.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

// Copies the "ts" field from the JSON body into the event's "timestamp" header,
// so the HDFS sink can build time-based paths from the event time.
public class TimeStampInterceptor implements Interceptor {
    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSON.parseObject(body);
        String ts = jsonObject.getString("ts");
        event.getHeaders().put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {}

    public static class MyBuilder implements Builder {

        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}

kafka-flume-hdfs.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.kafka.consumer.group.id = gmall
a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder

a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
a1.channels.c1.capacity = 1000000
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpoint
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.maxFileSize = 2146425071
a1.channels.c1.keep-alive = 5

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
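
With fileType = CompressedStream and codeC = lzop, the hadoop-lzo codec (plus the native LZO libraries) must be visible to the Flume agent. It is normally picked up from the Hadoop classpath; if the agent cannot load com.hadoop.compression.lzo.LzopCodec, one option is to copy the jar into Flume's lib directory (jar path as used by the Sqoop script below):

cp /opt/module/hadoop-3.2.2/share/hadoop/common/hadoop-lzo-0.4.20.jar $FLUME_HOME/lib/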

Start/stop script

#!/bin/bash
if [ $# -lt 1 ]
then 
  echo "USAGE: f1.sh {start|stop}"
  exit
fi

case $1 in
start)
        for i in hadoop003
        do
          ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f /opt/module/flume-1.9.0/jobs/gmall/kafka-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
        done  
;;

stop)
        for i in hadoop003
        do
          ssh $i "ps -ef | grep kafka-flume-hdfs.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
        done 
;;

*)
  echo "USAGE: f1.sh {start|stop}"
  exit
;;
esac 

Flume memory tuning

In $FLUME_HOME/conf/flume-env.sh, adjust -Xms and -Xmx in JAVA_OPTS.
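
The heap sizes below are placeholders; size them to the channel capacity and load:

export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"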

Sqoop configuration files

sqoop-env.sh

export HADOOP_COMMON_HOME=/opt/module/hadoop-3.2.2
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.2.2
export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin
export ZOOKEEPER_HOME=/opt/module/apache-zookeeper-3.6.3-bin
export ZOOCFGDIR=/opt/module/apache-zookeeper-3.6.3-bin/conf

Verify

sqoop list-databases --connect "jdbc:mysql://hadoop001:3306/gmall" --username root --password hadoop

Importing data into HDFS with Sqoop

sqoop import \
--connect "jdbc:mysql://hadoop001:3306/gmall" \
--username root --password hadoop \
--table user_info \
--columns id,login_name,nick_name \
--where "id >=100 and id <= 200" \
--target-dir /testsqoop \
--delete-target-dir \
--num-mappers 2 \
--split-by id \
--fields-terminated-by "\t"

sqoop import \
--connect "jdbc:mysql://hadoop001:3306/gmall" \
--username root --password hadoop \
--query "select id,login_name,nick_name from user_info where id >= 100 and id <=200 and \$CONDITIONS" \
--target-dir /testsqoop \
--delete-target-dir \
--num-mappers 2 \
--split-by id \
--fields-terminated-by "\t" \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'

Import script

#! /bin/bash

APP=gmall
sqoop=/opt/module/sqoop-1.4.6/bin/sqoop

# Take the date from the second argument; exit if no date is given
if [ -n "$2" ] ;then
   do_date=$2
else
   echo "A date argument is required"
   exit
fi

import_data(){
$sqoop import \
--connect jdbc:mysql://hadoop001:3306/$APP \
--username root \
--password hadoop \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 where \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'

hadoop jar /opt/module/hadoop-3.2.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}

import_order_info(){
  import_data order_info "select
                            id, 
                            total_amount, 
                            order_status, 
                            user_id, 
                            payment_way,
                            delivery_address,
                            out_trade_no, 
                            create_time, 
                            operate_time,
                            expire_time,
                            tracking_no,
                            province_id,
                            activity_reduce_amount,
                            coupon_reduce_amount,                            
                            original_total_amount,
                            feight_fee,
                            feight_fee_reduce      
                        from order_info"
}

case $1 in
  "order_info")
     import_order_info
;;
  "all")
   import_base_category1
;;
esac
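
Assuming the script is saved as, say, mysql_to_hdfs.sh (an illustrative name) under /opt/module/bin, a run looks like:

mysql_to_hdfs.sh order_info 2021-12-01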

Hive configuration files

Adjust the jars under lib: move the bundled SLF4J binding aside (it clashes with Hadoop's) and add the MySQL JDBC driver

mv $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.jar $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.jar.bak
cp mysql-connector-java-5.1.32.jar $HIVE_HOME/lib/

hive-site.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.exec.scratchdir</name>
        <value>/opt/module/apache-hive-3.1.2-bin/tmp</value>
    </property>

    <!-- Hive's default warehouse directory on HDFS -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <property>
        <name>hive.querylog.location</name>
        <value>/user/hive/log</value>
    </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://hadoop001:3306/hive?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hadoop</value>
    <description>password to use against metastore database</description>
  </property>

    <!-- Metastore schema verification -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <!-- HiveServer2 thrift port -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10010</value>
    </property>

    <!-- HiveServer2 bind host -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop001</value>
    </property>

    <!-- Metastore event DB notification API auth -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    
    <!-- Metastore thrift URI that clients connect to -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop001:9083</value>
    </property>
      
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
</configuration>
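
Because hive.metastore.uris and a HiveServer2 port are configured, both services must be running before clients connect; a simple way to start them in the background (log directory taken from hive-log4j2.properties below):

mkdir -p $HIVE_HOME/logs
nohup hive --service metastore   > $HIVE_HOME/logs/metastore.log   2>&1 &
nohup hive --service hiveserver2 > $HIVE_HOME/logs/hiveserver2.log 2>&1 &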

hive-log4j2.properties

property.hive.log.dir = /opt/module/apache-hive-3.1.2-bin/logs

Spark on YARN configuration files

Version: 3.2.0
Check yarn-site.xml:

<!-- Whether to run a thread that checks the physical memory used by each task and kills the container if it exceeds its allocation; default is true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

<!-- Whether to run a thread that checks the virtual memory used by each task and kills the container if it exceeds its allocation; default is true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

Copy hive-site.xml into Spark's conf directory.
Copy the MySQL connector jar into Spark's jars directory.
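A sketch of the two copies, assuming the connector jar sits in the current directory (same jar as used for Hive above):

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/
cp mysql-connector-java-5.1.32.jar $SPARK_HOME/jars/
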
spark-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_181
export SCALA_HOME=/opt/module/scala-2.12.11
export SPARK_CONF_DIR=/opt/module/spark-3.2.0-bin-hadoop3.2/conf
export HADOOP_CONF_DIR=/opt/module/hadoop-3.2.2/etc/hadoop
export YARN_CONF_DIR=/opt/module/hadoop-3.2.2/etc/hadoop
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30  -Dspark.history.fs.logDirectory=hdfs://mycluster/spark/sparkhistory"

spark-defaults.conf

spark.yarn.historyServer.address hadoop001:18080
spark.history.ui.port 18080
spark.eventLog.enabled true
spark.eventLog.dir hdfs://mycluster/spark/sparkhistory
spark.eventLog.compress true
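
The event-log directory must exist on HDFS before jobs or the history server write to it (path taken from spark.eventLog.dir above):

hadoop fs -mkdir -p /spark/sparkhistory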

Start the history server

sbin/start-history-server.sh 

Submit an application

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.2.0.jar \
10