首页 > 解决方案 > 当 clients_per_round 超过 99 时,tff.templates.IterativeProcess.next() 中的 TensorFlow Federated (TFF) TypeError

问题描述

我用 TFF 实现了一个自定义的联邦学习 GAN 训练循​​环,类似于Google Research 的这段代码

使用以下代码片段可以找到特定训练轮次的客户端数据:

def client_dataset_fn():
    # Sample clients and data
    sampled_clients = np.random.choice(train_data.client_ids, size=cfg.clients_per_round, replace=False)
    datasets = [(next(client_gen_inputs_iterator),
                 train_data.create_tf_dataset_for_client(client_id).take(cfg.n_critic))
                for client_id in sampled_clients]
    return datasets

client_noise_inputs, client_real_data = zip(*client_dataset_fn())

在设置为 99之前cfg.clients_per_round,这一切正常。当它设置为 100 或更大的值(当然客户端总数更大)时,我收到以下错误:

Traceback (most recent call last):
  File "main.py", line 109, in main
    metrics = run_single_trial(train_data, test_data, cfg)
  File "/mnt/workspace/tff/GAN/federated/fedgan_main.py", line 73, in run_single_trial
    metrics = train_loop(iterative_process, server_dataset_fn, client_dataset_fn, model, eval_hook_fn, cfg)
  File "/mnt/workspace/tff/GAN/federated/fedgan_main.py", line 124, in train_loop
    client_real_data)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/computation/function_utils.py", line 525, in __call__
    return context.invoke(self, arg)
  File "/usr/local/lib/python3.6/dist-packages/retrying.py", line 49, in wrapped_f
    return Retrying(*dargs, **dkw).call(f, *args, **kw)
  File "/usr/local/lib/python3.6/dist-packages/retrying.py", line 206, in call
    return attempt.get(self._wrap_exception)
  File "/usr/local/lib/python3.6/dist-packages/retrying.py", line 247, in get
    six.reraise(self.value[0], self.value[1], self.value[2])
  File "/usr/local/lib/python3.6/dist-packages/six.py", line 703, in reraise
    raise value
  File "/usr/local/lib/python3.6/dist-packages/retrying.py", line 200, in call
    attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/execution_context.py", line 226, in invoke
    _ingest(executor, unwrapped_arg, arg.type_signature)))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/common_libs/tracing.py", line 396, in _wrapped
    return await coro
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/execution_context.py", line 111, in _ingest
    ingested = await asyncio.gather(*ingested)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/execution_context.py", line 116, in _ingest
    return await executor.create_value(val, type_spec)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/common_libs/tracing.py", line 201, in async_trace
    result = await fn(*fn_args, **fn_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/reference_resolving_executor.py", line 294, in create_value
    value, type_spec))
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/common_libs/tracing.py", line 201, in async_trace
    result = await fn(*fn_args, **fn_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/thread_delegating_executor.py", line 111, in create_value
    self._target_executor.create_value(value, type_spec))
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/thread_delegating_executor.py", line 105, in _delegate
    result_value = await _delegate_with_trace_ctx(coro, self._event_loop)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/common_libs/tracing.py", line 396, in _wrapped
    return await coro
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/common_libs/tracing.py", line 201, in async_trace
    result = await fn(*fn_args, **fn_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/federating_executor.py", line 394, in create_value
    return await self._strategy.compute_federated_value(value, type_spec)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/core/impl/executors/federated_composing_strategy.py", line 279, in compute_federated_value
    py_typecheck.check_type(value, list)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_federated/python/common_libs/py_typecheck.py", line 41, in check_type
    type_string(type_spec), type_string(type(target))))
TypeError: Expected list, found tuple.

在调试的时候,我查看target了traceback最后一行的变量,发现就是上面提到的client_real_dataand client_noise_inputs。它们的类型实际上是元组而不是列表,但是,这不会随着不同数量的cfg.clients_per_round. 的唯一用法在cfg.clients_per_round上面的随机选择中显示。我真的无法解释为什么会发生这种情况,也许有人经历过类似的事情并且可以帮助我。

我使用的包版本如下:

作为一种解决方法,我现在手动更改client_noise_inputsclient_real_datausing的数据类型list(tuple_var),但我仍然很好奇为什么需要该列表以某种方式。

标签: tensorflowpython-3.6tensorflow-federated

解决方案


(从GitHub 上的原文复制和粘贴)

在我看来,这似乎是federated_composing_strategy和之间的实现区别federated_resolving_strategy。IIRC,默认情况下,在您达到 100 个客户端之前,我们不会将组成执行程序注入您的堆栈——这将是这个令人兴奋的谜团的来源。

特别是,组合策略是针对传入客户端放置的值表示为列表的假设进行编程的,而解析策略针对更灵活的容器集进行编码。

将客户放置的值强制到列表中并不疯狂——我们还可以在组合执行器中扩展客户端放置的值的允许表示,以匹配解析中的表示,可能将适当的逻辑拉到一个共享的地方,比如这里. 我认为这是一个贡献,如果你愿意,我们很乐意接受!


推荐阅读