public class gRPCWatcher {
/**
* grpc channel
*/
private final gRPCChannel gRPCChannel;
/**
* 当前重连的次数
*/
private int hasReconnectTimes = 0;
/**
* 最大重连次数
*/
private final int maxReconnectTimes = 3;
/**
* 构造函数
*/
public gRPCWatcher(gRPCChannel gRPCChannel) {
this.gRPCChannel = gRPCChannel;
}
public void startWatch(Request confRequest, WatchHandler watchHandler) {
StreamObserver<ConfResponse> streamObserver = new StreamObserver<ConfResponse>() {
@Override
public void onNext(ConfResponse confResponse) {
try {
watchHandler.process(confResponse);
} catch (Exception e) {
//...
}
hasReconnectTimes = 0;
}
@Override
public void onError(Throwable throwable) {
//error 的时候尝试重新调用,超过最大次数失败抛出异常
if (hasReconnectTimes < maxReconnectTimes) {
hasReconnectTimes++;
startWatch(confRequest, watchHandler);
} else {
throw new RuntimeException("reWatch "+maxReconnectTimes+" times still error" + throwable.getMessage());
}
}
@Override
public void onCompleted() {
//...
}
};
ConfigGrpc.newStub(gRPCChannel.getNettyChannel()).withWaitForReady().watch(confRequest, streamObserver);
}
}
这是一段处理服务端流的 grpc 调用的逻辑,当网络出问题等异常情况的时候,会进入 onError, grpc channel 底层自带重连重试机制,所以我们只要重新 stub call 就行了。
所以我在 onError 的时候重新调用该方法,为了防止无限调,所以加了以这个最大重试次数,不知道这样写优雅不优雅- -
关于 grpc 的重连 可以看官方的讨论,他们也是推荐 channel 可以复用 主要 stub 重新 call https://groups.google.com/g/grpc-io/c/quToVM4NhdQ
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.