1. 程式人生 > >Apache Ignite 改裝(一) -- 服務異步化支持

Apache Ignite 改裝(一) -- 服務異步化支持

ignite 微服務 異步 服務網格

本文假設讀者了解Apache Ignite,閱讀過ignite service grid的官方文檔,或使用過ignite的service grid,本文同樣假設讀者了解 java的CompletionStage的相關用法。本文涉及的ignite版本為2.4.0。

使用Apache Ignite的Service grid作為微服務開發框架, 通常是如下定義和實現Service的:

服務接口:
public interface MyService {
    public String sayHello(String to);
}

本文將實現如下樣式的Service,使其異步化:

異步化的服務接口:
public interface MyServiceAsync {
    public CompletionStage<String> sayHello(String to);
}

當前ignite對上邊這樣的異步的service方法並沒有remote支持。當調用端與服務部署再同一節點時,ignite會發起一個本地方法調用,這樣是沒有問題的,但是當服務部署端與調用端在不同節點時,ignite通過發起一個distributed task,將調用通過消息方式發布到服務部署節點,由於服務實現是異步的,通常來說,會返回一個未完成狀態的CompletionStage,後續當真正complete的時候,調用端的CompletionStage並不會被notify,即調用端永遠無法得到真正的調用結果。

為了能夠支持CompletionStage的遠程狀態專遞,我們需要對ignite進行如下改動:

org/apache/ignite/internal/processors/service/GridServiceProxy.java
...
// line 192
if(CompletionStage.class.isAssignableFrom(mtd.getReturnType())) {
    CompletableFuture<Object> cs = new CompletableFuture<>();               
    //call async and notify completion stage
    ctx.closure().callAsyncNoFailover(
        GridClosureCallMode.BROADCAST,
        new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args),
            Collections.singleton(node),
            false,
            waitTimeout,
            true).listen(f -> {
                if(f.error() != null) {
                    cs.completeExceptionally(f.error());
                }else if(f.isCancelled()) {
                    cs.cancel(false);
                }
                if(f.isDone()) {
                    try {
                        Object result = f.get();
                        if(result != null && IgniteException.class.isAssignableFrom(result.getClass())) {
                            cs.completeExceptionally((IgniteException)result);
                        }else {
                            cs.complete(f.get());
                        }
                    } catch (IgniteCheckedException e) {
                        cs.completeExceptionally(e);
                    }
                }
            });
    return cs;
}
...

這段代碼做了如下的事情:檢測當服務方法返回值是一個CompletionStage的時候,則創建一個CompletableFuture作為代理對象返回給調用端。隨後監聽服務的遠程調用的結果,並且用這個結果來更新這個CompletableFuture。到這裏,調用端的service proxy的改造就完成了。接下來,我們還需要改造服務節點這一端:

org/apache/ignite/internal/processors/job/GridJobWorker.java(line 618起的這個finally塊),改造前:
finally {
    // Finish here only if not held by this thread.
  if (!HOLD.get())
        finishJob(res, ex, sndRes);
    else
    // Make sure flag is not set for current thread.
    // This may happen in case of nested internal task call with continuation.
    HOLD.set(false);
    ctx.job().currentTaskSession(null);
    if (reqTopVer != null)
        GridQueryProcessor.setRequestAffinityTopologyVersion(null);
    }
}
改造後:
finally {
    if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) {
        final boolean sendResult = sndRes;
        final IgniteException igException = ex;
        @SuppressWarnings("unchecked")
        CompletionStage<Object> cs = (CompletionStage<Object>)res;
        cs.exceptionally(t->{
            return new IgniteException(t);
        }).thenAccept(r->{
            if (!HOLD.get()) {
                IgniteException e = igException;
                finishJob(r, e, sendResult);
            } else
            // Make sure flag is not set for current thread.
            // This may happen in case of nested internal task call with continuation.
                HOLD.set(false);
            ctx.job().currentTaskSession(null);
            if (reqTopVer != null)
                GridQueryProcessor.setRequestAffinityTopologyVersion(null);
        });
    } else {
        // Finish here only if not held by this thread.
        if (!HOLD.get())
            finishJob(res, ex, sndRes);
        else
        // Make sure flag is not set for current thread.
        // This may happen in case of nested internal task call with continuation.
            HOLD.set(false);
        ctx.job().currentTaskSession(null);
        if (reqTopVer != null)
            GridQueryProcessor.setRequestAffinityTopologyVersion(null);
    }
}

這裏做的事情是:當在服務部署節點上拿到執行結果的時候,如果發現服務返回結果是一個CompletionStage,那麽處理這個CompletionStage的exceptionally和thenAccept, 把結果發送給remote的調用端。

就這樣,通過簡單的改裝,我們使ignite有了處理異步服務方法調用的能力。下邊我們實現一個服務來看看改裝結果:

服務定義與實現:
import java.util.concurrent.CompletionStage;
import org.apache.ignite.services.Service;
public interface MyService extends Service {
    public CompletionStage<String> sayHelloAsync(String to);
    public String sayHelloSync(String to);
}
import java.util.concurrent.CompletionStage;
public class MyServiceImpl implements MyService {
    private ScheduledExecutorService es;
    @Override public void init(ServiceContext ctx) throws Exception {
        es = Executors.newSingleThreadScheduledExecutor();
    }
    @Override public CompletionStage<String> sayHelloAsync(String to){
      CompletableFuture<String> ret = new CompletableFuture<>();
        //return "async hello $to" after 3 secs
        es.schedule(()->ret.complete("async hello " + to), 3, TimeUnit.SECONDS);
        return ret;
    }
    @Override public String sayHelloSync(String to){
        return "sync hello " + to;
    }
    ...
}

然後將服務部署在Service grid中:

...
ServiceConfiguration sConf = new ServiceConfiguration();
sConf.setName("myservice.version.1");
sConf.setService(new MyServiceImpl());
sConf.setMaxPerNodeCount(2);
sConf.setTotalCount(4);
ignite.services().deploy(sConf);
...

然後啟動一個客戶端節點進行服務調用:

MyService service = ignite.services().serviceProxy("myservice.version.1",  MyService.class, false);
//test async service
service.sayHelloAsync("nathan").thenAccept(r->{
    System.out.println(r);
});
//test sync service
System.out.println(service.sayHelloSync("nathan"));
...

輸出結果:

sync hello nathan
async hello nathan

可以看到先輸出了sync的結果,大約3秒後輸出async的結果。

Apache Ignite 改裝(一) -- 服務異步化支持