1. 程式人生 > >資料基礎---spark中的資料型別

資料基礎---spark中的資料型別

mllib中的資料型別

本文是對官方文件的翻譯整理

1、資料型別

Local vector(本地向量)
Labeled point(帶標籤資料點)
Local matrix(本地矩陣)
Distrubuted matrix(分散式矩陣):RowMatrix、IndexedRowMatrix、CoordinateMatrix、BlockMatrix
MLlib支援儲存在單個機器上的本地的向量和矩陣,以及一個或多個RDD組成的分散式矩陣。本地向量和本地矩陣是用作公共介面的簡單資料模型,底層的線性代數運算由Breeze提供。在監督學習中的樣本被稱為帶標籤資料點。

1.1、Local vector

本地向量有兩種型別,密集型和稀疏型。密集型向量由一系列的雙精度數構成,而稀疏型向量由索引和值構成,其中索引是從0開始的整型數(不同語言中向量或陣列的索引起始值並不一樣,比如在R語言中索引是從1開始)。比如向量(1.0,0.0,3.0),它的密集型表示方法為[1.0,0.0,3.0],而稀疏型表示方法為(3,[0,2],[1.0,3.0]),其中3表示向量的維度是3維,0和2代表非0維度的索引值(稀疏本來的意思是有大量0元素,因而只要知道總共有幾個元素,哪些是非0的就可以方便的進行後續的計算,降低儲存空間和cpu的消耗),1.0和3.0是對應維度上的幅度值。
MLlib中本地向量的來源:
密集型向量:
Numpy.array():


python’s list:比如[1,2,3]
在Spark中自動將Numpy.array()和python裡的list()視為密集型向量
稀疏型向量:
MLlib’s 稀疏向量:在pyspark.mllib.linalg下的Vectors包內
scipy.sparse.csc_matrix():
在Spark中自動將以上兩種資料型別視為稀疏型向量
在Spark中np.array()比list()產生的密集型向量效率要高,而mllib自帶的Vectors也要比scipy產生的稀疏向量的計算效率要高。
下面是一個例子:

import numpy as np
import scipy.sparse as
sps from pyspark.mllib.linalg import Vectors dv1=np.array([1.0,0.0,3.0])#密集型 dv2=[1.0,0.0,3.0]#密集型 sv1=Vectors.sparse(3,[0,2],[1.0,3.0]) sv2=sps.csc_matrix((np.array([1.0,3.0]),np.array([0,2]),shape=(3,1))

1.2、Labeled point

帶標籤資料點帶有類別標籤,是本地向量資料型別,即可以是密集型也可以是稀疏型。帶標籤資料點在Mllib中是用雙精度的,即可以用來分類也可以用來回歸建模,是有監督學習方法中的資料來源。在二分類模型中,標籤是0,1;在多類別資料中,標籤從0開始遞增,如0,1,2……

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
#建立密度型標籤資料點
pos=LabeledPoint(1.0,[1.0,0.0,3.0])
neg=LabeledPoint(0.0,[3,[0,2],[1.0,3.0]])


或者參考官網
由LabeledPoint作為元素構成的rdd如果要檢視,不能用top()這類函式,但collect()可以,當然collect()要注意資料量的問題。
稀疏資料
在平常的應用中稀疏資料很常用,mllib支援讀取一種稱為LIBSVM的資料格式,它是LIBSVMLIBLINEAR的預設格式,它是儲存在本地txt文字中的資料格式,標準結構如下:

label index1:value1 index2:value2 ……

排在最前面的是標籤資料,後面是具體資料點,每個資料點由索引和值構成,索引是從1開始升序排列的,在mllib中將LIBSVM資料讀入之後,索引會自動調整為從0開始。由於有索引,我們只需要儲存非0資料(還沒實踐驗證),這大概就是這類資料被稱為稀疏資料的原因。可以在網上下到這樣的檔案,sample_libsvm_data.txt,我擷取開頭的4行資料如下:

