1use std::collections::BTreeMap;
2use std::time;
3
4use anyhow::anyhow;
5use chrono::{DateTime, Utc};
6use derive_builder::Builder;
7use jsonrpsee::core::traits::ToRpcParams;
8use jsonrpsee::{
9 core::client::ClientT,
10 rpc_params,
11 ws_client::{WsClient, WsClientBuilder},
12};
13use plutus_ledger_api::csl::pla_to_csl::TryToCSL;
14use plutus_ledger_api::{
15 csl::lib as csl,
16 v3::{
17 address::Address,
18 transaction::{TransactionHash, TransactionInput},
19 },
20};
21use serde::Serialize;
22use tracing::{debug, error, info, warn};
23use tx_bakery::{
24 chain_query::{
25 ChainQuery, ChainQueryError, ChainTip, EraSummary, FullTransactionOutput, Network,
26 ProtocolParameters,
27 },
28 submitter::{Submitter, SubmitterError},
29};
30use url::Url;
31
32use super::{
33 api::{
34 to_redeemer_tag, AcquireMempoolResponse, EvaluateTransactionParams,
35 EvaluateTransactionResponse, OgmiosHealth, OutputReference,
36 QueryLedgerStateEraSummariesResponse, QueryLedgerStateProtocolParametersResponse,
37 QueryLedgerStateTipResponse, QueryLedgerStateUtxoByAddressParams,
38 QueryLedgerStateUtxoByOutputReferenceParams, QueryLedgerStateUtxoResponse,
39 QueryNetworkStartTimeResponse, ReleaseMempoolResponse, SubmitTransactionParams,
40 SubmitTransactionResponse, TransactionCbor, TransactionId,
41 },
42 error::{OgmiosError, Result},
43};
44
/// Connection settings for an [`OgmiosClient`].
///
/// Instances are normally created through the `derive_builder`-generated
/// `OgmiosClientConfigBuilder`, whose `build` step runs
/// `OgmiosClientConfigBuilder::validate`.
#[derive(Debug, Builder, Clone)]
#[builder(build_fn(validate = "Self::validate"))]
pub struct OgmiosClientConfig {
    /// Base URL of the Ogmios server. The builder only accepts the `http` and
    /// `https` schemes. Defaults to `http://127.0.0.1:1337`.
    #[builder(default = "Url::parse(\"http://127.0.0.1:1337\").unwrap()")]
    pub url: Url,
    /// Network the client operates on. Defaults to `Network::Testnet`.
    #[builder(default = "Network::Testnet")]
    pub network: Network,
    /// How long `OgmiosClient::connect` keeps polling the health endpoint
    /// before giving up, interpreted there as seconds. Defaults to `90`.
    #[builder(default = "90")]
    pub startup_timeout: u64,
}
55
56impl OgmiosClientConfigBuilder {
57 fn validate(&self) -> std::result::Result<(), String> {
58 if let Some(url) = &self.url {
59 match url.scheme() {
60 "http" | "https" => Ok(()),
61 scheme => Err(format!(
62 "Url scheme invalid in OgmiosConfig. Expected https/http, but got {}",
63 scheme,
64 )),
65 }
66 } else {
67 Ok(())
68 }
69 }
70}
71
72impl OgmiosClientConfig {
73 pub fn get_ws_url(&self) -> Url {
74 let mut url = self.url.clone();
75 url.set_scheme("ws").unwrap();
78 url
79 }
80
81 pub fn get_restful_health_url(&self) -> Url {
82 self.url.join("health").unwrap()
84 }
85}
86
/// Client for an Ogmios server, holding its configuration together with an
/// open JSON-RPC WebSocket connection.
pub struct OgmiosClient {
    /// Configuration the client was connected with.
    config: OgmiosClientConfig,
    /// JSON-RPC WebSocket client every request is sent through.
    client: WsClient,
}
92
93impl OgmiosClient {
94 pub async fn connect(config: OgmiosClientConfig) -> Result<Self> {
95 let giveup_time = chrono::Local::now() + time::Duration::from_secs(config.startup_timeout);
96
97 let base = time::Duration::from_secs(1);
98 let mut attempt = 0;
99 loop {
100 let health = Self::check_health(&config).await;
101 if health
102 .as_ref()
103 .map_or(false, |h| h.network_synchronization == 1.0)
104 {
105 let client = WsClientBuilder::default()
106 .build(&config.get_ws_url())
107 .await?;
108 let client = Self { config, client };
109
110 return Ok(client);
111 } else {
112 if chrono::Local::now() > giveup_time {
113 return match health {
114 Err(err) => Err(OgmiosError::StartupError(anyhow!(
115 "health request failed: {:?}",
116 err
117 ))),
118 Ok(health) => Err(OgmiosError::StartupError(anyhow!(
119 "couldn't sync: {:?}",
120 health
121 ))),
122 };
123 }
124
125 let wait_duration = base
127 .checked_mul(2u32.pow(attempt))
128 .ok_or(OgmiosError::StartupError(anyhow!("cannot wait any longer")))?;
129 tokio::time::sleep(wait_duration).await;
130 attempt += 1;
131 }
132 }
133 }
134
135 pub fn get_config(&self) -> &OgmiosClientConfig {
136 &self.config
137 }
138
139 async fn acquire_mempool(&self) -> Result<u64> {
140 let resp: AcquireMempoolResponse = self.request("acquireMempool", rpc_params![]).await?;
141 Ok(resp.slot)
142 }
143
144 async fn release_mempool(&self) -> Result<ReleaseMempoolResponse> {
145 self.request("releaseMempool", rpc_params![]).await
146 }
147
148 async fn has_transaction(&self, transaction_hash: &TransactionHash) -> Result<bool> {
149 let params = TransactionId::from(transaction_hash);
150 self.request("hasTransaction", params).await
151 }
152
153 async fn request<P, U>(&self, method: &str, params: P) -> Result<U>
157 where
158 U: serde::de::DeserializeOwned + Serialize,
159 P: ToRpcParams + Send,
160 {
161 self.client.request(method, params).await.map_err(|err| {
162 debug!(%err, "Ogmios JSON RPC call error.");
163 OgmiosError::JSONRpcError(err)
164 })
165 }
166
167 pub async fn check_health(config: &OgmiosClientConfig) -> Result<OgmiosHealth> {
168 Ok(reqwest::Client::new()
169 .get(config.get_restful_health_url())
170 .send()
171 .await?
172 .json::<OgmiosHealth>()
173 .await?)
174 }
175}
176
177impl ChainQuery for OgmiosClient {
178 fn get_network(&self) -> Network {
179 self.config.network.clone()
180 }
181
182 async fn query_system_start(&self) -> std::result::Result<DateTime<Utc>, ChainQueryError> {
183 let resp: QueryNetworkStartTimeResponse = self
184 .request("queryNetwork/startTime", rpc_params![])
185 .await?;
186
187 Ok(DateTime::parse_from_rfc3339(&resp)
188 .map_err(|source| OgmiosError::ConversionError {
189 label: "SystemStart datetime".to_string(),
190 source: anyhow!(source),
191 })?
192 .to_utc())
193 }
194
195 async fn query_era_summaries(&self) -> std::result::Result<Vec<EraSummary>, ChainQueryError> {
196 let resp: QueryLedgerStateEraSummariesResponse = self
197 .request("queryLedgerState/eraSummaries", rpc_params![])
198 .await?;
199
200 Ok(resp
201 .into_iter()
202 .map(EraSummary::try_from)
203 .collect::<Result<_>>()?)
204 }
205
206 async fn query_protocol_params(
208 &self,
209 ) -> std::result::Result<ProtocolParameters, ChainQueryError> {
210 let resp: QueryLedgerStateProtocolParametersResponse = self
211 .request("queryLedgerState/protocolParameters", rpc_params![])
212 .await?;
213
214 Ok(resp.try_into()?)
215 }
216
217 async fn query_tip(&self) -> std::result::Result<ChainTip, ChainQueryError> {
219 let resp: QueryLedgerStateTipResponse =
220 self.request("queryLedgerState/tip", rpc_params![]).await?;
221 Ok(resp.into())
222 }
223
224 async fn query_utxos_by_addr(
225 &self,
226 address: &Address,
227 ) -> std::result::Result<BTreeMap<TransactionInput, FullTransactionOutput>, ChainQueryError>
228 {
229 debug!(?address, "Query UTxOs by address");
230 let addr: csl::Address = address
231 .with_extra_info(self.config.network.to_network_id())
232 .try_to_csl()
233 .map_err(OgmiosError::TryFromPLAError)?;
234
235 let addr = addr.to_bech32(Some("addr".to_owned())).map_err(|source| {
236 OgmiosError::ConversionError {
237 label: "Address to Bech32".to_string(),
238 source: anyhow!(source),
239 }
240 })?;
241 let params = QueryLedgerStateUtxoByAddressParams {
242 addresses: vec![addr.to_string()],
243 };
244
245 let resp: QueryLedgerStateUtxoResponse =
246 self.request("queryLedgerState/utxo", params).await?;
247
248 decode_query_ledger_state_utxo_response(resp)
249 }
250
251 async fn query_utxos_by_ref(
252 &self,
253 references: Vec<&TransactionInput>,
254 ) -> std::result::Result<BTreeMap<TransactionInput, FullTransactionOutput>, ChainQueryError>
255 {
256 let output_references = references
257 .into_iter()
258 .cloned()
259 .map(OutputReference::try_from)
260 .collect::<std::result::Result<Vec<_>, _>>()?;
261
262 let params = QueryLedgerStateUtxoByOutputReferenceParams { output_references };
263
264 let resp: QueryLedgerStateUtxoResponse =
265 self.request("queryLedgerState/utxo", params).await?;
266
267 decode_query_ledger_state_utxo_response(resp)
268 }
269}
270
271impl Submitter for OgmiosClient {
272 async fn evaluate_transaction(
274 &self,
275 tx_builder: &csl::TransactionBuilder,
276 plutus_scripts: &[csl::PlutusScript],
277 redeemers: &[csl::Redeemer],
278 ) -> std::result::Result<BTreeMap<(csl::RedeemerTag, csl::BigNum), csl::ExUnits>, SubmitterError>
279 {
280 let mut tx_builder = tx_builder.clone();
281
282 tx_builder.set_fee(&csl::BigNum::from(0u64));
283
284 let mut witness_set = csl::TransactionWitnessSet::new();
285
286 let mut script_witnesses = csl::PlutusScripts::new();
287
288 plutus_scripts
289 .iter()
290 .for_each(|script| script_witnesses.add(script));
291
292 let mut redeemer_witnesses = csl::Redeemers::new();
293
294 redeemers
295 .iter()
296 .for_each(|redeemer| redeemer_witnesses.add(redeemer));
297
298 witness_set.set_plutus_scripts(&script_witnesses);
299 witness_set.set_redeemers(&redeemer_witnesses);
300
301 let tx_body = tx_builder.build().map_err(|err| {
302 error!(%err, "Transaction builder error.");
303 SubmitterError(anyhow::anyhow!("Transaction builder error: {}", err))
304 })?;
305 let tx = csl::Transaction::new(&tx_body, &witness_set, None);
306
307 debug!("Evaluating transaction");
308 let params = EvaluateTransactionParams {
309 transaction: TransactionCbor { cbor: tx.to_hex() },
310 additional_utxo: Vec::new(),
311 };
312
313 let resp: EvaluateTransactionResponse = self.request("evaluateTransaction", params).await?;
314
315 resp.into_iter()
316 .map(|budgets| {
317 Ok((
318 (
319 to_redeemer_tag(&budgets.validator.purpose)?,
320 csl::BigNum::from(budgets.validator.index),
321 ),
322 csl::ExUnits::new(
323 &csl::BigNum::from(budgets.budget.memory),
324 &csl::BigNum::from(budgets.budget.cpu),
325 ),
326 ))
327 })
328 .collect()
329 }
330
331 async fn submit_transaction(
332 &self,
333 tx: &csl::FixedTransaction,
334 ) -> std::result::Result<TransactionHash, SubmitterError> {
335 debug!("Submitting transaction");
336 let params = SubmitTransactionParams {
337 transaction: TransactionCbor { cbor: tx.to_hex() },
338 additional_utxo: Vec::new(),
339 };
340
341 let resp: Result<SubmitTransactionResponse> =
342 self.request("submitTransaction", params).await;
343
344 Ok(resp?.transaction.try_into()?)
345 }
346
347 async fn await_tx_confirm(
348 &self,
349 tx_hash: &TransactionHash,
350 ) -> std::result::Result<(), SubmitterError> {
351 info!(?tx_hash, "Awaiting transaction confirmation.");
352 let do_wait = || async {
353 loop {
354 let _ = self.acquire_mempool().await?;
355
356 let has_tx = self.has_transaction(tx_hash).await?;
357
358 if !has_tx {
359 let _ = self.release_mempool().await?;
360 return Result::Ok(());
361 }
362 }
363 };
364
365 let mut retry_counter = 0;
366
367 while retry_counter < 5 {
368 match do_wait().await {
369 Ok(_) => return Ok(()),
370 Err(err) => warn!(
371 "Unable to confirm transaction {:?}: {}, retrying",
372 tx_hash, err
373 ),
374 }
375
376 retry_counter += 1;
377 }
378
379 Err(SubmitterError(anyhow!(
380 "Unable to confirm transaction {:?}",
381 tx_hash
382 )))
383 }
384}
385
386fn decode_query_ledger_state_utxo_response(
387 resp: QueryLedgerStateUtxoResponse,
388) -> std::result::Result<BTreeMap<TransactionInput, FullTransactionOutput>, ChainQueryError> {
389 resp.iter()
390 .map(|utxo| {
391 Ok((
392 TransactionInput::try_from(utxo).map_err(|source| {
393 OgmiosError::ConversionError {
394 label: "TransactionInput".to_string(),
395 source: anyhow!(source),
396 }
397 })?,
398 FullTransactionOutput::try_from(utxo).map_err(|source| {
399 OgmiosError::ConversionError {
400 label: "TransactionInput".to_string(),
401 source: anyhow!(source),
402 }
403 })?,
404 ))
405 })
406 .collect()
407}