首页 > 解决方案 > 如何在java中使用多线程查询所有50ms的数据

问题描述

我将在每次查询 50 毫秒内从机器查询数据。我有一张带有 id 的地图供查询。对于一个 id,我创建一个线程。最后我有500个线程。

for (Map.Entry<String, NodeId[]> entry1 : nodeIds.entrySet()) {
    try {
        NodeId[] entrNodeIds = entry1.getValue();

        new Thread() {
            public void run() {
                new ReadRegisteredValues().getValue(entry1.getKey(), entrNodeIds[0], client);
            };
        }.start();
    } catch (Exception e) {
        fileLogger.error("failed read node: " + entry1.getKey());
    }
}

在此之后,我每 50 毫秒读取一次每个线程while(true)的值,如下所示:

public void getValue(String nodeName, NodeId registeredNodeId, UaClient client) {

    while (true) {
        try {
            DataValue value = null;
            try {
                value = client.readAttribute(registeredNodeId, Attributes.Value);
            } catch (ServiceException e1) {
                e1.printStackTrace();
            } catch (StatusException e1) {
                e1.printStackTrace();
            }

            TeocModel curTeocModel = new TeocModel(value.getSourceTimestamp().getTimeInMillis(), nodeName, value);
            KafkaStreamer.getInstance().startStreaming(curTeocModel.toJson());

            Thread.sleep(50);
        } catch (InterruptedException e) {
            fileLogger.error(e.getMessage());
        }
    }
}

但在这种情况下,我读取的值不是平行的。这是并行查询数据的最佳方法吗?

任何想法?

标签: javamultithreadingparallel-processing

解决方案


您可以使用 ScheduledExecutorService 为您进行调度。

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(parallelism);
for (Map.Entry<String, NodeId[]> entry1 : nodeIds.entrySet()) {
    Runnable checkNode = () -> getValue(entry1.getKey(), entrNodeIds[0], client);
    scheduler.scheduleAtFixedRate(checkNode, 50, 50, ChronoUnit.MILLIS);
}

删除while循环和Thread.sleep()你的getValuefor this。

public void getValue(String nodeName, NodeId registeredNodeId, UaClient client) {

    try {
        DataValue value = null;
        try {
            value = client.readAttribute(registeredNodeId, Attributes.Value);
        } catch (ServiceException | StatusException e1) {
            e1.printStackTrace();
        }

        TeocModel curTeocModel = new TeocModel(value.getSourceTimestamp().getTimeInMillis(), nodeName, value);
        KafkaStreamer.getInstance().startStreaming(curTeocModel.toJson());

    } catch (Exception e) {
        fileLogger.error(e.getMessage());
    }
}

现在更新将从其中一个线程每 50 毫秒执行一次。请注意,如果更新时间过长且更新过多,则执行队列将永远增长,您最终会遇到某种资源问题。


推荐阅读