tx_indexer/handler/
callback.rs

1use crate::{
2    error::ErrorPolicyProvider,
3    handler::{
4        chain_event::ChainEvent,
5        retry::{perform_with_retry, RetryPolicy},
6    },
7    progress_tracker::ProgressTracker,
8};
9use oura::{
10    pipelining::{BootstrapResult, SinkProvider, StageReceiver},
11    utils::Utils,
12};
13use std::{future::Future, sync::Arc};
14use strum_macros::Display;
15use tokio::runtime::Runtime;
16use tracing::{event, span, Instrument, Level};
17
18pub trait EventHandler
19where
20    Self: Clone + Send + 'static,
21{
22    type Error: std::error::Error + ErrorPolicyProvider;
23
24    fn handle(&self, event: ChainEvent) -> impl Future<Output = Result<(), Self::Error>>;
25}
26
27/// This is a custom made sink for Oura. Based on a callback function.
28/// The idea is similar to a webhook, but instead of calling a web endpoint - we call a function directly.
29pub(crate) struct Callback<H: EventHandler> {
30    pub(crate) handler: H,
31    pub(crate) retry_policy: RetryPolicy,
32    pub(crate) utils: Arc<Utils>,
33    pub(crate) progress_tracker: Option<ProgressTracker>,
34}
35
36impl<H: EventHandler> Callback<H> {
37    pub fn new(
38        handler: H,
39        retry_policy: RetryPolicy,
40        utils: Arc<Utils>,
41        progress_tracker: Option<ProgressTracker>,
42    ) -> Self {
43        Self {
44            handler,
45            retry_policy,
46            utils,
47            progress_tracker,
48        }
49    }
50}
51
52impl<H: EventHandler> SinkProvider for Callback<H> {
53    fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
54        let span = span!(Level::DEBUG, "Callback::bootstrap");
55        let _enter = span.enter();
56
57        let retry_policy = self.retry_policy;
58        let utils = self.utils.clone();
59        let handler = self.handler.clone();
60        let progress_tracker = self.progress_tracker.clone();
61
62        let handle = span!(Level::DEBUG, "SpawningThread").in_scope(|| {
63            std::thread::spawn(move || {
64                let span = span!(Level::DEBUG, "EventHandlingThread");
65                let _enter = span.enter();
66
67                // Running async function sycnhronously within another thread.
68                let rt = Runtime::new().unwrap();
69                rt.block_on(handle_event(
70                    handler,
71                    input,
72                    &retry_policy,
73                    utils,
74                    progress_tracker,
75                ))
76                .map_err(|err| {
77                    event!(Level::ERROR, label=%Events::EventHandlerFailure, ?err);
78                    err
79                })
80                .expect("request loop failed");
81            })
82        });
83
84        Ok(handle)
85    }
86}
87
88// Handle a sequence of events transmitted at once.
89async fn handle_event<'a, H: EventHandler>(
90    handler: H,
91    input: StageReceiver,
92    retry_policy: &RetryPolicy,
93    utils: Arc<Utils>,
94    mut progress_tracker: Option<ProgressTracker>,
95) -> Result<(), H::Error> {
96    let span = span!(Level::DEBUG, "handle_event");
97    let _enter = span.enter();
98    for chain_event in input.into_iter() {
99        let span = span!(
100          Level::DEBUG,
101          "HandlingEvent",
102          context=?chain_event.context
103        );
104        // Have to clone twice here to please the borrow checker...
105        perform_with_retry(
106            &handler,
107            chain_event.clone(),
108            retry_policy,
109            &mut progress_tracker,
110        )
111        .instrument(span)
112        .await
113        // Notify progress to the pipeline.
114        .map(|_| utils.track_sink_progress(&chain_event))?;
115        // ^ This will exit the loop if an error is returned.
116        // After all, `perform_with_retry` will only return error if all other options,
117        // based on `ErrorPolicy`, were exhausted.
118    }
119    // All chain events in this sequence have been handled.
120    Ok(())
121}
122
123#[derive(Display)]
124pub enum Events {
125    EventHandlerFailure,
126}