1use crate::{
2 from_oura::{FromOura, OuraParseError},
3 progress_tracker::ProgressTracker,
4};
5use anyhow::anyhow;
6use itertools::Itertools;
7use num_bigint::BigInt;
8use oura::model as oura;
9use plutus_ledger_api::v3::{
10 address::Address,
11 datum::{Datum, DatumHash, OutputDatum},
12 redeemer::Redeemer,
13 script::ScriptHash,
14 transaction::{ScriptPurpose, TransactionHash, TransactionInput, TransactionOutput, TxInInfo},
15 value::{CurrencySymbol, Value},
16};
17use serde_with::serde_as;
18use std::fmt::Debug;
19use std::{collections::BTreeMap, sync::atomic::Ordering};
20use tracing::{event, Level};
21
/// Block coordinates attached to a [`ChainEvent::TransactionEvent`].
///
/// `parse_oura_event` fills every field from the context of the Oura event.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ChainEventTime {
    /// Block number taken from the event context.
    pub block_number: u64,
    /// Block hash taken from the event context, kept as the string Oura reported.
    pub block_hash: String,
    /// Slot taken from the event context.
    pub slot: u64,
}
29
/// Events produced by `parse_oura_event` from the raw Oura event stream.
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum ChainEvent {
    /// Built from an Oura `Transaction` event.
    TransactionEvent {
        /// Block coordinates read from the event context.
        time: ChainEventTime,
        /// The transaction converted into ledger types.
        transaction: TransactionEventRecord,
    },

    /// Built from an Oura `RollBack` event; both fields are copied over unchanged.
    RollbackEvent { block_slot: u64, block_hash: String },

    /// Built from an Oura `Block` event to report synchronization progress.
    SyncProgressEvent {
        /// Slot of the block record.
        block_slot: u64,
        /// Hash of the block record.
        block_hash: String,
        /// Value returned by the progress tracker, or `100.0` when no tracker is in use.
        percentage: f32,
    },
}
49
#[serde_as]
/// A transaction converted from an Oura `TransactionRecord` into ledger types.
///
/// See the `FromOura` implementation for how each field is derived.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TransactionEventRecord {
    /// Hash of the transaction.
    pub hash: TransactionHash,
    /// Fee, copied from the Oura record.
    pub fee: u64,
    /// Size, copied from the Oura record.
    pub size: u32,

    /// Inputs of the transaction, as references to earlier outputs.
    pub inputs: Vec<TransactionInput>,
    /// Outputs of the transaction, each paired with the reference
    /// (this transaction's hash plus the output's position) that identifies it.
    pub outputs: Vec<TxInInfo>,
    /// Value built from the record's mint entries; empty when the record has none.
    pub mint: Value,
    /// Redeemers keyed by script purpose. Serialized as a list of pairs.
    #[serde_as(as = "Vec<(_, _)>")]
    pub redeemers: BTreeMap<ScriptPurpose, Redeemer>,

    /// Datums listed in the record, keyed by their hash. Serialized as a list of pairs.
    #[serde_as(as = "Vec<(_, _)>")]
    pub plutus_data: BTreeMap<DatumHash, Datum>,
}
72
73pub fn parse_oura_event(
74 ev: oura::Event,
75 progress_tracker: &mut Option<ProgressTracker>,
76) -> Result<Option<ChainEvent>, OuraParseError> {
77 Ok(match ev.data {
78 oura::EventData::Transaction(tx_rec) => {
79 event!(Level::DEBUG, label="TransactionEvent", transaction_record=?tx_rec);
80
81 Some(ChainEvent::TransactionEvent {
82 time: ChainEventTime {
83 block_hash: ev.context.block_hash.unwrap(),
85 block_number: ev.context.block_number.unwrap(),
86 slot: ev.context.slot.unwrap(),
87 },
88 transaction: TransactionEventRecord::from_oura(tx_rec)?,
89 })
90 }
91 oura::EventData::RollBack {
92 block_slot,
93 block_hash,
94 } => {
95 event!(Level::DEBUG, label="RollbackEvent", block_slot=?block_slot, block_hash=?block_hash);
96 Some(ChainEvent::RollbackEvent {
97 block_slot,
98 block_hash,
99 })
100 }
101 oura::EventData::Block(block_rec) => {
102 event!(Level::DEBUG, label="BlockEvent", block_record=?block_rec);
103 match progress_tracker {
104 Some(progress_tracker) => {
105 let block_slot = block_rec.slot;
106 let block_hash = block_rec.hash;
107
108 let percentage = progress_tracker.get_percentage(block_slot)?;
109
110 let throttled_sync_progress = (percentage * 10.0) as usize;
111 let is_updated = progress_tracker
112 .sync_progress
113 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev_status| {
114 if prev_status < throttled_sync_progress {
115 Some(throttled_sync_progress)
116 } else {
117 None
118 }
119 })
120 .is_ok();
121
122 if is_updated {
123 event!(
124 Level::INFO,
125 percentage = format!("{:.1}%", percentage),
126 ?block_slot,
127 ?block_hash,
128 label = "Chain synchronization progress"
129 );
130 }
131
132 Some(ChainEvent::SyncProgressEvent {
133 percentage,
134 block_slot,
135 block_hash,
136 })
137 }
138
139 None => Some(ChainEvent::SyncProgressEvent {
140 percentage: 100.0,
141 block_slot: block_rec.slot,
142 block_hash: block_rec.hash,
143 }),
144 }
145 }
146 _ => panic!("absurd: Indexer filter should only allow transaction event variant."),
147 })
148}
149
150impl FromOura<oura::TransactionRecord> for TransactionEventRecord {
151 fn from_oura(tx: oura::TransactionRecord) -> Result<TransactionEventRecord, OuraParseError> {
152 Ok(TransactionEventRecord {
153 hash: TransactionHash::from_oura(tx.hash.clone())?,
154 fee: tx.fee,
155 redeemers: tx
156 .plutus_redeemers
157 .unwrap()
158 .into_iter()
159 .map(|redeemer_record| {
160 let script_purpose = match &redeemer_record.purpose[..] {
162 "spend" => {
163 let inputs = tx.inputs.as_ref().unwrap();
164 let input = inputs
165 .get(redeemer_record.input_idx as usize)
166 .ok_or(OuraParseError::ParseError(anyhow!(
167 "No input found at redeemer index {}",
168 redeemer_record.input_idx
169 )))?
170 .clone();
171
172 let transaction_input = TransactionInput {
173 transaction_id: TransactionHash::from_oura(input.tx_id)?,
174 index: BigInt::from(input.index),
175 };
176 ScriptPurpose::Spending(transaction_input)
177 }
178 "mint" => {
179 let policies = tx
180 .mint
181 .as_ref()
182 .unwrap()
183 .iter()
184 .map(|mint| mint.policy.clone())
185 .sorted()
186 .dedup()
187 .collect::<Vec<_>>();
188 let policy = policies
189 .get(redeemer_record.input_idx as usize)
190 .ok_or(OuraParseError::ParseError(anyhow!(
191 "No mint found at redeemer index {}",
192 redeemer_record.input_idx
193 )))?
194 .clone();
195
196 ScriptPurpose::Minting(CurrencySymbol::from_oura(policy)?)
197 }
198 _ => Err(OuraParseError::ParseError(anyhow!(
201 "Cannot parse redeemer tag variant: {}",
202 redeemer_record.purpose
203 )))?,
204 };
205
206 Ok((
207 script_purpose,
208 Redeemer::from_oura(redeemer_record.plutus_data)?,
209 ))
210 })
211 .collect::<Result<_, OuraParseError>>()?,
212 size: tx.size,
213 inputs: tx
216 .inputs
217 .unwrap()
218 .into_iter()
219 .map(|oura::TxInputRecord { tx_id, index }| {
220 Ok(TransactionInput {
221 transaction_id: TransactionHash::from_oura(tx_id)?,
222 index: BigInt::from(index),
223 })
224 })
225 .collect::<Result<_, OuraParseError>>()?,
226 outputs: tx
227 .outputs
228 .unwrap()
229 .into_iter()
230 .enumerate()
231 .map(
232 |(
233 index,
234 oura::TxOutputRecord {
235 address,
236 amount,
237 assets,
238 datum_hash,
239 inline_datum,
240 reference_script,
241 },
242 )| {
243 let reference = TransactionInput {
244 transaction_id: TransactionHash::from_oura(tx.hash.clone())?,
245 index: index.into(),
246 };
247 let output = TransactionOutput {
248 address: Address::from_oura(address)?,
249 datum: match (datum_hash, inline_datum) {
250 (None, None) => OutputDatum::None,
251 (_, Some(datm)) => {
252 OutputDatum::InlineDatum(Datum::from_oura(datm.plutus_data)?)
253 }
254 (Some(dh), _) => OutputDatum::DatumHash(DatumHash::from_oura(dh)?),
255 },
256 reference_script: reference_script
258 .map(ScriptHash::from_oura)
259 .transpose()?,
260 value: Value::ada_value(&BigInt::from(amount))
261 + Value::from_oura(assets.unwrap_or_default())?,
262 };
263
264 Ok(TxInInfo { reference, output })
265 },
266 )
267 .collect::<Result<_, OuraParseError>>()?,
268 mint: tx.mint.map_or(Ok(Value::new()), Value::from_oura)?,
269 plutus_data: tx
270 .plutus_data
271 .unwrap_or_default()
272 .into_iter()
273 .map(
274 |oura::PlutusDatumRecord {
275 plutus_data,
276 datum_hash,
277 }| {
278 Ok((
279 DatumHash::from_oura(datum_hash)?,
280 Datum::from_oura(plutus_data)?,
281 ))
282 },
283 )
284 .collect::<Result<_, OuraParseError>>()?,
285 })
286 }
287}