1. 程式人生 > >輕量級OLAP(二):Hive + Elasticsearch

輕量級OLAP(二):Hive + Elasticsearch

1. 引言

在做OLAP資料分析時,常常會遇到過濾分析需求,比如:除去只有性別、常駐地標籤的使用者,計算廣告媒體上的覆蓋UV。OLAP解決方案Kylin不支援複雜資料型別(array、struct、map),要求資料輸入Schema必須是平鋪的,但是平鋪後丟失了使用者的聚合標籤資訊,而沒有辦法判斷某一個使用者是否只有性別、常駐地標籤。顯然,我們需要一種支援複雜資料型別的OLAP資料庫;底層為Lucene的Elasticsearch正在向OLAP融合,騰訊內部已經用基於Lucene的分析資料庫Hermes來做多維資料分析。

Elasticsearch(ES)在設計之初是用來做全文檢索的搜尋引擎,但隨著倒排索引所表現出來優秀的查詢效能,有越來越多人拿它

做分析資料庫使。可將ES視作文件型NoSQL資料庫,一般情況下將具有相同schema的文件(document)歸屬於一個type,所有的文件儲存於某一個index;ES與RDBMS的概念對比如下:

Relational DB ⇒ Databases ⇒ Tables ⇒ Rows ⇒ Columns
Elasticsearch ⇒ Indices ⇒ Types ⇒ Documents ⇒ Fields

2. 寫資料

廣告日誌與標籤資料均落在Hive表,並且ES官方提供與Hive的整合。因此,我們首選用Hive向ES寫資料。首先,採用ES做OLAP分析引擎,建立表如下:

add jar /path/elasticsearch-hadoop-2.3.1.jar;

create external table ad_tag (
  dvc string, 
  medias array < string >, 
  c1_arr array < string >, 
  week_time string
) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' tblproperties(
  'es.nodes' = '<ip1>:9200,<ip2>:9200', 
  'es.resource' = 'ad-{week_time}/tag', 
  'es.mapping.exclude' = 'week_time'
);

在設計Hive表結構時,ES的計算UV的distinct count(cardinality)存在著計算誤差;因此,我們按dvc對其他欄位做了聚合,UV的計算轉換成了ES doc命中數。其中,es.nodes表示ES的節點,只需配置一個節點即可;es.resource對應於ES的Index/Type;es.mapping.exclude在寫ES時不會被索引的欄位。因我們只有寫操作而沒有通過Hive查詢ES資料,因此並沒有設定es.query。Hive向ES寫資料如下:

set hive.map.aggr = false;

insert overwrite table ad_tag 
select 
  media, 
  a.dvc as dvc, 
  case when c1_arr is null then array('empty') else c1_arr end as c1_arr, 
  '2016-10-08' as week_time 
from 
  (
    select 
      dvc, 
      app_name as media 
    from 
      ad_log 
    where 
      is_exposure = '1' 
      and day_time between date_sub('2016-10-08', 6) 
      and '2016-10-08' 
    group by 
      dvc, 
      app_name
  ) a 
  left outer join (
    select 
      dvc, 
      collect_set(c1) as c1_arr 
    from 
      tag lateral view inline(tag) in_tb 
    where 
      day_time = '2016-10-08' 
    group by 
      dvc
  ) b on a.dvc = b.dvc;

在寫ES時,在構建索引時不需要分詞,通過PUT index template方式實現之:

{
  "template": "ad*",
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "string_template": {
            "mapping": {
              "include_in_all": false,
              "index": "not_analyzed",
              "type": "string",
              "index_options": "docs"
            },
            "match": "*"
          }
        }
      ]
    }
  }
}

3. 多維分析

ES官方的查詢語言是DSL,主要分為兩類:

  • Query,相當於SQL中的where部分,可套用filter、match等;
  • Aggregation,相當於SQL中的group by部分,在aggs內部也可以套用filter。

DSL可以巢狀,表達異常複雜的查詢操作;但是,若以字串拼接的方式實現DSL,則顯得可維護性太差。因此,官方提供了elasticsearch-dsl-py,可以將DSL等同於一段Python程式碼。我們的多維分析器便是基於此實現的(Python 3.5 + elasticsearch_dsl 2.1.0)

整體上曝光UV、有標籤的UV、除去常用標籤UV,以及每一個媒體上曝光UV、有標籤的UV、除去常用標籤UV的分析(相當於group by media with cube):

