首页 > 解决方案 > 通过 Pyspark (Python) 上的内部连接和过滤条件连接两个数据帧

问题描述

我需要根据正确数据框中的一列的值将两个数据框与内部连接和过滤条件连接起来。我已经尝试了一些我在这里看到的问题,但到目前为止没有任何工作,有人可以帮忙吗?

我有两个数据框:df_consumos_diarios 和 df_facturas_mes_actual_flg。他们有一个共同点:id_cliente

这是两个DF:

df_consumos_diarios.show(5)
+----------+----------------+------------+----------------------+---------------------+----------+
|id_cliente|consumo_datos_MB|sms_enviados|minutos_llamadas_movil|minutos_llamadas_fijo|     fecha|
+----------+----------------+------------+----------------------+---------------------+----------+
|         1|             664|           3|                    25|                    0|2020-08-01|
|         1|             943|           0|                    12|                    5|2020-08-02|
|         1|            1035|           1|                    46|                   10|2020-08-03|
|         1|             760|           3|                    17|                    0|2020-08-04|
|         1|            1409|           1|                    31|                    4|2020-08-05|


df_facturas_mes_actual_flg.show(5)
+----------+---------+-------+----------+----+-----------+
|id_cliente|id_oferta|importe|     fecha|edad|flg_mes_ant|
+----------+---------+-------+----------+----+-----------+
|         1|        9|   36.5|2020-07-31|  26|          1|
|         1|        6|  118.6|2020-07-31|  26|          1|
|         1|        6|  124.5|2020-07-31|  26|          1|
|         2|        4|   95.0|2020-07-31|  58|          1|
|         3|        5|  102.5|2020-07-31|  68|          1|
+----------+---------+-------+----------+----+-----------+

我想做内部连接而不是合并或连接的原因是因为这些是 pyspark.sql 数据帧,我认为这样更容易。

我想要做的是从这两个中创建一个新的数据框,我只在正确的数据框的“flg_mes_ant”下显示不等于 1 的值。当我编写内部连接子句时,代码可以正常工作,但是添加过滤条件会使一切变得混乱。这是我迄今为止尝试过的:

   df2 = df_consumos_diarios.join(df_facturas_mes_actual_flg, on=["id_cliente"] & 
         [df_facturas_mes_actual_flg["flg_mes_ant"] != "1"], how='inner')

我收到的错误消息是:

类型错误:& 不支持的操作数类型:“列表”和“列表”

有谁知道我做错了什么以及如何克服这个错误?

标签: pythonapache-sparkjoinpysparkapache-spark-sql

解决方案


您可以在加入后进行过滤:

import pyspark.sql.functions as F

df2 = df_consumos_diarios.join(
    df_facturas_mes_actual_flg, 
    on="id_cliente", 
    how='inner'
).filter(F.col("flg_mes_ant") != "1")

或者您可以在加入之前过滤正确的数据框(这应该更有效):

df2 = df_consumos_diarios.join(
    df_facturas_mes_actual_flg.filter(df_facturas_mes_actual_flg["flg_mes_ant"] != "1"), 
    on="id_cliente", 
    how='inner'
)

推荐阅读