When integrating typed Akka actors with Spring, is it a good idea to declare an injector and pass it as a state parameter?

Problem description

I can't find an elegant way to integrate Spring with typed actors. Instead of using an extension, I declared an injector service:

import akka.actor.typed.ActorSystem;
import com.akka.demo.a010.ASimpleSpringService;
import com.akka.demo.a010.AkkaSimpleBehaviour;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Data
public class ActorInjector {

    @Autowired
    private ASimpleSpringService aSimpleSpringService;

    public ActorSystem<AkkaSimpleBehaviour.Command> createAkkaSimpleBehaviour(String actorName) {
        // Pass this bean directly; a self-injected field is not needed here.
        return ActorSystem.create(AkkaSimpleBehaviour.create(this), actorName);
    }

}

This service is autowired and passes a reference to itself to a simple actor.

My actor is defined as follows.

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import com.akka.demo.a010.config.ActorInjector;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class AkkaSimpleBehaviour extends AbstractBehavior<AkkaSimpleBehaviour.Command> {

    private final List<String> messages = new ArrayList<>();
    private ActorRef<List<String>> sender;
    private ActorInjector actorInjector;

    public interface Command extends Serializable {

    }

    @AllArgsConstructor
    public static class TellMeSomething implements Command {

        private static final long serialVersionUID = -7796709831949054890L;
        @Getter
        private final String message;
    }

    @AllArgsConstructor
    public static class CollectTheResults implements Command {

        private static final long serialVersionUID = 1643210899551075153L;
        @Getter
        private final ActorRef<List<String>> sender;
    }

    private AkkaSimpleBehaviour(ActorContext<Command> context, ActorInjector actorInjector) {
        super(context);
        this.actorInjector = actorInjector;
    }

    public static Behavior<Command> create(ActorInjector actorInjector) {
        return Behaviors.setup(ctx -> new AkkaSimpleBehaviour(ctx,actorInjector));
    }

    @Override
    public Receive<Command> createReceive() {
        return newReceiveBuilder().onMessage(TellMeSomething.class, message -> {
            messages.add(message.getMessage());
            actorInjector.getASimpleSpringService().logSomething("*-*-*-*-*-*-*-*-*-*");
            return Behaviors.same();
        }).onMessage(CollectTheResults.class, message -> {
            this.sender = message.getSender();
            if (messages.size() == 4) {
                this.sender.tell(messages);
            }
            return Behaviors.same();
        }).build();
    }
}

Once the injector service has been passed in, I can get my autowired dependencies from it, for example:

actorInjector.getASimpleSpringService().logSomething("*-*-*-*-*-*-*-*-*-*");

My ASimpleSpringService is just a dummy service that logs its input.

import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class ASimpleSpringService {

    public void logSomething(String message){
        log.info(message);
    }
}

Then, in a simple RestController, I use the system as follows:

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.AskPattern;
import com.akka.demo.a010.config.ActorInjector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

@RestController
@Slf4j
public class MyRestController {

    @Autowired
    private ActorInjector actorInjector;

    @GetMapping("/hello-akka")
    public void greetings() {
        ActorSystem<AkkaSimpleBehaviour.Command> exampleActor = actorInjector.createAkkaSimpleBehaviour("anActor");
        exampleActor.tell(new AkkaSimpleBehaviour.TellMeSomething("hello"));
        exampleActor.tell(new AkkaSimpleBehaviour.TellMeSomething("Who are you"));
        exampleActor.tell(new AkkaSimpleBehaviour.TellMeSomething("create a child"));
        exampleActor.tell(new AkkaSimpleBehaviour.TellMeSomething("Here is some message"));

        CompletionStage<List<String>> result = AskPattern.ask(exampleActor, AkkaSimpleBehaviour.CollectTheResults::new, Duration.ofSeconds(10), exampleActor.scheduler());
        result.whenComplete((reply, failure) -> {
            if (reply != null) {
                log.info("The system responds in time");
            } else {
                log.error("The system does not respond in time", failure);
            }
            // Terminate exactly once, whether the ask succeeded or timed out.
            exampleActor.terminate();
        });
        try{
            List<String> messages = result.toCompletableFuture().get();
            messages.forEach(log::info);
        } catch (InterruptedException | ExecutionException e){
            e.printStackTrace();
        }
    }

}

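My question is: I plan to put all of the services that will be injected into my actors into my ActorInjector service. I am not familiar with Akka state and its side effects, but I understand that storing all of these singleton services as actor state may be a bad idea. Is it a good idea to store these services as actor parameters? What kind of side effects could I run into this way? Can you point me in the right direction?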
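
On the side-effect question, one point can be stated with some confidence: an actor processes its own messages one at a time, but a Spring singleton handed to several actors can be called from several threads at once, so any mutable state inside the service has to be thread-safe. The sketch below uses plain Java worker threads in place of actors so that it runs without Akka or Spring. `CountingService` and `hammer` are hypothetical names, not part of the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedServiceSketch {

    /** Hypothetical singleton service whose mutable state is held in an atomic. */
    public static final class CountingService {
        private final AtomicInteger calls = new AtomicInteger();

        public void logSomething(String message) {
            calls.incrementAndGet(); // safe when several threads call at once
        }

        public int callCount() {
            return calls.get();
        }
    }

    /** Calls the shared service from several worker threads, which stand in for actors. */
    public static int hammer(CountingService service, int workers, int callsPerWorker) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                for (int i = 0; i < callsPerWorker; i++) {
                    service.logSomething("msg");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return service.callCount();
    }

    public static void main(String[] args) {
        // 4 workers x 1000 calls each
        System.out.println(hammer(new CountingService(), 4, 1000));
    }
}
```

If `calls` were a plain `int` field instead of an `AtomicInteger`, concurrent increments could be lost and the total could come out lower than expected. A stateless service such as `ASimpleSpringService`, which only logs, does not have this problem.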

Tags: spring, akka-typed

Solution
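
One option, sketched here without Akka or Spring so that it runs on its own, is to hand the actor only the collaborator it actually calls rather than the whole injector. `MessageLogger` and `MessageCollector` are hypothetical names standing in for `ASimpleSpringService` and for the message-handling part of `AkkaSimpleBehaviour`. The same constructor-parameter shape would apply to `AkkaSimpleBehaviour.create(...)`. This illustrates the idea under those assumptions and is not a statement of how Akka requires it to be done.

```java
import java.util.ArrayList;
import java.util.List;

public class NarrowDependencySketch {

    /** Hypothetical narrow view of the Spring service: only what the actor needs. */
    public interface MessageLogger {
        void logSomething(String message);
    }

    /** Hypothetical stand-in for the actor's message handling, without Akka. */
    public static final class MessageCollector {
        private final List<String> messages = new ArrayList<>();
        private final MessageLogger logger;

        public MessageCollector(MessageLogger logger) {
            this.logger = logger; // the only dependency held as state
        }

        public void onTellMeSomething(String message) {
            messages.add(message);
            logger.logSomething("received: " + message);
        }

        public List<String> collected() {
            return List.copyOf(messages); // immutable snapshot for the caller
        }
    }

    public static void main(String[] args) {
        // A recording fake replaces the real service, so no Spring context is needed.
        List<String> logged = new ArrayList<>();
        MessageCollector collector = new MessageCollector(logged::add);
        collector.onTellMeSomething("hello");
        collector.onTellMeSomething("Who are you");
        System.out.println(collector.collected());
        System.out.println(logged);
    }
}
```

With this shape the actor's dependencies are visible in its factory signature, and a test can pass in a fake instead of starting a Spring context.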

