Apache mina-sshd 使用中的一些坑

笔者目前工作在一家云计算公司,主要业务是云网络。年初的时候,公司有一些功能的重构,需要一个用Java写一个与设备进行通信的RPC框架,笔者独自负责这个框架的技术选型与开发。所谓的设备其实就是一些交换机或路由器。通信的方式是通过netconf协议,而netconf基于ssh,因此需要一款合适的ssh客户端,笔者最终选择的是Apache mina-sshd,但使用过程中遇到了不少坑,mina的文档又非常的少,所以在这里记录下踩过的一些坑。

首先在技术选型的时候,笔者一直想的是需要一个NIO的客户端,这很重要。在Java下NIO首先想到的是Netty,但netty没有实现默认的ssh协议,自己从零写一版是完全不现实的。除了netty,还剩mina,这俩框架的其实都出自Trustin Lee之手,但这两个框架的区别还是非常大的。首先最大的区别就是生态,netty的资料很多很多,最经典的是其核心贡献者Norman Maurer写的Netty in Action。但mina就不同了,你找遍全网,也很难找到靠谱的mina文档,唯一的文档可能就是官网下的几个md,可以说是写的非常非常不靠谱了,因为作为一个基本的NIO框架,竟然完全没写怎么如何响应式(异步)地读取和写入数据,官方demo居然都是阻塞式等待的写法。其次的区别就是API,netty的API设计非常非常的优雅,阅读netty源码的时候,你随时随地都能感受到netty中的asynchronousevent-driven思想。反观mina却设计的很乱,它似乎也想有异步和事件驱动,但总是这一个入口,那一个入口,又没有详细的文档,导致使用起来十分的困难。

笔者淌了很多地坑,所以想在此记录一下这些内容。

1. 响应式地读取和写出数据

读取

public ClientChannel createChannel() throws Exception {
    //创建netconf的subsystem 详情请见:https://www.rfc-editor.org/rfc/pdfrfc/rfc6242.txt.pdf
    ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SUBSYSTEM, NETCONF_SUBSYSTEM_NAME);
    //设为异步channel
    channel.setStreaming(ClientChannel.Streaming.Async);
    channel.open().verify();
    //并同时为channel注册一个监听器,用于监听读取事件
    channel.getAsyncOut().read(new ByteArrayBuffer(BUFFER_PIPE_SIZE)).addListener(new NetconfChannelResponseListener(this));
    return channel;
}

上面通过sesion创建出channel后,需要将channelstreaming设置为Streaming.Async,并同时为channel注册一个监听器,用于监听读取事件。监听器的主要内容如下:

public static class ChannelResponseListener implements SshFutureListener<IoReadFuture> {
    protected final BaseSshSessionSender sender;
    /**
     * 当前可以理解为是一个字节缓冲池,用于承装设备返回的字节,但由于netconf和ssh都是字符型协议,所以这里直接用一个StringBuilder来作为缓冲池
     */
    protected StringBuilder responseBuilder;


    public ChannelResponseListener(BaseSshSessionSender sender) {
        this.sender = sender;
        this.responseBuilder = new StringBuilder();
    }

    /**
     * 一旦设备响应,触发当前方法
     *
     * @param readFuture 读future
     */
    @Override
    public void operationComplete(IoReadFuture readFuture) {
        Buffer buffer = null;
        try {
            readFuture.verify(READ_TIMEOUT);
            buffer = readFuture.getBuffer();
            byte[] bytes = new byte[buffer.available()];
            buffer.getRawBytes(bytes, buffer.rpos(), buffer.available());
            responseBuilder.append(new String(bytes, StandardCharsets.UTF_8));
            // 下面是针对拆包的处理,如netconf发送的响应结尾是]]>]]>,则代表一条完整报文响应完成,否则代表还未响应完成,继续监听响应
            if (isEnd()) {
                String netconfPackage = assemblyPackage();
                if (netconfPackage != null) {
                    handleResponse(netconfPackage);
                }
            }
        } catch (Exception e) {
            log.error("设备{},响应处理失败,失败原因:", sender.device().metaData().getDeviceId(), e);
        } finally {
            if (buffer != null) {
                buffer.rpos(buffer.rpos() + buffer.available());
                buffer.compact();
            }
            this.sender.channel.getAsyncOut().read(buffer).addListener(this);
        }
    }
}

