首页 > 解决方案 > DDS 阅读器不丢弃消息

问题描述

我正在学习使用 RTI 的 DDS(对于这个主题仍然很新)。我正在创建一个写入订阅者的发布者,订阅者输出消息。我想模拟的一件事是丢包。例如,假设发布者每秒向订阅者写入 4 次,但订阅者每秒只能读取一次(最近的消息)。

到目前为止,我可以创建一个发布者和订阅者,而不会删除任何包。

我通读了一些文档并找到了 HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS。

如果我错了,请纠正我,但我的印象是这基本上会保留从发布者收到的最新消息。相反,订阅者正在接收所有消息,但延迟了 1 秒。

我不想缓存消息但删除消息。如何模拟“丢弃”的包?

顺便说一句:我不想更改 .xml 文件中的任何内容。我想以编程方式进行。

这是我的一些代码片段。

//Publisher.java

            //writer = (MsgDataWriter)publisher.create_datawriter(topic, Publisher.DATAWRITER_QOS_DEFAULT,null /* listener */, StatusKind.STATUS_MASK_NONE);
            writer = (MsgDataWriter)publisher.create_datawriter(topic, write, null,        
            StatusKind.STATUS_MASK_ALL);
            if (writer == null) {
                System.err.println("create_datawriter error\n");
                return;
            }           

            // --- Write --- //
            String[] messages= {"1", "2", "test", "3"};

            /* Create data sample for writing */

            Msg instance = new Msg();


            InstanceHandle_t instance_handle = InstanceHandle_t.HANDLE_NIL;
            /* For a data type that has a key, if the same instance is going to be
            written multiple times, initialize the key here
            and register the keyed instance prior to writing */
            //instance_handle = writer.register_instance(instance);

            final long sendPeriodMillis = (long) (.25 * 1000); // 4 per second

            for (int count = 0;
            (sampleCount == 0) || (count < sampleCount);
            ++count) {
                if (count == 11)
                {
                    return;
                }
                System.out.println("Writing Msg, count " + count);

                /* Modify the instance to be written here */
                instance.message =words[count];
                instance.sender = "some user";
                /* Write data */
                writer.write(instance, instance_handle);
                try {
                    Thread.sleep(sendPeriodMillis);
                } catch (InterruptedException ix) {
                    System.err.println("INTERRUPTED");
                    break;
                }
            }

            //writer.unregister_instance(instance, instance_handle);

        } finally {

            // --- Shutdown --- //

            if(participant != null) {
                participant.delete_contained_entities();

                DomainParticipantFactory.TheParticipantFactory.
                delete_participant(participant);
            }

//Subscriber
// Customize time & Qos for receiving info 
            DataReaderQos readerQ = new DataReaderQos();
            subscriber.get_default_datareader_qos(readerQ);
            Duration_t minTime = new Duration_t(1,0);
            readerQ.time_based_filter.minimum_separation.sec = minTime.sec;
            readerQ.time_based_filter.minimum_separation.nanosec = minTime.nanosec;

            readerQ.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;

            readerQ.reliability.kind = ReliabilityQosPolicyKind.BEST_EFFORT_RELIABILITY_QOS;

            reader = (MsgDataReader)subscriber.create_datareader(topic, readerQ, listener, StatusKind.STATUS_MASK_ALL);
            if (reader == null) {
                System.err.println("create_datareader error\n");
                return;
            }


            // --- Wait for data --- //

            final long receivePeriodSec = 1;

            for (int count = 0;
            (sampleCount == 0) || (count < sampleCount);
            ++count) {
                //System.out.println("Msg subscriber sleeping for "+ receivePeriodSec + " sec...");

                try {
                    Thread.sleep(receivePeriodSec * 1000);  // in millisec
                } catch (InterruptedException ix) {
                    System.err.println("INTERRUPTED");
                    break;
                }
            }
        } finally {

            // --- Shutdown --- //

标签: javamessagepublish-subscribedata-distribution-serviceqos

解决方案


在订阅者方面,区分应用程序和 DDS 域之间的三种不同类型的交互很有用:轮询、侦听器和 WaitSets

轮询意味着应用程序决定何时读取可用数据。这通常是一种时间驱动的机制。

侦听器基本上是回调函数,一旦数据可用,基础设施线程就会调用它来读取该数据。

WaitSet 实现了一种类似于套接字select机制的机制:应用程序线程等待(阻塞)数据变得可用,并在解除阻塞后读取新数据。

您的应用程序使用侦听器机制。您没有发布回调函数的实现,但从整体情况来看,监听器实现很可能会在调用回调的那一刻立即尝试读取数据。没有时间像您所说的那样将数据“推出”或“丢弃”。此读取发生在与您的主线程不同的线程中,主线程大部分时间都在休眠。您可以在此处找到有关它的知识库文章。

唯一不清楚的是time_based_filterQoS 设置的影响。您在问题中没有提到这一点,但它确实出现在代码中。我希望这能过滤掉你的一些样本。不过,这是与推出历史不同的机制。对于不同的 DDS 实现,可以不同地实现基于时间的过滤器的行为。您使用哪种产品?


推荐阅读