首页 > 解决方案 > 如何在 InvokeScriptedProcessor 中使用“DBCPConnectionPoolLookup”?

问题描述

我正在用 groovy 编写一个 InvokeScriptedProcessor (ISP),我设置了“DBCPConnectionPoolLookup”控制器并使用“UpdateAttribute”设置了 database.name。但我得到这个错误:

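failed to process session due to java.lang.UnsupportedOperationException: Cannot lookup DBCPConnectionPool without attributes; Processor Administratively Yielded for 1 sec: java.lang.RuntimeException: java.lang.UnsupportedOperationException: Cannot lookup DBCPConnectionPool without attributes(即:由于没有传入流文件属性,无法查找 DBCPConnectionPool,处理器被管理性地暂停了 1 秒)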

如何使用“DBCPConnectionPoolLookup”连接到选定的数据库?

错误

配置

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import groovy.sql.Sql

import groovy.sql.GroovyRowResult;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils

import org.apache.nifi.annotation.behavior.EventDriven
import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators

/**
 * Scripted NiFi processor that runs a series of JDBC queries for each incoming
 * FlowFile and replaces the FlowFile content with the query results as JSON.
 *
 * The database connection is obtained from the configured {@code DBCPService}
 * and the FlowFile's attributes are passed along when asking for it, so that a
 * lookup-style service (e.g. DBCPConnectionPoolLookup keyed on the
 * {@code database.name} attribute) can choose the target pool.
 */
@EventDriven
@CapabilityDescription("Execute a series of JDBC queries adding the results to each JSON presented in the FlowFile")
class GroovyProcessor implements Processor {

    /** Logger handed over by the framework in {@link #initialize}. */
    def log

    final static Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description('FlowFiles that were successfully processed and had any data enriched are routed here')
            .build()

    final static Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description('FlowFiles that were not successfully processed are routed here')
            .build()

    /** @return the relationships this processor can route FlowFiles to */
    Set<Relationship> getRelationships() { [REL_FAILURE, REL_SUCCESS] as Set }

    final static PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("dbcp-connection-pool-services")
            .displayName("Database Connection Pool Services")
            .description("The Controller Service that is used to obtain a connection to the database.")
            .required(true)
            .identifiesControllerService(DBCPService)
            .build()
    final static PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
            .name("clientId")
            .displayName("clientId")
            .description("Value to be used in queries.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .expressionLanguageSupported(true)
            .build()

    /**
     * @return every property this processor reads; CLIENT_ID is included because
     *         onTrigger evaluates it, so it has to be exposed for configuration
     */
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        Collections.unmodifiableList([DBCP_SERVICE, CLIENT_ID]) as List<PropertyDescriptor>
    }

    /** Stores the logger supplied by the initialization context. */
    void initialize(ProcessorInitializationContext context) { log = context.logger }

    /**
     * Copies the current row of a result set into a GroovyRowResult.
     *
     * @param rs result set positioned on the row to copy
     * @return the row's values keyed by column label, in column order
     * @throws SQLException if the metadata or a column value cannot be read
     */
    public static GroovyRowResult toRowResult(ResultSet rs) throws SQLException {
        ResultSetMetaData metadata = rs.getMetaData();
        Map<String, Object> lhm = new LinkedHashMap<String, Object>(metadata.getColumnCount(), 1);
        for (int i = 1; i <= metadata.getColumnCount(); i++) {
            lhm.put(metadata.getColumnLabel(i), rs.getObject(i));
        }
        return new GroovyRowResult(lhm);
    }

    /**
     * Takes one FlowFile, runs the queries and writes the collected rows as a
     * JSON array into the FlowFile content. The FlowFile goes to REL_SUCCESS, or
     * to REL_FAILURE if anything (including acquiring the connection) throws.
     */
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        def flowFile = session.get()
        if (!flowFile) return

        // Declared outside the try so the finally block can close it; stays null
        // when acquiring the connection itself fails.
        def conn = null
        try {
            def dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService)
            // Pass the FlowFile attributes: a lookup service needs them (e.g.
            // database.name) to select the pool and rejects the no-arg call.
            conn = dbcpService.getConnection(flowFile.getAttributes())
            def sql = new Sql(conn)
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        def clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions(flowFile).value
                        def definitionId = flowFile.getAttribute('definition.id')
                        def jobId  = flowFile.getAttribute('job.id')
                        def q = """QUERY"""
                        def result = []

                        sql.eachRow(q) {
                            def temp = [:]
                            temp.header = [:]
                            temp.details = []
                            temp.RawData = [:]
                            temp.StagedExtractsUuid = it.uuid
                            temp.DataMapStatus=it.DataMapStatus
                            temp.RawData = new JsonSlurper().parseText(it.RawData)
                            sql.eachRow("QUERY") {
                                temp.header = toRowResult(it)
                                sql.eachRow("QUERY") {
                                    temp.details.add(toRowResult(it))
                                }
                            }
                            result.add(temp)
                        }
                        outputStream.write(new JsonBuilder(result).toString().getBytes(StandardCharsets.UTF_8))
                    } as StreamCallback)
            session.transfer(flowFile, REL_SUCCESS)
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}', [this, t] as Object[])
            session.transfer(flowFile, REL_FAILURE)
        } finally {
            try {
                session.commit()
            } finally {
                // Return the connection even if the commit throws.
                conn?.close()
            }
        }
    }


    /** Performs no validation of its own; returns null. */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { null }

    /**
     * Finds a supported property by its descriptor name or its display name.
     *
     * @param name descriptor name (e.g. "dbcp-connection-pool-services") or
     *             display name (e.g. "Database Connection Pool Services")
     * @return the matching descriptor, or null when none matches
     */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        getPropertyDescriptors().find { it.name == name || it.displayName == name }
    }

    /** No action is taken when a property value changes. */
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {}

    /** @return null; this scripted instance does not supply an identifier */
    @Override
    String getIdentifier() { null }

}

// Assign the instance to the script-level `processor` variable (the name
// InvokeScriptedProcessor is documented to look up — verify for your NiFi version).
processor = new GroovyProcessor()

标签: groovy, apache-nifi

解决方案


当您获得连接时,您需要传入流文件属性,否则它无法访问您的 database.name 属性。

所以,不要这样写:

dbcpService.getConnection()

它应该是:

dbcpService.getConnection(flowFile.getAttributes())

推荐阅读