Converting a tf.argmax result to a numpy array

Problem description

I am new to TensorFlow and have written the following distributed training code. The code works fine.

import multiprocessing
import os
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_hub as hub
import tensorflow.python.keras.backend as K
#1. Define Workers
def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_id=0, task_type="worker",rpc_layer="grpc")
  return cluster_resolver

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

# Pass the partitioner so variables are sharded across the parameter servers.
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver, variable_partitioner=variable_partitioner)

word = "Elephant"
sentence = "I am a sentence for which I would like to get its embedding."
paragraph = (
    "Universal Sentence Encoder embeddings also support short paragraphs. "
    "There is no hard limit on how long the paragraph is. Roughly, the longer "
    "the more 'diluted' the embedding will be.")
messages = [word, sentence, paragraph]
#labels=["1","2","3"]
reviews = [[1,0,0],[0,1,0],[0,0,1]]


encoder=hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

X_train=encoder(messages)

BUFFER_SIZE = len(X_train)
BATCH_SIZE_PER_REPLICA = 2
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 4


with strategy.scope():

    model = keras.Sequential()

    model.add(
        keras.layers.Dense(
            units=256,
            input_shape=(X_train.shape[1],),
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )

    model.add(
        keras.layers.Dense(
            units=128,
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )

    model.add(keras.layers.Dense(3, activation='softmax'))
    # model.compile(
    #     loss='categorical_crossentropy',
    #     optimizer=keras.optimizers.Adam(0.001),
    #     metrics=['accuracy']
    # )

    # history = model.fit(
    #     np.array(X_train), np.array(reviews),
    #     epochs=10,
    #     batch_size=16,
    #     verbose=1,
    #     shuffle=True
    # )
    optimizer=keras.optimizers.Adam(0.001)
    accuracy = keras.metrics.Accuracy()


def step_fn(x_train_slice):

    x_train, y_train = next(x_train_slice)
    with tf.GradientTape() as tape:
        pred=model(x_train,training=True)
        # tf.print(x_train)
        # tf.print(pred)
        # tf.print(y_train)

        per_example_loss = keras.losses.CategoricalCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE)(y_train, pred)
        loss = tf.nn.compute_average_loss(per_example_loss)
        gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    tf.print("train values are",x_train)
    tf.print(" pred Values are : ", pred)
    tf.print(" ArgMAx Values are ",tf.math.argmax(pred,axis=0)) #problem
    tf.print(" actual_pred Values are : ", actual_pred)
    tf.print(" Labels  are : ", y_train)
    tf.print(" Labels Max Values are : ", tf.argmax(y_train))
    accuracy.update_state(y_train, actual_pred)
    tf.print("Accuracy is : ",accuracy.result())
    return loss

@tf.function
def distributed_train_step(x_train_slice):
    losses = strategy.run(step_fn,args=(x_train_slice,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)


@tf.function
def per_worker_dataset_fn():
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, reviews)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
    # test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
    # test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
    return train_dist_dataset


coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
num_epoches = 5
steps_per_epoch = 1
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

The problem is that in step_fn, once I have the prediction values, I want to get the corresponding labels, and for that I used this line: tf.print(" ArgMAx Values are ",tf.math.argmax(pred,axis=0)) #problem

argmax gives an array of the indices of the maximum probabilities. I want to extract it as a numpy array and use it to index into the reviews array (the one-hot encoded values) so I can build the confusion matrix.

But I cannot convert the tf.math.argmax(pred,axis=0) tensor to a numpy array. I have tried many things, such as eval(K.get_session()) and so on, but nothing worked. Any help is appreciated.

Many thanks

Tags: python, numpy, tensorflow

Solution


OK, I have two solutions here.

Here is the way you probably should do it:

Add some more Keras metrics after accuracy; they can be used to compute the confusion matrix:

accuracy = keras.metrics.Accuracy()
tp = keras.metrics.TruePositives()
tn = keras.metrics.TrueNegatives()
fp = keras.metrics.FalsePositives()
fn = keras.metrics.FalseNegatives()

Now update them in step_fn as well:

accuracy.update_state(y_train, actual_pred)
argmax_pred = tf.one_hot(tf.math.argmax(pred,axis=1),depth=pred.shape[1])
tp.update_state(y_train, argmax_pred)
tn.update_state(y_train, argmax_pred)
fp.update_state(y_train, argmax_pred)
fn.update_state(y_train, argmax_pred)

Now you can access their results in the same place where you access the accuracy result:

coordinator.join()
print ("Finished epoch %d, accuracy is %f.",(i,accuracy.result().numpy()))
print ("TP=%f  TN=%f  FP=%f  FN=%f" % (tp.result().numpy(),tn.result().numpy(),fp.result().numpy(),fn.result().numpy()))

That should work for you.
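
As an aside, if you would rather track the full 3x3 multiclass confusion matrix instead of aggregate counts, one possible extension (my own sketch, not part of the original answer) is to accumulate it in a non-trainable tf.Variable created under strategy.scope() and update it inside step_fn with tf.math.confusion_matrix:

# Under strategy.scope(), next to the other metrics (hypothetical addition):
confusion = tf.Variable(tf.zeros((3, 3), dtype=tf.int32), trainable=False)

# Inside step_fn, after computing pred: convert the one-hot labels and the
# softmax output to per-example class indices and accumulate the counts.
confusion.assign_add(tf.math.confusion_matrix(
    tf.math.argmax(y_train, axis=1),   # actual class indices
    tf.math.argmax(pred, axis=1),      # predicted class indices
    num_classes=3, dtype=tf.int32))

# Back in the main loop, after coordinator.join(), read it like a metric:
print(confusion.numpy())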


Here is another way to do it:

The strategy is simply to keep returning your argmax values until they make it back to your main loop, where they will show up as RemoteValue objects, and then you fetch() their values.

For example, in step_fn, send your argmax values back to the calling function:

return (loss, tf.math.argmax(pred,axis=0))

Then, in distributed_train_step, adjust for the returned tuple, and keep returning the argmaxes up to the next level, perhaps like this:

def distributed_train_step(x_train_slice):
    (losses,argmaxes) = strategy.run(step_fn,args=(x_train_slice,))
    strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
    return argmaxes

Notice that I moved your strategy.reduce from the return line to its own line. You weren't using the returned value anyway, because you had no lvalue on the coordinator.schedule line, but now you can add one to grab those returned argmaxes:

argmaxes = coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
print ("Back at home, argmaxes=",argmaxes.fetch())

Make sure you use the fetch() method, because argmaxes will be different from a Tensor once it makes it back like this. The RemoteValue class is documented here: https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/coordinator/RemoteValue
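
In fact, according to that documentation, fetch() waits for the scheduled function to finish and returns the result as numpy values, so the fetched argmaxes are already the numpy array you were originally trying to extract.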

You'd need to expand this solution by returning any other values you were going to use for calculating TP/FP/TN/FN on your own.
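
For example, here is a minimal sketch of that expansion (my own code, not from the answer above; it assumes you change step_fn to return per-example class indices for both predictions and labels, i.e. argmax along axis=1, and that distributed_train_step passes both through):

import numpy as np

# Hypothetical step_fn return:
#   return (loss, tf.math.argmax(pred, axis=1), tf.math.argmax(y_train, axis=1))
# Hypothetical distributed_train_step return, after the reduce:
#   return (pred_idx, label_idx)

conf = np.zeros((3, 3), dtype=np.int64)
result = coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
coordinator.join()
pred_idx, label_idx = result.fetch()
for p, t in zip(pred_idx, label_idx):
    conf[t][p] += 1   # rows: actual class, columns: predicted class
print(conf)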

