Spark Streaming consuming from a Kafka producer in Scala

Problem Description

I am trying to do Spark Streaming from a Kafka topic, but the code does not work for me. As I understand it, the output of the Kafka stream is a key-value pair, so I use rdd.value() to extract the value part, then split it and use a case class to convert it into a DataFrame.
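For reference, a minimal sketch of that extraction step (assuming the spark-streaming-kafka-0-10 integration, where each stream element is a ConsumerRecord[String, String] and value() belongs to the record, not to the RDD itself; extractValues is a hypothetical helper, not part of the original code):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD

// Each element of a batch RDD from the kafka-0-10 direct stream is a
// ConsumerRecord; record.value() returns the CSV payload as a String.
def extractValues(rdd: RDD[ConsumerRecord[String, String]]): RDD[String] =
  rdd.map(record => record.value())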

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql._



object ProcessingKafkaLogs {
  case class TransactionLogs (CustomerID:String, CreditCardNo:Long, TransactionLocation:String, TransactionAmount:Int, TransactionCurrency:String, 
        MerchantName:String, NumberofPasswordTries:Int, TotalCreditLimit:Long, CreditCardCurrency:String)
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingKafkaFraudAnalysis")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val topicsSet = Set("FraudTransactionLogs") // the original "FraudTransactionLogs".map(_.toString).toSet produced a set of single-character topic names
    
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.17.128:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "test-consumer-group", // I don't know if some other value is required for this field
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
      
    val LogsStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topicsSet,kafkaParams))
    // val LogsStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicsSet, kafkaParams))
    // the above line does not work either

    
    
    val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
    import sqlContext.implicits._
    LogsStream.foreachRDD { rdd =>
      val dfTransactionLogs = rdd
        .map(line => line.value.split(','))
        .map(c => TransactionLogs(c(0), c(1).trim.toLong, c(2), c(3).trim.toInt, c(4),
          c(5), c(6).trim.toInt, c(7).trim.toLong, c(8))) // CreditCardNo and TotalCreditLimit are Long, so toLong, not toInt
        .toDF()
      dfTransactionLogs.createOrReplaceTempView("CustomerTransactionLogs") // registerTempTable is deprecated in Spark 2.x
      val dfFraud = sqlContext.sql("Select CustomerID, CreditCardNo, TransactionAmount, TransactionCurrency, NumberofPasswordTries, TotalCreditLimit, CreditCardCurrency from CustomerTransactionLogs where NumberofPasswordTries > 3 OR TransactionCurrency != CreditCardCurrency OR ( TransactionAmount * 100.0 / TotalCreditLimit ) > 50")
      dfFraud.show()
    }

    // start the streaming job; without these calls no batches are ever processed
    ssc.start()
    ssc.awaitTermination()
  }
  
}

Error Trace

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 10582
    at com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.accept(BytecodeReadingParanamer.java:563)
    at com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.access$200(BytecodeReadingParanamer.java:338)
    at com.thoughtworks.paranamer.BytecodeReadingParanamer.lookupParameterNames(BytecodeReadingParanamer.java:103)
    at com.thoughtworks.paranamer.CachingParanamer.lookupParameterNames(CachingParanamer.java:90)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.getCtorParams(BeanIntrospector.scala:44)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1(BeanIntrospector.scala:58)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1$adapted(BeanIntrospector.scala:58)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:241)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:241)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:238)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.findConstructorParam$1(BeanIntrospector.scala:58)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$19(BeanIntrospector.scala:176)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:191)
    at scala.collection.TraversableLike.map(TraversableLike.scala:234)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:191)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14(BeanIntrospector.scala:170)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14$adapted(BeanIntrospector.scala:169)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:241)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:238)
    at scala.collection.immutable.List.flatMap(List.scala:352)
    at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.apply(BeanIntrospector.scala:169)
    at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$._descriptorFor(ScalaAnnotationIntrospectorModule.scala:22)
    at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.fieldName(ScalaAnnotationIntrospectorModule.scala:30)
    at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.findImplicitPropertyName(ScalaAnnotationIntrospectorModule.scala:78)
    at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findImplicitPropertyName(AnnotationIntrospectorPair.java:467)
    at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addFields(POJOPropertiesCollector.java:351)
    at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:283)
    at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:169)
    at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:223)
    at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:348)
    at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:210)
    at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
    at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
    at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
    at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
    at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
    at org.apache.spark.rdd.RDDOperationScope.toJson(RDDOperationScope.scala:52)
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:80)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:57)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
    at com.project.FraudDetection.ProcessingKafkaLogs$.main(ProcessingKafkaLogs.scala:29)


Tags: scala, apache-spark, apache-kafka, spark-streaming

Solution
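The trace fails inside com.thoughtworks.paranamer.BytecodeReadingParanamer, which jackson-module-scala calls while Spark serializes the RDD operation scope during createDirectStream. An ArrayIndexOutOfBoundsException at that point is a known symptom of an outdated paranamer (2.6, pulled in transitively by some Spark distributions) that cannot read bytecode emitted by newer Scala and Java compilers. A likely fix, assuming an sbt build (the override below is the standard workaround for this symptom, not something taken from the question itself):

// build.sbt: force a paranamer release that understands modern bytecode;
// without it, jackson-module-scala's constructor introspection fails in
// BytecodeReadingParanamer exactly as in the trace above
dependencyOverrides += "com.thoughtworks.paranamer" % "paranamer" % "2.8"

With Maven, declaring paranamer 2.8 as a direct dependency achieves the same effect. As for the inline question about GROUP_ID_CONFIG: any non-empty string is acceptable; it only names the consumer group used for offset tracking, so "test-consumer-group" is fine.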

