1. 程式人生 > >java和python之間資料互動,不同語言間傳輸資料:使用RabbitMQ

java和python之間資料互動,不同語言間傳輸資料:使用RabbitMQ

java和python之間資料互動,不同語言間傳輸資料:使用RabbitMQ

問題描述

專案過程中可能會遇到,java從資料庫取了很多資料,但java本身不方便處理,所以傳遞給python去處理,如何傳?這裡我結合很多已存在的方法,然後再談談怎麼用rabbitmq來操作。
這裡會舉簡單的例子示範,當然前提你需要了解基本的java,python程式碼編寫,大致瞭解下佇列、rabbitmq是啥,還有相關的檔案操作、json等等。

方法

方法一:java直接執行python指令碼.py檔案,把資料放到引數裡傳遞

舉例說明:
java程式碼,

    @Test
	public void test()  {
        String pythonPath="/Library/Frameworks/Python.framework/Versions/3.6/bin/python3 ";
        String filePath="/Users/guang/Documents/Python_Project/Test/test/receiver.py ";

        //首先定義個list,賦值。
        List<
Integer>
list1 = new ArrayList<Integer>(); int i = 1; while(i<=10000) { list1.add(i); i++; } //呼叫python指令碼並傳遞list try { Process process = Runtime.getRuntime().exec( pythonPath + filePath + list1)
; BufferedReader in=new BufferedReader(new InputStreamReader(process.getInputStream())); String line; while ((line=in.readLine())!=null){ System.out.println(line); } in.close(); int re=process.waitFor(); System.out.println(re==1?"----狀態碼1----執行失敗":"----狀態碼0----執行成功"); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }

python程式碼,

if __name__ == '__main__':
    print("------------- start -------------")
    # print("len(sys.argv[1:])==", len(sys.argv[1:]))
    print("sys.argv[1:]==", sys.argv[1:])

執行結果:

在這裡插入圖片描述
這樣的方式有兩個問題:
1、引數傳輸過程中形式會發生變化,比如上面我實際想傳一個list過去,但通過sys.argv[1:]可以看到,拿到的資料是一個被加工過的字串形式,這樣你又要花費大量精力去切割或者其他操作來把它在python中變成一個列表;
2、你不妨試著把資料量調大10倍,從10000 -> 100000,發現會報下面的錯誤,因為傳不了那麼大的。
在這裡插入圖片描述

方法二:通過第三方檔案作為中間站

(因為遇到專案的特殊性,這裡舉的例子是把大量資料儲存到了excel中),即增加了寫入和讀取的過程。

資料準備:
為了方便測試,這裡模擬java從資料庫取出的每條資料是這麼個簡單的形式:
某地區|某年指標1|某年指標2|某年指標3| …
在這裡插入圖片描述模擬生成資料程式碼:

public class testObj {
	
	private String place;
	private List<Integer> datalist;
	
	public String getPlace() {
		return place;
	}
	public void setPlace(String place) {
		this.place = place;
	}
	public List<Integer> getDatalist() {
		return datalist;
	}
	public void setDatalist(List<Integer> datalist) {
		this.datalist = datalist;
	}
	public testObj(String place, List<Integer> datalist) {
		super();
		this.place = place;
		this.datalist = datalist;
	}

}
public class MockData {
	
	public static List<testObj> mockData() {
		List<testObj> list_obj = new ArrayList<testObj>();
		for (int i = 0; i < 50000; i++) {  //模擬5萬行,每行70列
			List<Integer> list_int = new ArrayList<Integer>();
            for(int j=1949; j<=2019; j++) {
            	list_int.add(j);
        	}
			testObj obj = new testObj("address: 中國"+i+"省,"+i+"市,"+i+"縣,"+"某水利部門", list_int);
			list_obj.add(obj);
        }
		return list_obj;
	}

}

寫入excel:

public class writeToExcel {
	
