1. 程式人生 > >解析資料踩過的坑

解析資料踩過的坑

import sys

from pyspark.sql.types import StructType, StringType, StructField

reload(sys)
sys.setdefaultencoding('utf8')
# Path for spark source folder
import os

# os.environ['SPARK_HOME'] = "/opt/spark-2.0.1-bin-hadoop2.7/opt/spark-2.0.1-bin-hadoop2.7"

# Append pyspark  to Python Path
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/")
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/lib/")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext
    from pyspark.sql import DataFrame
    from pyspark.sql import Row
    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Can not import Spark Modules", e)
    sys.exit(1)
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Row
import os
sparkpath = "spark://192.168.31.10:7077"

# Path for spark source folder
# os.environ['SPARK_HOME'] = "/opt/spark-2.0.1-bin-hadoop2.7"

# Append pyspark  to Python Path
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/")
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/lib/")

hdfspath = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/courtnotice.csv'
hdfspath_1 = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_litigant.txt'
hdfspath_2 = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_detail_tmp.csv'
hdfspath_3 ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/feature_extract_shixin.csv'
hdfspath_4 ='hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/saic/share_as_fr.csv'
hdfs_temp = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/update_trade_cnt_feature_data.csv'
hdfs_temp_02 = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/new_update_lgbm_model_data.csv'

hdfs_temp_03 = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/test_revoke_features_to_results.csv '
trademark_raw = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/trademark.csv'
trademark_mid = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/trademark_mid.csv'
trademark_feature ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/feature_extract_trademark.csv'
feature_extract_judgedoc ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/feature_extract_judgedoc.csv'
judgedoc_litigant_mid = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/judgedoc_litigant.csv'
judgedoc_litigant_raw = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_litigant.txt'

#對外投資公司執行次數
network_c_inv_c_zhixing_cnt_temp ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/network_c_inv_c_zhixing_cnt.csv'

# 公司型別的股東
raw_path_c_inv_c_share_from_saic = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/list_company_a/c_inv_c_share.csv"
#該公司沒有公司型別的股東;

#公司   ---投資--->  公司  /hdfs/riskModelNotDaily/raw/saic/company_inv_company.csv
company_inv_company = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/raw/saic/company_inv_company.csv"
#主體公司的股東投資的公司
rst_c_inv_c_inv_c = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/network/c_inv_c_inv_c.csv"

#主體公司股東對外作為股東的公司
share_as_share_output = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/network/share_as_share.csv"
cq_bank_1000 = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank.csv'

shixin = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/shixin.csv'
punish = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish.csv'
punish_litigants = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish_litigants.csv'
patent = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/company_patent.csv'
# trademark = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/trademark_mid.csv'
courtnotice = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/courtnotice_mid.csv'
courtannouncement = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/part-r-00000-83ceb54d-a0ac-4c67-b8c9-948bb2d11aa4.csv'
courtsszc_company = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/court_sszc_crawl_company.csv'
courtsszc_detail = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/court_sszc_crawl.csv'

judgedoc_company = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_litigant.csv'
judgedoc_detail = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc.csv'

shixin_company = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/court_shixin_company.csv'
trademark = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/trademark.csv'
# punish = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish.csv'
# punish_litigants = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish_litigants.csv'

# df1 = pd.read_csv('/home/sc/Downloads/staticdata/3.01_data/update_trade_cnt_feature_data.csv')
# print(df1.groupby(df1['_c1']).size())
def test_courtsszc(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        courtsszc_company,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk3 = sqlContext.read.csv(
        courtsszc_detail, header=True)
    dfkk3.createOrReplaceTempView('y3')
    # dfkk2.show()
    dfhh1 = sqlContext.sql(
        """select y2.company_name,y3.* from y2 left join y3 on y2.sc_data_id = y3.sc_data_id
         """)
    dfhh1.show()
    dfhh1.createOrReplaceTempView('hh1')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh2 = sqlContext.sql(
        """select y4.company_name as company_name_a,hh1.* from y4 left join hh1 on y4.company_name = hh1.company_name
         """)
    dfhh2.show()
    dfhh2.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_courtsszc.csv",
                                   mode='overwrite', header=True,quoteAll = True)
    spark.stop()

