Spark-數據源及機器學習算法部署
阿新 • • 發佈:2019-05-07
gist user roc 加載 機器學習 mongo country 第三方 string
1、數據源讀取
使用的時候,需要加載驅動 --jars 或者添加到classpath中 或scaddjar
Spark對Oracle數據庫讀取,代碼如下:
conf = SparkConf().setAppName(string_test)
sc = SparkContext(conf=conf)
ctx = SQLContext(sc)
sqltext = "(select dbms_lob.substr(title,500) as title,id,content,country,languages,time as publishDate,source,subject,source_url from news t where id <= 24) news"news =ctx.read \
.format("jdbc") \
.option("url", "jdbc:oracle:thin:username/[email protected]//ip:port/sid") \
.option("dbtable", sql) \
.option("user", "user") \
.option("password", "password") \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.load()
news.registerTempTable("news")
Spark 對Mongo讀數據
ctx = SQLContext(sc)
mongourl = "mongodb://username:[email protected]:port"
mongoDB = "dbname"
mongoCollection = "collectionName"
mongoRows = ctx.read.format("com.mongodb.spark.sql").options(uri=mongourl,database=mongoDB, collection=mongoCollection).load()
mongoResultRdd = mongoRows.rdd
2、機器學習算法轉換
機器學習算法有兩類不能直接添加到spark中:
1) 包中含有復雜依賴關系的,如scipy、numpy等,scipy.special.beta函數在spark中不可以使用的。
2) 包不是.py結尾的,而是有第三方編譯包的,不可以添加到spark中
解決辦法:
在spark改寫的代碼中使用到上述相關的程序,闊以用subprocess調用python程序,以進行數據處理,然後得到程序返回結果。如下:
test= subprocess.getoutput("python /home/pytest.py \""+content.replace("\‘","’")+"\"")
re= test[test.index("::")+2:len(test)].replace(" ","")
Spark-數據源及機器學習算法部署