json - 我有一个包含多种不同类型 json 的流,每种都与用户事件有关,什么是拆分和聚合的最有效方法
问题描述
我有一个包含多种不同类型 json 消息的流。共有 65 种 json 事件类型,都有不同的模式。他们都共享一个共同的用户ID。
{'id': 123, 'event': 'clicked', 'target': 'my_button'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz1...'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz2...'}
{'id': 123, 'event': 'login', 'username': 'Bob'}
{'id': 456, 'event': 'viewed', 'website': 'http://xyz3...'}
{'id': 456, 'event': 'login', 'username': 'Susy'}
我想处理所有事件类型,每个事件类型都有自定义字段,然后按用户在所有过滤器类型中聚合所有内容。
{'id': 123, 'page_view_cnt': 100, 'user': 'Bob', 'click_cnt': 20}
{'id': 456, 'page_view_cnt': 14, 'user': 'Susy'}
有谁知道一种有效的方法来做到这一点。这是当前的思考过程
- 从线条流开始
- 使用 GSON 解析 json,而不是使用可能尝试推断类型的内置 json 解析器。
- 根据每种类型创建 65 个过滤器语句。json 将有 event=xyz 我可以区分。
- 将每个过滤器上的自定义属性聚合到用户 id -> 属性的映射中
- 合并来自所有过滤器的所有地图
这听起来很理智还是有更好的方法来做到这一点?
解决方案
这是我使用 RDD API 和 Jackson 得出的结论。我选择了低级 Spark API,因为它是无模式的,并且不确定结构化 API 如何适合可变输入事件类型。如果提到的 Gson 支持多态反序列化,它可以用来代替 Jackson,我只是选择了 Jackson,因为我更熟悉它。
问题可以分为几个步骤:
- 按事件类型将输入反序列化到对象中。
- 按 id 和类型减少。对于不同的类型,reduce 需要有不同的行为,例如视图被简单地缩减为一个总和,而用户名需要以不同的方式处理。在此示例中,我们假设用户名在其中是唯一的
id
并选择第一个。 - 收集减少的物品
id
。
第 2 步最需要注意,因为 Spark API 中没有这样的功能,并且需要某种运行时检查反序列化的事件是否属于不同的类。为了克服这个问题,让我们引入一个Reducible
可以封装不同类型的通用 trait:
trait Reducible[T] {
def reduce(that: Reducible[_]): this.type
def value: T
}
// simply reduces to sum
case class Sum(var value: Int) extends Reducible[Int] {
override def reduce(that: Reducible[_]): Sum.this.type = that match {
case Sum(thatValue) =>
value += thatValue
this
}
}
// for picking the first element, i.e. username
case class First(value: String) extends Reducible[String] {
override def reduce(that: Reducible[_]): First.this.type = this
}
运行时检查在这些类中处理,例如,Sum
如果右手对象不是同一类型,则会失败。
接下来,让我们定义事件的模型并告诉杰克逊如何处理多态性:
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, include=JsonTypeInfo.As.PROPERTY, property="event", visible=true)
sealed trait Event[T] {
val id: Int
val event: String
def value: Reducible[T]
}
abstract class CountingEvent extends Event[Int] {
override def value: Reducible[Int] = Sum(1)
}
@JsonTypeName("clicked") case class Click(id: Int, event: String, target: String) extends CountingEvent
@JsonTypeName("viewed") case class View(id: Int, event: String, website: String) extends CountingEvent
@JsonTypeName("login") case class Login(id: Int, event: String, username: String) extends Event[String] {
override def value: Reducible[String] = First(username)
}
object EventMapper {
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
// the list of classes could be auto-generated, see
// https://stackoverflow.com/questions/34534002/getting-subclasses-of-a-sealed-trait
mapper.registerSubtypes(classOf[Click], classOf[View], classOf[Login])
def apply(v1: String): Event[_] = mapper.readValue(v1, classOf[Event[_]])
}
所有事件都应具有字段id
和event
. 后者用于确定要反序列化到哪个类,Jackson 需要事先知道所有的类。特征Event
被声明为密封特征,因此可以在编译时确定所有实现类。我省略了这个反思步骤,只是对类列表进行硬编码,这里有一个很好的答案如何自动完成获取密封特征的子类
现在我们准备好编写应用程序逻辑了。为简单起见,sc.parallelize
用于加载示例数据。也可以使用 Spark 流。
val in = List(
"{\"id\": 123, \"event\": \"clicked\", \"target\": \"my_button\"}",
"{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
"{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
"{\"id\": 123, \"event\": \"login\", \"username\": \"Bob\"}",
"{\"id\": 456, \"event\": \"login\", \"username\": \"Sue\"}",
"{\"id\": 456, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}"
)
// partition (id, event) pairs only by id to minimize shuffle
// when we later group by id
val partitioner = new HashPartitioner(10) {
override def getPartition(key: Any): Int = key match {
case (id: Int, _) => super.getPartition(id)
case id: Int => super.getPartition(id)
}
}
sc.parallelize(in)
.map(EventMapper.apply)
.keyBy(e => (e.id, e.event))
.mapValues(_.value)
.reduceByKey(partitioner, (left, right) => left.reduce(right))
.map {
case ((id, key), wrapper) => (id, (key, wrapper.value))
}
.groupByKey(partitioner)
.mapValues(_.toMap)
.foreach(println)
输出:
(123,Map(clicked -> 1, viewed -> 2, login -> Bob))
(456,Map(login -> Sue, viewed -> 1))
推荐阅读
- r - 如何处理 GLMM 中的空间自相关残差
- r - 需要 arules 时使用的基本包。指定包不起作用
- django - Django基于多对多字段的第一个对象进行排序
- ruby-on-rails - 将Json插入sqlite数据库
- google-chrome - localhost 的 Chrome 扩展权限在 manifest.js 中不起作用
- android - MVVM:复杂视图/视图模型-> 多个 LiveData 对象?
- c# - 是否可以为 wpf 边框圆角的“外部”区域着色
- java - 带有 BootstrapMode.DEFERRED 的 Spring Data Jpa 多个 @EnableJpaRepositories 不起作用
- node.js - 使用 jsonwebtoken 库生成的 JWT 令牌在 jwt.io 中提供了无效签名
- java - 线图 x 轴值在库 MPAndroidchart 中重复