1. 程式人生 > >大數據離線分析平臺 JavaSDK數據收集引擎編寫

大數據離線分析平臺 JavaSDK數據收集引擎編寫

url end version .com reader utf append 工程 監控

JavaSDK設計規則

JavaSDK提供兩個事件觸發方法,分別為onChargeSuccess和onChargeRefund。我們在java sdk中通過一個單獨的線程來發送線程數據,這樣可以減少對業務系統的延時性。

SDK測試

啟動集群上的hdfs+nginx+flume進程,通過模擬數據的發送然後將數據發送到nginx服務器中,查看最終是否在hdfs中有數據的寫入。

命令:

start-dfs.sh: 啟動hdfs命令

su root:切換用戶

service nginx restart: 啟動nginx進程

啟動flume進程:

進入flume安裝根目錄,執行命令:


flume-ng agent --conf ./conf/ --conf-file ./conf/test2.conf --name agent &


工程目錄結構

技術分享圖片

AnalyticsEngineSDK如下:
package com.kk.ae.sdk;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; /* * 分析引擎sdk java服務器數據收集 * */ public class AnalyticsEngineSDK { //日誌記錄對象 private static final Logger log=Logger.getGlobal();
//請求url的主體部分 public static final String accessUrl="http://hadoop-001:8090/kkImg.gif"; public static final String platformName="java_server"; public static final String sdkName="jdk"; private static final String version = "1"; /** * 觸發訂單支付成功事件,發送事件數據到服務器 * * @param orderId * 訂單支付id * @param memberIdd * 訂單支付會員id * @return 如果發送數據成功(加入到發送隊列中),那麽返回true;否則返回false(參數異常&添加到發送隊列失敗). * @throws InterruptedException */ public static boolean chargeSuccess(String orderId,String memberId) throws InterruptedException { if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) { Map<String, String> map=new HashMap<String,String>(); map.put("u_mid", memberId); map.put("oid", orderId); map.put("c_time", String.valueOf(System.currentTimeMillis())); map.put("ver", version); map.put("en", "e_cs"); map.put("p1", platformName); map.put("sdk", sdkName); //創建url String url= buildUrl(map); // 發送url&將url加入到隊列 SendDataMonitor.addSendUrl(url); System.out.println(url); return true; } else { log.log(Level.WARNING, "訂單id和會員id不能為空"); return false; } } /** * 觸發訂單退款事件,發送退款數據到服務器 * * @param orderId * 退款訂單id * @param memberIdd * 退款會員id * @return 如果發送數據成功,返回true。否則返回false。 * @throws InterruptedException */ public static boolean chargeRefund(String orderId,String memberId) throws InterruptedException { if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) { Map<String, String> map=new HashMap<String,String>(); map.put("u_mid", memberId); map.put("oid", orderId); map.put("c_time", String.valueOf(System.currentTimeMillis())); map.put("ver", version); map.put("en", "e_cr"); map.put("p1", platformName); map.put("sdk", sdkName); //創建url String url= buildUrl(map); // 發送url&將url加入到隊列 SendDataMonitor.addSendUrl(url); System.out.println(url); return true; } else { log.log(Level.WARNING, "訂單id和會員id不能為空"); return false; } } private static String buildUrl(Map<String, String> map) { StringBuffer stringBuffer=new StringBuffer(); stringBuffer.append(accessUrl).append("?"); for(Map.Entry<String, String> entry:map.entrySet()) { if (entry.getKey()!=null&&!entry.getKey().isEmpty()&&entry.getValue()!=null&&!entry.getValue().isEmpty()) { { try { stringBuffer.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(),"utf-8")).append("&"); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } return stringBuffer.substring(0, stringBuffer.length() - 1); } }

SendDataMonitor 如下:
package com.kk.ae.sdk;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 發送url數據的監控者,用於啟動一個單獨的線程來發送數據
 * 
 * @author gerry
 *
 */
public class SendDataMonitor {
    //收集日誌
    public static final Logger log=Logger.getGlobal();
    // 隊列,用戶存儲發送url
    public static final BlockingQueue<String> queue=new LinkedBlockingQueue<String>();
    //用於單例的一個類對象
    private static SendDataMonitor monitor=null;
    
    private SendDataMonitor() {
        // 私有構造方法,進行單列模式的創建
    }
    
    
    public static SendDataMonitor getMonitor() {
        if (monitor==null) {
            synchronized (SendDataMonitor.class) {
                if (monitor==null) {
                    monitor=new SendDataMonitor();
                    Thread thread=new Thread(new Runnable() {
                        
                        @Override
                        public void run() {
                        // TODO Auto-generated method stub
                        SendDataMonitor.monitor.run();    
                        
                        }
                    });
                    thread.start();
                }    
            }
        } 
        return monitor;
    }


    protected void run() {
        while (true) {
            try {
                String url=this.queue.take();
                // 正式的發送url
                HttpRequestUtil.sendData(url);
            } catch (Throwable e) {
                log.log(Level.WARNING, "發送url異常", e);
            }    
        }
    }


    public static void setMonitor(SendDataMonitor monitor) {
        SendDataMonitor.monitor = monitor;
        
    }


    /**
     * 添加一個url到隊列中去
     * 
     * @param url
     * @throws InterruptedException
     */
    public static void addSendUrl(String url) throws InterruptedException {
         getMonitor().queue.put(url);
    
    }
    /**
     * 內部類,用戶發送數據的http工具類
     * 
     * @author gerry
     *
     */
    public static class HttpRequestUtil{
        /**
         * 具體發送url的方法
         * 
         * @param url
         * @throws IOException
         */
        public static void sendData(String url) throws IOException {
            HttpURLConnection con=null;
            BufferedReader bf=null;
            try {
                URL obj=new URL(url);
                con=(HttpURLConnection) obj.openConnection();
                // 設置連接參數
                con.setConnectTimeout(5000);//連接過期時間
                con.setReadTimeout(5000);//讀取數據過期時間
                con.setRequestMethod("GET");//設置請求類型為get
                System.out.println("發送url:" + url);
                // 發送連接請求
                bf=new BufferedReader(new InputStreamReader(con.getInputStream()));
                
            } finally {
                try {
                    if (bf!=null) {
                        bf.close();
                        
                    }
                } catch (Throwable e) {
                    // TODO: handle exception
                    
                }
                try {
                    con.disconnect();
                } catch (Throwable e) {
                    // TODO: handle exception
                }
            }
        }
    
    }

}

測試類:

package com.kk.ae.sdk;

public class Test {
    
public static void main(String[] args) {
    try {
        AnalyticsEngineSDK.chargeSuccess("order3516", "0958");
        AnalyticsEngineSDK.chargeRefund("kk3", "9009");
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}

大數據離線分析平臺 JavaSDK數據收集引擎編寫