1. 程式人生 > >grpc 在java中的使用 ---2

grpc 在java中的使用 ---2

 

 

 

 

 

歡迎回來!

2.請求流介面

(客戶端可以源源不斷的給服務端傳引數,服務端會源源不斷的接受服務端的引數,最後在客戶端完成請求的時候,服務端返回一個結果)

 

在.proto檔案中新加一個方法,這個方法的引數被 stream 關鍵字修飾

rpc methodRequestStream(stream Request) returns (Result) {}

  

然後用maven,清理一下快取,重新編譯一下

 

2.1.服務端

 重新編譯之後,實現剛剛新加的方法

    @Override
    public StreamObserver<Request> methodRequestStream(StreamObserver<Result> responseObserver) {
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                System.out.print("收到了請求 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
                responseObserver.onNext(result);
                responseObserver.onCompleted();
            }
        };
    }

  

(友情提示,如果 StreamObserver  的的泛型是Result 我們就叫 返回流觀察者,如果是 Request 就叫請求流觀察者,這樣好描述一些)

這個和普通的有點不一樣,直接返回了一個 請求流觀察者 的介面實現,而且方法的引數還是一個 返回流觀察者 ,好像搞反了一樣,至於為什麼,一會在客戶端那裡 統一說

 

2.2.客戶端

請求流式非同步呼叫,普通的是同步呼叫,我們在普通的方法裡建立的例項 也是同步的,所以我們要在 JavaGrpcClient 中新加一個 非同步呼叫的方法,新增一個非同步的例項

public <Result> Result runAsync(Functional<TestServiceGrpc.TestServiceStub,Result> functional)
    {
        TestServiceGrpc.TestServiceStub testServiceStub =
                TestServiceGrpc.newStub(channel);

        return functional.run(testServiceStub);
    }

TestServiceGrpc.newStub 返回的是一個非同步的例項

 

再加一個測試

 

@Test
    public void contextLoads2() {
        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
        StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
            @Override
            public void onNext(Result result) {
                System.out.print("返回了結果 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {

            }
        };
        StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
        result.onNext(request);
        result.onNext(request);
        result.onNext(request);
        result.onCompleted();

        try {
            Thread.sleep(600000);
        }
        catch (Exception ex){}
    }

  

這裡我們實現了一個 返回流觀察者 

StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
            @Override
            public void onNext(Result result) {
                System.out.print("返回了結果 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {

            }
        };

  

呼叫方法的時候,將我們實現的 返回流觀察者 傳進去,返回給我們一個 請求流觀察者

StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));

  

其實這裡返回的 請求流觀察者 就是服務端那裡返回給我們的內個實現,服務端那裡 返回流觀察者 是我們實現的 傳給他的

 

由於是非同步呼叫,最後暫停一下,要不測試跑完,程式結束 開沒開始就結束了

try {
    Thread.sleep(600000);
}
catch (Exception ex){}

  

 

執行起來看結果

服務端的列印

 

客戶端的列印

 

這裡我們傳送了三次引數過去

result.onNext(request);
result.onNext(request);
result.onNext(request);

  

就相當於 服務端 那邊返回的 請求流觀察者 被呼叫了 三次 ,所以就列印了三句話

 

傳送完引數結束請求

result.onCompleted();

  

服務端那裡的結束請求中呼叫了一次我們傳給他的 返回流觀察者 中的 onNext 方法

所以客戶端就列印了一次

 

這裡會有人問 這裡不能返回 多個嗎

不能,雖然 這兩個觀察者 看上去一樣 都是 StreamObserver 介面,但是,這個方法只是請求流呼叫,在grpc的內部 最後返回的時候 只返回第一個指定的返回只,不管返回了多少個,在客戶端那邊只會收到 第一個返回的結果

 

 

3.響應流介面

(和請求流介面完全相反,請求流是非同步,響應流是同步,請求流是接受多個請求返回一個結果,響應流是接受一個請求返回多個結果)

 

 我們在.proto檔案中再增加一個方法,這回這個方法的返回值被 stream 關鍵字修飾

rpc methodResultStream(Request) returns (stream Result){}

  

清快取,重新編譯

3.1.服務端

 實現剛剛新加的方法

@Override
    public void methodResultStream(Request request, StreamObserver<Result> responseObserver) {
        System.out.print("收到了請求 \n");
        Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
        responseObserver.onNext(result);
        responseObserver.onNext(result);
        try {
            Thread.sleep(2000);
        }
        catch (Exception ex){}
        responseObserver.onNext(result);
        responseObserver.onCompleted();
    }

  

 

這裡跟普通的差不多,只是我們返回了三次結果

responseObserver.onNext(result);
responseObserver.onNext(result);
try {
    Thread.sleep(2000);
}
catch (Exception ex){}
responseObserver.onNext(result);

  

 

3.2.客戶端

沒啥好加的了,直接上測試

@Test
    public void contextLoads3() {
        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
        Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));

        result.forEachRemaining(o ->
        {
            System.out.print("返回了結果 \n");
        });
        System.out.print("結束 \n");
    }

  

 

返回流請求是同步的,所以要調同步的方法,返回了一個迭代器

Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));

  

迭代器中有服務端的所有返回結果

result.forEachRemaining(o ->
{
    System.out.print("返回了結果 \n");
});

  

執行結果

服務端結果

 

客戶端結果

由於是同步呼叫,在forEach中會等待服務端的每一個返回結果

 

 

4.雙向流介面

 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

歇會,抽根菸!

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

在.proto檔案中再加一個方法

rpc methodDoubleStream(stream Request) returns (stream Result){}

  

實現

 

雙向流的服務端和請求流的沒啥區別,只是在接收到請求的時候沒有立刻結束請求

@Override
    public StreamObserver<Request> methodDoubleStream(StreamObserver<Result> responseObserver) {
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request value) {
                System.out.print("收到了請求 \n");
                Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
                responseObserver.onNext(result);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

  

客戶端也沒啥區別

@Test
    public void contextLoads4() {
        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
        StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
            @Override
            public void onNext(Result result) {
                System.out.print("返回了結果 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {

            }
        };
        StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver));
        result.onNext(request);
        result.onNext(request);
        result.onNext(request);
        result.onCompleted();

        try {
            Thread.sleep(600000);
        }
        catch (Exception ex){}
    }

  

雙向流也是非同步的,所以要等待

try {
    Thread.sleep(600000);
}
catch (Exception ex){}

  

 服務端結果

 

客戶端結果

 

完結!撒花!