Spring Boot: field-autowired service is null

Problem description

My service:

@Service
public interface EntryTerminalService {

    Page<EntryTerminal> getEntryTerminalEventsLog(Pageable pageable);

    void saveEntryTerminalEventToLog(EntryTerminal entryTerminal);

}

Service implementation:

@Service
public class EntryTerminalServiceImpl implements EntryTerminalService {

    private final EntryTerminalRepository entryTerminalRepository;

    @Autowired
    public EntryTerminalServiceImpl(EntryTerminalRepository entryTerminalRepository) {
        this.entryTerminalRepository = entryTerminalRepository;
    }

    @Override
    public Page<EntryTerminal> getEntryTerminalEventsLog(Pageable pageable) {
        return entryTerminalRepository.findAllByOrderByIdDesc(pageable);
    }

    @Override
    public void saveEntryTerminalEventToLog(EntryTerminal entryTerminal) {
        entryTerminalRepository.save(entryTerminal);
    }

}

My repository:

public interface EntryTerminalRepository extends JpaRepository<EntryTerminal, Long> {
    Page<EntryTerminal> findAllByOrderByIdDesc(Pageable pageable);
}

Record processor:

This is where I use EntryTerminalService so that I can write to the database. I use field injection, but the field is always null here!

public class RecordProcessor implements ShardRecordProcessor {

    private static final Logger log = LoggerFactory.getLogger(RecordProcessor.class);

    @Autowired
    private EntryTerminalService entryTerminalService;

    /**
     * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
     * processRecords).
     *
     * @param initializationInput Provides information related to initialization.
     */
    @Override
    public void initialize(InitializationInput initializationInput) {
        log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
    }

    /**
     * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
     * data records to the application.
     *
     * @param processRecordsInput Provides the records to be processed as well as information and capabilities
     *                            related to them (e.g. checkpointing).
     */
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {

        log.info("Processing {} record(s)", processRecordsInput.records().size());

        // Data is read here from the Kinesis data stream
        for (KinesisClientRecord record : processRecordsInput.records()) {

            log.info("Processing Record For Partition Key : {}", record.partitionKey());
            log.info("Processing Record With Sequence Number : {}", record.sequenceNumber());

            String originalData;

            try {
                byte[] b = new byte[record.data().remaining()];
                record.data().get(b);
                log.info(new String(b, StandardCharsets.UTF_8).split("#")[0]);
                originalData = new String(b, StandardCharsets.UTF_8).split("#")[0];

                log.info("Data from kinesis stream : {}", originalData);

                ObjectMapper objectMapper = new ObjectMapper();

                EntryTerminalDTO entryTerminalDTO = objectMapper.readValue(originalData, EntryTerminalDTO.class);

                EntryTerminal entryTerminal = new EntryTerminal(entryTerminalDTO.getEntryTerminalID(), entryTerminalDTO.getEntryTerminalHardwareVersion(), entryTerminalDTO.getEntryTerminalSoftwareVersion());

                entryTerminalService.saveEntryTerminalEventToLog(entryTerminal); // <-- entryTerminalService is null here

                log.info(entryTerminalDTO.toString());

            } catch (Exception e) {
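                // Pass the exception to the logger so the stack trace (e.g. the NullPointerException) is not swallowed
                log.error("Caught throwable while processing records. Aborting.", e);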
                Runtime.getRuntime().halt(1);
            }

            try {
                /*
                 * KCL assumes that the call to checkpoint means that all records have been
                 * processed, records which are passed to the record processor.
                 */
                processRecordsInput.checkpointer().checkpoint();

            } catch (Exception e) {
                log.error("Error during Processing of records", e);
            }
        }
    }

    /**
     * Called when the lease tied to this record processor has been lost. Once the lease has been lost,
     * the record processor can no longer checkpoint.
     *
     * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
     */
    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        log.error("LeaseLostInput {}", leaseLostInput);
    }

    /**
     * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
     * processing to be considered complete; an exception will be thrown otherwise.
     *
     * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
     */
    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        }
    }

    /**
     * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
     * Enter). Checkpoints and logs the data a final time.
     *
     * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
     *                               before the shutdown is completed.
     */
    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        }
    }

}

Record processor factory:

@Component
public class RecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new RecordProcessor();
    }

}

Application:

@SpringBootApplication
public class KinesisConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(KinesisConsumerApplication.class);

    @Value(value = "${aws.stream_name}")
    private String streamName;

    @Value(value = "${consumer.type}")
    private String consumerType;

    @Autowired
    private ApplicationContext context;

    public static void main(String[] args) {
        SpringApplication.run(KinesisConsumerApplication.class, args);
    }

    @Override
    public void run(String... args) {

        log.info("Running consumer application!");

        ConsumerConfig consumerConfig = context.getBean(ConsumerConfig.class);

        ConfigsBuilder configsBuilder = consumerConfig.getConfigBuilder();

        /**
         * The Scheduler is the entry point to the KCL. This instance is configured with defaults
         * provided by the ConfigsBuilder.
         */
        Scheduler scheduler;

        switch (consumerType) {
            case "2": {
                scheduler = new Scheduler(
                        configsBuilder.checkpointConfig(),
                        configsBuilder.coordinatorConfig(),
                        configsBuilder.leaseManagementConfig()
                                .cleanupLeasesUponShardCompletion(true)
                                .maxLeasesForWorker(25)
                                .maxLeasesToStealAtOneTime(1)
                                .consistentReads(false),
                        configsBuilder.lifecycleConfig(),
                        configsBuilder.metricsConfig(),
                        configsBuilder.processorConfig()
                                .callProcessRecordsEvenForEmptyRecordList(false),
                        configsBuilder.retrievalConfig()
                                .maxListShardsRetryAttempts(5)
                                .initialPositionInStreamExtended(InitialPositionInStreamExtended
                                        .newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));
                break;
            }
            default: {
                scheduler = new Scheduler(
                        configsBuilder.checkpointConfig(),
                        configsBuilder.coordinatorConfig(),
                        configsBuilder.leaseManagementConfig()
                                .cleanupLeasesUponShardCompletion(true)
                                .maxLeasesForWorker(25)
                                .maxLeasesToStealAtOneTime(1)
                                .consistentReads(false),
                        configsBuilder.lifecycleConfig(),
                        configsBuilder.metricsConfig(),
                        configsBuilder.processorConfig()
                                .callProcessRecordsEvenForEmptyRecordList(false),
                        configsBuilder.retrievalConfig()
                                .retrievalSpecificConfig(new PollingConfig(streamName, consumerConfig.getConfigBuilder().kinesisClient()))
                                .maxListShardsRetryAttempts(5)
                                .initialPositionInStreamExtended(InitialPositionInStreamExtended
                                        .newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));
                break;
            }
        }

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

    }

}

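Why do I get a NullPointerException on my EntryTerminalService?

The service is an interface, and its implementation autowires the repository so that I can write to the database.

Thanks for your time.

Update:

In my controller I use the same service and it works fine!

My controller: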

@RestController
@RequestMapping(value = "/api")
public class ConsumerController {

    private final EntryTerminalService entryTerminalService;

    @Autowired
    public ConsumerController(EntryTerminalService entryTerminalService) {
        this.entryTerminalService = entryTerminalService;
    }

    @GetMapping(value = "/getEntryTerminalEventsLog")
    public ResponseEntity<Page<EntryTerminal>> getEntryTerminalEventsLog(Pageable pageable) {
        return ResponseEntity.ok(entryTerminalService.getEntryTerminalEventsLog(pageable));
    }
}

Is the problem in how the service is annotated?

How can I solve this?

Tags: java, spring, spring-boot

Solution
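
The field is null because RecordProcessorFactory creates the processor with `new RecordProcessor()`. Spring only fills `@Autowired` fields on objects it creates itself, so an instance built with `new` never goes through injection. The controller works because Spring instantiates it. The `@Service` annotation on the interface is not the cause; it is simply not needed there, since the one on EntryTerminalServiceImpl is enough.

One way around this is to let Spring inject the service into the factory, which is already a `@Component`, and have the factory hand it to each processor through a constructor. Below is a minimal sketch of that wiring. To keep it runnable without Spring or the KCL on the classpath, it uses hypothetical stand-in types (`EventLogService`, `SketchRecordProcessor`, `SketchRecordProcessorFactory`) in place of EntryTerminalService, RecordProcessor and RecordProcessorFactory; in the real application the factory would keep `@Component` and `implements ShardRecordProcessorFactory`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EntryTerminalService, simplified to String events.
interface EventLogService {
    void save(String event);
}

// Stand-in for RecordProcessor: no @Autowired field, the dependency
// arrives through the constructor, so it cannot be left null silently.
class SketchRecordProcessor {
    private final EventLogService service;

    SketchRecordProcessor(EventLogService service) {
        this.service = service;
    }

    void process(String record) {
        service.save(record);
    }
}

// Stand-in for RecordProcessorFactory. In the real application this class
// would be the @Component, and Spring would supply the service to this
// constructor.
class SketchRecordProcessorFactory {
    private final EventLogService service;

    SketchRecordProcessorFactory(EventLogService service) {
        this.service = service;
    }

    // Mirrors shardRecordProcessor(): still uses `new`, but passes the
    // already-injected service along instead of relying on field injection.
    SketchRecordProcessor shardRecordProcessor() {
        return new SketchRecordProcessor(service);
    }
}

public class FactoryInjectionSketch {
    public static void main(String[] args) {
        List<String> saved = new ArrayList<>();
        EventLogService service = saved::add; // plays the role of the Spring-managed service

        SketchRecordProcessorFactory factory = new SketchRecordProcessorFactory(service);
        factory.shardRecordProcessor().process("terminal-1");

        System.out.println(saved); // prints [terminal-1]
    }
}
```

This only helps if the KCL is given the Spring-managed factory bean. ConsumerConfig is not shown in the question, so that part is an assumption: if it builds the ConfigsBuilder with `new RecordProcessorFactory()`, the factory bean would need to be injected there instead.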

