pyspark - 有没有办法在pyspark中将文件从远程位置快速复制到本地
问题描述
我正在使用 lftp 使用 mget 参数从远程位置复制文件。将 50 个 xml 文件从 sftp 机器复制到我的本地 Unix 机器大约需要 2 分钟。我希望能够复制 20k 个文件。一个 XML 文件大约为 ~15kb。数据框 df_files 包含我要复制的所有 XML 文件的列表。
我已经用 20,000 个文件尝试了下面的代码,似乎需要几个小时才能用这些文件创建一个数据框。
for row in df_files.tolist():
print row
cmd_p1 = """lftp sftp://username:password!@remotelocation-e "lcd /var/projects/u_admin/folder/;mget /var/projects/storage/folder/"""+row
cmd_p2 = """;bye " """
cmd_get_xml = cmd_p1+cmd_p2
s=subprocess.call(cmd_get_xml,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
j=0
for row in df_file.itertuples(index=True, name='Pandas'):
print getattr(row,'filename')
if j==0:
acq = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Message").load("file:///var/projects/u_admin/folder/"+df_file['filename'].iloc[j])
schema = acq.schema
else :
acq2 = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Message").load("file:///var/projects/u_admin/folder/"+df_file['filename'].iloc[j], schema = schema)
acq = acq.union(acq2)
我希望能够在最短的时间内复制这些文件。
解决方案
首先,使用Paramiko 的 SCP 模块.xml
将所有文件放入一个目录。假设您的文件具有相同的架构,因为您可以在同一目录上执行操作,一旦您将所有这些 xml 文件放在一个目录中,您就可以直接读取整个目录,而不是单独读取文件。.xml
union
此解决方案将节省您在 for 循环中花费的大量时间。
import paramiko
from scp import SCPClient
def createSSHClient(server, port, user, password):
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(server, port, user, password)
return client
ssh = createSSHClient(server, port, user, password)
scp = SCPClient(ssh.get_transport())
然后调用scp.get()
或scp.put()
做SCP操作。
acq_all = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Message").load("file:///var/projects/u_admin/folder/", schema = schema)
我了解您的用例可能会有所不同,因为您也有一个if-else
块,但架构是相同的,因此可以在读取文件后完成。您可以读取一个文件,以获得正确的架构,或者您可以在读取之前自行定义它。
推荐阅读
- selenium-webdriver - 无法通过 selenium 找到 Intranet Web UI 的 Web 元素通过 id、name..etc 查找元素一路
- ios - 可设计构建失败:无法附加到 pid:“26000”
- python - 字符串替换未写入文件
- python - 如何纠正 sklearn.naive_bayes 中的 sample_weight?
- typescript - 使用实时数据库时,与使用 firebase 函数在 typescript 中查询 firebase 等效的 .get() 方法是什么?
- javascript - 如何添加指向此动画按钮的链接
- unity3d - 清理现有 Unity 项目并将其迁移到新的一台或另一台 PC
- c++ - 如何使用 Google Speech To Text API 进行实时转录流音频
- java - 如果我动态地将 fileId 传递给查询,Google drive api to get all children 不起作用
- objective-c - 使用 swift WKWebView didFailNavigation 到 Objective-C 文件时崩溃