首页 > 解决方案 > 将 Netty 与 ClamAV Instream 一起使用

问题描述

我一直在纠结使用 Netty 将字节流式传输到 ClamAV 服务的配置。我正在运行 Apache Camel 路线。

使用 Netty,我无法拦截“超出 INSTREAM 大小限制”消息。

INSTREAM 必须在此命令前加上 n 或 z。扫描数据流。在 INSTREAM 之后,流在发送命令的同一个套接字上以块的形式发送到 clamd。这避免了建立新 TCP 连接的开销和 NAT 问题。块的格式是: '' 其中是以字节为单位的以下数据的大小,以网络字节顺序表示为 4 字节无符号整数,并且是实际块。流式传输通过发送一个长度为零的块来终止。注意:不要超过 clamd.conf 中定义的 StreamMaxLength,否则 clamd 将回复超过 INSTREAM 大小限制并关闭连接。

使用直接同步套接字连接我没有问题。谁能指出我应该如何使用 Netty 来做到这一点的正确方向?或者我应该坚持使用同步套接字连接。

使用同步套接字实现。归功于https://github.com/solita/clamav-java “Antti Virtanen”。

    private class UseSocket implements Processor{
        @Override
        public void process(Exchange exchange) throws Exception{
            try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
                 Socket socket = new Socket("localhost", 3310);
                 BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream())){
                byte[] command = "zINSTREAM\0".getBytes();
                socketOutput.write(command);
                socketOutput.flush();
                byte[] chunk = new byte[2048];
                int chunkSize;
                try(BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())){
                    for(chunkSize = message.read(chunk);chunkSize > -1;chunkSize = message.read(chunk)){
                        socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
                        socketOutput.write(chunk, 0, chunkSize);
                        socketOutput.flush();

                        if(processReply(socketInput, exchange)){
                            return;
                        }
                    }
                    socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
                    socketOutput.flush();
                    processReply(socketInput, exchange);
                }
            }
        }

        private boolean processReply(BufferedInputStream in, Exchange exchange) throws Exception{
            if(in.available() > 0) {
                logger.info("processing reply");
                byte[] replyBytes = new byte[256];
                int replySize = in.read(replyBytes);
                if (replySize > 0) {
                    String reply = new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
                    String avStatus = "infected";
                    if ("stream: OK\0".equals(reply)) {
                        avStatus = "clean";
                    } else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
                        avStatus = "overflow";
                    }
                    exchange.getIn().setHeader("av-status", avStatus);
                    return true;
                }
            }
            return false;
        }
    }   

使用带有入站和出站通道处理程序的 Netty 实现。

    private class UseNetty implements Processor{

        @Override
        public void process(Exchange exchange) throws Exception{
            logger.info(CLASS_NAME + ": Creating Netty client");
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.remoteAddress(new InetSocketAddress("localhost", 3310));
                bootstrap.handler(new ClamAvChannelIntializer(exchange));
                ChannelFuture channelFuture = bootstrap.connect().sync();
                channelFuture.channel().closeFuture().sync();
            }catch(Exception ex) {
                logger.error(CLASS_NAME + ": ERROR", ex);
            }
            finally
            {
                eventLoopGroup.shutdownGracefully();
                logger.info(CLASS_NAME + ": Netty client closed");
            }
        }
    }

public class ClamAvChannelIntializer extends ChannelInitializer<SocketChannel> {
    private Exchange exchange;
    public ClamAvChannelIntializer(Exchange exchange){
        this.exchange = exchange;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new ClamAvClientWriter());
        socketChannel.pipeline().addLast(new ClamAvClientHandler(exchange));
    }
}

