1. 程式人生 > >hadoop streaming map端join

hadoop streaming map端join

  • 測試資料
    # lixiang_list.txt(小表,可以在map端載入到記憶體中)
    #立項ID 立項名稱
    1800 心願券測試003
    1801 fw心願單
    1802 wtest心願券0524
    1803 HW心願單01
    1804 心願券測試006
    1805 心願券測試007
    1806 心願券測試008


    # order_list.txt (大表)
    #訂單編號            使用者手機號  訂單金額   建立時間 立項ID  使用者ID
    18050411131170468193 18511745550 0.01 1525403591
1768 6502216546 18050411131741948542 18511745550 0.01 1525403597 1768 6502216546 18050411132051347006 18511745550 0.01 1525403600 1768 6502216546 18050411132624157487 18511745550 0.01 1525403606 1768 6502216546 #連線結果 立項ID 立項名稱 訂單編號 訂單金額 使用者手機號 使用者ID 建立時間
  • 思路

    提交作業的時候,將小表文件(lixiang_list.txt)檔案以快取的形式上提交到HDFS上,然後在每個map中先將lixiang_list.txt中的資料載入到記憶體,然後根據接收到的大表的資料,判斷是否需要進行連線。

  • 例項程式碼:

    
    #work.bash
    
    
    #! /bin/bash
    
    
    
    #定義map和reduce的目錄
    
    export WORK_PATH=/var/tmp/map_join
    
    #執行stream的jar包地址
    
    stream=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar
    
    #資料輸入目錄
    
    input=/lcy/map_join/input
    
    #輸出結果目錄
    
    output=/lcy/map_join/output
    
    
    #刪除mr輸出目錄
    
    if $HADOOP_HOME/bin/hdfs dfs -test -d $output
    then
    $HADOOP_HOME/bin/hdfs dfs -rm -r $output fi #執行mapreduce程式 $HADOOP_HOME/bin/hadoop jar $stream \ -D mapreduce.job.reduces=3 \ -D num.key.fields.for.partition=1 \ -D stream.num.map.output.key.fields=1 \ -files $WORK_PATH/lixiang_list.txt \ -input $input/* \ -output $output \ -mapper "python mr.py mapper" \ -file $WORK_PATH/mr.py

    mapreduce檔案

    
    #! /usr/bin/env python
    
    
    # coding:utf-8
    
    
    import sys
    import os
    
    dct_lx = {}
    
    def read_input(file,sepr=' '):
        for line in file:
            data = line.split(sepr)
            yield data
    
    
    def read_lx_data():
        global dct_lx
        with open('lixiang_list.txt','r') as f:
            for line in f:
                line = line.split(' ',1)
                dct_lx[line[0].strip()] = line[1].strip()
    
    def mapper():
        global dct_lx
        filepath = os.environ['map_input_file']
        filename = os.path.split(filepath)[1]
        lines = read_input(sys.stdin)
        for data in lines:
            if "order_list.txt" == filename:
                if len(data) != 6:
                    continue
                if data[4] in dct_lx:
                    print "%s\t%s\t%s\t%s\t%s\t%s\t%s" % (data[4],dct_lx[data[4]],data[0],data[2],data[1],data[5].strip('\n'),data[3])
    
    if __name__ == '__main__':
        d = {"mapper":mapper}
        if sys.argv[1] in d:
            #載入小表資料
            read_lx_data()
            d[sys.argv[1]]()

相關推薦

hadoop streaming mapjoin

測試資料 # lixiang_list.txt(小表,可以在map端載入到記憶體中) #立項ID 立項名稱 1800 心願券測試003 1801 fw心願單 1802 wtest心願券0524 1803

hadoop streaming reducejoin的python兩種實現方式

實現student和course資料表的join操作,以學生編號(sno)為連線欄位 測試資料 student.txt檔案 #以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jac

mapjoin

path auth not config 單表 mapreduce == 書包 task package my.hadoop.hdfs.mapreduceJoin; import java.io.BufferedReader; import java.io.FileIn

大資料教程(9.6)mapjoin實現

        上一篇文章講了mapreduce配合實現join,本節博主將講述在map端的join實現;         一、需求     &n

Hadoop應用——ReduceJoin操作

聯接 使用案例 Table EMP: Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Ta

