tx_indexer/handler/
retry.rs

1use crate::{
2    error::{ErrorPolicy, ErrorPolicyProvider},
3    handler::{callback::EventHandler, chain_event::parse_oura_event},
4    progress_tracker::ProgressTracker,
5};
6use oura::model as oura;
7use std::{fmt::Debug, ops::Mul, time::Duration};
8use strum_macros::Display;
9use tracing::{debug, debug_span, error, warn, warn_span, Instrument};
10
11/// Influence retrying behavior.
12/// i.e How many times and how often a failed operation should be retried.
13/// Given we are dealing with `ErrorPolicy::Retry`
14#[derive(Debug, Copy, Clone)]
15pub struct RetryPolicy {
16    pub max_retries: u32,
17    pub backoff_unit: Duration,
18    pub backoff_factor: u32,
19    pub max_backoff: Duration,
20}
21
22#[derive(Display)]
23enum EventOutcome {
24    Success,
25    FailureExit,
26    FailureSkip,
27    FailureRetry,
28    RetriesExhausted,
29    RetryBackoff,
30}
31
32impl Default for RetryPolicy {
33    fn default() -> Self {
34        Self {
35            max_retries: 20,
36            backoff_unit: Duration::from_millis(5_000),
37            backoff_factor: 2,
38            max_backoff: Duration::from_millis(20 * 5_000),
39        }
40    }
41}
42
43fn compute_backoff_delay(policy: &RetryPolicy, retry: u32) -> Duration {
44    let units = policy.backoff_factor.pow(retry);
45    let backoff = policy.backoff_unit.mul(units);
46    core::cmp::min(backoff, policy.max_backoff)
47}
48
49/// Wrap an operation with retry logic.
50/// Retrying is based on ErrorPolicy associated with particular error.
51/// Retries are only performed for ErrorPolicy::Retry - other errors won't cause invocation of given operation again.
52pub(crate) async fn perform_with_retry<H: EventHandler>(
53    handler: &H,
54    oura_event: oura::Event,
55    policy: &RetryPolicy,
56    progress_tracker: &mut Option<ProgressTracker>,
57) -> Result<(), H::Error> {
58    let span = debug_span!("perform_with_retry");
59    let _enter = span.enter();
60
61    match parse_oura_event(oura_event, progress_tracker) {
62        Ok(Some(event)) => {
63            // The retry logic is based on:
64            // https://github.com/txpipe/oura/blob/27fb7e876471b713841d96e292ede40101b151d7/src/utils/retry.rs
65            let mut retry = 0;
66
67            loop {
68                // TODO(szg251): Handle errors properly
69                let span = debug_span!("TryingOperation", retry_count = retry);
70                let res = async {
71                    let result = handler.handle(event.clone())
72                        .instrument(debug_span!("UserDefinedHandler")).await;
73
74                    match result {
75                        Ok(_) => {
76                            debug!(label=%EventOutcome::Success);
77                            Some(Ok(()))
78                        }
79                        Err(err) => match err.get_error_policy() {
80                            ErrorPolicy::Exit => {
81                                error!(label=%EventOutcome::FailureExit);
82                                Some(Err(err))
83                            }
84                            ErrorPolicy::Skip => {
85                                warn!(label=%EventOutcome::FailureSkip, err=?err);
86                                Some(Ok(()))
87                            }
88                            ErrorPolicy::Call(err_f) => warn_span!("OperationFailureCall").in_scope(|| {
89                                err_f(err);
90                                Some(Ok(()))
91                            }),
92                            ErrorPolicy::Retry if retry < policy.max_retries => {
93                                warn!(label=%EventOutcome::FailureRetry, err=?err);
94
95                                retry += 1;
96
97                                let backoff = compute_backoff_delay(policy, retry);
98
99                                debug!(label=%EventOutcome::RetryBackoff, backoff_secs=backoff.as_secs());
100
101                                std::thread::sleep(backoff);
102
103                                None
104                            }
105                            _ => {
106                                debug!(label=%EventOutcome::RetriesExhausted);
107                                Some(Err(err))
108                            }
109                        },
110                    }
111                }
112                .instrument(span)
113                .await;
114
115                if let Some(res) = res {
116                    break res;
117                }
118            }
119        }
120        Ok(None) => Ok(()),
121        Err(err) => {
122            error!(err = ?err);
123
124            Ok(())
125        }
126    }
127}