tx_indexer/handler/
retry.rs1use crate::{
2 error::{ErrorPolicy, ErrorPolicyProvider},
3 handler::{callback::EventHandler, chain_event::parse_oura_event},
4 progress_tracker::ProgressTracker,
5};
6use oura::model as oura;
7use std::{fmt::Debug, ops::Mul, time::Duration};
8use strum_macros::Display;
9use tracing::{debug, debug_span, error, warn, warn_span, Instrument};
10
/// Tunable parameters governing how a failed event handler is retried.
///
/// The delay before retry number `n` is `backoff_unit * backoff_factor^n`,
/// capped at `max_backoff`.
#[derive(Clone, Copy, Debug)]
pub struct RetryPolicy {
    /// Upper bound on the number of retries attempted for a single event.
    pub max_retries: u32,
    /// Base delay that is scaled up on every retry.
    pub backoff_unit: Duration,
    /// Base of the exponential growth applied to `backoff_unit`.
    pub backoff_factor: u32,
    /// Ceiling for the computed delay, however many retries have happened.
    pub max_backoff: Duration,
}
21
/// Labels attached (as the `label` field, formatted via `Display`) to the
/// tracing events emitted by `perform_with_retry`.
#[derive(Display)]
enum EventOutcome {
    /// The handler returned `Ok`.
    Success,
    /// The handler failed and the error's policy is `ErrorPolicy::Exit`.
    FailureExit,
    /// The handler failed and the error's policy is `ErrorPolicy::Skip`.
    FailureSkip,
    /// The handler failed with `ErrorPolicy::Retry` and retries remain.
    FailureRetry,
    /// The handler failed and no further retry is attempted.
    RetriesExhausted,
    /// Logged together with the delay chosen before the next retry.
    RetryBackoff,
}
31
32impl Default for RetryPolicy {
33 fn default() -> Self {
34 Self {
35 max_retries: 20,
36 backoff_unit: Duration::from_millis(5_000),
37 backoff_factor: 2,
38 max_backoff: Duration::from_millis(20 * 5_000),
39 }
40 }
41}
42
43fn compute_backoff_delay(policy: &RetryPolicy, retry: u32) -> Duration {
44 let units = policy.backoff_factor.pow(retry);
45 let backoff = policy.backoff_unit.mul(units);
46 core::cmp::min(backoff, policy.max_backoff)
47}
48
49pub(crate) async fn perform_with_retry<H: EventHandler>(
53 handler: &H,
54 oura_event: oura::Event,
55 policy: &RetryPolicy,
56 progress_tracker: &mut Option<ProgressTracker>,
57) -> Result<(), H::Error> {
58 let span = debug_span!("perform_with_retry");
59 let _enter = span.enter();
60
61 match parse_oura_event(oura_event, progress_tracker) {
62 Ok(Some(event)) => {
63 let mut retry = 0;
66
67 loop {
68 let span = debug_span!("TryingOperation", retry_count = retry);
70 let res = async {
71 let result = handler.handle(event.clone())
72 .instrument(debug_span!("UserDefinedHandler")).await;
73
74 match result {
75 Ok(_) => {
76 debug!(label=%EventOutcome::Success);
77 Some(Ok(()))
78 }
79 Err(err) => match err.get_error_policy() {
80 ErrorPolicy::Exit => {
81 error!(label=%EventOutcome::FailureExit);
82 Some(Err(err))
83 }
84 ErrorPolicy::Skip => {
85 warn!(label=%EventOutcome::FailureSkip, err=?err);
86 Some(Ok(()))
87 }
88 ErrorPolicy::Call(err_f) => warn_span!("OperationFailureCall").in_scope(|| {
89 err_f(err);
90 Some(Ok(()))
91 }),
92 ErrorPolicy::Retry if retry < policy.max_retries => {
93 warn!(label=%EventOutcome::FailureRetry, err=?err);
94
95 retry += 1;
96
97 let backoff = compute_backoff_delay(policy, retry);
98
99 debug!(label=%EventOutcome::RetryBackoff, backoff_secs=backoff.as_secs());
100
101 std::thread::sleep(backoff);
102
103 None
104 }
105 _ => {
106 debug!(label=%EventOutcome::RetriesExhausted);
107 Some(Err(err))
108 }
109 },
110 }
111 }
112 .instrument(span)
113 .await;
114
115 if let Some(res) = res {
116 break res;
117 }
118 }
119 }
120 Ok(None) => Ok(()),
121 Err(err) => {
122 error!(err = ?err);
123
124 Ok(())
125 }
126 }
127}