Reading and writing MongoDB from Spark in Java
阿新 · Published 2019-01-02
First, add the mongodb-spark-connector Maven dependency; the Java API documentation is at https://docs.mongodb.com/spark-connector/current/java-api/. Once the dependency is in place, you can follow that documentation for the Spark operations you need, and it already covers loading MongoDB data into Spark as an RDD. A possible dependency declaration is sketched right after this paragraph.
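For reference, a pom.xml entry might look like the following. The Scala suffix on the artifact and the version number are assumptions here; pick the ones that match your Spark and Scala versions rather than copying these values verbatim.

<!-- MongoDB Spark Connector dependency (suffix and version are examples only) -->
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.3.1</version>
</dependency>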
Here is some additional code that may (or may not?) be useful, collected below.
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoDatabase;
import com.mongodb.spark.MongoConnector;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.config.WriteConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import com.mongodb.spark.sql.helpers.StructFields;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
import org.bson.types.ObjectId;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

public final class JavaIntroduction {

    /**
     * Run this main method to see the output of this quick example.
     *
     * @param args takes an optional single argument for the connection string
     * @throws InterruptedException if a latch is interrupted
     */
    public static void main(final String[] args) throws InterruptedException {
        JavaSparkContext jsc = createJavaSparkContext(args);

        // Create a RDD
        JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map(
                new Function<Integer, Document>() {
                    @Override
                    public Document call(final Integer i) throws Exception {
                        return Document.parse("{test: " + i + "}");
                    }
                });

        // Saving data from an RDD to MongoDB
        MongoSpark.save(documents);

        // Saving data with a custom WriteConfig
        Map<String, String> writeOverrides = new HashMap<String, String>();
        writeOverrides.put("collection", "spark");
        writeOverrides.put("writeConcern.w", "majority");
        WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

        JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map(
                new Function<Integer, Document>() {
                    @Override
                    public Document call(final Integer i) throws Exception {
                        return Document.parse("{spark: " + i + "}");
                    }
                });

        // Saving data from an RDD to MongoDB
        MongoSpark.save(sparkDocuments, writeConfig);

        // Loading and analyzing data from MongoDB
        JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
        System.out.println(rdd.count());
        System.out.println(rdd.first().toJson());

        // Loading data with a custom ReadConfig
        Map<String, String> readOverrides = new HashMap<String, String>();
        readOverrides.put("collection", "spark");
        readOverrides.put("readPreference.name", "secondaryPreferred");
        ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);
        JavaMongoRDD<Document> customRdd = MongoSpark.load(jsc, readConfig);
        System.out.println(customRdd.count());
        System.out.println(customRdd.first().toJson());

        // Filtering an rdd using an aggregation pipeline before passing data to Spark
        JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(
                singletonList(Document.parse("{ $match: { test : { $gt : 5 } } }")));
        System.out.println(aggregatedRdd.count());
        System.out.println(aggregatedRdd.first().toJson());

        // Datasets

        // Drop database
        dropDatabase(getMongoClientURI(args));

        // Add Sample Data
        List<String> characters = asList(
                "{'name': 'Bilbo Baggins', 'age': 50}",
                "{'name': 'Gandalf', 'age': 1000}",
                "{'name': 'Thorin', 'age': 195}",
                "{'name': 'Balin', 'age': 178}",
                "{'name': 'Kíli', 'age': 77}",
                "{'name': 'Dwalin', 'age': 169}",
                "{'name': 'Óin', 'age': 167}",
                "{'name': 'Glóin', 'age': 158}",
                "{'name': 'Fíli', 'age': 82}",
                "{'name': 'Bombur'}"
        );
        MongoSpark.save(jsc.parallelize(characters).map(new Function<String, Document>() {
            @Override
            public Document call(final String json) throws Exception {
                return Document.parse(json);
            }
        }));

        // Load inferring schema
        Dataset<Row> df = MongoSpark.load(jsc).toDF();
        df.printSchema();
        df.show();

        // Declare the Schema via a Java Bean
        SparkSession sparkSession = SparkSession.builder().getOrCreate();
        Dataset<Row> explicitDF = MongoSpark.load(jsc).toDF(Character.class);
        explicitDF.printSchema();

        // SQL
        explicitDF.registerTempTable("characters");
        Dataset<Row> centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100");

        // Saving DataFrame
        MongoSpark.write(centenarians).option("collection", "hundredClub").save();
        MongoSpark.load(sparkSession,
                ReadConfig.create(sparkSession).withOption("collection", "hundredClub"),
                Character.class).show();

        // Drop database
        MongoConnector.apply(jsc.sc()).withDatabaseDo(ReadConfig.create(sparkSession),
                new Function<MongoDatabase, Void>() {
                    @Override
                    public Void call(final MongoDatabase db) throws Exception {
                        db.drop();
                        return null;
                    }
                });

        String objectId = "123400000000000000000000";
        List<Document> docs = asList(
                new Document("_id", new ObjectId(objectId)).append("a", 1),
                new Document("_id", new ObjectId()).append("a", 2));
        MongoSpark.save(jsc.parallelize(docs));

        // Set the schema using the ObjectId helper
        StructType schema = DataTypes.createStructType(asList(
                StructFields.objectId("_id", false),
                DataTypes.createStructField("a", DataTypes.IntegerType, false)));

        // Create a dataframe with the helper functions registered
        df = MongoSpark.read(sparkSession).schema(schema).option("registerSQLHelperFunctions", "true").load();

        // Query using the ObjectId string
        df.filter(format("_id = ObjectId('%s')", objectId)).show();
    }

    private static JavaSparkContext createJavaSparkContext(final String[] args) {
        String uri = getMongoClientURI(args);
        dropDatabase(uri);

        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("MongoSparkConnectorTour")
                .set("spark.app.id", "MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", uri)
                .set("spark.mongodb.output.uri", uri);

        return new JavaSparkContext(conf);
    }

    private static String getMongoClientURI(final String[] args) {
        String uri;
        if (args.length == 0) {
            uri = "mongodb://localhost/test.coll"; // default
        } else {
            uri = args[0];
        }
        return uri;
    }

    private static void dropDatabase(final String connectionString) {
        MongoClientURI uri = new MongoClientURI(connectionString);
        new MongoClient(uri).dropDatabase(uri.getDatabase());
    }
}
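Note that the tour above calls MongoSpark.load(jsc).toDF(Character.class), but the Character bean itself is not part of the snippet. It is just a plain JavaBean whose properties match the sample documents; a minimal sketch (field names assumed from the sample data) could look like this:

import java.io.Serializable;

// Simple JavaBean used to declare an explicit schema for the Dataset.
// Keep it in your own package so it does not clash with java.lang.Character.
public final class Character implements Serializable {
    private String name;
    private Integer age;

    public String getName() { return name; }
    public void setName(final String name) { this.name = name; }

    public Integer getAge() { return age; }
    public void setAge(final Integer age) { this.age = age; }
}

With a bean like this on the classpath, toDF(Character.class) infers the name/age columns from the getters instead of sampling the collection.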