首页 > 解决方案 > 重新启动 appender 时,Chronicle Queue tailer 停止

问题描述

在下面的代码中,重新启动 tailer 进程没有问题。但是,重新启动 appender 进程会导致 tailer 无法再接收任何消息。有没有办法在重新启动 appender 的同时保持通道打开?

已编辑:以下是我用来稳定复现该问题的完整类。环境:Ubuntu 18,Chronicle-queue-5.16.9.jar

1) java com.tradeplacer.util.IpcTest producer

2) java com.tradeplacer.util.IpcTest consumer

3)杀死生产者

4)重启生产者

5)注意消费者不再读取任何消息

package com.tradeplacer.util;

import java.nio.ByteBuffer;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;

/**
 * Two-process harness for a Chronicle Queue appender/tailer pair sharing the
 * {@code chronicle-test} directory.
 *
 * <p>Run one JVM with the argument {@code producer} and another with any other
 * argument (for example {@code consumer}).
 */
public class IpcTest {
  private static final String DIR = "chronicle-test";

  /** Starts a background thread that appends "data0", "data1", ... to the queue. */
  public static final void startProducer() {
    new Thread(IpcTest::produce, "ipc-producer").start();
  }

  /** Starts a background thread that prints every message appended after it started. */
  public static final void startConsumer() {
    new Thread(IpcTest::consume, "ipc-consumer").start();
  }

  /** Opens the shared queue with the settings both sides must agree on. */
  private static ChronicleQueue openQueue() {
    return ChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
  }

