python - Spark - 改进名称列表和压缩文件之间字符串匹配的搜索时间
问题描述
我正在尝试使用 Spark 在压缩文件列表中搜索字符串列表。下面是我使用的工作代码。已使用 int 键将字符串列表保存到字典中。我正在针对函数 fnMatch 中的每个文件构建一个逗号分隔的匹配 int 键列表。虽然代码有效,但需要几个小时才能完成。有哪些优化可以减少运行时间?
#Function to extract zip files
def zip_extract(x):
in_memory_data = io.BytesIO(x[1])
file_obj = zipfile.ZipFile(in_memory_data, "r")
files = [i for i in file_obj.namelist()]
return dict(zip(files, [file_obj.open(file).read() for file in files]))
def safeStr(obj):
try: return str(obj)
except UnicodeEncodeError:
return obj.encode('ascii', 'ignore').decode('ascii')
except: return ""
#Function to match string list contained in dictionary dcust, one by one against file doctext
def fnMatch(doctext,dcust):
retval=""
for k in dcust:
if dcust[k] in doctext:
retval=retval+","+str(k)
return retval
schema = StructType([StructField('fpath', StringType(), True),StructField('docText', StringType(), True)])
zips = sc.binaryFiles('hdfs://hp3/user/test/testhdfs/myzipfile.zip')
files_data = zips.map(zip_extract)
files_data_flat = files_data.flatMap(lambda x: x.items())
files_data_flat_tfm = files_data_flat.map(lambda x: (safeStr(x[0]),safeStr(x[1])))
df = hc.createDataFrame(files_data_flat_tfm,schema)
df2 = df.withColumn("docLength", size_(col("docText")) )
dfcust = hc.sql('select fullname from tbl_custfull').toPandas()
res=len(dfcust)
print "##################################################"+str(res)+"##############################"
dictcust = dfcust.to_dict().values()[0]
strmatches = udf(lambda x: fnMatch(x,dictcust), StringType())
df2 = df2.withColumn("strMatches", strmatches(col("docText")) )
df2.createOrReplaceTempView ("df2")
dfres=hc.sql("SELECT fpath,docLength,strMatches FROM df2 WHERE length(strMatches) >0")
dfres.show(5)
我使用提交火花作业
spark-submit \
--conf spark.executor.memory=20g \
--conf spark.executor.cores=5 \
--conf spark.executor.instances=139 \
--conf spark.driver.maxResultSize=8g \
--files /etc/spark2/conf/hive-site.xml \
--master yarn \
--deploy-mode cluster \
myprogram.py
我应该避免在这里做些什么来提高性能吗?尝试更改执行程序内存和核心,但差别不大。该列表有大约 270K 字符串和 60k 文档
解决方案
zip 扩展名不可拆分,因此无法在记录级别并行化。如果您可以使用 gzip 压缩,那么性能将大大提高。此外,将整个文件读入内存哈希映射然后搜索对于大文件是不必要且不可扩展的。如果使用 gzip 压缩,则 Spark 可以自动解压缩并搜索您。这是 Scala 中的一个示例。
val spark : SparkSession = SparkSession.builder
.appName("Test")
.master("local[2]")
.config("spark.ui.enabled",false)
.getOrCreate()
import spark.implicits._
val test = spark.read.text("/Users/..../temp/test.txt.gz")
test.filter(r => r.getAs[String]("value").contains("gzip")).show(false)
输入文件是 test.gzip 包含
this is just testing
to see that this test is passing
gzip is a splittable format
test to see it runs in parallel
结果
zgip is a splittable format
推荐阅读
- python - Pandas groupby 并在列表中获取字典
- android - 如何让与主循环器同步的处理程序等待更多数据
- ios - 由于缺少应用程序文件,jfxmobile 应用程序无法在 iphone 模拟器上安装
- mysql - 使用 codeigniter 从 db 获取数据并在视图中显示特定的数组索引
- python - How to train your own model in AWS Sagemaker?
- office365 - How to retrieve group content if you are not a member (you are Global Administrator)?
- angularjs - Replacements for isNullOrUndefined and isNumber utils methods after AngularJS v0.11.3 onwards
- python - Represent this Image django Model
- java - javax.persistence.PersistenceException:没有名为 aramis 的 EntityManager 的持久性提供程序
- sqlite - Linq2db for SQLite - Xamarin form