0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:252 189:57 190:6 208:10 209:60 210:224 211:252 212:253 213:252 214:202 215:84 216:252 217:253 218:122 236:163 237:252 238:252 239:252 240:253 241:252 242:252 243:96 244:189 245:253 246:167 263:51 264:238 265:253 266:253 267:190 268:114 269:253 270:228 271:47 272:79 273:255 274:168 290:48 291:238 292:252 293:252 294:179 295:12 296:75 297:121 298:21 301:253 302:243 303:50 317:38 318:165 319:253 320:233 321:208 322:84 329:253 330:252 331:165 344:7 345:178 346:252 347:240 348:71 349:19 350:28 357:253 358:252 359:195 372:57 373:252 374:252 375:63 385:253 386:252 387:195 400:198 401:253 402:190 413:255 414:253 415:196 427:76 428:246 429:252 430:112 441:253 442:252 443:148 455:85 456:252 457:230 458:25 467:7 468:135 469:253 470:186 471:12 483:85 484:252 485:223 494:7 495:131 496:252 497:225 498:71 511:85 512:252 513:145 521:48 522:165 523:252 524:173 539:86 540:253 541:225 548:114 549:238 550:253 551:162 567:85 568:252 569:249 570:146 571:48 572:29 573:85 574:178 575:225 576:253 577:223 578:167 579:56 595:85 596:252 597:252 598:252 599:229 600:215 601:252 602:252 603:252 604:196 605:130 623:28 624:199 625:252 626:252 627:253 628:252 629:252 630:233 631:145 652:25 653:128 654:252 655:253 656:252 657:141 658:37
1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 189:253 190:62 214:127 215:251 216:251 217:253 218:62 241:68 242:236 243:251 244:211 245:31 246:8 268:60 269:228 270:251 271:251 272:94 296:155 297:253 298:253 299:189 323:20 324:253 325:251 326:235 327:66 350:32 351:205 352:253 353:251 354:126 378:104 379:251 380:253 381:184 382:15 405:80 406:240 407:251 408:193 409:23 432:32 433:253 434:253 435:253 436:159 460:151 461:251 462:251 463:251 464:39 487:48 488:221 489:251 490:251 491:172 515:234 516:251 517:251 518:196 519:12 543:253 544:251 545:251 546:89 570:159 571:255 572:253 573:253 574:31 597:48 598:228 599:253 600:247 601:140 602:8 625:64 626:251 627:253 628:220 653:64 654:251 655:253 656:220 681:24 682:193 683:253 684:220
1 125:145 126:255 127:211 128:31 152:32 153:237 154:253 155:252 156:71 180:11 181:175 182:253 183:252 184:71 209:144 210:253 211:252 212:71 236:16 237:191 238:253 239:252 240:71 264:26 265:221 266:253 267:252 268:124 269:31 293:125 294:253 295:252 296:252 297:108 322:253 323:252 324:252 325:108 350:255 351:253 352:253 353:108 378:253 379:252 380:252 381:108 406:253 407:252 408:252 409:108 434:253 435:252 436:252 437:108 462:255 463:253 464:253 465:170 490:253 491:252 492:252 493:252 494:42 518:149 519:252 520:252 521:252 522:144 546:109 547:252 548:252 549:252 550:144 575:218 576:253 577:253 578:255 579:35 603:175 604:252 605:252 606:253 607:35 631:73 632:252 633:252 634:253 635:35 659:31 660:211 661:252 662:253 663:35
1 153:5 154:63 155:197 181:20 182:254 183:230 184:24 209:20 210:254 211:254 212:48 237:20 238:254 239:255 240:48 265:20 266:254 267:254 268:57 293:20 294:254 295:254 296:108 321:16 322:239 323:254 324:143 350:178 351:254 352:143 378:178 379:254 380:143 406:178 407:254 408:162 434:178 435:254 436:240 462:113 463:254 464:240 490:83 491:254 492:245 493:31 518:79 519:254 520:246 521:38 547:214 548:254 549:150 575:144 576:241 577:8 603:144 604:240 605:2 631:144 632:254 633:82 659:230 660:247 661:40 687:168 688:209 689:31

我們可以把資料載入到本地節點的記憶體中。

