java和golang通過protobuf協議相互通訊
目錄
- 整體結構說明
- protobuf2檔案
- golang客戶端
- 目錄結構
- 生成pb.go檔案
- main.go
- util.go
- java服務端
- 目錄結構
- pom.xml
- application.yml
- NettyConfig.java
- 生成Helloworld.java
- SocketServerHandler.java
- NettyServerListener.java
- Application.java
- 測試
因為裝置的通訊協議準備採用
protobuf
,所以準備這篇protobuf
的使用入門,golang
作為客戶端,java
作為服務端,這才能真正體現出protobuf
的無關語言特性。本文采用
protobuf2
,注重於如何快速入門使用,並不會涉及到具體的細節知識點。
整體結構說明
golang
作為客戶端,java
作為服務端,protobuf2
為兩者的通訊協議格式。
protobuf2檔案
protobuf2簡介
詳細說明
helloworld.proto
syntax = "proto2"; package proto; message ProtocolMessage { message SearchRequest{ required string name = 1; optional int32 search = 2 ; } message ActionRequest{ required string name = 1; optional int32 action = 2 ; } message SearchResponse{ required string name = 1; optional int32 search = 2 ; } message ActionResponse{ required string name = 1; optional int32 action = 2 ; } optional SearchRequest searchRequest = 1; optional ActionRequest actionRequest = 2; optional SearchResponse searchResponse = 3; optional ActionResponse actionResponse = 4; }
SearchRequest
和SearchResponse
為對應的請求和相應message;ActionRequest
和ActionResponse
為對應的請求和相應message;- 由於服務端使用
netty
框架,限制了只能接受一個message進行編碼解碼,所以把SearchRequest
、SearchResponse
、ActionRequest
和ActionResponse
都內嵌到ProtocolMessage
中,通過對ProtocolMessage
編碼解碼進行資料互動。
golang客戶端
目錄結構
client_proto/ ├── api │ ├── proto # 存放proto協議檔案以及生產的pd.go檔案 │ ├── helloworld.pb.go │ └── helloworld.proto ├── cmd │ ├── main.go │ ├── util │ └── util.go
採用go mod
進行開發
生成pb.go檔案
安裝proto
自行百度......
在.proto檔案處,輸入
protoc --go_out=./ helloworld.proto
即可生成
helloworld.pb.go
檔案
main.go
package main
import (
"github.com/gin-gonic/gin"
proto "grpc/api/grpc_proto"
"grpc/cmd/demo3/util"
"net/http"
"time"
)
func init() {
util.InitTransfer()
}
func main() {
router := gin.Default()
// search 測試
router.GET("/search", func(c *gin.Context) {
name := "search"
search := int32(12)
message := &proto.ProtocolMessage{
SearchRequest:&proto.ProtocolMessage_SearchRequest{
Name:&name,
Search:&search,
},
}
if err := util.G_transfer.SendMsg(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
return
}
if err := util.G_transfer.ReadResponse(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
return
}
c.JSON(200, gin.H{
"message": message.SearchResponse.Name,
})
})
// action測試
router.GET("/action", func(c *gin.Context) {
name := "action"
action := int32(34)
message := &proto.ProtocolMessage{
ActionRequest: &proto.ProtocolMessage_ActionRequest{
Name: &name,
Action: &action,
},
}
if err := util.G_transfer.SendMsg(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
}
if err := util.G_transfer.ReadResponse(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
}
c.JSON(200, gin.H{
"message": message.ActionResponse.Name,
})
})
ReadTimeout := time.Duration(60) * time.Second
WriteTimeout := time.Duration(60) * time.Second
s := &http.Server{
Addr: ":8090",
Handler: router,
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
MaxHeaderBytes: 1 << 20,
}
s.ListenAndServe()
}
util.go
package util
import (
"encoding/binary"
"errors"
"github.com/gogo/protobuf/proto"
grpc_proto "grpc/api/grpc_proto"
"net"
)
var (
G_transfer *Transfer
)
func InitTransfer() {
var (
pTCPAddr *net.TCPAddr
conn net.Conn
err error
)
if pTCPAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:3210"); err != nil {
return
}
if conn, err = net.DialTCP("tcp", nil, pTCPAddr); err != nil {
return
}
// 定義 Transfer 指標變數
G_transfer = &Transfer{
Conn: conn,
}
}
// 宣告 Transfer 結構體
type Transfer struct {
Conn net.Conn // 連線
Buf [1024 * 2]byte // 傳輸時,使用的緩衝
}
// 獲取並解析伺服器的訊息
func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) (err error) {
_, err = transfer.Conn.Read(transfer.Buf[:4])
if err != nil {
return
}
// 根據 buf[:4] 轉成一個 uint32 型別
var pkgLen uint32
pkgLen = binary.BigEndian.Uint32(transfer.Buf[:4])
//根據pkglen 讀取訊息內容
n, err := transfer.Conn.Read(transfer.Buf[:pkgLen])
if n != int(pkgLen) || err != nil {
return
}
if err = proto.Unmarshal(transfer.Buf[:pkgLen], response); err != nil {
return
}
return
}
// 傳送訊息到伺服器
func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) (err error) {
var (
sendBytes []byte
readLen int
)
//sendBytes, ints := action.Descriptor()
if sendBytes, err = proto.Marshal(action); err != nil {
return
}
pkgLen := uint32(len(sendBytes))
var buf [4]byte
binary.BigEndian.PutUint32(buf[:4],pkgLen)
if readLen, err = transfer.Conn.Write(buf[:4]); readLen != 4 && err != nil {
if readLen == 0 {
return errors.New("傳送資料長度發生異常,長度為0")
}
return
}
// 傳送訊息
if readLen, err = transfer.Conn.Write(sendBytes); err != nil {
if readLen == 0 {
return errors.New("檢查到伺服器關閉,客戶端也關閉")
}
return
}
return
}
- 這裡傳送訊息和讀取訊息都需要先發送/解析資料的長度,然後傳送/解析資料本身;
- 這裡與服務端怎麼樣解析/傳送資料有關,這是由於
netty
框架中定義的編碼解碼器決定的。
java服務端
目錄結構
server_proto/
├── src
│ ├── main
│ ├── java
│ ├── com
│ ├── dust
│ ├── proto_server
│ ├── config
│ └── NettyConfig.java
│ ├── netty
│ └── NettyServerListener.java
│ └── SocketServerHandler.java
│ ├── proto
│ └── Helloworld.java
│ └── helloworld.proto # proto配置檔案
│ └── Application.java # 啟動配置類
│ ├── resources
│ └── application.yml #配置檔案
│ ├── test
└── pom.xml # maven配置檔案
採用springBoot
+netty
+maven
開發
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dust</groupId>
<artifactId>proto_server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>proto_server</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- protobuf依賴-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.19.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 注意:
protobuf-java
的版本為3.8.0
,必須和安裝proto.exe
的版本保持一致。
application.yml
# netty配置
netty:
# 埠號
port: 3210
# 最大執行緒數
maxThreads: 1024
# 資料包的最大長度
max_frame_length: 65535
NettyConfig.java
package com.dust.proto_server.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {
private int port;
}
生成Helloworld.java
- 在.proto檔案處,輸入
protoc --java_out=./ helloworld.proto
- 即可生成
Helloworld.java
檔案
SocketServerHandler.java
package com.dust.proto_server.netty;
import com.dust.proto_server.proto.Helloworld;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@ChannelHandler.Sharable
public class SocketServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class);
public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
LOGGER.info(channel.id().toString()+"加入");
CHANNEL_GROUP.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
LOGGER.info(channel.id().toString()+"退出");
CHANNEL_GROUP.remove(channel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
//
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LOGGER.info("開始讀取客戶端傳送過來的資料");
Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg;
Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder();
if (protocolMessage.getSearchRequest().getSerializedSize() != 0) {
Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest();
LOGGER.info("searchRequest--{}",searchRequest);
Helloworld.ProtocolMessage.SearchResponse searchResponse = Helloworld.ProtocolMessage.SearchResponse.newBuilder().setName("i am SearchResponse").setSearch(45).build();
builder.setSearchResponse(searchResponse);
} else if (protocolMessage.getActionRequest().getSerializedSize() != 0) {
Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest();
LOGGER.info("actionRequest--{}",actionRequest);
Helloworld.ProtocolMessage.ActionResponse actionResponse = Helloworld.ProtocolMessage.ActionResponse.newBuilder().setName("i am ActionResponse").setAction(67).build();
builder.setActionResponse(actionResponse);
}
Helloworld.ProtocolMessage message = builder.build();
// 傳送資料長度
ctx.channel().writeAndFlush(message.toByteArray().length);
// 傳送資料本身
ctx.channel().writeAndFlush(message);
}
}
NettyServerListener.java
package com.dust.proto_server.netty;
import com.dust.proto_server.config.NettyConfig;
import com.dust.proto_server.proto.Helloworld;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
@Component
public class NettyServerListener {
/**
* NettyServerListener 日誌輸出器
*
* @author 葉雲軒 create by 2017/10/31 18:05
*/
private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
/**
* 建立bootstrap
*/
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* BOSS
*/
EventLoopGroup boss = new NioEventLoopGroup();
/**
* Worker
*/
EventLoopGroup work = new NioEventLoopGroup();
@Resource
private SocketServerHandler socketServerHandler;
/**
* NETT伺服器配置類
*/
@Resource
private NettyConfig nettyConfig;
/**
* 關閉伺服器方法
*/
@PreDestroy
public void close() {
LOGGER.info("關閉伺服器....");
//優雅退出
boss.shutdownGracefully();
work.shutdownGracefully();
}
/**
* 開啟及服務執行緒
*/
public void start() {
// 從配置檔案中(application.yml)獲取服務端監聽埠號
int port = nettyConfig.getPort();
serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 負責通過4位元組Header指定的Body長度將訊息切割
pipeline.addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
// 負責將frameDecoder處理後的完整的一條訊息的protobuf位元組碼轉成ProtocolMessage物件
pipeline.addLast("protobufDecoder",
new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance()));
// 負責將寫入的位元組碼加上4位元組Header字首來指定Body長度
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
// 負責將ProtocolMessage物件轉成protobuf位元組碼
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast(socketServerHandler);
}
}).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
LOGGER.info("netty伺服器在[{}]埠啟動監聽", port);
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.info("[出現異常] 釋放資源");
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
- 這個類就定義服務端是怎麼樣處理接受和傳送資料的;
frameDecoder
和protobufDecoder
對應的handler用於解碼Protobuf package資料包,他們都是Upstream Handles:先處理長度,然後再處理資料本身;frameEncoder
和protobufEncoder
對應的handler用於編碼Protobuf package資料包,他們都是Downstream Handles;此外還有一個handler,是一個自定義的Upstream Handles,用於開發者從網路資料中解析得到自己所需的資料
socketServerHandler
;上例Handles的執行順序為
upstream:frameDecoder,protobufDecoder,handler //解碼從Socket收到的資料 downstream:frameEncoder,protobufEncoder //編碼要通過Socket傳送出去的資料
Application.java
package com.dust.proto_server;
import com.dust.proto_server.netty.NettyServerListener;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Resource
private NettyServerListener nettyServerListener;
@Override
public void run(String... args) throws Exception {
nettyServerListener.start();
}
}
測試
先啟動服務端,再啟動客戶端
search測試
- action測試
相關推薦
java和golang通過protobuf協議相互通訊
目錄 整體結構說明 protobuf2檔案 golang客戶端 目錄結構 生成pb.go檔案 main.go util.go
Java與C通過JNI指標相互傳遞
轉載地址: http://blog.csdn.net/neo_86/article/details/24931509 注意 1、c中指標可以直接轉為java裡的int值,都是32位無損失(32位作業系統或者gcc 32編譯器)。 2、迴圈裡要注意釋放本地引用,因為迴圈太
NDK基礎(java ,c/c++, jni之間的關係及java和c/c++之間的相互呼叫)
1.java,c/c++,和jni之間的關係 java和c/c++可以相互呼叫,是因為java虛擬機器中的JNI。簡單的說就是用c/c++編寫一個動態連結庫讓Java虛擬機器去呼叫。(在windows環境下動態連結庫就是.dll檔案, 在Linux下就是.so檔案) 2.
對java和c語言之中程序間通訊的理解
程序間通訊: 1 管道: java裡是讀寫管道檔案即可。c語言利用mkfifo建立兩個管道文 件,java讀寫管道檔案即可。 c語言就是pipe。例子:linux 命令:ps -ef | grep ‘XXX’.解釋:ps和grep命令都是一個
保姆級別的RabbitMQ教程!包括Java和Golang兩種客戶端
[TOC] ### 什麼是AMQP 和 JMS? **AMQP**:即Advanced Message Queuing Protocol,是一個應用層標準高階訊息佇列協議,提供統一訊息服務。是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介
golang和c通過unix域實現雙向通訊
go的程式碼如下: package main import ( "fmt" "net" "bufio" "os" "syscall" ) func writeUnix(listener *net.UnixConn,dstAddr *net.UnixAddr) { for {
五、通過Protobuf整合Netty實現對協議訊息客戶端與伺服器通訊實戰
目錄 一、Protocol Buffers 是什麼? 二、Protocol Buffers 檔案和訊息詳解 三、專案實戰,直接掌握的protobuf應用。 一、Protocol Buffers 是什麼? 1、官網翻譯
關於c++(客戶端)和JAVA(服務端)的TCP通訊(基於stomp協議)(一)
最近在做軟體外掛的更新服務,其中涉及到客戶端和服務端通訊,採取服務端推送的形式進行更新資料包的推送。 經過商討後採取TCP通訊,由於服務端是JAVA的,客戶端是C++。 在http://wenku.baidu.com/view/5eb31ea1284ac850ad0242
mina和netty多客戶端即時通訊,基於google protobuf傳輸協議實現
基於Java服務端的即時通訊解決方案,與android 客戶端完美結合,同時支援其他語言的移動應用,物聯網,智慧家居,桌面應用,WEB應用以及後臺系統之間的即時消互動,為你解決了長連線各種訊息事件,斷線重連等繁瑣的處理,使用方便易於整合。此開源版本為基礎功能版本,只有訊息推送
如何將TS源流重新封裝並通過P2P協議傳輸在安卓終端和蘋果終端播放
ts p2p 直播 什麽是TS流TS流(TransportStream)即在MPEG-2系統中,由視頻,音頻的ES流和輔助數據復接生成的用於實際傳輸的標準信息流稱為MPEG-2傳送流。根據傳輸媒體的質量不同,MPEG-2中定義了兩種復合信息流:傳送流(TS)和節目流(PS:ProgramStrea
解決Linux 下server和client 通過TCP通訊:accept成功接收卻報錯的問題
ipv4 socket error 實例代碼 ... lis col argc 例子 今天在寫簡單的TCP通訊例子的時候,遇到了一個問題:server 和client能夠連接成功,並且client也能夠正常發送,但server就是接收不到,在網上搜索一番後,終於解決了問
JAVA wait()和notifyAll()實現線程間通訊
all row cache string runnable private sync ide cached 本例是閱讀Think in Java中相應章節後,自己實際寫了一下自己的實現 import java.util.concurrent.ExecutorService
SM4加密算法實現Java和C#相互加密解密
.net ++ println ffffff 預處理 AS 思路 load prop SM4加密算法實現Java和C#相互加密解密 近期由於項目需要使用SM4對數據進行加密,然後傳給Java後臺,Java後臺使用的也是SM4的加密算法但是就是解密不正確,經過一步步調
Java利用wait和notify實現執行緒間通訊
Java的Object類提供了wait和notify方法用於實現執行緒間通訊(因為所有的java類都繼承了Object類,所以所有的java類都有這兩個方法)。這兩個方法在Object類中籤名如下: pu
基於TCP協議的通訊(基於Java語言)
端與端通訊,經常由客戶端和服務端兩者組成,其中客戶端傳送請求給服務端,而服務端則響應請求。這兩者的通訊可以通過Socket來實現兩端的資料傳輸。其中Java自帶的Socket類可以建立客戶端socket,而ServerSocket可以建立一
Java中字串和byte陣列之間的相互轉換
1、將字元轉換成byte陣列 String str = "羅長"; byte[] sb = str.getBytes(); 2、將byte陣列轉換成字元 byte[] b={(byte)0xB8,(byte)0xDF,(byte)0xCB,(byte)0xD9}; String str=
Protobuf 協議的Java應用sample
Protobuf協議,全稱:Protocol Buffer 它跟JSON,XML一樣,是一個規定好的資料傳播格式。不過,它的序列化和反序列化的效率太變態了…… 來看看幾張圖你就知道它有多變態。 Protobuf的Java例項 一、 安裝Protobuf
protobuf協議java版本使用說明
1. 初始話協議欄位 xxx .proto syntax = "proto3"; //protobuf版本 option java_package = "com.xxx.protobuf"; //生成java程式碼的資料包路徑 option java_outer_classna
android與PC,C#與Java 利用protobuf 進行無障礙通訊【Socket】
轉自https://www.cnblogs.com/TerryBlog/archive/2011/04/23/2025654.html protobuf 是什麼? Protocol buffers是一種編碼方法構造的一種有效而可擴充套件的格式的資料。 谷歌使用其內
java回撥原理,以及Callable和FutureTask通過回撥機制建立可監控的執行緒
回撥的概念會JS的人應該都能理解。 回撥分非同步回撥,同步回撥。但是同步回撥其實沒什麼意義。都同步了,那麼直接等那邊執行完了,這邊再執行就可以了,沒必要通過回撥。我們說的回撥主要是講非同步回撥。用於兩個執行緒甚至兩個系統之間互動呼叫。 例如我在A類的方法funa()中,要呼叫B類的方法fun