example of using PaginableApiResult? #8

Open · db48x opened this issue Jul 28, 2022 · 4 comments

db48x commented Jul 28, 2022

Could you include or point to some existing code that uses the PaginableApiResult to fetch a list, such as a user’s list of followers?

My first several attempts to write this worked, but they are super ugly. I feel like I am missing an obvious way to write it that would be readable or even beautiful. I used the governor crate to rate-limit the requests, but I haven’t yet done anything to retry transient errors; I feel like it’s ugly enough already.
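
In case it matters, the rate limiter is just a direct governor limiter, set up roughly like this (the quota here is made up for the example; the real one should match whatever the endpoint allows):

use std::num::NonZeroU32;
use governor::{Quota, RateLimiter};

// made-up quota for illustration: at most one request per second
let rl = RateLimiter::direct(Quota::per_second(NonZeroU32::new(1).unwrap()));

// and before every request:
rl.until_ready().await;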

jpopesculian (Owner) commented

Hi @db48x, would you mind posting an example of your code and maybe highlighting the pain points? Maybe I can give some tips, or maybe there's a way to make the API more friendly.

db48x commented Aug 9, 2022

After going back and forth a bit, here is what I currently have:

pub async fn get_all_pages<Auth, T>(rl: &DefaultRateLimiter,
                                initial_response: twitter_v2::Result<Option<ApiResponse<Auth, Vec<T>, ResultCountMeta>>>,
                                expected: Option<usize>) -> Result<Vec<T>>
where Auth: Authorization + Send + Sync + std::fmt::Debug,
      T: serde::de::DeserializeOwned + Clone + Debug + Send + Sync {
  let mut items: Vec<T> = match expected {
    Some(v) => Vec::with_capacity(v),
    _ => Vec::new(),
  };
  let mut response: twitter_v2::Result<Option<ApiResponse<Auth, Vec<T>, ResultCountMeta>>> = initial_response;
  loop {
    match response {
      // BUG(db48x): no way to retry if the initial request got a 429
      Err(e) => return Err(anyhow!(e)),
      Ok(None) => break,
      Ok(Some(ref r)) => {
        if let Some(new_items) = r.data() {
          items.extend_from_slice(&new_items);
        }
        if matches!(r.meta(), Some(meta) if meta.next_token().is_some()) {
          // loop so that a retried page fetch does not re-append this page's data
          response = loop {
            rl.until_ready().await;
            let new_response = r.next_page().await;
            match new_response {
              Err(twitter_v2::Error::Request(ref e)) if e.is_timeout() || e.is_connect() || e.status() == Some(reqwest::StatusCode::TOO_MANY_REQUESTS) => {
                warn!(error=?e, "retrying due to request error");
              },
              Err(twitter_v2::Error::Api(ref e)) if e.status == reqwest::StatusCode::TOO_MANY_REQUESTS => {
                warn!(error=?e, "retrying due to 429 response");
              },
              Err(e) => return Err(anyhow!(e)),
              _ => break new_response,
            }
          };
        } else {
          break;
        }
      }
    }
  }
  Ok(items)
}

A caller would look like this:

pub async fn get_list_members<Auth>(api: &TwitterApi<Auth>, rl: &DefaultRateLimiter, list_id: u64) -> Result<Vec<User>>
where Auth: Authorization + Send + Sync + std::fmt::Debug {
  rl.until_ready().await;
  let response = api.get_list_members(list_id)
    .user_fields([UserField::Id, UserField::Name])
    .max_results(100)
    .send()
    .await
    .map(|response| Some(response));
  get_all_pages(rl, response, None)
    .await
    .with_context(|| format!("Failed to fetch list members for list {list_id}"))
}

Three things about it still annoy me:

  • First, the caller has to remember to wait on the rate limiter.
  • Second, it is still possible for a request to happen too soon and get a 429, or to hit some other transient error (such as a connection timeout), but this only handles those for the following pages, not for the initial request.
  • Finally, this produces a Vec<T> rather than a Stream<Item=T>. (I imagined that once I got it written well it wouldn’t be too hard to turn it into a stream, but my first attempt didn’t go well, so who knows; there’s a rough sketch of what I mean below.)

I’m going to spend a few hours today to see if I can solve the second problem by writing a function that takes a closure returning a response and calls it again if the response is one that can be retried. I am hoping that will simplify get_all_pages too.
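
As for the third point, this is roughly the Stream shape I have in mind, using the async-stream crate. It is only a sketch; I have not tried to compile it, and it leaves the retrying out to keep it short:

use async_stream::try_stream;
use futures_core::Stream;

pub fn stream_all_pages<'a, Auth, T>(rl: &'a DefaultRateLimiter,
                                     initial_response: twitter_v2::Result<Option<ApiResponse<Auth, Vec<T>, ResultCountMeta>>>)
                                     -> impl Stream<Item = twitter_v2::Result<T>> + 'a
where Auth: Authorization + Send + Sync + std::fmt::Debug + 'a,
      T: serde::de::DeserializeOwned + Clone + Debug + Send + Sync + 'a {
  try_stream! {
    // same traversal as get_all_pages, but yielding items instead of collecting them
    let mut response = initial_response?;
    while let Some(r) = response.take() {
      if let Some(new_items) = r.data() {
        for item in new_items.iter().cloned() {
          yield item;
        }
      }
      if matches!(r.meta(), Some(meta) if meta.next_token().is_some()) {
        rl.until_ready().await;
        response = r.next_page().await?;
      } else {
        break;
      }
    }
  }
}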

db48x commented Aug 9, 2022

Oh, the fourth thing that annoys me is the .map(|response| Some(response)) in the callers.
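
The best I can do about that one is to pass Some itself, which is a little shorter but doesn’t get rid of the wrapping:

  let response = api.get_list_members(list_id)
    .user_fields([UserField::Id, UserField::Name])
    .max_results(100)
    .send()
    .await
    .map(Some); // Some is already a fn(T) -> Option<T>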

db48x commented Aug 9, 2022

Here it is with retrying pulled out as a separate function. A little better, perhaps, but there are still plenty of annoyances. Also, async closures are nightly-only; I had forgotten about that.

type DefaultRateLimiter = governor::RateLimiter<governor::state::NotKeyed, governor::state::InMemoryState, governor::clock::DefaultClock>;
type OptionResponse<Auth, T, Meta> = twitter_v2::Result<Option<twitter_v2::ApiResponse<Auth, Vec<T>, Meta>>>;
type Response<Auth, T, Meta> = twitter_v2::Result<twitter_v2::ApiResponse<Auth, Vec<T>, Meta>>;

pub async fn get_list_members<Auth>(api: &TwitterApi<Auth>, rl: &DefaultRateLimiter, list_id: u64) -> Result<Vec<User>>
where Auth: Authorization + Send + Sync + std::fmt::Debug {
  get_all_pages(rl,
                async || {
                  api.get_list_members(list_id)
                    .user_fields([UserField::Id, UserField::Name])
                    .max_results(100)
                    .send()
                    .await
                },
                None)
    .await
    .with_context(|| format!("Failed to fetch list members for list {list_id}"))
}

pub async fn retry<Auth, T, Meta, F, Fut>(rl: &DefaultRateLimiter, func: F) -> OptionResponse<Auth, T, Meta>
where Auth: Authorization + Send + Sync + std::fmt::Debug,
      T: serde::de::DeserializeOwned + Clone + Debug + Send + Sync,
      F: Fn() -> Fut,
      Fut: std::future::Future<Output = OptionResponse<Auth, T, Meta>> {
  loop {
    rl.until_ready().await;
    match func().await {
      Err(twitter_v2::Error::Request(ref e)) if e.is_timeout() || e.is_connect() => {
        warn!(error=?e, "retrying due to request error");
      },
      Err(twitter_v2::Error::Api(ref e)) if e.status == reqwest::StatusCode::TOO_MANY_REQUESTS => {
        warn!(error=?e, "retrying due to 429 response");
        // TODO(db48x): extend the twitter_v2 crate to expose the
        //   rate-limiting information provided by the twitter api
      },
      response => {
        if let Ok(Some(ref r)) = response {
          tracing::info!(url=%r.url(), "success");
        }
        return response;
      }
    }
  }
}

pub async fn get_all_pages<Auth, T, Meta, F, Fut>(rl: &DefaultRateLimiter,
                                                  func: F,
                                                  expected: Option<usize>) -> Result<Vec<T>>
where Auth: Authorization + Send + Sync + std::fmt::Debug,
      T: serde::de::DeserializeOwned + Clone + Debug + Send + Sync,
      Meta: twitter_v2::meta::PaginationMeta + serde::de::DeserializeOwned + Send + Sync,
      F: Fn() -> Fut,
      Fut: std::future::Future<Output = Response<Auth, T, Meta>> {
  let mut items: Vec<T> = match expected {
    Some(v) => Vec::with_capacity(v),
    _ => Vec::new(),
  };
  let mut response: OptionResponse<Auth, T, Meta> = retry(rl, async || func().await.map(Some)).await;
  loop {
    match response {
      Err(e) => return Err(anyhow!(e)),
      Ok(None) => break,
      Ok(Some(ref r)) => {
        if let Some(new_items) = r.data() {
          items.extend_from_slice(&new_items);
        }
        if matches!(r.meta(), Some(meta) if meta.next_token().is_some()) {
          response = retry(rl, async || r.next_page().await).await;
        } else {
          break;
        }
      }
    }
  }
  Ok(items)
}

Note: edited to remove the fourth annoyance.
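
For what it’s worth, I think the nightly async || in the caller can be avoided on stable by returning an async block from an ordinary closure, since the bound is only F: Fn() -> Fut. Something like this should work, though I haven’t checked whether the two closures inside get_all_pages can be rewritten the same way:

pub async fn get_list_members<Auth>(api: &TwitterApi<Auth>, rl: &DefaultRateLimiter, list_id: u64) -> Result<Vec<User>>
where Auth: Authorization + Send + Sync + std::fmt::Debug {
  get_all_pages(rl,
                || async move {
                  api.get_list_members(list_id)
                    .user_fields([UserField::Id, UserField::Name])
                    .max_results(100)
                    .send()
                    .await
                },
                None)
    .await
    .with_context(|| format!("Failed to fetch list members for list {list_id}"))
}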
