tx_indexer/
indexer.rs

1use std::thread::JoinHandle;
2use std::{path::PathBuf, sync::Arc};
3
4use anyhow::{anyhow, Result};
5use futures::stream;
6use oura::{
7    pipelining::{FilterProvider, SinkProvider, SourceProvider},
8    sources::{AddressArg, BearerKind},
9    utils::{Utils, WithUtils},
10    Error,
11};
12use tracing::{span, Level};
13
14use crate::{
15    config::{
16        n2c_config, n2n_config, NetworkConfig, NodeAddress, TxIndexerConfig, TxIndexerSource,
17    },
18    filter::Filter,
19    handler::{
20        callback::{Callback, EventHandler},
21        retry::RetryPolicy,
22    },
23    progress_tracker::ProgressTracker,
24};
25
/// Handles of the OS threads backing a running indexer.
///
/// A value of this type is produced by [`TxIndexer::run`]; call [`TxIndexer::join`] to block
/// until the underlying threads finish and to surface any panic that happened inside them.
/// According to the original author's note, these threads are meant to be never-ending.
#[derive(Debug)]
pub enum TxIndexer {
    /// Indexer fed by a Cardano node: one thread per pipeline stage.
    CardanoNode {
        /// Thread of the stage that reads events from the node.
        source_handle: JoinHandle<()>,
        /// Thread of the stage that applies the event selection filter.
        filter_handle: JoinHandle<()>,
        /// Thread of the stage that hands events to the user-supplied handler.
        sink_handle: JoinHandle<()>,
    },

