java和python之間資料互動,不同語言間傳輸資料:使用RabbitMQ
java和python之間資料互動,不同語言間傳輸資料:使用RabbitMQ
問題描述
專案過程中可能會遇到,java從資料庫取了很多資料,但java本身不方便處理,所以傳遞給python去處理,如何傳?這裡我結合很多已存在的方法,然後再談談怎麼用rabbitmq來操作。
這裡會舉簡單的例子示範,當然前提你需要了解基本的java,python程式碼編寫,大致瞭解下佇列、rabbitmq是啥,還有相關的檔案操作、json等等。
方法
方法一:java直接執行python指令碼.py檔案,把資料放到引數裡傳遞
舉例說明:
java程式碼,
@Test
public void test() {
String pythonPath="/Library/Frameworks/Python.framework/Versions/3.6/bin/python3 ";
String filePath="/Users/guang/Documents/Python_Project/Test/test/receiver.py ";
//首先定義個list,賦值。
List< Integer> list1 = new ArrayList<Integer>();
int i = 1;
while(i<=10000) {
list1.add(i);
i++;
}
//呼叫python指令碼並傳遞list
try {
Process process = Runtime.getRuntime().exec(
pythonPath + filePath + list1) ;
BufferedReader in=new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line=in.readLine())!=null){
System.out.println(line);
}
in.close();
int re=process.waitFor();
System.out.println(re==1?"----狀態碼1----執行失敗":"----狀態碼0----執行成功");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
python程式碼,
if __name__ == '__main__':
print("------------- start -------------")
# print("len(sys.argv[1:])==", len(sys.argv[1:]))
print("sys.argv[1:]==", sys.argv[1:])
執行結果:
這樣的方式有兩個問題:
1、引數傳輸過程中形式會發生變化,比如上面我實際想傳一個list過去,但通過sys.argv[1:]可以看到,拿到的資料是一個被加工過的字串形式,這樣你又要花費大量精力去切割或者其他操作來把它在python中變成一個列表;
2、你不妨試著把資料量調大10倍,從10000 -> 100000,發現會報下面的錯誤,因為傳不了那麼大的。
方法二:通過第三方檔案作為中間站
(因為遇到專案的特殊性,這裡舉的例子是把大量資料儲存到了excel中),即增加了寫入和讀取的過程。
資料準備:
為了方便測試,這裡模擬java從資料庫取出的每條資料是這麼個簡單的形式:
某地區|某年指標1|某年指標2|某年指標3| …
模擬生成資料程式碼:
public class testObj {
private String place;
private List<Integer> datalist;
public String getPlace() {
return place;
}
public void setPlace(String place) {
this.place = place;
}
public List<Integer> getDatalist() {
return datalist;
}
public void setDatalist(List<Integer> datalist) {
this.datalist = datalist;
}
public testObj(String place, List<Integer> datalist) {
super();
this.place = place;
this.datalist = datalist;
}
}
public class MockData {
public static List<testObj> mockData() {
List<testObj> list_obj = new ArrayList<testObj>();
for (int i = 0; i < 50000; i++) { //模擬5萬行,每行70列
List<Integer> list_int = new ArrayList<Integer>();
for(int j=1949; j<=2019; j++) {
list_int.add(j);
}
testObj obj = new testObj("address: 中國"+i+"省,"+i+"市,"+i+"縣,"+"某水利部門", list_int);
list_obj.add(obj);
}
return list_obj;
}
}
寫入excel:
public class writeToExcel {
public static void writeExcel(List<testObj> list, String path) {
try {
// Excel底部的表名
String sheetn = "sheet1";
// 用JXL向新建的檔案中新增內容
File myFilePath = new File(path);
if (!myFilePath.exists())
myFilePath.createNewFile();
OutputStream outf = new FileOutputStream(path);
WritableWorkbook wwb = Workbook.createWorkbook(outf);
jxl.write.WritableSheet writesheet = wwb.createSheet(sheetn, 1);
// 內容新增
for (int i = 0; i < list.size(); i++) {
testObj obj = (testObj)list.get(i);
Label label = new Label(0, i, obj.getPlace());
writesheet.addCell(label);
List<Integer> numbersList = obj.getDatalist();
for(int j=1;j<numbersList.size();j++) {
writesheet.addCell(new jxl.write.Number(j,i,(Integer)numbersList.get(j)));
}
}
wwb.write();
wwb.close();
} catch (WriteException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
long startTime=System.currentTimeMillis(); //獲取開始時間
//資料準備
List<testObj> list = MockData.mockData();
//存放路徑
String path;
try {
path = "/Users/guang/Documents/Java_Project/testdemo/src/main/java/com/test/testdemo/test.xls";
//System.out.println(path);
writeExcel(list, path);
} catch (Exception e) {
e.printStackTrace();
}
long endTime=System.currentTimeMillis(); //獲取結束時間
System.out.println("程式執行時間: "+(endTime-startTime)/1000.0+"s");
}
}
excel裡已存在這麼多模擬資料,
python讀excel:
import xlrd
import datetime
if __name__ == '__main__':
starttime = datetime.datetime.now()
workbook = xlrd.open_workbook('/Users/guang/Documents/Java_Project/testdemo/src/main/java/com/test/testdemo/test.xls')
sheet = workbook.sheets()[0]
endtime = datetime.datetime.now()
print("拿到資料時間:{}".format(endtime - starttime))
nrows = sheet.nrows
print('資料行數:{}'.format(nrows))
這種方法其實是可取的,在資料量不是很大的情況下,效率和後面的rabbitmq差別並不是很大,但是資料量特別大的情況下速度可能會慢,在本例中有時候還要去考慮excel單個sheet的行列最大限制容量問題。
方法三:使用訊息佇列的方式,RabbitMQ
基本概念簡述:
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊。
MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA訊息中介軟體服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。RabbitMQ是一個在AMQP基礎上完成的,可複用的企業訊息系統。
上圖是簡單抽象的描述,具體到 rabbitmq 有很多詳細的概念,這裡不一一詳述。當然,我們上面也說過 ,rabbitmq 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念,包括:Message(訊息)、Publisher(訊息生產者),Exchange(交換器),Binding(繫結),Queue(訊息佇列),Connection(網路連線),Channel(通道),Consumer(訊息消費者),Virtual Host(虛擬主機),Broker(訊息佇列伺服器)。
使用說明:
首先,安裝rabbitmq,這個可以參考網上相關教程比較簡單,可以安裝在本地,也可以其他地方,而對於我們這裡java/python傳輸問題上,區別其實也就是程式碼中的host到底是localhost還是你的其他ip地址192.168.***.***, 因為我們是來告訴大家怎麼傳輸,所以安裝過程這裡不一一贅述,大家可以去網路上了解一下。我是單獨安裝在虛擬機器的ubuntu上的,安裝成功後,主機也可以訪問管理介面了:
當然我們這裡只是簡單的傳資料問題,也不需要用這個複雜介面管理啥,有興趣的同學可以去詳細瞭解下rabbitmq綜合的知識。
程式碼部分(資料我們依舊是用上面的模擬資料):
java程式碼(即生產者),
public class Producer1 {
public final static String QUEUE_NAME="data";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException{
long startTime=System.currentTimeMillis(); //獲取開始時間
//資料準備
List<testObj> list = MockData.mockData();
//建立連線工廠
ConnectionFactory factory = new ConnectionFactory();
//設定RabbitMQ相關資訊
factory.setHost("192.168.43.211");
factory.setUsername("test");
factory.setPassword("123456");
//factory.setPort(5672);
//建立一個新的連線
Connection connection = factory.newConnection();
//建立一個通道
Channel channel = connection.createChannel();
//宣告一個佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
List<testObj> list1 = new ArrayList<testObj>();
list1 = new MockData().mockData();
//傳送訊息到佇列中
ObjectMapper mapper=new ObjectMapper();
String message = mapper.writeValueAsString(list1);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//關閉通道和連線
channel.close();
connection.close();
long endTime=System.currentTimeMillis(); //獲取結束時間
System.out.println("程式執行時間: "+(endTime-startTime)/1000.0+"s");
}
}
這時候可以看到rabbitmq管理介面訊息佇列queue那裡出現了我們定義的data佇列,被java傳送上去了
python程式碼(即消費者),
if __name__ == '__main__':
starttime = datetime.datetime.now()
# 建立socket連結
credentials = pika.PlainCredentials('test', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.43.211', 5672, '/', credentials))
# 建立管道
channel = connection.channel()
# 建立佇列
queue_name = 'data'
channel.queue_declare(queue_name)
# 如果接受到訊息就呼叫回撥函式,準備接受訊息
# 宣告回撥函式
def callback(ch, method, properties, body):
message = json.loads(body.decode())
endtime = datetime.datetime.now()
print("拿資料時間:{}".format(endtime - starttime))
list = message
for i in list:
print(i)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
結果:
我們可以看到python順利拿到java傳輸的資料,至於你拿到資料後,後面要做什麼複雜的操作、分析,那就是看你自己的需要了。
另外,你不妨嘗試把模擬資料條數提高到10萬條甚至更多,前兩種方法未必能得到有效的支援,但rabbitmq基本能保持穩定有效。
總結
在解決不同語言程式之間資料傳輸問題上,方法各異,rabbitmq是一個很好的選擇,處理資料量大,且從時間效率上來說也快(單從我的資料測試結果上來看整個過程速度提高3倍左右,當然其他複雜結構資料可能稍有差異)。但如果你的資料量並不巨大,其他方法也可行,根據不同情況,做最好的選擇保證代價最低一定是我們優先考慮的事兒。
最後宣告,本篇文章只是針對java向python傳輸資料的幾種方法進行分析,所以裡面用到的一些知識不面面詳解,具體可以去參考網上相關教程。另外,由於不同機器、不同系統環境下的差異是不確定性的,所以資料測試結果可能和其他環境下有所差異,純屬正常,本文僅供參考,請根據你自己的想法去合理選擇。若有更好的方法,歡迎大家一起交流共同改進!謝謝!本篇文章著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。