tensorflow - 具有多处理功能的 Python 脚本挂在 for 循环中
问题描述
我在 anaconda3 python 3.6.5 环境、windows 10、cpu == i7 8700、gpu == 1080 中使用 tensorflow-gpu==1.8.0 和 keras==2.1.6,并且使用 multiprocessing 模块并行化我的脚本。我在 Optimizer 和 Environment 两个类的 run() 方法中各有一个 for i in range(EPS) 循环。问题是:当我用 EPS = 500 或类似的数字运行脚本时,它工作正常;但如果我尝试用 EPS = 1000 或更高的值运行,它就会在一开始就挂起,不再向 cmd 打印任何内容,cpu 使用率降为空闲,但 RAM 占用不变。我确信它在挂起之前已经进入了这个 for 循环,甚至在挂起前成功地完成了若干次迭代。我尝试在不同的 Windows 10 机器上运行,情况没有改变;我也尝试用纯 python(而不是 conda)运行,同样无效。for 循环竟然能让脚本挂起,这实在很奇怪。我认为这可能与 Manager 类或死锁有关。
我尝试使用以下命令以 EPS = 7200 运行我的脚本:python -m trace --trace myscript.py 这就是我得到的(其中的一小部分):
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\Acer\AppData\Local\Programs\Python\Python36\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\Users\Acer\AppData\Local\Programs\Python\Python36\lib\multiprocessing\spawn.py", line 114, in _main
prepare(preparation_data)
File "C:\Users\Acer\AppData\Local\Programs\Python\Python36\lib\multiprocessing\spawn.py", line 223, in prepare
_fixup_main_from_name(data['init_main_from_name'])
File
"C:\Users\Acer\AppData\Local\Programs\Python\Python36\lib\multiprocessing\spawn.py", line 249, in _fixup_main_from_name
alter_sys=True)
File "C:\Users\Acer\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 205, in run_module
return _run_module_code(code, init_globals, run_name, mod_spec)
File "C:\Users\Acer\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 93, in _run_module_code
with _TempModule(mod_name) as temp_module, _ModifiedArgv0(fname):
File "C:\Users\Acer\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 55, in __enter__
sys.argv[0] = self.value
TypeError: 'tuple' object does not support item assignment
这是我的代码
class Agent:
    """Epsilon-greedy A3C worker agent that buffers transitions locally and
    pushes matured n-step returns to the shared Brain."""

    def __init__(self, brain, eps_start, eps_end, eps_steps):
        self.eps_start = eps_start
        self.eps_end = eps_end
        self.eps_steps = eps_steps
        self.brain = brain
        self.memory = []  # local buffer used for the n-step return
        self.R = 0.       # running discounted return over the buffered transitions

    def getEpsilon(self, frames):
        """Linearly anneal epsilon from eps_start to eps_end over eps_steps frames."""
        if frames.value >= self.eps_steps:
            return self.eps_end
        # linearly interpolate between start and end
        return self.eps_start + frames.value * (self.eps_end - self.eps_start) / self.eps_steps

    def act(self, s, frames, lock):
        """Choose an action for state s: uniform random with probability eps,
        otherwise sample from the policy predicted by the shared brain."""
        eps = self.getEpsilon(frames)
        with lock:  # the shared frame counter must be updated atomically across processes
            frames.value = frames.value + 1
        if random() < eps:
            # BUG FIX: np.random.randint's upper bound is exclusive, so the original
            # randint(0, NUM_ACTIONS - 1) could never return the last action.
            return np.random.randint(0, NUM_ACTIONS)
        else:
            s = np.array([s])
            p = self.brain.predict(s)[0][0]
            # sample from the policy distribution rather than taking argmax
            a = np.random.choice(NUM_ACTIONS, p=p)
            return a

    def train(self, s, a, r, s_, lock):
        """Buffer one transition (s, a, r, s_) and push n-step samples to the brain.

        s_ is None for a terminal state, which flushes all remaining partial returns.
        """
        def get_sample(memory, n):
            # first state/action paired with the accumulated return R and the n-th next state
            s, a, _, _ = memory[0]
            _, _, _, s_ = memory[n - 1]
            return s, a, self.R, s_

        a_cats = np.zeros(NUM_ACTIONS)  # turn action into one-hot representation
        a_cats[a] = 1
        self.memory.append((s, a_cats, r, s_))
        self.R = (self.R + r * GAMMA_N) / GAMMA
        if s_ is None:
            # terminal state: emit every remaining partial return, shrinking the window
            while len(self.memory) > 0:
                n = len(self.memory)
                s, a, r, s_ = get_sample(self.memory, n)
                self.brain.train_push(s, a, r, s_, lock)
                self.R = (self.R - self.memory[0][2]) / GAMMA
                self.memory.pop(0)
            self.R = 0
        if len(self.memory) >= N_STEP_RETURN:
            s, a, r, s_ = get_sample(self.memory, N_STEP_RETURN)
            self.brain.train_push(s, a, r, s_, lock)
            self.R = self.R - self.memory[0][2]
            self.memory.pop(0)
