首页 > 解决方案 > 用于并行处理的优先级队列(堆)实现

问题描述

给定“n”个线程和“m”个不同时间完成的作业。如果有空闲线程,它会立即从列表中获取下一个作业。如果一个线程已经开始处理一个作业,它不会中断或停止,直到它完成处理该作业。如果多个线程尝试同时从列表中获取作业,则具有较小索引的线程将获取该作业。

为每个作业确定哪个线程将处理它以及何时开始处理。

输入 :

2 5 # n m
1 2 3 4 5 #  integers  — the times in seconds it takes any thread to process the -the job.

输出 :

0 0  # worker starting_time
1 0
0 1
1 2
0 4

我的代码:

from collections import namedtuple

AssignedJob = namedtuple("AssignedJob", ["worker", "started_at"])

def heapify(arr, i=0):# MIN HEAP

  #  0 --> time, 1 --> worker

  smallest = i
  l = 2*i + 1 # left child of node i
  r = 2*i + 2 # right child of node i

  if l < len(arr)  and arr[l][0] < arr[smallest][0]:
    smallest = l

  if r < len(arr)  and arr[r][0] < arr[smallest][0]:
    smallest = r
  
  if smallest != i:

    arr[i], arr[smallest] = arr[smallest], arr[i]
    heapify(arr, smallest)

  return arr[0][1] # worker whose time is lowest (top of min-heap)


def assign_jobs(n_workers, jobs):

    result = []
    next_free_time = [0] * n_workers

    for job in jobs:

        zip_time_workers = list(zip(next_free_time, [i for i in range(n_workers)]))
        #list of tuples --> (time, worker)

        next_worker = heapify(zip_time_workers)
        # get worker with highest priority

        result.append(AssignedJob(next_worker, next_free_time[next_worker]))
        next_free_time[next_worker] += job

    return result


def main():
    n_workers, n_jobs = map(int, input().split())
    jobs = list(map(int, input().split()))
    assert len(jobs) == n_jobs

    assigned_jobs = assign_jobs(n_workers, jobs)

    for job in assigned_jobs:
        print(job.worker, job.started_at)


if __name__ == "__main__":
    main()

它通过了上面提到的示例案例,但在其他案例中失败了。

失败的测试用例输入:

4 20
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1

预期输出:

0 0
1 0
2 0
3 0
0 1
1 1
2 1
3 1
0 2
1 2
2 2
3 2
0 3
1 3
2 3
3 3
0 4
1 4
2 4
3 4

我究竟做错了什么?也欢迎更好的实施,谢谢!

编辑:不能将 python 的预定义函数用于优先级队列(需要底层逻辑/方法)

标签: pythonalgorithmdata-structurespriority-queuebinary-heap

解决方案


推荐阅读