https - RXJS 队列每秒累积数据值并在每 x 秒后调度
问题描述
我正在尝试创建一个智能队列,该队列每秒或每次调用时收集数据,.next()
并且在每 x 秒后(例如 x=5),它调度current
队列中的所有项目,同时仍接收新项目。
整个想法是每隔x=5
几秒轮询一次对服务器的 http 请求,这样就不是每秒发送这些项目,而是一次发送一批项目。
下面的片段是我尝试过的。我正在使用扫描运算符累积值,但我需要每 x=5 秒从队列中获取的组合。
import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, mergeMap, map, delay, scan } from 'rxjs/operators'
import { Video, seconds } from '../types'
interface Queue {
addMoment: (data: { t: seconds }) => void
unsubscribe: () => void
}
type Moment = {
t: seconds
}
const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000
export class MomentsQueue implements Queue {
private queue = new Subject()
private observer: Observable<any>
private subscriber: Subscription | undefined
constructor(private video: Video) {
this.observer = this.queue.pipe(
map((payload) => {
console.log(payload)
return payload as Moment
}),
scan((all: Moment[], current) => [...all, current], []),
concatMap(
(payload) =>
new Promise((resolve) => {
console.log({ payload })
resolve(true)
}),
),
)
}
private subscribe() {
this.subscriber = this.observer.subscribe()
// ?? this.unsubscribe() // based on any chosen event
}
unsubscribe() {
this.subscriber?.unsubscribe()
}
addMoment(data: Moment) {
if (!this.subscriber || this.subscriber.closed) this.subscribe()
this.queue.next({ t: data.t })
}
}
export default MomentsQueue
解决方案
我能够使用@TalOhania 的帮助解决这个问题。请参阅下面代码片段中的解决方案:
import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, map, bufferTime } from 'rxjs/operators'
import { Video, seconds } from '../types'
interface Queue {
addMoment: (data: { t: seconds }) => void
unsubscribe: () => void
}
type Moment = {
t: seconds
}
const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000
export class MomentsQueue implements Queue {
private queue = new Subject()
private observer: Observable<any>
private subscriber: Subscription | undefined
constructor(private video: Video) {
this.observer = this.queue.pipe(
map((payload) => payload as Moment),
bufferTime(INTERVAL),
concatMap(
(payload) =>
new Promise((resolve) => {
// send the payload to the server.
console.log({ payload })
resolve(true)
}),
),
)
}
private subscribe() {
this.subscriber = this.observer.subscribe()
// ?? this.unsubscribe() // based on any chosen event
}
unsubscribe() {
this.subscriber?.unsubscribe()
}
addMoment(data: Moment) {
if (!this.subscriber || this.subscriber.closed) this.subscribe()
this.queue.next({ t: data.t })
}
}
export default MomentsQueue
推荐阅读
- php - WordPress重定向用户权限不足
- python - 如何将 python dict 转换为 pandas 数据框
- git - 存储库中的 GitHub 安全问题
- drag-and-drop - Meziantou Drag n Drop Blazor:CS1061“InputFile”不包含“Element”的定义
- c - 为什么我的代码不适用于我的 VS 代码和代码块?
- android - 元素在约束布局中出现在屏幕左侧
- python - QTableWidget连接多个表中的行选择
- python - 如何更快地更新window win32gui?
- html - 盒子阴影在页面结束前停止
- reactjs - Next.js 与 FC 孩子的链接必须通过 ref