client = Elasticsearch(['<host1>'], port=20009, timeout=50)


def per_media(index_name):
    """count(distinct dvc) group by media with cube"""
    ms = MultiSearch(using=client, index=index_name)
    all_doc = Search()
    all_doc.aggs.bucket('per_media', 'terms', field='medias', size=1000)
    tagged = Search().query('filtered', filter=~Q('term', c1_arr='empty'))
    tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000)
    useful = Search().query('filtered', filter=~Q('term', c1_arr='empty') & Q('script',
                                                                              script="""['常駐地', '性別'].intersect(doc['c1_arr'].values).size() < doc['c1_arr'].values.size()"""))
    useful.aggs.bucket('per_media', 'terms', field='medias', size=1000)
    ms = ms.add(all_doc)
    ms = ms.add(tagged)
    ms = ms.add(useful)
    responses = ms.execute()
    result_list = []
    result_dict = defaultdict(lambda: [])
    for resp in responses:  # get per media uv(all, tagged, useful_tagged)
        print("Query %d: %r." % (responses.index(resp), resp.search.to_dict()))
        result_list.append(resp.hits.total)
        for buck in resp.aggregations['per_media']['buckets']:
            result_dict[buck['key']].append(buck['doc_count'])
    for k, v in result_dict.items():  # fill up default value 0
        if len(v) < 3:
            result_dict[k] = v + [0] * (3 - len(v))
    return result_list, result_dict

媒體與標籤組合維度下的UV統計:

def per_media_c1(index_name):
    """return {(media, c1) -> tagged_uv}"""
    s = Search(using=client, index=index_name)
    tagged = s.query('filtered', filter=~Q('term', c1_arr='empty'))
    tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000) \
        .bucket('per_c1', 'terms', field='c1_arr', size=100)
    result = {}
    response = tagged.execute()
    for buck in response.aggregations['per_media']['buckets']:
        key = buck['key']
        for b in buck['per_c1']['buckets']:
            result[(key, b['key'])] = b['doc_count']
    return result

相關推薦

輕量級OLAPHive + Elasticsearch

1. 引言 在做OLAP資料分析時,常常會遇到過濾分析需求,比如:除去只有性別、常駐地標籤的使用者,計算廣告媒體上的覆蓋UV。OLAP解決方案Kylin不支援複雜資料型別(array、struct、map),要求資料輸入Schema必須是平鋪的,但是平鋪後丟失了使用者的聚合標籤資訊,而沒有辦法判斷某一個使用者

輕量級OLAPCube計算

有一個數據多維分析的任務: 日誌的周UV; APP的收集量及標註量,TOP 20 APP(周UV),TOP 20 APP標註分類(周UV); 手機機型的收集量及標註量,TOP 20 機型(周UV),TOP 20 手機廠商(周UV); 初始的解決方案:Spark讀取資料日誌,然後根據分析需求逐一進行map、

ElasticSearch筆記整理CURL操作、ES插件、集群安裝與核心概念

大數據 ElasticSearch ELK [TOC] CURL操作 CURL簡介 curl是利用URL語法在命令行方式下工作的開源文件傳輸工具,使用curl可以簡單實現常見的get/post請求。簡單的認為是可以在命令行下面訪問url的一個工具。在centos的默認庫裏面是有curl工具的,如

大資料hive分桶及抽樣查詢、自定義函式、壓縮與儲存

一、分桶及抽樣查詢 1.分桶表資料儲存         分割槽針對的是資料儲存路徑(HDFS中表現出來的便是資料夾),分桶針對的是資料檔案。分割槽提供一個隔離資料和優化查詢的便利方式。不過,並非所有的資料集都可形成合理的分割槽,特別是當資料要

Elasticsearch實踐搜尋

本文以 Elasticsearch 6.2.4為例。 經過前面的基礎入門,我們對ES的基本操作也會了。現在來學習ES最強大的部分:全文檢索。 準備工作 批量匯入資料 先需要準備點資料,然後匯入: wget https://raw.githubusercontent.com/elastic/

Elasticsearch搜尋詳解請求體搜尋

上一篇文章介紹了基於 url 的搜尋,這次要講一種更高階的搜尋方法——請求體搜尋(Request Body Search),搜尋引數不是寫在 url 上,而是作為請求的資料傳送。利用 Query DSL 的語法可以組合出更加靈活的搜尋。 簡單的例子 GET /custom

