首页 > 解决方案 > 在 PGConnection.getNotifications 中获取大小

问题描述

我的 postgresql 数据库中的一个函数会在更新表时发送通知。我正在通过 scalikejdbc 轮询该 postgresql 数据库,以获取所有通知,然后对它们执行一些操作。此处解释了该过程。对 sql 表更新的典型反应式系统。我从 java.sql.Connection 获得 PGConnection。而且,在那之后,我以这种方式收到通知:

val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())

我试图通过将获取大小设置为 1000 并禁用自动提交来获取 1000 块的通知。但 fetch size 属性被忽略。

任何想法我怎么能做到这一点?我不想在通知数据集的单个地图中处理数十万条通知。

pgConnection.getNotifications.size 可能很大,因此,此代码无法很好地扩展。

谢谢!!!

标签: postgresqlakkascalikejdbc

解决方案


为了更好地扩展,考虑使用postgresql-asyncAkka Streams:前者是一个可以异步获取 PostgreSQL 通知的库,而前者是一个提供背压的Reactive Streams实现(这将消除对分页的需要)。例如:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser

import scala.concurrent.duration._
import scala.concurrent.Await

class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
  private implicit val ec = context.system.dispatcher

  val queue =  
    Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
      .to(Sink.foreach(println))
      .run()

  val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
  val connection = new PostgreSQLConnection(configuration)
  Await.result(connection.connect, 5 seconds)

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    val msg = message.payload
    log.debug("Sending the payload: {}", msg)
    self ! msg
  }

  def receive = {
    case payload: String =>
      queue.offer(payload).pipeTo(self)
    case QueueOfferResult.Dropped =>
      log.warning("Dropped a message.")
    case QueueOfferResult.Enqueued =>
      log.debug("Enqueued a message.")
    case QueueOfferResult.Failure(t) =>
      log.error("Stream failed: {}", t.getMessage)
    case QueueOfferResult.QueueClosed =>
      log.debug("Stream closed.")
  }
}

上面的代码只是在 PostgreSQL 发生通知时打印它们;您可以将 替换为Sink.foreach(println)另一个Sink. 要运行它:

import akka.actor._
import akka.stream.ActorMaterializer

object Example extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  system.actorOf(Props(classOf[DbActor], materializer))
}

推荐阅读