    /// Indexer fed by fixture files read from a directory.
    FixtureFiles {
        /// Thread that replays the fixture files through the handler.
        handle: JoinHandle<()>,
    },
}
38
39impl TxIndexer {
40    // This is based on: https://github.com/txpipe/oura/blob/27fb7e876471b713841d96e292ede40101b151d7/src/bin/oura/daemon.rs
41    pub async fn run<H: EventHandler>(
42        conf: TxIndexerConfig<H>,
43    ) -> Result<TxIndexer, anyhow::Error> {
44        let span = span!(Level::INFO, "Run TxIndexer");
45        let _enter = span.enter();
46
47        match conf.source {
48            TxIndexerSource::CardanoNode {
49                node_address,
50                network,
51                since_slot,
52                safe_block_depth,
53                event_filter,
54            } => source_from_cardano_node(
55                conf.handler,
56                node_address,
57                network,
58                since_slot,
59                safe_block_depth,
60                event_filter,
61                conf.retry_policy,
62            )
63            .map_err(|err| anyhow!(err.to_string())),
64
65            TxIndexerSource::FixtureFiles { dir_path } => {
66                source_from_files(conf.handler, dir_path).await
67            }
68        }
69    }
70
71    pub fn join(self) -> Result<(), anyhow::Error> {
72        match self {
73            TxIndexer::CardanoNode {
74                source_handle,
75                filter_handle,
76                sink_handle,
77            } => {
78                sink_handle
79                    .join()
80                    .map_err(|err| anyhow!("error in sink thread: {}", any_err_to_string(err)))?;
81                filter_handle
82                    .join()
83                    .map_err(|err| anyhow!("error in filter thread: {}", any_err_to_string(err)))?;
84                source_handle
85                    .join()
86                    .map_err(|err| anyhow!("error in source thread: {}", any_err_to_string(err)))?;
87            }
88            TxIndexer::FixtureFiles { handle } => handle
89                .join()
90                .map_err(|err| anyhow!("error in thread: {}", any_err_to_string(err)))?,
91        }
92        Ok(())
93    }
94}
95
/// Extracts a human-readable message from a thread panic payload.
///
/// Panic payloads are type-erased. The standard `panic!` machinery produces either a `String`
/// (formatted messages, including those from `unwrap`/`expect`) or a `&'static str` (plain
/// string literals), so both are recognised. Any other payload type cannot be rendered and
/// yields the placeholder `"Cannot print"`.
fn any_err_to_string(err: Box<dyn std::any::Any>) -> String {
    if let Some(message) = err.downcast_ref::<String>() {
        message.clone()
    } else if let Some(message) = err.downcast_ref::<&'static str>() {
        // `panic!("literal")` carries a `&'static str`, not a `String`.
        (*message).to_owned()
    } else {
        String::from("Cannot print")
    }
}
103
104fn source_from_cardano_node(
105    handler: impl EventHandler,
106    node_address: NodeAddress,
107    network: NetworkConfig,
108    since_slot: Option<(u64, String)>,
109    safe_block_depth: usize,
110    event_filter: Filter,
111    retry_policy: RetryPolicy,
112) -> Result<TxIndexer, Error> {
113    let chain = network.to_chain_info()?;
114
115    let progress_tracker = match since_slot {
116        Some((since_slot, _)) => Some(ProgressTracker::new(since_slot, &chain)?),
117        None => None,
118    };
119
120    let utils = Arc::new(Utils::new(chain));
121
122    let (source_handle, source_rx) = match node_address {
123        NodeAddress::UnixSocket(path) => {
124            span!(Level::INFO, "BootstrapSourceViaSocket", socket_path = path).in_scope(|| {
125                WithUtils::new(
126                    n2c_config(
127                        AddressArg(BearerKind::Unix, path),
128                        network.to_magic_arg(),
129                        since_slot.clone(),
130                        safe_block_depth,
131                    ),
132                    utils.clone(),
133                )
134                .bootstrap()
135            })
136        }
137        NodeAddress::TcpAddress(hostname, port) => {
138            span!(Level::INFO, "BootstrapSourceViaTcp", hostname, port).in_scope(|| {
139                WithUtils::new(
140                    n2n_config(
141                        AddressArg(BearerKind::Tcp, format!("{}:{}", hostname, port)),
142                        network.to_magic_arg(),
143                        since_slot.clone(),
144                        safe_block_depth,
145                    ),
146                    utils.clone(),
147                )
148                .bootstrap()
149            })
150        }
151    }?;
152
153    // Optionally create a filter handle (if filter was provided)
154    let (filter_handle, filter_rx) = event_filter.to_selection_config().bootstrap(source_rx)?;
155
156    let sink_handle = span!(Level::INFO, "BootstrapSink").in_scope(|| {
157        Callback::new(handler, retry_policy, utils, progress_tracker).bootstrap(filter_rx)
158    })?;
159
160    Ok(TxIndexer::CardanoNode {
161        source_handle,
162        filter_handle,
163        sink_handle,
164    })
165}
166
167async fn source_from_files(
168    handler: impl EventHandler,
169    dir_path: PathBuf,
170) -> Result<TxIndexer, anyhow::Error> {
171    use futures::stream::{StreamExt, TryStreamExt};
172    use tokio::fs;
173    use tokio::runtime::Runtime;
174
175    let mut files = std::fs::read_dir(dir_path)
176        .map_err(|err| anyhow!(err))?
177        .collect::<Result<Vec<_>, _>>()?;
178
179    files.sort_by_key(|entry| entry.file_name());
180
181    let file_stream = stream::iter(files);
182
183    let handle = std::thread::spawn(|| {
184        let rt = Runtime::new().unwrap();
185        rt.block_on(async move {
186            let handler = &handler;
187            let _: Vec<()> = file_stream
188                .filter_map(|dir_entry| async move {
189                    let path = dir_entry.path();
190
191                    if let Some(ext) = path.extension() {
192                        if ext == "json" {
193                            return Some(path);
194                        }
195                    };
196                    None
197                })
198                .then(|path| async move {
199                    let bytes = fs::read(path).await.map_err(|err| anyhow!(err))?;
200
201                    let chain_event = serde_json::from_slice(&bytes).map_err(|err| anyhow!(err))?;
202
203                    handler
204                        .handle(chain_event)
205                        .await
206                        .map_err(|err| anyhow!(err.to_string()))
207                })
208                .try_collect()
209                .await
210                .unwrap();
211        })
212    });
213
214    Ok(TxIndexer::FixtureFiles { handle })
215}