apache-camel - Apache Camel - 创建自定义组件/端点?
问题描述
我需要从 Websocket 消费消息,但是在消费数据之前我必须做一些逻辑,所以我不能使用 Webscoket 组件。
我有一个 java 代码在这个 Websocket 中进行身份验证并订阅一个“传感器”来接收数据。
我可以创建一个使用此代码的骆驼组件,from()
并且每次我收到新数据onNext()
时骆驼都会启动该过程吗?
WebSocket webSocket = new WebSocket(uri, apiKey, (api, authenthication) -> {
console.println("Authenticated successfully as " + authenthication.getUserName());
String[] sensors = {sensorId};
api.getMetrics(sensors).subscribe(metrics -> {
Metric[] allMetrics = metrics.get(sensorId);
Arrays.sort(allMetrics, (metric1, metric2) -> metric1.getId().compareTo(metric2.getId()));
Metric firstMetric = allMetrics[0];
console.println("Metric: " + firstMetric.getDisplayName());
String metricId = firstMetric.getId();
String[] metric = {metricId};
api.getUnits(metric).subscribe(units -> {
Unit unit = units.get(metric[0])[0];
console.println("Unit: " + unit.getName());
Instant now = Instant.now();
Instant aMinuteAgo = now.minus(timeInterval, ChronoUnit.SECONDS);
Date start = Date.from(aMinuteAgo);
Date end = Date.from(now);
api.getData(sensorId, metricId, unit.getId(), emptyMap(), start, end).subscribe(new DisposableObserver<Data>() {
@Override
public void onNext(Data data) {
console.println("Data from last " + timeInterval + " seconds: ");
console.println(data.getData());
}
@Override
public void onComplete() {
console.println("Data update:");
Disposable subscription = api.subscribeData(sensors, metricId, unit.getId()).subscribe(updates -> {
console.println(updates.getData());
});
ScheduledExecutorService scheduler = newSingleThreadScheduledExecutor(daemonThreadFactory);
scheduler.schedule(subscription::dispose, cancelDelay, SECONDS);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
});
});
});
console.println("Connection was closed by server.");
}
解决方案
推荐阅读
- javascript - Electron webview html字符串视频调整大小
- gpflow - GPflow AdamOptimizer 问题
- python - 从 HSV 获取亮度值 - Python
- ios - 无法转换“UIImage”类型的值?到预期的参数类型'[Any]'
- html - 如果类存在,请使用这些样式(SASS)
- c# - 何时在具有默认实现的接口上使用抽象类?
- r - 在 R(终端)和 Rstudio 中使用 lm() 会给出具有相同输入数据的不同输出
- django - 如何在模型上添加到期日期功能?
- javascript - 如何为数组中的每个单击元素动态更改信息模式 innerHTML
- php - 如何将可重复的数据记录到数据库中