def test_judgedoc(spark,sc):
    'litigant_name, doc_id,litigant_type,case_type,case_reason,publish_date,trail_date,case_result'
    judgedoc_sample_field = ['litigant_name','doc_id','litigant_type','case_type',
                             'case_reason','publish_date','trail_date','case_result']
    judgedoc_sample_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in judgedoc_sample_field])
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        judgedoc_company,header = False,sep = '\t')
    dfkk2.createOrReplaceTempView('y2')
    # print(dfkk2.columns)
    # spark.stop()
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh2 = sqlContext.sql(
        """select y4.company_name,y2._c0 as doc_id from y4 left join y2 on y4.company_name = y2._c1
         """)
    # dfhh2.show()
    dfhh2.createOrReplaceTempView('hh2')
    dfkk3 = sqlContext.read.csv(
        judgedoc_detail,quote="",schema=judgedoc_sample_schema,header=False,nanValue='nan',
    nullValue='nan',inferSchema=True)
    print(dfkk3.columns)
    print(type(dfkk3))
    # print(dfkk3[1:3])
    spark.stop()
    # dfkk3 = dfkk3 \
    #     .filter(lambda line: len((line["value"] or "").split(',')) == 8) \
    #     .map(lambda line: Row(**dict(zip(judgedoc_sample_field, line["value"].split(','))))) \
    #     .toDF(judgedoc_sample_schema)
    #     #  \

    dfkk3.createOrReplaceTempView('y3')
    # dfkk2.show()
    dfhh1 = sqlContext.sql(
        """select hh2.company_name,y3.* from hh2 left join y3 on hh2.doc_id = y3.doc_id
         """)
    # dfhh1.show()
    dfhh1.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_judgedoc.csv",
                                   mode='overwrite', header=True,quoteAll = True,sep = '\t')
    spark.stop()

def test_shixin(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        shixin_company,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh2 = sqlContext.sql(
        """select y4.company_name as company_name_a,y2.* from y4 left join y2 on y4.company_name = y2.company_name
         """)
    dfhh2.show()
    dfhh2.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_shixin.csv",
                                   mode='overwrite', header=True, quoteAll = True )
    spark.stop()

def test_punish(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        punish_litigants,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh3 = sqlContext.sql(
        """select company_name,y2.sc_data_id from y4 left join y2 on y4.company_name = y2.name
         """)
    dfhh3.createOrReplaceTempView('y3')
    dfkk6 = sqlContext.read.csv(
        punish, header=True)
    dfkk6.createOrReplaceTempView('y6')
    dfhh5 = sqlContext.sql(
        """select company_name,y6.* from y3 left join y6 on y3.sc_data_id = y6.sc_data_id
         """)

    dfhh5.show()
    dfhh5.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_punish.csv",
                                   mode='overwrite', header=True, quoteAll = True )
    spark.stop()

def test_trademark(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        trademark,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh3 = sqlContext.sql(
        """select y4.company_name as company_name_a,y2.* from y4 left join y2 on y4.company_name = y2.company_name
         """)
    dfhh3.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_trademark.csv",
                                   mode='overwrite', header=True, quoteAll = True )
    spark.stop()

#
if __name__ == '__main__':

    spark = SparkSession.builder.master(sparkpath).appName("SC_ETL_ccs_spark") \
        .getOrCreate()
    # spark.conf.set("spark.driver.maxResultSize", "4g")
    # spark.conf.set("spark.sql.broadcastTimeout", 1200)
    # spark.conf.set("spark.sql.crossJoin.enabled", "true")
    # spark.conf.set("spark.cores.max", "10")
    sc = spark.sparkContext
    # test_courtsszc(spark, sc)
    # test_judgedoc(spark, sc)
    # test_shixin(spark, sc)
    # test_punish(spark, sc)
    test_trademark(spark, sc)

對於儲存的結果表,用csv開啟後為亂碼.可以儲存的時候設定引數:quoteAll = True;就是
引數詳細解釋:

對於裁判文書:
dfkk3 = sqlContext.read.csv(
judgedoc_detail,quote="",schema=judgedoc_sample_schema,header=False,nanValue=‘nan’,
nullValue=‘nan’,inferSchema=True)
讀表的時候有問題,這樣設定引數還是有問題;

相關推薦

解析資料

import sys from pyspark.sql.types import StructType, StringType, StructField reload(sys) sys.setdefaultencoding('utf8') # Path fo

資料

1.jps有資料①瀏覽器輸入:(機器名):50070打不開,把使用者名稱替換成ip地址訪問②瀏覽器輸入:(ip地址):50070還打不開,關閉防火牆  service iptables stop

EclipseSpring外掛不顯示問題,我,分享下

網上最多的說法是版本不相容(spring外掛和eclipse),但是其實大家都不是傻子,都會看對應版本。 所以最多情況還是版本都對的情況下出現外掛不顯示等問題,下面貼出穩穩的安裝外掛和顯示方法; 1、先確定環境,比如eclipse+jdk+spring外掛, 這裡4.5版本的eclipse最好搭

資料

hbase出現的問題:1、Hbase叢集啟動不了。regionserver.HRegionServer: error telling master we are upcom.google.protobuf.ServiceException: java.io.IOExcepti

程式設計師,Mybatis你嗎?

摘自:https://yq.aliyun.com/roundtable/49835?&utm_campaign=sys&utm_medium=market&utm_source=edm_email&msctype=email&msca

27.Spring-Boot中攔截器中靜態資源的處理()以及Spring mvc configuring拓展介紹

