elasticsearch選擇器聚合,分組返回聚合結果
阿新 • • 發佈:2018-12-26
package tianjun.cmcc.es;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse ;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search .aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.date.InternalDateRange;
import org.elasticsearch .search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
/**
 * Runs funnel and distinct-user aggregations against Elasticsearch through a
 * {@link TransportClient}.
 *
 * <p>The queries read three document fields: {@code tag} (the page identifier),
 * {@code local_time_string} (the event time) and {@code user_id}.
 *
 * <p>The client is created either eagerly by the constructors that take node
 * addresses, or lazily on the first query when the no-argument constructor and
 * the setters were used. Call {@link #close()} when the instance is no longer
 * needed.
 */
public class Funnel {
    /** Cluster name used when the caller supplies none. */
    private static final String DEFAULT_CLUSTER_NAME = "my-application";
    private static final String TAG_FIELD = "tag";
    private static final String TIME_FIELD = "local_time_string";
    private static final String USER_FIELD = "user_id";
    private static final String TIME_RANGE_AGG = "limit_by_time";
    private static final String USER_TERMS_AGG = "group_by_userId";
    private static final String DISTINCT_USER_AGG = "count_distinct_userId";
    /** Number of partitions the user terms aggregation is split into; one search per partition. */
    private static final int PARTITION_COUNT = 50;
    /** Bucket size requested from the user terms aggregation for each partition. */
    private static final int MAX_BUCKETS_PER_PARTITION = 100000;

    private String[] ips = null;
    private Integer[] ports = null;
    private String clusterName = null;
    private TransportClient client = null;
    private final Logger logger = Logger.getLogger(this.getClass());

    public String[] getIps() {
        return ips;
    }

    public void setIps(String[] ips) {
        this.ips = ips;
    }

    public Integer[] getPorts() {
        return ports;
    }

