1. 程式人生 > >利用mapWithState實現按照首字母統計的有狀態的wordCount

利用mapWithState實現按照首字母統計的有狀態的wordCount

最近在做sparkstreaming整合kafka的時候遇到了一個問題:

可以抽象成這樣一個問題:有狀態的wordCount,且按照word的第一個字母為key,但是要求輸出的格式為(word,1)這樣的形式

舉例來說:

例如第一批資料為: hello how when hello

則要求輸出為:(hello,1) (how,2) (when,1) (hello,3)

第二批資料為: hello how when what hi

則要求輸出為: (hello,4) (how,5) (when,2) (what,3) (hi,6)

首先了解一下mapWithState的常規用法:

ref: https://www.jianshu.com/p/a54b142067e5

http://sharkdtu.com/posts/spark-streaming-state.html

稍微總結一下mapWithState的幾個tips:

  1. mapWithState是1.6版本之後推出的
  2. 必須設定checkpoint來儲存歷史資料
  3. mapWithState和updateStateByKey的區別 : 他們類似,都是有狀態DStream操作, 區別在於,updateStateByKey是輸出增量資料,隨著時間的增加, 輸出的資料越來越多,這樣會影響計算的效率, 對CPU和記憶體壓力較大.而mapWithState則輸出本批次資料,但是也含有狀態更新.
  4. checkpoint的資料會分散儲存在不同的分割槽中, 在進行狀態更新時, 首先會對當前 key 做 hash , 再到對應的分割槽中去更新狀態 , 這種方式大大提高了效率.

解決問題的思路:

State中儲存狀態為(String,Int) 元組型別, 其中String為word的全量, 而Int為word的計數.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.MapWithStateDStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("MapWithStateApp")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("C:\\Users\\hylz\\Desktop\\checkpoint")
    val lines = ssc.socketTextStream("192.168.100.11",8888)
    val words = lines.flatMap(_.split(" "))

    def mappingFunc(key: String, value: Option[(String, Int)], state: State[(String, Int)]): (String, Int) = {
      val cnt: Int = value.getOrElse((null, 0))._2 + state.getOption.getOrElse((null, 0))._2
      val allField: String = value.getOrElse((null, 0))._1
      state.update((allField, cnt))
      (allField, cnt)
    }

    val cnt: MapWithStateDStream[String, (String, Int), (String, Int), (String, Int)] = words.map(x => (x.substring(0, 1), (x, 1))).mapWithState(StateSpec.function(mappingFunc _))

    cnt.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

測試結果如下

input: hello how when hello

input: hello how when what hi

相關推薦

利用mapWithState實現按照字母統計狀態wordCount

