首页 > 解决方案 > 优化 Flink 转换

问题描述

我有以下方法来计算 a 中值的概率DataSet

/**
   * Compute the probabilities of each value on the given [[DataSet]]
   *
   * @param x single colum [[DataSet]]
   * @return Sequence of probabilites for each value
   */
  private[this] def probs(x: DataSet[Double]): Seq[Double] = {
        val counts = x.groupBy(_.doubleValue)
          .reduceGroup(_.size.toDouble)
          .name("X Probs")
          .collect

        val total = counts.sum

        counts.map(_ / total)
  }

问题是,当我提交使用此方法的 flink 作业时,它会导致 flink 由于任务而终止作业TimeOut。我正在为DataSet只有 40.000 个实例和 9 个属性的每个属性执行此方法。

有没有办法可以更有效地执行此代码?

经过几次尝试mapPartition,我使它与InformationTheorySymmetricalUncertainty

/**
   * Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
   *
   * It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
   *
   * @param xy [[DataSet]] with two features
   * @return SU value
   */
  def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
    val su = xy.mapPartitionWith {
      case in ⇒
        val x = in map (_._2)
        val y = in map (_._1)

        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)

        Some(2 * mu / (Hx + Hy))
    }

    su.collect.head.head
  }

有了这个,我可以有效地计算entropy,互信息等。关键是,它只能在并行度为 1 的情况下工作,问题出在mapPartition.

有没有办法我可以做一些类似于我在这里所做的事情SymmetricalUncertainty,但无论并行度如何?

标签: scalaoptimizationapache-flink

解决方案


我终于做到了,不知道它是否是最好的解决方案,但它使用 n 级并行性:

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
    val su = xy.reduceGroup { in ⇒
        val invec = in.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)

        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)

        2 * mu / (Hx + Hy)
    }

    su.collect.head
  } 

您可以在InformationTheory.scala检查整个代码,并测试InformationTheorySpec.scala


推荐阅读