
好久不见,大家最近可好,近期赶了一个月的项目终于完事了,可以闲下来学点东西了,今天咱们看下dubbo的异步转同步。
之所以说这个是因为近期项目中遇到一个需求也需要异步转同步,于是借鉴了dubbo的实现,咱们今天一看下dubbo具体是怎么做的。
源码分析dubbo远程rpc协议和网络框架有多种,我们以默认的dubbo协议、网络框架netty作为切入点,做分析,包结构如下图:
DubboInvoker这个类很重要,因为客户端没有具体的实现都是通过代理实现的调用逻辑,而这个类就是最终的工作者,其内部核心方法如下:
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isoneway = RpcUtils.isoneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
invocation.put(TIMEOUT_KEY, timeout);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture appResponseFuture =
// todo 这里就是真正发起请求地方,大家可以自行打断点,远程rpc调用最后就走到这里
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
通过断点我们可以发现最后由ExchangeChannel接口的 :
CompletableFuture
api发起的rpc请求
这个接口返回CompletableFuture这是java1.8引入的一个类,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,一路跟踪下去发现最后是由HeaderExchangeChannel这个实现类做的,源码如下:
@Override
public CompletableFuture
这里分成了2步,首先通过DefaultFuture创建一个DefaultFuture然后调用send方法发送消息,再将DefaultFuture返回。
咱们看下send方法,最终会调用到NettyChannel类,这就相对简单了就是原生netty发送消息的写法如下:
public void send(Object message, boolean sent) throws RemotingException {
// whether the channel is closed
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
removeChannelIfDisconnected(channel);
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
好再来看下DefaultFuture类都有哪些 *** 作。
private static final MapCHANNELS = new ConcurrentHashMap<>(); private static final Map FUTURES = new ConcurrentHashMap<>(); // 重点,通过次方法实现异步转同步 *** 作 private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // the result is returning, but the caller thread may still waiting // to avoid endless waiting for whatever reason, notify caller thread to return. if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception.")); } } } private static class TimeoutCheckTask implements TimerTask { private final Long requestID; TimeoutCheckTask(Long requestID) { this.requestID = requestID; } @Override public void run(Timeout timeout) { DefaultFuture future = DefaultFuture.getFuture(requestID); if (future == null || future.isDone()) { return; } if (future.getExecutor() != null) { future.getExecutor().execute(() -> notifyTimeout(future)); } else { notifyTimeout(future); } } private void notifyTimeout(DefaultFuture future) { // create exception response. Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse, true); } }
这里源码只展示了关键代码,2个变量CHANNELS、FUTURS是用来保存当前链接和调用线程的。
doReceived方法就是来接收服务端返回的,并且将其返回信息设置到调用者的CompletableFuture结果中,调用链如下:
NettyClientHandler---->
channelRead-->
HeaderExchangeHandler--->
received---->
handleResponse----->
DefaultFuture---->
received---->
doReceived
涉及到的源码如下:
NettyClientHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
}
HeaderExchangeHandler
@Override
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
DefaultFuture
public static void received(Channel channel, Response response) {
received(channel, response, false);
}
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response status is " + response.getStatus()
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
}
}
我们还看到DefaultFuture做了超时处理,如果一定时间内没有得到响应就设置超时返回实现方法由DefaultFuture的内部类:
TimeoutCheckTask----->notifyTimeout
总结至此呢dubbo异步转同步的核心逻辑也算梳理清楚了,其核心类便是DefaultFutrue,使用了concurrentHashMap来记录id和对应的defaultFuture对象,并使用CompletableFuture来达到同步调用的效果。
我们借鉴了dubbo的思想,但是我们的交互采用的是短连接即每次交互重新创建链接(由于服务端是通过4G的物联网卡传输信号不稳定,电量有限所以设置的短连接),所以不能在本地保存链接信息。
所以我们采用redis保存当前事务流水,服务端一但返回就将此流水的处理结果保存至redis中,当前进程轮训redis获取结果,并启动异步线程如果超过固定时间就做超时返回处理。
通过阅读源码并解决自己在实际开发中的问题这种心情非常好,至此也更加确信了自己阅读源码的行动是正确的,dubbo处处是精华,非常多的地方值得我们借鉴,今年给自己立个flag 完整的阅读dubbo源码并坚持至少每月2篇dubbo源码分析。
好了今天就到这里了,咱们下期见!
MYSQL系列经典文章
MYSQl 深入探索系列一 redo log
MYSQl深入探索系列二 undo log
MYSQl深入探索系列三 MVCC机制
MYSQl深入探索系列四 服务端优化
MYSQL深入探索系列之五 buffer_pool
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)