1. 程式人生 > Spark亂碼處理以及儲存csv格式

Spark亂碼處理以及儲存csv格式

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import java.io.{StringReader, StringWriter}

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

import scala.collection.JavaConversions._
import au.com.bytecode.opencsv.{CSVReader, CSVWriter}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.rdd.RDD

/**
 * A single vehicle-capture record parsed from one GBK-encoded input line.
 * All 41 columns are kept as raw strings; no type coercion is performed.
 * Field declaration order matches the column order of the input file.
 */
case class Data(ID: String, DEV_CHNNUM: String, DEV_NAME: String, DEV_CHNNAME: String,
                CAR_NUM: String, CAR_NUMTYPE: String, CAR_NUMCOLOR: String, CAR_SPEED: String,
                CAR_TYPE: String, CAR_COLOR: String, CAR_LENGTH: String, CAR_DIRECT: String,
                CAR_WAY_CODE: String, CAP_TIME: String, CAP_DATE: String, INF_NOTE: String,
                MAX_SPEED: String, MIN_SPEED: String, CAR_IMG_URL: String, CAR_IMG1_URL: String,
                CAR_IMG2_URL: String, CAR_IMG3_URL: String, CAR_IMG4_URL: String, CAR_IMG5_URL: String,
                REC_STAT: String, DEV_CHNID: String, CAR_IMG_COUNT: String, SAVE_FLAG: String,
                DC_CLEANFLAG: String, PIC_ID: String, CAR_IMG_PLATE_TOP: String, CAR_IMG_PLATE_LEFT: String,
                CAR_IMG_PLATE_BOTTOM: String, CAR_IMG_PLATE_RIGHT: String, CAR_BRAND: String,
                ISSAFETYBELT: String, ISVISOR: String, BIND_STAT: String, CAR_NUM_PIC: String,
                COMBINED_PIC_URL: String, VERIFY_MEMO: String)

/**
 * Reads GBK-encoded capture records via `hadoopFile`, decodes them correctly
 * (avoiding the mojibake produced by `textFile`'s UTF-8 assumption), and
 * writes them back out as CSV, one CSV document per partition.
 *
 * Fixes over the original:
 *  - `split(",", -1)` preserves trailing empty columns; the default `split(",")`
 *    drops them, so records whose last fields were blank threw
 *    ArrayIndexOutOfBoundsException at column 41.
 *  - rows are written with an explicit `writeNext` loop instead of relying on
 *    the deprecated `scala.collection.JavaConversions` implicit for `writeAll`;
 *    this also streams the partition instead of materializing it as a List.
 *  - the CSVWriter is closed so any buffered output is flushed.
 *  - a null `USER` environment variable no longer yields "DeviceCom-null".
 */
object Newfun {

  /** Expected number of columns per input record. */
  private val FieldCount = 41

  def main(args: Array[String]): Unit = {
    // USER may be unset in some environments; fall back rather than render "null".
    val user = Option(System.getenv("USER")).getOrElse("unknown")
    val conf = new SparkConf().setAppName("DeviceCom-" + user) // .setMaster("local[5]")
    val sc = new SparkContext(conf)
    val config: Config = ConfigFactory.load("test1.conf")

    val value: RDD[String] = sc
      .hadoopFile(config.getString("datapath"),
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 50)
      .map { case (_, text) =>
        // Hadoop reuses the Text buffer, so decode only [0, getLength) bytes.
        // Input is GBK-encoded; normalise the ';' delimiter to ',' as before.
        new String(text.getBytes, 0, text.getLength, "GBK").replaceAll(";", ",")
      }
      .map { line =>
        // limit = -1 keeps trailing empty fields (String.split drops them by default).
        val f = line.split(",", -1)
        require(f.length >= FieldCount,
          s"expected $FieldCount columns, got ${f.length} in record: $line")
        Data(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9),
          f(10), f(11), f(12), f(13), f(14), f(15), f(16), f(17), f(18), f(19),
          f(20), f(21), f(22), f(23), f(24), f(25), f(26), f(27), f(28), f(29),
          f(30), f(31), f(32), f(33), f(34), f(35), f(36), f(37), f(38), f(39), f(40))
      }
      .mapPartitions { records =>
        val stringWriter = new StringWriter()
        val csvWriter = new CSVWriter(stringWriter)
        // Case-class field order matches the declared column order, so
        // productIterator yields the CSV columns in the right sequence.
        records.foreach(r => csvWriter.writeNext(r.productIterator.map(_.toString).toArray))
        csvWriter.close() // flush buffered rows into stringWriter
        Iterator(stringWriter.toString)
      }

    value.saveAsTextFile(config.getString("outpath"))
    sc.stop()
  }
}