tx_bakery_ogmios/
client.rs

1use std::collections::BTreeMap;
2use std::time;
3
4use anyhow::anyhow;
5use chrono::{DateTime, Utc};
6use derive_builder::Builder;
7use jsonrpsee::core::traits::ToRpcParams;
8use jsonrpsee::{
9    core::client::ClientT,
10    rpc_params,
11    ws_client::{WsClient, WsClientBuilder},
12};
13use plutus_ledger_api::csl::pla_to_csl::TryToCSL;
14use plutus_ledger_api::{
15    csl::lib as csl,
16    v3::{
17        address::Address,
18        transaction::{TransactionHash, TransactionInput},
19    },
20};
21use serde::Serialize;
22use tracing::{debug, error, info, warn};
23use tx_bakery::{
24    chain_query::{
25        ChainQuery, ChainQueryError, ChainTip, EraSummary, FullTransactionOutput, Network,
26        ProtocolParameters,
27    },
28    submitter::{Submitter, SubmitterError},
29};
30use url::Url;
31
32use super::{
33    api::{
34        to_redeemer_tag, AcquireMempoolResponse, EvaluateTransactionParams,
35        EvaluateTransactionResponse, OgmiosHealth, OutputReference,
36        QueryLedgerStateEraSummariesResponse, QueryLedgerStateProtocolParametersResponse,
37        QueryLedgerStateTipResponse, QueryLedgerStateUtxoByAddressParams,
38        QueryLedgerStateUtxoByOutputReferenceParams, QueryLedgerStateUtxoResponse,
39        QueryNetworkStartTimeResponse, ReleaseMempoolResponse, SubmitTransactionParams,
40        SubmitTransactionResponse, TransactionCbor, TransactionId,
41    },
42    error::{OgmiosError, Result},
43};
44
45#[derive(Debug, Builder, Clone)]
46#[builder(build_fn(validate = "Self::validate"))]
47pub struct OgmiosClientConfig {
48    #[builder(default = "Url::parse(\"http://127.0.0.1:1337\").unwrap()")]
49    pub url: Url,
50    #[builder(default = "Network::Testnet")]
51    pub network: Network,
52    #[builder(default = "90")]
53    pub startup_timeout: u64,
54}
55
56impl OgmiosClientConfigBuilder {
57    fn validate(&self) -> std::result::Result<(), String> {
58        if let Some(url) = &self.url {
59            match url.scheme() {
60                "http" | "https" => Ok(()),
61                scheme => Err(format!(
62                    "Url scheme invalid in OgmiosConfig. Expected https/http, but got {}",
63                    scheme,
64                )),
65            }
66        } else {
67            Ok(())
68        }
69    }
70}
71
72impl OgmiosClientConfig {
73    pub fn get_ws_url(&self) -> Url {
74        let mut url = self.url.clone();
75        // JUSTIFICATION: We ensure that the url's scheme is either http or https,
76        // so this should always success.
77        url.set_scheme("ws").unwrap();
78        url
79    }
80
81    pub fn get_restful_health_url(&self) -> Url {
82        // JUSTIFICATION: The base url and the path are always valid.
83        self.url.join("health").unwrap()
84    }
85}
86
87/// Ogmios client for interacting with the blockchain
88pub struct OgmiosClient {
89    config: OgmiosClientConfig,
90    client: WsClient,
91}
92
93impl OgmiosClient {
94    pub async fn connect(config: OgmiosClientConfig) -> Result<Self> {
95        let giveup_time = chrono::Local::now() + time::Duration::from_secs(config.startup_timeout);
96
97        let base = time::Duration::from_secs(1);
98        let mut attempt = 0;
99        loop {
100            let health = Self::check_health(&config).await;
101            if health
102                .as_ref()
103                .map_or(false, |h| h.network_synchronization == 1.0)
104            {
105                let client = WsClientBuilder::default()
106                    .build(&config.get_ws_url())
107                    .await?;
108                let client = Self { config, client };
109
110                return Ok(client);
111            } else {
112                if chrono::Local::now() > giveup_time {
113                    return match health {
114                        Err(err) => Err(OgmiosError::StartupError(anyhow!(
115                            "health request failed: {:?}",
116                            err
117                        ))),
118                        Ok(health) => Err(OgmiosError::StartupError(anyhow!(
119                            "couldn't sync: {:?}",
120                            health
121                        ))),
122                    };
123                }
124
125                // Simple exponential backoff
126                let wait_duration = base
127                    .checked_mul(2u32.pow(attempt))
128                    .ok_or(OgmiosError::StartupError(anyhow!("cannot wait any longer")))?;
129                tokio::time::sleep(wait_duration).await;
130                attempt += 1;
131            }
132        }
133    }
134
135    pub fn get_config(&self) -> &OgmiosClientConfig {
136        &self.config
137    }
138
139    async fn acquire_mempool(&self) -> Result<u64> {
140        let resp: AcquireMempoolResponse = self.request("acquireMempool", rpc_params![]).await?;
141        Ok(resp.slot)
142    }
143
144    async fn release_mempool(&self) -> Result<ReleaseMempoolResponse> {
145        self.request("releaseMempool", rpc_params![]).await
146    }
147
148    async fn has_transaction(&self, transaction_hash: &TransactionHash) -> Result<bool> {
149        let params = TransactionId::from(transaction_hash);
150        self.request("hasTransaction", params).await
151    }
152
153    /// Make a request to ogmios JSON RPC
154    /// Ogmios slightly deviates from the JSON RPC standard, so I couldn't use a 3rd party library
155    /// for this
156    async fn request<P, U>(&self, method: &str, params: P) -> Result<U>
157    where
158        U: serde::de::DeserializeOwned + Serialize,
159        P: ToRpcParams + Send,
160    {
161        self.client.request(method, params).await.map_err(|err| {
162            debug!(%err, "Ogmios JSON RPC call error.");
163            OgmiosError::JSONRpcError(err)
164        })
165    }
166
167    pub async fn check_health(config: &OgmiosClientConfig) -> Result<OgmiosHealth> {
168        Ok(reqwest::Client::new()
169            .get(config.get_restful_health_url())
170            .send()
171            .await?
172            .json::<OgmiosHealth>()
173            .await?)
174    }
175}
176
177impl ChainQuery for OgmiosClient {
178    fn get_network(&self) -> Network {
179        self.config.network.clone()
180    }
181
182    async fn query_system_start(&self) -> std::result::Result<DateTime<Utc>, ChainQueryError> {
183        let resp: QueryNetworkStartTimeResponse = self
184            .request("queryNetwork/startTime", rpc_params![])
185            .await?;
186
187        Ok(DateTime::parse_from_rfc3339(&resp)
188            .map_err(|source| OgmiosError::ConversionError {
189                label: "SystemStart datetime".to_string(),
190                source: anyhow!(source),
191            })?
192            .to_utc())
193    }
194
195    async fn query_era_summaries(&self) -> std::result::Result<Vec<EraSummary>, ChainQueryError> {
196        let resp: QueryLedgerStateEraSummariesResponse = self
197            .request("queryLedgerState/eraSummaries", rpc_params![])
198            .await?;
199
200        Ok(resp
201            .into_iter()
202            .map(EraSummary::try_from)
203            .collect::<Result<_>>()?)
204    }
205
206    /// Query protocol parameters and cost models for all languages
207    async fn query_protocol_params(
208        &self,
209    ) -> std::result::Result<ProtocolParameters, ChainQueryError> {
210        let resp: QueryLedgerStateProtocolParametersResponse = self
211            .request("queryLedgerState/protocolParameters", rpc_params![])
212            .await?;
213
214        Ok(resp.try_into()?)
215    }
216
217    /// Query current last slot of the chain
218    async fn query_tip(&self) -> std::result::Result<ChainTip, ChainQueryError> {
219        let resp: QueryLedgerStateTipResponse =
220            self.request("queryLedgerState/tip", rpc_params![]).await?;
221        Ok(resp.into())
222    }
223
224    async fn query_utxos_by_addr(
225        &self,
226        address: &Address,
227    ) -> std::result::Result<BTreeMap<TransactionInput, FullTransactionOutput>, ChainQueryError>
228    {
229        debug!(?address, "Query UTxOs by address");
230        let addr: csl::Address = address
231            .with_extra_info(self.config.network.to_network_id())
232            .try_to_csl()
233            .map_err(OgmiosError::TryFromPLAError)?;
234
235        let addr = addr.to_bech32(Some("addr".to_owned())).map_err(|source| {
236            OgmiosError::ConversionError {
237                label: "Address to Bech32".to_string(),
238                source: anyhow!(source),
239            }
240        })?;
241        let params = QueryLedgerStateUtxoByAddressParams {
242            addresses: vec![addr.to_string()],
243        };
244
245        let resp: QueryLedgerStateUtxoResponse =
246            self.request("queryLedgerState/utxo", params).await?;
247
248        decode_query_ledger_state_utxo_response(resp)
249    }
250
251    async fn query_utxos_by_ref(
252        &self,
253        references: Vec<&TransactionInput>,
254    ) -> std::result::Result<BTreeMap<TransactionInput, FullTransactionOutput>, ChainQueryError>
255    {
256        let output_references = references
257            .into_iter()
258            .cloned()
259            .map(OutputReference::try_from)
260            .collect::<std::result::Result<Vec<_>, _>>()?;
261
262        let params = QueryLedgerStateUtxoByOutputReferenceParams { output_references };
263
264        let resp: QueryLedgerStateUtxoResponse =
265            self.request("queryLedgerState/utxo", params).await?;
266
267        decode_query_ledger_state_utxo_response(resp)
268    }
269}
270
271impl Submitter for OgmiosClient {
272    /// Evaluate a transaction and return execution budgets for each script
273    async fn evaluate_transaction(
274        &self,
275        tx_builder: &csl::TransactionBuilder,
276        plutus_scripts: &[csl::PlutusScript],
277        redeemers: &[csl::Redeemer],
278    ) -> std::result::Result<BTreeMap<(csl::RedeemerTag, csl::BigNum), csl::ExUnits>, SubmitterError>
279    {
280        let mut tx_builder = tx_builder.clone();
281
282        tx_builder.set_fee(&csl::BigNum::from(0u64));
283
284        let mut witness_set = csl::TransactionWitnessSet::new();
285
286        let mut script_witnesses = csl::PlutusScripts::new();
287
288        plutus_scripts
289            .iter()
290            .for_each(|script| script_witnesses.add(script));
291
292        let mut redeemer_witnesses = csl::Redeemers::new();
293
294        redeemers
295            .iter()
296            .for_each(|redeemer| redeemer_witnesses.add(redeemer));
297
298        witness_set.set_plutus_scripts(&script_witnesses);
299        witness_set.set_redeemers(&redeemer_witnesses);
300
301        let tx_body = tx_builder.build().map_err(|err| {
302            error!(%err, "Transaction builder error.");
303            SubmitterError(anyhow::anyhow!("Transaction builder error: {}", err))
304        })?;
305        let tx = csl::Transaction::new(&tx_body, &witness_set, None);
306
307        debug!("Evaluating transaction");
308        let params = EvaluateTransactionParams {
309            transaction: TransactionCbor { cbor: tx.to_hex() },
310            additional_utxo: Vec::new(),
311        };
312
313        let resp: EvaluateTransactionResponse = self.request("evaluateTransaction", params).await?;
314
315        resp.into_iter()
316            .map(|budgets| {
317                Ok((
318                    (
319                        to_redeemer_tag(&budgets.validator.purpose)?,
320                        csl::BigNum::from(budgets.validator.index),
321                    ),
322                    csl::ExUnits::new(
323                        &csl::BigNum::from(budgets.budget.memory),
324                        &csl::BigNum::from(budgets.budget.cpu),
325                    ),
326                ))
327            })
328            .collect()
329    }
330
331    async fn submit_transaction(
332        &self,
333        tx: &csl::FixedTransaction,
334    ) -> std::result::Result<TransactionHash, SubmitterError> {
335        debug!("Submitting transaction");
336        let params = SubmitTransactionParams {
337            transaction: TransactionCbor { cbor: tx.to_hex() },
338            additional_utxo: Vec::new(),
339        };
340
341        let resp: Result<SubmitTransactionResponse> =
342            self.request("submitTransaction", params).await;
343
344        Ok(resp?.transaction.try_into()?)
345    }
346
347    async fn await_tx_confirm(
348        &self,
349        tx_hash: &TransactionHash,
350    ) -> std::result::Result<(), SubmitterError> {
351        info!(?tx_hash, "Awaiting transaction confirmation.");
352        let do_wait = || async {
353            loop {
354                let _ = self.acquire_mempool().await?;
355
356                let has_tx = self.has_transaction(tx_hash).await?;
357
358                if !has_tx {
359                    let _ = self.release_mempool().await?;
360                    return Result::Ok(());
361                }
362            }
363        };
364
365        let mut retry_counter = 0;
366
367        while retry_counter < 5 {
368            match do_wait().await {
369                Ok(_) => return Ok(()),
370                Err(err) => warn!(
371                    "Unable to confirm transaction {:?}: {}, retrying",
372                    tx_hash, err
373                ),
374            }
375
376            retry_counter += 1;
377        }
378
379        Err(SubmitterError(anyhow!(
380            "Unable to confirm transaction {:?}",
381            tx_hash
382        )))
383    }
384}
385
386fn decode_query_ledger_state_utxo_response(
387    resp: QueryLedgerStateUtxoResponse,
388) -> std::result::Result<BTreeMap<TransactionInput, FullTransactionOutput>, ChainQueryError> {
389    resp.iter()
390        .map(|utxo| {
391            Ok((
392                TransactionInput::try_from(utxo).map_err(|source| {
393                    OgmiosError::ConversionError {
394                        label: "TransactionInput".to_string(),
395                        source: anyhow!(source),
396                    }
397                })?,
398                FullTransactionOutput::try_from(utxo).map_err(|source| {
399                    OgmiosError::ConversionError {
400                        label: "TransactionInput".to_string(),
401                        source: anyhow!(source),
402                    }
403                })?,
404            ))
405        })
406        .collect()
407}