java - Spring Boot field-autowired service returns null
Problem description
My service:
@Service
public interface EntryTerminalService {
Page<EntryTerminal> getEntryTerminalEventsLog(Pageable pageable);
void saveEntryTerminalEventToLog(EntryTerminal entryTerminal);
}
Service implementation:
@Service
public class EntryTerminalServiceImpl implements EntryTerminalService {
private final EntryTerminalRepository entryTerminalRepository;
@Autowired
public EntryTerminalServiceImpl(EntryTerminalRepository entryTerminalRepository) {
this.entryTerminalRepository = entryTerminalRepository;
}
@Override
public Page<EntryTerminal> getEntryTerminalEventsLog(Pageable pageable) {
return entryTerminalRepository.findAllByOrderByIdDesc(pageable);
}
@Override
public void saveEntryTerminalEventToLog(EntryTerminal entryTerminal) {
entryTerminalRepository.save(entryTerminal);
}
}
My repository:
public interface EntryTerminalRepository extends JpaRepository<EntryTerminal, Long> {
Page<EntryTerminal> findAllByOrderByIdDesc(Pageable pageable);
}
Record processor:
Here I use EntryTerminalService so that I can write to the database. I use field injection, but the field is always null here!
public class RecordProcessor implements ShardRecordProcessor {
private static final Logger log = LoggerFactory.getLogger(RecordProcessor.class);
@Autowired
private EntryTerminalService entryTerminalService;
/**
* Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
* processRecords).
*
* @param initializationInput Provides information related to initialization.
*/
@Override
public void initialize(InitializationInput initializationInput) {
log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
}
/**
* Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
* data records to the application.
*
* @param processRecordsInput Provides the records to be processed as well as information and capabilities
* related to them (e.g. checkpointing).
*/
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
log.info("Processing {} record(s)", processRecordsInput.records().size());
// Data is read here from the Kinesis data stream
for (KinesisClientRecord record : processRecordsInput.records()) {
log.info("Processing Record For Partition Key : {}", record.partitionKey());
log.info("Processing Record With Sequence Number : {}", record.sequenceNumber());
String originalData;
try {
byte[] b = new byte[record.data().remaining()];
record.data().get(b);
log.info(new String(b, StandardCharsets.UTF_8).split("#")[0]);
originalData = new String(b, StandardCharsets.UTF_8).split("#")[0];
log.info("Data from kinesis stream : {}", originalData);
ObjectMapper objectMapper = new ObjectMapper();
EntryTerminalDTO entryTerminalDTO = objectMapper.readValue(originalData, EntryTerminalDTO.class);
EntryTerminal entryTerminal = new EntryTerminal(entryTerminalDTO.getEntryTerminalID(), entryTerminalDTO.getEntryTerminalHardwareVersion(), entryTerminalDTO.getEntryTerminalSoftwareVersion());
entryTerminalService.saveEntryTerminalEventToLog(entryTerminal); // <------- entryTerminalService is null here
log.info(entryTerminalDTO.toString());
} catch (Exception e) {
log.error("Caught throwable while processing records. Aborting.");
Runtime.getRuntime().halt(1);
}
try {
/*
* KCL assumes that the call to checkpoint means that all records have been
* processed, records which are passed to the record processor.
*/
processRecordsInput.checkpointer().checkpoint();
} catch (Exception e) {
log.error("Error during Processing of records", e);
}
}
}
/**
* Called when the lease tied to this record processor has been lost. Once the lease has been lost,
* the record processor can no longer checkpoint.
*
* @param leaseLostInput Provides access to functions and data related to the loss of the lease.
*/
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
log.error("LeaseLostInput {}", leaseLostInput);
}
/**
* Called when all data on this shard has been processed. Checkpointing must occur in the method for record
* processing to be considered complete; an exception will be thrown otherwise.
*
* @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
*/
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
log.info("Reached shard end checkpointing.");
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at shard end. Giving up.", e);
}
}
/**
* Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
* Enter). Checkpoints and logs the data a final time.
*
* @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
* before the shutdown is completed.
*/
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
log.info("Scheduler is shutting down, checkpointing.");
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
}
}
}
Record processor factory:
@Component
public class RecordProcessorFactory implements ShardRecordProcessorFactory {
@Override
public ShardRecordProcessor shardRecordProcessor() {
return new RecordProcessor();
}
}
Application:
@SpringBootApplication
public class KinesisConsumerApplication implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(KinesisConsumerApplication.class);
@Value(value = "${aws.stream_name}")
private String streamName;
@Value(value = "${consumer.type}")
private String consumerType;
@Autowired
private ApplicationContext context;
public static void main(String[] args) {
SpringApplication.run(KinesisConsumerApplication.class, args);
}
@Override
public void run(String... args) {
log.info("Running consumer application!");
ConsumerConfig consumerConfig = context.getBean(ConsumerConfig.class);
ConfigsBuilder configsBuilder = consumerConfig.getConfigBuilder();
/**
* The Scheduler is the entry point to the KCL. This instance is configured with defaults
* provided by the ConfigsBuilder.
*/
Scheduler scheduler;
switch (consumerType) {
case "2": {
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.cleanupLeasesUponShardCompletion(true)
.maxLeasesForWorker(25)
.maxLeasesToStealAtOneTime(1)
.consistentReads(false),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig()
.callProcessRecordsEvenForEmptyRecordList(false),
configsBuilder.retrievalConfig()
.maxListShardsRetryAttempts(5)
.initialPositionInStreamExtended(InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));
break;
}
default: {
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.cleanupLeasesUponShardCompletion(true)
.maxLeasesForWorker(25)
.maxLeasesToStealAtOneTime(1)
.consistentReads(false),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig()
.callProcessRecordsEvenForEmptyRecordList(false),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, consumerConfig.getConfigBuilder().kinesisClient()))
.maxListShardsRetryAttempts(5)
.initialPositionInStreamExtended(InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));
break;
}
}
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}
}
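I get a NullPointerException on my EntryTerminalService. Why?
The service is an interface, and its implementation autowires the repository so that I can write to the database.
Thank you for your time.
Update:
In my controller I use this same service and it works fine!
My controller: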
@RestController
@RequestMapping(value = "/api")
public class ConsumerController {
private final EntryTerminalService entryTerminalService;
@Autowired
public ConsumerController(EntryTerminalService entryTerminalService) {
this.entryTerminalService = entryTerminalService;
}
@GetMapping(value = "/getEntryTerminalEventsLog")
public ResponseEntity<Page<EntryTerminal>> getEntryTerminalEventsLog(Pageable pageable) {
return ResponseEntity.ok(entryTerminalService.getEntryTerminalEventsLog(pageable));
}
}
Is the problem in how the service field is annotated?
How can I solve this?
Solution
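A likely cause, judging only from the code in the question: RecordProcessorFactory creates the processor with new RecordProcessor(), so Spring never manages that object and never processes its @Autowired field. The controller works because Spring itself constructs it. One common way around this is to inject the service into the factory (which is a @Component) and pass it to the processor through a constructor. The sketch below shows that pattern in plain Java so it runs without Spring or the KCL on the classpath. The nested interfaces are simplified, hypothetical stand-ins for the real EntryTerminalService, ShardRecordProcessor, and ShardRecordProcessorFactory, and processRecord is an invented method name used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ConstructorInjectionSketch {

    /** Stand-in for the question's EntryTerminalService, reduced to one method on a String payload. */
    public interface EntryTerminalService {
        void saveEntryTerminalEventToLog(String event);
    }

    /** Hypothetical, simplified stand-in for the KCL ShardRecordProcessor interface. */
    public interface ShardRecordProcessor {
        void processRecord(String data);
    }

    /** Hypothetical, simplified stand-in for the KCL ShardRecordProcessorFactory interface. */
    public interface ShardRecordProcessorFactory {
        ShardRecordProcessor shardRecordProcessor();
    }

    /** Not a Spring bean: the dependency arrives through the constructor instead of @Autowired. */
    public static class RecordProcessor implements ShardRecordProcessor {
        private final EntryTerminalService entryTerminalService;

        public RecordProcessor(EntryTerminalService entryTerminalService) {
            // Fail fast at construction time rather than with a late NullPointerException.
            this.entryTerminalService = Objects.requireNonNull(entryTerminalService);
        }

        @Override
        public void processRecord(String data) {
            entryTerminalService.saveEntryTerminalEventToLog(data);
        }
    }

    /**
     * In the real application this class would keep its @Component annotation,
     * so Spring calls this constructor and supplies the EntryTerminalService bean.
     */
    public static class RecordProcessorFactory implements ShardRecordProcessorFactory {
        private final EntryTerminalService entryTerminalService;

        public RecordProcessorFactory(EntryTerminalService entryTerminalService) {
            this.entryTerminalService = entryTerminalService;
        }

        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            // Still created with `new`, but the dependency is now passed in explicitly.
            return new RecordProcessor(entryTerminalService);
        }
    }

    public static void main(String[] args) {
        // An in-memory list plays the role of the database for this sketch.
        List<String> saved = new ArrayList<>();
        EntryTerminalService service = saved::add;

        ShardRecordProcessorFactory factory = new RecordProcessorFactory(service);
        factory.shardRecordProcessor().processRecord("terminal-1");

        System.out.println(saved); // prints [terminal-1]
    }
}
```

In the real project this only helps if the factory handed to the KCL is the Spring-managed bean and not another instance created with new. ConsumerConfig is not shown in the question, so whether that already holds is an assumption.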