	public static void writeExcel(List<testObj> list, String path) {
        try {
            // Excel底部的表名
            String sheetn = "sheet1";
            // 用JXL向新建的檔案中新增內容
            File myFilePath = new File(path);
            if (!myFilePath.exists())
                myFilePath.createNewFile();
            OutputStream outf = new FileOutputStream(path);
            WritableWorkbook wwb = Workbook.createWorkbook(outf);
            jxl.write.WritableSheet writesheet = wwb.createSheet(sheetn, 1);
            // 內容新增
            for (int i = 0; i < list.size(); i++) {
            	testObj obj = (testObj)list.get(i);
            	Label label = new Label(0, i, obj.getPlace()); 
                writesheet.addCell(label);
            	List<Integer> numbersList = obj.getDatalist();
            	for(int j=1;j<numbersList.size();j++) {
                    writesheet.addCell(new jxl.write.Number(j,i,(Integer)numbersList.get(j)));
            	}
            }
            wwb.write();
            wwb.close();
        } catch (WriteException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
	
	public static void main(String[] args) {
		
		long startTime=System.currentTimeMillis(); //獲取開始時間
		
        //資料準備
        List<testObj> list = MockData.mockData();
        //存放路徑
        String path;
        try {
        	path = "/Users/guang/Documents/Java_Project/testdemo/src/main/java/com/test/testdemo/test.xls";
            //System.out.println(path);
            writeExcel(list, path);
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        long endTime=System.currentTimeMillis(); //獲取結束時間
        
        System.out.println("程式執行時間: "+(endTime-startTime)/1000.0+"s");
        
    }
}

在這裡插入圖片描述
excel裡已存在這麼多模擬資料,

在這裡插入圖片描述python讀excel:

import xlrd
import datetime

if __name__ == '__main__':
    starttime = datetime.datetime.now()
    workbook = xlrd.open_workbook('/Users/guang/Documents/Java_Project/testdemo/src/main/java/com/test/testdemo/test.xls')
    sheet = workbook.sheets()[0]
    endtime = datetime.datetime.now()
    print("拿到資料時間:{}".format(endtime - starttime))
    nrows = sheet.nrows
    print('資料行數:{}'.format(nrows))

![
這種方法其實是可取的,在資料量不是很大的情況下,效率和後面的rabbitmq差別並不是很大,但是資料量特別大的情況下速度可能會慢,在本例中有時候還要去考慮excel單個sheet的行列最大限制容量問題。

方法三:使用訊息佇列的方式,RabbitMQ

基本概念簡述:
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊。

在這裡插入圖片描述
MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA訊息中介軟體服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。RabbitMQ是一個在AMQP基礎上完成的,可複用的企業訊息系統。

在這裡插入圖片描述
上圖是簡單抽象的描述,具體到 rabbitmq 有很多詳細的概念,這裡不一一詳述。當然,我們上面也說過 ,rabbitmq 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念,包括:Message(訊息)、Publisher(訊息生產者),Exchange(交換器),Binding(繫結),Queue(訊息佇列),Connection(網路連線),Channel(通道),Consumer(訊息消費者),Virtual Host(虛擬主機),Broker(訊息佇列伺服器)。

使用說明:
首先,安裝rabbitmq,這個可以參考網上相關教程比較簡單,可以安裝在本地,也可以其他地方,而對於我們這裡java/python傳輸問題上,區別其實也就是程式碼中的host到底是localhost還是你的其他ip地址192.168.***.***, 因為我們是來告訴大家怎麼傳輸,所以安裝過程這裡不一一贅述,大家可以去網路上了解一下。我是單獨安裝在虛擬機器的ubuntu上的,安裝成功後,主機也可以訪問管理介面了:
在這裡插入圖片描述
當然我們這裡只是簡單的傳資料問題,也不需要用這個複雜介面管理啥,有興趣的同學可以去詳細瞭解下rabbitmq綜合的知識。
程式碼部分(資料我們依舊是用上面的模擬資料):
java程式碼(即生產者),

public class Producer1 {
	
	public final static String QUEUE_NAME="data";
	
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException{
        
    	long startTime=System.currentTimeMillis(); //獲取開始時間
		
        //資料準備
        List<testObj> list = MockData.mockData();
    	
        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ相關資訊
        factory.setHost("192.168.43.211");
        factory.setUsername("test");
        factory.setPassword("123456");
        //factory.setPort(5672);
        //建立一個新的連線
        Connection connection = factory.newConnection();
        //建立一個通道
        Channel channel = connection.createChannel();
        //宣告一個佇列        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        List<testObj> list1 = new ArrayList<testObj>();
        list1 = new MockData().mockData();
        
        //傳送訊息到佇列中
        ObjectMapper mapper=new ObjectMapper();
        String message = mapper.writeValueAsString(list1);
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        
        //關閉通道和連線
        channel.close();
        connection.close();
        
        long endTime=System.currentTimeMillis(); //獲取結束時間  
        System.out.println("程式執行時間: "+(endTime-startTime)/1000.0+"s");
        
        
    }

}

在這裡插入圖片描述
這時候可以看到rabbitmq管理介面訊息佇列queue那裡出現了我們定義的data佇列,被java傳送上去了
在這裡插入圖片描述
python程式碼(即消費者),

if __name__ == '__main__':
    starttime = datetime.datetime.now()
    # 建立socket連結
    credentials = pika.PlainCredentials('test', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
     '192.168.43.211', 5672, '/', credentials))
    # 建立管道
    channel = connection.channel()
    # 建立佇列
    queue_name = 'data'
    channel.queue_declare(queue_name)
    # 如果接受到訊息就呼叫回撥函式,準備接受訊息

    # 宣告回撥函式
    def callback(ch, method, properties, body):
        message = json.loads(body.decode())
        endtime = datetime.datetime.now()
        print("拿資料時間:{}".format(endtime - starttime))
        list = message
        for i in list:
            print(i)

    channel.basic_consume(callback, queue=queue_name, no_ack=False)
    channel.start_consuming()

結果:
在這裡插入圖片描述
在這裡插入圖片描述我們可以看到python順利拿到java傳輸的資料,至於你拿到資料後,後面要做什麼複雜的操作、分析,那就是看你自己的需要了。
另外,你不妨嘗試把模擬資料條數提高到10萬條甚至更多,前兩種方法未必能得到有效的支援,但rabbitmq基本能保持穩定有效。

總結

在解決不同語言程式之間資料傳輸問題上,方法各異,rabbitmq是一個很好的選擇,處理資料量大,且從時間效率上來說也快(單從我的資料測試結果上來看整個過程速度提高3倍左右,當然其他複雜結構資料可能稍有差異)。但如果你的資料量並不巨大,其他方法也可行,根據不同情況,做最好的選擇保證代價最低一定是我們優先考慮的事兒。
最後宣告,本篇文章只是針對java向python傳輸資料的幾種方法進行分析,所以裡面用到的一些知識不面面詳解,具體可以去參考網上相關教程。另外,由於不同機器、不同系統環境下的差異是不確定性的,所以資料測試結果可能和其他環境下有所差異,純屬正常,本文僅供參考,請根據你自己的想法去合理選擇。若有更好的方法,歡迎大家一起交流共同改進!謝謝!本篇文章著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。