首页 > 解决方案 > Rust 声明先分配后分配模式

问题描述

我有一个双向grpc流,充当到 kafka 集群的桥梁。首次初始化流时,我将创建 kafka 消费者并开始使用它。

为此,我想初始化一个空的consumer,等待第一个输入,然后将创建的消费者分配给一个空的消费者。我试图通过遵循这里的模式来做到这一点。

https://doc.rust-lang.org/rust-by-example/variable_bindings/declare.html

Rust 抛出 a possibly-unitialized variable error,这是因为它是在异步流中初始化的吗?

use std::pin::Pin;

use futures::{Stream, StreamExt};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use tonic::transport::Server;
use tonic::{Request, Response, Status};

use bridge::kafka_stream_server::{KafkaStream, KafkaStreamServer};
use bridge::{KafkaResponse, PublishRequest};

pub mod bridge {
    tonic::include_proto!("bridge"); // The string specified here must match the proto package name
}

#[derive(Default)]
pub struct KafkaStreamService {}

pub fn create_kafka_consumer(topic: String) -> Consumer {
    Consumer::from_hosts(vec!["localhost:9092".to_owned()])
        .with_topic(topic.to_owned())
        .with_fallback_offset(FetchOffset::Latest)
        .with_group("".to_owned())
        .with_offset_storage(GroupOffsetStorage::Kafka)
        .create()
        .unwrap()
}

#[tonic::async_trait]
impl KafkaStream for KafkaStreamService {
    type SubscribeStream =
        Pin<Box<dyn Stream<Item = Result<KafkaResponse, Status>> + Send + Sync + 'static>>;

    async fn subscribe(
        &self,
        request: Request<tonic::Streaming<PublishRequest>>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        println!("Initiated stream!");
        let mut stream = request.into_inner();

        let mut consumer_created_flag: bool = false;
        let consumer: Consumer; //declared here
        let output = async_stream::try_stream! {
            while let Some(publication) = stream.next().await {
                let message = publication?;
                let topic = message.topic.clone();
                if consumer_created_flag == false {
                    consumer = create_kafka_consumer(topic); //error occurs here
                    consumer_created_flag = true;
                }
                let reply = bridge::KafkaResponse {
                    content: format!("Hello {}!", "world"),
                };
                yield reply.clone();
            }
        };
        Ok(Response::new(Box::pin(output) as Self::SubscribeStream))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse().unwrap();

    println!("KafkaService listening on: {}", addr);

    let svc = KafkaStreamServer::new(KafkaStreamService::default());

    Server::builder().add_service(svc).serve(addr).await?;

    Ok(())
}

编辑:根据要求的详细错误:

error[E0381]: use of possibly-uninitialized variable: `consumer`
  --> src/server.rs:42:22
   |
42 |           let output = async_stream::try_stream! {
   |  ______________________^
43 | |             while let Some(publication) = stream.next().await {
44 | |                 let message = publication?;
45 | |                 let topic = message.topic.clone();
46 | |                 if consumer_created_flag == false {
47 | |                     consumer = create_kafka_consumer(topic);
   | |                     -------- use occurs due to use in generator
...  |
54 | |             }
55 | |         };
   | |_________^ use of possibly-uninitialized `consumer`
   |
   = note: this error originates in a macro (in Nightly builds, run with -Z macro-backtrace for more info)

标签: rustgrpckafka-consumer-api

解决方案


声明第一个模式仅适用于基本控制流(if, match,{}等)。async当引用或移动到另一个对象(如块或闭包)时,它会分崩离析:

fn main() {
    let val: i32;
    let func = move || {
        val = 5;
    };
}
error[E0594]: cannot assign to `val`, as it is not declared as mutable
 --> src/main.rs:4:9
  |
2 |     let val: i32;
  |         --- help: consider changing this to be mutable: `mut val`
3 |     let func = move || {
4 |         val = 5;
  |         ^^^^^^^ cannot assign

error[E0381]: use of possibly-uninitialized variable: `val`
 --> src/main.rs:3:16
  |
3 |     let func = move || {
  |                ^^^^^^^ use of possibly-uninitialized `val`
4 |         val = 5;
  |         --- use occurs due to use in closure

一个潜在的解决方法是将其声明移动到try_stream!宏中:

let output = async_stream::try_stream! {
    let mut consumer_created_flag: bool = false;
    let consumer: Consumer;
    while let Some(publication) = stream.next().await {
        let message = publication?;
        let topic = message.topic.clone();
        if consumer_created_flag == false {
            consumer = create_kafka_consumer(topic);
            consumer_created_flag = true;
        }
        let reply = KafkaResponse {
            content: format!("Hello {}!", "world"),
        };
        yield reply.clone();
    }
};

但是,这会导致一个新错误,因为您可能会分配给它两次(编译器不知道它consumer_created_flag正在保护它):

error[E0384]: cannot assign twice to immutable variable `consumer`
  --> src\lib.rs:1348:21
   |
44 |             let consumer: Consumer; //declared here
   |                 -------- help: make this binding mutable: `mut consumer`
...
49 |                     consumer = create_kafka_consumer(topic); //error occurs here
   |                     ^^^^^^^^ cannot assign twice to immutable variable

幸运的是,一个快速的解决方法是简单地使consumer可变。然后编译器唯一抱怨的是它没有被使用,但我认为你把它放在那里是有原因的。


推荐阅读