from pyspark.mllib.utils import MLUtils
examples=MLUtils.loadLibSVMFile(sc,data/mllib/sample_libsvm_data.txt)

1.3、Local Matrix

本地矩陣也是儲存在本地機器上的資料格式。像其他場合的矩陣一樣,都有行、列索引和對應的元素值,索引是整型、值是以雙精度型。mllib中的矩陣都是按列排列的(比如將二維的矩陣用一維的陣列來表示的時候,第一列的元素放在最前面,第二列的元素次之,而不是第一行的元素排在最前面然後第二行)。mllib中的矩陣也有密集型和稀疏型,密集型矩陣的表示由行數、列數、其對應元素值按列依次排列的一個數組組成,稀疏型矩陣採用Compressed Sparse Column(CSC)格式,關於稀疏型資料的表示方法可參考。CSC格式資料是將列索引進行壓縮的格式,表示方法:行數、列數、列索引的偏移值和非0元素個數組成的陣列、行索引組成的陣列、非0元素按列排序組成的陣列。其他的都好理解,關鍵是利用列索引的偏移進行列索引壓縮,下面先看一個例子:

from pyspark.mllib.linalg import Matrix, Matrices
#建立一個密集型矩陣[[1.0 4.0),[2.0 5.0],[3.0 6.0]],注意這裡是按傳統的以行為一組來表示的,以mllib中不同
dm2=Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
print(dm2)
#建立一個稀疏型矩陣[[9.0 0.0),[0.0 8.0],[0.0 6.0]]
sm2=Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [9, 8, 6])
print(sm2)

我們可以分析一下上例中稀疏型矩陣的列索引偏移值和非0元素個數[0,1,3],0代表第1列裡第1個非0元素在總的非0元素中的相對索引值,1代表第二列元素第1個非0元素在總的非0元素裡的相對索引值,1-0=1說明第1列的元素個數為1,這樣我們結合行索引就可以把第1列的全部元素定位出來;同樣的方法可以確定其他列的元素位置;3表示所有非0元素的個數,減掉倒數第二個偏移量這裡是1,恰恰就是最後1列的元素個數,同樣結合對應的行索引就可以定位最後一列元素的位置。我們也可以虛擬出一列,把3當成第4列第1個元素的相對索引值,這樣就可以把各列元素個數的計算統一起來。總結起來就是,通過各列第1個非0元素在總非0元素中的相對索引值可以確定各列的元素個數,結合行索引就可完全確定所有元素的位置。從這裡也可以看到,CSC是一種無失真壓縮的格式。

1.4 分散式矩陣

分散式矩陣由長整型的行列索引和雙精度的值構成,分散式矩陣儲存在1個或多個RDD中。到目前為止,在mllib中總共有4種分散式矩陣。需要注意的是,從一種分散式矩陣變換成另一種分散式矩陣可能需要資料的全域性移動(在不同節點的機器間移動),因而可能需要極大的記憶體、cpu和網路通訊開銷。
分佈矩陣所依賴的RDD必須是確定的,因為我們需要快取矩陣的大小(皮之不存,毛將焉覆)。
4種分散式矩陣分別是RowMatrix、IndexRowMatrix、coordinateMatrix、BlockMatrix。RowMatrix是最基本的分散式矩陣,它是基於行的,就是一行是完整的,完整的行儲存在本地節點上,不同節點儲存不同的行。而操作這種資料矩陣通常都是對每行都進行同樣的操作,也就是說它是矩陣的哪一行並不重要,因而這種資料矩陣的一個特點就是行索引無意義。特徵向量的集合就是這樣的一類資料矩陣。一般情況下,RowMatrix的列數都不會太大,也就是單獨的一行不會太長,這樣便於在單節點儲存和操作,也便於將某一行的資料傳輸到驅動器中去。IndexRowMatrix是在RowMatrix的基礎上增加了行索引,這樣可以定位到矩陣的某一行,否則就無法進行RDD的join操作。CoordinateMatrix是用COO格式來儲存的,關於COO參考。BlockMatrix具有元組的結構(Int,Int,Matrix)。
RowMatrix:
RowMatrix可以通過以向量形式儲存的RDD來建立。

