akka - Akka Streams 高 CPU 使用率和创建的线程过多
问题描述
我有一个使用 akka 流的消费者,我们正在使用来自 kafka 的数据包。线程的数量有数千并且一直在增加。
尝试进行线程转储,其中我们发现了两种具有最大计数的线程:
以下是配置文件
akka.kafka.consumer {
# Tuning property of scheduled polls.
# Controls the interval from one scheduled poll to the next.
poll-interval = 250ms
# Tuning property of the `KafkaConsumer.poll` parameter.
# Note that non-zero value means that the thread that
# is executing the stage will be blocked. See also the `wakup-timeout` setting below.
poll-timeout = 50ms
# The stage will delay stopping the internal actor to allow processing of
# messages already in the stream (required for successful committing).
# Prefer use of `DrainingControl` over a large stop-timeout.
stop-timeout = 30s
# Duration to wait for `KafkaConsumer.close` to finish.
close-timeout = 20s
# If offset commit requests are not completed within this timeout
# the returned Future is completed `CommitTimeoutException`.
# The `Transactional.source` waits this amount of time for the producer to mark messages as not
# being in flight anymore as well as waiting for messages to drain, when rebalance is triggered.
commit-timeout = 15s
# If commits take longer than this time a warning is logged
commit-time-warning = 1s
# Not used anymore (since 1.0-RC1)
# wakeup-timeout = 3s
# Not used anymore (since 1.0-RC1)
# max-wakeups = 10
# If set to a finite duration, the consumer will re-send the last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-interval = infinite
# Not used anymore (since 1.0-RC1)
# wakeup-debug = true
# Fully qualified config path which holds the dispatcher configuration
# to be used by the KafkaConsumerActor. Some blocking may occur.
use-dispatcher = "akka.default-dispatcher"
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# can be defined in this configuration section.
kafka-clients {
# Disable auto-commit by default
enable.auto.commit = false
}
# Time to wait for pending requests when a partition is closed
wait-close-partition = 500ms
# Limits the query to Kafka for a topic's position
position-timeout = 5s
# When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
# call to Kafka's API
offset-for-times-timeout = 5s
# Timeout for akka.kafka.Metadata requests
# This value is used instead of Kafka's default from `default.api.timeout.ms`
# which is 1 minute.
metadata-request-timeout = 5s
# Interval for checking that transaction was completed before closing the consumer.
# Used in the transactional flow for exactly-once-semantics processing.
eos-draining-check-interval = 30ms
}
#####################################
# Akka Stream Reference Config File #
#####################################
akka {
stream {
# Default materializer settings
materializer {
# Initial size of buffers used in stream elements
initial-input-buffer-size = 4
# Maximum size of buffers used in stream elements
max-input-buffer-size = 8
# Fully qualified config path which holds the dispatcher configuration
# to be used by ActorMaterializer when creating Actors.
# When this value is left empty, the default-dispatcher will be used.
dispatcher = "akka.default-dispatcher"
# Cleanup leaked publishers and subscribers when they are not used within a given
# deadline
subscription-timeout {
# when the subscription timeout is reached one of the following strategies on
# the "stale" publisher:
# cancel - cancel it (via `onError` or subscribing to the publisher and
# `cancel()`ing the subscription right away
# warn - log a warning statement about the stale element (then drop the
# reference to it)
# noop - do nothing (not recommended)
mode = cancel
# time after which a subscriber / publisher is considered stale and eligible
# for cancelation (see `akka.stream.subscription-timeout.mode`)
timeout = 5s
}
# Enable additional troubleshooting logging at DEBUG log level
debug-logging = off
# Maximum number of elements emitted in batch if downstream signals large demand
output-burst-limit = 1000
# Enable automatic fusing of all graphs that are run. For short-lived streams
# this may cause an initial runtime overhead, but most of the time fusing is
# desirable since it reduces the number of Actors that are created.
# Deprecated, since Akka 2.5.0, setting does not have any effect.
auto-fusing = on
# Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
# buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
# buffer upon stream materialization if the requested buffer size is less than this
# configuration parameter. The default is very high because failing early is better
# than failing under load.
#
# Buffers sized larger than this will dynamically grow/shrink and consume more memory
# per element than the fixed size buffers.
max-fixed-buffer-size = 1000000000
# Maximum number of sync messages that actor can process for stream to substream communication.
# Parameter allows to interrupt synchronous processing to get upsteam/downstream messages.
# Allows to accelerate message processing that happening withing same actor but keep system responsive.
sync-processing-limit = 1000
debug {
# Enables the fuzzing mode which increases the chance of race conditions
# by aggressively reordering events and making certain operations more
# concurrent than usual.
# This setting is for testing purposes, NEVER enable this in a production
# environment!
# To get the best results, try combining this setting with a throughput
# of 1 on the corresponding dispatchers.
fuzzing-mode = off
}
}
# Fully qualified config path which holds the dispatcher configuration
# to be used by ActorMaterializer when creating Actors for IO operations,
# such as FileSource, FileSink and others.
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1
thread-pool-executor {
fixed-pool-size = 16
}
}
}
# configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http – i.e. when serving https connections)
ssl-config {
protocol = "TLSv1.2"
}
}
ssl-config {
logger = "com.typesafe.sslconfig.akka.util.AkkaLoggerBridge"
}
akka.default-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 5
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 20
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 5
}
以下是最大线程数的错误:
一个是:
priority:5 - threadId:0x00007f9414008800 - nativeId:0x5575 - nativeId (十进制):21877 - state:WAITING stackTrace: java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - 停车在 akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 在 akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 等待 <0x0000000740597170> (aakka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)在 akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 锁定可拥有同步器:
其他是:
default-scheduler-1 priority:5 - threadId:0x00007f947805d800 - nativeId:0x638a - nativeId (decimal):25482 - state:TIMED_WAITING stackTrace: java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep( Native Method) at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:87) at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:271) at akka.actor.LightArrayRevolverScheduler$$anon$3.run( LightArrayRevolverScheduler.scala:241) at java.lang.Thread.run(Thread.java:745) 锁定的可拥有同步器:- 无
CPU 利用率将达到 800%。是什么导致了如此高的 CPU 使用率。请指教。
解决方案
推荐阅读
- sql - 在 ON 条件下加入多个案例的两个表
- reactjs - react-sortablejs - 列表不会更新,组件不会重新渲染
- javascript - 异步函数给我 _40: 0, _65: 0, _55: null, _72: null
- jupyter-notebook - 使用 iplantuml 库从文本文件描述生成植物序列图
- android - Android 如何实现基于箭头的布局?
- python - 如何使用python 3.8取消tkinter中关闭延迟计时器的after方法
- micronaut - 测试中应用程序引导的最佳实践
- cordova - cordova:更改基本 href 使应用程序“违反以下内容安全性”
- azure-active-directory - Azure AD 打造摇滚用户同步
- c# - Linq2db:按嵌套属性字段过滤