首页 > 解决方案 > 如何使用 Kafka 解码器在 Spark 中应用 Scala 泛型类型?

问题描述

[更新] 有人有类似的问题:https
://github.com/sksamuel/avro4s/issues/19 似乎没有好的解决方案。

[更新]
我不认为这是一个重复的问题,因为解决了 Apache Spark 中的依赖问题,因为当我不使用 Scala 泛型时Decoder,一切都运行良好:

// This works.
class EnigmaDecoder(props: VerifiableProperties = null) extends Decoder[AdTracking] {
  override def fromBytes(bytes: Array[Byte]): AdTracking = {
    if (...)  null
    else AdTracking.parseFrom(...)
  }
}

[原帖]
我有多种消息类型(例如AdTracking)。它们都有类似的消息操作接口,比如静态成员AdTracking AdTracking::parseFrom(ByteString)

我不想一个一个地复制消息解析器,所以我用 Scala 中的泛型类型来包装它。

trait BsParser[T] {
  def parseFrom(bs: ByteString): T  // T: AdTracking
}

object EnigmaDecoder {
  implicit object AdTrackingBsParser extends BsParser[AdTracking] {
    override def parseFrom(bs: ByteString): AdTracking = AdTracking.parseFrom(bs)
  }
}

class EnigmaDecoder[T >: Null : BsParser](props: VerifiableProperties = null) extends Decoder[T] {
  override def fromBytes(bytes: Array[Byte]): T = {
    if (...) null
    // call static method AdTracking::parseFrom(bs) to build an AdTracking object
    else implicitly[BsParser[T]].parseFrom(...)
  }
}

EnigmaDecoder[AdTracking]在 Spark 中用作:

val messages = KafkaUtils.createDirectStream[String, AdTracking, StringDecoder, EnigmaDecoder[AdTracking], (String, AdTracking)](
    ssc, kafkaParams, fromOffsets,  messageHandler)

不幸的是,当我EgnimaDecoder[AdTracking]在 Spark 中使用它时,我遇到了这样的错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): java.lang.NoSuchMethodException: EnigmaDecoder.<init>(kafka.utils.VerifiableProperties)
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getConstructor(Class.java:1825)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:156)
    at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)

我检查了编译的类,但不知道如何修复它。

Compiled from "EnigmaDecoder.scala"
public class EnigmaDecoder<T> implements kafka.serializer.Decoder<T> {
  private final BsParser<T> evidence$2;
  public static <T> kafka.utils.VerifiableProperties apply$default$1();
  public static <T> kafka.utils.VerifiableProperties $lessinit$greater$default$1();
  public static <T> EnigmaDecoder<T> apply(kafka.utils.VerifiableProperties, BsParser<T>);
  public T fromBytes(byte[]);
  public EnigmaDecoder(kafka.utils.VerifiableProperties, BsParser<T>);
}

有人可以帮我吗?
谢谢!

标签: scalaapache-sparkapache-kafka

解决方案


推荐阅读