from pyspark.mllib.linalg.distributed import RowMatrix
#首先建立一個以向量形式儲存的RDD.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
#在以向量形式儲存的RDD基礎上建立RowMatrix
mat = RowMatrix(rows)
#檢視矩陣的行數和列數
m = mat.numRows()  # 4
n = mat.numCols()  # 3
#以另一種方式來檢視矩陣的行數
rowsRDD = mat.rows
print(m)
print(n)
print(rowsRDD.collect())

輸出結果如下:

4
3
[DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0]), DenseVector([7.0, 8.0, 9.0]), DenseVector([10.0, 11.0, 12.0])]

IndexedRowMatrix
IndexedRowMatrix與RowMatrix很像,只不過有行索引。每一行都由長整型的索引和一個本地向量構成。
IndexedRowMatrix可以由IndexedRows構成的RDD轉換成來,IndexedRow由(long,vector)形式的元組組成。IndexRowMatrix轉化為RowMatrix.

from pyspark import SparkContext
from pyspark.sql import SparkSession
#import json
sc=SparkContext()
SparkSession(sc)
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
#先建立一個由IndexedRow構成的RDD.
#可以通過IndexedRow這個類來建立
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),IndexedRow(1, [4, 5, 6]),IndexedRow(2, [7, 8, 9]),IndexedRow(3, [10, 11, 12])])
#也可以直接用元組來建立
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),(2, [7, 8, 9]), (3, [10, 11, 12])])
#由IndexedRows構成的RDD來建立IndexedRowMatrix
mat = IndexedRowMatrix(indexedRows)
#測試一下該矩陣的行、列數
m=mat.numRows()
n=mat.numCols()
rowsRDD=mat.rows
print(m)
print(n)
print(rowsRDD.collect())
#將IndexedRowMatrix型別資料轉換成RowMatrix
rowMat=mat.toRowMatrix()

執行結果如下:

4
3
[IndexedRow(0, [1.0,2.0,3.0]), IndexedRow(1, [4.0,5.0,6.0]), IndexedRow(2, [7.0,8.0,9.0]), IndexedRow(3, [10.0,11.0,12.0])]

CoordinateMatrix
CoordinateMatrix由行、列索引和對應值構成,可以通過MatrixEntry來建立,傳入MatrixEntry的引數形式為(long,long,float),即矩陣元素的行、列索和值。CoordinateMatrix可以通過函式toRowMatrix()轉換為RowMatrix,通過toIndexRowMatrix()轉換為IndexRowMatrix,能過toBlockMatrix()轉換為BlockMatrix.

from pyspark import SparkContext
from pyspark.sql import SparkSession
#import json
sc=SparkContext()
SparkSession(sc)
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# 通過MatrixEntry類來建立一個由MatrixEntry元素構成的RDD
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(6, 1, 3.7)])
#也可以通過形如(long, long, float)的元組來建立
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
#通過由MatrixEntries構成的RDD來建立CoordinateMatrix
mat = CoordinateMatrix(entries)
#檢視矩陣的行列數
m = mat.numRows()# 3
n = mat.numCols()# 2
print(m)
print(n)
#建立CoordinateMatrix後,後續也可以直接獲取其非0元素
entriesRDD = mat.entries
print(entriesRDD.collect())
# 轉換成RowMatrix.
rowMat = mat.toRowMatrix()
#轉換成IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
#轉換成BlockMatrix.
blockMat = mat.toBlockMatrix()

執行結果如下:

3
2
[MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)]

BlockMatrix
BlockMatrix,我們可以把這類矩陣叫做由矩陣塊組成的矩陣,可以由MatrixBlock來建立,MatrixBlock的形式為((Int,Int),Matrix),式中(Int,Int)代表子矩陣在大矩陣中的位置,而子矩陣Matrix本身本身又是rowPerBlock行,colPerBlock列的矩陣。BlockMatrix支援加法和乘法這樣的操作,還可以用validate()函式去驗證BlockMatrix的設定是否正確。