mapjoin的實現 ,用來解決小表中資料的讀取

通過閱讀父類Mapper的原始碼,發現 setup方法是在maptask處理資料之前呼叫一次 可以用來做一些初始化工作 1、需求: 訂單資料表t_order: id date pid amount 1001 20150710 P0001 2

Mapjoin -- 商品跟訂單合併

參考部落格: https://my.oschina.net/leejun2005/blog/111963 需求: 之所以存在reduce join,是因為在map階段不能獲取所需要的join欄位,即同一個key對應的欄位可能位於不同的map中。但是Reduce side join 是非常

hadoop程式設計小技巧(1)---map聚合

測試hadoop版本:2.4 Map端聚合的應用場景:當我們只關心所有資料中的部分資料時,並且資料可以放入記憶體中。使用的好處:可以大大減小網路資料的傳輸量,提高效率;一般程式設計思路:在Mapper的map函式中讀入所有資料,然後新增到一個List(佇列)中,然後在clea

hadoop streaming兩個資料檔案實現join合併操作

hadoop做資料處理,大都是對集合進行操作,因此將資料檔案與另一個數據檔案進行join的操作需求非常常見。 有很多人詢問,下面將彙總一個例子讓入門的朋友掌握編寫方法: [hdfs@server1]$ more clean_item_new 100002303,3368 1

hadoop streaming anaconda python 計算平均值

sdn cat pipe cal 存在 格式 ins too stream 原始Liunx 的python版本不帶numpy ,安裝了anaconda 之後,使用hadoop streaming 時無法調用anaconda python , 後來發現是參數沒設置好。。。

hadoop +streaming 排序總結

.lib fields 排序 1.4 stream 想要 output 廣泛 sep 參考http://blog.csdn.net/baidu_zhongce/article/details/49210787 hadoop用於對key的排序和分桶的設置選項比較多,在公司中

Hadoop Streaming開發要點

而不是 使用 節點 多次 spa cal hive 程序 col 一.shell腳本中的相關配置 1 HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop" 2 STREAM_JAR_PATH="/usr/local/s

MapReduce錯誤集-mapjvm堆空間不足

錯誤 cef div bbf 節拍 aec get com htm 搪th幼蜒窒父vd脅晾破刺談偎假艙帽惺麓ns鏈撈涯http://blog.sina.com.cn/s/blog_17c5e6d070102xtuz.html煤r5漲河誑冒bh傲屹啃醋險材鞍轄關琴氛5g墾弦僨

hadoop streaming 語法

capacity hdfs 壓縮 ups har 格式 -o art str 1、hadoop streaming 命令格式 $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar -D mapred.job.name="s

大數據Hadoop Streaming編程實戰之C++、Php、Python

大數據編程 PHP語言 Python編程 C語言的應用 Streaming框架允許任何程序語言實現的程序在HadoopMapReduce中使用,方便已有程序向Hadoop平臺移植。因此可以說對於hadoop的擴展性意義重大。接下來我們分別使用C++、Php、Python語言實現HadoopWo

Hadoop Streaming

earch IT fault target generate 完成 hadoop集群 問題 tor 原文地址:http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html Hadoop Streaming Stre

hadoop控制map個數(轉)

設置 mapred log AI 不能 map 整體 details net 原文鏈接:https://blog.csdn.net/lylcore/article/details/9136555 hadooop提供了一個設置map個數的參數mapred.map.ta

Hadoop基礎-MapReduce的Join操作

否則 mapred HA 原創 -m mapr red 轉載 hadoop基礎                   Hadoop基礎-MapReduce的Join操作                                     作者:尹正傑 版權聲明:原創作品,

hadoop streaming 中跑python程序,自定義模塊的導入

stack 題解 pat add 程序 oot erro them 問題解決 今天在做代碼重構,以前將所有python文件放到一個文件夾下,上傳到hadoop上跑,沒有問題;不過隨著任務的復雜性增加,感覺這樣甚是不合理,於是做了個重構,建了好幾個包存放不同功能的python

MapReduce的Map Size Join以及Distributed Cache

  首先介紹Distributed Cache(分散式快取),主要功能是把DataNode(客戶端)一些小的檔案送到DataNode上。 1. 通過job.addCacheFile(new Path(filename).toUri) 2.通過job.addCacheFile(n