首页 > 解决方案 > 在另一个类上调用 get 方法时获取空数据

问题描述

我正在尝试显示班级Transaction中另一个班级的数据Application。我创建了一个新实例Transaction并将其传递给sendUserNotification方法,但是当我对数据调用 get 方法时,它返回为 null。

Application Class

public class Application {
    private static final String TOPIC = "suspicious-transactions";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String[] args) {
        Application kafkaConsumerApp = new Application();

        String consumerGroup = "user-notification-service";
        if (args.length == 1) {
            consumerGroup = args[0];
        }

        System.out.println("Consumer is part of consumer group " + consumerGroup);

        Consumer<String, Transaction> kafkaConsumer = kafkaConsumerApp.createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);

        kafkaConsumerApp.consumeMessages(TOPIC, kafkaConsumer);

    }

    public static void consumeMessages(String topic, Consumer<String, Transaction> kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, Transaction> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            if (consumerRecords.isEmpty()) {
                //do something
            }

            for (ConsumerRecord<String, Transaction> record : consumerRecords) {
               Transaction transaction =  new Transaction();
               sendUserNotification(transaction);

                System.out.println(String.format("Received record (key: %s, value %s, partition: %d, offset: %d",
                        record.key(), record.value(), record.partition(), record.offset()));
            }
        }
    }

    public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroups) {
        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer .class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroups);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return new KafkaConsumer<>(properties);
   }

    private static void sendUserNotification(Transaction transaction) {
        // Print transaction information to the console
        String h =transaction.getUser();

        System.out.println(String.format("Sending user %s notification about a suspicious transaction of %f in their account originating in %s", h, transaction.getAmount(), transaction.getTransactionLocation()));

    }

Transaction Class

public class Transaction {
    private String user;
    private double amount;
    private String transactionLocation;

    public String getUser() {
        return user;
    }

    public double getAmount() {
        return amount;
    }

    public String getTransactionLocation() {
        return transactionLocation;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    public void setTransactionLocation(String transactionLocation) {
        this.transactionLocation = transactionLocation;
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "user='" + user + '\'' +
                ", amount=" + amount +
                ", transactionLocation='" + transactionLocation + '\'' +
                '}';
    }

    /**
     * Kafka Deserializer implementation.
     * Deserializes a Transaction from JSON to a {@link Transaction} object
     */
    public static class TransactionDeserializer implements Deserializer<Transaction> {

        @Override
        public Transaction deserialize(String topic, byte[] data) {
            ObjectMapper mapper = new ObjectMapper();
            Transaction transaction = null;
            try {
                transaction = mapper.readValue(data, Transaction.class);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return transaction;

        }
    }

Output

Consumer is part of consumer group user-notification-service
Sending user null notification about a suspicious transaction of 0.000000 in their account originating in null
Received record (key: dkelly9283, value Transaction{user='dkelly9283', amount=1653.32, transactionLocation='China'}, partition: 0, offset: 12
Sending user null notification about a suspicious transaction of 0.000000 in their account originating in null
Received record (key: msmith2015, value Transaction{user='msmith2015', amount=50.43, transactionLocation='California'}, partition: 1, offset: 12

标签: javamethodsapache-kafkaparameter-passing

解决方案


您需要将值传递给用户字段。引用变量字段(在本例中为字符串用户)在 java 中默认初始化为 null。您可以使用构造函数或 setUser()

创建新的事务实例后:

Transaction tr = new Transaction();
tr.setUser("User"); 
tr.setAmount(100); 
tr.setTransactionLocation("Location");

推荐阅读