Running Pentaho ETL transformations with AWS Lambda

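Question

Is it possible to run Pentaho ETL jobs/transformations using AWS Lambda functions?

I run Pentaho ETL jobs on a schedule on a Windows server, and we are planning to migrate to AWS. I am considering Lambda functions and would like to understand whether Pentaho ETL jobs can be scheduled with AWS Lambda.

Tags: amazon-web-services, aws-lambda, pentaho, pentaho-data-integration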

Answer


Here is a code snippet that I was able to run successfully in an AWS Lambda function.

The AWS Lambda function invokes the handleRequest function:

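    // Shared state used by the helper methods below (declarations assumed;
    // they were not shown in the original snippet).
    private static String transName;
    private static final List<String> params = new ArrayList<>();

    public Integer handleRequest(String input, Context context) {
        parseInput(input);
        return executeKtr(transName);
    }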

parseInput: parses the string argument passed to the Lambda function to extract the KTR name and its parameters with their values. The input format is "ktrfilename param1=value1 param2=value2".

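    public static void parseInput(String input) {
        // Split on any run of whitespace so repeated spaces do not yield empty tokens.
        String[] tokens = input.trim().split("\\s+");
        // Accept the name with or without the .ktr extension.
        transName = tokens[0].replace(".ktr", "") + ".ktr";
        // Reset first: static state survives across warm Lambda invocations.
        params.clear();
        for (int i = 1; i < tokens.length; i++) {
            params.add(tokens[i]);
        }
    }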
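
A minimal, standalone sketch of parsing the same input format into a file name plus a name-to-value map, which avoids shared static state. It is an illustration only: the class `KtrInput` and the sample name `load_sales` are hypothetical and not part of the original code, and it appends `.ktr` only when the name does not already end with it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class KtrInput {
    final String fileName;
    final Map<String, String> params;

    KtrInput(String fileName, Map<String, String> params) {
        this.fileName = fileName;
        this.params = params;
    }

    /** Parses "ktrfilename param1=value1 param2=value2" into its parts. */
    static KtrInput parse(String input) {
        if (input == null || input.trim().isEmpty()) {
            throw new IllegalArgumentException("Input must start with a KTR file name");
        }
        String[] tokens = input.trim().split("\\s+");
        String name = tokens[0].endsWith(".ktr") ? tokens[0] : tokens[0] + ".ktr";

        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 1; i < tokens.length; i++) {
            // Split on the first '=' only, so a value may itself contain '='.
            String[] kv = tokens[i].split("=", 2);
            if (kv.length != 2 || kv[0].isEmpty()) {
                throw new IllegalArgumentException("Expected name=value but got: " + tokens[i]);
            }
            params.put(kv[0], kv[1]);
        }
        return new KtrInput(name, params);
    }

    public static void main(String[] args) {
        KtrInput in = parse("load_sales run_date=2024-01-31 filter=a=b");
        System.out.println(in.fileName);
        System.out.println(in.params);
    }
}
```

Rejecting malformed tokens up front gives a clearer error than an ArrayIndexOutOfBoundsException later, when the parameters are applied to the transformation.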

Execute KTR: I store all my KTR files in a Git repo and execute the KTR whose name is passed as an argument.

    public static Integer executeKtr(String ktrName) {
        try {
            System.out.println("Present Project Directory : " + System.getProperty("user.dir"));

            String transName = ktrName.replace(".ktr", "") + ".ktr";
            String gitURI = awsSSM.getParaValue("kattle-trans-git-url");
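            String repoLocalPath = clonePDIrepo.cloneRepo(gitURI);
            if (repoLocalPath == null) {
                // cloneRepo returns null on failure; stop here rather than look for "null/<name>.ktr".
                throw new RuntimeException("Could not clone the KTR Git repository");
            }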

            String path = new File(repoLocalPath + "/" + transName).getAbsolutePath();
            File ktrFile = new File(path);

            System.out.println("KTR Path: " + path);
            try {
                /*
                 * IMPORTANT: in a Lambda function the .kettle directory must be created
                 * first; otherwise the code fails with an error saying it cannot create
                 * the .kettle/kettle.properties file.
                 *
                 * Also set the environment variable KETTLE_HOME=/tmp/.kettle on the
                 * Lambda function.
                 */
                Files.createDirectories(Paths.get("/tmp/.kettle"));
            } catch (IOException e) {
                e.printStackTrace();
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new RuntimeException("Error creating /tmp/.kettle directory", e);
            }

            if (ktrFile.exists()) {
                KettleEnvironment.init();
                TransMeta metaData = new TransMeta(path);
                Trans trans = new Trans(metaData);
                // Set any parameters supplied in the input string.
                trans = parameterSetting(trans);

                trans.execute( null );
                trans.waitUntilFinished();
                if (trans.getErrors() > 0) {
                    System.out.println("Error Executing transformation");
                    throw new RuntimeException("There are errors in running transformations");
                } else {
                    System.out.println("Successfully Executed Transformation");
                    return 1;
                }
            } else {
                System.out.print("KTR File:" + path + " not found in repo");
                throw new RuntimeException("KTR File:" + path + " not found in repo");
            }

        } catch (KettleException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage(), e);
        }
    }

parameterSetting: if the KTR accepts parameters and they are passed when the AWS Lambda function is invoked, the parameterSetting function sets them.

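    public static Trans parameterSetting(Trans trans) {
        String[] transParams = trans.listParameters();

        for (String param : transParams) {
            for (String p : params) {
                // Split on the first '=' only, so values may themselves contain '='.
                String[] pair = p.split("=", 2);
                if (pair.length < 2) {
                    continue; // skip tokens that are not in name=value form
                }
                String name = pair[0];
                String val = pair[1];

                if (name.trim().equals(param.trim())) {
                    try {
                        System.out.println("Setting Parameter:" + name + "=" + val);
                        // Use the declared name so the lookup matches listParameters().
                        trans.setParameterValue(param, val);
                    } catch (UnknownParamException e) {
                        // Not expected: param comes from listParameters() above.
                        e.printStackTrace();
                    }
                }
            }
        }
        trans.activateParameters();
        return trans;
    }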

Clone Git repo:

    public class clonePDIrepo {
        /**
         * Clones the given repo to a local folder.
         *
         * @param pathWithPwd Git repo URL with the access token included in the URL, e.g.
         *                    https://token_name:token_value@github.com/ktr-git-repo.git
         * @return the local repository path as a String, or null if the clone failed
         */
        public static String cloneRepo(String pathWithPwd) {
            try {
                /*
                 * Create a fresh temp directory to avoid a "folder exists" error; it is
                 * later used to get the absolute path of files in the repository.
                 */
                File pdiLocalPath = Files.createTempDirectory("repodir").toFile();
                // Git is AutoCloseable; close it to release file handles once cloned.
                try (Git git = Git.cloneRepository().setURI(pathWithPwd).setDirectory(pdiLocalPath).call()) {
                    System.out.println("Git repository cloned successfully");
                    System.out.println("Local Repository Path:" + pdiLocalPath.getAbsolutePath());
                }
                return pdiLocalPath.getAbsolutePath();
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    }
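
One thing to keep in mind with this approach: every invocation clones into a new temp directory, and a warm Lambda execution environment keeps its /tmp contents between invocations, with limited ephemeral storage (512 MB by default). A minimal sketch of removing the cloned directory once the transformation has finished, using only the standard library; the class name `TempDirCleanup` and the sample file are hypothetical and not part of the original code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class TempDirCleanup {

    /** Deletes root and everything below it; does nothing if root is missing. */
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the cloned repository directory.
        Path repoDir = Files.createTempDirectory("repodir");
        Files.createDirectories(repoDir.resolve("sub"));
        Files.write(repoDir.resolve("sub").resolve("sample.ktr"), "<transformation/>".getBytes());
        try {
            // The clone and the transformation run would happen here.
            System.out.println("Working in " + repoDir);
        } finally {
            deleteRecursively(repoDir);
        }
        System.out.println("Still exists: " + Files.exists(repoDir));
    }
}
```

Calling the cleanup from a finally block means the directory is removed even when the transformation throws.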

awsSSM.getParaValue: gets the string value of the SSM parameter whose name is passed in.

    public static String getParaValue(String paraName) {
        Region region = Region.US_EAST_1;
        // try-with-resources closes the client even when the request fails.
        try (SsmClient ssmClient = SsmClient.builder()
                .region(region)
                .build()) {

            GetParameterRequest parameterRequest = GetParameterRequest.builder()
                    .name(paraName)
                    .withDecryption(true)
                    .build();
            GetParameterResponse parameterResponse = ssmClient.getParameter(parameterRequest);

            System.out.println(paraName + " value retrieved from AWS SSM");
            return parameterResponse.parameter().value();

        } catch (SsmException e) {
            System.err.println(e.getMessage());
            return null;
        }
    }

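Assumptions:

  1. The Git repository is created with the KTR files in the repository root
  2. The Git repo URL is stored in AWS SSM and includes a valid token for cloning the repo
  3. The input string contains the name of the KTR file
  4. The environment variable KETTLE_HOME=/tmp/.kettle is configured on the Lambda function
  5. The Lambda function has the necessary permissions for SSM and the S3 VPC network
  6. Appropriate security group rules are set up to allow the network access the KTR files require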
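
Assumptions 3 and 4 can be checked cheaply at the start of an invocation, so a misconfigured function fails with a clear message instead of a Kettle error. A minimal sketch under that assumption; the class `PreflightCheck` and the sample input are hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class PreflightCheck {
    static final String EXPECTED_KETTLE_HOME = "/tmp/.kettle";

    /** Returns human-readable problems; an empty list means the checks passed. */
    static List<String> check(Map<String, String> env, String input) {
        List<String> problems = new ArrayList<>();

        // Assumption 4: KETTLE_HOME must point at the directory created under /tmp.
        String kettleHome = env.get("KETTLE_HOME");
        if (!EXPECTED_KETTLE_HOME.equals(kettleHome)) {
            problems.add("KETTLE_HOME should be " + EXPECTED_KETTLE_HOME + " but is " + kettleHome);
        }

        // Assumption 3: the input string must contain the KTR file name.
        if (input == null || input.trim().isEmpty()) {
            problems.add("Input must start with the KTR file name");
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = check(System.getenv(), "load_sales run_date=2024-01-31");
        if (problems.isEmpty()) {
            System.out.println("Preflight checks passed");
        } else {
            problems.forEach(System.err::println);
        }
    }
}
```

Passing the environment in as a map, rather than reading System.getenv() inside the method, keeps the check easy to exercise outside Lambda.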

I plan to upload the complete code to Git and will update this post with the repository URL.

