首页 > 技术文章 > cartographer_common_task_thread_pool

heimazaifei 2020-03-07 17:51 原文

Task 和 Thread_pool


class Task 任务类

class Thread_pool 线程池

线程池:一定数量的线程集合。 用于执行task(任务,可以简单理解为函数)。执行过程中,task 被插入任务队列task_queue,线程池根据插入顺序依次执行。task之间可能有依赖关系,如task_b依赖于task_a。在,task依赖没执行完时,task不能执行,因此task有新建、调配、依赖执行完成、执行、执行完成等多个状态。包括:

NEW:新建任务,还未schedule到线程池。

DISPATCHED: 任务已经schedule 到线程池。

DEPENDENCIES_COMPLETED: 任务依赖已经执行完成。

RUNNING: 任务执行中。

COMPLETED: 任务完成。

对任一个任务的状态转换顺序为:

NEW->DISPATCHED->DEPENDENCIES_COMPLETED->RUNNING->COMPLETED


Task 和 Thread_pool 关系

  • 新的Task 通过Thread_pool -> Schedule 部署到 线程池Thread_pool 的 tasks_not_ready_队列中。 当该Task没有依赖,直接插入task_queue,准备执行,否则,等待DEPENDENCIES_COMPLETED

  • Thread_pool 通过固定数量的thread 与 task_queue(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。

  • 在执行Task过程中,tasks_not_ready_中的Task状态不断变化,一旦变为DEPENDENCIES_COMPLETED就插入到task_queue中。最终所有Task都会插入task_queue中,得到执行。

  • 整个Task状态变化过程 NEW->DISPATCHED->DEPENDENCIES_COMPLETED->RUNNING->COMPLETED

    ​ 当变为DEPENDENCIES_COMPLETED,即从tasks_not_ready_转移到task_queue,准备执行。


class Task {
 public:
  friend class ThreadPoolInterface;

  State GetState() LOCKS_EXCLUDED(mutex_);     //返回本Task当前状态 
  void SetWorkItem(const WorkItem& work_item); //设置Task 执行的任务 (函数)
  // 给当前任务添加 依赖任务,如当前任务为b,添加依赖任务为a(a——>b: b.AddDependency(a))
  // 同时并把当前任务b,加入到依赖任务a的dependent_tasks_列表中,以便执行a后,对应更改b的状态)。
  void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);

 private:

  // AddDependency功能具体实现函数
  // 添加依赖本Task的Task,如b依赖a,则a-->b, a.AddDependentTask(b), 根据a的状态,改变b的状态
  // 如果a完成,则b的依赖-1;(并把当前任务b,加入到依赖任务a的dependent_tasks_列表中,以便执行a后,对应更改b的状态)。
  void AddDependentTask(Task* dependent_task);

  // 执行当前任务,比如当前任务为a,并依此更新依赖a的任务dependent_tasks_中所有任务状态,如依赖a的b。
  void Execute() LOCKS_EXCLUDED(mutex_);

  // 当前任务进入线程待执行队列
  void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);

  // 当前任务的依赖任务完成时候,当前任务状态随之改变
  void OnDependenyCompleted();

  WorkItem work_item_ ;// 任务具体执行过程
  ThreadPoolInterface* thread_pool_to_notify_ = nullptr;// 执行当前任务的线程池
  State state_ GUARDED_BY(mutex_) = NEW; // 初始化状态为 NEW
  unsigned int uncompleted_dependencies_ GUARDED_BY(mutex_) = 0;  //当前任务依赖的任务的数量
  std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_);// 依赖当前任务的任务列表
  absl::Mutex mutex_;
};


class ThreadPool : public ThreadPoolInterface {
 public:
  explicit ThreadPool(int num_threads);//初始化一个线程数量固定的线程池。

  // When the returned weak pointer is expired, 'task' has certainly completed,
  // so dependants no longer need to add it as a dependency.
  std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task) //添加想要ThreadPool执行的task, 
     // 插入tasks_not_ready_,如果任务满足执行要求,直接插入task_queue_准备执行
      LOCKS_EXCLUDED(mutex_) override;

 private:
  void DoWork();//每个线程初始化时,执行DoWork()函数. 与线程绑定
  void NotifyDependenciesCompleted(Task* task) LOCKS_EXCLUDED(mutex_) override;
  bool running_ GUARDED_BY(mutex_) = true;//running_只是一个监视哨,只有线程池在running_状态时,才能往work_queue_加入函数.
  std::vector<std::thread> pool_ GUARDED_BY(mutex_);    
  std::deque<std::shared_ptr<Task>> task_queue_ GUARDED_BY(mutex_);  // 准备执行的task
  absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_  //未准备好的 task,task可能有依赖还未完成
      GUARDED_BY(mutex_);
};

推荐阅读