大數據離線分析平臺 JavaSDK數據收集引擎編寫
阿新 • • 發佈:2019-04-17
url end version .com reader utf append 工程 監控
JavaSDK設計規則
JavaSDK提供兩個事件觸發方法,分別為onChargeSuccess和onChargeRefund。我們在java sdk中通過一個單獨的線程來發送線程數據,這樣可以減少對業務系統的延時性。
SDK測試
啟動集群上的hdfs+nginx+flume進程,通過模擬數據的發送然後將數據發送到nginx服務器中,查看最終是否在hdfs中有數據的寫入。
命令:
start-dfs.sh: 啟動hdfs命令
su root:切換用戶
service nginx restart: 啟動nginx進程
啟動flume進程:
進入flume安裝根目錄,執行命令:
flume-ng agent --conf ./conf/ --conf-file ./conf/test2.conf --name agent &
工程目錄結構
AnalyticsEngineSDK如下:
package com.kk.ae.sdk; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; import java.util.Iterator; import java.util.Map;import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; /* * 分析引擎sdk java服務器數據收集 * */ public class AnalyticsEngineSDK { //日誌記錄對象 private static final Logger log=Logger.getGlobal();//請求url的主體部分 public static final String accessUrl="http://hadoop-001:8090/kkImg.gif"; public static final String platformName="java_server"; public static final String sdkName="jdk"; private static final String version = "1"; /** * 觸發訂單支付成功事件,發送事件數據到服務器 * * @param orderId * 訂單支付id * @param memberIdd * 訂單支付會員id * @return 如果發送數據成功(加入到發送隊列中),那麽返回true;否則返回false(參數異常&添加到發送隊列失敗). * @throws InterruptedException */ public static boolean chargeSuccess(String orderId,String memberId) throws InterruptedException { if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) { Map<String, String> map=new HashMap<String,String>(); map.put("u_mid", memberId); map.put("oid", orderId); map.put("c_time", String.valueOf(System.currentTimeMillis())); map.put("ver", version); map.put("en", "e_cs"); map.put("p1", platformName); map.put("sdk", sdkName); //創建url String url= buildUrl(map); // 發送url&將url加入到隊列 SendDataMonitor.addSendUrl(url); System.out.println(url); return true; } else { log.log(Level.WARNING, "訂單id和會員id不能為空"); return false; } } /** * 觸發訂單退款事件,發送退款數據到服務器 * * @param orderId * 退款訂單id * @param memberIdd * 退款會員id * @return 如果發送數據成功,返回true。否則返回false。 * @throws InterruptedException */ public static boolean chargeRefund(String orderId,String memberId) throws InterruptedException { if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) { Map<String, String> map=new HashMap<String,String>(); map.put("u_mid", memberId); map.put("oid", orderId); map.put("c_time", String.valueOf(System.currentTimeMillis())); map.put("ver", version); map.put("en", "e_cr"); map.put("p1", platformName); map.put("sdk", sdkName); //創建url String url= buildUrl(map); // 發送url&將url加入到隊列 SendDataMonitor.addSendUrl(url); System.out.println(url); return true; } else { log.log(Level.WARNING, "訂單id和會員id不能為空"); return false; } } private static String buildUrl(Map<String, String> map) { StringBuffer stringBuffer=new StringBuffer(); stringBuffer.append(accessUrl).append("?"); for(Map.Entry<String, String> entry:map.entrySet()) { if (entry.getKey()!=null&&!entry.getKey().isEmpty()&&entry.getValue()!=null&&!entry.getValue().isEmpty()) { { try { stringBuffer.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(),"utf-8")).append("&"); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } return stringBuffer.substring(0, stringBuffer.length() - 1); } }
SendDataMonitor 如下:
package com.kk.ae.sdk; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.ProtocolException; import java.net.URL; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; /** * 發送url數據的監控者,用於啟動一個單獨的線程來發送數據 * * @author gerry * */ public class SendDataMonitor { //收集日誌 public static final Logger log=Logger.getGlobal(); // 隊列,用戶存儲發送url public static final BlockingQueue<String> queue=new LinkedBlockingQueue<String>(); //用於單例的一個類對象 private static SendDataMonitor monitor=null; private SendDataMonitor() { // 私有構造方法,進行單列模式的創建 } public static SendDataMonitor getMonitor() { if (monitor==null) { synchronized (SendDataMonitor.class) { if (monitor==null) { monitor=new SendDataMonitor(); Thread thread=new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub SendDataMonitor.monitor.run(); } }); thread.start(); } } } return monitor; } protected void run() { while (true) { try { String url=this.queue.take(); // 正式的發送url HttpRequestUtil.sendData(url); } catch (Throwable e) { log.log(Level.WARNING, "發送url異常", e); } } } public static void setMonitor(SendDataMonitor monitor) { SendDataMonitor.monitor = monitor; } /** * 添加一個url到隊列中去 * * @param url * @throws InterruptedException */ public static void addSendUrl(String url) throws InterruptedException { getMonitor().queue.put(url); } /** * 內部類,用戶發送數據的http工具類 * * @author gerry * */ public static class HttpRequestUtil{ /** * 具體發送url的方法 * * @param url * @throws IOException */ public static void sendData(String url) throws IOException { HttpURLConnection con=null; BufferedReader bf=null; try { URL obj=new URL(url); con=(HttpURLConnection) obj.openConnection(); // 設置連接參數 con.setConnectTimeout(5000);//連接過期時間 con.setReadTimeout(5000);//讀取數據過期時間 con.setRequestMethod("GET");//設置請求類型為get System.out.println("發送url:" + url); // 發送連接請求 bf=new BufferedReader(new InputStreamReader(con.getInputStream())); } finally { try { if (bf!=null) { bf.close(); } } catch (Throwable e) { // TODO: handle exception } try { con.disconnect(); } catch (Throwable e) { // TODO: handle exception } } } } }
測試類:
package com.kk.ae.sdk; public class Test { public static void main(String[] args) { try { AnalyticsEngineSDK.chargeSuccess("order3516", "0958"); AnalyticsEngineSDK.chargeRefund("kk3", "9009"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
大數據離線分析平臺 JavaSDK數據收集引擎編寫