Apache mina-sshd 使用中的一些坑
笔者目前工作在一家云计算公司,主要业务是云网络。年初的时候,公司有一些功能的重构,需要一个用Java写一个与设备进行通信的RPC框架,笔者独自负责这个框架的技术选型与开发。所谓的设备其实就是一些交换机或路由器。通信的方式是通过netconf协议,而netconf基于ssh,因此需要一款合适的ssh客户端,笔者最终选择的是Apache mina-sshd,但使用过程中遇到了不少坑,mina的文档又非常的少,所以在这里记录下踩过的一些坑。
首先在技术选型的时候,笔者一直想的是需要一个NIO的客户端,这很重要。在Java下NIO首先想到的是Netty,但netty没有实现默认的ssh协议,自己从零写一版是完全不现实的。除了netty,还剩mina,这俩框架的其实都出自Trustin Lee之手,但这两个框架的区别还是非常大的。首先最大的区别就是生态,netty的资料很多很多,最经典的是其核心贡献者Norman Maurer写的Netty in Action。但mina就不同了,你找遍全网,也很难找到靠谱的mina文档,唯一的文档可能就是官网下的几个md,可以说是写的非常非常不靠谱了,因为作为一个基本的NIO框架,竟然完全没写怎么如何响应式(异步)地读取和写入数据,官方demo居然都是阻塞式等待的写法。其次的区别就是API,netty的API设计非常非常的优雅,阅读netty源码的时候,你随时随地都能感受到netty中的asynchronous 和event-driven思想。反观mina却设计的很乱,它似乎也想有异步和事件驱动,但总是这一个入口,那一个入口,又没有详细的文档,导致使用起来十分的困难。
笔者淌了很多地坑,所以想在此记录一下这些内容。
1. 响应式地读取和写出数据
读取
public ClientChannel createChannel() throws Exception {
//创建netconf的subsystem 详情请见:https://www.rfc-editor.org/rfc/pdfrfc/rfc6242.txt.pdf
ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SUBSYSTEM, NETCONF_SUBSYSTEM_NAME);
//设为异步channel
channel.setStreaming(ClientChannel.Streaming.Async);
channel.open().verify();
//并同时为channel注册一个监听器,用于监听读取事件
channel.getAsyncOut().read(new ByteArrayBuffer(BUFFER_PIPE_SIZE)).addListener(new NetconfChannelResponseListener(this));
return channel;
}
上面通过sesion
创建出channel
后,需要将channel
的streaming
设置为Streaming.Async
,并同时为channel
注册一个监听器,用于监听读取事件。监听器的主要内容如下:
public static class ChannelResponseListener implements SshFutureListener<IoReadFuture> {
protected final BaseSshSessionSender sender;
/**
* 当前可以理解为是一个字节缓冲池,用于承装设备返回的字节,但由于netconf和ssh都是字符型协议,所以这里直接用一个StringBuilder来作为缓冲池
*/
protected StringBuilder responseBuilder;
public ChannelResponseListener(BaseSshSessionSender sender) {
this.sender = sender;
this.responseBuilder = new StringBuilder();
}
/**
* 一旦设备响应,触发当前方法
*
* @param readFuture 读future
*/
@Override
public void operationComplete(IoReadFuture readFuture) {
Buffer buffer = null;
try {
readFuture.verify(READ_TIMEOUT);
buffer = readFuture.getBuffer();
byte[] bytes = new byte[buffer.available()];
buffer.getRawBytes(bytes, buffer.rpos(), buffer.available());
responseBuilder.append(new String(bytes, StandardCharsets.UTF_8));
// 下面是针对拆包的处理,如netconf发送的响应结尾是]]>]]>,则代表一条完整报文响应完成,否则代表还未响应完成,继续监听响应
if (isEnd()) {
String netconfPackage = assemblyPackage();
if (netconfPackage != null) {
handleResponse(netconfPackage);
}
}
} catch (Exception e) {
log.error("设备{},响应处理失败,失败原因:", sender.device().metaData().getDeviceId(), e);
} finally {
if (buffer != null) {
buffer.rpos(buffer.rpos() + buffer.available());
buffer.compact();
}
this.sender.channel.getAsyncOut().read(buffer).addListener(this);
}
}
}
其核心就是一旦收到对端发来的数据,mina会自动执行我们监听器的operationComplete
方法,然后我们从IoReadFuture
中读取数据,这类似于netty的ChannelInboundHandler
中的channelRead
事件。
assemblyPackage
与isEnd
和handleResponse
涉及到具体的业务处理,这里不再给出。
这里的写法参考自项目BroadbandForum/obbaa-netconf-stack: A NETCONF stack for use in OB-BAA implementations (github.com),具体内容为
- obbaa-netconf-stack/netconf-client/src/main/java/org/broadband_forum/obbaa/netconf/client/dispatcher/SshClientDispatcherImpl.java at master · BroadbandForum/obbaa-netconf-stack (github.com)
- obbaa-netconf-stack/netconf-client/src/main/java/org/broadband_forum/obbaa/netconf/client/ssh/SshNetconfClientSession.java at master · BroadbandForum/obbaa-netconf-stack (github.com)
- obbaa-netconf-stack/netconf-client/src/main/java/org/broadband_forum/obbaa/netconf/client/ssh/SshHelloMessageListener.java at master · BroadbandForum/obbaa-netconf-stack (github.com)
- obbaa-netconf-stack/netconf-client/src/main/java/org/broadband_forum/obbaa/netconf/client/ssh/SshNetconfClientSessionListener.java at master · BroadbandForum/obbaa-netconf-stack (github.com)
写出
IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));
如上是异步地写出,mina返回一个future
,开发人员可以自己为future
写一个监听器来监听是否写出成功。
坑
这里的坑其实在读取,上面可以看到我们其实是为一个channel
注册了一个监听器,只要这个channel
存在就一直使用这个监听器,但实际上,笔者一开始地写法是每次发送地时候注册一个监听器,这个监听器单独处理这个发送,大概写法如下:
public void send(RequestContext context) {
//注册监听器
channel.getAsyncOut()
.read(new ByteArrayBuffer(BUFFER_PIPE_SIZE))
.addListener(new NetconfChannelResponseListener(context));
byte[] bytes = encode(context);
IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));
}
这样每次发送的时候其实都会为channel
重新注册一个监听器,一个监听器只负责一次收发,看起来很好,非常的解耦。
但实际上这样写在连续发送的时候会报异常,所谓连续发送是指上一条请求发送完,对端还没响应完,还在读取对端的数据,这时候下一条指令又发送了。
2. 线程池
在创建客户端的时候,我们的代码往往是这样的:
protected void doConnect() throws Exception{
//建立连接
this.client = SshClient.setUpDefaultClient();
this.client.start();
ConnectFuture connectFuture = client.connect(metaData.getUserName(), metaData.getIp(), metaData.getPort())
.verify(metaData.getConnectTimeout());
if (connectFuture.isConnected()) {
this.session = connectFuture.getSession();
} else {
this.status = SenderStatus.DISCONNECT;
throw new ConnectException(connectFuture.getException().getMessage());
}
this.session.addPasswordIdentity(metaData.getPassword());
AuthFuture authFuture = this.session.auth().verify(metaData.getAuthTimeout());
if (!authFuture.isSuccess()) {
throw new AuthException("认证失败:" + authFuture.getException().getMessage());
}
this.channel = createChannel();
this.status = SenderStatus.CONNECTING;
}
坑1
创建SshClient
的时候会默认创建cpu*2
个线程,这些线程用于处理连接的读写等事件,类似于netty中的IO线程EventLoopGroup
。假设我们有100个要连接的服务端,当前服务器是8核CPU,则项目启动就会创建100*8*2 = 1600
个线程,如果我们与对端服务端通信并没有很频繁,那这1600个线程是非常非常浪费资源的。目前笔者并未在官方文档中找到创建SshClient
的时候指定要创建的IO线程个数的代码,从这里也可以对比出,不如netty的new NioEventLoopGroup(1)
简洁方便。
坑2
这是一个很大的坑,会存在内存泄露问题。内容如下:为保证与设备的长连接,以及尽可能地缩短与设备连接的不可用时间,笔者在设计框架的时候,让客户端自动重连,大致代码如下:
this.session.addSessionListener(new SessionListener() {
@Override
public void sessionClosed(Session session) {
synchronized (BaseSshSessionSender.this){
if(BaseSshSessionSender.this.status != SenderStatus.DISCONNECT && BaseSshSessionSender.this.status != SenderStatus.DEATH){
BaseSshSessionSender.this.status = SenderStatus.DISCONNECT;
BaseSshSessionSender.this.reConnect(true);
}
}
}
});
protected void reConnect(boolean immediately){
this.scheduledExecutorService.schedule(this::connect, immediately?0L:metaData.getReconnectInterval().getSeconds(), TimeUnit.SECONDS);
}
public void connect() {
try {
if(this.status != SenderStatus.DEATH){
//doConnect代码见上面
this.doConnect();
}
} catch (Exception e) {
this.status = SenderStatus.DISCONNECT;
this.reConnect(false);
}
}
首先如果连接失败会隔一段时间再重连,另外为session
注册一个监听器,一旦连接失败也会触发重连。
由于重连会重新创建客户端,因此又会重新申请cpu*2
个线程。但问题是,之前我们申请过的cpu*2
个线程竟然没销毁!!! 以笔者现在的服务为例,项目启动的时候总线程数大概在800多个,但由于有几个ssh服务端一直连不上,因此会触发重连,大概过了不到1小时就涨到了10k左右的线程数。另外整个服务的内存也在缓慢的增长,项目启动的时候大概占用1.2GB的内存,启动不到1小时后,内存就涨到了2.6GB。
另外由于创建的线程越来越多,线程上下文的切换也会越来越耗时,这会导致服务cpu的消耗也越来越高,以笔者当前的开发环境为例,Win10,i7-6700,24GB内存,项目启动的时候,大概800个线程,服务cpu占用率不到10%
(5%-8%
波动)。当不到一小时后,线程达到10k个线程,此时服务cpu占用率高达近20%
。
针对坑1和坑2,目前的改动是使用单例的SshClient
,由于这里的SshClient内建了一组线程池,且一个SshClient
可以创建多个session,因此采用所有session都用同一SshClient
,重连也用的这个SshClient
,不会再新建线程,自然就不会再内存泄露。
//单例Client
public static SshClient CLIENT = SshClient.setUpDefaultClient();
static {
CLIENT.start();
CLIENT.setGlobalRequestHandlers(Arrays.asList(KeepAliveHandler.INSTANCE, NoMoreSessionsHandler.INSTANCE));
}
protected void doConnect() throws Exception {
//使用单例的Client
ConnectFuture connectFuture = CLIENT.connect(metaData.getUserName(), metaData.getIp(), metaData.getPort())
.verify(metaData.getConnectTimeout());
//...
}
这样相当于所有连接,都共用一组线程池。无论你有100个还是10000个ssh 连接,数据的读写都用的是这组cpu*2
个线程。这就对代码有一个要求:不能阻塞这组IO线程!!!
上面的代码中,笔者用这组IO线程主要是做拼包拆包、反序列化等工作,基本都是CPU操作,所以不存在阻塞的点。
坑3
这个倒不算是坑,只是对比netty,设计上的一个不同:mina的IO线程并不绑定channel。
如果你熟悉netty,会知道对于netty而言,一个channel会唯一绑定一个IO线程,这样的好处是可以避免发生上下文切换。比如我们一个需求场景是:收到消息后,将消息尾追加一个hello
字符串响应给发送者。由于netty会将channel与IO线程绑定,所以我们可以在一个线程里处理:接收消息、尾追加hello
字符串、写出消息。整个处理无上下文切换,效率很高。对于这段代码:
public void operationComplete(IoReadFuture readFuture) {
Buffer buffer = null;
try {
readFuture.verify(READ_TIMEOUT);
buffer = readFuture.getBuffer();
byte[] bytes = new byte[buffer.available()];
buffer.getRawBytes(bytes, buffer.rpos(), buffer.available());
responseBuilder.append(new String(bytes, StandardCharsets.UTF_8));
// 下面是针对拆包的处理,如netconf发送的响应结尾是]]>]]>,则代表一条完整报文响应完成,否则代表还未响应完成,继续监听响应
if (isEnd()) {
String netconfPackage = assemblyPackage();
if (netconfPackage != null) {
handleResponse(netconfPackage);
}
}
} catch (Exception e) {
log.error("设备{},响应处理失败,失败原因:", sender.device().metaData().getDeviceId(), e);
} finally {
if (buffer != null) {
buffer.rpos(buffer.rpos() + buffer.available());
buffer.compact();
}
this.sender.channel.getAsyncOut().read(buffer).addListener(this);
}
}
mina每次响应式的读取数据都是从SshClient
创建的线程池中随机选一个线程。
这会有什么影响呢?由于netty channel绑定一个唯一的线程,那其实我们就可以认为负责这个channel的IO线程里的操作其实都是线程安全的,比如为netty注册的很多InBoundHandler和OutBoundHandler我们往往不需要考虑线程安全问题,因为这些方法只会在一个线程执行。但很明显mina就不同了,如果你能确定自己的执行方法是在IO线程内,你需要仔细考虑线程安全问题。
1 条评论
陈给蚵:文章真不错http://wap.jst-gpmx.cn/news/26796.html