mysql 連結,資料處理
阿新 • 發佈:2018-12-21
/**
 * Created by Administrator on 2018/1/28.
 */
// NOTE(review): truncated fragment — this object is an earlier duplicate of
// YixinAnalys below and is cut off mid-expression at "val sdf = new"
// (scrape artifact). Kept verbatim; see the complete copy that follows.
object LogsAnalys {
// JDBC connection settings — presumably placeholders for the real host/user;
// verify before use (TODO confirm).
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://url:3306/ntd_db?useUnicode=true&characterEncoding=utf8"
val username = "user"
val password = "password"
// Incomplete: body ends mid-expression; the full implementation appears in
// YixinAnalys.yiXinTimeFormat below.
def Format(timestamp: String): String = {
val sdf = new
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.{JSON, JSONObject}

import scala.io.{BufferedSource, Source}

/**
 * Reads Yixin JSON log lines from a local file, extracts the header fields
 * (version, appKey, guid, UTC timestamp) and batch-inserts them into the
 * MySQL table ntd_db.yixin_primeval_infos, committing every 1000 rows.
 *
 * Created by Administrator on 2018/1/28.
 */
object YixinAnalys {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://url:3306/db?useUnicode=true&characterEncoding=utf8"
  val username = "root"
  val password = "password"

  /**
   * Formats an epoch-millisecond timestamp string as "yyyyMMdd HH:mm:ss"
   * in the JVM's default time zone.
   *
   * @param timestamp epoch milliseconds as a decimal string
   * @return formatted date-time, e.g. "20180129 13:45:02"
   * @throws NumberFormatException if the string is not a valid Long
   */
  def yiXinTimeFormat(timestamp: String): String = {
    val sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
    sdf.format(new Date(timestamp.toLong))
  }

  def main(args: Array[String]): Unit = {
    println("logs.....................")
    var connection: Connection = null
    var prepareStatement: PreparedStatement = null
    var jsonFile: BufferedSource = null
    var counts = 0
    try {
      //Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      prepareStatement = connection.prepareStatement(
        "insert into ntd_db.yixin_primeval_infos (date,guid,appkey,version,hour) values (?,?,?,?,?);")
      // Manual commit so rows are flushed in 1000-row batches below.
      connection.setAutoCommit(false)
      // val jsonCitys = "E:\\navinfo_workspace\\x_data\\yin\\in\\1516606537778_json.txt"
      // val jsonCitys = "E:\\navinfo_workspace\\xu_data\\in\\in\\1516861216718_json.txt"
      val jsonCitys = "D:\\caIJIBAO\\20180129data\\data20180129\\yi\\1515634349929_json.txt"
      jsonFile = Source.fromFile(jsonCitys)
      for (line <- jsonFile.getLines()) {
        counts += 1
        val parseObject: JSONObject = JSON.parseObject(line)
        val headObject: JSONObject = parseObject.getJSONObject("header")
        val ver = headObject.getString("version")
        val appkey = headObject.getString("appKey")
        val guid = headObject.getString("guid")
        val time_tamp = headObject.getString("timestampUTC_ms")
        // "yyyyMMdd HH:mm:ss" -> split into date ("yyyyMMdd") and hour part.
        val day = yiXinTimeFormat(time_tamp)
        val dateall = day.split(" ")
        val date = dateall(0)
        val hour = dateall(1)
        prepareStatement.setString(1, date)
        prepareStatement.setString(2, guid)
        prepareStatement.setString(3, appkey)
        prepareStatement.setString(4, ver)
        prepareStatement.setString(5, hour)
        prepareStatement.addBatch()
        // Flush every 1000 rows to keep the batch bounded.
        if (counts % 1000 == 0) {
          prepareStatement.executeBatch()
          connection.commit()
        }
      }
      // Flush the final partial batch.
      prepareStatement.executeBatch()
      connection.commit()
    } catch {
      case e: Exception =>
        e.printStackTrace()
    } finally {
      // Fix: null-safe cleanup. The original closed unconditionally, which
      // threw NullPointerException (masking the real error) whenever
      // getConnection failed, and only closed the input file outside the
      // finally block, leaking it on any exception.
      if (prepareStatement != null) prepareStatement.close()
      if (connection != null) connection.close()
      if (jsonFile != null) jsonFile.close()
    }
    println("counts....." + counts)
  }
}
MySQL 地址:172.22.52.97 埠 :3306 DataBase: ntd_db 使用者名稱:ntd_user 密碼:ntd123 //易鑫原始資料 create table ntd_db.yixin_primeval_infos( ID INT primary key auto_increment, //主鍵自增 date VARCHAR(32), //日期 guid VARCHAR(256), //guid appkey VARCHAR(64), //appkey version VARCHAR(32), //版本 hour VARCHAR(256), //預留欄位1 res02 bigint //預留欄位2 ); ALTER TABLE ntd_db.yixin_primeval_infos ADD INDEX index_name (date,guid,appkey,version) create table ntd_db.yixin_20180129_infos( ID INT primary key auto_increment, date VARCHAR(32), guid VARCHAR(256), appkey VARCHAR(64), version VARCHAR(32), hour VARCHAR(256), res02 bigint ); // insert into yixin_20180129_infos(date,guid,appkey,version,hour) select date,guid,appkey,version,hour from yixin_primeval_infos where date = 20180129 and appkey ='8cce364eaf4c3e26bedc9b0e23a41859'; //nglog原始資料 create table ntd_db.ngnixlog_primeval_infos( ID INT primary key auto_increment, date VARCHAR(32), guid VARCHAR(256), appkey VARCHAR(64), version VARCHAR(32), res01 VARCHAR(256), res02 bigint ); ALTER TABLE ntd_db.ngnixlog_primeval_infos ADD INDEX index_name (date,guid,appkey,version) //20180129 資料 log create table ntd_db.ngnixlog_20180129_infos( ID INT primary key auto_increment, date VARCHAR(32), guid VARCHAR(256), appkey VARCHAR(64), version VARCHAR(32), res01 VARCHAR(256), res02 bigint ); insert into ngnixlog_20180129_infos(date,guid,appkey,version) select date,guid,appkey,version from ngnixlog_primeval_infos where date = 20180129 and appkey ='8cce364eaf4c3e26bedc9b0e23a41859'; //查詢新增活躍留存 select count(case when a.guid is not null and b.guid is null then a.guid end) as new_num, count(case when a.guid is not null and b.guid is not null then a.guid end) as act_num, count(case when a.guid is not null and b.guid is not null then a.guid end) as liucun_num from (select guid from ngnixlog_20180129_infos) as a join (select guid from ngnixlog_primeval_infos) as b on a.guid=b.guid //附件 select count(case when a.guid is not null and b.guid is null then a.guid end) as new_num,
count(distinct a.guid) as act_num, count(case when a.guid is not null and b.guid is not null then a.guid end) as liucun_num from (select guid from ngnixlog_20180129_infos) as a join (select guid from ngnixlog_primeval_infos) as b on a.guid=b.guid; //當日活躍使用者 select distinct guid,date,appkey from yixin_primeval_infos where date = 20180129; //新增使用者 select distinct guid ,guid from ngnixlog_20180129_infos t where not exists (select * from ngnixlog_primeval_infos m where t.guid=m.guid); //留存使用者 select distinct guid ,guid from ngnixlog_20180129_infos t where exists (select * from ngnixlog_primeval_infos m where t.guid=m.guid); //累計使用者 select sum(distinct guid) ,guid from ngnixlog_20180129_infos t where not exists (select * from ngnixlog_primeval_infos m where t.guid=m.guid);
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>jsonparse</groupId> <artifactId>jsonparse</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <scala.version>2.10.6</scala.version> <encoding>UTF-8</encoding> </properties> <dependencies> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> <version>1.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.4</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <!-- https://mvnrepository.com/artifact/log4j/log4j --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</version> </dependency> 
<dependency> <groupId>org.apache.any23</groupId> <artifactId>apache-any23-csvutils</artifactId> <version>2.1</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.ahocorasick</groupId> <artifactId>ahocorasick</artifactId> <version>0.4.0</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <id>build-exec-bundle</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>false</minimizeJar> <artifactSet> <includes> <include>com.alibaba:fastjson</include> </includes> </artifactSet> </configuration> </execution> </executions> </plugin> <plugin> <!-- see http://davidb.github.com/scala-maven-plugin --> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.1.6</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-make:transitive</arg> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.13</version> <configuration> <testFailureIgnore>true</testFailureIgnore> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <!-- If you have classpath issue like NoDefClassError,... --> <!-- useManifestOnlyJar>false</useManifestOnlyJar --> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> <!-- 自定義打zip包 --> </plugins> </build> </project>
package hourdis

import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import scala.collection.mutable

/**
 * Daily active-user time statistics over Yixin GPS logs.
 * Created by xuejiao on 2018/1/27.
 */
object HourAnys {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  // Spark is initialised eagerly when the object is first referenced.
  val conf = new SparkConf()
  conf.setMaster("local[*]")
  // Fix: the original called setAppName("OperaUser") and then immediately
  // overwrote it with the class-name variant; only the effective call is kept.
  conf.setAppName(s"${this.getClass.getSimpleName}")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Local Windows paths for input globs and output directories.
  val input = "E:\\yixin\\*"
  val output = "E:\\guidtime\\6"
  val input1 = "D:\\caIJIBAO\\20180129data\\data20180129\\yixin\\*"
  val output1 = "E:\\yixin\\"

