Spark Streaming Source Code Walkthrough: State Management with updateStateByKey and mapWithState
Source: http://blog.csdn.net/snail_gesture/article/details/5151058
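Background:
Spark Streaming divides its work into Jobs by Batch Duration. Often, though, we need to compute over the past day or even the past week of data, and that makes state management unavoidable. Spark Streaming produces one Job per Batch Duration, and each Job is made up of RDDs, so the question is how to maintain state across batches. This is where the updateStateByKey and mapWithState methods do the core work.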
Source code analysis:
1. Neither updateStateByKey nor mapWithState is defined on DStream itself; both are made available through an implicit conversion function.
<code class="hljs markdown has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">
object DStream {

  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
  // it automatically. However, we still keep the old function in StreamingContext for backward
  // compatibility and forward to the following function directly.

  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }
</code>
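The mechanism can be tried out without Spark. The following is a minimal sketch in plain Scala, not Spark code: `MiniStream`, `PairMiniStreamFunctions`, `toPairFunctions` and `sumByKey` are hypothetical names invented for illustration. It shows how an implicit conversion placed in a companion object makes key-based methods appear only on streams whose elements are pairs.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for DStream: it only wraps a sequence of elements.
class MiniStream[T](val elems: Seq[T])

// Hypothetical stand-in for PairDStreamFunctions: key-based operations only.
class PairMiniStreamFunctions[K, V](self: MiniStream[(K, V)]) {
  def sumByKey(implicit num: Numeric[V]): Map[K, V] =
    self.elems.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
}

object MiniStream {
  // Defined in the companion object, so the conversion is in implicit scope
  // and callers need no extra import.
  implicit def toPairFunctions[K, V](s: MiniStream[(K, V)]): PairMiniStreamFunctions[K, V] =
    new PairMiniStreamFunctions[K, V](s)
}

object ImplicitDemo {
  // sumByKey is not a member of MiniStream; the compiler inserts toPairFunctions.
  val counts: Map[String, Int] =
    new MiniStream(Seq("a" -> 1, "b" -> 2, "a" -> 3)).sumByKey
}
```

Calling `sumByKey` on a `MiniStream[Int]` would not compile, because no conversion applies to a non-pair element type. That appears to be the same reason the state methods are reachable only from a `DStream[(K, V)]`.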
updateStateByKey:
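1. In PairDStreamFunctions, updateStateByKey is implemented as follows:
Starting from the state accumulated so far, updateFunc updates the historical data for each key. The method returns a DStream, specifically a DStream[(K, S)].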
<code class="hljs fsharp has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  // defaultPartitioner
  updateStateByKey(updateFunc, defaultPartitioner())
}
</code>
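To make the contract of `updateFunc` concrete, here is a small plain-Scala simulation of one state update per batch. It is only a sketch of the semantics described in the Scaladoc above, not Spark's implementation; `step`, `runningCount` and the sample batches are hypothetical names and data.

```scala
object UpdateStateSketch {
  // Same shape as the updateFunc parameter: the new values for a key and its
  // previous state go in, the new state comes out (None would drop the key).
  val runningCount: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(state.getOrElse(0) + values.sum)

  // Hypothetical helper: applies updateFunc once per key for a single batch.
  def step[K, V, S](
      prev: Map[K, S],
      batch: Seq[(K, V)],
      updateFunc: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Every key that has old state or new values is visited, so keys that are
    // absent from this batch still pass through updateFunc with an empty Seq.
    (prev.keySet ++ grouped.keySet).flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), prev.get(k)).map(s => k -> s)
    }.toMap
  }

  val batch1 = Seq("a" -> 1, "b" -> 1, "a" -> 1)
  val batch2 = Seq("b" -> 1, "c" -> 1)
  val after1 = step(Map.empty[String, Int], batch1, runningCount)
  val after2 = step(after1, batch2, runningCount)
}
```

After the second batch, key `a` keeps its count even though it received no new values, which is the behaviour one would expect from a function that is applied to every known key in every batch.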
2. defaultPartitioner:
<code class="hljs cs has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">
private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
  new HashPartitioner(numPartitions)
}
</code>
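For readers unfamiliar with hash partitioning, the idea can be sketched in a few lines of plain Scala: a key is assigned to the partition given by its hash code modulo the number of partitions, adjusted so the result is never negative. The object and method names below are hypothetical; this is an illustration of the technique rather than a copy of Spark's HashPartitioner.

```scala
object HashPartitionSketch {
  // Keeps the result in [0, mod) even when the hash code is negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // A null key goes to partition 0; any other key is placed by its hash code.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

Because the partition depends only on the key and the partition count, the same key lands in the same partition in every batch, which is what lets the state for a key be combined with that key's new values.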
3. The partitioner controls how each RDD is partitioned.
<code class="hljs coffeescript has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of the key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)] = ssc.withScope {
  val cleanedUpdateF = sparkContext.clean(updateFunc)
  val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
    iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
  }
  updateStateByKey(newUpdateFunc, partitioner, true)
}
</code>
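The key step in this overload is the wrapper that turns a per-key function into a function over an iterator of (key, new values, old state) triples. A standalone sketch of that adaptation, in plain Scala with hypothetical names (`lift`, `sumOrDrop`, `input`), might look like this:

```scala
object LiftUpdateFunc {
  // Adapts a per-key update function to the iterator-based shape.
  // A key whose update returns None produces no output element at all.
  def lift[K, V, S](
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] =
    iterator => iterator.flatMap { case (k, values, state) =>
      updateFunc(values, state).map(s => (k, s))
    }

  // Illustrative update function: keep a running sum, and drop a key
  // that received no new values in this batch.
  val sumOrDrop: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => if (values.isEmpty) None else Some(state.getOrElse(0) + values.sum)

  val input: List[(String, Seq[Int], Option[Int])] = List(
    ("a", Seq(1, 2), Some(10)),
    ("b", Seq.empty[Int], Some(5)),
    ("c", Seq(4), None)
  )

  val out: List[(String, Int)] = lift[String, Int, Int](sumOrDrop)(input.iterator).toList
}
```

Here `b` disappears from the output because its update returned None, which matches the Scaladoc's statement that the corresponding key-value pair is eliminated.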
4. rememberPartitioner is true by default: the overload in step 3 passes true when it delegates to the following method.
<code class="hljs applescript has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. Note, that this function may generate a different
 *                   tuple with a different key than the input key. Therefore keys may be removed
 *                   or added in this way. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
  ): DStream[(K, S)] = ssc.withScope {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}
</code>
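The Scaladoc's warning about changed keys can be illustrated without Spark. The sketch below is an assumption-laden toy, with hypothetical names throughout: records are laid out by hashing their keys, then an update step rewrites the keys in place. Afterwards the layout no longer agrees with what the partitioner would compute, which is one way to understand why remembering the partitioner is left to the developer.

```scala
object RememberPartitionerSketch {
  val numPartitions = 4

  // Hash placement with a non-negative result, as a hash partitioner would do.
  def partitionFor(key: Any, n: Int): Int = {
    val raw = key.hashCode % n
    if (raw < 0) raw + n else raw
  }

  // Records laid out by hashing their key.
  val placed: Map[Int, Seq[(Int, String)]] =
    Seq(1 -> "x", 2 -> "y", 5 -> "z").groupBy { case (k, _) => partitionFor(k, numPartitions) }

  // An update step that rewrites keys in place, without moving any record.
  val rewritten: Map[Int, Seq[(Int, String)]] =
    placed.map { case (p, recs) => p -> recs.map { case (k, v) => (k + 1, v) } }

  // True only if every record still sits where the partitioner would put it.
  def consistent(layout: Map[Int, Seq[(Int, String)]]): Boolean =
    layout.forall { case (p, recs) =>
      recs.forall { case (k, _) => partitionFor(k, numPartitions) == p }
    }
}
```

If the update function never changes keys, as with the per-key overload that wraps `(Seq[V], Option[S]) => Option[S]`, the layout stays consistent, so passing true there is a safe choice.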
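5. In StateDStream, the StorageLevel is MEMORY_ONLY_SER: the state is persisted in memory in serialized form, not on disk. Serialized storage is more compact, which matters because the accumulated state can be very large.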
<code class="hljs haskell has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean,
    initialRDD : Option[RDD[(K, S)]]
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER)
</code>
- The source of computeUsingPreviousRDD is as follows:
<code class="hljs scala has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">private [this] def computeUsingPreviousRDD (
    parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
  // Define the function for the mapPartition operation on cogrouped RDD;
  // first map the cogrouped tuple to tuples of required type,
  // and then apply the update function
  val updateFuncLocal = updateFunc
  val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
    val i = iterator.map(t => {
      val itr = t._2._2.iterator
      val headOption = if (itr.hasNext) Some(itr.next()) else None
      (t._1, t._2._1.toSeq, headOption)
    })
    updateFuncLocal(i)
  }
  // On every computation, cogroup traverses all partitions of prevStateRDD.
  val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
  val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
  Some(stateRDD)
}
</code>
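Because cogroup scans all of the previous state on every batch, the cost grows with the total number of keys held in state rather than with the size of the batch. updateStateByKey is therefore not recommended when the state is large.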
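To make the cost argument concrete, here is a minimal sketch in plain Scala collections, with no Spark dependency. It only models the cogroup-then-update pattern of computeUsingPreviousRDD; the names `UpdateStateByKeyModel`, `updateFunc` (a running sum) and `step` are assumptions made for illustration, not Spark APIs.

```scala
object UpdateStateByKeyModel {
  // Hypothetical user update function: running sum per key.
  def updateFunc(values: Seq[Int], prev: Option[Int]): Option[Int] =
    Some(prev.getOrElse(0) + values.sum)

  // Models one batch: "cogroup" the batch with the WHOLE previous state,
  // then call the update function for every key on either side.
  // Returns the new state and the number of keys that were visited.
  def step(batch: Seq[(String, Int)],
           prevState: Map[String, Int]): (Map[String, Int], Int) = {
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val allKeys = (grouped.keySet ++ prevState.keySet).toSeq
    val newState = allKeys.flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), prevState.get(k)).map(v => k -> v)
    }.toMap
    (newState, allKeys.size)
  }
}
```

In this model a batch that touches a single key still visits every key already in the state, which is the behaviour the cogroup comment in the source describes.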
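The updateStateByKey function is implemented as follows:
mapWithState:
1. mapWithState returns a MapWithStateDStream. Historical state is maintained and updated per key: a single function is applied to the key-value data to maintain its state.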
<code class="hljs lua has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">/**
 * :: Experimental ::
 * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
 * `this` stream, while maintaining some state data for each unique key. The mapping function
 * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
 * transformation can be specified using [[StateSpec]] class. The state data is accessible in
 * as a parameter of type [[State]] in the mapping function.
 *
 * Example of using `mapWithState`:
 * {{{
 *    // A mapping function that maintains an integer state and return a String
 *    // (Author's note: the state can be viewed as a table that records all of the
 *    // historical state being maintained.)
 *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
 *      // Use state.exists(), state.get(), state.update() and state.remove()
 *      // to manage state, and return the necessary string
 *    }
 *
 *    val spec = StateSpec.function(mappingFunction).numPartitions(10)
 *
 *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
 * }}}
 *
 * @param spec          Specification of this transformation
 * @tparam StateType    Class type of the state data
 * @tparam MappedType   Class type of the mapped data
 */
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    // The StateSpecImpl class encapsulates the StateSpec operations.
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}
</code>
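The "state as a table" view in the Scaladoc example can be sketched in plain Scala, again with no Spark dependency. `SimpleState` is a hypothetical, simplified stand-in for Spark's `State[S]` that offers only the four methods named in the comment (`exists`, `get`, `update`, `remove`); `MapWithStateModel` and `step` are likewise assumptions for illustration, and the mapping function is assumed to keep a running count per key.

```scala
import scala.collection.mutable

object MapWithStateModel {
  // Hypothetical stand-in for Spark's State[S]: a view onto one key of a mutable table.
  final class SimpleState[S](key: String, table: mutable.Map[String, S]) {
    def exists(): Boolean = table.contains(key)
    def get(): S = table(key)
    def update(s: S): Unit = table.update(key, s)
    def remove(): Unit = { table.remove(key); () }
  }

  // Mapping function in the shape shown in the Scaladoc: keeps a running total per key
  // and returns "key=total" for the record just processed.
  def mappingFunction(key: String, value: Option[Int],
                      state: SimpleState[Int]): Option[String] = {
    val total = value.getOrElse(0) + (if (state.exists()) state.get() else 0)
    state.update(total)
    Some(s"$key=$total")
  }

  // Models one batch: the mapping function runs once per incoming record,
  // so keys that do not appear in the batch are never visited.
  def step(batch: Seq[(String, Int)], table: mutable.Map[String, Int]): Seq[String] =
    batch.flatMap { case (k, v) => mappingFunction(k, Some(v), new SimpleState(k, table)) }
}
```

In this model, unlike the cogroup-based update, the work per batch depends on the number of incoming records; entries for keys absent from the batch are left untouched.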
2. The source of MapWithStateDStream is as follows: