PySpark (Databricks) performance problem with an NLP task

Problem description

I'm running into performance problems with an NLP task in PySpark on Databricks:

Context:
I have two PySpark DataFrames, each with an "ID" column and a "TEXT" column, for example (a small sketch for building such toy DataFrames follows the tables):

Table A              |   Table B
ID_A  TEXT_A         |   ID_B    TEXT_B
0     text_A0        |   0       text_B0
1     text_A1        |   1       text_B1
2     text_A2        |   2       text_B2
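
To make the example reproducible, this is roughly how such toy DataFrames could be created (spark is the Databricks SparkSession; the literal strings are just placeholders):

# Toy data matching the tables above (placeholder texts, not my real data).
A = spark.createDataFrame([(0, 'text_A0'), (1, 'text_A1'), (2, 'text_A2')],
                          ['ID_A', 'TEXT_A'])
B = spark.createDataFrame([(0, 'text_B0'), (1, 'text_B1'), (2, 'text_B2')],
                          ['ID_B', 'TEXT_B'])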

To find the similarity between the texts, I want to measure the cosine similarity between every record of A and every record of B (a Cartesian-product style comparison), so I used a Word2Vec model.
The first step is to train the Word2Vec model from the ml library, as shown below:

from pyspark.ml.feature import Word2Vec

word2vec = Word2Vec(vectorSize=5, windowSize=2,
                    inputCol='TEXT', outputCol='COORDINATES')
model_Word2Vec = word2vec.fit(Data_train)
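
For completeness, a minimal runnable sketch of that training step: Word2Vec from pyspark.ml.feature expects an array-of-strings input column, so the text has to be tokenized first. The Tokenizer step and the TOKENS column name are assumptions on my part and are not in my original code:

from pyspark.ml.feature import Tokenizer, Word2Vec

# Assumption: TEXT holds raw strings, so split it into TOKENS before Word2Vec.
tokenizer = Tokenizer(inputCol='TEXT', outputCol='TOKENS')
Data_train_tokens = tokenizer.transform(Data_train)

word2vec = Word2Vec(vectorSize=5, windowSize=2,
                    inputCol='TOKENS', outputCol='COORDINATES')
model_Word2Vec = word2vec.fit(Data_train_tokens)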

Then I apply the model to data A and data B (model_Word2Vec.transform(A) and model_Word2Vec.transform(B)) to get the coordinates of each text (the average of the word coordinates). For example:

display(model_Word2Vec.transform(A))

ID_A    TEXT_A    COORDINATES_A     
0       text A0   [1, 5, [], [0.05, 0.1, -1.5, 0.2, -0.7]]      
1       text A1   [1, 5, [], [0.15, -1.1, 0.5, 0.27, -0.1]]     
2       text A2   [1, 5, [], [1.05, 1.2, -0.55, 0.2, -1.7]]             

I should point out that the DataFrames are distributed and immutable.

Example of cosine similarity in PySpark:

X = [0.05, 0.1, -1.5, 0.2, -0.7]; Y = [1.0, 0.003, 2.12, 0.22, 1.3] 
cos(X, Y) = X.dot(Y) / ( X.norm(2)*Y.norm(2) ) 
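
To make that concrete, here is a small runnable version of the same computation with pyspark.ml dense vectors (the numbers are the ones from the example above; the helper function name is mine):

from pyspark.ml.linalg import Vectors

# Same vectors as in the example above.
X = Vectors.dense([0.05, 0.1, -1.5, 0.2, -0.7])
Y = Vectors.dense([1.0, 0.003, 2.12, 0.22, 1.3])

def cosine(x, y):
    # dot product divided by the product of the L2 norms
    return float(x.dot(y) / (x.norm(2) * y.norm(2)))

print(cosine(X, Y))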

Problem:
I want something like this:

Crossjoin
ID_A  COORDINATES_A                 ID_B COORDINATES_B                 Cosine
0     [0.05, 0.1, -1.5, 0.2, -0.7]  0    [1.0, 0.003, 2.12, 0.22, 1.3]  -0.89
0     [0.05, 0.1, -1.5, 0.2, -0.7]  1    [0.13, 1.1, 0.5,1.27, 1.99]    -0.4
1     [0.15, -1.1, 0.5, 0.27, -0.1] 0    [1.0, 0.003, 2.12, 0.22, 1.3]  -0.34
1     [0.15, -1.1, 0.5, 0.27, -0.1] 1    [0.13, 1.1, 0.5,1.27, 1.99]    -0.24
2     [1.05, 1.2, -0.55, 0.2, -1.7] 0    [1.0, 0.003, 2.12, 0.22, 1.3]  -0.35
2     [1.05, 1.2, -0.55, 0.2, -1.7] 1    [0.13, 1.1, 0.5,1.27, 1.99]    -0.31
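
Conceptually this is just a crossJoin of the two transformed DataFrames plus a cosine column. A rough sketch of what I mean (the UDF and column names are mine; A and B here are assumed to already carry the COORDINATES_* columns produced by the model):

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Cosine similarity between the two ml vector columns of a row.
@F.udf(DoubleType())
def cosine_udf(x, y):
    return float(x.dot(y) / (x.norm(2) * y.norm(2)))

# Assumption: A and B already contain the COORDINATES_* vector columns.
crossjoin = (A.select('ID_A', 'COORDINATES_A')
              .crossJoin(B.select('ID_B', 'COORDINATES_B'))
              .withColumn('Cosine', cosine_udf('COORDINATES_A', 'COORDINATES_B')))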

I'm facing performance problems, and here is what I'm getting:

1° First approach:

from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# Then we make a loop:
for i in range(0, 3):
  # Here we get the coordinates of row "i" of data A, as a dense vector.
  Y = Vectors.dense(A.select(col('COORDINATES_A')).collect()[i][0])

  # Here we go through every row of the coordinates of data B, get the coordinates (with x[0]), and compute 1 - cosine similarity (which we name "coseno") between every coordinate vector of B and the coordinates "i" of A.
  Cosino_list = [[float(c)] for c in B[['COORDINATES_B']].rdd.map(lambda x: 1 - x[0].dot(Y) / (x[0].norm(2) * Y.norm(2))).collect()]

  # Here we create a DataFrame with a column named "coseno" and the values of Cosino_list, and concatenate the rows with the previous data:
  df_cosino = df_cosino.union(spark.createDataFrame(Cosino_list, ["coseno"]))

I think the two loops are what cause the following error:

(15) Spark Jobs
The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.

2° Second approach:

# We make a crossjoin between A and B:
Df_crossjoin = A.crossJoin(B)

# We extract the coordinates:
column_COORDINATES_A = [(i[0].values) for i in Df_crossjoin.select(col("COORDINATES_A")).collect()]
column_COORDINATES_B = [(i[0].values) for i in Df_crossjoin.select(col("COORDINATES_B")).collect()]

But I get the following error:

(1) Spark Jobs
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 54, 10.139.64.6, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
--------------------------------------------------------------------------- 
Py4JJavaError                             Traceback (most recent call last)
<command-2001347748501300> in <module>()
----> 1 columna_que_como_c = [(i[0].values) for i in cross_que.select(col("result_que_como_c")).collect()]

/databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
    546         # Default path used in OSS Spark / for non-DF-ACL clusters:
    547         with SCCallSiteSync(self._sc) as css:
--> 548             sock_info = self._jdf.collectToPython()
    549         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    550

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):

I would appreciate any help in solving this problem.

Tags: pyspark, databricks