  def main(args: Array[String]): Unit = {
    //analyfile()
    houranalysis
  }

  /** Epoch-millisecond string -> "yyyyMMdd HH" in the default time zone. */
  def timeFormat(time: String): String = {
    val sdf = new SimpleDateFormat("yyyyMMdd HH")
    sdf.format(new Date(time.toLong))
  }

  /** Epoch-millisecond string -> "yyyyMMdd:HH:mm:ss" in the default time zone. */
  def timeFormat1(time: String): String = {
    val sdf = new SimpleDateFormat("yyyyMMdd:HH:mm:ss")
    sdf.format(new Date(time.toLong))
  }

  /** Absolute value helper used by the (currently dead) duration code. */
  def abs(x: Double) = if (x < 0) -x else x

  /**
   * Reads the space-separated records written by analyfile(), keeps lines for
   * 20180129, groups by (guid, time), keeps the first "lon,lat,timedate"
   * payload per key, sorts by key and writes a single output file.
   */
  def houranalysis: Unit = {
    val file = sc.textFile(input)
    file.filter(y => y.contains("20180129"))
      .map(line => {
        // Record layout (space-separated):
        // guid day time stamp lonDigits latDigits timedate
        val split = line.split(" ")
        val guid = split(0)
        val day = split(1)
        val time = split(2)
        val stamp = split(3)
        // Re-insert the decimal point stripped by the upstream writer:
        // lon keeps 3 integer digits, lat keeps 2 — assumes east-China
        // coordinate ranges; TODO confirm for other regions.
        val lon = split(4).substring(0, 3) + "." + split(4).substring(3, split(4).length)
        val lat = split(5).substring(0, 2) + "." + split(5).substring(2, split(5).length)
        val timedate = split(6)
        (guid + "," + time, lon + "," + lat + "," + timedate + "\n")
      })
      .reduceByKey(_ + _)
      .map(pair => {
        // After concatenation, keep only the first payload for each key.
        val payloads = pair._2.split("\n")
        pair._1 + "," + payloads(0)
      })
      .sortBy(line => line.split(",")(0))
      .repartition(1).saveAsTextFile(output)
  }