Elasticsearch 通關教程 索引對映Mapping問題

資料庫建表的時候,我們的DDL語句一般都會指定每個欄位的儲存型別,例如:varchar,int,datetime等等,目的很明確,就是更精確的儲存資料,防止資料型別格式混亂。 CREATE TABLE `shop_` ( `id_` varchar(36) NOT NULL COMMENT 'id',

ElasticSearch學習總結ES介紹與架構說明

本文主要從概念以及架構層面對Elasticsearch做一個簡單的介紹,在介紹ES之前,會先對ES的“發動機”Lucene做一個簡單的介紹 1. Lucene介紹 為了更深入地理解ElasticSearch的工作原理,特別是索引和查詢這兩個過程,理解Lucene的工作原理至關重要。本

Spark實戰Kafka-SparkStreaming-Elasticsearch

本文介紹saprk實時部分----spark-streaming。spark-streaming可以實現實時批處理功能,實際上還是相當於小的批處理,但是是7*24工作,可以近實時但需要維護成本。本文裡的用java寫的demo,實現功能是將kafka作為spar

Elasticsearch全文檢索企業開發記錄總結ES客戶端搭建

專案依賴 <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport<

Elasticsearch使用JAVA API實現簡單查詢、聚合查詢

ES版本:2.3.1 JDK:1.8 所需要的jar包請在ES安裝路徑下的jars包中獲得,不要使用其他的jar否則容易出現版本問題! 注意:程式碼中TransportClient client=ESLink.getTransportClient()

Javascript面向對象編程構造函數的繼承

沒有 cal type 這一 今天 nts 實現繼承 刪除 函數綁定 今天要介紹的是,對象之間的"繼承"的五種方法。 比如,現在有一個"動物"對象的構造函數。   function Animal(){     this.species = "動物";   } 還有一個

虛擬化虛擬化及vmware workstation產品使用

應該 server esxi aof 手機 text 產品 窗體 pass 虛擬化(一):虛擬化及vmware產品介紹 vmware workstation的最新版本號是10.0.2。相信大家也都使用過,當中的簡單的虛擬機的創建。刪除等,都非常easy

CSS3動畫波浪效果

col -1 loading ack css代碼 code load width ase 實現效果 如圖所示: 首先得準備三張圖,一張是淺黃色的背景圖loading_bg.png,一張是深紅色的圖loading.png,最後一張為bolang.png。 css代碼

設計模式 工廠模式

dem blank hibernate 執行 oid code 做出 void actor 工廠模式 工廠模式(Factory Pattern)是 Java 中最常用的設計模式之一。這種類型的設計模式屬於創建型模式,它提供了一種創建對象的最佳方式。 在工廠模式中,我們在創建

iptables實用教程管理鏈和策略

否則 命令顯示 accept 目的 number cep 存在 當前 末尾 概念和原理請參考上一篇文章“iptables實用教程(一)”。 本文講解如果管理iptables中的鏈和策略。 下面的代碼格式中,下劃線表示是一個占位符,需要根據實際情況輸入參數,不帶下劃線的表示是

javascript學習筆記定義函數、調用函數、參數、返回值、局部和全局變量

兩個 cnblogs bsp 結果 value ava ase com 調用 定義函數、調用函數、參數、返回值 關鍵字function定義函數,格式如下: function 函數名(){ 函數體 } 調用函數、參數、返回值的規則和c語言規則類似。 1 <!DOC

Nginx實用教程配置文件入門

affinity type 服務 源碼編譯 設置時間 shutdown ber 可用 控制指令 Nginx配置文件結構 nginx配置文件由指令(directive)組成,指令分為兩種形式,簡單指令和區塊指令。 一條簡單指令由指令名、參數和結尾的分號(;)組成,例如:

Python和C|C++的混編利用Cython進行混編

cde uil 有時 當前 class def 將在 python 混編 還能夠使用Cython來實現混編 1 下載Cython。用python setup.py install進行安裝 2 一個實例 ① 創建helloworld文件夾創建hellowor

RabbitMQ消息隊列”Hello, World“

復雜 article ins don title apple lar github publish 本文將使用Python(pika 0.9.8)實現從Producer到Consumer傳遞數據”Hello, World“。 首先復習一下上篇所學:RabbitM