from pyspark import SparkContext
from pyspark.sql import SparkSession
#import json
sc=SparkContext()
SparkSession(sc)
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
#建立由子矩陣組的RDD
#blocks=sc.parallelize([((0,0),Matrices.dense[1.0,2.0,3.0,4.0,5.0,6.0])),((0.1),Matrices.dense[7, 8, 9, 10, 11, 12]))])
blocks=sc.parallelize([((0,0),Matrices.dense(3,2,[1.0,2.0,3.0,4.0,5.0,6.0])),((0,1),Matrices.dense(3,2,[7, 8, 9, 10, 11, 12]))])
mat=BlockMatrix(blocks,3,2)#從這裡可以看到每個子矩陣的行列數應該一樣,而且傳到BlockMatrix中的引數,也就是子矩陣的行列數
m=mat.numRows()
n=mat.numCols()
print(m)
print(n)
blocksRDD=mat.blocks
print(blocksRDD.collect())
#轉化為其他矩陣
localMat=mat.toLocalMatrix()
indexedRowMat=mat.toIndexedRowMatrix()
coordinateMat=mat.toCoordinateMatrix()

結果如下:

3
4
[((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0)), ((0, 1), DenseMatrix(3, 2, [7.0, 8.0, 9.0, 10.0, 11.0, 12.0], 0))]

2、Spark 整合演算法的資料格式及評估方法

不全不準,先放這裡,以後再慢慢加

類別 演算法名稱 資料格式 評估方法
聚類 Kmeans RDD(Local Vector)
分類 DecisionTree(決策樹) RDD(LabelPoint) ACU,F-measure,ROC
分類 RadomForest(隨機森林) RDD(Local Vector) ACU,F-measure,ROC
分類 LogisticRegression(邏輯迴歸 RDD(Local Vector) ACU,F-measure,ROC
分類 NaiveBayes(樸素貝葉斯) RDD(Local Vector) ACU,F-measure,ROC
分類 SVM(支援向量機) RDD(Local Vector) ACU,F-measure,ROC
迴歸 LinearRegression線性迴歸 RDD(Local Vector) 自定義
迴歸 Lasso RDD(Local Vector) 自定義
迴歸 RidgeRegression嶺迴歸 RDD(Local Vector) 自定義
推薦 ALS RDD[rating]

ml中的資料型別

資料型別

pyspark.ml.linalg.Vectors

向量又分為:密集型和稀疏型

>>> Vectors.dense([1, 2, 3])
DenseVector([1.0, 2.0, 3.0])
>>> Vectors.dense(1.0, 2.0)
DenseVector([1.0, 2.0])
>>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
SparseVector(4, {1: 1.0, 3: 5.5})

pyspark.ml.linalg.Matrices

也分為密集型和稀疏型
static dense(numRows, numCols, values)
static sparse(numRows, numCols, colPtrs, rowIndices, values)[

DataFrame-based API

官網:

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

建立DataFrames 的入口SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

#spark中建立dataframe的例子
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

libsvm資料格式

首先介紹一下 libSVM的資料格式,libsvm資料格式可以直接儲存成txt檔案,並利用libsvm介面從txt檔案中讀取。

Label 1:value 2:value

Label:是類別的標識,比如上節train.model中提到的1 -1,你可以自己隨意定,比如-10,0,15。當然,如果是迴歸,這是目標值,就要實事求是了。
Value:就是要訓練的資料,從分類的角度來說就是特徵值,資料之間用空格隔開
比如:

-15 1:0.708 2:1056 3:-0.3333

需要注意的是,如果特徵值為0,則這列資料可以不寫,因此特徵冒號前面的(姑且稱做序號)可以不連續。如:

-15 1:0.708 3:-0.3333

表明第2個特徵值為0,從程式設計的角度來說,這樣做可以減少記憶體的使用,並提高做矩陣內積時的運算速度。我們平時在matlab中產生的資料都是沒有序號的常規矩陣,所以為了方便最好編一個程式進行轉化。

spark提供了方便的工具類來載入這些資料

libsvm格式中,索引是從1開始的,但spark load svm的時候會將它改為從0開始,即將檔案中所有的索引ID-1.
更詳細的資料可參考:
LIBSVM
sample_linear_regression_data.txt