java - rxJava 有序(按键)任务执行
问题描述
我有一堆代表一些数据的对象。这些对象可以写入到它们对应的文件中。用户可以要求比以前写入文件的更改更快地进行一些更改。
比如说,我对文件 A、文件 B 和文件 C 进行了更改并提交它们以供执行。然后,在编写它们的同时,我对文件 A 进行更改并发布。例如,有 3 个线程在运行。一旦执行了对 A、B 和 C 的第一次更改(写入文件),对 A 的第一次和第二次更改将几乎同时执行。但是,我希望在第一个更改完成后应用第二个更改。
我怎样才能在 rxJava 中做到这一点?
还有一点。在不同的地方,我想使用最新的更改运行操作。一种选择是等到所有任务完成。
是否有合适的 RxJava 原语/方法有望涵盖这两个用例?
我是 RxJava 的新手,但我希望这是有道理的。Subjects
在我看来是相关的,但会有数百个文件。
我已经使用 custom 实现了Executor
。
public class OrderingExecutor
implements Executor
{
@Delegate
private final Executor delegate;
private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<>();
public OrderingExecutor(
Executor delegate)
{
this.delegate = delegate;
}
public void execute(
Runnable task,
Object key)
{
Objects.requireNonNull(key);
boolean first;
Runnable wrappedTask;
synchronized (keyedTasks)
{
Queue<Runnable> dependencyQueue = keyedTasks.get(key);
first = (dependencyQueue == null);
if (dependencyQueue == null)
{
dependencyQueue = new LinkedList<>();
keyedTasks.put(key, dependencyQueue);
}
wrappedTask = wrap(task, dependencyQueue, key);
if (!first)
{
dependencyQueue.add(wrappedTask);
}
}
// execute method can block, call it outside synchronize block
if (first)
{
delegate.execute(wrappedTask);
}
}
private Runnable wrap(
Runnable task,
Queue<Runnable> dependencyQueue,
Object key)
{
return new OrderedTask(task, dependencyQueue, key);
}
class OrderedTask
implements Runnable
{
private final Queue<Runnable> dependencyQueue;
private final Runnable task;
private final Object key;
public OrderedTask(
Runnable task,
Queue<Runnable> dependencyQueue,
Object key)
{
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
}
@Override
public void run()
{
try
{
task.run();
}
finally
{
Runnable nextTask = null;
synchronized (keyedTasks)
{
if (dependencyQueue.isEmpty())
{
keyedTasks.remove(key);
}
else
{
nextTask = dependencyQueue.poll();
}
}
if (nextTask != null)
{
delegate.execute(nextTask);
}
}
}
}
}
也许一些明智的方式将它插入 rxJava?
解决方案
目前尚不完全清楚您在这里尝试实现的目标,但您可以在 RxJava 之上放置一个优先级队列。
class OrderedTask implements Comparable<OrderedTask> { ... }
PriorityBlockingQueue<OrderedTask> queue = new PriorityBlockingQueue<>();
PublishSubject<Integer> trigger = PublishSubject.create();
trigger.flatMap(v -> {
OrderedTask t = queue.poll();
return someAPI.workWith(t);
}, 1)
.subscribe(result -> { }, error -> { });
queue.offer(new SomeOrderedTask(1));
trigger.onNext(1);
queue.offer(new SomeOrderedTask(2));
trigger.onNext(2);
推荐阅读
- z3 - 如何从可满足的公式中恢复估值,关于模型的问题
- python - 在 python 中使用 winreg 打开注册表项时如何避免权限错误?
- javascript - 如何在Vue的A组件中预加载B组件的图像?
- php - Ajax post 值返回空
- verilog - 系统verilog开关不改变
- c# - 当我从另一个类调用数组时,数组不会更新
- bash - 类型 .git\HEAD 在 Windows 10 的 Git Bash 中不起作用
- c# - 用户在数组中删除指定的字符串?
- r - R:更新 R 或 RStudio 后更新或安装包的许多问题
- sql-server - 在单行结果集中返回多行