class Environment:
    """Runs episodes of the Market environment, driving an epsilon-greedy Agent
    and appending each episode's result row to the history CSV."""

    def __init__(self, brain, data, testing=False, eps_start=EPS_START, eps_end=EPS_STOP,
                 eps_steps=EPS_STEPS):
        self.testing = testing
        self.env = Market(1000, data, testing=testing)
        self.agent = Agent(brain, eps_start, eps_end, eps_steps)

    def run_episode(self, lock, frames):
        """Play a single episode to completion, training the agent at every step."""
        state = self.env.reset()
        finished = False
        result = None
        while not finished:
            # time.sleep(THREAD_DELAY)
            action = self.agent.act(state, frames, lock)
            next_state, reward, finished, info = self.env.step(action)
            if finished:
                # terminal state is encoded as None for the n-step flush
                next_state = None
                result = info
            self.agent.train(state, action, reward, next_state, lock)
            state = next_state
        with lock:  # serialize console output across processes
            print("Total reward:", self.env.total)
        row = pd.DataFrame(result, index=[frames.value], dtype=float)
        row.to_csv(PATH_HISTORY, mode='a', header=False)

    def run(self, lock, frames):
        """Run EPS episodes; in testing mode stop after the first one."""
        episode = 0
        while episode < EPS:
            self.run_episode(lock, frames)
            if self.testing:
                break
            episode += 1
class Optimizer:
    """Worker that repeatedly asks the shared Brain to run optimization steps."""

    def __init__(self, brain):
        self.brain = brain

    def run(self, lock, frames):
        """Invoke brain.optimize EPS times.

        `frames` is unused here; it is accepted to keep run() signatures uniform
        with Environment.run so both can share the same process entry point.
        """
        for _ in range(EPS):
            self.brain.optimize(lock)
class Brain:
    """Shared A3C network: serves policy/value predictions and trains from a
    queue of samples pushed by the worker agents."""

    # s, a, r, s', s' terminal mask
    # NOTE(review): this is a *class* attribute holding mutable lists; it is only
    # safe here because a single Brain instance is shared via the BaseManager proxy.
    train_queue = [[], [], [], [], []]

    def __init__(self, saved_model=True, path='model.h5'):
        self.session = tf.Session()
        K.set_session(self.session)
        K.manual_variable_initialization(True)
        if saved_model:
            self.model = self.init_model(path=path)
        else:
            self.model = self._build_model()
        self.graph = self._build_graph(self.model)
        self.session.run(tf.global_variables_initializer())
        self.default_graph = tf.get_default_graph()
        self.default_graph.finalize()  # avoid modifications

    def _build_model(self):
        """Build the two-headed (policy softmax, scalar value) Keras model."""
        l_input = Input(batch_shape=(None, NUM_STATE))
        l_dense = Dense(128, activation='relu')(l_input)
        l_dense = Dense(256, activation='relu')(l_dense)
        l_dense = Dense(128, activation='relu')(l_dense)
        out_actions = Dense(NUM_ACTIONS, activation='softmax')(l_dense)
        out_value = Dense(1, activation='linear')(l_dense)
        model = Model(inputs=[l_input], outputs=[out_actions, out_value])
        model._make_predict_function()  # have to initialize before threading
        return model

    def init_model(self, path):
        """Load a saved model from `path`, falling back to a fresh build."""
        if os.path.isfile(path):
            model = load_model(path)
            model._make_predict_function()
            return model
        else:
            print('WARN! Given path does not exist')
            return self._build_model()

    def save_model(self, path='model.h5'):
        """Persist the current model to disk."""
        self.model.save(path)

    def _build_graph(self, model):
        """Assemble the A3C loss (policy gradient + value error + entropy bonus)
        and return the feed placeholders plus the minimize op."""
        s_t = tf.placeholder(tf.float32, shape=(None, NUM_STATE))
        a_t = tf.placeholder(tf.float32, shape=(None, NUM_ACTIONS))
        r_t = tf.placeholder(tf.float32, shape=(None, 1))  # not immediate, but discounted n step reward
        p, v = model(s_t)
        log_prob = tf.log(tf.reduce_sum(p * a_t, axis=1, keepdims=True) + 1e-10)
        advantage = r_t - v
        loss_policy = - log_prob * tf.stop_gradient(advantage)  # maximize policy
        loss_value = LOSS_V * tf.square(advantage)  # minimize value error
        entropy = LOSS_ENTROPY * tf.reduce_sum(p * tf.log(p + 1e-10), axis=1, keepdims=True)  # maximize entropy (regularization)
        loss_total = tf.reduce_mean(loss_policy + loss_value + entropy)
        optimizer = tf.train.RMSPropOptimizer(LEARNING_RATE, decay=.99)
        minimize = optimizer.minimize(loss_total)
        return s_t, a_t, r_t, minimize

    def optimize(self, lock):
        """If enough samples are queued, atomically drain the queue and run one
        training step; otherwise return immediately."""
        if len(self.train_queue[0]) < MIN_BATCH:
            # time.sleep(0) # yield
            return
        lock.acquire()
        if len(self.train_queue[0]) < MIN_BATCH:  # another process may have drained it first
            # BUG FIX: the original returned here WITHOUT releasing the lock,
            # deadlocking every other process on lock.acquire() — the reported hang.
            lock.release()
            return
        s, a, r, s_, s_mask = self.train_queue
        self.train_queue = [[], [], [], [], []]
        lock.release()
        s = np.vstack(s)
        a = np.vstack(a)
        r = np.vstack(r)
        s_ = np.vstack(s_)
        s_mask = np.vstack(s_mask)
        if len(s) > 5*MIN_BATCH:
            with lock:
                print(f'Optimizer alert! Minimizing batch of {len(s)}')
        v = self.predict(s_)[1]
        r = r + GAMMA_N * v * s_mask  # set v to 0 where s_ is terminal state
        s_t, a_t, r_t, minimize = self.graph
        self.session.run(minimize, feed_dict={s_t: s, a_t: a, r_t: r})

    def train_push(self, s, a, r, s_, lock):
        """Append one sample to the shared queue; terminal s_ is replaced by
        NONE_STATE with a 0. mask so its value estimate is zeroed in optimize()."""
        with lock:
            self.train_queue[0].append(s)
            self.train_queue[1].append(a)
            self.train_queue[2].append(r)
            if s_ is None:
                self.train_queue[3].append(NONE_STATE)
                self.train_queue[4].append(0.)
            else:
                self.train_queue[3].append(s_)
                self.train_queue[4].append(1.)

    def predict(self, s):
        """Return (policy, value) predictions for a batch of states."""
        with self.default_graph.as_default():
            p, v = self.model.predict(s)
        return p, v
