Hadoop Ecosystem Component Configuration Notes
Environment variables
cat /etc/profile.d/my_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export HADOOP_HOME=/opt/module/hadoop-3.2.2
#export SCALA_HOME=/opt/module/scala-2.13.0
export SCALA_HOME=/opt/module/scala-2.12.11
export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin
export ZOOKEEPER_HOME=/opt/module/apache-zookeeper-3.6.3-bin
# export SPARK_HOME=/opt/module/spark-3.2.0-bin-hadoop3.2-scala2.13
export SPARK_HOME=/opt/module/spark-3.2.0-bin-hadoop3.2
export KAFKA_HOME=/opt/module/kafka_2.11-2.4.1
export MYBIN_HOME=/opt/module/bin
export FLUME_HOME=/opt/module/flume-1.9.0
export SQOOP_HOME=/opt/module/sqoop-1.4.6
export PATH=$SQOOP_HOME/bin:$FLUME_HOME/bin:$KAFKA_HOME/bin:$MYBIN_HOME:$SPARK_HOME/sbin:$SCALA_HOME/bin:$HIVE_HOME/bin:$ZOOKEEPER_HOME/bin:$SPARK_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$PATH
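After editing, reload the profile on each node so the variables take effect in the current shell:
source /etc/profile.d/my_env.sh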
Component start/stop script
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit
fi
case $1 in
"start")
    echo " =================== Starting ZooKeeper ==================="
    ssh hadoop001 "zkServer.sh start"
    ssh hadoop002 "zkServer.sh start"
    ssh hadoop003 "zkServer.sh start"
    echo " =================== Starting Hadoop cluster ==================="
    echo " --------------- starting HDFS ---------------"
    ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/start-dfs.sh"
    echo " --------------- starting YARN ---------------"
    ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/start-yarn.sh"
    echo " --------------- starting historyserver ---------------"
    ssh hadoop001 "/opt/module/hadoop-3.2.2/bin/mapred --daemon start historyserver"
    echo " =================== Starting Kafka cluster ==================="
    ssh hadoop001 "kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"
    ssh hadoop002 "kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"
    ssh hadoop003 "kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties"
;;
"stop")
    echo " =================== Stopping Kafka cluster ==================="
    ssh hadoop001 "kafka-server-stop.sh"
    ssh hadoop002 "kafka-server-stop.sh"
    ssh hadoop003 "kafka-server-stop.sh"
    echo " =================== Stopping Hadoop cluster ==================="
    echo " --------------- stopping historyserver ---------------"
    ssh hadoop001 "/opt/module/hadoop-3.2.2/bin/mapred --daemon stop historyserver"
    echo " --------------- stopping YARN ---------------"
    ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/stop-yarn.sh"
    echo " --------------- stopping HDFS ---------------"
    ssh hadoop001 "/opt/module/hadoop-3.2.2/sbin/stop-dfs.sh"
    echo " =================== Stopping ZooKeeper ==================="
    ssh hadoop001 "zkServer.sh stop"
    ssh hadoop002 "zkServer.sh stop"
    ssh hadoop003 "zkServer.sh stop"
;;
*)
    echo "Input Args Error..."
;;
esac
Hadoop configuration files
Hadoop version: 3.2.2
core-site.xml
<configuration>
<!-- HDFS entry point. "mycluster" is only the logical name of the cluster; it can be changed freely but must match dfs.nameservices in hdfs-site.xml -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>
<!-- hadoop.tmp.dir defaults to /tmp, which would keep NameNode and DataNode data in a volatile directory, so it is overridden here -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.2.2/tmp</value>
</property>
<!-- Static user for the web UI; without this the web pages report errors -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>root</value>
</property>
<!-- ZooKeeper quorum; a single node works, cluster members are comma-separated -->
<property>
<name>ha.zookeeper.quorum</name>
<value>hadoop001:2181,hadoop002:2181,hadoop003:2181</value>
</property>
<!-- Timeout for Hadoop's ZooKeeper sessions -->
<property>
<name>ha.zookeeper.session-timeout.ms</name>
<value>1000</value>
<description>ms</description>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>
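The LZO codec classes above do not ship with stock Hadoop; they come from the hadoop-lzo jar, which must be on Hadoop's classpath on every node. A sketch, assuming the jar was obtained separately (the same path is referenced by the Sqoop script later in these notes):
cp hadoop-lzo-0.4.20.jar /opt/module/hadoop-3.2.2/share/hadoop/common/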
hdfs-site.xml
<configuration>
<property>
<name>dfs.datanode.data.dir</name>
<value>/opt/module/hadoop-3.2.2/data</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/opt/module/hadoop-3.2.2/nn</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<!-- The HDFS nameservice ID; must match fs.defaultFS in core-site.xml.
dfs.ha.namenodes.[nameservice id] assigns a unique identifier to every NameNode
in the nameservice, as a comma-separated list of NameNode IDs; this is how
DataNodes recognize all of the NameNodes. Here "mycluster" is the nameservice ID
and "nn1" and "nn2" are the NameNode identifiers.
-->
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<!-- RPC address of nn1 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>hadoop001:9000</value>
</property>
<!-- HTTP address of nn1 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>hadoop001:9870</value>
</property>
<!-- RPC address of nn2 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>hadoop002:9000</value>
</property>
<!-- HTTP address of nn2 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>hadoop002:9870</value>
</property>
<!-- Shared storage for the NameNode edit log, i.e. the JournalNode list.
URL format: qjournal://host1:port1;host2:port2;host3:port3/journalId
Using the nameservice as the journalId is recommended; the default port is 8485 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop001:8485;hadoop002:8485;hadoop003:8485/mycluster</value>
</property>
<!-- Directory on the local disk where JournalNodes store their data -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/module/hadoop-3.2.2/jn</value>
</property>
<!-- Enable automatic NameNode failover -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- Failover proxy provider used by clients to find the active NameNode -->
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- Fencing methods; multiple methods are separated by newlines, one method per line -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>
<!-- sshfence requires passwordless SSH -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<!-- Timeout for the sshfence SSH connection -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>
<property>
<name>ha.failover-controller.cli-check.rpc-timeout.ms</name>
<value>60000</value>
</property>
<!-- Secondary NameNode address (not used when NameNode HA is enabled) -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop003:9868</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>10</value>
</property>
</configuration>
hadoop-env.sh
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_ZKFC_USER=root
export HDFS_JOURNALNODE_USER=root
yarn-env.sh
YARN_RESOURCEMANAGER_USER=root
YARN_NODEMANAGER_USER=root
mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- MapReduce JobHistory Server address; default port 10020 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop001:10020</value>
</property>
<!-- MapReduce JobHistory Server web UI address; default port 19888 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop001:19888</value>
</property>
</configuration>
yarn-site.xml
<configuration>
<!-- Enable ResourceManager HA -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- Cluster ID of the RM ensemble -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yrc</value>
</property>
<!-- Logical IDs of the RMs -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- Hostname of each RM -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>hadoop001</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>hadoop002</value>
</property>
<property>
<!-- RM web UI address; default ${yarn.resourcemanager.hostname}:8088 -->
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>hadoop001:8088</value>
</property>
<property>
<!-- RM web UI address; default ${yarn.resourcemanager.hostname}:8088 -->
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>hadoop002:8088</value>
</property>
<!-- ZooKeeper quorum address -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>hadoop001:2181,hadoop002:2181,hadoop003:2181</value>
</property>
<!-- Shuffle service so reducers can fetch map output -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- Enable log aggregation -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- Keep aggregated logs for one day -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>86400</value>
</property>
<!-- Enable automatic RM recovery -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!-- Store ResourceManager state in the ZooKeeper cluster -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<!-- Environment variables containers inherit -->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<!-- Minimum and maximum memory a YARN container may be allocated -->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>
<!-- Physical memory the NodeManager may manage -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>
<!-- Disable YARN's physical and virtual memory checks -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
</configuration>
ZooKeeper configuration files
zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/apache-zookeeper-3.6.3-bin/data
dataLogDir=/opt/module/apache-zookeeper-3.6.3-bin/dataLog
clientPort=2181
autopurge.snapRetainCount=20
autopurge.purgeInterval=48
server.1=hadoop001:2888:3888
server.2=hadoop002:2888:3888
server.3=hadoop003:2888:3888
Configure myid on each host
/opt/module/apache-zookeeper-3.6.3-bin/data/myid
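One way to write the IDs, assuming server.1 through server.3 in zoo.cfg map to hadoop001 through hadoop003 as above:
for i in 1 2 3
do
ssh hadoop00$i "mkdir -p /opt/module/apache-zookeeper-3.6.3-bin/data && echo $i > /opt/module/apache-zookeeper-3.6.3-bin/data/myid"
done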
Kafka configuration files
Version: kafka_2.11-2.4.1
server.properties
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/module/kafka_2.11-2.4.1/datas
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
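broker.id must be unique on every broker; after distributing server.properties, change it on the other two hosts, for example:
ssh hadoop002 "sed -i 's/^broker.id=.*/broker.id=2/' /opt/module/kafka_2.11-2.4.1/config/server.properties"
ssh hadoop003 "sed -i 's/^broker.id=.*/broker.id=3/' /opt/module/kafka_2.11-2.4.1/config/server.properties"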
Start/stop
kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties
kafka-server-stop.sh
Usage
kafka-topics.sh --create --topic test --bootstrap-server hadoop001:9092 --partitions 2 --replication-factor 3
kafka-topics.sh --list --bootstrap-server hadoop001:9092
kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic test
Producer
kafka-console-producer.sh --topic test --broker-list hadoop001:9092
Consumer
kafka-console-consumer.sh --topic test --bootstrap-server hadoop001:9092
By consumer group (start several consumers with the same group name and they divide the partitions among themselves)
kafka-console-consumer.sh --topic test --bootstrap-server hadoop001:9092 --group g1
Flume configuration files
Version: 1.9.0
Delete guava-11.0.2.jar (Flume 1.9.0 ships a Guava version that conflicts with the one bundled with Hadoop 3)
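For example:
rm $FLUME_HOME/lib/guava-11.0.2.jar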
Flume config: source: TAILDIR, channel: Kafka
/opt/module/flume-1.9.0/jobs/gmall/logserver-flume-kafka.conf
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.EtlLogInterceptor$MyBuilder
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
a1.sources.r1.channels = c1
Interceptor
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>atguigu-spark-211223</artifactId>
<groupId>com.atguigu.bigdata</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.gmall</groupId>
<artifactId>collect0110</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Interceptor class
package com.atguigu.gmall.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
// Drops events whose body is not valid JSON (simple ETL filtering)
public class EtlLogInterceptor implements Interceptor {
@Override
public void initialize() {
}
// Returns the event if its body parses as JSON, otherwise null
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), StandardCharsets.UTF_8);
try {
JSON.parseObject(body);
} catch (JSONException e) {
return null;
}
return event;
}
// Removes from the batch every event the single-event intercept rejected
@Override
public List<Event> intercept(List<Event> list) {
Iterator<Event> iterator = list.iterator();
while (iterator.hasNext()) {
Event next = iterator.next();
if (intercept(next) == null) {
iterator.remove();
}
}
return list;
}
@Override
public void close() {
}
public static class MyBuilder implements Builder {
@Override
public Interceptor build() {
return new EtlLogInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
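The agent can only load the interceptor if the class is on Flume's classpath. A sketch, assuming the assembly jar is named after the artifactId and version in the pom above:
mvn package
cp target/collect0110-1.0-jar-with-dependencies.jar $FLUME_HOME/lib/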
Start
flume-ng agent -c $FLUME_HOME/conf -f /opt/module/flume-1.9.0/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
Check with a Kafka console consumer
kafka-console-consumer.sh --topic topic_log --bootstrap-server hadoop001:9092,hadoop002:9092,hadoop003:9092 --from-beginning
Script
#!/bin/bash
if [ $# -lt 1 ]
then
echo "USAGE: f1.sh {start|stop}"
exit
fi
case $1 in
start)
for i in hadoop001 hadoop002
do
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f /opt/module/flume-1.9.0/jobs/gmall/logserver-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
done
;;
stop)
for i in hadoop001 hadoop002
do
ssh $i "ps -ef | grep logserver-flume-kafka.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
done
;;
*)
echo "USAGE: f1.sh {start|stop}"
exit
;;
esac
Flume config: source: Kafka, channel: file, sink: HDFS
Interceptor
package com.atguigu.gmall.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
// Copies the event's "ts" field into the "timestamp" header so the
// HDFS sink buckets files by event time (used by %Y-%m-%d in hdfs.path)
public class TimeStampInterceptor implements Interceptor {
@Override
public void initialize() {}
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), StandardCharsets.UTF_8);
JSONObject jsonObject = JSON.parseObject(body);
String ts = jsonObject.getString("ts");
event.getHeaders().put("timestamp", ts);
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {}
public static class MyBuilder implements Builder {
@Override
public Interceptor build() {
return new TimeStampInterceptor();
}
@Override
public void configure(Context context) {}
}
}
kafka-flume-hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.kafka.consumer.group.id = gmall
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
a1.channels.c1.capacity = 1000000
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpoint
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.maxFileSize = 2146425071
a1.channels.c1.keep-alive = 5
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start/stop script
#!/bin/bash
if [ $# -lt 1 ]
then
echo "USAGE: f1.sh {start|stop}"
exit
fi
case $1 in
start)
for i in hadoop003
do
ssh $i "nohup flume-ng agent -c $FLUME_HOME/conf -f /opt/module/flume-1.9.0/jobs/gmall/kafka-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 1>$FLUME_HOME/logs/flume.log 2>&1 &"
done
;;
stop)
for i in hadoop003
do
ssh $i "ps -ef | grep kafka-flume-hdfs.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
done
;;
*)
echo "USAGE: f1.sh {start|stop}"
exit
;;
esac
Flume memory tuning
In $FLUME_HOME/conf/flume-env.sh, adjust -Xms and -Xmx in JAVA_OPTS.
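For example (the heap sizes are illustrative; setting -Xms and -Xmx equal avoids heap-resize pauses):
export JAVA_OPTS="-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote"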
Sqoop configuration files
sqoop-env.sh
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.2.2
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.2.2
export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin
export ZOOKEEPER_HOME=/opt/module/apache-zookeeper-3.6.3-bin
export ZOOCFGDIR=/opt/module/apache-zookeeper-3.6.3-bin/conf
Verify
sqoop list-databases --connect "jdbc:mysql://hadoop001:3306/gmall" --username root --password hadoop
Importing data into HDFS with Sqoop
sqoop import \
--connect "jdbc:mysql://hadoop001:3306/gmall" \
--username root --password hadoop \
--table user_info \
--columns id,login_name,nick_name \
--where "id >=100 and id <= 200" \
--target-dir /testsqoop \
--delete-target-dir \
--num-mappers 2 \
--split-by id \
--fields-terminated-by "\t"
Or, using a free-form query:
sqoop import \
--connect "jdbc:mysql://hadoop001:3306/gmall" \
--username root --password hadoop \
--query "select id,login_name,nick_name from user_info where id >= 100 and id <=200 and \$CONDITIONS" \
--target-dir /testsqoop \
--delete-target-dir \
--num-mappers 2 \
--split-by id \
--fields-terminated-by "\t" \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
Script
#! /bin/bash
APP=gmall
sqoop=/opt/module/sqoop-1.4.6/bin/sqoop
# Use the date passed in as the second argument; exit if none is given
if [ -n "$2" ] ;then
do_date=$2
else
echo "Please pass a date argument"
exit
fi
import_data(){
$sqoop import \
--connect jdbc:mysql://hadoop001:3306/$APP \
--username root \
--password hadoop \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 where \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
hadoop jar /opt/module/hadoop-3.2.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}
import_order_info(){
import_data order_info "select
id,
total_amount,
order_status,
user_id,
payment_way,
delivery_address,
out_trade_no,
create_time,
operate_time,
expire_time,
tracking_no,
province_id,
activity_reduce_amount,
coupon_reduce_amount,
original_total_amount,
feight_fee,
feight_fee_reduce
from order_info"
}
case $1 in
"order_info")
import_order_info
;;
"all")
import_base_category1
;;
esac
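Invocation examples, assuming the script is saved under a hypothetical name mysql_to_hdfs.sh and made executable:
mysql_to_hdfs.sh order_info 2022-01-10
mysql_to_hdfs.sh all 2022-01-10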
Hive configuration files
Adjust the jars under lib
mv $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.jar $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.jar.bak
cp mysql-connector-java-5.1.32.jar $HIVE_HOME/lib/
hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.exec.scratchdir</name>
<value>/opt/module/apache-hive-3.1.2-bin/tmp</value>
</property>
<!-- Hive's default warehouse directory on HDFS -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>/user/hive/log</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop001:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hadoop</value>
<description>password to use against metastore database</description>
</property>
<!-- Hive metastore schema verification -->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<!-- Port HiveServer2 listens on -->
<property>
<name>hive.server2.thrift.port</name>
<value>10010</value>
</property>
<!-- Host HiveServer2 binds to -->
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop001</value>
</property>
<!-- Metastore event notification API authorization -->
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
<!-- Metastore URI clients connect to -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop001:9083</value>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
</configuration>
hive-log4j2.properties
property.hive.log.dir = /opt/module/apache-hive-3.1.2-bin/logs
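With hive.metastore.uris and the HiveServer2 port configured above, both services have to be running before clients (or Spark) can connect. A minimal sketch, assuming logs go under $HIVE_HOME/logs:
nohup hive --service metastore 1>$HIVE_HOME/logs/metastore.log 2>&1 &
nohup hive --service hiveserver2 1>$HIVE_HOME/logs/hiveserver2.log 2>&1 &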
Spark on YARN configuration files
Version: 3.2.0
Check yarn-site.xml for the following:
<!-- Whether a thread checks each task's physical memory usage and kills tasks that exceed their allocation; default true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!-- Whether a thread checks each task's virtual memory usage and kills tasks that exceed their allocation; default true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
Copy hive-site.xml into Spark's conf directory
Copy the MySQL connector jar into Spark's jars directory
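A sketch using the paths established earlier in these notes (the connector jar matches the one copied into Hive's lib):
cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/
cp mysql-connector-java-5.1.32.jar $SPARK_HOME/jars/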
spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_181
export SCALA_HOME=/opt/module/scala-2.12.11
export SPARK_CONF_DIR=/opt/module/spark-3.2.0-bin-hadoop3.2/conf
export HADOOP_CONF_DIR=/opt/module/hadoop-3.2.2/etc/hadoop
export YARN_CONF_DIR=/opt/module/hadoop-3.2.2/etc/hadoop
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://mycluster/spark/sparkhistory"
spark-defaults.conf
spark.yarn.historyServer.address hadoop001:18080
spark.history.ui.port 18080
spark.eventLog.enabled true
spark.eventLog.dir hdfs://mycluster/spark/sparkhistory
spark.eventLog.compress true
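The event log directory configured above must exist on HDFS before the history server or any job with event logging enabled starts; a one-time setup step:
hadoop fs -mkdir -p /spark/sparkhistory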
Start the history server
sbin/start-history-server.sh
Submit an application
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.2.0.jar \
10