1. 程式人生 > 其它 > 02_Hadoop序列化_2.2 自定義Bean物件 實現序列化介面(Writable)

02_Hadoop序列化_2.2 自定義Bean物件 實現序列化介面(Writable)

程式碼示例

package GroupByPoneNumPk {

  import java.io.{DataInput, DataOutput}
  import java.lang

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{LongWritable, Text, Writable}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

  // Mapper class.
  // One Mapper instance processes one input split; map() is called once per line.
  // Emits (phone number, FlowBean with up/down flow, sum not yet computed).
  class GroupByPoneNumMapper extends Mapper[LongWritable, Text, Text, FlowBean] {

    // Reused output key across map() calls to avoid allocating a Text per record.
    val text = new Text

    override def map(key: LongWritable, value: Text,
                     context: Mapper[LongWritable, Text, Text, FlowBean]#Context): Unit = {
      // Example record:
      // 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
      // 1. Split the record on runs of spaces.
      val line: Array[String] = value.toString.split(" +")
      println("第一行 : " + line.mkString("-"))
      val phone = line(1)
      // Index from the end of the record (tail is: upflow downflow status),
      // so variable-width middle fields don't matter. Direct end-relative
      // indexing instead of the original `line.reverse(n)`, which copied the
      // whole array per record just to read two fields.
      val upflow = line(line.length - 3)
      val downflow = line(line.length - 2)
      // 2. Build the FlowBean; sumflow is left 0 here and computed in the reducer.
      val flowBean = new FlowBean(upflow.toInt, downflow.toInt, 0)
      // 3. Write to the shuffle buffer.
      text.set(phone)
      context.write(text, flowBean)
      println(flowBean)
    }
  }

  // Reducer class.
  // Runs only after all Mapper instances finish; the Mapper output types must
  // equal the Reducer input types. reduce() is called once per distinct key
  // (phone number) with all of that key's FlowBeans.
  class GroupByPoneNumReducer extends Reducer[Text, FlowBean, Text, FlowBean] {

    override def reduce(key: Text, values: lang.Iterable[FlowBean],
                        context: Reducer[Text, FlowBean, Text, FlowBean]#Context): Unit = {
      println("reduce into ....")
      // 1. Sum upflow and downflow over every bean for this phone number.
      //    (vars are local accumulators; java.lang.Iterable#forEach forces mutation.)
      var sumUpflow = 0
      var sumDownflow = 0
      values.forEach { bean =>
        sumUpflow += bean.upflow
        sumDownflow += bean.downflow
      }
      // 2. Total flow = up + down.
      val flowBean = new FlowBean(sumUpflow, sumDownflow, sumUpflow + sumDownflow)
      // 3. Emit the aggregated record.
      context.write(key, flowBean)
      println("第二行 :" + flowBean)
    }
  }

  // Driver: configures and submits the MapReduce job.
  object Driver {
    def main(args: Array[String]): Unit = {
      // 1. Load configuration and obtain the Job handle.
      val configuration = new Configuration
      val job: Job = Job.getInstance(configuration)
      // 2. Register this driver's jar.
      job.setJarByClass(this.getClass)
      job.setJobName("scala mr")
      // 3. Register the Mapper and Reducer classes.
      job.setMapperClass(classOf[GroupByPoneNumMapper])
      job.setReducerClass(classOf[GroupByPoneNumReducer])
      // 4. Mapper output key/value types.
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[FlowBean])
      // 5. Final (reducer) output key/value types.
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[FlowBean])
      // 6. Input and output paths. NOTE(review): the output directory must not
      //    already exist or Hadoop will fail the job at submission time.
      FileInputFormat.setInputPaths(job, new Path("src/main/data/input/phone_data.txt"))
      FileOutputFormat.setOutputPath(job, new Path("src/main/data/output"))
      // 7. Submit and block until completion; exit 0 on success, 1 on failure.
      //    (Replaces the original `bool match { case true => "0".toInt ... }`
      //    round-trip through string parsing.)
      val ok: Boolean = job.waitForCompletion(true)
      System.exit(if (ok) 0 else 1)
    }
  }

  // Record layout of phone_data.txt:
  // 1363157985066          id
  // 13726230503            phone number
  // 00-FD-07-A4-72-B8:CMCC MAC address
  // 120.196.100.82         network IP
  // i02.c.aliimg.com       domain
  // 24
  // 27
  // 2481                   upstream flow
  // 24681                  downstream flow
  // 200                    HTTP status code
  //
  // Custom bean carried between Mapper and Reducer. Hadoop's Writable contract
  // requires a public no-arg constructor (for deserialization) and mutable
  // fields, so this intentionally is NOT an immutable case class.
  class FlowBean() extends Writable {
    var upflow = 0
    var downflow = 0
    var sumflow = 0

    // Auxiliary constructor for convenient construction in map()/reduce().
    def this(upflow: Int, downflow: Int, sumflow: Int) {
      this()
      this.upflow = upflow
      this.downflow = downflow
      this.sumflow = sumflow
    }

    // Serialization: field order here MUST match readFields exactly.
    override def write(out: DataOutput): Unit = {
      out.writeInt(upflow)
      out.writeInt(downflow)
      out.writeInt(sumflow)
    }

    // Deserialization: reads fields in the same order write() emitted them.
    override def readFields(in: DataInput): Unit = {
      upflow = in.readInt
      downflow = in.readInt
      sumflow = in.readInt
    }

    // Tab-separated rendering; this is the value text written to output files.
    override def toString: String = {
      s"${upflow} \t ${downflow} \t ${sumflow}"
    }
  }
}