Netconf和Cmd近期问题定位整理

笔者目前工作在一家云计算公司,主要业务是云网络。年初的时候,公司有一些功能的重构,需要一个用Java写一个与设备进行通信的RPC框架,笔者独自负责这个框架的技术选型与开发。所谓的设备其实就是一些交换机或路由器。通信的方式是通过netconf和cmd(ssh)协议。

0. 前言

2024-12-30 有这样一个报错:org.apache.sshd.common.io.WritePendingException: A write operation is already pending; cannot write 18 bytes。 服务在使用CMD命令向设备10.230.10.94发送命令的时候,出现了如下错误:

[ERROR] 15:29:16.894 [XNIO-1 task-23] [Ny6AcwwR7TiTUQCJVbcBMQgAvOsFqr] c.u.n.v.c.c.s.s.BaseSshSessionSender - 发送给设备10.230.10.94,10.230.10.94:22 发送失败,请求详情:[请求配置={senderType=CMD, timeout=PT10M, responseClass=class java.lang.String, expectedEnd=[]], expectedEndRegex='null', queueName='UCA_NETCONF_CMD_DEFAULT_QUEUE_INDEX', messageIdPrefix='null'}, 请求报文=irf topo-domain 0
, 响应结果=null],失败原因:
org.apache.sshd.common.io.WritePendingException: A write operation is already pending; cannot write 18 bytes

错误出现的地方在core-drvier服务的BaseSshSessionSender#send第238行,其代码如下:

IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));

这行代码是发送操作,其中参数bytes是要发送的内容,已经做了序列化。

上述内容出错的原因是:出现了同时调用当前channelwriteBuffer的操作(或者可以解释为:上一条指令的writeBuffer还没操作完,这时候又调了writeBuffer

1. 问题定位

理论上不应该出现这个问题才对,因为本框架是严格按照指令串行设计的。要想定位这个问题,不妨再细追下日志:

image-20250110150204901

我把上述日志分为了T1 ~T5 五个时间点

1.1 相关代码

要想深入分析这些问题,需要对发送流程有一些了解,我们先来看正常的代码流程:

业务操作:

deviceTemplate.cmd("10.230.10.94","irf priority 1","]").get();
deviceTemplate.cmd("10.230.10.94","irf topo-domain 0","]").get();

其中cmd指令会返回一个future,而.get()会同步阻塞等待指令的返回。

业务层调了cmd来发送指令后,会先进入:AbstractDevice#send

public DeviceFuture send(Object request, RequestConfig requestConfig) {
    //构造请求上下文
    RequestContext requestContext = getRequestContext(request, requestConfig);
    log.info("收到发送请求,要发送的设备:{},请求详情:{}",this.metaData.getDeviceId(),requestContext);
    //判断是否能发送(一般只能队列里没有其他指令当前指令才能发送,因此是在这里保证了串行发送)
    if (checkCanSend(requestContext)) {
        //如果能发送就直接发送
        doSend(requestContext);
    }
    //...
    return requestContext.getDevicePromise();
}

如果可以发送,最终会进入:BaseSshSessionSender#send的如下方法:

public Boolean send(RequestContext context) {
      //编码 序列化
    byte[] bytes = encode(context);
    //发送
    IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));
    //等待发送完成
    boolean written = writeFuture.verify().isWritten();
    //判断是否发送成功
    if (written) {
        log.info("发送给设备{},{}:{} 发送成功,请求详情:{}",
                this.device.metaData().getDeviceId(),
                this.device.metaData().getDeviceIp(),
                this.metaData.getPort(),
                context);
        return true;
    }
    //...
 }

整体流程大致如上,先序列化再发送。

我们选用的是NIO客户端,响应也是采取异步监听的,响应的代码大致如下:

//一旦设备session返回任何数据都会触发当前函数的调用
public void operationComplete(IoReadFuture readFuture) {
    Buffer buffer = null;
    try {
        //读取数据
        readFuture.verify(READ_TIMEOUT);
        buffer = readFuture.getBuffer();
        byte[] bytes = new byte[buffer.available()];
        buffer.getRawBytes(bytes, buffer.rpos(), buffer.available());
        responseBuilder.append(new String(bytes, StandardCharsets.UTF_8));
        //由于ssh是基于tcp的,而tcp是流协议,因此需要我们自己拼包拆包
        //这里是判断设备session返回的数据是否到了一个完整的包
        if (isEnd()) {
            String package = assemblyPackage();
            if (package != null) {
                //处理这个响应包
                handleResponse(package);
            }
        }
    }
    // ... 
}
protected void handleResponse(String response) throws Exception {
    //拿到这个响应对应的之前下发的请求
    RequestContext context = this.sender.device.peek();
    if (context != null) {
        //反序列化 解码
        Object result = context.getSender().decode(response.getBytes(), context.getRequestConfig().getResponseClass());
        //...
        //设置结果
        context.getDevicePromise().setResult(DeviceBaseResponse.success(result));
        log.info("设备{},响应完成,请求和响应详情:{}", context.getSender().device().metaData().getDeviceId(), context);
    }
    //....
}

