amazon-web-services - 使用 lamda 进行 Pentaho ETL 转换
问题描述
是否可以使用 AWS Lamda 函数运行 Pentaho ETL 作业/转换?
我在 Windows 服务器上按计划运行 Pentaho ETL 作业,我们计划迁移到 AWS。我正在考虑 Lambda 函数。只是想了解是否可以使用 AWS Lamdba 安排 Pentaho ETL 作业
解决方案
这是我能够在 AWS Lambda 函数中成功运行的代码片段。
从 AWS Lambda 函数调用 handleRequest 函数
public Integer handleRequest(String input, Context context) {
parseInput(input);
return executeKtr(transName);
}
parseInput:该函数用于解析出 Lambda 函数传递的字符串参数,以提取 KTR 名称及其带值的参数。输入格式为“ktrfilename param1=value1 param2=value2”
public static void parseInput(String input) {
String[] tokens = input.split(" ");
transName = tokens[0].replace(".ktr", "") + ".ktr";
for (int i=1; i<tokens.length; i++) {
params.add(tokens[i]);
}
}
执行 KTR:我正在使用 git repo 存储我所有的 KTR 文件,并根据作为参数传递的名称执行 KTR
public static Integer executeKtr(String ktrName) {
try {
System.out.println("Present Project Directory : " + System.getProperty("user.dir"));
String transName = ktrName.replace(".ktr", "") + ".ktr";
String gitURI = awsSSM.getParaValue("kattle-trans-git-url");
String repoLocalPath = clonePDIrepo.cloneRepo(gitURI);
String path = new File(repoLocalPath + "/" + transName).getAbsolutePath();
File ktrFile = new File(path);
System.out.println("KTR Path: " + path);
try {
/**
* IMPORTANT NOTE FOR LAMBDA FUNCTION MUST CREATE .KEETLE DIRECOTRY OTHERWISE
* CODE WILL FAIL IN LAMBDA FUNCTION WITH ERROR CANT CREATE
* .kettle/kettle.properties file.
*
* ALSO SET ENVIRNOMENT VARIABLE ON LAMBDA FUNCTION TO POINT
* KETTLE_HOME=/tmp/.kettle
*/
Files.createDirectories(Paths.get("/tmp/.kettle"));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Error Creating /tmp/.kettle directory");
}
if (ktrFile.exists()) {
KettleEnvironment.init();
TransMeta metaData = new TransMeta(path);
Trans trans = new Trans(metaData);
// SETTING PARAMETERS
trans = parameterSetting(trans);
trans.execute( null );
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
System.out.print("Error Executing transformation");
throw new RuntimeException("There are errors in running transformations");
} else {
System.out.print("Successfully Executed Transformation");
return 1;
}
} else {
System.out.print("KTR File:" + path + " not found in repo");
throw new RuntimeException("KTR File:" + path + " not found in repo");
}
} catch (KettleException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
parameterSetting:如果 KTR 接受参数并且在调用 AWS Lambda 函数时传递,则使用 parameterSetting 函数进行设置。
public static Trans parameterSetting(Trans trans) {
String[] transParams = trans.listParameters();
for (String param : transParams) {
for (String p: params) {
String name = p.split("=")[0];
String val = p.split("=")[1];
if (name.trim().equals(param.trim())) {
try {
System.out.println("Setting Parameter:"+ name + "=" + val);
trans.setParameterValue(name, val);
} catch (UnknownParamException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
trans.activateParameters();
return trans;
}
克隆GitRepo:
public class clonePDIrepo {
/**
* Clones the given repo to local folder
*
* @param pathWithPwd Gir repo URL with access token included in the url. e.g.
* https://token_name:token_value@github.com/ktr-git-repo.git
* @return returns Local Repository String Path
*/
public static String cloneRepo(String pathWithPwd) {
try {
/**
* CREATING TEMP DIR TO AVOID FOLDER EXISTS ERROR, THIS TEMP DIRECTORY LATER CAN
* BE USED TO GET ABSOLETE PATH FOR FILES IN DIRECTORY
*/
File pdiLocalPath = Files.createTempDirectory("repodir").toFile();
Git git = Git.cloneRepository().setURI(pathWithPwd).setDirectory(pdiLocalPath).call();
System.out.println("Git repository cloned successfully");
System.out.println("Local Repository Path:" + pdiLocalPath.getAbsolutePath());
// }
return pdiLocalPath.getAbsolutePath();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
AWSSSMgetParaValue:获取传递参数的字符串值。
public static String getParaValue(String paraName) {
try {
Region region = Region.US_EAST_1;
SsmClient ssmClient = SsmClient.builder()
.region(region)
.build();
GetParameterRequest parameterRequest = GetParameterRequest.builder()
.name(paraName)
.withDecryption(true)
.build();
GetParameterResponse parameterResponse = ssmClient.getParameter(parameterRequest);
System.out.println(paraName+ " value retreived from AWS SSM");
ssmClient.close();
return parameterResponse.parameter().value();
} catch (SsmException e) {
System.err.println(e.getMessage());
return null;
}
}
假设:
- Git 存储库是使用存储库根目录中的 KTR 文件创建的
- git repo url 存在于 aws SSM 上,具有用于克隆 repo 的有效令牌
- 输入字符串包含 KTR 文件的名称
- 环境变量在 Lambda 函数上配置为 KETTLE_HOME=/tmp/.kettle
- Lambda 函数对 SSM 和 S3 VPC 网络具有必要的权限
- 设置了适当的安全组规则以允许 KTR 文件所需的网络访问
我打算将完整的代码上传到 git。我将使用存储库的 URL 更新这篇文章。
推荐阅读
- reactjs - react-string-replace 不适用于正则表达式
- python - Python - 查找 Twitter 句柄
- caching - 服务工作者不返回缓存的响应,除非 url 末尾包含斜杠
- roku - 如何在另一个对话框之上显示一个对话框?
- istio - 应用程序之间 mTLS 的最佳多集群配置
- reactjs - 带有 Webflux 的 Spring Boot:请求的资源上不存在“Access-Control-Allow-Origin”标头
- python - 我在使用循环期间更新的 y 值更新简单 xy 图时遇到问题
- javascript - 使用从 reddit 获取的图像预览链接时出现 Guru 调解 403 错误
- react-native - 我该如何解决这个烦人的错误?“console.error 向您的开发环境发送日志消息时出现问题...”
- vba - 设置日期字段验证规则会导致错误 3309“属性值太大”。