首页 > 解决方案 > 无法使用 AbstractReactiveElasticsearchConfiguration 将数据写入 ElasticSearch

问题描述

我正在尝试将数据写入我的本地 Elasticsearch Docker 容器(7.4.2),为简单起见,我使用了 Spring 提供的 AbstractReactiveElasticsearchConfiguration 也覆盖了 entityMapper 函数。我构建了我的存储库,扩展了 ReactiveElasticsearchRepository 然后最后我使用我的自动装配存储库来 saveAll() 我包含数据的元素集合。但是 Elasticsearch 不会写入任何数据。我还有一个 REST 控制器,它开始我的整个过程,基本上什么都不返回,DeferredResult>

来自我的 ApiDelegateImpl 的 REST 方法

  @Override
  public DeferredResult<ResponseEntity<Void>> openUsageExporterStartPost() {

    final DeferredResult<ResponseEntity<Void>> deferredResult = new DeferredResult<>();

    ForkJoinPool.commonPool().execute(() -> {

          try {
            openUsageExporterAdapter.startExport();
            deferredResult.setResult(ResponseEntity.accepted().build());

          } catch (Exception e) {
            deferredResult.setErrorResult(e);
          }
        }
    );

    return deferredResult;
  }

我的 Elasticsearch 配置

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {

  @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
  private String elasticSearchEndpoint;

  @Bean
  @Override
  public EntityMapper entityMapper() {

    final ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(), new DefaultConversionService());
    entityMapper.setConversions(elasticsearchCustomConversions());
    return entityMapper;
  }

  @Override
  public ReactiveElasticsearchClient reactiveElasticsearchClient() {
    ClientConfiguration clientConfiguration = ClientConfiguration.builder()
        .connectedTo(elasticSearchEndpoint)
        .build();

    return ReactiveRestClients.create(clientConfiguration);
  }
}

我的仓库

public interface OpenUsageRepository extends ReactiveElasticsearchRepository<OpenUsage, Long> {

}

我的 DTO

@Data
@Document(indexName = "open_usages", type = "open_usages")
@TypeAlias("OpenUsage")
public class OpenUsage {

  @Field(name = "id")
  @Id
  private Long id;

  ......
}

我的适配器实现

  @Autowired
  private final OpenUsageRepository openUsageRepository;

  ...transform entity into OpenUsage...

  public void doSomething(final List<OpenUsage> openUsages){
   openUsageRepository.saveAll(openUsages)
  }

最后是我的 IT 测试

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestPropertySource(locations = {"classpath:application-it.properties"})
@ContextConfiguration(initializers = OpenUsageExporterApplicationIT.Initializer.class)
class OpenUsageExporterApplicationIT {


  @LocalServerPort
  private int port;

  private final static String STARTCALL = "http://localhost:%s/open-usage-exporter/start/";

  @Container
  private static ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.4").withExposedPorts(9200);

  static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    public void initialize(final ConfigurableApplicationContext configurableApplicationContext) {

      final List<String> pairs = new ArrayList<>();

      pairs.add("spring.data.elasticsearch.client.reactive.endpoints=" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
      pairs.add("spring.elasticsearch.rest.uris=http://" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
      TestPropertyValues.of(pairs).applyTo(configurableApplicationContext);
    }
  }

  @Test
  void testExportToES() throws IOException, InterruptedException {

    final List<OpenUsageEntity> openUsageEntities = dbPreparator.insertTestData();
    assertTrue(openUsageEntities.size() > 0);

    final String result = executeRestCall(STARTCALL);

    // Awaitility here tells me nothing is in ElasticSearch :(

  }

  private String executeRestCall(final String urlTemplate) throws IOException {

    final String url = String.format(urlTemplate, port);

    final HttpUriRequest request = new HttpPost(url);
    final HttpResponse response = HttpClientBuilder.create().build().execute(request);

    // Get the result.
    return EntityUtils.toString(response.getEntity());
  }
}

标签: spring-bootelasticsearchjunit5java-11spring-reactive

解决方案


public void doSomething(final List<OpenUsage> openUsages){
 openUsageRepository.saveAll(openUsages)
}

这在末尾缺少分号,因此不应编译。

但我认为这只是一个错字,实际上有一个分号。

无论如何,saveAll()返回一个Flux. 这Flux只是一个保存数据的方法,直到subscribe()有人(或类似的东西blockLast())调用它才“执行”。你只是把它Flux扔掉,所以保存永远不会被执行。

如何解决这个问题?一种选择是添加.blockLast()调用:

openUsageRepository.saveAll(openUsages).blockLast();

但这将以阻塞方式保存数据,有效地破坏反应性。

另一种选择是,如果您调用的代码saveAll()支持反应性只是返回Flux返回的saveAll(),但是,由于您doSomething()void返回类型,这是值得怀疑的。

无论如何都看不到您的startExport()连接方式doSomething()。但看起来您的“调用代码”没有使用任何反应性概念,因此真正的解决方案是重写调用代码以使用反应性(获取 aPublishersubscribe()在其上,然后等到数据到达),或恢复为使用阻塞 API(ElasticsearchRepository而不是ReactiveElasticsearchRepository)。


推荐阅读