首页 > 解决方案 > 线程不使用python中的所有核心

问题描述

我最近遇到了在多个线程/内核上运行某些东西的问题。我的设置:(带有 GIL 的 Cpython)python 3.6.3(anaconda)操作系统:windows 10 CPU:i7 8700(6 核/12 线程) GPU:1080、1070 tensorflow==1.8.0 tensorflow-gpu==1.8.0 keras ==2.1.5

而且肯定没有瓶颈。RAM 使用 6/24 GB 磁盘使用:0%

问题是,根据任务管理器,线程模块似乎只使用了我一半的内核/线程,这显示 CPU 负载仅为 50% 而不是 100。

这是我的代码

class Environment(Thread):
    stop_signal = False

    def __init__(self, testing=False, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
        Thread.__init__(self)
        self.testing = testing
        self.env = Market(1000, train_data, testing=testing)

        self.agent = Agent(eps_start, eps_end, eps_steps)

    def runEpisode(self):
        s = self.env.reset()

        R = 0
        done = False

        while not done:         
            time.sleep(THREAD_DELAY) # yield 

            a = self.agent.act(s)
            s_, r, done, info = self.env.step(a)

            if done: # terminal state
                s_ = None

            self.agent.train(s, a, r, s_)

            s = s_
            R += r

        print("Total reward:", R)

    def run(self):
        while not self.stop_signal:
            self.runEpisode()
            if self.testing: break

    def stop(self):
        self.stop_signal = True

class Optimizer(Thread):
    stop_signal = False

    def __init__(self):
        Thread.__init__(self)

    def run(self):
        while not self.stop_signal:
            brain.optimize()

    def stop(self):
        self.stop_signal = True


if __name__ == '__main__':

    #-- main
    env_test = Environment(testing=True, eps_start=0., eps_end=0.)
    NUM_STATE = env_test.env.observation_space.shape[0]
    NUM_ACTIONS = env_test.env.action_space.n
    NONE_STATE = np.zeros(NUM_STATE)

    brain = Brain() # brain is global in A3C

    envs = [Environment() for i in range(THREADS)]
    opts = [Optimizer() for i in range(OPTIMIZERS)]

    start_time = time.time()

    for o in opts:
        o.start()

    for e in envs:
        e.start()

    time.sleep(RUN_TIME)

    for e in envs:
        e.stop()

    for e in envs:
        e.join()

    for o in opts:
        o.stop()

    for o in opts:
        o.join()

    print("Training finished in ", time.time() - start_time)

    brain.model.save('dense.h5')

标签: pythonmultithreadingtensorflowkeras

解决方案


原来解决方案是使用多处理模块。但我在整合它时遇到了问题。我收到 BrokenPipeError: [Errno 32] Broken pipe。我认为它在某种程度上与这条线大脑 = Brain() 相关,因为大脑是一个全局变量

class Environment():
    stop_signal = False

    def __init__(self, testing=False, data=train_data, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
        self.testing = testing
        self.env = Market(1000, data, testing=testing)
        self.agent = Agent(eps_start, eps_end, eps_steps)

    def runEpisode(self):
        s = self.env.reset()
        done = False

        while True:         
            time.sleep(THREAD_DELAY) # yield 

            a = self.agent.act(s)
            s_, r, done, info = self.env.step(a)

            if done: # terminal state
                s_ = None

            self.agent.train(s, a, r, s_)

            s = s_

            if done or self.stop_signal:
                break

        print("Total reward:", self.env.total)



    def run(self):
        while not self.stop_signal:
            self.runEpisode()
            if self.testing: break

    def stop(self):
        self.stop_signal = True

class Optimizer():
    stop_signal = False

    def run(self):
        while not self.stop_signal:
            brain.optimize()

    def stop(self):
        self.stop_signal = True

np.seterr(all='raise')

env_test = Environment(testing=True, data=test_data, eps_start=0., eps_end=0.)
NUM_STATE = env_test.env.observation_space.shape[0]
NUM_ACTIONS = env_test.env.action_space.n
NONE_STATE = np.zeros(NUM_STATE)

brain = Brain(True) # brain is global in A3C

envs = [Environment() for i in range(THREADS)]
opts = [Optimizer() for i in range(OPTIMIZERS)]

if __name__ == '__main__':
    start = time.time()
    env_threads, opt_threads = [], []

    for env in envs:
        p = Process(target=env.run)
        p.start()
        env_threads.append(p)

    for opt in opts:
        p = Process(target=env.run)
        p.start()
        opt_threads.append(p)

    time.sleep(RUN_TIME)

    for p in env_threads:
        p.stop()

    for p in env_threads:
        p.join()

    for p in opt_threads:
        p.stop()

    for p in opt_threads:
        p.join()

    print('finished in ', time.time() - start)

brain.model.save('dense.h5')

推荐阅读