其核心就是一旦收到对端发来的数据,mina会自动执行我们监听器的operationComplete方法,然后我们从IoReadFuture中读取数据,这类似于netty的ChannelInboundHandler中的channelRead事件。

assemblyPackageisEndhandleResponse涉及到具体的业务处理,这里不再给出。

这里的写法参考自项目BroadbandForum/obbaa-netconf-stack: A NETCONF stack for use in OB-BAA implementations (github.com),具体内容为

写出

IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));

如上是异步地写出,mina返回一个future,开发人员可以自己为future写一个监听器来监听是否写出成功。

这里的坑其实在读取,上面可以看到我们其实是为一个channel注册了一个监听器,只要这个channel存在就一直使用这个监听器,但实际上,笔者一开始地写法是每次发送地时候注册一个监听器,这个监听器单独处理这个发送,大概写法如下:

public void send(RequestContext context) {
    //注册监听器
    channel.getAsyncOut()
        .read(new ByteArrayBuffer(BUFFER_PIPE_SIZE))
        .addListener(new NetconfChannelResponseListener(context));
     byte[] bytes = encode(context);
    IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));
}

这样每次发送的时候其实都会为channel重新注册一个监听器,一个监听器只负责一次收发,看起来很好,非常的解耦。

但实际上这样写在连续发送的时候会报异常,所谓连续发送是指上一条请求发送完,对端还没响应完,还在读取对端的数据,这时候下一条指令又发送了。

2. 线程池

在创建客户端的时候,我们的代码往往是这样的:

protected void doConnect() throws Exception{
    //建立连接
    this.client = SshClient.setUpDefaultClient();
    this.client.start();
    ConnectFuture connectFuture = client.connect(metaData.getUserName(), metaData.getIp(), metaData.getPort())
            .verify(metaData.getConnectTimeout());
    if (connectFuture.isConnected()) {
        this.session = connectFuture.getSession();
    } else {
        this.status = SenderStatus.DISCONNECT;
        throw new ConnectException(connectFuture.getException().getMessage());
    }

    this.session.addPasswordIdentity(metaData.getPassword());
    AuthFuture authFuture = this.session.auth().verify(metaData.getAuthTimeout());
    if (!authFuture.isSuccess()) {
        throw new AuthException("认证失败:" + authFuture.getException().getMessage());
    }
    this.channel = createChannel();
    this.status = SenderStatus.CONNECTING;
}

坑1

创建SshClient的时候会默认创建cpu*2个线程,这些线程用于处理连接的读写等事件,类似于netty中的IO线程EventLoopGroup。假设我们有100个要连接的服务端,当前服务器是8核CPU,则项目启动就会创建100*8*2 = 1600个线程,如果我们与对端服务端通信并没有很频繁,那这1600个线程是非常非常浪费资源的。目前笔者并未在官方文档中找到创建SshClient的时候指定要创建的IO线程个数的代码,从这里也可以对比出,不如netty的new NioEventLoopGroup(1)简洁方便。

坑2

这是一个很大的坑,会存在内存泄露问题。内容如下:为保证与设备的长连接,以及尽可能地缩短与设备连接的不可用时间,笔者在设计框架的时候,让客户端自动重连,大致代码如下:

this.session.addSessionListener(new SessionListener() {
    @Override
    public void sessionClosed(Session session) {
        synchronized (BaseSshSessionSender.this){
            if(BaseSshSessionSender.this.status != SenderStatus.DISCONNECT && BaseSshSessionSender.this.status != SenderStatus.DEATH){
                BaseSshSessionSender.this.status = SenderStatus.DISCONNECT;
                BaseSshSessionSender.this.reConnect(true);
            }
        }
    }
});
protected void reConnect(boolean immediately){
    this.scheduledExecutorService.schedule(this::connect, immediately?0L:metaData.getReconnectInterval().getSeconds(), TimeUnit.SECONDS);
}

public void connect() {
    try {
        if(this.status != SenderStatus.DEATH){
            //doConnect代码见上面
            this.doConnect();
        }
    } catch (Exception e) {
        this.status = SenderStatus.DISCONNECT;
        this.reConnect(false);
    }
}

首先如果连接失败会隔一段时间再重连,另外为session注册一个监听器,一旦连接失败也会触发重连。

