Is there a way to assign values from one Flux to another Flux?

Problem description

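I want a method in my application class that creates a new Flux of DEData and, once the data has been fully received from the API, assigns its values from the Flux of PostData. My code is below.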

My ConnectionManager class:

@Service
public class ConnectionManager {


    WebClient webClient = WebClient.create();

    public Mono<String> getResponseJSON(String URI) {
        return webClient.get().uri(URI).retrieve().bodyToMono(String.class).log();
    }

    public <T> Flux<T> getResponseObjects(String URI, Class<T> t) {
        return webClient.get().uri(URI).retrieve().bodyToFlux(t).log();
    }

}

Application class:

@SpringBootApplication
public class WebClientConnectorApplication implements CommandLineRunner{
@Autowired
ConnectionManager connectionManager;
@Autowired
DEData deData;

public static void main(String[] args) {
    SpringApplication.run(WebClientConnectorApplication.class, args);
}

@Override
public void run(String... args) throws Exception {


    connectionManager.getResponseObjects("https://jsonplaceholder.typicode.com/posts/1/comments",PostData.class)
    .flatMap()

}

}

PostData class:

public class PostData {


int postId;
int id;
String name,email,body;

public PostData() {

}

public PostData(int postId, int id, String name, String email, String body) {
    super();
    this.postId = postId;
    this.id = id;
    this.name = name;
    this.email = email;
    this.body = body;
}

public int getPostId() {
    return postId;
}
public void setPostId(int postId) {
    this.postId = postId;
}
public int getId() {
    return id;
}
public void setId(int id) {
    this.id = id;
}
public String getName() {
    return name;
}
public void setName(String name) {
    this.name = name;
}
public String getEmail() {
    return email;
}
public void setEmail(String email) {
    this.email = email;
}
public String getBody() {
    return body;
}
public void setBody(String body) {
    this.body = body;
}
@Override
public String toString() {
    return "PostData [postId=" + postId + ", id=" + id + ", name=" + name + ", email=" + email + ", body=" + body
            + "]";
}



}

DEData class:

@Component

public class DEData {

int id;
String name,email;

public DEData() {
}

public DEData(int id, String name, String email) {
    super();
    this.id = id;
    this.name = name;
    this.email = email;
}
public int getId() {
    return id;
}
public void setId(int id) {
    this.id = id;
}
public String getName() {
    return name;
}
public void setName(String name) {
    this.name = name;
}
public String getEmail() {
    return email;
}
public void setEmail(String email) {
    this.email = email;
}
@Override
public String toString() {
    return "DEData [id=" + id + ", name=" + name + ", email=" + email + "]";
}



}

Tags: spring-boot, reactive-programming, spring-webflux

Solution


What you want is to create a new Flux from the result of the API call.

Adjusted models

PostData

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PostData {

  private int postId;
  private int id;
  private String name;
  private String email;
  private String body;
}

DEData

@Data
@Builder
public class DEData {

  private int id;
  private String name;
  private String email;
}

Client

@Component
public class TestClient {
  WebClient webClient = WebClient.create();

  public <T> Flux<T> getResponseObjects(String uri, Class<T> t) {
    return webClient.get().uri(uri).retrieve().bodyToFlux(t);
  }

}


Service (what you should put in your main application)

@Service
public class ServiceImpl {
  @Autowired
  private TestClient testClient;

  public Flux<DEData> getNewFluxDEData() {
    return testClient.getResponseObjects("https://jsonplaceholder.typicode.com/posts/1/comments", PostData.class)
            .flatMap(postData -> Mono.just(DEData.builder()
                    .id(postData.getId())
                    .email(postData.getEmail())
                    .name(postData.getName())
                    .build()));
  }
}
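
The conversion inside the flatMap above is synchronous: each DEData is built directly and only then wrapped in Mono.just. In that situation Reactor's Flux.map, which takes a plain function, should give the same result with less ceremony. The sketch below isolates that function. It is not part of the original answer: the records are simplified stand-ins for the Lombok classes, DeDataMapping and convertAll are hypothetical names, and a java.util.stream pipeline stands in for the Flux so the snippet runs on the JDK alone.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class DeDataMapping {

    // Simplified stand-ins for the PostData and DEData classes above (assumption).
    record PostData(int postId, int id, String name, String email, String body) {}
    record DEData(int id, String name, String email) {}

    // Synchronous conversion: copies the three fields that DEData keeps.
    static DEData toDeData(PostData post) {
        return new DEData(post.id(), post.name(), post.email());
    }

    // With Reactor the equivalent call would be: postFlux.map(DeDataMapping::toDeData)
    // Here a java.util.stream pipeline stands in for the Flux.
    static List<DEData> convertAll(List<PostData> posts) {
        Function<PostData, DEData> mapper = DeDataMapping::toDeData;
        return posts.stream().map(mapper).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PostData> posts = List.of(
                new PostData(1, 1, "first", "a@example.com", "body one"),
                new PostData(1, 2, "second", "b@example.com", "body two"));
        convertAll(posts).forEach(System.out::println);
    }
}
```

flatMap remains the right operator when the conversion itself returns a Mono or Flux, for example when each element triggers another remote call.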

Controller

@RestController
@RequestMapping("/test")
public class TestController {

  @Autowired
  ServiceImpl serviceImpl;

  @GetMapping(value = "/de-data", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
  public Flux<DEData> uploadSimple() {
    return serviceImpl.getNewFluxDEData();
  }

}

Output when the endpoint produces MediaType.APPLICATION_JSON_VALUE

[
    {
        "id": 1,
        "name": "id labore ex et quam laborum",
        "email": "Eliseo@gardner.biz"
    },
    {
        "id": 2,
        "name": "quo vero reiciendis velit similique earum",
        "email": "Jayne_Kuhic@sydney.com"
    },
    {
        "id": 3,
        "name": "odio adipisci rerum aut animi",
        "email": "Nikita@garfield.biz"
    }
              .
              .
              .
]

Output when the endpoint produces MediaType.APPLICATION_STREAM_JSON_VALUE


    {
        "id": 1,
        "name": "id labore ex et quam laborum",
        "email": "Eliseo@gardner.biz"
    }
    {
        "id": 2,
        "name": "quo vero reiciendis velit similique earum",
        "email": "Jayne_Kuhic@sydney.com"
    }
    {
        "id": 3,
        "name": "odio adipisci rerum aut animi",
        "email": "Nikita@garfield.biz"
    }
              .
              .
              .

