首页 > 解决方案 > 使用 scala 在 Spark 结构化流中将行转置为列

问题描述

我们使用Spark结构化流 2.4.1 版本来处理从 Kafka 到 Cassandra 的事件。该事件是一个嵌套的 JSON,我们需要将数据展平并加载到 Cassandra 表中。

我尝试在数据框上使用数据透视,但它抛出了错误消息。有人可以帮我解决这个问题。

Json 事件结构 -

{
  "event_name": "some event",
  "groups": [
    {
      "data_group_name": "personname",
      "fields": [
        {
          "col_name": "firstname",
          "value": "John"
        },
        {
          "col_name": "lastname",
          "value": "williams"
        }
      ]
    },
    {
      "data_group_name": "contact",
      "fields": [
        {
          "col_name": "mobile",
          "value": "1234567890"
        },
        {
          "col_name": "home",
          "value": "0987654321"
        }
      ]
    }
  ]
}

df.pivot($"col_name").agg(first($"value"),null)

预期结果:

----------------
event_name  firstname   lastname    mobile          home
----------------------------------------------------------------------------
some event  John            williams    1234567890  987654321

错误信息 -

带有流源的查询必须使用 writeStream.start();; kafka at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)

标签: apache-sparkstreaming

解决方案


推荐阅读