# Turn numpy floating-point warnings into exceptions so bad values fail loudly.
np.seterr(all='raise')


def run(obj, lock, frames):
    """Process entry point: delegate to the target object's own run() method.

    Lets Environment and Optimizer instances share one picklable target.
    """
    obj.run(lock, frames)
if __name__ == '__main__':
    start = time.time()
    env_threads, opt_threads = [], []
    # Shared integer frame counter, incremented (under the lock) by Agent.act.
    frames = Value('i', 0)
    # Manager-backed lock so it can be passed to spawned processes on Windows.
    m = Manager()
    lock = m.Lock()
    # Expose a single Brain instance to all processes through a BaseManager proxy;
    # every worker's method calls go through this one shared object.
    BaseManager.register('Brain', Brain)
    manager = BaseManager()
    manager.start()
    brain = manager.Brain()
    # NOTE(review): env_test is constructed but never run below — presumably meant
    # for post-training evaluation; confirm.
    env_test = Environment(brain=brain, testing=True, data=test_data, eps_start=0., eps_end=0.)
    envs = [Environment(brain=brain, data=train_data) for i in range(THREADS)]
    opts = [Optimizer(brain=brain) for i in range(OPTIMIZERS)]
    # Launch one process per environment worker...
    for env in envs:
        p = Process(target=run, args=[env, lock, frames])
        p.start()
        env_threads.append(p)
    # ...and one per optimizer worker; all share brain, lock and frames.
    for opt in opts:
        p = Process(target=run, args=[opt, lock, frames])
        p.start()
        opt_threads.append(p)
    # Wait for every worker to finish before reporting elapsed time.
    for p in env_threads:
        p.join()
    for p in opt_threads:
        p.join()
    print('finished in ', time.time() - start)
如果有人有类似的东西或任何可能相关的东西,请告诉我,因为我一直在为这个问题苦苦挣扎一段时间。
解决方案
我解决了。问题出在 Brain.optimize 中、已经调用 lock.acquire() 之后的这段代码:
if len(self.train_queue[0]) < MIN_BATCH:
    return
这个 return 发生在 lock.acquire() 之后、lock.release() 之前,因此锁永远不会被释放;其他所有进程都会永远阻塞在 lock.acquire() 上,造成死锁——这正是脚本挂起的原因。修复方法是在 return 之前先调用 lock.release()。
推荐阅读
- java - ArrayAdapter 后应用程序关闭
- javascript - 在while循环中单击不会放在事件循环中
- sql-server - SQL Server 审计 - SENSITIVE_BATCH_COMPLETED_GROUP 在哪里?
- spring-boot - 执行 liquibase 后终止 Spring Boot 应用程序
- python - 如何添加 SMOTE?
- python - 使用 pandas 创建封面
- spring - 在 webflux 中使用协程的 spring @Transactional 抛出错误
- mysql - 为什么我使用 SELECT ... FOR UPDATE 锁从 mysql 获得死锁?
- r - 如何匹配来自两个单独列表的数据框名称并加入它们?
- python - pytsk3 的文件路径