首页 > 解决方案 > 如何使用 spring stream sftp 通过 sftp 下载文件?

问题描述

我正在尝试创建一个以 sftp 作为源和任务启动器作为接收器的流。

sftp 应该只是将文件下载到本地目录,在远程主机上重命名或删除它,然后调用一个预先配置的任务,该任务读取该文件。

有流的定义

sftp --local-dir="/appdata/app/data" --rename-remote-files-to="'/home/users/test/APP_DATA.csv'" --remote-dir="/home/users/test" --username=test --host="xx.xxx.xxx.xxx" --allow-unknown-keys=true --private-key="file:///appdata/app/config/id_rsa" --filename-pattern="APP_DATA.csv" --delay-when-empty=60s --task.launch.request.task-name="appdata-import" --stream=false | tasklauncher --server-uri=http://dataflow-server:9393

这种配置的问题是,sftp 似乎试图通过 kafka 发送文件内容,尽管“--stream=false”

在日志中我看到以下

发送失败;嵌套异常是 org.apache.kafka.common.errors.RecordTooLargeException: The message is 4931767 bytes when serialized which is greater than 1048576, which is the value of the max.request.size configuration., failedMessage=GenericMessage [payload=byte[ 4930956], headers={file_remoteHostPort=xx.xxx.xxx.xxx:22, b3=90263d42cde82fff-d2aab70532d1d6d6-1, nativeHeaders={}, file_name=APP_DATA.csv, file_remoteDirectory=//home/users/test, file_originalFile=/ appdata/app/data/APP_DATA.csv,id=099a018b-9b1c-0a84-31a2-39858a7e5bbe,contentType=application/octet-stream,file_relativePath=APP_DATA.csv,file_remoteFile=APP_DATA.csv,timestamp=1629731532352}

远程文件不见了,在本地目录中,文件在那里,sftp 很好地完成了他的主要工作,但它无法启动任务。我的配置有什么问题?

第二个问题,我不明白,--rename-remote-files-to="'/home/users/test/APP_DATA.csv'" 的意思。如果我省略了参数,那么即使我设置了 delete-remote-files=true,sftp 也根本不会启动。

我在这种情况下得到以下错误

无法实例化 [org.reactivestreams.Publisher]:工厂方法 'sftpReadingFlow' 抛出异常;嵌套异常是 org.springframework.beans.factory.BeanCreationException:'currentComponent' (bean '_org.springframework.integration.errorLogger.handler') 是单向 'MessageHandler',不适合配置 'outputChannel' . 这是集成流程的结束。

我使用以下图像使用 docker-compose 启动 Dataflow

springcloud/spring-cloud-dataflow-server:2.8.1 springcloud/spring-cloud-skipper-server:2.7.1

我无法从数据流网站安装 sftp 源,因为服务器没有互联网连接。我从 maven 存储库手动下载了 sftp、tasklauncher。两者都在 3.0.2 版本中。

谢谢

更新:

通过添加参数解决的主要问题--file.consumer.mode="ref"。默认为内容。在这种模式下,只有本地文件的路径作为有效负载发送。

该参数--task.launch.request.task-name="appdata-import"被完全忽略,因此任务启动器无法正常工作,因为它收到了一个带有文件路径的字符串。但需要一个 json 对象。我不得不在两者之间添加一个处理器“变压器”。

 sftp...   | transform --expression="'{\"name\":\"app-import\"}'" | tasklauncher

标签: springstreamtasksftpdataflow

解决方案


推荐阅读