首页 > 解决方案 > 由多个并行线程 NIO 异步写入(追加)到文件

问题描述

我可以利用 NIO Async API 通过多个并行线程在单个文件中写入(追加)吗?我想要做的是将查询分成不同的类别,并为每个类别分配一个线程以从数据库中获取记录并写入文件(比如 user-records.json)。

线程 A 某些类别的
用户按用户 ID 查询 - 将结果集写入 user-records.json

线程 B 某些类别 B
的用户按用户 ID 查询 - 将结果集附加到 user-records.json

线程 C 某些类别的 C
用户按用户 ID 查询 - 将结果集附加到 user-records.json

等等 ....

以下是不是实际实现的类,而只是启用异步文件写入/附加的示例-

public class AsyncAppender {

    private final AsynchronousFileChannel channel;
    /** Where new append operations are told to start writing. */
    private final AtomicLong              projectedSize;

    AsyncAppender( AsynchronousFileChannel channel) throws IOException {
        this.channel = channel;
        this.projectedSize = new AtomicLong(channel.size());
    }

    public void append( ByteBuffer buf) throws IOException {
        final int buflen = buf.remaining();
        long size;
        do {
            size = projectedSize.get();
        } while (!projectedSize.compareAndSet(size, size + buflen));

        channel.write(buf, channel.size(), channel, new WriteOp(buf, size));
    }
}

public class WriteOp implements CompletionHandler<Integer, AsynchronousFileChannel> {

    private final ByteBuffer buf;
    private long             position;

    WriteOp( ByteBuffer buf, long position) {
        this.buf = buf;
        this.position = position;
    }

    @Override
    public void completed( Integer result, AsynchronousFileChannel channel) {
        if (buf.hasRemaining()) { // incomplete write
            position += result;
            channel.write(buf, position, channel, this);
        }
    }

    @Override
    public void failed( Throwable ex, AsynchronousFileChannel channel) {
        // ?
    }
}

主要课程——

public class AsyncWriteMain {

    public static void main( String[] args) {
        AsyncWriteMain m = new AsyncWriteMain();
        m.asyncWrite();
    }

    public void asyncWrite() {
        try {
            String filePath = "D:\\temp\\user-records.txt";
            Path file = Paths.get(filePath);
            AsynchronousFileChannel asyncFile = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE);

            AsyncAppender aa = new AsyncAppender(asyncFile);

            for ( int i = 0; i < 10; i++) {
                aa.append(ByteBuffer.wrap((i + " Some text to be written").getBytes()));
            }
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}

标签: javamultithreadingasynchronousnio

解决方案


我想你可以。基本上,你需要文件锁

Future<FileLock> featureLock = asynchFileChannel.lock();
        log.info("Waiting for the file to be locked ...");
        FileLock lock = featureLock.get();
        if (lock.isValid()) {...}

这里来


推荐阅读