python - 线程不使用python中的所有核心
问题描述
我最近遇到了在多个线程/内核上运行某些东西的问题。我的设置:(带有 GIL 的 Cpython)python 3.6.3(anaconda)操作系统:windows 10 CPU:i7 8700(6 核/12 线程) GPU:1080、1070 tensorflow==1.8.0 tensorflow-gpu==1.8.0 keras ==2.1.5
而且肯定没有瓶颈。RAM 使用 6/24 GB 磁盘使用:0%
问题是,根据任务管理器,threading 模块似乎只使用了我一半的内核/线程:CPU 负载仅显示为 50%,而不是 100%。
这是我的代码
class Environment(Thread):
    """Thread that plays episodes on its own Market using its own Agent.

    Each instance builds a ``Market`` and an ``Agent`` in ``__init__``.
    ``run`` keeps playing episodes until ``stop`` is called; an instance
    created with ``testing=True`` plays a single episode and then returns.
    """

    # Class-level default; ``stop`` shadows it with an instance attribute.
    stop_signal = False

    def __init__(self, testing=False, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
        super().__init__()
        self.testing = testing
        self.env = Market(1000, train_data, testing=testing)
        self.agent = Agent(eps_start, eps_end, eps_steps)

    def runEpisode(self):
        """Play one episode to its terminal state and print the summed reward."""
        state = self.env.reset()
        total_reward = 0
        finished = False
        while not finished:
            time.sleep(THREAD_DELAY)  # yield

            action = self.agent.act(state)
            next_state, reward, finished, _info = self.env.step(action)
            if finished:
                # The terminal state is handed to the agent as None.
                next_state = None

            self.agent.train(state, action, reward, next_state)
            state = next_state
            total_reward += reward

        print("Total reward:", total_reward)

    def run(self):
        """Thread body: play episodes until stopped (only one when testing)."""
        while not self.stop_signal:
            self.runEpisode()
            if self.testing:
                break

    def stop(self):
        """Ask ``run`` to exit once the episode in progress has finished."""
        self.stop_signal = True
class Optimizer(Thread):
    """Thread that calls ``brain.optimize()`` in a loop until stopped.

    ``brain`` is looked up as a module-level global when ``run`` executes.
    """

    # Class-level default; ``stop`` shadows it with an instance attribute.
    stop_signal = False

    def __init__(self):
        super().__init__()

    def run(self):
        """Thread body: keep optimizing until ``stop`` has been called."""
        while not self.stop_signal:
            brain.optimize()

    def stop(self):
        """Ask ``run`` to exit after the optimize call in progress returns."""
        self.stop_signal = True
if __name__ == '__main__':
    # -- main
    # A testing-mode environment is built first so the state size and the
    # number of actions can be read from its observation/action spaces.
    env_test = Environment(testing=True, eps_start=0., eps_end=0.)
    NUM_STATE = env_test.env.observation_space.shape[0]
    NUM_ACTIONS = env_test.env.action_space.n
    NONE_STATE = np.zeros(NUM_STATE)

    brain = Brain()  # brain is global in A3C

    envs = [Environment() for _ in range(THREADS)]
    opts = [Optimizer() for _ in range(OPTIMIZERS)]

    start_time = time.time()

    # Optimizers are started before the environment workers.
    for optimizer in opts:
        optimizer.start()
    for worker in envs:
        worker.start()

    time.sleep(RUN_TIME)

    # Signal every environment first, then wait for all of them; the
    # optimizers are shut down only after the environments have finished.
    for worker in envs:
        worker.stop()
    for worker in envs:
        worker.join()

    for optimizer in opts:
        optimizer.stop()
    for optimizer in opts:
        optimizer.join()

    print("Training finished in ", time.time() - start_time)
    brain.model.save('dense.h5')
解决方案
原来解决方案是使用 multiprocessing 模块。但我在整合它时遇到了问题:我收到 BrokenPipeError: [Errno 32] Broken pipe。我认为这在某种程度上与 brain = Brain() 这一行有关,因为 brain 是一个全局变量。
class Environment:
    """Worker that plays episodes on its own Market using its own Agent.

    Unlike a ``Thread`` subclass, this is a plain object: its ``run`` method
    is meant to be handed to whatever executes it (the script below passes it
    as a ``Process`` target).
    """

    # Class-level default; ``stop`` shadows it with an instance attribute.
    stop_signal = False

    def __init__(self, testing=False, data=train_data, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
        self.testing = testing
        self.env = Market(1000, data, testing=testing)
        self.agent = Agent(eps_start, eps_end, eps_steps)

    def runEpisode(self):
        """Play one episode until it ends or a stop is requested, then print the total."""
        state = self.env.reset()
        while True:
            time.sleep(THREAD_DELAY)  # yield

            action = self.agent.act(state)
            next_state, reward, finished, _info = self.env.step(action)
            if finished:
                # The terminal state is handed to the agent as None.
                next_state = None

            self.agent.train(state, action, reward, next_state)
            state = next_state

            if finished or self.stop_signal:
                break

        print("Total reward:", self.env.total)

    def run(self):
        """Play episodes until stopped (only one when testing)."""
        while not self.stop_signal:
            self.runEpisode()
            if self.testing:
                break

    def stop(self):
        """Request that ``run`` and the episode in progress stop."""
        self.stop_signal = True
class Optimizer:
    """Worker that calls ``brain.optimize()`` in a loop until stopped.

    ``brain`` is looked up as a module-level global when ``run`` executes.
    """

    # Class-level default; ``stop`` shadows it with an instance attribute.
    stop_signal = False

    def run(self):
        """Keep optimizing until ``stop`` has been called on this object."""
        while not self.stop_signal:
            brain.optimize()

    def stop(self):
        """Ask ``run`` to exit after the optimize call in progress returns."""
        self.stop_signal = True
# NOTE(review): everything in this run of statements precedes the
# `if __name__ == '__main__':` guard, so it executes on every import of the
# module. With multiprocessing's "spawn" start method (the default on Windows)
# that presumably includes each child process -- confirm whether building
# Brain/Environment objects here is related to the reported BrokenPipeError.

# Make NumPy raise an exception for every floating-point error category.
np.seterr(all='raise')
# Testing-mode environment on the test data, with both epsilon bounds at 0.
env_test = Environment(testing=True, data=test_data, eps_start=0., eps_end=0.)
# State size and action count are read from the test environment's spaces.
NUM_STATE = env_test.env.observation_space.shape[0]
NUM_ACTIONS = env_test.env.action_space.n
# All-zero vector of length NUM_STATE; presumably a stand-in for the `None`
# terminal state -- TODO confirm against Brain/Agent.
NONE_STATE = np.zeros(NUM_STATE)
brain = Brain(True) # brain is global in A3C
# THREADS environment workers and OPTIMIZERS optimizer workers.
envs = [Environment() for i in range(THREADS)]
opts = [Optimizer() for i in range(OPTIMIZERS)]
if __name__ == '__main__':
    # Run every environment and optimizer worker in its own process for
    # RUN_TIME seconds, then shut them all down and save the model.
    start = time.time()
    env_procs, opt_procs = [], []

    for env in envs:
        proc = Process(target=env.run)
        proc.start()
        env_procs.append(proc)

    for opt in opts:
        # Each optimizer process must run its own optimizer; the target used
        # to be `env.run`, which re-ran the last environment instead.
        proc = Process(target=opt.run)
        proc.start()
        opt_procs.append(proc)

    time.sleep(RUN_TIME)

    # multiprocessing.Process has no stop() method, and setting `stop_signal`
    # on the parent's objects would not reach the copies living in the child
    # processes, so the workers are ended with terminate().
    # NOTE(review): terminate() kills the worker mid-episode; a shared
    # multiprocessing.Event checked by the workers would allow a clean exit.
    for proc in env_procs:
        proc.terminate()
    for proc in env_procs:
        proc.join()

    for proc in opt_procs:
        proc.terminate()
    for proc in opt_procs:
        proc.join()

    print('finished in ', time.time() - start)
    # NOTE(review): child processes work on their own copy of `brain`; unless
    # Brain shares its state across processes explicitly, the model saved here
    # may not contain what the workers trained -- confirm.
    brain.model.save('dense.h5')
推荐阅读
- typescript - remove null or undefined from properties of a type
- python - 熊猫:在每列上应用 lambda 时使用行名
- android - Angular Material 单选按钮在 Android 4.4 上不起作用
- csv - CSV 阅读器工作,CSV 写入器出现问题
- javascript - Angular http client observable pipe map
- r - Power BI - R Script Visual - Apriori
- c++ - std::sort() function not able to sort a part of a vector
- postgresql - 强制 Postgresql 使用 Merge Append
- r - 在r中的几个目录中读取多个csv文件
- java - 应用程序未在 Tomcat 9.0.12 中启动