1. 程式人生 > 資料庫 >spark-sql實踐

spark-sql實踐

spark-sql實踐

一、安裝anaconda

安裝包連結
連結:
提取碼:batk


使用bash命令執行安裝包
在這裡插入圖片描述


一直回車,遇到選擇選yes即可
在這裡插入圖片描述
在這裡插入圖片描述


安裝成功
在這裡插入圖片描述


配置環境變數

export PATH=$PATH:/root/anaconda3/bin

在這裡插入圖片描述


可以看出安裝成功
在這裡插入圖片描述

二、配置jupyter notebook

配置環境變數後使用下面命令生成jupyter notebook配置檔案

jupyter notebook --generate-config

在這裡插入圖片描述


使用下面命令設定jupyter密碼並記住sha1值,後面配置要用

python -c "import IPython; print(IPython.lib.passwd())"

在這裡插入圖片描述
在剛剛生成的配置檔案中新增下面語句

# 允許所有IP登入
c.NotebookApp.ip = '*'
# 使用剛剛生成的sha1值
c.NotebookApp.password = 'sha1:679a04c48eec:050346283252410f864ddfbf397a5aa64dd2ae09'
# 是否自動開啟瀏覽器
c.NotebookApp.open_browser = False
# 允許使用root使用者登入
c.NotebookApp.allow_root =True
# 設定訪問jupyter notebook的埠為4040
c.NotebookApp.port = 4040
c.ContentsManager.root_dir = '/usr/jupyter'
c.NotebookApp.notebook_dir = '/usr/jupyter'

在這裡插入圖片描述


啟動jupyter notebook

jupyter notebook

在這裡插入圖片描述


輸入密碼成功登入
在這裡插入圖片描述
在這裡插入圖片描述

三、案例分析

程式碼下載連結:
連結:
提取碼:cgnc


資料下載連結:

連結:
提取碼:qskr


本案例使用的資料集來自資料網站Kaggle的美國新冠肺炎疫情資料集,該資料集以資料表us-counties.csv組織,其中包含了美國發現首例新冠肺炎確診病例至2020-05-19的相關資料
在這裡插入圖片描述

1.格式轉換

原始資料集是以.csv檔案組織的,為了方便spark讀取生成RDD或者DataFrame,首先將us-counties.csv轉換為.txt格式檔案us-counties.txt

import pandas as pd
 
#.csv->.txt
data = pd.read_csv('us-counties.csv')
with open('us-counties.txt','a+',encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0])+'\t'+str(line[1])+'\t'
                +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))


然後將資料上傳到hdfs上

hdfs dfs -put us-counties.txt /test4

在這裡插入圖片描述

2.讀取檔案生成DataFrame

這裡讀取的路徑都是hdfs路徑

import findspark
findspark.init()
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func
def toDate(inputStr):
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
 
fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)
 