目前我们的异步采用future-promise,正常情况下,对于一个已发送的指令的结果获取可以有两种办法:

  1. 通过.get()同步阻塞等待响应的返回
  2. 通过.addListener()异步监听指令的返回

因此这也就意味着,一旦指令真的返回,框架层至少需要做两件事:

  1. 唤醒因为.get()而阻塞的线程,让其拿到结果,继续向下执行
  2. 执行为这个指令注册的监听器函数

其核心代码在 DefaultPromise#doSetResult()

/**
 * 设置结果,核心方法
 * 思路主要为:通过CAS加原子引用设置值,保证线程安全(避免多线程一起设置返回结果)
 * 一旦结果设置完成,代表当前任务的完成,此时就需要做两件事:
 * 1. 唤醒因此而阻塞的线程
 * 2. 执行监听器的回调方法
 * @param objResult 结果
 * @return 返回是否设置成功
 */
private boolean doSetResult(Object objResult) {
    //只有 result == null 此时代表还没有人设置,那么此时可以设置result
    if (RESULT_UPDATER.compareAndSet(this, null, objResult)) {
        //核心功能:唤醒阻塞线程并且执行回调监听器
        if (checkNotifyWaiters()) {
            //执行异步方式注册的监听器
            notifyListener();
        }
        return true;
    }
    return false;
}


protected synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
        //这里是唤醒由于await/get等 而阻塞的线程
        notifyAll();
    }
    return !listeners.isEmpty();
}



/**
 * 核心方法 回调执行监听器的方法
 * 会在任务一旦完成时调用
 */
protected void notifyListener() {
    List<GenericFutureListener<?>> newListeners;
    //....
    for (;;) {
        notifyListener0(this, newListeners);
        //...
    }

}
protected void notifyListener0(Future future, List<GenericFutureListener<?>> listeners) {
    try {
        //如果有可执行器,则用可执行器执行回调,否则用当前唤醒线程执行回调
        if(executor == null){
            //...
        }else {
            //这里采用的执行监听器策略是:由一个线程来顺序执行监听器,而非多个线程并行执行监听器
            //这是因为我们要保证一些监听器要优先于另一些监听器先执行
            executor.execute(()->{
                for(GenericFutureListener l:listeners){
                    try {
                        l.operationComplete(result);
                    } catch (Exception e) {
                        log.error("监听器回调执行失败,失败详情:",e);
                    }
                }

            });
        }
    } catch (Throwable t) {
        log.error("监听器回调执行失败,失败详情:",t);
    }
}

也即一条指令的发送流程大致如下:

  1. 通过DeviceTemplate Api请求发送某个指令,可以.get()等待返回,也可以.addListener()注册监听器异步监听,执行步骤2
  2. 到达AbstractDevice#send() 判断指令是否可以发送,一般不能发送是因为队列内还有其他指令,如果不能发送,直接返回future。如果能发送(队列内没有其他指令),执行步骤3
  3. 到达BaseSshSessionSender#send(),真实的下发指令,然后返回future。
  4. 设备响应指令或指令超时后,认为指令完成,执行指令完成操作,唤醒因为当前指令.get()而阻塞的线程和执行为当前指令注册的监听器

这里的步骤2,对于此刻不可下发的指令(队列内还有其他指令),这些指令会由其前置节点完成后下发。假设指令1未返回,这时候指令2进来了要请求下发,那指令2此刻就只能入队列,不能真的发下去。当指令1完成后(设备返回或超时),再下发指令2。那这个流程是如何实现的呢?其实是用了上面说的.addListener(),也即除了业务人员可能会用.addListener()外,框架还会为每个要发送的指令单独注册一个监听器,其代码在:DefaultDevice#doSendBefore()

/**
 * doSendBefore方法是在请求发送前的拦截方法
 * 这是队列串行发送的核心实现方法,在这里主要需要实现串行链式发送功能
 * 链式调用是在future的监听器中实现,因为一旦future完成代表请求完成,此时就需要做如下事情:
 * 1. 将自己从队列中删除
 * 2. 调用新的队列头节点下发
 *
 * @param requestContext 请求上下文
 * @return true 发送 false 取消发送
 */
