mirror of
https://codeberg.org/JasterV/transactions-processor.git
synced 2026-04-26 18:10:06 +00:00
code cleaning
This commit is contained in:
parent
d4c06cd18e
commit
6eae8e97f5
6 changed files with 48 additions and 39 deletions
|
|
@ -1,4 +1,8 @@
|
|||
use crate::{models::{account::Account, responder::Responder, transaction::{Transaction, TransactionType}}};
|
||||
use crate::models::{
|
||||
account::Account,
|
||||
transaction::{Transaction, TransactionType},
|
||||
};
|
||||
use tokio::sync::oneshot::Sender as Responder;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Command {
|
||||
|
|
@ -7,17 +11,17 @@ pub enum Command {
|
|||
Dispute(u32),
|
||||
Resolve(u32),
|
||||
Chargeback(u32),
|
||||
Stop(Responder<Account>)
|
||||
Stop(Responder<Account>),
|
||||
}
|
||||
|
||||
impl From<Transaction> for Command {
|
||||
fn from(tx: Transaction) -> Self {
|
||||
match tx.ty {
|
||||
TransactionType::Chargeback => Command::Chargeback(tx.tx),
|
||||
TransactionType::Deposit => Command::Deposit(tx.tx, tx.amount.unwrap()),
|
||||
TransactionType::Withdrawal => Command::Withdraw(tx.tx, tx.amount.unwrap()),
|
||||
TransactionType::Deposit => Command::Deposit(tx.tx, tx.amount.unwrap_or_default()),
|
||||
TransactionType::Withdrawal => Command::Withdraw(tx.tx, tx.amount.unwrap_or_default()),
|
||||
TransactionType::Dispute => Command::Dispute(tx.tx),
|
||||
TransactionType::Resolve => Command::Resolve(tx.tx)
|
||||
TransactionType::Resolve => Command::Resolve(tx.tx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
use crate::models::{account::Account, responder::Responder, transaction::Transaction};
|
||||
use crate::models::{account::Account, transaction::Transaction};
|
||||
use tokio::sync::oneshot::Sender as Responder;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Command {
|
||||
SendTx(Transaction),
|
||||
Stop(Responder<Vec<Account>>)
|
||||
}
|
||||
Stop(Responder<Vec<Account>>),
|
||||
}
|
||||
|
|
|
|||
59
src/main.rs
59
src/main.rs
|
|
@ -1,44 +1,53 @@
|
|||
#[macro_use]
|
||||
extern crate async_trait;
|
||||
|
||||
mod actors;
|
||||
mod errors;
|
||||
mod models;
|
||||
mod actors;
|
||||
|
||||
use actors::{
|
||||
run_async_actor,
|
||||
tx_processor::{actor::TxProcessor, messages::Command},
|
||||
};
|
||||
use anyhow::Result;
|
||||
use csv_async::DeserializeRecordsStream;
|
||||
use models::transaction::TransactionType;
|
||||
use csv_async::AsyncDeserializer;
|
||||
use models::{account::Account, transaction::Transaction};
|
||||
use std::env;
|
||||
use tokio::{fs::File, sync::{mpsc::Sender, oneshot}};
|
||||
use tokio::{
|
||||
fs::File,
|
||||
sync::{mpsc::Sender, oneshot},
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
use crate::{models::{account::Account, transaction::Transaction}};
|
||||
use actors::{run_async_actor, tx_processor::{actor::TxProcessor, messages::Command}};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let file_path = env::args().nth(1).expect("CSV path required");
|
||||
let mut rdr = csv_async::AsyncDeserializer::from_reader(File::open(file_path).await?);
|
||||
let records = rdr.deserialize::<Transaction>();
|
||||
_main(records).await?;
|
||||
Ok(())
|
||||
let path: String = env::args().nth(1).expect("CSV path required");
|
||||
let rdr: AsyncDeserializer<File> = csv_deserializer(&path).await?;
|
||||
let addr: Sender<Command> = run_async_actor(TxProcessor::new());
|
||||
process_transactions(&addr, rdr).await?;
|
||||
let accounts: Vec<Account> = stop_processor(&addr).await?;
|
||||
Ok(display_accounts(&accounts))
|
||||
}
|
||||
|
||||
async fn _main(mut records: DeserializeRecordsStream<'_, File, Transaction>) -> Result<()> {
|
||||
let tx_processor_addr = run_async_actor(TxProcessor::new());
|
||||
|
||||
async fn process_transactions(
|
||||
addr: &Sender<Command>,
|
||||
mut rdr: AsyncDeserializer<File>,
|
||||
) -> Result<()> {
|
||||
let mut records = rdr.deserialize::<Transaction>();
|
||||
while let Some(record) = records.next().await {
|
||||
let transaction = record?;
|
||||
if (transaction.ty == TransactionType::Withdrawal || transaction.ty == TransactionType::Deposit) && transaction.amount.is_none() {
|
||||
panic!("Transaction {} does not have a valid amount", transaction.tx)
|
||||
}
|
||||
tx_processor_addr.send(Command::SendTx(transaction)).await?;
|
||||
addr.send(Command::SendTx(transaction)).await?;
|
||||
}
|
||||
let accounts = send_stop(&tx_processor_addr).await?;
|
||||
display_accounts(&accounts);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_stop(processor: &Sender<Command>) -> Result<Vec<Account>> {
|
||||
async fn csv_deserializer(path: &str) -> Result<AsyncDeserializer<File>> {
|
||||
Ok(csv_async::AsyncDeserializer::from_reader(
|
||||
File::open(path).await?,
|
||||
))
|
||||
}
|
||||
|
||||
async fn stop_processor(processor: &Sender<Command>) -> Result<Vec<Account>> {
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
processor.send(Command::Stop(resp_tx)).await?;
|
||||
Ok(resp_rx.await?)
|
||||
|
|
@ -48,12 +57,12 @@ fn display_accounts(accounts: &[Account]) {
|
|||
println!("client,available,held,total,locked");
|
||||
for account in accounts.iter() {
|
||||
println!(
|
||||
"{},{},{},{},{}",
|
||||
account.get_client(),
|
||||
account.get_available(),
|
||||
"{},{},{},{},{}",
|
||||
account.get_client(),
|
||||
account.get_available(),
|
||||
account.get_held(),
|
||||
account.get_total(),
|
||||
account.get_locked()
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
pub mod account;
|
||||
pub mod actor;
|
||||
pub mod transaction;
|
||||
pub mod responder;
|
||||
pub mod actor;
|
||||
|
|
@ -1,3 +0,0 @@
|
|||
use tokio::sync::oneshot;
|
||||
|
||||
pub type Responder<T> = oneshot::Sender<T>;
|
||||
|
|
@ -22,4 +22,3 @@ pub struct Transaction {
|
|||
pub tx: u32,
|
||||
pub amount: Option<f32>,
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue