scala - 在 Scala 中使用 Spark Streaming 消费来自 Kafka 生产者的数据
问题描述
我正在尝试使用 Spark Streaming 消费一个 Kafka 主题,但是代码无法正常运行。据我了解,Kafka 流中的每条记录都是一个键值对,所以我使用 rdd.value() 来提取它的值部分,然后通过 split 和样例类(case class)将其转换为 DataFrame。
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLImplicits
/**
 * Spark Streaming job that subscribes to the Kafka topic `FraudTransactionLogs`,
 * parses every record value as a comma-separated transaction log, and prints the
 * transactions that look fraudulent for each 10-second micro-batch.
 *
 * A transaction is reported when any of the following holds:
 *  - more than 3 password tries,
 *  - the transaction currency differs from the credit-card currency,
 *  - the transaction amount exceeds 50% of the total credit limit.
 *
 * NOTE(review): an `ArrayIndexOutOfBoundsException` thrown from
 * `BytecodeReadingParanamer` while `createDirectStream` is being constructed is
 * not caused by this source; it presumably comes from a paranamer / JDK version
 * mismatch on the classpath -- confirm against the build's dependency versions.
 */
object ProcessingKafkaLogs {

  /** Number of comma-separated fields expected in one transaction record. */
  private val ExpectedFieldCount = 9

  /**
   * One transaction record. The field names double as the DataFrame column names
   * referenced by the SQL query in [[main]], so they must not be renamed.
   */
  case class TransactionLogs(
      CustomerID: String,
      CreditCardNo: Long,
      TransactionLocation: String,
      TransactionAmount: Int,
      TransactionCurrency: String,
      MerchantName: String,
      NumberofPasswordTries: Int,
      TotalCreditLimit: Long,
      CreditCardCurrency: String)

  /**
   * Parses one comma-separated line into a [[TransactionLogs]].
   *
   * @param line raw record value as read from Kafka
   * @return `Some(record)` when the line has at least nine fields and every numeric
   *         field parses; `None` otherwise, so that a single malformed record does
   *         not fail the whole micro-batch
   */
  private def parseTransaction(line: String): Option[TransactionLogs] = {
    val fields = line.split(',')
    if (fields.length < ExpectedFieldCount) None
    else
      scala.util.Try {
        TransactionLogs(
          fields(0),
          fields(1).trim.toLong, // Long field: toInt would reject values above Int.MaxValue
          fields(2),
          fields(3).trim.toInt,
          fields(4),
          fields(5),
          fields(6).trim.toInt,
          fields(7).trim.toLong, // Long field, see CreditCardNo
          fields(8))
      }.toOption
  }

  /**
   * Entry point: builds the streaming context, wires the Kafka direct stream to the
   * fraud query, then starts the stream and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf =
      new SparkConf().setMaster("local[2]").setAppName("SparkStreamingKafkaFraudAnalysis")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // A set holding the single topic name. Mapping over the string itself would
    // produce one "topic" per character.
    val topicsSet = Set("FraudTransactionLogs")

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.17.128:9092",
      // Consumer group id under which this job subscribes to the topic.
      ConsumerConfig.GROUP_ID_CONFIG -> "test-consumer-group",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

    val logsStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaParams))

    val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
    import sqlContext.implicits._

    logsStream.foreachRDD { rdd =>
      // Only the value of each Kafka record carries the CSV payload.
      val dfTransactionLogs = rdd.flatMap(record => parseTransaction(record.value)).toDF()
      dfTransactionLogs.createOrReplaceTempView("CustomerTransactionLogs")

      val dfFraud = sqlContext.sql("Select CustomerID, CreditCardNo, TransactionAmount, TransactionCurrency, NumberofPasswordTries, TotalCreditLimit, CreditCardCurrency from CustomerTransactionLogs where NumberofPasswordTries > 3 OR TransactionCurrency != CreditCardCurrency OR ( TransactionAmount * 100.0 / TotalCreditLimit ) > 50")
      dfFraud.show()
    }

    // Without these two calls the DStream graph is only declared and never executed.
    ssc.start()
    ssc.awaitTermination()
  }
}
错误跟踪
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 10582
at com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.accept(BytecodeReadingParanamer.java:563)
at com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.access$200(BytecodeReadingParanamer.java:338)
at com.thoughtworks.paranamer.BytecodeReadingParanamer.lookupParameterNames(BytecodeReadingParanamer.java:103)
at com.thoughtworks.paranamer.CachingParanamer.lookupParameterNames(CachingParanamer.java:90)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.getCtorParams(BeanIntrospector.scala:44)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1(BeanIntrospector.scala:58)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1$adapted(BeanIntrospector.scala:58)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:241)
at scala.collection.Iterator.foreach(Iterator.scala:929)
at scala.collection.Iterator.foreach$(Iterator.scala:929)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
at scala.collection.IterableLike.foreach(IterableLike.scala:71)
at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:241)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:238)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.findConstructorParam$1(BeanIntrospector.scala:58)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$19(BeanIntrospector.scala:176)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:191)
at scala.collection.TraversableLike.map(TraversableLike.scala:234)
at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:191)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14(BeanIntrospector.scala:170)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14$adapted(BeanIntrospector.scala:169)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:389)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:241)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:238)
at scala.collection.immutable.List.flatMap(List.scala:352)
at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.apply(BeanIntrospector.scala:169)
at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$._descriptorFor(ScalaAnnotationIntrospectorModule.scala:22)
at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.fieldName(ScalaAnnotationIntrospectorModule.scala:30)
at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.findImplicitPropertyName(ScalaAnnotationIntrospectorModule.scala:78)
at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findImplicitPropertyName(AnnotationIntrospectorPair.java:467)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addFields(POJOPropertiesCollector.java:351)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:283)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:169)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:223)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:348)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:210)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
at org.apache.spark.rdd.RDDOperationScope.toJson(RDDOperationScope.scala:52)
at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:80)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:57)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
at com.project.FraudDetection.ProcessingKafkaLogs$.main(ProcessingKafkaLogs.scala:29)
据我了解,Kafka 流中的每条记录都是一个键值对,所以我使用 rdd.value() 来提取它的值部分,然后通过 split 和样例类(case class)将其转换为 DataFrame。
解决方案
推荐阅读
- flutter - 在颤动中找到现有用户时如何更改文本字段错误标签?
- ruby-on-rails - Rails 路由:如何重命名(嵌套)资源块中的 params-Hash 键?
- windows - 如何在 Windows 批处理文件中传递密码和 ssh 命令
- active-directory - ldap 查询获取 ACL
- c# - 使用附加到触发器的此脚本时出现 NullReferenceException
- python - 一次迭代numpy矩阵块
- java - WebClient 实现 - 执行这段代码“WebClient.builder().build()”时出现错误
- c++ - 将我的自定义编译器与乱序执行并行化
- python - 如何在 Matplotlib 中将方向均值叠加到 hexbin 图?
- javascript - 200 个字段/列的 MYSQL 过滤器