@Override
protected boolean doSendBefore(RequestContext requestContext) {
    //核心功能:当前请求发送并且返回后(无论是正常返回还是异常返回) 需要做如下事情:
    //  1.将自己从当前队列中移除
    //  2.判断是否有可发的继任节点(真正的头节点),如果有就需要唤醒继任节点的下发
    requestContext.getDevicePromise().addListener(result -> {
        RequestContext next;
        synchronized (requestContext.getRequestQueue()) {
            //将自己从队列内移除
            requestContext.getRequestQueue().poll();
            //取出队列里的新的头节点
            next = requestContext.getRequestQueue().peek();
            if (next != null) {
                next.setStatus(RequestContextStatus.WAIT_FOR_SEND);
            }
        }
        //如果头节点存在代表需要下发继任节点
        if (next != null) {
            log.info("DefaultDevice 触发chain发送:设备:{},请求配置:{}",this.deviceId,next);
            doSend(next);
        }
    });
    return super.doSendBefore(requestContext);
}

因此按上面的例子指令1下发的时候会为他注册这个监听器。当指令1返回后,我们会执行指令1的监听器,这时候发现队列内还有其他未下发的指令(指令2),指令1的监听器会执行下发操作,这时候就把指令2下发了。

1.2 问题定位

了解了上述代码流程后,就可以定位问题了:

image-20250110150204901

我们先只看T1、T2和T4这四个时间点,这四个时间都是发送irf priority 1指令,但其实这个日志的执行很诡异:

T1打印日志的点在AbstractDevice#send(),T2是指令已经返回了,而T4却是BaseSshSessionSender#send()指令发送成功,也即指令的返回日志比指令的发送成功日志先打印,一条指令返回比发送成功还快

这其实就是问题所在,我们把BaseSshSessionSender#send()的代码拿过来就知道了:

byte[] bytes = encode(context);
IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));
boolean written = writeFuture.verify().isWritten();
if (written) {
    log.info("发送给设备{},{}:{} 发送成功,请求详情:{}",
            this.device.metaData().getDeviceId(),
            this.device.metaData().getDeviceIp(),
            this.metaData.getPort(),
            context);
    return true;
}

其中发送成功的打印在上面代码第5行,第2行是发送,第3行是等待发送成功。

很明显,上面的T4对应第5行,那不妨把第2行IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));认为是T1.5,而第3行boolean written = writeFuture.verify().isWritten();认为是T3.5。

那就是T1时刻,AbstractDevice#send()打印收到发送请求,T1.5发送了请求,T3.5得到请求发送成功的结果,T2设备返回了,T4打印第五行,发送成功。

我们不妨认为真正的发送成功是T1.8。

也就是说T1.5我们发送了请求后,T1.8 发送成功,T2设备就返回了,只是T3.5我们才拿到发送成功的通知,然后T4打印了发送成的日志。

上述这种情况其实也合理,首先是发送和响应是两个完全不同的线程。T1.8发成功后设备响应很快,立即就返回了,触发了返回的监听处理。而第三行时间是T3.5是因为我们采用了阻塞等待发送,发送会阻塞(因为IO),发送完成后再唤醒阻塞线程,但我们知道线程的唤醒和调度是不可预测的,那很可能我们T1.5阻塞发送,T1.8发送成功然后唤醒我们的发送线程,T2收到了设备的返回,但直到T3.5之前的发送线程才被唤醒执行

因此整个流程可以梳理为:

  1. T0时刻:业务人员调用deviceTemplate.cmd("10.230.10.94","irf priority 1","]").get();发送请求
  2. T1时刻:AbstractDevice#send()收到请求,因为队列内无其他指令,因此直接发送
  3. T1.5时刻:请求发送
  4. T1.8时刻:请求发送完成
  5. T2时刻:设备响应请求,一旦设备响应了请求,我们就执行两个内容:1. 唤醒因为这个请求而阻塞线程 2. 执行这个请求注册进来的监听器。
  6. T3时刻:由于阻塞线程被唤醒,业务人员知道了自己的指令响应成功,继续发送deviceTemplate.cmd("10.230.10.94","irf topo-domain 0","]").get();,然后AbstractDevice#send()打印收到了irf topo-domain 0的请求
  7. T3.2时刻,irf topo-domain 0的发送请求走到了BaseSshSessionSender#send()但由于上一条指令irf priority 1的发送完成在T3.5时刻才唤醒,所以存在上一条指令还未完成,下一条指令就发送的情况
  8. T3.5 irf priority 1的发送阻塞线程被唤醒
  9. T4打印日志发送irf priority 1成功
  10. T5时刻 由于在T3.2~T3.5间调用IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));准备irf topo-domain 0进行实际的发送,但由于上述原因,报了错,因此打印出错误日志。