    public void setPorts(Integer[] ports) {
        this.ports = ports;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    /**
     * Creates an unconnected instance. Set the node addresses with
     * {@link #setIps(String[])} and {@link #setPorts(Integer[])} (and optionally
     * {@link #setClusterName(String)}); the client is then built on the first query.
     */
    public Funnel() {
    }

    /**
     * Creates an instance connected to the given nodes, using the default cluster name.
     *
     * @param ips   node host names or addresses
     * @param ports transport ports, one per entry of {@code ips}
     */
    public Funnel(String[] ips, Integer[] ports) {
        this(ips, ports, null);
    }

    /**
     * Creates an instance connected to the given nodes.
     *
     * @param ips         node host names or addresses
     * @param ports       transport ports, one per entry of {@code ips}
     * @param clusterName cluster name; {@code null} or empty selects the default
     */
    public Funnel(String[] ips, Integer[] ports, String clusterName) {
        // Remember the configuration so the getters reflect what the client was built from.
        this.ips = ips;
        this.ports = ports;
        this.clusterName = clusterName;
        initClient(ips, ports, clusterName);
    }

    /**
     * Builds the transport client and registers every resolvable node address.
     * Invalid configuration is logged and leaves {@code client} unset; a host that
     * cannot be resolved is logged and skipped.
     */
    private void initClient(String[] ips, Integer[] ports, String clusterName) {
        if (ips == null || ports == null || ips.length != ports.length) {
            logger.error("IPs or Ports has error!");
            return;
        }
        if (clusterName == null || "".equals(clusterName)) {
            clusterName = DEFAULT_CLUSTER_NAME;
        }
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true)
                .build();
        TransportClient created = new PreBuiltTransportClient(settings);
        for (int i = 0; i < ips.length; i++) {
            if (ips[i] == null || ports[i] == null) {
                logger.error("Skipping node " + i + ": host or port is null");
                continue;
            }
            try {
                created.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(ips[i].trim()), ports[i]));
            } catch (UnknownHostException e) {
                // Keep going with the remaining nodes, but keep the stack trace in the log.
                logger.error(e.getMessage(), e);
            }
        }
        client = created;
    }

    /**
     * Returns the client, building it from the configured fields if that has not
     * happened yet.
     *
     * @throws IllegalStateException if no client could be created
     */
    private TransportClient requireClient() {
        if (client == null && ips != null && ports != null) {
            initClient(ips, ports, clusterName);
        }
        if (client == null) {
            throw new IllegalStateException(
                    "Elasticsearch client is not initialised; check the configured IPs and ports");
        }
        return client;
    }

    /** Closes the underlying client if one was created. */
    public void close() {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Funnel count: the number of users who followed a path such as a-b-c inside the
     * time range.
     *
     * <p>For every pair of consecutive pages the earliest event time on the earlier
     * page must be less than the latest event time on the later page. Users are
     * grouped with a terms aggregation split into {@value #PARTITION_COUNT}
     * partitions; one search is executed per partition and the surviving buckets
     * are summed.
     *
     * @param index    index name
     * @param type     document type
     * @param fromDate start of the time range
     * @param toDate   end of the time range
     * @param pages    funnel path as an ordered list of {@code tag} values; at least two
     * @return the user count as a {@code Long}
     * @throws IllegalArgumentException if fewer than two pages are given
     * @throws IllegalStateException    if the client could not be initialised
     */
    public Object getFunnelByUser(String index, String type, String fromDate, String toDate, String... pages) {
        // With no page the selector script would be empty, and with one page it would
        // reference an undefined parameter; reject both before contacting the cluster.
        if (pages == null || pages.length < 2) {
            throw new IllegalArgumentException("please input your path: at least two pages are required");
        }
        TransportClient searchClient = requireClient();

        Map<String, String> bucketsPaths = new ConcurrentHashMap<>();
        StringBuilder script = new StringBuilder();
        // A query (unlike a post filter) also narrows the documents seen by the
        // aggregations, so users without any event on the funnel pages produce no bucket.
        SearchRequestBuilder request = searchClient.prepareSearch(index)
                .setTypes(type)
                .setSize(0)
                .setQuery(QueryBuilders.termsQuery(TAG_FIELD, pages));
        DateRangeAggregationBuilder byTime = AggregationBuilders.dateRange(TIME_RANGE_AGG)
                .field(TIME_FIELD)
                .addRange(fromDate, toDate);
        TermsAggregationBuilder byUser = AggregationBuilders.terms(USER_TERMS_AGG).field(USER_FIELD);

        int last = pages.length - 1;
        for (int i = 0; i <= last; i++) {
            FilterAggregationBuilder step =
                    AggregationBuilders.filter("tag_" + i, QueryBuilders.termQuery(TAG_FIELD, pages[i]));
            if (i < last) {
                // Every page except the last contributes its earliest time as a<i>
                // and the clause "a<i> < b<i+1>".
                bucketsPaths.put("a" + i, "tag_" + i + ">min_user_time");
                step.subAggregation(AggregationBuilders.min("min_user_time").field(TIME_FIELD));
                if (script.length() > 0) {
                    script.append(" && ");
                }
                script.append("params.a").append(i).append("<params.b").append(i + 1);
            }
            if (i > 0) {
                // Every page except the first contributes its latest time as b<i>.
                bucketsPaths.put("b" + i, "tag_" + i + ">max_user_time");
                step.subAggregation(AggregationBuilders.max("max_user_time").field(TIME_FIELD));
            }
            byUser.subAggregation(step);
        }

        BucketSelectorPipelineAggregationBuilder selector = PipelineAggregatorBuilders
                .bucketSelector("result_buckets", bucketsPaths, new Script(script.toString()));
        byUser.subAggregation(selector);
        request.addAggregation(byTime.subAggregation(byUser));

        long count = 0L;
        for (int partition = 0; partition < PARTITION_COUNT; partition++) {
            // The request holds a reference to byUser, so changing the partition here
            // changes what the next execute() sends.
            byUser.includeExclude(new IncludeExclude(partition, PARTITION_COUNT))
                    .size(MAX_BUCKETS_PER_PARTITION);
            SearchResponse response = request.execute().actionGet();
            InternalDateRange range = response.getAggregations().get(TIME_RANGE_AGG);
            if (range.getBuckets().isEmpty()) {
                continue;
            }
            Terms users = range.getBuckets().get(0).getAggregations().get(USER_TERMS_AGG);
            count += users.getBuckets().size();
        }
        return count;
    }

    /**
     * Distinct users of a single page inside the time range, computed with a
     * cardinality aggregation on {@code user_id}.
     *
     * @param index    index name
     * @param type     document type
     * @param fromDate start of the time range
     * @param toDate   end of the time range
     * @param page     {@code tag} value of the page
     * @return the distinct user count as a {@code Long}; {@code 0L} when the range has no bucket
     * @throws IllegalStateException if the client could not be initialised
     */
    public Object countDistinct(String index, String type, String fromDate, String toDate, String page) {
        SearchResponse response = requireClient().prepareSearch(index)
                .setTypes(type)
                // Only the aggregation result is read; no hits are needed.
                .setSize(0)
                // Must be a query: a post filter runs after aggregations and would leave
                // the cardinality counting users of every page.
                .setQuery(QueryBuilders.termsQuery(TAG_FIELD, page))
                .addAggregation(
                        AggregationBuilders.dateRange(TIME_RANGE_AGG)
                                .field(TIME_FIELD)
                                .addRange(fromDate, toDate)
                                .subAggregation(
                                        AggregationBuilders.cardinality(DISTINCT_USER_AGG).field(USER_FIELD)))
                .execute().actionGet();

        InternalDateRange range = response.getAggregations().get(TIME_RANGE_AGG);
        if (range.getBuckets().isEmpty()) {
            return 0L;
        }
        InternalCardinality distinctUsers = range.getBuckets().get(0).getAggregations().get(DISTINCT_USER_AGG);
        // Same boxed type on both paths so callers can cast the result to Long.
        return distinctUsers != null ? distinctUsers.getValue() : 0L;
    }
}
package tianjun.cmccc.es;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import tianjun.cmcc.es.Funnel;
/**
 * Manual smoke tests for {@link Funnel}. Each test runs one query against the
 * cluster configured in {@link #init()} and prints the result; nothing is asserted.
 */
public class FunnelTest {
    /** Single instance shared by all tests; created once and closed after the last test. */
    private static Funnel funnel = null;

    /** Connects to the three cluster nodes on their transport port. */
    @BeforeClass
    public static void init() {
        String[] hosts = {"172.23.27.89", "172.23.27.125", "172.23.27.126"};
        Integer[] transportPorts = {9300, 9300, 9300};
        funnel = new Funnel(hosts, transportPorts);
    }

    /**
     * Three-step funnel: users who visited the last page and, in the same period,
     * also visited the two earlier pages in order.
     * Arguments are index name, document type, range start, range end, then the path.
     */
    @Test
    public void test1() {
        System.out.println(funnel.getFunnelByUser(
                "event",
                "t_dws_trace_event_dtl",
                "2017-09-01",
                "2017-09-11",
                "Album_Pic", "Album_Camera", "Album_Camera_OneKeyUpload"));
    }

    /**
     * Two-step funnel over the same index and type.
     * Arguments are index name, document type, range start, range end, then the path.
     */
    @Test
    public void test2() {
        System.out.println(funnel.getFunnelByUser(
                "event",
                "t_dws_trace_event_dtl",
                "2017-09-01",
                "2017-09-01",
                "Album_Pic", "Album_Camera"));
    }

    /**
     * Distinct users of a single page.
     * Arguments are index name, document type, range start, range end and the page.
     */
    @Test
    public void test3() {
        System.out.println(funnel.countDistinct(
                "event",
                "t_dws_trace_event_dtl",
                "2017-09-10",
                "2017-09-11",
                "Album_Pic"));
    }

    /** Releases the shared client. */
    @AfterClass
    public static void close() {
        funnel.close();
    }
}