一.springboot中對靜態資源的處理  預設情況下,springboot提供存放放置靜態資源的資料夾:  /static  /public   /resources  /META-INF/resources 對於maven專案即就是存在src/main/re

Fragment全解析系列(一):那些年

本篇主要介紹一些最常見的Fragment的坑以及官方Fragment庫的那些自身的BUG,並給出解決方案;這些BUG在你深度使用時會遇到,比如Fragment巢狀時或者單Activity+多Fragment架構時遇到的坑。 Fragment是可以讓你的app縱享絲滑的設計,如果你的app想在

利用pandas的to_sql將資料插入MySQL資料庫和所

前言 最近做一個Django web的專案要把爬取的一些資料存入MySQL中,資料儲存為csv格式,想到pandas中有to_sql這個方法,就採用它了 準備:連線MySQL資料庫所需的第三方包pymysql、sqlalchemy(pip安裝即可) 實現 from sql

資料治理:那些年,我們一起

寫在前面: 這是一個系列文章,沉澱了我在資料治理領域的一些實踐和思考。共分為5篇。分別是: 一、資料治理:那些年,我們一起踩過的坑 主要講講資料治理工作中常見的一些誤區。 二、要打仗,你手裡先得有張地圖:資料治理之元資料管理 這一篇講講元資料的概念和具體應用場景。 三、不忘初

小程式的一個小---解析二維碼decodeURIComponent() url解碼

  因為我們需要使用者掃碼進入小程式,每一個貨櫃都有一個對應的二維碼,當然每個二維碼裡的資訊也不一樣。使用者掃碼進入小程式之後,二維碼的資訊會以引數q帶進去,而我們只能在onLoad事件中拿到這個引數, 但是獲取到的資訊是經過encodeURIComponent()編碼的,注意

多線程和異步編程示例和實踐-

round 推送 在線 png 很慢 main.c 服務容器 con slist 上兩篇文章,主要介紹了Thread、ThreadPool和TPL 多線程異步編程示例和實踐-Thread和ThreadPool 多線程異步編程示例和實踐-Task 本文中,分享兩則我們在

java使用默認線程池(二)

true 如何 vol private popu command row 由於 ges 雲智慧(北京)科技有限公司 陳鑫 是的。一個線程不可以啟動兩次。那麽它是怎麽推斷的呢? public synchronized void start() {

phpstorm配置xdebug

evel src ini 文件名 文件的 分享 傻傻 自動 配置文件 按網上的諸多教程, 1.下載對應文件,放在php放置執行文件的文件夾 2.更改php.ini文件。一搜,是哪個呢 原先記得好像沒有第一個文件,第二個文件名為,php.ini-development.上網

Mac 下安裝wxpython

blog res alt 指定版本 打開 png ces 點擊 source 一.下載   1.wxpython 下載地址:https://sourceforge.net/projects/wxpython/files/wxPython/2.8.12.1/         

初學spring boot

9.png field ted require order false test boot mysql- 一、搭建spring boot環境   maven工程       pom文件內容 <project xmlns="http://maven

cocos2dx之WebView(android返回鍵處理問題)

lib mar 是否 12px blog tex clas ons 測試的   最近遊戲接入了一個私服平臺,由於沒有sdk,所以支付相關的操作需要在網頁端進行,也就是說點擊充值需要在遊戲內部彈出一個網頁,並定位到平臺充值的地址。查閱相關資料後決定使用cocos2dx自帶的W

總結Idea環境,吐血

你在 搜索 app 服務器 tomcat 點擊 path 詭異 服務 1)首先是JDK環境安裝,這一步千萬要出錯,我就是配錯了CLASSPATH導致了很詭異的問題。可能結果:就是RUN到tomcat不報錯,但是有404錯誤。 2)然後是IDEA安裝,這裏要十分註意如果你選擇

軟件測試曾經

密碼修改 tro 一個 不知道 勝任 兩個 賬號 bsp 提示 原文鏈接:https://www.zhihu.com/question/60591301/answer/209549333 軟件測試曾經踩過的坑? 1.自以為了解業務邏輯,實際浮於表面 這是個深坑

1.MySQL5.7.19 安裝配置

edi nor 切換 normal table 無法啟動 sql安裝 span 安裝配置 這篇文章主要是分享 安裝MySQL時遇到的一些問題,以及解決方法。 第一步:下載MySQL 下載地址:https://dev.mysql.com/downloads/mysql/5.1

前端開發工具Brackets介紹,安裝及安裝Emme插件時

module 文件 不想 現在 div 當前 user 沒有 -s   對於前端開發的園友來說有可能IDE工具有很多,層次不窮,還有每個人的喜好及習慣也不一樣,因為我是一名後端開發的.Net程序員,但是大家都知道,現在都提倡什麽全棧工程師,所以也得會點前端開發,所以我對於