这便是上述整个错误的梳理过程,虽然我们上面写了那么多Tx时刻,但实际从T2到T4都是同一毫秒,而从T1到T5整个时间也不过是3毫秒。再加上代码采用了多处异步,NIO和多线程,且涉及到诸多线程的挂起和唤醒,而这些调度又是不可预测的因此定位起来其实挺困难的。

2. V1版解决方案

定位清楚上述问题后,我们就很容易明白,其实是由于发送这个流程不可同时进行,必须要等到上一条指令完全发送完成,才能下发下一条指令。因此笔者在V1版本的解决方案很简单,就是对BaseSshSessionSender#send方法加了把同步锁。

public synchronized Boolean send(RequestContext context) {
    //...
}

3. V1版问题

很快上述V1版本就测出了死锁的bug。要聊这个新的bug,我们还回顾下之前的发送流程:

deviceTemplate.cmd("10.230.10.94","irf member 2","]").get();
deviceTemplate.cmd("10.230.10.94","irf priority 1","]").get();
deviceTemplate.cmd("10.230.10.94","irf topo-domain 0","]").get();

上述发送流程先发送irf member 2,等irf member 2返回后,再发送irf priority 1,同理等irf priority 1返回后再发送irf topo-domain 0。 我们之前聊过,本框架严格限制了串行发送,对于要发送的指令,如果队列内没其他指令,则可以直接发送;但如果队列内有别的指令,则当前指令入队列且等上一个指令完成后,由上一个指令的监听器函数执行发送操作。

那我们现在就有个问题:理论上指令是在返回后才发的下一条,那这时候应该没有等待的指令了,这时候发送irf priority 1或者irf topo-domain 0是可以直接发送的吗?还是由上一个指令的完成后的监听器发送的呢?

答案是不一定。我们可以再看下日志

image-20250115161727936

其中XNIO-1 task-23是业务层调用的线程,而pool-21-thread-12是线程池执行监听器的线程。

因此irf priority 1没有直接发送,而是由它的上一个指令irf member 2返回后监听器里发送的,而irf topo-domain 0是直接发送的。这是为什么呢?这里我们不妨再把唤醒阻塞线程和执行监听器的代码拿过来:

/**
 * 设置结果,核心方法
 * 思路主要为:通过CAS加原子引用设置值,保证线程安全(避免多线程一起设置返回结果)
 * 一旦结果设置完成,代表当前任务的完成,此时就需要做两件事:
 * 1. 唤醒因此而阻塞的线程
 * 2. 执行监听器的回调方法
 * @param objResult 结果
 * @return 返回是否设置成功
 */
private boolean doSetResult(Object objResult) {
    //只有 result == null 此时代表还没有人设置,那么此时可以设置result
    if (RESULT_UPDATER.compareAndSet(this, null, objResult)) {
        //核心功能:唤醒阻塞线程并且执行回调监听器
        if (checkNotifyWaiters()) {
            //执行异步方式注册的监听器
            notifyListener();
        }
        return true;
    }
    return false;
}


protected synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
        //这里是唤醒由于await/get等 而阻塞的线程
        notifyAll();
    }
    return !listeners.isEmpty();
}



/**
 * 核心方法 回调执行监听器的方法
 * 会在任务一旦完成时调用
 */
protected void notifyListener() {
    List<GenericFutureListener<?>> newListeners;
    //....
    for (;;) {
        notifyListener0(this, newListeners);
        //...
    }

}
protected void notifyListener0(Future future, List<GenericFutureListener<?>> listeners) {
    try {
        //如果有可执行器,则用可执行器执行回调,否则用当前唤醒线程执行回调
        if(executor == null){
            //...
        }else {
            //这里采用的执行监听器策略是:由一个线程来顺序执行监听器,而非多个线程并行执行监听器
            //这是因为我们要保证一些监听器要优先于另一些监听器先执行
            executor.execute(()->{
                for(GenericFutureListener l:listeners){
                    try {
                        l.operationComplete(result);
                    } catch (Exception e) {
                        log.error("监听器回调执行失败,失败详情:",e);
                    }
                }

            });
        }
    } catch (Throwable t) {
        log.error("监听器回调执行失败,失败详情:",t);
    }
}

