spring-boot - 无法使用 AbstractReactiveElasticsearchConfiguration 将数据写入 ElasticSearch
问题描述
我正在尝试将数据写入我的本地 Elasticsearch Docker 容器(7.4.2),为简单起见,我使用了 Spring 提供的 AbstractReactiveElasticsearchConfiguration 也覆盖了 entityMapper 函数。我构建了我的存储库,扩展了 ReactiveElasticsearchRepository 然后最后我使用我的自动装配存储库来 saveAll() 我包含数据的元素集合。但是 Elasticsearch 不会写入任何数据。我还有一个 REST 控制器,它开始我的整个过程,基本上什么都不返回,DeferredResult>
来自我的 ApiDelegateImpl 的 REST 方法
@Override
public DeferredResult<ResponseEntity<Void>> openUsageExporterStartPost() {
final DeferredResult<ResponseEntity<Void>> deferredResult = new DeferredResult<>();
ForkJoinPool.commonPool().execute(() -> {
try {
openUsageExporterAdapter.startExport();
deferredResult.setResult(ResponseEntity.accepted().build());
} catch (Exception e) {
deferredResult.setErrorResult(e);
}
}
);
return deferredResult;
}
我的 Elasticsearch 配置
@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {
@Value("${spring.data.elasticsearch.client.reactive.endpoints}")
private String elasticSearchEndpoint;
@Bean
@Override
public EntityMapper entityMapper() {
final ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(), new DefaultConversionService());
entityMapper.setConversions(elasticsearchCustomConversions());
return entityMapper;
}
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchEndpoint)
.build();
return ReactiveRestClients.create(clientConfiguration);
}
}
我的仓库
public interface OpenUsageRepository extends ReactiveElasticsearchRepository<OpenUsage, Long> {
}
我的 DTO
@Data
@Document(indexName = "open_usages", type = "open_usages")
@TypeAlias("OpenUsage")
public class OpenUsage {
@Field(name = "id")
@Id
private Long id;
......
}
我的适配器实现
@Autowired
private final OpenUsageRepository openUsageRepository;
...transform entity into OpenUsage...
public void doSomething(final List<OpenUsage> openUsages){
openUsageRepository.saveAll(openUsages)
}
最后是我的 IT 测试
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestPropertySource(locations = {"classpath:application-it.properties"})
@ContextConfiguration(initializers = OpenUsageExporterApplicationIT.Initializer.class)
class OpenUsageExporterApplicationIT {
@LocalServerPort
private int port;
private final static String STARTCALL = "http://localhost:%s/open-usage-exporter/start/";
@Container
private static ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.4").withExposedPorts(9200);
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(final ConfigurableApplicationContext configurableApplicationContext) {
final List<String> pairs = new ArrayList<>();
pairs.add("spring.data.elasticsearch.client.reactive.endpoints=" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
pairs.add("spring.elasticsearch.rest.uris=http://" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
TestPropertyValues.of(pairs).applyTo(configurableApplicationContext);
}
}
@Test
void testExportToES() throws IOException, InterruptedException {
final List<OpenUsageEntity> openUsageEntities = dbPreparator.insertTestData();
assertTrue(openUsageEntities.size() > 0);
final String result = executeRestCall(STARTCALL);
// Awaitility here tells me nothing is in ElasticSearch :(
}
private String executeRestCall(final String urlTemplate) throws IOException {
final String url = String.format(urlTemplate, port);
final HttpUriRequest request = new HttpPost(url);
final HttpResponse response = HttpClientBuilder.create().build().execute(request);
// Get the result.
return EntityUtils.toString(response.getEntity());
}
}
解决方案
public void doSomething(final List<OpenUsage> openUsages){
openUsageRepository.saveAll(openUsages)
}
这在末尾缺少分号,因此不应编译。
但我认为这只是一个错字,实际上有一个分号。
无论如何,saveAll()
返回一个Flux
. 这Flux
只是一个保存数据的方法,直到subscribe()
有人(或类似的东西blockLast()
)调用它才“执行”。你只是把它Flux
扔掉,所以保存永远不会被执行。
如何解决这个问题?一种选择是添加.blockLast()
调用:
openUsageRepository.saveAll(openUsages).blockLast();
但这将以阻塞方式保存数据,有效地破坏反应性。
另一种选择是,如果您调用的代码saveAll()
支持反应性只是返回Flux
返回的saveAll()
,但是,由于您doSomething()
有void
返回类型,这是值得怀疑的。
无论如何都看不到您的startExport()
连接方式doSomething()
。但看起来您的“调用代码”没有使用任何反应性概念,因此真正的解决方案是重写调用代码以使用反应性(获取 aPublisher
并subscribe()
在其上,然后等到数据到达),或恢复为使用阻塞 API(ElasticsearchRepository
而不是ReactiveElasticsearchRepository
)。
推荐阅读
- uibutton - UIButton 的 IBDesignable,IBInspectable 变量为零
- javascript - 如何使用javascript创建程序来计算串联电阻?
- linux - 当 Linux 操作系统无法正常关闭/重置时,如何防止文件/磁盘损坏?
- javascript - script-src 包含无效的源:''wasm-eval' 在使用 web worker 时
- botframework - LUIS 离线支持
- laravel - 我的页面是否会被谷歌抓取,因为它们是异步解释并注入 DOM 的 Markdown 文件?
- mongodb - 如何通过检查mongodb和spring条件中的数组元素来选择文档
- java - 如何在运行时更改 API Base Url(Retrofit、Android、Java)?
- amazon-web-services - AWS RDS 自动备份/快照通知
- sql - 访问要插入数据库的列表元素