groovy - 如何在 InvokeScriptedProcessor 中使用“DBCPConnectionPoolLookup”?
问题描述
我正在用 groovy 编写一个 InvokeScriptedProcessor (ISP),我设置了“DBCPConnectionPoolLookup”控制器并使用“UpdateAttribute”设置了 database.name。但我得到这个错误:
由于 java.lang.UnsupportedOperationException,无法处理会话:无法在没有属性的情况下查找 DBCPConnectionPool(Cannot lookup DBCPConnectionPool without attributes);处理器已被管理性地让出(yield)1 秒:java.lang.RuntimeException:java.lang.UnsupportedOperationException:无法在没有属性的情况下查找 DBCPConnectionPool
如何使用“DBCPConnectionPoolLookup”连接到选定的数据库?
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import groovy.sql.Sql
import groovy.sql.GroovyRowResult;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.annotation.behavior.EventDriven
import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators
/**
 * Scripted NiFi processor that runs a series of JDBC queries for each incoming
 * FlowFile and replaces the FlowFile content with the collected rows as JSON.
 *
 * The database connection is requested with the FlowFile's attributes so that
 * an attribute-driven lookup service (such as DBCPConnectionPoolLookup, which
 * selects the pool from attributes like `database.name`) can resolve the pool.
 */
@EventDriven
@CapabilityDescription("Execute a series of JDBC queries adding the results to each JSON presented in the FlowFile")
class GroovyProcessor implements Processor {

    /** Logger taken from the initialization context in initialize(). */
    def log

    final static Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description('FlowFiles that were successfully processed and had any data enriched are routed here')
            .build()

    final static Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description('FlowFiles that were not successfully processed are routed here')
            .build()

    /** @return the two relationships this processor can route to */
    Set<Relationship> getRelationships() { [REL_FAILURE, REL_SUCCESS] as Set }

    final static PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("dbcp-connection-pool-services")
            .displayName("Database Connection Pool Services")
            .description("The Controller Service that is used to obtain a connection to the database.")
            .required(true)
            .identifiesControllerService(DBCPService)
            .build()

    // NOTE(review): CLIENT_ID is read in onTrigger but is not returned by
    // getPropertyDescriptors(). It is left unregistered here because adding a
    // required property would invalidate existing flows - confirm the intent.
    final static PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
            .name("clientId")
            .displayName("clientId")
            .description("Value to be used in queries.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .expressionLanguageSupported(true)
            .build()

    /** @return the property descriptors exposed by this processor */
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        Collections.unmodifiableList([DBCP_SERVICE]) as List<PropertyDescriptor>
    }

    /** Stores the logger supplied by the initialization context. */
    void initialize(ProcessorInitializationContext context) { log = context.logger }

    /**
     * Copies the current row of a ResultSet into a GroovyRowResult.
     *
     * @param rs result set positioned on the row to copy
     * @return the row as a map keyed by column label, in column order
     * @throws SQLException if the metadata or a column value cannot be read
     */
    public static GroovyRowResult toRowResult(ResultSet rs) throws SQLException {
        ResultSetMetaData metadata = rs.getMetaData();
        Map<String, Object> lhm = new LinkedHashMap<String, Object>(metadata.getColumnCount(), 1);
        for (int i = 1; i <= metadata.getColumnCount(); i++) {
            lhm.put(metadata.getColumnLabel(i), rs.getObject(i));
        }
        return new GroovyRowResult(lhm);
    }

    /**
     * Takes one FlowFile, runs the queries and writes the JSON result as the new
     * content. The FlowFile goes to REL_SUCCESS, or to REL_FAILURE when anything
     * (including obtaining the connection) throws.
     *
     * @param context        gives access to the configured properties
     * @param sessionFactory used to create the session for this invocation
     */
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        def flowFile = session.get()
        if (!flowFile) return

        def conn = null
        try {
            def dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService)
            // Pass the FlowFile attributes: a no-argument getConnection() fails with
            // UnsupportedOperationException when the configured service is a
            // DBCPConnectionPoolLookup, because it has no attributes to pick a pool from.
            conn = dbcpService.getConnection(flowFile.getAttributes())
            def sql = new Sql(conn)

            flowFile = session.write(flowFile, { inputStream, outputStream ->
                // These values are not referenced by the placeholder query text below;
                // presumably the real queries interpolate them - verify before removing.
                def clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions(flowFile).value
                def definitionId = flowFile.getAttribute('definition.id')
                def jobId = flowFile.getAttribute('job.id')

                def q = """QUERY"""
                def result = []
                sql.eachRow(q) { row ->
                    def temp = [:]
                    temp.header = [:]
                    temp.details = []
                    temp.RawData = [:]
                    temp.StagedExtractsUuid = row.uuid
                    temp.DataMapStatus = row.DataMapStatus
                    temp.RawData = new JsonSlurper().parseText(row.RawData)
                    sql.eachRow("QUERY") { headerRow ->
                        temp.header = toRowResult(headerRow)
                        sql.eachRow("QUERY") { detailRow ->
                            temp.details.add(toRowResult(detailRow))
                        }
                    }
                    result.add(temp)
                }
                outputStream.write(new JsonBuilder(result).toString().getBytes(StandardCharsets.UTF_8))
            } as StreamCallback)

            session.transfer(flowFile, REL_SUCCESS)
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}', [this, t] as Object[])
            session.transfer(flowFile, REL_FAILURE)
        } finally {
            // Close the connection even when the commit itself throws.
            try {
                session.commit()
            } finally {
                conn?.close()
            }
        }
    }

    /** No custom validation is performed; returns null. */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { null }

    /**
     * Resolves a descriptor by its name or display name.
     *
     * @param name property name or display name
     * @return the matching descriptor, or null when the name is not known
     */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch (name) {
            case 'dbcp-connection-pool-services':
            case 'Database Connection Pool Services':
                return DBCP_SERVICE
            default:
                return null
        }
    }

    /** Property changes are ignored. */
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {}

    /** No identifier is provided; returns null. */
    @Override
    String getIdentifier() { null }
}
// Instantiate the processor and bind it to the script-level variable `processor`.
// NOTE(review): InvokeScriptedProcessor presumably looks the instance up under this exact name - confirm against the NiFi documentation.
processor = new GroovyProcessor()
解决方案
当您获得连接时,您需要传入流文件属性,否则它无法访问您的 database.name 属性。
因此,不要写成:
dbcpService.getConnection()
而应该写成:
dbcpService.getConnection(flowFile.getAttributes())
推荐阅读
- sequelize.js - 如何使用流类型正确注释 Sequelize 模型?
- javascript - d3 - 访问我点击的区域后面的数据
- jira - 如何在 Jira 票证中突出显示内联代码/文本?
- javascript - JS在有边距的画布上绘制时使用错误的坐标
- c++ - 有人可以解释速记赋值运算符的实际工作原理吗?
- c# - 如果它们重复并且需要全部输出,如何输出最大的数组元素
- .net - C++/CLI .Net 程序集不会在 LabVIEW 中加载。这是由于与 C++/CLI .Net 的根本不兼容还是会出现问题?
- python - 如何使用 seaborn 将一系列值映射到特定颜色?
- angularjs - Angularjs $http.get 返回 409 错误,但同样的请求在 Postman 和 curl 命令中可以正常工作
- scala - 使用 spark 解析具有多个行标签的 xml