rust – Async streaming of Reddit posts

roux is a Rust library that wraps the Reddit API. Neither Reddit nor roux provides a stream-like interface for obtaining the latest submissions automatically, so that’s what I’ve implemented here.

I’m learning Rust as a hobby (I’m a Python developer by trade) and this is my first “serious” Rust project. Please feel free to point out anything that sticks out to you. The following points are particularly interesting to me:

  • Am I using references and owned values appropriately (e.g. no unnecessary cloning)?

  • Is the interface provided by my lib.rs fitting for this use case (async streaming of items) and idiomatic?

  • As a Rust newbie, the types in the `where` clauses are hard to understand. Would it be useful to introduce some aliases?

  • Is my approach to error handling OK?

lib.rs:

#![warn(missing_docs)]

/*!
Streaming API for `roux`

Reddit's API does not provide "firehose"-style streaming of new posts and
comments. Instead, the endpoint for retrieving the latest posts has to be
polled regularly. This crate automates that task and provides a stream
for a subreddit's posts (submissions).

See [`stream_subreddit_submissions`] for details.

# Logging

This module uses the logging infrastructure provided by the [`log`] crate.
*/

use futures::{Sink, SinkExt};
use log::{debug, warn};
use roux::subreddit::responses::SubmissionsData;
use roux::{util::RouxError, Subreddit};
use std::collections::HashSet;
use std::marker::Unpin;
use tokio::time::{sleep, Duration};
use tokio_retry::Retry;

/// Error that may happen when streaming submissions
#(derive(Debug))
pub enum SubmissionStreamError<S>
where
    S: Sink<SubmissionsData> + Unpin,
{
    /// An issue with getting the data from Reddit
    Roux(RouxError),

    /// An issue with sending the data through the sink
    Sink(S::Error),
}

/**
Stream new submissions in a subreddit

The subreddit is polled regularly for new submissions, and each previously
unseen submission is sent into the sink.

`sleep_time` controls the interval between calls to the Reddit API, and depends
on how much traffic the subreddit has. Each call fetches the 100 latest items
(the maximum number allowed by Reddit). A warning is logged if none of those
items has been seen in the previous call: this indicates a potential miss of new
content and suggests that a smaller `sleep_time` should be chosen.

`retry_strategy` controls how to deal with errors that occur while fetching
content from Reddit. See [`tokio_retry::strategy`].

# Errors

Returns [`SubmissionStreamError::Roux`] if fetching from Reddit fails even
after retrying, and [`SubmissionStreamError::Sink`] if the sink rejects a
submission. The function otherwise loops forever and never returns `Ok`.
*/
pub async fn stream_subreddit_submissions<S, R, I>(
    subreddit: &Subreddit,
    mut sink: S,
    sleep_time: Duration,
    retry_strategy: &R,
) -> Result<(), SubmissionStreamError<S>>
where
    S: Sink<SubmissionsData> + Unpin,
    R: IntoIterator<IntoIter = I, Item = Duration> + Clone,
    I: Iterator<Item = Duration>,
{
    // How many submissions to fetch per request (Reddit's documented maximum).
    const LIMIT: u32 = 100;
    // IDs returned by the *previous* poll; used to filter out duplicates.
    let mut seen_ids: HashSet<String> = HashSet::new();

    loop {
        // Fetch the latest batch, retrying per `retry_strategy` on failure.
        // The strategy is cloned per attempt cycle because `Retry::spawn`
        // consumes an iterator of delays.
        let latest_submissions =
            Retry::spawn(retry_strategy.clone(), || subreddit.latest(LIMIT, None))
                .await
                .map_err(SubmissionStreamError::Roux)?
                .data
                .children
                .into_iter()
                .map(|thing| thing.data);

        let mut latest_ids: HashSet<String> = HashSet::new();

        let mut num_new: usize = 0;
        for submission in latest_submissions {
            // The id must be cloned: the submission itself is moved into the
            // sink below, but we still need the id for the next poll's filter.
            latest_ids.insert(submission.id.clone());
            if !seen_ids.contains(&submission.id) {
                num_new += 1;
                sink.send(submission)
                    .await
                    .map_err(SubmissionStreamError::Sink)?
            }
        }

        // Compare against the number actually fetched, not `LIMIT`: a
        // low-traffic subreddit can return fewer than 100 items, in which
        // case a `num_new == LIMIT` check could never fire and the log
        // message "out of LIMIT" would overstate the batch size.
        let num_fetched = latest_ids.len();
        debug!(
            "Got {} new submissions for r/{} (out of {})",
            num_new, subreddit.name, num_fetched
        );
        if num_new == num_fetched && num_new > 0 && !seen_ids.is_empty() {
            warn!(
                "All received submissions for r/{} were new, try a shorter sleep_time",
                subreddit.name
            );
        }

        seen_ids = latest_ids;
        sleep(sleep_time).await;
    }
}

main.rs:

use futures::{channel::mpsc, Stream, StreamExt};
use roux::{subreddit::responses::SubmissionsData, Subreddit};
use tokio;
use tokio::time::Duration;
use tokio_retry::strategy::{jitter, ExponentialBackoff};

use subreddit_dumper;

/// Consume submissions from `stream` and print one line per submission.
///
/// Runs until the stream is exhausted, i.e. until every sending side has
/// been dropped.
async fn submission_reader<S>(stream: &mut S)
where
    S: Stream<Item = SubmissionsData> + Unpin,
{
    loop {
        match stream.next().await {
            Some(post) => println!(
                "New submission in r/{} by {}",
                post.subreddit, post.author
            ),
            None => break,
        }
    }
}

#(tokio::main)
async fn main() {
    // Initialize logging
    stderrlog::new()
        .module(module_path!())
        .verbosity(3)
        .init()
        .unwrap();

    let subreddit = Subreddit::new("AskReddit");

    let (mut submission_sender, mut submission_receiver) = mpsc::unbounded();

    let retry_strategy = ExponentialBackoff::from_millis(100)
        .map(jitter) // add jitter to delays
        .take(3); // limit to 3 retries

    let (submission_res, _) = tokio::join!(
        subreddit_dumper::stream_subreddit_submissions(
            &subreddit,
            &mut submission_sender,
            Duration::from_secs(60),
            &retry_strategy,
        ),
        submission_reader(&mut submission_receiver),
    );
    submission_res.unwrap();
}