首页 > 解决方案 > 类星体纤维在线程启动后返回空结果

问题描述

我正在我的 Spring Boot 应用程序上本地测试我的 POST 端点。我有一个方法可以生成一个光纤线程来运行一组调用端点 A 的指令,并且我的 POST 端点返回 A 返回的结果。但是,当我的 POST 请求完成时,邮递员中显示的结果为空。我的代码如下

@RequestMapping("/prediction")
    public CustomResponse prediction(@RequestBody CustomRequest input, HttpServletRequest request) {
        return predictionClass.prediction(input);
    }

public CustomResponse prediction(CustomRequest input) {
        CustomResponse customResponse = new customResponse();
        new Fiber<CustomResponse>(new SuspendableRunnable() {

            public void  run() throws SuspendExecution, InterruptedException {
                
                List<CustomRequest> inputs = new ArrayList<>();

                // A for loop is here to duplicate CustomRequest input parameter received and populate the inputs list
                
                List<CustomResponse> customResponses = inputs.stream()
                        .map(req -> processPrediction(req)).collect(Collectors.toList());

                for (CustomResponse x : customResponses) {
                    if (inputs.size() > 1) {
                        for (String outputKey : x.getOutputVars().keySet()) {
                            customResponse.getOutputVars().put(x.getModelName() + "_" + outputKey, x.getOutputVars().get(outputKey));
                        }
                    } else {
                        // Else statement will be run because the input is only size 1
                        customResponse.getOutputVars().putAll(x.getOutputVars());
                    
                }
                System.out.println(customResponse.getOutputVars().size());
            }
        }).start();
        return customResponse;

    }

    public CustomResponse processPrediction(CustomRequest input) {
        CustomResponse res = new CustomResponse();

        RestTemplate gzipRestTemplate = new RestTemplateBuilder()
                .additionalInterceptors(new GzipHttpRequestInterceptor())
                .build();

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<Map<String, Object>> entity = new HttpEntity<>(input, headers);

        ResponseEntity<Map> responseEntity = gzipRestTemplate.postForEntity("an-endpoint-url", entity, Map.class);

        Map<String, Object> outputs = (Map<String, Object>) responseEntity.getBody();

        res.getOutputVars().putAll(outputs);

        return res;

    }

在这个测试中,我的输入只有大小 1,当我使用 Postman 触发 POST 请求时,System.out.println(customResponse.getOutputVars().size());返回 16,但在 Postman 上它显示我的 outputVars 为空。

有趣的是,我决定做 2 个实验,如下所示。

实验一

public CustomResponse prediction() {
        CustomResponse customResponse = new CustomResponse ();
        new Fiber<Void>(new SuspendableRunnable() {

            public void  run() throws SuspendExecution, InterruptedException {
                customResponse .setModelName("name");
                Map<String, Object> test = new HashMap<>();
                test.put("pcd4Score", "hello");
                customResponse .getOutputVars().put("message", "hello");
            }
        }).start();
        return customResponse ;

    }

邮递员返回 customResponse,其中包含 message 和 hello

实验二

该实验与实验 1 相同,但使用 Thread.sleep(1000); 我在想 thread.sleep 可以代表processPrediction我在我的原始代码中

public CustomResponse prediction() {
        CustomResponse customResponse = new CustomResponse ();
        new Fiber<Void>(new SuspendableRunnable() {

            public void  run() throws SuspendExecution, InterruptedException {
                customResponse .setModelName("name");
                Map<String, Object> test = new HashMap<>();
                test.put("pcd4Score", "hello");
                customResponse .getOutputVars().put("message", "hello");
            }
        }).start();
        return customResponse ;
    }

这次 customResponse 是空的,在我的 Spring Boot 应用程序终端中,错误是

[quasar] ERROR: while transforming {the-path-to-my-class-for-prediction-method}$1: Unable to instrument {the-path-to-my-class-for-prediction-method}$1#run()V because of blocking call to java/lang/Thread#sleep(J)V

感觉实验 1 是成功的,因为指令不是 cpu 密集型的,我知道我可以用一种方式对它进行编码,在一个单独的方法中启动纤程,然后只调用prediction,因为看起来邮递员在空的 CustomResponse 中返回,然后只有里面的指令run()开始运行,我只是想了解一下Fiber的行为。我在谷歌搜索我的情况时遇到了麻烦(我的谷歌关键字是休息端点在启动光纤线程后不返回结果)因此我在 stackoverflow 上问这个问题。我对 java 中的整个多线程主题也很陌生。

标签: spring-bootrestapi-gatewayquasarfibers

解决方案


customResponse我通过在像这样返回之前添加光纤连接来解决它。但是,仅针对 .join() 进行尝试和捕获似乎不是很优雅,是否有更优雅的方法来重做整个方法?

public CustomResponse prediction(CustomRequest input) {
        CustomResponse customResponse = new customResponse();
        Fiber fiber = new Fiber<CustomResponse>(new SuspendableRunnable() {

            public void  run() throws SuspendExecution, InterruptedException {
                
                List<CustomRequest> inputs = new ArrayList<>();

                // A for loop is here to duplicate CustomRequest input parameter received and populate the inputs list
                
                List<CustomResponse> customResponses = inputs.stream()
                        .map(req -> processPrediction(req)).collect(Collectors.toList());

                for (CustomResponse x : customResponses) {
                    if (inputs.size() > 1) {
                        for (String outputKey : x.getOutputVars().keySet()) {
                            customResponse.getOutputVars().put(x.getModelName() + "_" + outputKey, x.getOutputVars().get(outputKey));
                        }
                    } else {
                        // Else statement will be run because the input is only size 1
                        customResponse.getOutputVars().putAll(x.getOutputVars());
                    
                }
                System.out.println(customResponse.getOutputVars().size());
            }
        }).start();
        try {
          fiber.join();
        } catch (Exception e) {
          e.printStackTrace();
        }

        return customResponse;

    }

推荐阅读