首页 > 解决方案 > 在 Clojure Flambo api 调用中进行 DataFrame 查找时无法获得结果

问题描述

我读取了一个镶木地板文件,并使用 Flambo api 将数据作为 RDD 获取。我们应用列名的 zipmap 并创建一个哈希映射/Clojure 映射

假设我的地图具有以下值

[{:a 1 :b2} 
 {:a 2 :b 2}]

(:require [flambo.api :as f])

核心.clj

我在用

(f/map rdd-records (f/fn[each-rdd] 
                   (perform-calcs each-red)))

在基于 map 输入的 perform-calcs 函数中,我们进行了额外的计算,例如

cals.clj

(defn perform-calcs 
[r] 
(merge r {:c (+ (:a r) (:b r))}))

我们有一个新要求是基于另一个文件中的另一个 DataFrame 执行另一个计算。我们不想为每条记录加载文件,因此保留代码以将 DataFrame 加载到 calc 外部并在 commons 文件中定义。此 DataFrame 作为应用程序的一部分加载,并且可以跨应用程序访问。

commons.clj

(def another-csv-df 
     (load-file->df "file-name"))

计算.clj

(defn df-lookup
[r df] 
 {:d (-> 
      df (.filter (format "a = %d and b = %d" (:a r) (:b r) )    
      (.select (into [] (map #(Column. %) ["d"] )))                     
      (first)
      (.getString(0))})

通过将其包含在 perform-calcs 函数中,将发生如下变化。

(defn perform-calcs 
 [r] 
 (-> r  
 (merge {:c (+ (:a r) (:b r))}) 
 (df-lookup commons/another-csv-df))

实际上,我看到了数据框上的值...代码按预期工作,没有这个 DF 的外部调用和这个 DF 查找代码它运行了很长时间......并且永远不会完成这个过程

标签: apache-sparkclojure

解决方案


Spark 中根本不允许像这样的嵌套转换。您将不得不重新考虑您的方法,可能通过将 RDD 转换为Dataset并在两者之间执行连接。


推荐阅读