diff --git a/src/actors/account/messages.rs b/src/actors/account/messages.rs index 0ce7f50..888f4db 100644 --- a/src/actors/account/messages.rs +++ b/src/actors/account/messages.rs @@ -1,4 +1,8 @@ -use crate::{models::{account::Account, responder::Responder, transaction::{Transaction, TransactionType}}}; +use crate::models::{ + account::Account, + transaction::{Transaction, TransactionType}, +}; +use tokio::sync::oneshot::Sender as Responder; #[derive(Debug)] pub enum Command { @@ -7,17 +11,17 @@ pub enum Command { Dispute(u32), Resolve(u32), Chargeback(u32), - Stop(Responder) + Stop(Responder), } impl From for Command { fn from(tx: Transaction) -> Self { match tx.ty { TransactionType::Chargeback => Command::Chargeback(tx.tx), - TransactionType::Deposit => Command::Deposit(tx.tx, tx.amount.unwrap()), - TransactionType::Withdrawal => Command::Withdraw(tx.tx, tx.amount.unwrap()), + TransactionType::Deposit => Command::Deposit(tx.tx, tx.amount.unwrap_or_default()), + TransactionType::Withdrawal => Command::Withdraw(tx.tx, tx.amount.unwrap_or_default()), TransactionType::Dispute => Command::Dispute(tx.tx), - TransactionType::Resolve => Command::Resolve(tx.tx) + TransactionType::Resolve => Command::Resolve(tx.tx), } } } diff --git a/src/actors/tx_processor/messages.rs b/src/actors/tx_processor/messages.rs index e21edb5..917e57b 100644 --- a/src/actors/tx_processor/messages.rs +++ b/src/actors/tx_processor/messages.rs @@ -1,7 +1,8 @@ -use crate::models::{account::Account, responder::Responder, transaction::Transaction}; +use crate::models::{account::Account, transaction::Transaction}; +use tokio::sync::oneshot::Sender as Responder; #[derive(Debug)] pub enum Command { SendTx(Transaction), - Stop(Responder>) -} \ No newline at end of file + Stop(Responder>), +} diff --git a/src/main.rs b/src/main.rs index 5ad2593..8b0896e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,44 +1,53 @@ #[macro_use] extern crate async_trait; +mod actors; mod errors; mod models; -mod actors; +use actors::{ + run_async_actor, + tx_processor::{actor::TxProcessor, messages::Command}, +}; use anyhow::Result; -use csv_async::DeserializeRecordsStream; -use models::transaction::TransactionType; +use csv_async::AsyncDeserializer; +use models::{account::Account, transaction::Transaction}; use std::env; -use tokio::{fs::File, sync::{mpsc::Sender, oneshot}}; +use tokio::{ + fs::File, + sync::{mpsc::Sender, oneshot}, +}; use tokio_stream::StreamExt; -use crate::{models::{account::Account, transaction::Transaction}}; -use actors::{run_async_actor, tx_processor::{actor::TxProcessor, messages::Command}}; #[tokio::main] async fn main() -> Result<()> { - let file_path = env::args().nth(1).expect("CSV path required"); - let mut rdr = csv_async::AsyncDeserializer::from_reader(File::open(file_path).await?); - let records = rdr.deserialize::(); - _main(records).await?; - Ok(()) + let path: String = env::args().nth(1).expect("CSV path required"); + let rdr: AsyncDeserializer = csv_deserializer(&path).await?; + let addr: Sender = run_async_actor(TxProcessor::new()); + process_transactions(&addr, rdr).await?; + let accounts: Vec = stop_processor(&addr).await?; + Ok(display_accounts(&accounts)) } -async fn _main(mut records: DeserializeRecordsStream<'_, File, Transaction>) -> Result<()> { - let tx_processor_addr = run_async_actor(TxProcessor::new()); - +async fn process_transactions( + addr: &Sender, + mut rdr: AsyncDeserializer, +) -> Result<()> { + let mut records = rdr.deserialize::(); while let Some(record) = records.next().await { let transaction = record?; - if (transaction.ty == TransactionType::Withdrawal || transaction.ty == TransactionType::Deposit) && transaction.amount.is_none() { - panic!("Transaction {} does not have a valid amount", transaction.tx) - } - tx_processor_addr.send(Command::SendTx(transaction)).await?; + addr.send(Command::SendTx(transaction)).await?; } - let accounts = send_stop(&tx_processor_addr).await?; - display_accounts(&accounts); Ok(()) } -async fn send_stop(processor: &Sender) -> Result> { +async fn csv_deserializer(path: &str) -> Result> { + Ok(csv_async::AsyncDeserializer::from_reader( + File::open(path).await?, + )) +} + +async fn stop_processor(processor: &Sender) -> Result> { let (resp_tx, resp_rx) = oneshot::channel(); processor.send(Command::Stop(resp_tx)).await?; Ok(resp_rx.await?) @@ -48,12 +57,12 @@ fn display_accounts(accounts: &[Account]) { println!("client,available,held,total,locked"); for account in accounts.iter() { println!( - "{},{},{},{},{}", - account.get_client(), - account.get_available(), + "{},{},{},{},{}", + account.get_client(), + account.get_available(), account.get_held(), account.get_total(), account.get_locked() - ) + ) } } diff --git a/src/models/mod.rs b/src/models/mod.rs index 2f10b01..42d18bf 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,4 +1,3 @@ pub mod account; +pub mod actor; pub mod transaction; -pub mod responder; -pub mod actor; \ No newline at end of file diff --git a/src/models/responder.rs b/src/models/responder.rs deleted file mode 100644 index c14b59e..0000000 --- a/src/models/responder.rs +++ /dev/null @@ -1,3 +0,0 @@ -use tokio::sync::oneshot; - -pub type Responder = oneshot::Sender; diff --git a/src/models/transaction.rs b/src/models/transaction.rs index 41633cc..96b63ee 100644 --- a/src/models/transaction.rs +++ b/src/models/transaction.rs @@ -22,4 +22,3 @@ pub struct Transaction { pub tx: u32, pub amount: Option, } -