最近在做sparkstreaming整合kafka的時候遇到了一個問題: 可以抽象成這樣一個問題:有狀態的wordCount,且按照word的第一個字母為key,但是要求輸出的格式為(word,1)這樣的形式 舉例來說: 例如第一批資料為: hello how when hello 則要求輸出為:(hello

iOS實現聯絡人按照字母進行排序

聯絡人功能的需求一般都會有按照首字母排序,並且會要求同一個姓的就要連續起來中間不能穿插別的姓,百度了一下看到UILocalizedIndexedCollation給我們提供了很方便的排序方法,它不需要將中文轉為拼音,但是有一個缺點就是如果姓氏存在多音字就無法區分(例如:姓增,它會被分配到

mysql 按照漢字的拼音排序、按照字母分類

專案中有時候需要按照漢字的拼音排序,比如聯絡人列表、礦物分類等,有的還需要按拼音字母從A到Z分類顯示。   如果儲存漢字的欄位編碼使用的是GBK字符集,因為GBK內碼編碼時本身就採用了拼音排序的方法(常用一級漢字3755個採用拼音排序,二級漢字就不是了,但考慮到人名等都是常

TreeMap Comparator按照字母排序

/** * @Modified by FANGYUKANG * @Description 查詢配件型別列表 * @return {@link JsonView} * * URL local * http://127.0.0.1:

JavaScript實現字串字母大寫

/* * 思路: * 1.將字串分割存入陣列 * 2.取出陣列中每個元素的第一個字母並大寫 * 3.取出每個元素第一個字母后的所有字母 * 4.使用‘+’連線到一起 * 5.新增到新的陣列

Mysql-按照字母查詢

開發十年,就只剩下這套架構體系了! >>>   

MySQL數據庫中實現對中文字段按照字母排序

gb2 size 查詢語句 處理 bsp 源碼編譯 情況 ets latin1 1. 在MySQL中,我們經常會對一個字段進行排序查詢,但進行中文排序和查找的時候,對漢字的排序和查找結果往往都是錯誤的。 這種情況在MySQL的很多版本中都存在。 如果這個問題不解決,那麽M

javascript 實現中文按照拼音字母排序

js提供了sort()方法來對陣列內的資料進行排序,但是隻是對英文有作用,這個時候需要自定義排序的規則 ['張三','李四','王五'].sort((a, b) => a.localeCompare(b, 'zh-Hans-CN', {sensitivity: 'accent'})) 輸出 ['李

android 實現按照城市字母(拼音)分類的應用

最近按照公司需要,寫了一個按照城市首字母排序的demo,原理就是獲取城市名稱,然後將城市名稱轉換為相應的拼音,通過對拼音的排序進而得到一個序列,實現了按照首字母分類的功能。上程式碼:獲得城市資訊,此處為假資料,大家可以自行新增自己的伺服器端資料:/* * 繫結城市資訊,此處

利用tableView分組實現省份各個城市的分組,利用字母作為索引

- (void)viewDidLoad {     [superviewDidLoad];     [self_loadData];     [self_initSubView]; } #pragma mark - 載入資料 - (void)_loadData{

關於java中實現在oracle數據庫中實現對中文字母進行排序的解決方案

capital obj create team capi substr order ring 一個 首先介紹Oracle 9i新增加的一個系統自帶的排序函數 1、按首字母排序 在oracle9i中新增了按照拼音、部首、筆畫排序功能。設置NLS_SORT值 SCHIN

簡單測試--C#實現中文漢字轉拼音字母

esp chart htm foreach ext ads linq 類庫 play 第一種: 這個是自己寫的比較簡單的實現方法,要做漢字轉拼音首字母,首先應該有一個存儲首字母的數組,然後將要轉拼音碼的漢字與每個首字母開頭的第一個漢字即“最小”的漢字作比較,這裏的最小指的是

利用shell實現判斷局域網內在線用戶那些

利用shell實現判斷局域網內在線用戶有那些#!/bin/bash while true; do for I in {100..120};do ping -c 2 -w 2 192.168.0.$I &>/dev/null if [ $? -eq 0 ];then

php按照中文字母排序

substr 工具類 rar echo from 獲取 utf turn strong 1> 網絡上很多php的工具類可以將漢字轉為拼音; 2> 將拼音進行排序即可 另一種則是類似mysql轉碼方式: 1 foreach ($array a

統計名字列表中,各名字的字母在名字列表中出現的次數

統計名字列表中各名字的首字母在名字列表name_list=[‘foster‘,"janet",‘jessus‘,‘david‘] count_dict={} for i in name_list: count_dict[i]="".join(name_list).count(i[0]) print

mysql實現字母從A-Z排序

b2c cde var urn cti 字母 mys hex desc 1.常規排序ASC DESC ASC 正序 DESC倒敘 -- 此處不用多講 2.自定義排序 自定義排序是根據自己想要的特定字符串(數字)順序進行排序。主要是使用函數 FIELD(str,st

vue2.0中實現字母大寫的過濾器

過濾器1:實現一個首字母大寫的過濾器(vue2.0中已經去除了內置的過濾器)過濾器本身就是一個函數vue2.0中實現首字母大寫的過濾器

JS實現獲取漢字字母拼音、全拼音及混拼音的方法

pla 輸入 files sta add 參考 x11 lba odi 本文實例講述了JS實現獲取漢字首字母拼音、全拼音及混拼音的方法。分享給大家供大家參考,具體如下: 這裏需要用到一個js獲取漢字拼音的插件,可點擊此處本站下載。 運行效果如下: 完整示例代碼: ?

【資訊視覺化】使用D3實現的中科院院士姓氏字母分佈視覺化

   Figure 1: The overview of the visualization. Figure 2: the distribution of the first letter in one aca

list之按照中文拼音字母排序

原文地址:https://blog.csdn.net/weixin_41751625/article/details/79735271     在實際生活中,我們經常會用到根據中文拼音的首字母進行排序。從而方便進行檢索漢字,例如一個公司的人員,按照姓名的拼音的首字母