java - 及时测量数量[Java,多线程]
问题描述
及时测量数量的最佳做法是什么?我有一个多线程应用程序。线程可以是任意数字。我想每秒执行 N 次操作。我尝试了几种技术,但仍然没有 100% 成功。这是一个片段,您可能会更清楚地看到问题。
更清楚地说,我想在一秒钟(1000 毫秒)内发送最多 100 条消息。例如,如果这些线程能够在 450 毫秒内完成,那么我想强制所有线程等待 550 毫秒,然后一次又一次地执行相同的操作。我从线程中调用这个 speedLimitMetter.getWaitTime()。如果它给出 X > 0,那么我强制线程等待 X 毫秒。
任何提示都会有所帮助
public class SpeedLimitMeter {
private int speedLimit;
private volatile int messageCounter = 0;
private volatile long firstTime = 0;
private volatile long lastTime = 0;
private volatile long waitTime = 0;
private volatile long waitUntil = 0;
public SpeedLimitMeter(int speedLimit) {
this.speedLimit = speedLimit;
}
public synchronized long getWaitTime() {
long currTime = System.currentTimeMillis();
if (messageCounter == speedLimit) {
if (waitTime == 0) {
long elapedTime = currTime - firstTime;
if (elapedTime < 1000) {
waitTime = 1000 - elapedTime;
waitUntil = currTime + waitTime;
return waitTime;
}
reset();
} else if (currTime < waitUntil) {
return waitTime;
} else {
reset();
}
}
if (messageCounter == 0) firstTime = currTime;
lastTime = currTime;
messageCounter++;
return 0;
}
private synchronized void reset() {
firstTime = 0;
lastTime = 0;
waitTime = 0;
waitUntil = 0;
messageCounter = 0;
}
}
解决方案
我建议查看 ScheduledThreadPoolExecutor ( https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html ) 提供的功能,因为无论您尝试做什么,它可能是可以解决的,以适当的方式。
ScheduledThreadPoolExecutor 允许您安排作业的定期执行。例如,当(且仅当)信号量耗尽时,您可以使用它来释放信号量。这样,您尝试调节的线程可以获取租约,而不是请求等待时间。这种方式更干净(恕我直言)。
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class ThreadLeaser {
private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private final Semaphore semaphore;
private final long nanosecondsPause;
public ThreadLeaser(float leasesPerSecond, boolean fair) {
this.nanosecondsPause = Math.round(1_000_000_000f / leasesPerSecond);
this.semaphore = new Semaphore(0, fair);
Runnable semRelease = () -> {
if (this.semaphore.availablePermits() == 0) {
this.semaphore.release();
}
};
executor.scheduleAtFixedRate(semRelease, 0, nanosecondsPause, TimeUnit.NANOSECONDS);
}
public void drawNextAvailableLease() throws InterruptedException {
semaphore.acquire();
}
}
请注意,此解决方案并不完全精确,因为如果一个线程刚刚获得租约,然后立即释放,那么下一个线程可能会立即获得。如果线程尝试足够有规律地获取,这并不能保证“时间距离”,而是保证某种恒定的频率。
此外,这件事需要一些充实(一种适当的终止方式等等),但我把这个留给你。
这个测试用例显示了行为(以及大致的精度),对于大量等待线程,按顺序启动(中间有一小段等待时间)。
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import thread.ThreadLeaser;
public class ThreadLeaseTester {
@Test
public void test_tpl() {
final ThreadLeaser leaser = new ThreadLeaser(5f, true);
List<Thread> toJoin = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 50; ++i) {
final int d = i;
try {
Thread.sleep(5);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
Thread thread = new Thread() {
public void run() {
try {
leaser.drawNextAvailableLease();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread: " + d + " Time: " + ((System.nanoTime() - start) / 1_000_000) + "ms");
}
};
thread.start();
toJoin.add(thread);
}
toJoin.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
assertTrue(toJoin.size() == 100);
}
}
推荐阅读
- php - 根据表中列的计数对表进行颜色编码
- docker - Dockerfile Maven 值
- python - 创建无限字典。这是可能的?
- python - 如何以编程方式下载 Python 中 blob 中引用的 m3u8 视频?
- jquery - Shopify 购物车 API - 将产品添加到购物车而无需变体
- java - 从 Cucumber 中的 StepDefinition 获取注释
- python - Python-Web抓取页面
- python - 如何在 django Views.py 中过滤与外键模型相关的对象
- javascript - React Native Animated 不会在 FlatList 中触发 onScroll 事件
- javascript - 试图停止更新面板内的繁忙指示器