1use std::thread::JoinHandle;
2use std::{path::PathBuf, sync::Arc};
3
4use anyhow::{anyhow, Result};
5use futures::stream;
6use oura::{
7 pipelining::{FilterProvider, SinkProvider, SourceProvider},
8 sources::{AddressArg, BearerKind},
9 utils::{Utils, WithUtils},
10 Error,
11};
12use tracing::{span, Level};
13
14use crate::{
15 config::{
16 n2c_config, n2n_config, NetworkConfig, NodeAddress, TxIndexerConfig, TxIndexerSource,
17 },
18 filter::Filter,
19 handler::{
20 callback::{Callback, EventHandler},
21 retry::RetryPolicy,
22 },
23 progress_tracker::ProgressTracker,
24};
25
/// A running indexer, holding the handles of the threads that do the work.
///
/// Values are produced by [`TxIndexer::run`]; call [`TxIndexer::join`] to wait
/// for the underlying threads to finish.
pub enum TxIndexer {
    /// Indexer fed by a Cardano node: one thread handle per pipeline stage
    /// (source, filter and sink), as bootstrapped by `source_from_cardano_node`.
    CardanoNode {
        source_handle: JoinHandle<()>,
        filter_handle: JoinHandle<()>,
        sink_handle: JoinHandle<()>,
    },

    /// Indexer fed by fixture files: a single worker thread, spawned by
    /// `source_from_files`, that processes the files.
    FixtureFiles {
        handle: JoinHandle<()>,
    },
}
38
39impl TxIndexer {
40 pub async fn run<H: EventHandler>(
42 conf: TxIndexerConfig<H>,
43 ) -> Result<TxIndexer, anyhow::Error> {
44 let span = span!(Level::INFO, "Run TxIndexer");
45 let _enter = span.enter();
46
47 match conf.source {
48 TxIndexerSource::CardanoNode {
49 node_address,
50 network,
51 since_slot,
52 safe_block_depth,
53 event_filter,
54 } => source_from_cardano_node(
55 conf.handler,
56 node_address,
57 network,
58 since_slot,
59 safe_block_depth,
60 event_filter,
61 conf.retry_policy,
62 )
63 .map_err(|err| anyhow!(err.to_string())),
64
65 TxIndexerSource::FixtureFiles { dir_path } => {
66 source_from_files(conf.handler, dir_path).await
67 }
68 }
69 }
70
71 pub fn join(self) -> Result<(), anyhow::Error> {
72 match self {
73 TxIndexer::CardanoNode {
74 source_handle,
75 filter_handle,
76 sink_handle,
77 } => {
78 sink_handle
79 .join()
80 .map_err(|err| anyhow!("error in sink thread: {}", any_err_to_string(err)))?;
81 filter_handle
82 .join()
83 .map_err(|err| anyhow!("error in filter thread: {}", any_err_to_string(err)))?;
84 source_handle
85 .join()
86 .map_err(|err| anyhow!("error in source thread: {}", any_err_to_string(err)))?;
87 }
88 TxIndexer::FixtureFiles { handle } => handle
89 .join()
90 .map_err(|err| anyhow!("error in thread: {}", any_err_to_string(err)))?,
91 }
92 Ok(())
93 }
94}
95
/// Extracts a human-readable message from a thread panic payload.
///
/// Panic payloads are type-erased. `panic!` with a plain string literal
/// yields a `&'static str`, while a formatted `panic!` (and `unwrap`/`expect`
/// failures) yields a `String`, so both types are checked. Any other payload
/// type results in the fallback text `"Cannot print"`.
fn any_err_to_string(err: Box<dyn std::any::Any>) -> String {
    if let Some(message) = err.downcast_ref::<String>() {
        message.clone()
    } else if let Some(message) = err.downcast_ref::<&'static str>() {
        (*message).to_owned()
    } else {
        String::from("Cannot print")
    }
}
103
104fn source_from_cardano_node(
105 handler: impl EventHandler,
106 node_address: NodeAddress,
107 network: NetworkConfig,
108 since_slot: Option<(u64, String)>,
109 safe_block_depth: usize,
110 event_filter: Filter,
111 retry_policy: RetryPolicy,
112) -> Result<TxIndexer, Error> {
113 let chain = network.to_chain_info()?;
114
115 let progress_tracker = match since_slot {
116 Some((since_slot, _)) => Some(ProgressTracker::new(since_slot, &chain)?),
117 None => None,
118 };
119
120 let utils = Arc::new(Utils::new(chain));
121
122 let (source_handle, source_rx) = match node_address {
123 NodeAddress::UnixSocket(path) => {
124 span!(Level::INFO, "BootstrapSourceViaSocket", socket_path = path).in_scope(|| {
125 WithUtils::new(
126 n2c_config(
127 AddressArg(BearerKind::Unix, path),
128 network.to_magic_arg(),
129 since_slot.clone(),
130 safe_block_depth,
131 ),
132 utils.clone(),
133 )
134 .bootstrap()
135 })
136 }
137 NodeAddress::TcpAddress(hostname, port) => {
138 span!(Level::INFO, "BootstrapSourceViaTcp", hostname, port).in_scope(|| {
139 WithUtils::new(
140 n2n_config(
141 AddressArg(BearerKind::Tcp, format!("{}:{}", hostname, port)),
142 network.to_magic_arg(),
143 since_slot.clone(),
144 safe_block_depth,
145 ),
146 utils.clone(),
147 )
148 .bootstrap()
149 })
150 }
151 }?;
152
153 let (filter_handle, filter_rx) = event_filter.to_selection_config().bootstrap(source_rx)?;
155
156 let sink_handle = span!(Level::INFO, "BootstrapSink").in_scope(|| {
157 Callback::new(handler, retry_policy, utils, progress_tracker).bootstrap(filter_rx)
158 })?;
159
160 Ok(TxIndexer::CardanoNode {
161 source_handle,
162 filter_handle,
163 sink_handle,
164 })
165}
166
167async fn source_from_files(
168 handler: impl EventHandler,
169 dir_path: PathBuf,
170) -> Result<TxIndexer, anyhow::Error> {
171 use futures::stream::{StreamExt, TryStreamExt};
172 use tokio::fs;
173 use tokio::runtime::Runtime;
174
175 let mut files = std::fs::read_dir(dir_path)
176 .map_err(|err| anyhow!(err))?
177 .collect::<Result<Vec<_>, _>>()?;
178
179 files.sort_by_key(|entry| entry.file_name());
180
181 let file_stream = stream::iter(files);
182
183 let handle = std::thread::spawn(|| {
184 let rt = Runtime::new().unwrap();
185 rt.block_on(async move {
186 let handler = &handler;
187 let _: Vec<()> = file_stream
188 .filter_map(|dir_entry| async move {
189 let path = dir_entry.path();
190
191 if let Some(ext) = path.extension() {
192 if ext == "json" {
193 return Some(path);
194 }
195 };
196 None
197 })
198 .then(|path| async move {
199 let bytes = fs::read(path).await.map_err(|err| anyhow!(err))?;
200
201 let chain_event = serde_json::from_slice(&bytes).map_err(|err| anyhow!(err))?;
202
203 handler
204 .handle(chain_event)
205 .await
206 .map_err(|err| anyhow!(err.to_string()))
207 })
208 .try_collect()
209 .await
210 .unwrap();
211 })
212 });
213
214 Ok(TxIndexer::FixtureFiles { handle })
215}