tx_indexer/handler/
chain_event.rs

1use crate::{
2    from_oura::{FromOura, OuraParseError},
3    progress_tracker::ProgressTracker,
4};
5use anyhow::anyhow;
6use itertools::Itertools;
7use num_bigint::BigInt;
8use oura::model as oura;
9use plutus_ledger_api::v3::{
10    address::Address,
11    datum::{Datum, DatumHash, OutputDatum},
12    redeemer::Redeemer,
13    script::ScriptHash,
14    transaction::{ScriptPurpose, TransactionHash, TransactionInput, TransactionOutput, TxInInfo},
15    value::{CurrencySymbol, Value},
16};
17use serde_with::serde_as;
18use std::fmt::Debug;
19use std::{collections::BTreeMap, sync::atomic::Ordering};
20use tracing::{event, Level};
21
22/// Indication of when an event happened in the context of the chain.
23#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
24pub struct ChainEventTime {
25    pub block_number: u64,
26    pub block_hash: String,
27    pub slot: u64,
28}
29
30/// Chain events that the indexer is configured to produce.
31#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
32pub enum ChainEvent {
33    /// A filtered transaction was confirmed
34    TransactionEvent {
35        time: ChainEventTime,
36        transaction: TransactionEventRecord,
37    },
38
39    /// Rollback event occurred
40    RollbackEvent { block_slot: u64, block_hash: String },
41
42    /// Chain synchronisation progressed
43    SyncProgressEvent {
44        block_slot: u64,
45        block_hash: String,
46        percentage: f32,
47    },
48}
49
50/// Details on an transaction event (excluding unnecessary information).
51#[serde_as]
52#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
53pub struct TransactionEventRecord {
54    pub hash: TransactionHash,
55    pub fee: u64,
56    pub size: u32,
57
58    pub inputs: Vec<TransactionInput>,
59    pub outputs: Vec<TxInInfo>,
60    pub mint: Value,
61    #[serde_as(as = "Vec<(_, _)>")]
62    pub redeemers: BTreeMap<ScriptPurpose, Redeemer>,
63
64    #[serde_as(as = "Vec<(_, _)>")]
65    pub plutus_data: BTreeMap<DatumHash, Datum>,
66    // TODO(chase): Which of these would be realistically be interested in?
67    // pub vkey_witnesses: Option<Vec<VKeyWitnessRecord>>,
68    // pub native_witnesses: Option<Vec<NativeWitnessRecord>>,
69    // pub plutus_witnesses: Option<Vec<PlutusWitnessRecord>>,
70    // pub plutus_redeemers: Option<Vec<PlutusRedeemerRecord>>,
71}
72
73pub fn parse_oura_event(
74    ev: oura::Event,
75    progress_tracker: &mut Option<ProgressTracker>,
76) -> Result<Option<ChainEvent>, OuraParseError> {
77    Ok(match ev.data {
78        oura::EventData::Transaction(tx_rec) => {
79            event!(Level::DEBUG, label="TransactionEvent", transaction_record=?tx_rec);
80
81            Some(ChainEvent::TransactionEvent {
82                time: ChainEventTime {
83                    // These unwraps should not fail.
84                    block_hash: ev.context.block_hash.unwrap(),
85                    block_number: ev.context.block_number.unwrap(),
86                    slot: ev.context.slot.unwrap(),
87                },
88                transaction: TransactionEventRecord::from_oura(tx_rec)?,
89            })
90        }
91        oura::EventData::RollBack {
92            block_slot,
93            block_hash,
94        } => {
95            event!(Level::DEBUG, label="RollbackEvent", block_slot=?block_slot, block_hash=?block_hash);
96            Some(ChainEvent::RollbackEvent {
97                block_slot,
98                block_hash,
99            })
100        }
101        oura::EventData::Block(block_rec) => {
102            event!(Level::DEBUG, label="BlockEvent", block_record=?block_rec);
103            match progress_tracker {
104                Some(progress_tracker) => {
105                    let block_slot = block_rec.slot;
106                    let block_hash = block_rec.hash;
107
108                    let percentage = progress_tracker.get_percentage(block_slot)?;
109
110                    let throttled_sync_progress = (percentage * 10.0) as usize;
111                    let is_updated = progress_tracker
112                        .sync_progress
113                        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev_status| {
114                            if prev_status < throttled_sync_progress {
115                                Some(throttled_sync_progress)
116                            } else {
117                                None
118                            }
119                        })
120                        .is_ok();
121
122                    if is_updated {
123                        event!(
124                            Level::INFO,
125                            percentage = format!("{:.1}%", percentage),
126                            ?block_slot,
127                            ?block_hash,
128                            label = "Chain synchronization progress"
129                        );
130                    }
131
132                    Some(ChainEvent::SyncProgressEvent {
133                        percentage,
134                        block_slot,
135                        block_hash,
136                    })
137                }
138
139                None => Some(ChainEvent::SyncProgressEvent {
140                    percentage: 100.0,
141                    block_slot: block_rec.slot,
142                    block_hash: block_rec.hash,
143                }),
144            }
145        }
146        _ => panic!("absurd: Indexer filter should only allow transaction event variant."),
147    })
148}
149
150impl FromOura<oura::TransactionRecord> for TransactionEventRecord {
151    fn from_oura(tx: oura::TransactionRecord) -> Result<TransactionEventRecord, OuraParseError> {
152        Ok(TransactionEventRecord {
153            hash: TransactionHash::from_oura(tx.hash.clone())?,
154            fee: tx.fee,
155            redeemers: tx
156                .plutus_redeemers
157                .unwrap()
158                .into_iter()
159                .map(|redeemer_record| {
160                    // TODO(szg251): parse other redeemer tags
161                    let script_purpose = match &redeemer_record.purpose[..] {
162                        "spend" => {
163                            let inputs = tx.inputs.as_ref().unwrap();
164                            let input = inputs
165                                .get(redeemer_record.input_idx as usize)
166                                .ok_or(OuraParseError::ParseError(anyhow!(
167                                    "No input found at redeemer index {}",
168                                    redeemer_record.input_idx
169                                )))?
170                                .clone();
171
172                            let transaction_input = TransactionInput {
173                                transaction_id: TransactionHash::from_oura(input.tx_id)?,
174                                index: BigInt::from(input.index),
175                            };
176                            ScriptPurpose::Spending(transaction_input)
177                        }
178                        "mint" => {
179                            let policies = tx
180                                .mint
181                                .as_ref()
182                                .unwrap()
183                                .iter()
184                                .map(|mint| mint.policy.clone())
185                                .sorted()
186                                .dedup()
187                                .collect::<Vec<_>>();
188                            let policy = policies
189                                .get(redeemer_record.input_idx as usize)
190                                .ok_or(OuraParseError::ParseError(anyhow!(
191                                    "No mint found at redeemer index {}",
192                                    redeemer_record.input_idx
193                                )))?
194                                .clone();
195
196                            ScriptPurpose::Minting(CurrencySymbol::from_oura(policy)?)
197                        }
198                        // "cert" => ScriptPurpose::Certifying (),
199                        // "reward" => ScriptPurpose::Rewarding(0)
200                        _ => Err(OuraParseError::ParseError(anyhow!(
201                            "Cannot parse redeemer tag variant: {}",
202                            redeemer_record.purpose
203                        )))?,
204                    };
205
206                    Ok((
207                        script_purpose,
208                        Redeemer::from_oura(redeemer_record.plutus_data)?,
209                    ))
210                })
211                .collect::<Result<_, OuraParseError>>()?,
212            size: tx.size,
213            // All these unwraps should succeed since we enable `include_transaction_details`
214            // in the mapper config.
215            inputs: tx
216                .inputs
217                .unwrap()
218                .into_iter()
219                .map(|oura::TxInputRecord { tx_id, index }| {
220                    Ok(TransactionInput {
221                        transaction_id: TransactionHash::from_oura(tx_id)?,
222                        index: BigInt::from(index),
223                    })
224                })
225                .collect::<Result<_, OuraParseError>>()?,
226            outputs: tx
227                .outputs
228                .unwrap()
229                .into_iter()
230                .enumerate()
231                .map(
232                    |(
233                        index,
234                        oura::TxOutputRecord {
235                            address,
236                            amount,
237                            assets,
238                            datum_hash,
239                            inline_datum,
240                            reference_script,
241                        },
242                    )| {
243                        let reference = TransactionInput {
244                            transaction_id: TransactionHash::from_oura(tx.hash.clone())?,
245                            index: index.into(),
246                        };
247                        let output = TransactionOutput {
248                            address: Address::from_oura(address)?,
249                            datum: match (datum_hash, inline_datum) {
250                                (None, None) => OutputDatum::None,
251                                (_, Some(datm)) => {
252                                    OutputDatum::InlineDatum(Datum::from_oura(datm.plutus_data)?)
253                                }
254                                (Some(dh), _) => OutputDatum::DatumHash(DatumHash::from_oura(dh)?),
255                            },
256                            // NOTE(chase): There is currently no way to know about reference scripts with Oura.
257                            reference_script: reference_script
258                                .map(ScriptHash::from_oura)
259                                .transpose()?,
260                            value: Value::ada_value(&BigInt::from(amount))
261                                + Value::from_oura(assets.unwrap_or_default())?,
262                        };
263
264                        Ok(TxInInfo { reference, output })
265                    },
266                )
267                .collect::<Result<_, OuraParseError>>()?,
268            mint: tx.mint.map_or(Ok(Value::new()), Value::from_oura)?,
269            plutus_data: tx
270                .plutus_data
271                .unwrap_or_default()
272                .into_iter()
273                .map(
274                    |oura::PlutusDatumRecord {
275                         plutus_data,
276                         datum_hash,
277                     }| {
278                        Ok((
279                            DatumHash::from_oura(datum_hash)?,
280                            Datum::from_oura(plutus_data)?,
281                        ))
282                    },
283                )
284                .collect::<Result<_, OuraParseError>>()?,
285        })
286    }
287}