首页 > 解决方案 > 及时测量数量[Java,多线程]

问题描述

及时测量数量的最佳做法是什么?我有一个多线程应用程序。线程可以是任意数字。我想每秒执行 N 次操作。我尝试了几种技术,但仍然没有 100% 成功。这是一个片段,您可能会更清楚地看到问题。

更清楚地说,我想在一秒钟(1000 毫秒)内发送最多 100 条消息。例如,如果这些线程能够在 450 毫秒内完成,那么我想强制所有线程等待 550 毫秒,然后一次又一次地执行相同的操作。我从线程中调用这个 speedLimitMetter.getWaitTime()。如果它给出 X > 0,那么我强制线程等待 X 毫秒。

任何提示都会有所帮助

public class SpeedLimitMeter {
    private int speedLimit;

    private volatile int messageCounter = 0;

    private volatile long firstTime = 0;
    private volatile long lastTime = 0;

    private volatile long waitTime = 0;
    private volatile long waitUntil = 0;

    public SpeedLimitMeter(int speedLimit) {
        this.speedLimit = speedLimit;
    }

    public synchronized long getWaitTime() {
        long currTime = System.currentTimeMillis();

        if (messageCounter == speedLimit) {
            if (waitTime == 0) {
                long elapedTime = currTime - firstTime;
                if (elapedTime < 1000) {
                    waitTime = 1000 - elapedTime;
                    waitUntil = currTime + waitTime;

                    return waitTime;
                }

                reset();
            } else if (currTime < waitUntil) {
                return waitTime;
            } else {
                reset();
            }

        }

        if (messageCounter == 0) firstTime = currTime;
        lastTime = currTime;

        messageCounter++;

        return 0;
    }

    private synchronized void reset() {
        firstTime = 0;
        lastTime = 0;
        waitTime = 0;
        waitUntil = 0;
        messageCounter = 0;
    }

}

标签: javamultithreading

解决方案


我建议查看 ScheduledThreadPoolExecutor ( https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html ) 提供的功能,因为无论您尝试做什么,它可能是可以解决的,以适当的方式。

ScheduledThreadPoolExecutor 允许您安排作业的定期执行。例如,当(且仅当)信号量耗尽时,您可以使用它来释放信号量。这样,您尝试调节的线程可以获取租约,而不是请求等待时间。这种方式更干净(恕我直言)。

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ThreadLeaser {
    
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Semaphore semaphore;
    private final long nanosecondsPause;
    
    public ThreadLeaser(float leasesPerSecond, boolean fair) {
        this.nanosecondsPause = Math.round(1_000_000_000f / leasesPerSecond);
        this.semaphore = new Semaphore(0, fair);
        
        Runnable semRelease = () -> {
            if (this.semaphore.availablePermits() == 0) {
                this.semaphore.release();
            }
        };
        executor.scheduleAtFixedRate(semRelease, 0, nanosecondsPause, TimeUnit.NANOSECONDS);
    }
    
    public void drawNextAvailableLease() throws InterruptedException {
        semaphore.acquire();
    }
}

请注意,此解决方案并不完全精确,因为如果一个线程刚刚获得租约,然后立即释放,那么下一个线程可能会立即获得。如果线程尝试足够有规律地获取,这并不能保证“时间距离”,而是保证某种恒定的频率。

此外,这件事需要一些充实(一种适当的终止方式等等),但我把这个留给你。

这个测试用例显示了行为(以及大致的精度),对于大量等待线程,按顺序启动(中间有一小段等待时间)。

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import thread.ThreadLeaser;

public class ThreadLeaseTester {
    @Test
    public void test_tpl() {
        final ThreadLeaser leaser = new ThreadLeaser(5f, true);
        
        List<Thread> toJoin = new ArrayList<>();
        
        long start = System.nanoTime();
        
        for (int i = 0; i < 50; ++i) {
            final int d = i;
            
            try {
                Thread.sleep(5);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            
            Thread thread = new Thread() {
                public void run() {
                    try {
                        leaser.drawNextAvailableLease();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    System.out.println("Thread: " + d + " Time: " + ((System.nanoTime() - start) / 1_000_000) + "ms");
                }
            };
            thread.start();
            toJoin.add(thread);
        }
        
        toJoin.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        assertTrue(toJoin.size() == 100);
    }
}


推荐阅读