google-cloud-dataflow - 从模板启动时,数据流作业不会从 PubSub 消耗
问题描述
我目前有一项工作,将 pubsub 主题的内容输出到云存储文件夹,如果我直接启动 jar,它可以正常工作。
但是,每当我尝试使用上传的模板启动作业时,都没有消息通过管道。
它与Google 提供的模板非常相似,只是它接受订阅而不是主题。
这是我的配置:
trait Options extends PipelineOptions with StreamingOptions {
@Description("The Cloud Pub/Sub subscription to read from")
@Default.String("projects/project/subscriptions/subscription")
def getInputSubscription: String
def setInputSubscription(value: String): Unit
@Description("The Cloud Storage directory to output files to, ends with /")
@Default.String("gs://tmp/")
def getOutputDirectory: String
def setOutputDirectory(value: String): Unit
@Description("The Cloud Storage prefix to output files to")
@Default.String("subscription-")
def getOutputFilenamePrefix: String
def setOutputFilenamePrefix(value: String): Unit
@Description("The shard template which will be part of the filenames")
@Default.String("-W-P-SSSSS-of-NNNNN")
def getShardTemplate: String
def setShardTemplate(value: String): Unit
@Description("The suffix of the filenames written out")
@Default.String(".txt")
def getOutputFilenameSuffix: String
def setOutputFilenameSuffix(value: String): Unit
@Description("The window duration in minutes, defaults to 5")
@Default.Integer(5)
def getWindowDuration: Int
def setWindowDuration(value: Int): Unit
@Description("The compression used (gzip, bz2 or none), bz2 can't be loaded into BigQuery")
@Default.String("none")
def getCompression: String
def setCompression(value: String): Unit
@Description("The maximum number of output shards produced when writing")
@Default.Integer(1)
def getNumShards: Int
def setNumShards(value: Int): Unit
}
这是我启动模板的方式:
gcloud dataflow jobs run storage \
--gcs-location gs://bucket/templates/Storage \
--parameters runner=DataflowRunner,project=project,streaming=true,inputSubscription=projects/project/subscriptions/sub,outputDirectory=gs://bucket/
以下是我在没有模板的情况下启动作业的方式:
./storage \
--runner=DataFlowRunner \
--project=project \
--streaming=true \
--gcpTempLocation=gs://tmp-bucket/ \
--inputSubscription=projects/project/subscriptions/sub \
--outputDirectory=gs://bucket/
解决方案
正如@GuillemXercavins 评论所说,参数必须使用interfaceValueProvider
作为它们的 type。这将允许在运行时设置或使用管道选项,这就是导致问题的原因。
值得指出的是,正如您在评论中所做的那样,这ValueProvider
似乎在 Scio 中不受支持。
编辑:
@BenFradet在下面的评论中提供的Scio 示例。
推荐阅读
- ruby-on-rails - 我收到此错误:为数据库适配器指定了“sqlite3”,但未加载 gem
- javascript - 如何删除 2 个数组上的相同值?
- javascript - If, Else if, Else - 不输出“else”代码
- c# - 公共变量在 Inspector 中工作,但不在场景或游戏窗口中显示
- python - 在一行中遍历两个系列,然后应用逻辑
- html - 缩小工具删除的空白会影响 HTML 的外观
- laravel - 在 ubuntu 和 nginx 上部署 laravel api rest
- ios - 苹果手表上 facebook Messenger 的自定义回复
- c# - 我收到与母版页相关的解析错误
- elastic-stack - 通过 id 相关性从事件数据中计算指标