首页 > 解决方案 > rxJava 有序(按键)任务执行

问题描述

我有一堆代表一些数据的对象。这些对象可以写入到它们对应的文件中。用户可以要求比以前写入文件的更改更快地进行一些更改。

比如说,我对文件 A、文件 B 和文件 C 进行了更改并提交它们以供执行。然后,在编写它们的同时,我对文件 A 进行更改并发布。例如,有 3 个线程在运行。一旦执行了对 A、B 和 C 的第一次更改(写入文件),对 A 的第一次和第二次更改将几乎同时执行。但是,我希望在第一个更改完成后应用第二个更改。

我怎样才能在 rxJava 中做到这一点?

还有一点。在不同的地方,我想使用最新的更改运行操作。一种选择是等到所有任务完成。

是否有合适的 RxJava 原语/方法有望涵盖这两个用例?

我是 RxJava 的新手,但我希望这是有道理的。Subjects在我看来是相关的,但会有数百个文件。

我已经使用 custom 实现了Executor

public class OrderingExecutor
implements Executor
{
    @Delegate
    private final Executor delegate;
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<>();

    public OrderingExecutor(
        Executor delegate)
    {
        this.delegate = delegate;
    }

    public void execute(
        Runnable task,
        Object key)
    {
        Objects.requireNonNull(key);

        boolean first;
        Runnable wrappedTask;
        synchronized (keyedTasks)
        {
            Queue<Runnable> dependencyQueue = keyedTasks.get(key);
            first = (dependencyQueue == null);
            if (dependencyQueue == null)
            {
                dependencyQueue = new LinkedList<>();
                keyedTasks.put(key, dependencyQueue);
            }

            wrappedTask = wrap(task, dependencyQueue, key);
            if (!first)
            {
                dependencyQueue.add(wrappedTask);
            }
        }

        // execute method can block, call it outside synchronize block
        if (first)
        {
            delegate.execute(wrappedTask);
        }

    }

    private Runnable wrap(
        Runnable task,
        Queue<Runnable> dependencyQueue,
        Object key)
    {
        return new OrderedTask(task, dependencyQueue, key);
    }

    class OrderedTask
    implements Runnable
    {

        private final Queue<Runnable> dependencyQueue;
        private final Runnable task;
        private final Object key;

        public OrderedTask(
            Runnable task,
            Queue<Runnable> dependencyQueue,
            Object key)
        {
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
        }

        @Override
        public void run()
        {
            try
            {
                task.run();
            }
            finally
            {
                Runnable nextTask = null;
                synchronized (keyedTasks)
                {
                    if (dependencyQueue.isEmpty())
                    {
                        keyedTasks.remove(key);
                    }
                    else
                    {
                        nextTask = dependencyQueue.poll();
                    }
                }
                if (nextTask != null)
                {
                    delegate.execute(nextTask);
                }
            }
        }
    }
}

也许一些明智的方式将它插入 rxJava?

标签: javaconcurrencyrx-java

解决方案


目前尚不完全清楚您在这里尝试实现的目标,但您可以在 RxJava 之上放置一个优先级队列。

class OrderedTask implements Comparable<OrderedTask> { ... }

PriorityBlockingQueue<OrderedTask> queue = new PriorityBlockingQueue<>();

PublishSubject<Integer> trigger = PublishSubject.create();

trigger.flatMap(v -> {
   OrderedTask t = queue.poll();
   return someAPI.workWith(t);
}, 1)
.subscribe(result -> { }, error -> { });

queue.offer(new SomeOrderedTask(1));
trigger.onNext(1);

queue.offer(new SomeOrderedTask(2));
trigger.onNext(2);

推荐阅读