首页 > 解决方案 > groovy 脚本在 nifi executescript 处理器中不起作用

问题描述

我正在尝试通过 executescript 处理器执行某些操作;里面有一个时髦的代码。在代码中,我试图创建一个 scala 脚本,该脚本将在另一个处理器中的 spark 上执行。

// Get flow file
def flowFile = session.get()
if (!flowFile) return
// Create output directory
def userInputDir = flowFile.getAttribute("user.input.path")
def finalFolder = new File(userInputDir + "/" + innerDir)

try
{
if (!finalFolder.exists()) finalFolder.mkdirs()
// Write script
file = "spark.sqlContext.setConf(\"hive.exec.dynamic.partition\", \"true\")\n"
file = file + "spark.sqlContext.setConf(\"hive.exec.dynamic.partition.mode\", \"nonstrict\")\n"
file = file + "import org.apache.spark.sql._"
file = file + "\n"
file = file + "import java.io._"
file = file + "\n"

} 。. . 其余的其他步骤是向script变量添加一些其他火花特定的命令。脚本很大,因此跳过了完整的代码粘贴。最后,以 catch 结束

// Output file path
flowFile = session.putAttribute(flowFile, "generatedScript", scalaFile.getCanonicalPath())
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e)
{
 log.info("File: {}\n", finalFolder.file)
 session.transfer(flowFile, REL_FAILURE)
}

处理器甚至没有开始执行 groovy 脚本,它失败并出现以下错误:

groovy.lang.MissingPropertyException: No such property: script for calss: javal.io.File

语句“甚至没有开始启动”意味着前一个队列不为空并且处理器抛出错误。我猜这是一个语法问题,但我没有在脚本中找到任何相关的语法问题。我还尝试在本地机器的 groovy shell 中运行脚本,并且在那里也有同样的错误,但没有语法问题。

谷歌搜索错误让我建议在脚本中包含导入,但即使在包含相关导入之后,错误也是一样的。

有什么线索吗?

标签: groovyapache-nifi

解决方案


您指的是未在任何地方定义的变量“innerDir”。您是否希望用户向 ExecuteScript 添加一个名为 innerDir 的用户定义属性?如果是这样,则脚本中的 innerDir 变量是一个 PropertyValue 对象,因此您需要对其调用 getValue() 以获取该属性的实际值:

innerDir.value

您还指的是 scalaFile.getCanonicalPath() 但上面未定义 scalaFile ,并且 getCanonicalPath() 不会给您脚本的内容,这是您的意思吗?

我重新编写了上面的部分脚本,假设 innerDir 是用户定义的属性,并且您将文件变量的内容写入 scalaFile 指向的 File;我也通过使用heredoc而不是附加到文件变量来使它更加Groovy:

// Get flow file
def flowFile = session.get()
if (!flowFile) return
// Create output directory
def userInputDir = flowFile.getAttribute("user.input.path")
def finalFolder = new File(userInputDir + "/" + innerDir?.value ?: '')

try
{
  if (!finalFolder.exists()) finalFolder.mkdirs()
  // Write script
  file = 
"""
  spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
  spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
  import org.apache.spark.sql._
  import java.io._
"""
  scalaFile = new File(finalFolder, 'script.scala')
  scalaFile.withWriter {w -> w.write(file)}
  // Output file path
  flowFile = session.putAttribute(flowFile, "generatedScript", scalaFile.canonicalPath)
  session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
  log.info("File: {}\n", finalFolder.file)
  session.transfer(flowFile, REL_FAILURE)
}

推荐阅读