
Learning Spark: lambda, map, filter, flatMap / sorting a dict by a specified key or value

Learning Spark: lambda, map, filter, flatMap
Key points: be clear about what data remains after each of these functions runs, that is, whether all fields are kept or only the field the function operates on; also be clear about each function's mapping condition, which is usually a boolean expression. A short sketch contrasting map, filter, and flatMap follows, then several classic patterns:
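
A minimal sketch (not from the original post) contrasting the three transformations; it assumes a local SparkContext created here only for illustration:

from pyspark import SparkContext

sc = SparkContext("local[1]", "demo")  # assumed local context, for illustration only

rdd = sc.parallelize([[1, 2], [3], [], [4, 5]])

# map: one output element per input element; each element is replaced by the lambda's result
print(rdd.map(lambda xs: len(xs)).collect())         # [2, 1, 0, 2]

# filter: keeps only elements for which the lambda is truthy; surviving elements pass through unchanged
print(rdd.filter(lambda xs: len(xs) > 0).collect())  # [[1, 2], [3], [4, 5]]

# flatMap: the lambda returns a sequence per element, and the sequences are flattened into one RDD
print(rdd.flatMap(lambda xs: xs).collect())          # [1, 2, 3, 4, 5]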

Classic pattern 1:

 df_crawler_merged_name_err = df_crawler_merged.rdd.filter(lambda _: not _legal_check(_["name"])).map(lambda _: Row(name=_["name"]))

Classic pattern 2:

 a_data = json.loads(a_str)

 if a_data and a_data.get("shareHolderList", []):
     a_b_l = a_data.get("shareHolderList", [])
     shareholderName_s = filter(lambda _: len(_) > 0, [_.get("shareholderName", "") for _ in a_b_l if _])
     return shareholderName_s
 return []
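
One caveat worth noting here: the script in this post is Python 2 (see the reload(sys) header below), where the built-in filter returns a list; under Python 3 it returns a lazy iterator. A portable equivalent of the filter(...) call above, with a made-up a_b_l, would be:

a_b_l = [{"shareholderName": "Acme Ltd"}, {}, {"shareholderName": ""}]

# Python-3-safe equivalent: list comprehension instead of filter()
shareholderName_s = [name for name in
                     (d.get("shareholderName", "") for d in a_b_l if d)
                     if len(name) > 0]
print(shareholderName_s)  # ['Acme Ltd']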

Classic pattern 3 (more complex):

 news_path_data_rows = news_data_2_gz_rawRdd\
     .map(lambda _: (news_data_2_aPathPattern.findall(_), contentPattern.findall(_)))\
     .filter(lambda _: len(_) > 1 and len(_[0]) > 0 and len(_[1]) > 0 and json.loads(_[1][0]).get("content"))\
     .map(lambda _: Row(pth=_[0][0].replace('newsAnalysis/', ""), content=json.loads(_[1][0]).get("content")))\
     .map(lambda _: Row(result=mapping_tag_by_content(_["content"], _["pth"])))\
     .filter(lambda _: len(_["result"]) > 1)
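
The pipeline extracts a path and a JSON string from every raw line with two regular expressions, drops lines where either extraction fails or the JSON has no "content", and finally maps to Row objects. Below is a self-contained sketch of the same per-line logic; the regex patterns, the sample line, and the tagging function are illustrative stand-ins, not the original news_data_2_aPathPattern / contentPattern / mapping_tag_by_content:

import json
import re

from pyspark.sql import Row

path_pattern = re.compile(r'"path":\s*"([^"]+)"')    # stand-in for news_data_2_aPathPattern
content_pattern = re.compile(r'(\{"content":.*\})')  # stand-in for contentPattern

def tag_content(content, pth):
    # illustrative stand-in for mapping_tag_by_content
    return {"pth": pth, "tag": "long"} if len(content) > 10 else {}

raw_line = '"path": "newsAnalysis/2018/a.html" {"content": "some long article text"}'

pair = (path_pattern.findall(raw_line), content_pattern.findall(raw_line))
if pair[0] and pair[1] and json.loads(pair[1][0]).get("content"):
    row = Row(pth=pair[0][0].replace("newsAnalysis/", ""),
              content=json.loads(pair[1][0]).get("content"))
    print(tag_content(row["content"], row["pth"]))  # e.g. {'pth': '2018/a.html', 'tag': 'long'}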

Classic pattern 4: the filter contains several conditions; the other mapping functions can combine conditions in the same way.

df_eid_person_text = spark.read.text("hdfs://sc-bd-10:9000/scdata/huangyu/person_new.csv")
df_eid_person = df_eid_person_text\
    .rdd\
    .map(lambda _: Row(**clean_person_row(_["value"])))\
    .filter(lambda _: _["new_eid"] and person_is_legal(_["person_name"]) and not filter_inv_name(_["person_name"])).toDF()
df_eid_person.createOrReplaceTempView("eid_person_table")

The filter above combines several conditions:

filter(lambda _: _["new_eid"] and person_is_legal(_["person_name"]) and not filter_inv_name(_["person_name"]))

Sorting by a specified key or value of a dict:

 classifies = sorted(result.items(), key=lambda _: _[1], reverse=True)
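
The call above sorts by value in descending order: in the lambda, _[1] is the value of each (key, value) pair, and _[0] would sort by key instead. A small example with made-up data:

result = {"finance": 3, "sports": 7, "tech": 5}

# sort by value, largest first (same form as above)
print(sorted(result.items(), key=lambda kv: kv[1], reverse=True))  # [('sports', 7), ('tech', 5), ('finance', 3)]

# sort by key, alphabetically
print(sorted(result.items(), key=lambda kv: kv[0]))                # [('finance', 3), ('sports', 7), ('tech', 5)]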

In Classic pattern 2 above, the list after the comma in the filter(lambda ...) call is the iterable being filtered (the "variable list"): [_.get("shareholderName", "") for _ in a_b_l if _]. It is evaluated in this order: first for _ in a_b_l, then the if _ condition, then _.get("shareholderName", "") on each remaining element, finally yielding a list.
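
A quick illustration of that evaluation order, with a made-up a_b_l:

a_b_l = [{"shareholderName": "Zhang San"}, None, {"other": 1}]

# for _ in a_b_l  ->  iterate the three elements
# if _            ->  drop the falsy None
# _.get(...)      ->  extract the name, or "" when the key is missing
print([_.get("shareholderName", "") for _ in a_b_l if _])  # ['Zhang San', '']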

#!/usr/bin/env python
# encoding: utf-8

import json
import sys

from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

reload(sys)
sys.setdefaultencoding('utf8')

# `spark` (SparkSession) and `df_crawler_merged` are created elsewhere in the original job.


def _parse_ent_status(a_str):
    if not a_str:
        return ""
    a_data = json.loads(a_str)
    if a_data and a_data.get("basicList", []):
        a_b_l = a_data.get("basicList", [])
        return a_b_l[0].get("enterpriseStatus", "")
    return ""


def parse_ent_status(a_str):
    try:
        return _parse_ent_status(a_str)
    except:
        return ""


def statistic_status():
    parse_ent_status_udf = udf(parse_ent_status, StringType())
    df_crawler_status = df_crawler_merged.withColumn("status", parse_ent_status_udf(df_crawler_merged["results"]))
    df_crawler_status.createOrReplaceTempView("crawler_status")
    spark.sql("select DISTINCT(status) from crawler_status").show(2000, False)


def _parse_ent_regCapCur(a_str):
    regCapCur = ""
    if not a_str:
        return ""
    a_data = json.loads(a_str)
    if a_data and a_data.get("basicList", []):
        a_b_l = a_data.get("basicList", [])
        regCapCur = a_b_l[0].get("regCapCur", "")

    for a_share in a_data.get("shareHolderList", []):
        regCapCur = regCapCur or a_share.get("regCapCur", "")

    return regCapCur


def parse_ent_regCapCur(a_str):
    try:
        return _parse_ent_regCapCur(a_str)
    except:
        return ""


def statistic_regCapCur():
    a_udf = udf(parse_ent_regCapCur, StringType())
    df_crawler_field = df_crawler_merged.withColumn("regCapCur", a_udf(df_crawler_merged["results"]))
    df_crawler_field.createOrReplaceTempView("crawler_regCapCur")
    spark.sql("select regCapCur, name from crawler_regCapCur where LENGTH(regCapCur) > 10 or regCapCur like '%.%' or regCapCur like '%0%'").show(2000, False)

    # spark.sql("select DISTINCT(regCapCur) from crawler_regCapCur").show(2000, False)


def _parse_position(a_str):
    if not a_str:
        return ""

    a_data = json.loads(a_str)
    a_person_list = [
        u"監" , u"其" , u"董" , u"經", u"負責人", u"代表" , u"理事長", u"人", u"投資人", u"支局長", u"長", u"工程師",
    ]
    if a_data and a_data.get("personList", []):
        a_b_l = a_data.get("personList", [])
        # return u"、".join(set([_.get("position", u"") for _ in a_b_l if _]))
        position_filter = u"、".join(set([_.get("position", u"") for _ in a_b_l if _]))

        # statistics filter: return "" when the positions contain any of the common keywords above

        if any(map(lambda _: _ in position_filter, a_person_list)):
            return ""
        else:
            return position_filter

    return ""


def parse_position(a_str):
    try:
        return _parse_position(a_str)
    except:
        return ""


def statistic_position():
    # position_str_rdd = df_crawler_merged.rdd.map(lambda _: Row(position_str=parse_position(_["results"]), name=_["name"]))
    position_str_rdd = df_crawler_merged.rdd.map(lambda _: Row(name=_["name"], position_str=_parse_position(_["results"])))
    position_str_rdd.toDF().createOrReplaceTempView("position_all")
    spark.sql("select name from position_all where LENGTH(position_str) > 1 ").show(10000000, False)


def _parse_share_name(a_str):
    if not a_str:
        return []

    a_data = json.loads(a_str)

    if a_data and a_data.get("shareHolderList", []):
        a_b_l = a_data.get("shareHolderList", [])
        shareholderName_s = filter(lambda _: len(_) > 0, [_.get("shareholderName", "") for _ in a_b_l if _])
        return shareholderName_s
    return []


def statistic_share_name():
    # todo: still needs more work
    position_str_rdd = df_crawler_merged.rdd.map(lambda _: Row(share_name=_parse_share_name(_["results"]))).flatMap(lambda x: x["share_name"]).map(lambda _: Row(name=_))
    position_str_rdd.toDF().createOrReplaceTempView("share_name_table")

    # df_name_all = spark.sql("select _c0 as name from base_info_named_eid UNION select name from crawler_merged")
    df_name_all = spark.sql("select _c0 as name from base_info_named_eid")

    df_name_all.createOrReplaceTempView("name_all")
    # df_share_name = spark.sql("select t1.name from share_name_table t1 where t1.name in (SELECT t2.name from name_all t2) or LENGTH(t1.name) > 5")
    df_share_name = spark.sql("select t1.name from share_name_table t1 where t1.name in (SELECT t2.name from name_all t2) and LENGTH(t1.name)>4")
    print df_share_name.count()

    # print df_share_name.show(10000, False)



def statistic_company_name():
    df_crawler_merged_name_err = df_crawler_merged.rdd.filter(lambda _: not _legal_check(_["name"])).map(lambda _: Row(name=_["name"]))
    print df_crawler_merged_name_err.distinct().toDF().show(10000, False)
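
For context, here is a hedged sketch of how these functions might be wired up; the SparkSession construction and the input path and format below are assumptions, since the original only shows the functions themselves.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("crawler-stats").getOrCreate()

# hypothetical source of the crawled data; the original defines df_crawler_merged elsewhere
df_crawler_merged = spark.read.json("hdfs://sc-bd-10:9000/scdata/huangyu/crawler_merged.json")
df_crawler_merged.createOrReplaceTempView("crawler_merged")

statistic_status()
statistic_regCapCur()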

Python code for reading and saving JSON files:

import codecs
import json
import os


def get_origin_content(file_path):
    # read a UTF-8 file and parse it as JSON; fall back to "" if parsing fails
    with codecs.open(filename=file_path, mode="r", encoding="utf8") as f:
        try:
            data = json.loads(f.read())
        except Exception as e:
            data = ""
    return data


def label_save(save_path, content, sentence_lst):
    # make sure the parent directory of save_path exists before appending to the file
    save_dir = os.path.dirname(save_path)
    if save_dir and not os.path.exists(save_dir):
        os.makedirs(save_dir)
    with codecs.open(save_path, "a", "utf8") as f:
        f.write(json.dumps({"content": content, "sentenceLst": sentence_lst}, ensure_ascii=False, indent=2))
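
A short usage sketch for the two helpers above; the file paths and the sentence list are made up for illustration:

content = get_origin_content("./labels/raw_0001.json")        # hypothetical input file
label_save("./labels/labeled_0001.json",                       # hypothetical output path
           content=content,
           sentence_lst=["first sentence", "second sentence"])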