可以看到我们是先唤醒的阻塞线程,再在线程池里执行的监听器方法的,但不要就那么以为阻塞线程的执行一定比监听器方法线程的执行靠前。因为唤醒阻塞线程,阻塞线程什么时候调度是不可预测的,同理启用一个新的线程执行监听器方法,这个新的线程什么时候调度也是不可预测的。这就好像你建了两个线程,分别启动他们,但这两个线程到底哪个先执行是不一定的。

如果监听器的执行线程优先于被唤醒的阻塞线程,那就是监听器方法执行的时候会先把自己(当前指令)从队列内移除。随后被唤醒的阻塞线程执行的时候,会下发下一条指令,因为队列内无其他指令所以可以直接下发。

如果被唤醒的阻塞线程优先于监听器的执行线程,那就是被唤醒的阻塞线程下发下一条指令,但由于监听器方法还未执行,所以指令并未从队列内删除,因此不满足发送条件,入队列后就直接返回。随后,监听器方法执行的时候将自己从队列内移除后,发现还有需要继续发送的指令,所以会由它来下发这条指令。

了解了这些后,我们再来对正常发送流程做个补充:我们与设备间维持的连接不会主动断,但如果一段时间不发送请求,这个连接可能会被设备端断掉。如果连接断掉,我们并不会重连,只有在有业务需要发送的时候才重连。所以发送流程会先检查当前连接是否可用,如果是断连状态,它会进行一次重连,重连上后再发送请求。下面是BaseSshSessionSender#send()的代码

public synchronized Boolean send(RequestContext context) {
    try {
        //如果是不自动重连的服务,判断断开连接后,主动重连下
        if (!this.metaData.isAutoReconnect()) {
            if (this.status == SenderStatus.DISCONNECT
                    || this.session == null
                    || this.session.isClosed()
                    || this.session.isClosing()) {
                this.connect();
            }
        }
        //...
}

可以看到我们会调用this.connect()做重连。重连除了连接外,连接完成后还会发送握手报文,握手成功后才可以继续下发请求,而握手报文的发送,也是调的BaseSshSessionSender#send()

也即业务端调send()send()发现需要重连调connect()connect()连接成功后调send()发送握手报文,握手成功后send()才能继续走下去发送实际的业务端请求。

对于Netconf握手报文只有一条;但对于CMD,握手报文有两条(对于CMD为什么会有握手报文和为什么需要两条这个问题,原因很复杂,这里不展开)。

对于CMD,想象下,假设有一个业务请求进来:

deviceTemplate.cmd("10.230.10.94","irf member 2","]").get();

我们假设客户端的请求线程是BUSINESS-1

这时候会调到BaseSshSessionSender#send(),上面我们说过V1版本send()加了同步锁synchronized,因此BUSINESS-1会持有这个锁。然后进入send()发现设备是断连的,会调用connect()connect()做连接,连接成功后发送握手报文,假设握手报文是这样发送的:

device.send("握手报文1","]").get();
device.send("握手报文2","]").get();

那么握手报文1先发送,调到BaseSshSessionSender#send(),由于现在整个线程还是BUSINESS-1synchronized是可重入锁,所以可以进入send()并发送握手报文。等握手报文1返回后,发送握手报文2,这时就存在一个问题:握手报文2不一定还是BUSINESS-1线程直接发送了,有可能是握手报文1的监听器执行线程发送的,原因我们上面解释过了。如果是监听器执行线程发送,那握手报文2调到BaseSshSessionSender#send()的时候,由于锁被BUSINESS-1持有且没释放,那就会一直阻塞。

这样就会存在握手报文2发送拿不到锁一直阻塞,而BUSINESS-1持有锁但一直要等待握手报文2完成才能继续执行的情况,这就是加了synchronized后CMD发送有概率死锁的原因。

4. V2版解决方案

了解了原因后也很容易想到解决方案,缩小锁的加锁范围就好了。也即最初问题其实只是

IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));
written = writeFuture.verify().isWritten();

这两步不能同时执行,那单独加这两步就好,因此V2的解决方案:

public Boolean send(RequestContext context) {
      //编码 序列化
    byte[] bytes = encode(context);
    boolean written;
    synchronized (this){
        IoWriteFuture writeFuture = this.channel.getAsyncIn().writeBuffer(new ByteArrayBuffer(bytes));
        written = writeFuture.verify().isWritten();
    }
    //判断是否发送成功
    if (written) {
        log.info("发送给设备{},{}:{} 发送成功,请求详情:{}",
                this.device.metaData().getDeviceId(),
                this.device.metaData().getDeviceIp(),
                this.metaData.getPort(),
                context);
        return true;
    }
    //...
 }
最后修改:2025 年 01 月 15 日
如果觉得我的文章对你有用,请随意赞赏