由于重连会重新创建客户端,因此又会重新申请cpu*2个线程。但问题是,之前我们申请过的cpu*2个线程竟然没销毁!!! 以笔者现在的服务为例,项目启动的时候总线程数大概在800多个,但由于有几个ssh服务端一直连不上,因此会触发重连,大概过了不到1小时就涨到了10k左右的线程数。另外整个服务的内存也在缓慢的增长,项目启动的时候大概占用1.2GB的内存,启动不到1小时后,内存就涨到了2.6GB。

另外由于创建的线程越来越多,线程上下文的切换也会越来越耗时,这会导致服务cpu的消耗也越来越高,以笔者当前的开发环境为例,Win10,i7-6700,24GB内存,项目启动的时候,大概800个线程,服务cpu占用率不到10%5%-8%波动)。当不到一小时后,线程达到10k个线程,此时服务cpu占用率高达近20%

针对坑1和坑2,目前的改动是使用单例的SshClient,由于这里的SshClient内建了一组线程池,且一个SshClient可以创建多个session,因此采用所有session都用同一SshClient,重连也用的这个SshClient,不会再新建线程,自然就不会再内存泄露。

//单例Client
public static SshClient CLIENT = SshClient.setUpDefaultClient();
static {
    CLIENT.start();
    CLIENT.setGlobalRequestHandlers(Arrays.asList(KeepAliveHandler.INSTANCE, NoMoreSessionsHandler.INSTANCE));
}
protected void doConnect() throws Exception {
    //使用单例的Client
    ConnectFuture connectFuture = CLIENT.connect(metaData.getUserName(), metaData.getIp(), metaData.getPort())
            .verify(metaData.getConnectTimeout());
    //...
}

这样相当于所有连接,都共用一组线程池。无论你有100个还是10000个ssh 连接,数据的读写都用的是这组cpu*2个线程。这就对代码有一个要求:不能阻塞这组IO线程!!!

上面的代码中,笔者用这组IO线程主要是做拼包拆包、反序列化等工作,基本都是CPU操作,所以不存在阻塞的点。

坑3

这个倒不算是坑,只是对比netty,设计上的一个不同:mina的IO线程并不绑定channel

如果你熟悉netty,会知道对于netty而言,一个channel会唯一绑定一个IO线程,这样的好处是可以避免发生上下文切换。比如我们一个需求场景是:收到消息后,将消息尾追加一个hello字符串响应给发送者。由于netty会将channel与IO线程绑定,所以我们可以在一个线程里处理:接收消息、尾追加hello字符串、写出消息。整个处理无上下文切换,效率很高。对于这段代码:

public void operationComplete(IoReadFuture readFuture) {
    Buffer buffer = null;
    try {
        readFuture.verify(READ_TIMEOUT);
        buffer = readFuture.getBuffer();
        byte[] bytes = new byte[buffer.available()];
        buffer.getRawBytes(bytes, buffer.rpos(), buffer.available());
        responseBuilder.append(new String(bytes, StandardCharsets.UTF_8));
        // 下面是针对拆包的处理,如netconf发送的响应结尾是]]>]]>,则代表一条完整报文响应完成,否则代表还未响应完成,继续监听响应
        if (isEnd()) {
            String netconfPackage = assemblyPackage();
            if (netconfPackage != null) {
                handleResponse(netconfPackage);
            }
        }
    } catch (Exception e) {
        log.error("设备{},响应处理失败,失败原因:", sender.device().metaData().getDeviceId(), e);
    } finally {
        if (buffer != null) {
            buffer.rpos(buffer.rpos() + buffer.available());
            buffer.compact();
        }
        this.sender.channel.getAsyncOut().read(buffer).addListener(this);
    }
}

mina每次响应式的读取数据都是从SshClient创建的线程池中随机选一个线程。

这会有什么影响呢?由于netty channel绑定一个唯一的线程,那其实我们就可以认为负责这个channel的IO线程里的操作其实都是线程安全的,比如为netty注册的很多InBoundHandler和OutBoundHandler我们往往不需要考虑线程安全问题,因为这些方法只会在一个线程执行。但很明显mina就不同了,如果你能确定自己的执行方法是在IO线程内,你需要仔细考虑线程安全问题。

最后修改:2024 年 06 月 18 日
如果觉得我的文章对你有用,请随意赞赏