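java - Spring Integration: enrich with split
Question
Suppose I have a product, and I need to enrich this product with the IDs of downloaded images.
As the input message I have a Java POJO. For simplicity, here it is rendered as JSON: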
{
  "id" : "productId",
  "price" : 10000,
  "productPhotos" : ["http://url1", "http://url2", ...],
  "marketPhotos" : ["http://url1", "http://url2", ...]
}
I also have a pollable channel whose flow downloads an image, puts it somewhere in storage, and returns the ID of the downloaded photo:
@Bean
public IntegrationFlow imageDownloadFlow() {
    return IntegrationFlows.from(inputChannel())
            .transform(Message.class, messageTransformer::transformToImageMassage, e -> e.poller(queuePoller()))
            .transform(imageDownloader::download)
            .transform(imageS3Uploader::upload)
            .channel(outputChannel())
            .get();
}
So my task is to process "productPhotos" and "marketPhotos" in parallel and enrich the product message with the downloaded IDs. For example:
{
  "id" : "productId",
  "price" : 10000,
  "productPhotos" : ["id1", "id2", ...],
  "marketPhotos" : ["id3", "id4", ...]
}
Is it possible to do this enrichment within IntegrationFlows?
Answer
Yes. Use a ContentEnricher whose requestChannel is a PublishSubscribeChannel (with a task executor), and put an aggregator downstream of the image downloaders.
Use the enrich() DSL method.
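The shape of that pattern (fan out to two branches on an executor, then gather both results before enriching) can be sketched in plain Java. This is not Spring Integration code, only an analogy built on CompletableFuture; the class ScatterGatherSketch and the downloadAndStore helper are hypothetical stand-ins for the real download and upload steps.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ScatterGatherSketch {

    // Hypothetical stand-in for "download the image, store it, return its ID".
    static String downloadAndStore(String url) {
        return "id-" + url.substring(url.lastIndexOf('/') + 1);
    }

    // Converts every URL of one branch into an ID.
    static List<String> toIds(List<String> urls) {
        return urls.stream()
                .map(ScatterGatherSketch::downloadAndStore)
                .collect(Collectors.toList());
    }

    // Scatter: both branches run on the executor.
    // Gather: join() blocks until each branch has finished.
    // Returns [productIds, marketIds].
    static List<List<String>> enrich(List<String> productUrls, List<String> marketUrls) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<List<String>> product =
                    CompletableFuture.supplyAsync(() -> toIds(productUrls), executor);
            CompletableFuture<List<String>> market =
                    CompletableFuture.supplyAsync(() -> toIds(marketUrls), executor);
            return List.of(product.join(), market.join());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(enrich(
                List.of("http://url1", "http://url2"),
                List.of("http://url3")));
    }
}
```

In the Spring Integration version, the publish-subscribe channel with a task executor plays the role of the two supplyAsync calls, and the aggregator plays the role of the two join() calls.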
EDIT
Here is an example:
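import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So57357544Application {

    public static void main(String[] args) {
        SpringApplication.run(So57357544Application.class, args);
    }

    // Main flow: emits a sample Pojo, then enriches it with the aggregated branch results.
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(() -> new Pojo("one", 42, Collections.singletonList("https://localhost/foo"),
                        Collections.singletonList("https://localhost/bar")),
                    e -> e.poller(Pollers.fixedRate(50000)))
                .enrich(enr -> enr.requestChannel("enricherFlow.input")
                        // The reply payload is the aggregated List<Pojo>, one entry per branch.
                        .<List<Pojo>>propertyFunction("productPhotos", msg -> {
                            List<String> photos = msg.getPayload().get(0).getProductPhotos();
                            photos.addAll(msg.getPayload().get(1).getProductPhotos());
                            return photos;
                        })
                        .<List<Pojo>>propertyFunction("marketPhotos", msg -> {
                            List<String> photos = msg.getPayload().get(0).getMarketPhotos();
                            photos.addAll(msg.getPayload().get(1).getMarketPhotos());
                            return photos;
                        }))
                .log()
                .get();
    }

    // Request flow of the enricher: copies the Pojo and fans it out to two parallel branches.
    @Bean
    public IntegrationFlow enricherFlow() {
        return f -> f
                .<Pojo, Pojo> transform(pojo -> new Pojo(pojo.getId(), pojo.getPrice(),
                        pojo.getProductPhotos(),
                        pojo.getMarketPhotos()))
                .publishSubscribeChannel(exec(), ps -> ps
                        // Sequence headers let the default aggregator correlate and release the group.
                        .applySequence(true)
                        .subscribe(f1 -> f1.handle("urlToId", "product").channel("aggregator.input"))
                        .subscribe(f1 -> f1.handle("urlToId", "market").channel("aggregator.input")));
    }

    // Collects both branch results into one List<Pojo>, which becomes the enricher's reply.
    @Bean
    public IntegrationFlow aggregator() {
        return f -> f.aggregate();
    }

    // Two threads, so both branches can run at the same time.
    @Bean
    public Executor exec() {
        ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
        exec.setCorePoolSize(2);
        return exec;
    }

}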
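
// Stand-in for the real downloader: derives an "ID" from the last path segment of each URL.
// Note that substring(lastIndexOf('/')) keeps the leading slash, e.g. "/foo".
@Component
class UrlToId {

    // Product branch: fills only productPhotos and leaves marketPhotos empty.
    public Pojo product(Pojo pojo) {
        List<String> productPhotos = pojo.getProductPhotos().stream()
                .map(phot -> phot.substring(phot.lastIndexOf('/')))
                .collect(Collectors.toList());
        return new Pojo(pojo.getId(), pojo.getPrice(), productPhotos, Collections.emptyList());
    }

    // Market branch: fills only marketPhotos and leaves productPhotos empty.
    public Pojo market(Pojo pojo) {
        List<String> marketPhotos = pojo.getMarketPhotos().stream()
                .map(phot -> phot.substring(phot.lastIndexOf('/')))
                .collect(Collectors.toList());
        return new Pojo(pojo.getId(), pojo.getPrice(), Collections.emptyList(), marketPhotos);
    }

}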

class Pojo {

    private final String id;

    private final int price;

    private final List<String> productPhotos = new ArrayList<>();

    private final List<String> marketPhotos = new ArrayList<>();

    public Pojo(String id, int price, List<String> productPhotos, List<String> marketPhotos) {
        this.id = id;
        this.price = price;
        setProductPhotos(productPhotos);
        setMarketPhotos(marketPhotos);
    }

    public String getId() {
        return this.id;
    }

    public int getPrice() {
        return this.price;
    }

    // The getters return defensive copies, so callers can modify the result freely.
    public List<String> getProductPhotos() {
        return new ArrayList<>(this.productPhotos);
    }

    public List<String> getMarketPhotos() {
        return new ArrayList<>(this.marketPhotos);
    }

    // The setters are invoked by the enricher; an empty list leaves the current values untouched.
    public final void setProductPhotos(List<String> photos) {
        if (photos.size() > 0) {
            this.productPhotos.clear();
            this.productPhotos.addAll(photos);
        }
    }

    public final void setMarketPhotos(List<String> photos) {
        if (photos.size() > 0) {
            this.marketPhotos.clear();
            this.marketPhotos.addAll(photos);
        }
    }

    @Override
    public String toString() {
        return "Pojo [id=" + this.id + ", price=" + this.price
                + ", productPhotos=" + this.productPhotos
                + ", marketPhotos=" + this.marketPhotos + "]";
    }

}
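
The two propertyFunction lambdas above concatenate get(0) and get(1) without checking which branch produced which element. That works because each branch leaves the other list empty, so arrival order does not matter. A minimal plain-Java sketch of the same merge step, generalized to any number of partial results, might look like this; the MergeSketch and Partial names are hypothetical and not part of the example above.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class MergeSketch {

    // Hypothetical, trimmed-down partial result: each branch fills only one list.
    static final class Partial {
        final List<String> productPhotos;
        final List<String> marketPhotos;

        Partial(List<String> productPhotos, List<String> marketPhotos) {
            this.productPhotos = productPhotos;
            this.marketPhotos = marketPhotos;
        }
    }

    // Concatenates one property across all partial results, in arrival order.
    static List<String> merge(List<Partial> group, Function<Partial, List<String>> property) {
        return group.stream()
                .flatMap(partial -> property.apply(partial).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Partial product = new Partial(List.of("id1", "id2"), List.of());
        Partial market = new Partial(List.of(), List.of("id3"));

        // The market branch happens to finish first here; the merged lists are unaffected.
        List<Partial> group = List.of(market, product);
        System.out.println(merge(group, p -> p.productPhotos));
        System.out.println(merge(group, p -> p.marketPhotos));
    }
}
```

If a branch could ever populate both lists, the merge would need to pick results by branch rather than concatenate them.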