Rust Error Handling – Can this error handling be made cleaner?

Recently I started to learn Rust. I come from a mostly Java background, but spent roughly the last two years in Go. I am working on converting a small microservice that reads data from Kafka and stores it in a local map. Right now I’m working on the Kafka piece, and while I’ve got it working, it doesn’t look like the best or cleanest code. I typically like to avoid nested blocks when possible, though one or two levels aren’t the end of the world. However, my current error handling goes 4+ levels deep, and it seems like there should be a cleaner way to do this.

The main thing I’m not happy with so far is how the error handling is done. I’d like to avoid the deeply nested blocks and match expressions. I’m also curious whether a function that never returns is idiomatic in Rust. On our team in Go this is very common, as the expectation is that the function will be launched via a goroutine. Here I’m just following suit and launching it by spawning a thread. The logic is to start at the beginning of the topic, read through all the messages, and store the messages of a certain type we are interested in, assuming deserialization succeeds. If an error occurs, the expectation is that a message will be logged, but the application will continue to read through the topic.

/// Creates and configures a Kafka consumer and listens to a Kafka topic reading
/// messages and processing them.
///
/// This function loops forever and doesn't return thus it should be called from
/// a new thread. 
pub fn consume_cms_documents() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.auto.commit", "false")
        .set("group.id", "experimentdocument")
        .set("auto.offset.reset", "earliest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create()
        .expect("invalid consumer config");

    consumer
        .subscribe(&["CmsDocuments"])
        .expect("topic subscribe failed");

    // Continuously poll messages from Kafka
    for msg_result in consumer.iter() {
        // On error just log error message out and continue to next iteration. If
        // the message is successfully read attempt to process it.
        match msg_result {
            Err(e) => println!("failed to retrieve message from Kafka: {:?}", e),
            Ok(msg) => {
                // If the payload is present attempt to read the payload as JSON,
                // otherwise log out message and continue to next iteration
                match msg.payload() {
                    Some(payload) => {
                        // Read the message envelope
                        let event: Result<CmsEvent, serde_json::error::Error> =
                            serde_json::from_slice(payload);
                        // If the payload was successfully deserialized check the type of the
                        // event. If the event matches the expected type attempt to deserialize
                        // the datasource property. If successful store the value.
                        match event {
                            Ok(event) => {
                                // Is the event the type we care about, if so then proceed
                                // attempting to deserialize the datasource
                                if event.r#type.eq(EXPERIMENT_DOCUMENT_TYPE) {
                                    let test_def: Result<TestDefinition, serde_json::error::Error> =
                                        serde_json::from_value(event.datasource);
                                    match test_def {
                                        Ok(td) => println!("found one!"), // todo: this would actually call a fn to store the data somewhere
                                        Err(e) => println!("failed to deserialize message {:?}, offset {:?}", e, msg.offset()),
                                    }
                                }
                            }
                            Err(e) => println!("failed to deserialize message {:?}", e),
                        }
                    }
                    _ => println!("message payload was empty"),
                }
            }
        }
    }
}
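
To make the goal concrete, here is a minimal sketch of the flatter shape the per-message logic could take: one helper that returns a Result and uses `?` for each fallible step, so the loop needs only a single match. Everything in it is a hypothetical stand-in, not the real code: `ProcessError`, `parse_event`, `parse_test_definition`, `process_payload` and `consume_all` are made-up names, the `kind` field stands in for `r#type`, the value of `EXPERIMENT_DOCUMENT_TYPE` is invented, and the "type|datasource" text format replaces the serde_json calls so the sketch compiles with the standard library alone.

```rust
use std::fmt;

// Hypothetical stand-ins for the real types; only the control flow matters.
const EXPERIMENT_DOCUMENT_TYPE: &str = "experiment";

#[derive(Debug, PartialEq)]
struct CmsEvent {
    kind: String,
    datasource: String,
}

#[derive(Debug, PartialEq)]
struct TestDefinition {
    name: String,
}

// One error type covering every way a single message can fail.
#[derive(Debug, PartialEq)]
enum ProcessError {
    EmptyPayload,
    BadEnvelope(String),
    BadDatasource(String),
}

impl fmt::Display for ProcessError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProcessError::EmptyPayload => write!(f, "message payload was empty"),
            ProcessError::BadEnvelope(why) => write!(f, "failed to deserialize envelope: {why}"),
            ProcessError::BadDatasource(why) => write!(f, "failed to deserialize datasource: {why}"),
        }
    }
}

// Stand-in for serde_json::from_slice: expects "type|datasource".
fn parse_event(payload: &[u8]) -> Result<CmsEvent, ProcessError> {
    let text = std::str::from_utf8(payload)
        .map_err(|e| ProcessError::BadEnvelope(e.to_string()))?;
    let (kind, datasource) = text
        .split_once('|')
        .ok_or_else(|| ProcessError::BadEnvelope(format!("missing '|' in {text:?}")))?;
    Ok(CmsEvent { kind: kind.to_string(), datasource: datasource.to_string() })
}

// Stand-in for serde_json::from_value: rejects an empty datasource.
fn parse_test_definition(datasource: &str) -> Result<TestDefinition, ProcessError> {
    if datasource.is_empty() {
        return Err(ProcessError::BadDatasource("empty datasource".to_string()));
    }
    Ok(TestDefinition { name: datasource.to_string() })
}

/// Ok(Some(..)) for a document worth storing, Ok(None) for any other event
/// type, Err(..) when a step fails. Each `?` replaces one level of nesting.
fn process_payload(payload: Option<&[u8]>) -> Result<Option<TestDefinition>, ProcessError> {
    let payload = payload.ok_or(ProcessError::EmptyPayload)?;
    let event = parse_event(payload)?;
    if event.kind != EXPERIMENT_DOCUMENT_TYPE {
        return Ok(None);
    }
    let test_def = parse_test_definition(&event.datasource)?;
    Ok(Some(test_def))
}

/// The loop body shrinks to one match: store, skip, or log and continue.
fn consume_all<'a>(payloads: impl IntoIterator<Item = Option<&'a [u8]>>) -> Vec<TestDefinition> {
    let mut stored = Vec::new();
    for payload in payloads {
        match process_payload(payload) {
            Ok(Some(td)) => stored.push(td),
            Ok(None) => {} // not the event type we care about
            Err(e) => println!("skipping message: {e}"),
        }
    }
    stored
}
```

In the real consumer the helper would presumably be called as `process_payload(msg.payload())` inside the `Ok(msg)` arm, with the two parse functions replaced by the serde_json calls and their errors converted into the custom error type.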

I did attempt to break this into smaller pieces, but I ran into trouble because I was trying to return data that the closure owned. I was hoping that smaller pieces would be more readable than code that is 4 levels deep in match statements.

for msg_result in consumer.iter() {
    let payload = msg_result.and_then(|msg| {
        msg.payload().ok_or(KafkaError::Global(RDKafkaErrorCode::InvalidMessageSize))
    });

    match payload {
        Ok(data) => {
            // todo: do something with data
            println!("{:?}", data)
        }
        Err(e) => println!("Oh nooo we broke")
    }
}

The error:

error[E0515]: cannot return value referencing function parameter `msg`
   --> src/kafka.rs:153:13
    |
153 |             msg.payload().ok_or(KafkaError::Global(RDKafkaErrorCode::InvalidMessageSize))
    |             ---^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |             |
    |             returns a value referencing data owned by the current function
    |             `msg` is borrowed here
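
For reference, the error comes from `and_then` moving `msg` into the closure by value: the message is dropped when the closure returns, while the payload slice still borrows from it. A minimal sketch of one way around this, assuming the message can simply be bound in the loop body so that it outlives the borrow, is below. `Message`, `ConsumeError` and `payload_lengths` are hypothetical stand-ins (not rdkafka types) so that the example compiles with the standard library alone; `let ... else` requires Rust 1.65 or later.

```rust
// Hypothetical stand-in for a Kafka message: it owns its bytes and hands out
// a borrow tied to `&self`, like the `payload()` call in the question.
struct Message {
    payload: Option<Vec<u8>>,
}

impl Message {
    fn payload(&self) -> Option<&[u8]> {
        self.payload.as_deref()
    }
}

// Hypothetical error type standing in for KafkaError.
#[derive(Debug, PartialEq)]
enum ConsumeError {
    Poll(String),
    EmptyPayload,
}

/// Walks the results, recording payload lengths and any errors, without
/// nesting and without returning a borrow out of a closure.
fn payload_lengths(
    results: Vec<Result<Message, ConsumeError>>,
) -> (Vec<usize>, Vec<ConsumeError>) {
    let mut lengths = Vec::new();
    let mut errors = Vec::new();
    for msg_result in results {
        // Bind the message here so it lives until the end of this iteration.
        let msg = match msg_result {
            Ok(msg) => msg,
            Err(e) => {
                errors.push(e);
                continue;
            }
        };
        // The payload borrows from `msg`, which is still alive in this scope.
        let Some(payload) = msg.payload() else {
            errors.push(ConsumeError::EmptyPayload);
            continue;
        };
        lengths.push(payload.len());
    }
    (lengths, errors)
}
```

Each early `continue` handles one failure case at the top level of the loop, which is close to the guard-clause style common in Go.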