Big Data Project in Practice, Part 13: Building the Spark Context and Generating Mock Data
阿新 · Posted 2018-12-02
The job entry point, UserVisitSessionAnalysisSpark, builds the SparkConf and JavaSparkContext, chooses between SQLContext (local testing) and HiveContext (production) based on configuration, and then triggers mock-data generation:

```java
import com.ibeifeng.sparkproject.conf.ConfigurationManager;
import com.ibeifeng.sparkproject.constant.Constants;
import com.ibeifeng.sparkproject.util.MockData;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

/**
 * User-visit session analysis Spark job.
 */
public class UserVisitSessionAnalysisSpark {

    public static void main(String[] args) {
        // Build the Spark context
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName(Constants.SPARK_APP_NAME_SESSION);
        sparkConf.setMaster("local[*]");

        // Entry point for creating RDDs
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // In a local test environment, create an SQLContext;
        // in production, create a HiveContext instead.
        SQLContext sqlContext = getSqlContext(sc.sc()); // sc.sc() unwraps the underlying SparkContext

        // Generate the mock test data
        mockData(sc, sqlContext);

        sc.close();
    }

    /**
     * Get the SQLContext.
     * In a local test environment this returns an SQLContext;
     * in production it returns a HiveContext.
     *
     * @param sc SparkContext
     * @return SQLContext
     */
    private static SQLContext getSqlContext(SparkContext sc) {
        Boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
        if (local) {
            return new SQLContext(sc);
        } else {
            return new HiveContext(sc);
        }
    }

    /**
     * Generate mock data (only in local mode).
     * Before packaging the job and submitting it to a cluster,
     * be sure to set the local flag to false in the configuration file.
     *
     * @param sc         JavaSparkContext
     * @param sqlContext SQLContext
     */
    private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {
        Boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
        if (local) {
            MockData.mock(sc, sqlContext);
        }
    }
}
```
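The local/production switch hinges on ConfigurationManager.getBoolean(Constants.SPARK_LOCAL). The project's ConfigurationManager is not shown in this post; below is a minimal sketch of how such a class could back that call with a properties file. The resource name my.properties and the default value are assumptions, not confirmed by the source:

```java
import java.io.InputStream;
import java.util.Properties;

// Hypothetical sketch of a properties-backed configuration manager;
// the resource name "my.properties" is an assumption.
public class ConfigurationManager {

    private static Properties prop = new Properties();

    static {
        try (InputStream in = ConfigurationManager.class
                .getClassLoader().getResourceAsStream("my.properties")) {
            prop.load(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Look up a key and parse it as a boolean; missing keys default to false
    public static Boolean getBoolean(String key) {
        return Boolean.valueOf(prop.getProperty(key, "false"));
    }
}
```

With the corresponding key set to true in the properties file, the job runs locally against an SQLContext; flipping it to false before packaging makes getSqlContext return a HiveContext for the cluster.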
MockData lives in com.ibeifeng.sparkproject.util (per the import above) and fabricates two datasets: a user_visit_action table of simulated sessions and a user_info table of user profiles, both registered as temp tables on the passed-in SQLContext. DateUtils and StringUtils are utilities from the same package (see the sketch after the listing):

```java
package com.ibeifeng.sparkproject.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 * Mock data generator.
 */
public class MockData {

    /**
     * Generate mock data.
     *
     * @param sc         JavaSparkContext
     * @param sqlContext SQLContext
     */
    public static void mock(JavaSparkContext sc, SQLContext sqlContext) {
        List<Row> rows = new ArrayList<Row>();

        String[] searchKeywords = new String[] {"火鍋", "蛋糕", "重慶辣子雞", "重慶小面",
                "呷哺呷哺", "新辣道魚火鍋", "國貿大廈", "太古商場", "日本料理", "溫泉"};
        String date = DateUtils.getTodayDate();
        String[] actions = new String[] {"search", "click", "order", "pay"};
        Random random = new Random();

        // 100 users, 10 sessions each, up to 100 actions per session
        for (int i = 0; i < 100; i++) {
            long userid = random.nextInt(100);

            for (int j = 0; j < 10; j++) {
                String sessionid = UUID.randomUUID().toString().replace("-", "");
                // nextInt(24) covers hours 00-23; pad the hour to two digits
                // so the timestamps stay lexicographically sortable
                String baseActionTime = date + " "
                        + StringUtils.fulfuill(String.valueOf(random.nextInt(24)));

                // Fix the action count up front instead of re-rolling the
                // loop bound on every iteration
                int actionCount = random.nextInt(100);
                for (int k = 0; k < actionCount; k++) {
                    long pageid = random.nextInt(10);
                    // nextInt(60) covers minutes/seconds 00-59
                    String actionTime = baseActionTime + ":"
                            + StringUtils.fulfuill(String.valueOf(random.nextInt(60))) + ":"
                            + StringUtils.fulfuill(String.valueOf(random.nextInt(60)));
                    String searchKeyword = null;
                    Long clickCategoryId = null;
                    Long clickProductId = null;
                    String orderCategoryIds = null;
                    String orderProductIds = null;
                    String payCategoryIds = null;
                    String payProductIds = null;

                    // Each action is exactly one of search, click, order, pay;
                    // only the fields relevant to that action are populated
                    String action = actions[random.nextInt(4)];
                    if ("search".equals(action)) {
                        searchKeyword = searchKeywords[random.nextInt(10)];
                    } else if ("click".equals(action)) {
                        clickCategoryId = (long) random.nextInt(100);
                        clickProductId = (long) random.nextInt(100);
                    } else if ("order".equals(action)) {
                        orderCategoryIds = String.valueOf(random.nextInt(100));
                        orderProductIds = String.valueOf(random.nextInt(100));
                    } else if ("pay".equals(action)) {
                        payCategoryIds = String.valueOf(random.nextInt(100));
                        payProductIds = String.valueOf(random.nextInt(100));
                    }

                    Row row = RowFactory.create(date, userid, sessionid, pageid,
                            actionTime, searchKeyword, clickCategoryId, clickProductId,
                            orderCategoryIds, orderProductIds,
                            payCategoryIds, payProductIds);
                    rows.add(row);
                }
            }
        }

        JavaRDD<Row> rowsRDD = sc.parallelize(rows);

        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("date", DataTypes.StringType, true),
                DataTypes.createStructField("user_id", DataTypes.LongType, true),
                DataTypes.createStructField("session_id", DataTypes.StringType, true),
                DataTypes.createStructField("page_id", DataTypes.LongType, true),
                DataTypes.createStructField("action_time", DataTypes.StringType, true),
                DataTypes.createStructField("search_keyword", DataTypes.StringType, true),
                DataTypes.createStructField("click_category_id", DataTypes.LongType, true),
                DataTypes.createStructField("click_product_id", DataTypes.LongType, true),
                DataTypes.createStructField("order_category_ids", DataTypes.StringType, true),
                DataTypes.createStructField("order_product_ids", DataTypes.StringType, true),
                DataTypes.createStructField("pay_category_ids", DataTypes.StringType, true),
                DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true)));

        DataFrame df = sqlContext.createDataFrame(rowsRDD, schema);
        df.registerTempTable("user_visit_action");

        // Print one sample row as a sanity check
        for (Row _row : df.take(1)) {
            System.out.println(_row);
        }

        /* ================================================================
         * Mock user_info data
         * ================================================================ */
        // Use a fresh list rather than mutating the one already handed
        // to parallelize() above
        List<Row> userRows = new ArrayList<Row>();

        String[] sexes = new String[] {"male", "female"};
        for (int i = 0; i < 100; i++) {
            long userid = i;
            String username = "user" + i;
            String name = "name" + i;
            int age = random.nextInt(60);
            String professional = "professional" + random.nextInt(100);
            String city = "city" + random.nextInt(100);
            String sex = sexes[random.nextInt(2)];

            Row row = RowFactory.create(userid, username, name, age,
                    professional, city, sex);
            userRows.add(row);
        }

        rowsRDD = sc.parallelize(userRows);

        StructType schema2 = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("user_id", DataTypes.LongType, true),
                DataTypes.createStructField("username", DataTypes.StringType, true),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true),
                DataTypes.createStructField("professional", DataTypes.StringType, true),
                DataTypes.createStructField("city", DataTypes.StringType, true),
                DataTypes.createStructField("sex", DataTypes.StringType, true)));

        DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2);

        // Print one sample row as a sanity check
        for (Row _row : df2.take(1)) {
            System.out.println(_row);
        }

        df2.registerTempTable("user_info");
    }
}
```
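MockData relies on two project utilities not shown in this post: DateUtils.getTodayDate() and StringUtils.fulfuill(String). A minimal sketch of what such helpers might look like, assuming getTodayDate() returns today's date as yyyy-MM-dd and fulfuill() zero-pads a numeric string to two digits (both behaviors are inferred from how the methods are used, not confirmed by the source):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical stand-ins for the project's DateUtils / StringUtils helpers
public class MockDataHelpers {

    // Today's date formatted as yyyy-MM-dd, e.g. "2018-12-02"
    public static String getTodayDate() {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    }

    // Left-pad a numeric string to two digits: "5" -> "05", "12" -> "12"
    public static String fulfuill(String str) {
        return str.length() == 2 ? str : "0" + str;
    }
}
```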
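Once mock() has run, both temp tables are queryable through the same SQLContext for the rest of the job. For example, a snippet like the following could sit in main() right after mockData(sc, sqlContext); the query itself is illustrative only, not part of the original job:

```java
// e.g. in UserVisitSessionAnalysisSpark.main(), after mockData(sc, sqlContext):
// join the two mock tables to verify they line up on user_id
DataFrame sample = sqlContext.sql(
        "SELECT a.session_id, a.action_time, u.city "
      + "FROM user_visit_action a "
      + "JOIN user_info u ON a.user_id = u.user_id");
sample.show(10); // prints the first 10 joined rows
```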