tx_indexer/handler/
callback.rs1use crate::{
2 error::ErrorPolicyProvider,
3 handler::{
4 chain_event::ChainEvent,
5 retry::{perform_with_retry, RetryPolicy},
6 },
7 progress_tracker::ProgressTracker,
8};
9use oura::{
10 pipelining::{BootstrapResult, SinkProvider, StageReceiver},
11 utils::Utils,
12};
13use std::{future::Future, sync::Arc};
14use strum_macros::Display;
15use tokio::runtime::Runtime;
16use tracing::{event, span, Instrument, Level};
17
18pub trait EventHandler
19where
20 Self: Clone + Send + 'static,
21{
22 type Error: std::error::Error + ErrorPolicyProvider;
23
24 fn handle(&self, event: ChainEvent) -> impl Future<Output = Result<(), Self::Error>>;
25}
26
27pub(crate) struct Callback<H: EventHandler> {
30 pub(crate) handler: H,
31 pub(crate) retry_policy: RetryPolicy,
32 pub(crate) utils: Arc<Utils>,
33 pub(crate) progress_tracker: Option<ProgressTracker>,
34}
35
36impl<H: EventHandler> Callback<H> {
37 pub fn new(
38 handler: H,
39 retry_policy: RetryPolicy,
40 utils: Arc<Utils>,
41 progress_tracker: Option<ProgressTracker>,
42 ) -> Self {
43 Self {
44 handler,
45 retry_policy,
46 utils,
47 progress_tracker,
48 }
49 }
50}
51
52impl<H: EventHandler> SinkProvider for Callback<H> {
53 fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
54 let span = span!(Level::DEBUG, "Callback::bootstrap");
55 let _enter = span.enter();
56
57 let retry_policy = self.retry_policy;
58 let utils = self.utils.clone();
59 let handler = self.handler.clone();
60 let progress_tracker = self.progress_tracker.clone();
61
62 let handle = span!(Level::DEBUG, "SpawningThread").in_scope(|| {
63 std::thread::spawn(move || {
64 let span = span!(Level::DEBUG, "EventHandlingThread");
65 let _enter = span.enter();
66
67 let rt = Runtime::new().unwrap();
69 rt.block_on(handle_event(
70 handler,
71 input,
72 &retry_policy,
73 utils,
74 progress_tracker,
75 ))
76 .map_err(|err| {
77 event!(Level::ERROR, label=%Events::EventHandlerFailure, ?err);
78 err
79 })
80 .expect("request loop failed");
81 })
82 });
83
84 Ok(handle)
85 }
86}
87
88async fn handle_event<'a, H: EventHandler>(
90 handler: H,
91 input: StageReceiver,
92 retry_policy: &RetryPolicy,
93 utils: Arc<Utils>,
94 mut progress_tracker: Option<ProgressTracker>,
95) -> Result<(), H::Error> {
96 let span = span!(Level::DEBUG, "handle_event");
97 let _enter = span.enter();
98 for chain_event in input.into_iter() {
99 let span = span!(
100 Level::DEBUG,
101 "HandlingEvent",
102 context=?chain_event.context
103 );
104 perform_with_retry(
106 &handler,
107 chain_event.clone(),
108 retry_policy,
109 &mut progress_tracker,
110 )
111 .instrument(span)
112 .await
113 .map(|_| utils.track_sink_progress(&chain_event))?;
115 }
119 Ok(())
121}
122
123#[derive(Display)]
124pub enum Events {
125 EventHandlerFailure,
126}