最近被问到一个问题: 有依赖关系的两个dubbo服务。通过TCP进行连接时候,他们之间的连接是怎么控制的?怎么达到一个合理的数量?
我们从一个例子开始吧:一个订单服务 OrderService
,IP为192.168.0.110
连接了商品服务 ProductService
, ip 为192.168.0.111
,其中订单服务中的方法比较多,并且很多请求也刚好路由到192.168.0.111
的 ProductService
服务。那问题就来了:110
机器作为客户端是怎么控制连接数的?会不会无限量地和111
机器进行TCP连接?
我们先看一下Dubbo
的官方文档对“连接控制”的说明文档 : http://dubbo.apache.org/zh-cn/docs/user/demos/config-connections.html 。
在xml配置方式中xml accepts="10"
和 connections="10"
分别在服务端和客户端进行了相应的连接控制。下面我们看一下源码,追一下连接控制的原理。
我们看一下DubboProtocol.java
的创建客户端tcp连接的方法,int connectNum正是每个客户端对服务端的tcp连接数,默认是1
,当然可以修改成更大。默认是1,这样一个客户端的调用service
数最多也不会超过1000吧。这样就不会出现单机创建TCP连接数过多的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) { List<ReferenceCountExchangeClient> clients = new ArrayList<>();
for (int i = 0; i < connectNum; i++) { clients.add(buildReferenceCountExchangeClient(url)); } return clients; }
|
客户端和服务端是一对一的,建立长连接,那么如果客户端并发访问,他们是怎么和服务端交互的呢?
经过看代码:
下面咱们试图从代码中找到痕迹。一路追踪,我们来到这个类:com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.java
,先来看看其中的request
方法,大概在第101行左右:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future; }
|
注意这个方法返回的ResponseFuture
对象,当前处理客户端请求的线程在经过一系列调用后,会拿到ResponseFuture
对象,最终该线程会阻塞在这个对象的下面这个方法调用上,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); }
|
上面我已经看到请求线程已经阻塞,那么又是如何被唤醒的呢?再看一下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.java
,其实所有实现了ChannelHandler
接口的类都被设计为装饰器模式,所以你可以看到类似这样的代码:
1 2 3 4 5 6
| protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler( new HeartbeatHandler( ExtensionLoader.getExtensionLoader(Dispather.class).getAdaptiveExtension().dispath(handler, url) )); }
|
现在来仔细看一下HeaderExchangeHandler
类的定义,先看一下它定义的received
方法,下面是代码片段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { ..... } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { ..... } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
|
我们主要看中间的那个条件分支,它是用来处理响应消息的,也就是说当dubbo客户端接收到来自服务端的响应后会执行到这个分支,它简单的调用了handleResponse
方法,我们追过去看看:
1 2 3 4 5
| static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
|
熟悉的身影:DefaultFuture
,它是实现了我们上面说的ResponseFuture
接口类型,实际上细心的童鞋应该可以看到,上面request
方法中其实实例化的就是这个DefaultFutrue
对象:
1
| DefaultFuture future = new DefaultFuture(channel, req, timeout);
|
那么我们可以继续来看一下DefaultFuture.received
方法的实现细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
|
留一下我们之前提到的id的作用,这里可以看到它已经开始发挥作用了。通过id
,DefaultFuture.FUTURES
可以拿到具体的那个DefaultFuture
对象,它就是上面我们提到的,阻塞请求线程的那个对象。好,找到目标后,调用它的doReceived
方法,这就是标准的java多线程编程知识了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
|
这样我们就可以证实上图中左边的绿色箭头所标注的两点。
参考链接:https://blog.csdn.net/joeyon1985/article/details/51046548