  /**
   * Parses raw Yixin JSON logs (filtered to one appKey), walks each point's
   * "position" array and writes one line per position sample:
   * "guid gpsHour gpsAccum lonSum latSum gpsTimeFull".
   *
   * NOTE(review): timeutc/timegps and lonSum/latSum are running SUMS of
   * timestamps/coordinates across samples, and the accumulated sums are then
   * formatted as dates — this looks like a bug in the original, but the
   * behavior is preserved here; confirm intent before changing.
   */
  def analyfile(): Unit = {
    val file = sc.textFile(input1)
    print(file)
    file
      .filter(y => y.contains("8cce364eaf4c3e26bedc9b0e23a41859"))
      .map(line => {
        try {
          var li = ""
          val json = JSON.parseObject(line)
          val header = json.getString("header")
          val head = JSON.parseObject(header)
          val appKey = head.getString("appKey")
          val guid = head.getString("guid")
          print(guid)
          val path: String = json.getString("path")
          val position = JSON.parseArray(path)
          val point = position.getString(0)
          val parsepoint = JSON.parseObject(point)
          val parseArray = JSON.parseArray(parsepoint.getString("position"))
          val strall = new mutable.StringBuilder()
          var lonSum = 0L
          var latSum = 0L
          var timeutc = 0L
          var timegps = 0L
          for (i <- 0 until parseArray.size()) {
            val datas = parseArray.getString(i)
            val data = JSON.parseObject(datas)
            val timestampUTC_ms = data.getString("timestampUTC_ms")
            val timestampGPS_ms = data.getString("timestampGPS_ms")
            val longtimgps = timestampGPS_ms.toLong
            val longtime = timestampUTC_ms.toLong
            val longitude_deg = data.getString("longitude_deg")
            val latitude_deg = data.getString("latitude_deg")
            val lon = longitude_deg.toLong
            val lat = latitude_deg.toLong
            timeutc += longtime
            timegps += longtimgps
            val timestampGPS = timeFormat(timegps.toString)
            val timestampGPSF: String = timeFormat1(timegps.toString)
            val timestampUTC = timeFormat(timeutc.toString)
            lonSum += lon
            latSum += lat
            // NOTE(review): "+ +latSum" is a unary plus — this prints the
            // numeric sum lonSum + latSum, not a concatenation; confirm intent.
            println(lonSum + +latSum)
            // Bounding-box filter kept from the original as a reference:
            // if ((lat >= 399826400 && lon >= 823498900) && (lat <= 455500200 && lon <= 905825900)) { ... }
            val all = strall.append(guid + " " + timestampGPS + " " + timegps + " " + lonSum + " " + latSum + " " + timestampGPSF + "\n")
            li = all + ""
          }
          print(li)
          li
        } catch {
          // Best-effort per line: malformed records collapse to "" and are
          // dropped by the length filter below.
          case e: Exception => ""
        }
      })
      .filter(y => y.trim.length > 20)
      .repartition(1).saveAsTextFile(output1)
  }
}