  /** Producer loop: writes one message, sleeps 1 ms, repeats; stops when interrupted. */
  private static void produce() {
    System.out.println("starting producer...");
    // try-with-resources so the queue is released when the loop ends or is interrupted.
    try (ChronicleQueue queue = openQueue()) {
      ExcerptAppender appender = queue.acquireAppender();
      ByteBuffer ipcBuffer = ByteBuffer.allocate(8192);

      for (int i = 0; i < Integer.MAX_VALUE; i++) {
        ipcBuffer.clear();
        // Explicit charset so producer and consumer agree regardless of platform default.
        ipcBuffer.put(("data" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        Bytes<ByteBuffer> bbb = Bytes.wrapForWrite(ipcBuffer);
        appender.writeBytes(bbb);
        try {
          Thread.sleep(1);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop instead of swallowing the request.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /** Consumer loop: polls the tailer forever and prints each message as text. */
  private static void consume() {
    System.out.println("starting consumer...");
    ChronicleQueue queue = openQueue();
    ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
    Bytes<?> bytes = Bytes.allocateDirect(8192);

    while (true) {
      try {
        // Reset the fixed-size buffer before every read; otherwise its write
        // position keeps advancing across messages.
        bytes.clear();
        if (tailer.readBytes(bytes)) {
          byte[] data = new byte[bytes.length()];
          bytes.read(data);
          // Decode the payload; concatenating the array itself would print "[B@<hash>".
          System.out.println("read " + new String(data, java.nio.charset.StandardCharsets.UTF_8));
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Entry point.
   *
   * @param args {@code args[0]} selects the role: {@code producer} starts the
   *     producer, anything else starts the consumer
   */
  public static void main(final String[] args) {
    if (args.length == 0) {
      System.err.println("usage: IpcTest producer|consumer");
      System.exit(1);
    }
    if ("producer".equals(args[0])) {
      startProducer();
    } else {
      startConsumer();
    }
  }
}

标签: java chronicle chronicle-queue

解决方案


我稍微修改了代码以减少对象创建。在最新版本 5.17.1 上,我可以多次重启生产者,消费者继续读取数据。

注意:如果您要写入的是文本,writeText 方法可能是更好的选择。

如果您想写入更复杂的内容,我建议使用 Wire 或者 MethodReader/MethodWriter,它们允许您通过接口方法调用来传递消息。

package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;

import java.nio.ByteBuffer;

/**
 * Raw-bytes producer/consumer pair over a Chronicle Queue stored in the
 * {@code chronicle-test} directory. Both roles run on the calling thread.
 */
public class IpcTest {
    private static final String DIR = "chronicle-test";

    /** Opens the shared queue with the settings both roles use. */
    private static ChronicleQueue openQueue() {
        return SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
    }

    /**
     * Appends "data0", "data1", ... to the queue, calling {@code Jvm.pause(1)}
     * between messages, reusing one buffer for every message.
     */
    public static final void startProducer() {
        System.out.println("starting producer...");
        // try-with-resources so the queue is released once the loop finishes.
        try (ChronicleQueue queue = openQueue()) {
            ExcerptAppender appender = queue.acquireAppender();
            Bytes<ByteBuffer> bytes = Bytes.elasticByteBuffer(8192);
            ByteBuffer ipcBuffer = bytes.underlyingObject();

            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                ipcBuffer.clear();
                // Explicit charset so both sides agree regardless of platform default.
                ipcBuffer.put(("data" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
                // The payload was written through the underlying ByteBuffer, so
                // tell the Bytes wrapper which region is readable.
                bytes.readPositionRemaining(0, ipcBuffer.position());
                appender.writeBytes(bytes);

                Jvm.pause(1);
            }
        }
    }

    /**
     * Prints every message appended after the tailer was created, backing off
     * with a {@code Pauser} while the queue is idle. Never returns.
     */
    public static final void startConsumer() {
        System.out.println("starting consumer...");
        ChronicleQueue queue = openQueue();
        ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
        Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(8192);
        Pauser pauser = Pauser.balanced();
        while (true) {
            try {
                bytes.clear();
                if (tailer.readBytes(bytes)) {
                    byte[] data = bytes.underlyingObject().array();
                    int len = (int) bytes.readRemaining();
                    // Charset-aware constructor replaces the deprecated
                    // String(byte[], hibyte, offset, count) overload.
                    System.out.println("read " + new String(data, 0, len, java.nio.charset.StandardCharsets.UTF_8));
                    pauser.reset();
                } else {
                    pauser.pause();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Entry point.
     *
     * @param args {@code args[0]} selects the role: {@code producer} runs the
     *     producer, anything else runs the consumer
     */
    public static void main(final String[] args) {
        if (args.length == 0) {
            System.err.println("usage: IpcTest producer|consumer");
            System.exit(1);
        }
        if ("producer".equals(args[0])) {
            startProducer();
        } else {
            startConsumer();
        }
    }
}

使用 MethodReader/MethodWriter

/**
 * Producer/consumer pair that exchanges messages as calls on the {@link Hello}
 * interface instead of raw bytes. Both roles run on the calling thread.
 */
public class IpcTest {

    /** Message contract: each call to {@code hello} is one queue entry. */
    interface Hello {
        void hello(String text);
    }

    private static final String DIR = "chronicle-test";

    /** Opens the shared queue with the settings both roles use. */
    private static ChronicleQueue openQueue() {
        return SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
    }

    /**
     * Sends "data0", "data1", ... through a method-writer proxy, calling
     * {@code Jvm.pause(1)} between messages.
     */
    public static final void startProducer() {
        System.out.println("starting producer...");
        // try-with-resources so the queue is released once the loop finishes.
        try (ChronicleQueue queue = openQueue()) {
            Hello hello = queue.methodWriter(Hello.class);

            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                hello.hello("data" + i);
                Jvm.pause(1);
            }
        }
    }

    /**
     * Dispatches queue entries to a {@link Hello} handler that prints them,
     * backing off with a {@code Pauser} while the queue is idle. Never returns.
     * The tailer is created without {@code toEnd()}.
     */
    public static final void startConsumer() {
        System.out.println("starting consumer...");
        ChronicleQueue queue = openQueue();
        Hello hello = text -> System.out.println("read " + text);
        MethodReader reader = queue.createTailer().methodReader(hello);
        Pauser pauser = Pauser.balanced();
        while (true) {
            if (reader.readOne()) {
                pauser.reset();
            } else {
                pauser.pause();
            }
        }
    }

    /**
     * Entry point.
     *
     * @param args {@code args[0]} selects the role: {@code producer} runs the
     *     producer, anything else runs the consumer
     */
    public static void main(final String[] args) {
        if (args.length == 0) {
            System.err.println("usage: IpcTest producer|consumer");
            System.exit(1);
        }
        if ("producer".equals(args[0])) {
            startProducer();
        } else {
            startConsumer();
        }
    }
}

您可以使用继承自 AbstractMarshallable 的 DTO,以提高序列化和反序列化的效率。

package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.AbstractMarshallable;

/**
 * Producer/consumer pair that exchanges {@link Hi} DTOs through the
 * {@link Hello} interface. Both roles run on the calling thread.
 */
public class IpcTest {

    /** Message payload; one instance is reused by the producer for every send. */
    static class Hi extends AbstractMarshallable {
        String text;
        int value;
    }

    /** Message contract: each call to {@code hi} is one queue entry. */
    interface Hello {
        void hi(Hi hi);
    }

    private static final String DIR = "chronicle-test";

    /** Opens the shared queue with the settings both roles use. */
    private static ChronicleQueue openQueue() {
        return SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
    }

    /**
     * Sends a {@link Hi} with an increasing {@code value} through a
     * method-writer proxy, calling {@code Jvm.pause(1)} between messages.
     */
    public static final void startProducer() {
        System.out.println("starting producer...");
        // try-with-resources so the queue is released once the loop finishes.
        try (ChronicleQueue queue = openQueue()) {
            Hello hello = queue.methodWriter(Hello.class);
            Hi hi = new Hi();
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                hi.text = "data";
                hi.value = i;
                hello.hi(hi);
                Jvm.pause(1);
            }
        }
    }

    /**
     * Dispatches queue entries to a {@link Hello} handler that prints each
     * received {@link Hi}, backing off with a {@code Pauser} while the queue is
     * idle. Never returns. The tailer is created without {@code toEnd()}.
     */
    public static final void startConsumer() {
        System.out.println("starting consumer...");
        ChronicleQueue queue = openQueue();
        Hello hello = hi -> System.out.println("read " + hi);
        MethodReader reader = queue.createTailer().methodReader(hello);
        Pauser pauser = Pauser.balanced();
        while (true) {
            if (reader.readOne()) {
                pauser.reset();
            } else {
                pauser.pause();
            }
        }
    }

    /**
     * Entry point. Registers the {@link Hi} class alias before starting either role.
     *
     * @param args {@code args[0]} selects the role: {@code producer} runs the
     *     producer, anything else runs the consumer
     */
    public static void main(final String[] args) {
        if (args.length == 0) {
            System.err.println("usage: IpcTest producer|consumer");
            System.exit(1);
        }
        ClassAliasPool.CLASS_ALIASES.addAlias(Hi.class);
        if ("producer".equals(args[0])) {
            startProducer();
        } else {
            startConsumer();
        }
    }
}

在这种情况下,消费者打印

....
read !Hi {
  text: data,
  value: 3862
}

read !Hi {
  text: data,
  value: 3863
}

read !Hi {
  text: data,
  value: 3864
}
....

推荐阅读