rdd0 = spark.sparkContext.textFile("/test4/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
 
shemaUsInfo = spark.createDataFrame(rdd1,schema)
shemaUsInfo.createOrReplaceTempView("usInfo")

3.進行資料分析

這裡儲存的路徑都是hdfs路徑

(1)計算每日的累計確診病例數和死亡數

df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
 
#列重新命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("/test4/result1") 
 
#註冊為臨時表供下一步使用
df1.createOrReplaceTempView("ustotal")

在這裡插入圖片描述

(2)計算每日較昨日的新增確診病例數和死亡病例數

df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
 
df2.sort(df2["date"].asc()).repartition(1).write.json("/test4/result2") 

在這裡插入圖片描述

(3)統計截止5.19日 美國各州的累計確診人數和死亡人數

df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
 
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("/test4/result3") #寫入hdfs
 
df3.createOrReplaceTempView("eachStateInfo")

在這裡插入圖片描述

(4)找出美國確診最多的10個州

df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
df4.repartition(1).write.json("/test4/result4")

在這裡插入圖片描述

(5)找出美國死亡最多的10個州

df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
df5.repartition(1).write.json("/test4/result5")

在這裡插入圖片描述

(6)找出美國確診最少的10個州

df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
df6.repartition(1).write.json("/test4/result6")

在這裡插入圖片描述

(7)找出美國死亡最少的10個州

df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
df7.repartition(1).write.json("/test4/result7")

在這裡插入圖片描述

(8)統計截止5.19全美和各州的病死率

df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("/test4/result8")

在這裡插入圖片描述

4.資料視覺化

匯入所需庫

from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json

將hdfs生成結果放在本地,因為視覺化部分不需要使用叢集,下面使用的路徑均為本地路徑
在這裡插入圖片描述

(1)畫出每日的累計確診病例數和死亡數——>雙柱狀圖

root = "test4/result1/part-00000-35b0ecb6-8abe-4342-90ce-bd9b86acc054-c000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        date.append(str(js['date']))
        cases.append(int(js['cases']))
        deaths.append(int(js['deaths']))

d = (
Bar()
.add_xaxis(date)
.add_yaxis("累計確診人數", cases, stack="stack1")
.add_yaxis("累計死亡人數", deaths, stack="stack1")
.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
.set_global_opts(title_opts=opts.TitleOpts(title="美國每日累計確診和死亡人數"))
)
d.load_javascript()
d.render_notebook()

在這裡插入圖片描述

(2)畫出每日的新增確診病例數和死亡數——>折線圖

root = "test4/result2/part-00000-6a74a9a3-dc2c-4d6b-997c-a74762a27bd0-c000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        date.append(str(js['date']))
        cases.append(int(js['caseIncrease']))
        deaths.append(int(js['deathIncrease']))

L1 = (
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
    series_name="新增確診",
    y_axis=cases,
    markpoint_opts=opts.MarkPointOpts(
        data=[
            opts.MarkPointItem(type_="max", name="最大值")

        ]
    ),
    markline_opts=opts.MarkLineOpts(
        data=[opts.MarkLineItem(type_="average", name="平均值")]
    ),
)
.set_global_opts(
    title_opts=opts.TitleOpts(title="美國每日新增確診折線圖", subtitle=""),
    tooltip_opts=opts.TooltipOpts(trigger="axis"),
    toolbox_opts=opts.ToolboxOpts(is_show=True),
    xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
)
L1.load_javascript()
L1.render_notebook()

在這裡插入圖片描述

L2 = (
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
    series_name="新增死亡",
    y_axis=deaths,
    markpoint_opts=opts.MarkPointOpts(
        data=[opts.MarkPointItem(type_="max", name="最大值")]
    ),
    markline_opts=opts.MarkLineOpts(
        data=[
            opts.MarkLineItem(type_="average", name="平均值"),
            opts.MarkLineItem(symbol="none", x="90%", y="max"),
            opts.MarkLineItem(symbol="circle", type_="max", name="最高點"),
        ]
    ),
)
.set_global_opts(
    title_opts=opts.TitleOpts(title="美國每日新增死亡折線圖", subtitle=""),
    tooltip_opts=opts.TooltipOpts(trigger="axis"),
    toolbox_opts=opts.ToolboxOpts(is_show=True),
    xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
)
L2.load_javascript()
L2.render_notebook()

在這裡插入圖片描述

(3)畫出截止5.19,美國各州累計確診、死亡人數和病死率—>表格

root = "test4/result3/part-00000-253c81bd-4448-4823-954f-e7e9934605c9-c000.json"
allState = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        row = []
        row.append(str(js['state']))
        row.append(int(js['totalCases']))
        row.append(int(js['totalDeaths']))
        row.append(float(js['deathRate']))
        allState.append(row)

table = Table()

headers = ["State name", "Total cases", "Total deaths", "Death rate"]
rows = allState
table.add(headers, rows)
table.set_global_opts(
    title_opts=ComponentTitleOpts(title="美國各州疫情一覽", subtitle="")
)
table.load_javascript()
table.render_notebook()

在這裡插入圖片描述

(4)畫出美國確診最多的10個州——>詞雲圖

root = "test4/result4/part-00000-9dc04a1e-7763-4429-93fc-23b2f3d45512-c000.json"
data = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        row=(str(js['state']),int(js['totalCases']))
        data.append(row)

c = (
WordCloud()
.add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診Top10"))
)
c.load_javascript()
c.render_notebook()

在這裡插入圖片描述

(5)畫出美國死亡最多的10個州——>象柱狀圖

root = "test4/result5/part-00000-a8169860-0a64-4c5c-b740-fcdafc74505e-c000.json"
state = []
totalDeath = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        state.insert(0,str(js['state']))
        totalDeath.insert(0,int(js['totalDeaths']))

c = (
PictorialBar()
.add_xaxis(state)
.add_yaxis(
    "",
    totalDeath,
    label_opts=opts.LabelOpts(is_show=False),
    symbol_size=18,
    symbol_repeat="fixed",
    symbol_offset=[0, 0],
    is_symbol_clip=True,
    symbol=SymbolType.ROUND_RECT,
)
.reversal_axis()
.set_global_opts(
    title_opts=opts.TitleOpts(title="PictorialBar-美國各州死亡人數Top10"),
    xaxis_opts=opts.AxisOpts(is_show=False),
    yaxis_opts=opts.AxisOpts(
        axistick_opts=opts.AxisTickOpts(is_show=False),
        axisline_opts=opts.AxisLineOpts(
            linestyle_opts=opts.LineStyleOpts(opacity=0)
        ),
    ),
)
)
c.load_javascript()
c.render_notebook()

在這裡插入圖片描述

(6)找出美國確診最少的10個州——>詞雲圖

root = "test4/result6/part-00000-9dc41291-7691-4ab3-8a09-2e4fb32bbd02-c000.json"
data = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        row=(str(js['state']),int(js['totalCases']))
        data.append(row)

c = (
WordCloud()
.add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
)
c.load_javascript()
c.render_notebook()

在這裡插入圖片描述

(7)找出美國死亡最少的10個州——>漏斗圖

root = "test4/result7/part-00000-0891d181-56a9-4d70-a94c-259bda524607-c000.json"
data = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        data.insert(0,[str(js['state']),int(js['totalDeaths'])])

c = (
Funnel()
.add(
    "State",
    data,
    sort_="ascending",
    label_opts=opts.LabelOpts(position="inside"),
)
.set_global_opts(title_opts=opts.TitleOpts(title=""))
)
c.load_javascript()
c.render_notebook()

在這裡插入圖片描述

(8)美國的病死率—>餅狀圖

root = "test4/result8/part-00000-47009151-50c4-4bb2-acb1-ddc2e101f6e2-c000.json"
values = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:                            # 到 EOF,返回空字串,則終止迴圈
            break
        js = json.loads(line)
        if str(js['state'])=="USA":
            values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
            values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
c = (
Pie()
.add("", values)
.set_colors(["blcak","orange"])
.set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
)
c.load_javascript()
c.render_notebook()

在這裡插入圖片描述

四、遇到的問題

1.找不到spark

在這裡插入圖片描述
在開頭加上下面兩行程式碼即可

import findspark
findspark.init()

2.找不到python

檢視日誌發現不是master中找不到python,而是slave中沒找到,然後發現slave中沒有安裝python,在兩個slave中按照第一步安裝anaconda即可
在這裡插入圖片描述