public class ClamAvClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    String CLASS_NAME;
    Logger logger;
    private Exchange exchange;
    public static final int MAX_BUFFER = 2048;
    public ClamAvClientHandler(Exchange exchange){
        super();
        CLASS_NAME = this.getClass().getName();
        logger = LoggerFactory.getLogger(CLASS_NAME);
        this.exchange = exchange;
    }

    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception{
        logger.info(CLASS_NAME + ": Entering channelActive");
        channelHandlerContext.write(exchange);
        logger.info(CLASS_NAME + ": Exiting channelActive");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause){
        cause.printStackTrace();
        channelHandlerContext.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        logger.info(CLASS_NAME + ": Entering channelRead0");
        String reply = byteBuf.toString(CharsetUtil.UTF_8);
        logger.info(CLASS_NAME + ": Reply = " + reply);
        String avStatus = "infected";
        if ("stream: OK\0".equals(reply)) {
            avStatus = "clean";
        } else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
            avStatus = "overflow";
        } else{
            logger.warn("Infected or unknown reply = " + reply);
        }
        exchange.getIn().setHeader("av-status", avStatus);
        logger.info(CLASS_NAME + ": Exiting channelRead0");
        channelHandlerContext.close();
    }
}

public class ClamAvClientWriter extends ChannelOutboundHandlerAdapter {
    String CLASS_NAME;
    Logger logger;
    public static final int MAX_BUFFER = 64000;//2^16
    public ClamAvClientWriter(){
        CLASS_NAME = this.getClass().getName();
        logger = LoggerFactory.getLogger(CLASS_NAME);
    }
    @Override
    public void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception{
        logger.info(CLASS_NAME + ": Entering write");
        Exchange exchange = (Exchange)o;
        try(BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class))){
            channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("zINSTREAM\0".getBytes()));
            byte[] chunk = new byte[MAX_BUFFER];
            for(int i=message.read(chunk);i>-1;i=message.read(chunk)){
                byte[] chunkSize = ByteBuffer.allocate(4).putInt(i).array();
                channelHandlerContext.write(Unpooled.copiedBuffer(chunkSize));
                channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(chunk, 0, i));
            }
            channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(ByteBuffer.allocate(4).putInt(0).array()));
        }
        logger.info(CLASS_NAME + ": Exiting write");
    }
}

标签: javaapache-camelnettyclam

解决方案


我最终放弃了尝试使用 Netty 来解决这个问题。我创建了一个新的 Camel 处理器并将套接字流打包在其中。下面的代码以防有人遇到类似问题。

public class ClamAvInstream implements Processor {
Logger logger;
private final int MAX_BUFFER = 2048;

public ClamAvInstream() {
    logger = LoggerFactory.getLogger(this.getClass().getName());
}

@Override
public void process(Exchange exchange) throws Exception {
    try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
         Socket socket = new Socket("localhost", 3310);
         BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream())) {
        byte[] command = "zINSTREAM\0".getBytes();
        socketOutput.write(command);
        socketOutput.flush();
        byte[] chunk = new byte[MAX_BUFFER];
        int chunkSize;
        try (BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())) {
            for (chunkSize = message.read(chunk); chunkSize > -1; chunkSize = message.read(chunk)) {
                socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
                socketOutput.write(chunk, 0, chunkSize);
                socketOutput.flush();

                receivedReply(socketInput, exchange);
            }
            socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
            socketOutput.flush();
            receivedReply(socketInput, exchange);
        } catch(ClamAvException ex){ //close socketInput
            logger.warn(ex.getMessage());
        }
    }//close message, socket, socketOutput
}

private class ClamAvException extends Exception{
    private ClamAvException(String error){
        super(error);
    }
}

private void receivedReply(BufferedInputStream in, Exchange exchange) throws Exception{
    if(in.available() > 0){
        byte[] replyBytes = new byte[256];
        int replySize = in.read(replyBytes);
        if (replySize > 0) {
            String reply = new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
            logger.info("reply="+reply);
            if(reply.contains("OK")){
                exchange.getIn().setHeader("av-status", "clean");
            }else if(reply.contains("ERROR")){
                if(reply.equals("INSTREAM size limit exceeded. ERROR\0")){
                    exchange.getIn().setHeader("av-status", "overflow");
                }else {
                    exchange.getIn().setHeader("av-status", "error");
                }
                throw new ClamAvException(reply);
            }else if(reply.contains("FOUND")){
                exchange.getIn().setHeader("av-status", "infected");
            }else{
                exchange.getIn().setHeader("av-status", "unknown");
            }
        }
    }
}

}


推荐阅读