Day 19 -- MongoDB -- Using MapReduce with MySQL and MongoDB
1. MongoDB
Introduction
MongoDB is a NoSQL database built on distributed file storage.
It is high-performance, easy to deploy, easy to use, and makes storing data very convenient. Its main features:
*Collection-oriented storage that makes object-like data easy to store.
*Schema-free.
*Supports dynamic queries.
*Full index support, including on embedded objects.
*Supports querying.
*Supports replication and failover.
*Efficient binary data storage, including large objects (e.g. video).
*Automatically handles sharding to support cloud-level scalability.
*Drivers for many languages.
*Stores documents in BSON, an extension of JSON.
*Accessible over the network.
Download
Available from the official MongoDB download page.
Installation
After downloading, upload the tarball to Linux with Xftp and simply unpack it:
tar -zxvf mongodb-linux-x86_64-3.2.1.tgz
Configure environment variables
vi .bash_profile and add the following three lines:
MONGODB_HOME=/home/bigdata/mongodb-linux-x86_64-3.2.1
PATH=$MONGODB_HOME/bin:$PATH
export MONGODB_HOME PATH
Reload the environment file: source .bash_profile
Configure startup
- Enter the MongoDB install directory
cd mongodb-linux-x86_64-3.2.1/
- Create the data directory
mkdir data
- Create the log file mongodb.log
touch mongodb.log
- Create the config file mongo.cfg and add the following two settings
vi mongo.cfg
dbpath=/home/bigdata/mongodb-linux-x86_64-3.2.1/data/
logpath=/home/bigdata/mongodb-linux-x86_64-3.2.1/mongodb.log
Start the MongoDB server
mongod -f ./mongo.cfg &
Start the MongoDB client
mongo -host localhost -port 27017
Basic MongoDB operations
use hadoop; switch databases; a missing database is created on the fly, but only materializes once a collection in it holds data
show dbs; / show databases;
show tables; / show collections;
Create a table (collection), actually materialized on first insert: db.u1
Insert data: db.u1.insert({"uname":"zhangsan","age":"16"})
Insert several documents at once (note the array): db.u1.insert([{"uname":"lisi","age":"20"},{"uname":"wangwu","age":"18"}])
db.u1.find(); query everything
db.u1.find({"uname":"lisi"});
db.u1.update({uname:{$eq:"lisi"}},{$set:{age:18}}); update the first match
db.u1.updateMany({uname:{$eq:"lisi"}},{$set:{age:33}}); update all matches
db.u1.deleteOne({"uname":"lisi"}); delete a single match
db.u1.deleteMany({"uname":"lisi"}); delete all matches
db.u1.count() number of documents in the collection
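The same basic operations can also be driven from Java through the MongoDB driver, which the MapReduce code in part 3 relies on. A minimal sketch, assuming the mongod configured above is reachable on localhost:27017 (the class name, host, and sample documents are illustrative):

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.bson.Document;

public class MongoBasics {
    public static void main(String[] args) {
        MongoClient client = new MongoClient("localhost", 27017);
        try {
            MongoCollection<Document> u1 = client.getDatabase("hadoop").getCollection("u1");
            // mirrors db.u1.insert({...}) from the shell
            u1.insertOne(new Document("uname", "zhangsan").append("age", 16));
            u1.insertOne(new Document("uname", "lisi").append("age", 20));
            // mirrors db.u1.find()
            MongoCursor<Document> cursor = u1.find().iterator();
            try {
                while (cursor.hasNext()) {
                    System.out.println(cursor.next().toJson());
                }
            } finally {
                cursor.close();
            }
            // mirrors db.u1.count()
            System.out.println("count: " + u1.count());
        } finally {
            client.close();
        }
    }
}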
2. Copying a MySQL table with MapReduce
MysqlToMysql.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import java.io.IOException;
import java.net.URISyntaxException;
public class MysqlToMysql {
/**
* Map phase: read a row from u1 and pass it through
* @author lyd
*
*/
public static class MyMapper extends Mapper<Object, UserInfo, UserInfo, Text>{
UserInfo ui = new UserInfo();
@Override
protected void map(Object key, UserInfo value,Context context)
throws IOException, InterruptedException {
//pass the row straight through
ui.setId(value.getId());
ui.setAge(value.getAge());
context.write(ui, new Text(""));
}
}
/**
* Reduce phase: write each row out to u2
* @author lyd
*
*/
public static class MyReducer extends Reducer<UserInfo, Text, UserInfo, Text>{
@Override
protected void reduce(UserInfo key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
//straight copy: write the row once per occurrence so duplicates survive
for (Text t : values) {
context.write(key, t);
}
}
}
/**
* Driver
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
* @throws URISyntaxException
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
//configure the MySQL connection
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://hadoop01:3306/test", "root", "root");
Job job = Job.getInstance(conf, "MysqlToMysql");
job.setJarByClass(MysqlToMysql.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(UserInfo.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(UserInfo.class);
job.setOutputValueClass(Text.class);
setArgs(job,args);
/* //ship the MySQL connector jar to the tasks
job.addCacheFile(new URI("hdfs://qianfeng/db/mysql-connector-java-5.1.18.jar"));
*/
//設定DBInutFormat
String[] fields = {"id","age"};
DBInputFormat.setInput(job, UserInfo.class, "u1", null, null, fields);
//設定DBOutputFormat
// String[] fields1 = {"pid","counter"};
DBOutputFormat.setOutput(job, "u2", fields);
//submit the job and wait for completion
int issuccessed = job.waitForCompletion(true) ? 0 : 1;
//exit with the job status
System.exit(issuccessed);
}
/**
* Set the job's input and output formats. The job reads from and writes to
* MySQL tables directly, so no HDFS paths are needed.
* @param job
* @param args
*/
public static void setArgs(Job job , String[] args){
//input comes from the database rather than an HDFS file
//FileInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(DBInputFormat.class);
//output goes to the database rather than an HDFS directory
//FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputFormatClass(DBOutputFormat.class);
}
}
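One deployment note on the commented-out addCacheFile call above: the MySQL connector jar has to be visible to the map and reduce tasks at runtime, whether bundled into the job jar, passed with -libjars, or distributed from HDFS as that comment sketches.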
UserInfo.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
* Custom data type for a row of u1/u2. It must be comparable because
* MysqlToMysql uses it as a map output key.
* @author lyd
*
*/
public class UserInfo implements WritableComparable<UserInfo>,DBWritable{
int id;
int age;
public UserInfo(){
}
public UserInfo(int id, int age) {
this.id = id;
this.age = age;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.id);
dataOutput.writeInt(this.age);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readInt();
this.age = dataInput.readInt();
}
@Override
public void write(PreparedStatement ps) throws SQLException {
ps.setInt(1,this.id);
ps.setInt(2,this.age);
}
@Override
public void readFields(ResultSet rs) throws SQLException {
this.id = rs.getInt(1);
this.age = rs.getInt(2);
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public int compareTo(UserInfo o) {
//order by id, then age, so UserInfo can serve as a map output key
int c = Integer.compare(this.id, o.id);
return c != 0 ? c : Integer.compare(this.age, o.age);
}
@Override
public int hashCode() {
//consistent with compareTo so the default partitioner groups equal rows
return 31 * id + age;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof UserInfo)) return false;
UserInfo other = (UserInfo) obj;
return id == other.id && age == other.age;
}
@Override
public String toString() {
return "UserInfo{" +
"id=" + id +
", age=" + age +
'}';
}
}
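Before the job can run, the source table u1 and the target table u2 must both exist in the test database with id and age columns. A minimal JDBC sketch to create and seed them, reusing the hadoop01/root connection settings from the driver; the exact schemas and sample rows are assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class PrepareTables {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop01:3306/test", "root", "root");
             Statement st = conn.createStatement()) {
            // the job only reads and writes the id and age columns
            st.executeUpdate("CREATE TABLE IF NOT EXISTS u1 (id INT, age INT)");
            st.executeUpdate("CREATE TABLE IF NOT EXISTS u2 (id INT, age INT)");
            // a few sample rows so the copy has something to move
            st.executeUpdate("INSERT INTO u1 VALUES (1, 16), (2, 20), (3, 18)");
        }
    }
}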
3. Copying a MongoDB collection with MapReduce
Hadoop provides no interface for MongoDB out of the box (no input/output format or wrapper classes), so we write our own, modeled on the source of Hadoop's DBInputFormat and friends.
MongoDBInputFormat.java
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.*;
import org.bson.Document;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*
* Custom InputFormat for MongoDB
*/
public class MongoDBInputFormat<V extends MongoDBWritable> extends InputFormat<LongWritable,V> {
/**
* Default no-op value type
*/
public static class NullMongoDBWritable implements MongoDBWritable, Writable{
@Override
public void write(MongoCollection collection) throws IOException {
}
@Override
public void readFields(Document document) throws IOException {
}
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void readFields(DataInput in) throws IOException {
}
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
// connect to the MongoDB database
MongoDatabase mongoDatabase = new MongoClient("SZ01", 27017).getDatabase("hadoop");
// open the source collection over that connection
MongoCollection mongoCollection = mongoDatabase.getCollection("persons");
// total number of documents in the collection
long count = mongoCollection.count();
int perSplitSize = 2; // documents per split
long splitSize = (count + perSplitSize - 1) / perSplitSize; // number of splits, rounded up
List<InputSplit> splits = new ArrayList<>();
for(int i = 0; i < splitSize; i++){
if(i + 1 == splitSize){
// the last split takes whatever documents remain
splits.add(new MongoDBInputSplit(i * perSplitSize, count));
}else{
splits.add(new MongoDBInputSplit(i * perSplitSize, i * perSplitSize + perSplitSize));
}
}
return splits;
}
/**
* Carries the information for one split
*/
public static class MongoDBInputSplit extends InputSplit implements Writable{
private long start = 0; // index of the first document in this split
private long end = 0; // one past the index of the last document
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public long getEnd() {
return end;
}
public void setEnd(long end) {
this.end = end;
}
public MongoDBInputSplit(){
}
public MongoDBInputSplit(long start, long end) {
this.start = start;
this.end = end;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.start);
out.writeLong(this.end);
}
@Override
public void readFields(DataInput in) throws IOException {
this.start = in.readLong();
this.end = in.readLong();
}
@Override
public long getLength() throws IOException, InterruptedException {
return this.end - this.start;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[0];
}
}
@Override
public RecordReader<LongWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new MongoDBRecordReader(split, context);
}
}
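As a worked example of the split arithmetic: with count = 7 documents in persons and perSplitSize = 2, getSplits computes ceil(7/2) = 4 splits covering [0,2), [2,4), [4,6) and [6,7); a split stores only its start and end offsets, and the documents themselves are fetched later by the record reader.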
MongoDBRecordReader.java
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.bson.Document;
import java.io.IOException;
/**
* Reads the records of one split back out of MongoDB
* @param <V>
*/
public class MongoDBRecordReader<V extends MongoDBWritable> extends RecordReader<LongWritable, V> {
private Configuration conf;
private MongoDBInputFormat.MongoDBInputSplit splits; // the split being read
private LongWritable key;
private V value; // current value
private int index; // index within the split
private MongoCursor mongoCursor; // cursor over the split's documents
public MongoDBRecordReader(){
}
public MongoDBRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
initialize(split, context);
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.splits = (MongoDBInputFormat.MongoDBInputSplit)split;
key = new LongWritable();
// instantiate the value from the class registered in the job configuration
Class className = context.getConfiguration().getClass("mapreduce.mongoDB.input.format.class", MongoDBWritable.class);
value = (V)ReflectionUtils.newInstance(className, context.getConfiguration());
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(mongoCursor == null){
// connect to the MongoDB database
MongoDatabase mongoDatabase = new MongoClient("SZ01", 27017).getDatabase("hadoop");
// open the source collection over that connection
MongoCollection mongoCollection = mongoDatabase.getCollection("persons");
// page into this split's slice of the collection
mongoCursor = mongoCollection.find().skip((int)this.splits.getStart()).limit((int)this.splits.getLength()).iterator();
}
// the cursor is guaranteed non-null at this point
boolean isNext = mongoCursor.hasNext(); // is there another document?
if(isNext){
Document document = (Document)mongoCursor.next();
// next key: the document's global offset
this.key.set(this.splits.getStart() + index);
this.index ++;
// next value: deserialize the document
this.value.readFields(document);
}
return isNext;
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return this.key;
}
@Override
public V getCurrentValue() throws IOException, InterruptedException {
return this.value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
if (mongoCursor != null) {
mongoCursor.close();
}
}
}
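Each record reader thus re-issues the query and pages into its own slice with skip/limit: for the split [2,4) from the example above, it skips 2 documents, reads at most getLength() = 2, and hands the mapper the global offsets 2 and 3 as keys.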
MongoDBOutputFormat.java
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Custom OutputFormat for MongoDB
*/
public class MongoDBOutputFormat<V extends MongoDBWritable> extends OutputFormat<NullWritable, V> {
@Override
public RecordWriter<NullWritable, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return new MongoDBRecordWriter<>(context);
}
/**
* Writes records out to MongoDB
* @param <V>
*/
public static class MongoDBRecordWriter<V extends MongoDBWritable> extends RecordWriter<NullWritable, V>{
MongoCollection mongoCollection = null;
public MongoDBRecordWriter(TaskAttemptContext context){
MongoDatabase mongoDatabase = new MongoClient("SZ01", 27017).getDatabase("hadoop");
// open the target collection over that connection
mongoCollection = mongoDatabase.getCollection("result");
}
@Override
public void write(NullWritable key, V value) throws IOException, InterruptedException {
// write straight to MongoDB
value.write(mongoCollection);
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
}
}
/**
* Check the output spec; nothing to verify, since records go straight to MongoDB
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
//return new FileOutputCommitter(null, context);
return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
}
}
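Since write() inserts into MongoDB immediately, there is nothing for a committer to move into place; the FileOutputCommitter returned here is effectively a stub, and the commented-out variant that passes a null path would serve equally well.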
The MongoDBWritable.java interface
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.io.IOException;
public interface MongoDBWritable {
/**
* Write this object into the given MongoDB collection
* @param collection
* @throws IOException
*
*/
public void write(MongoCollection collection) throws IOException;
/**
* Populate this object from a MongoDB document
* @param document
* @throws IOException
*
*/
public void readFields(Document document) throws IOException;
}
MyMongoDBWritable.java
import com.mongodb.client.MongoCollection;
import org.apache.hadoop.io.Writable;
import org.bson.Document;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class MyMongoDBWritable implements Writable, MongoDBWritable{
private String uname;
private Integer age;
private String sex = "";
private int counter = 1;
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.uname);
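// age may be null, so write a presence flag ahead of the value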
if(this.age == null){
out.writeBoolean(false);
}else {
out.writeBoolean(true);
out.writeInt(this.age);
}
out.writeUTF(this.sex);
out.writeInt(this.counter);
}
@Override
public void readFields(DataInput in) throws IOException {
this.uname = in.readUTF();
if(in.readBoolean()){
this.age = in.readInt();
}else{
this.age = null;
}
this.sex = in.readUTF();
this.counter = in.readInt();
}
/**
* Write the final result into MongoDB
*/
@Override
public void write(MongoCollection collection) throws IOException {
// build the result document
Document document = new Document("title","count")
.append("age",this.age)
.append("counter", this.counter);
collection.insertOne(document);
}
/**
* Populate this object from a MongoDB document
* @param document
* @throws IOException
*/
@Override
public void readFields(Document document) throws IOException {
this.uname = document.get("name").toString();
if(document.get("age") == null){
this.age = null;
}else {
this.age = Double.valueOf(document.get("age").toString()).intValue();
}
}
public String getUname() {
return uname;
}
public void setUname(String uname) {
this.uname = uname;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public int getCounter() {
return counter;
}
public void setCounter(int counter) {
this.counter = counter;
}
}
The test class, MongoDBMRTest.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MongoDBMRTest {
public static class MyMapper extends Mapper<LongWritable,MyMongoDBWritable, IntWritable,IntWritable>{
@Override
protected void map(LongWritable key, MyMongoDBWritable value, Context context) throws IOException, InterruptedException {
// documents without an age cannot be counted; log the name and skip
if(value.getAge() == null){
System.out.println("uname:"+value.getUname());
return;
}
context.write(new IntWritable(value.getAge()), new IntWritable(1));
}
}
public static class MyReducer extends Reducer<IntWritable,IntWritable, NullWritable, MyMongoDBWritable> {
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable v : values){
count += v.get();
}
MyMongoDBWritable myMongoDBWritable = new MyMongoDBWritable();
myMongoDBWritable.setAge(key.get());
myMongoDBWritable.setCounter(count);
context.write(NullWritable.get(), myMongoDBWritable);
}
}
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
//設定conf中到的引數
conf.setClass("mapreduce.mongoDB.input.format.class",MyMongoDBWritable.class
,MongoDBWritable.class);
Job job = Job.getInstance(conf, "mongoDemo");
job.setJarByClass(MongoDBMRTest.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(MyMongoDBWritable.class);
//設定輸入輸出格式類
job.setInputFormatClass(MongoDBInputFormat.class);
job.setOutputFormatClass(MongoDBOutputFormat.class);
//submit the job and wait for completion
int issuccessed = job.waitForCompletion(true) ? 0 : 1;
//exit with the job status
System.exit(issuccessed);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
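To make the data flow concrete: if persons held three documents with ages 18, 18 and 20, the mappers would emit (18,1), (18,1) and (20,1), and the reducer would insert {"title":"count","age":18,"counter":2} and {"title":"count","age":20,"counter":1} into the result collection.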