c# - 适用于消费者的 Amazon Kinesis KCL 客户端无法在 .NET 中运行
问题描述
请帮忙。我在 .NET 控制台应用程序中为 Kinesis 数据流设置消费者时遇到问题。
我已经根据文档完成了所有操作,但是每当我运行消费者时,我仍然会得到一个空白的控制台屏幕。到目前为止,生产者工作正常,AWS 凭证正在工作。
- 我的系统上的 JDK 配置良好(对 Java 开发来说不是新手)
- 我已将所有必要的策略附加到我的 IAM 用户
- 我可以看到生产者可以使用相同的 AWS 凭证以编程方式创建流、描述流等
创建 KclProcess 时,我可以在 Program 中打断点,但我无法在下面的 KinesisTest 类中打任何断点
至于消费者,我创建了一个类 Program.cs,如下所示:
class Program
{
public static void Main(string[] args)
{
//added these lines after trying everything
Environment.SetEnvironmentVariable("AWS_ACCESS_KEY_ID", "***");
Environment.SetEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "***");
Environment.SetEnvironmentVariable("AWS_REGION", "us-east-1");
try
{
KclProcess.Create(new KinesisTest()).Run();
}
catch (Exception e)
{
Console.Error.WriteLine("ERROR: " + e);
}
}
}
和另一个班级
public class KinesisTest: IRecordProcessor
{
private static readonly TimeSpan Backoff = TimeSpan.FromSeconds(3);
private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(1);
private static readonly int NumRetries = 10;
/// <value>The shard ID on which this record processor is working.</value>
private string _kinesisShardId;
private DateTime _nextCheckpointTime = DateTime.UtcNow;
public void Initialize(InitializationInput input)
{
Console.Error.WriteLine("Initializing record processor for shard: " + input.ShardId);
this._kinesisShardId = input.ShardId;
}
public void ProcessRecords(ProcessRecordsInput input)
{
Console.Error.WriteLine("Processing " + input.Records.Count + " records from " + _kinesisShardId);
ProcessRecordsWithRetries(input.Records);
// Checkpoint once every checkpoint interval.
if (DateTime.UtcNow >= _nextCheckpointTime)
{
Checkpoint(input.Checkpointer);
_nextCheckpointTime = DateTime.UtcNow + CheckpointInterval;
}
}
public void Shutdown(ShutdownInput input)
{
Console.Error.WriteLine("Shutting down record processor for shard: " + _kinesisShardId);
// Checkpoint after reaching end of shard, so we can start processing data from child shards.
if (input.Reason == ShutdownReason.TERMINATE)
{
Checkpoint(input.Checkpointer);
}
}
private void ProcessRecordsWithRetries(List<Record> records)
{
foreach (Record rec in records)
{
bool processedSuccessfully = false;
string data = null;
for (int i = 0; i < NumRetries; ++i)
{
try
{
data = System.Text.Encoding.UTF8.GetString(rec.Data);
Console.Error.WriteLine( String.Format("Retrieved record:\n\tpartition key = {0},\n\tsequence number = {1},\n\tdata = {2}", rec.PartitionKey, rec.SequenceNumber, data));
// Your own logic to process a record goes here.
processedSuccessfully = true;
break;
}
catch (Exception e)
{
Console.Error.WriteLine("Exception processing record data: " + data, e);
}
//Back off before retrying upon an exception.
Thread.Sleep(Backoff);
}
if (!processedSuccessfully)
{
Console.Error.WriteLine("Couldn't process record " + rec + ". Skipping the record.");
}
}
}
private void Checkpoint(Checkpointer checkpointer)
{
Console.Error.WriteLine("Checkpointing shard " + _kinesisShardId);
checkpointer.Checkpoint(RetryingCheckpointErrorHandler.Create(NumRetries, Backoff));
}
}
最后是 kcl.properties 文件:
executableName = dotnet KinesisTest.dll
streamName = testStream
applicationName = KinesisTest
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
processingLanguage = C#
initialPositionInStream = TRIM_HORIZON
regionName = us-east-1
maxRecords = 5000
idleTimeBetweenReadsInMillis = 1000
# failoverTimeMillis = 10000
# workerId =
# shardSyncIntervalMillis = 60000
# callProcessRecordsEvenForEmptyRecordList = false
# parentShardPollIntervalMillis = 10000
# cleanupLeasesUponShardCompletion = true
# taskBackoffTimeMillis = 500
# metricsBufferTimeMillis = 10000
# metricsMaxQueueSize = 10000
# validateSequenceNumberBeforeCheckpointing = true
# maxActiveThreads = 0
如果我做错了什么,请告诉我。
我期待看到消费者处理流中的数据,但它只是一个空控制台
解决方案
虽然我从未找到这个问题的答案,但我找到了使用 lambda 更好、更合适的方法。
我的最终设置涉及使用 SNS 从 Kinesis 获取消息/事件,然后将消息散播给任何订阅者(在我的情况下,SQS 队列),然后将排队的消息提供给 lambdas。
像这样的东西:
SQS Queue ----------------> Lambda
^
/
/
/
KINESIS STREAM ------->SNS-------->SQS Queue ------> Lambda
\
\
\
>SQS Queue ----------------> Lambda
您可能会问为什么不将 Kinesis 直接连接到 SQS?或者为什么不直接 SNS 到 Lambda?
第一个问题的答案是 Kinesis 不是为多次读取而构建的。SNS 允许您阅读一次,然后根据需要将其分发给尽可能多的订阅者。如果您说有 20 个听众正在等待对某个事件采取行动,那么您不能将他们全部插入 Kinesis。所以去SNS。
第二个问题的答案是,SNS 有时可能会丢失消息并且不会传递或触发 lambda(非常正确)。但是(根据经验推测)如果将其连接到 SQS,此概率会降低,它几乎总是会出现在队列中。然后队列可以触发 lambdas。无论如何,它们只有一项工作。
所以我希望这对某人有帮助。
推荐阅读
- django - 保存到数据库后如何获取对象
- wordpress - 按尺寸过滤未显示 WooCommerce 中过滤器下的所有产品
- linux - 有选择地为特定参数记录内核 Ftrace 点
- java - 为什么 Java 将某些 IPv6 地址解释为 IPv4?
- react-native - 如何在 React Native 中使用 apisauce 下载 pdf 文件?
- javascript - Shopify API - 使用选择字段使用 javascript 更新所选变体
- python - 为什么在尝试获取 txt 文件的文件路径时出现“权限被拒绝”?
- flutter - 平台不允许不安全的 HTTP
- typescript - 在 TypeScript 中键入构造函数
- authentication - Azure AD B2C MSAL 身份验证问题