diff --git a/Cargo.lock b/Cargo.lock index 63dcf27..19b6a06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "595d3cfa7a60d4555cb5067b99f07142a08ea778de5cf993f7b75c7d8fabc486" +[[package]] +name = "async-trait" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -484,6 +495,7 @@ name = "txcmp" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "csv-async", "serde", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 90c2553..11dd5c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,5 @@ tokio-stream = "0.1.7" serde = { version = "1.0", features = ["derive"] } csv-async = { version = "1.2.1", features = ["with_serde", "tokio"]} thiserror = "1.0.26" -anyhow = "1.0.40" \ No newline at end of file +anyhow = "1.0.40" +async-trait = "0.1.51" \ No newline at end of file diff --git a/src/actors/account/actor.rs b/src/actors/account/actor.rs index 5281b3b..6ad317e 100644 --- a/src/actors/account/actor.rs +++ b/src/actors/account/actor.rs @@ -1,10 +1,18 @@ -use crate::{models::{account::Account, actor::Actor, transaction::Transaction}}; +use anyhow::Result; + +use crate::{errors::AccountError, models::{account::Account, actor::Actor, transaction::TransactionType}}; use std::{collections::HashMap}; use crate::actors::account::messages::Command; +struct TransactionData { + pub ty: TransactionType, + pub amount: f32, + pub disputed: bool +} + pub struct AccountActor { account: Account, - transactions: HashMap, + transactions: HashMap, } impl AccountActor { @@ -14,13 +22,87 @@ impl AccountActor { transactions: HashMap::new(), } } + + fn withdraw(&mut self, tx_id: u32, amount: f32) -> Result<()> { + self.account.withdraw(amount)?; + self.transactions.insert(tx_id, TransactionData { + ty: TransactionType::Withdrawal, + amount, + disputed: false + }); + Ok(()) + } + + fn deposit(&mut self, tx_id: u32, amount: f32) -> Result<()> { + self.account.deposit(amount)?; + self.transactions.insert(tx_id, TransactionData { + ty: TransactionType::Deposit, + amount, + disputed: false + }); + Ok(()) + } + + fn dispute(&mut self, tx_id: u32) -> Result<()> { + let tx = self.transactions + .get_mut(&tx_id) + .ok_or(AccountError::TxNotFound( + tx_id, + self.account.get_client() + ))?; + if tx.ty == TransactionType::Deposit && !tx.disputed { + self.account.held(tx.amount)?; + tx.disputed = true; + } + Ok(()) + } + + fn resolve(&mut self, tx_id: u32) -> Result<()> { + let tx = self.transactions + .get_mut(&tx_id) + .ok_or(AccountError::TxNotFound( + tx_id, + self.account.get_client() + ))?; + if tx.disputed { + self.account.free(tx.amount)?; + tx.disputed = false; + } + Ok(()) + } + + fn chargeback(&mut self, tx_id: u32) -> Result<()> { + let tx = self.transactions + .get_mut(&tx_id) + .ok_or(AccountError::TxNotFound( + tx_id, + self.account.get_client() + ))?; + if tx.disputed { + self + .account + .chargeback(tx.amount)?; + tx.disputed = false; + } + Ok(()) + } } impl Actor for AccountActor { - fn handle(&mut self, command: Command) { - println!("command: {:#?}", command); - if let Command::Stop(rx) = command { - let _ = rx.send(self.account.clone()); - } + type Output = (); + + fn handle(&mut self, command: Command) -> Result { + let _ = match command { + Command::Withdraw(tx_id, amount) => self.withdraw(tx_id, amount), + Command::Deposit(tx_id, amount) => self.deposit(tx_id, amount), + Command::Dispute(tx_id) => self.dispute(tx_id), + Command::Resolve(tx_id) => self.resolve(tx_id), + Command::Chargeback(tx_id) => self.chargeback(tx_id), + Command::Stop(rx) => { + let _ = rx.send(self.account.clone()); + Ok(()) + }, + }; + Ok(()) } } \ No newline at end of file diff --git a/src/actors/account/messages.rs b/src/actors/account/messages.rs index 6461e1b..93df3af 100644 --- a/src/actors/account/messages.rs +++ b/src/actors/account/messages.rs @@ -2,8 +2,8 @@ use crate::{models::{account::Account, responder::Responder, transaction::{Trans #[derive(Debug)] pub enum Command { - Withdraw(f32), - Deposit(f32), + Withdraw(u32, f32), + Deposit(u32, f32), Dispute(u32), Resolve(u32), Chargeback(u32), @@ -14,8 +14,8 @@ impl From for Command { fn from(tx: Transaction) -> Self { match tx.ty { TransactionType::Chargeback => Command::Chargeback(tx.tx), - TransactionType::Deposit => Command::Deposit(tx.amount), - TransactionType::Withdrawal => Command::Withdraw(tx.amount), + TransactionType::Deposit => Command::Deposit(tx.tx, tx.amount), + TransactionType::Withdrawal => Command::Withdraw(tx.tx, tx.amount), TransactionType::Dispute => Command::Dispute(tx.tx), TransactionType::Resolve => Command::Resolve(tx.tx) } diff --git a/src/actors/mod.rs b/src/actors/mod.rs index 84134f2..e12a179 100644 --- a/src/actors/mod.rs +++ b/src/actors/mod.rs @@ -1,6 +1,6 @@ use tokio::sync::mpsc::{self, Sender}; -use crate::models::actor::Actor; +use crate::models::actor::{Actor, AsyncActor}; pub mod account; pub mod tx_processor; @@ -12,7 +12,23 @@ where { let (tx, mut rx) = mpsc::channel(32); tokio::spawn(async move { - while let Some(msg) = rx.recv().await { actor.handle(msg) } + while let Some(msg) = rx.recv().await { + let _ = actor.handle(msg); + } + }); + tx +} + +pub fn run_async_actor(mut actor: E) -> Sender +where + T: 'static + Send, + E: 'static + AsyncActor + Send +{ + let (tx, mut rx) = mpsc::channel(32); + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + let _ = actor.handle(msg).await; + } }); tx } \ No newline at end of file diff --git a/src/actors/tx_processor/actor.rs b/src/actors/tx_processor/actor.rs index 7468b8c..7621073 100644 --- a/src/actors/tx_processor/actor.rs +++ b/src/actors/tx_processor/actor.rs @@ -1,12 +1,57 @@ +use crate::{actors::{account::{actor::AccountActor, messages::Command as AccountCommand}, run_actor}, models::{account::Account, actor::AsyncActor, transaction::Transaction}}; +use tokio::sync::{mpsc::Sender, oneshot}; use std::collections::HashMap; -use tokio::sync::mpsc::Sender; -use crate::actors::account::messages::Command as AccountCommand; use super::messages::Command; +use anyhow::Result; pub struct TxProcessor { accounts: HashMap> } impl TxProcessor { - pub fn handle(&mut self, command: Command) {} + + pub fn new() -> Self { + Self { accounts: HashMap::new() } + } + + async fn send_tx(&mut self, transaction: Transaction) -> Result<()> { + if self.accounts.contains_key(&transaction.client) { + self.accounts.get(&transaction.client).unwrap().send(transaction.into()).await?; + } else { + let actor = AccountActor::new(transaction.client); + let addr = run_actor(actor); + self.accounts.insert(transaction.client, addr.clone()); + addr.send(transaction.into()).await?; + } + Ok(()) + } + + async fn stop_actors(&self) -> Result> { + let mut accounts = vec![]; + for addr in self.accounts.values() { + let (resp_tx, resp_rx) = oneshot::channel(); + addr.send(AccountCommand::Stop(resp_tx)).await?; + let account = resp_rx.await; + accounts.push(account.expect("Error fetching account from actor")); + } + Ok(accounts) + } +} + +#[async_trait] +impl AsyncActor for TxProcessor { + type Output = (); + + async fn handle(&mut self, command: Command) -> Result { + match command { + Command::SendTx(transaction ) => { + self.send_tx(transaction).await?; + }, + Command::Stop(responder) => { + let accounts = self.stop_actors().await?; + let _ = responder.send(accounts); + } + } + Ok(()) + } } \ No newline at end of file diff --git a/src/actors/tx_processor/messages.rs b/src/actors/tx_processor/messages.rs index 3caa3ff..e21edb5 100644 --- a/src/actors/tx_processor/messages.rs +++ b/src/actors/tx_processor/messages.rs @@ -2,9 +2,6 @@ use crate::models::{account::Account, responder::Responder, transaction::Transac #[derive(Debug)] pub enum Command { - SendTx { - client: u16, - transaction: Transaction - }, + SendTx(Transaction), Stop(Responder>) } \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs index f774b51..08849c3 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -11,3 +11,9 @@ pub enum TransactionError { #[error("Account `{0}` is locked")] AccountLocked(u16), } + +#[derive(Error, Debug)] +pub enum AccountError { + #[error("Transaction `{0}` not found for client {1}")] + TxNotFound(u32, u16) +} diff --git a/src/main.rs b/src/main.rs index 44a1623..d5568d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,60 +1,47 @@ +#[macro_use] +extern crate async_trait; + mod errors; mod models; mod actors; use anyhow::Result; use csv_async::DeserializeRecordsStream; -use std::{collections::HashMap, env}; +use std::env; use tokio::{fs::File, sync::{mpsc::Sender, oneshot}}; use tokio_stream::StreamExt; use crate::{models::{account::Account, transaction::Transaction}}; -use actors::{account::messages::Command, account::{actor::AccountActor}, run_actor}; - -type AccountActors = HashMap>; +use actors::{run_async_actor, tx_processor::{actor::TxProcessor, messages::Command}}; #[tokio::main] async fn main() -> Result<()> { let file_path = env::args().nth(1).expect("CSV path required"); let mut rdr = csv_async::AsyncDeserializer::from_reader(File::open(file_path).await?); let records = rdr.deserialize::(); - - let accounts = _main(records).await?; - - fetch_and_display_accounts(&accounts); + _main(records).await?; Ok(()) } -async fn _main(mut records: DeserializeRecordsStream<'_, File, Transaction>) -> Result> { - let mut addr_map: AccountActors = HashMap::new(); +async fn _main(mut records: DeserializeRecordsStream<'_, File, Transaction>) -> Result<()> { + let tx_processor_addr = run_async_actor(TxProcessor::new()); while let Some(record) = records.next().await { let transaction = record?; - if addr_map.contains_key(&transaction.client) { - addr_map.get(&transaction.client).unwrap().send(transaction.into()).await?; - } else { - let actor = AccountActor::new(transaction.client); - let addr = run_actor(actor); - addr_map.insert(transaction.client, addr.clone()); - addr.send(transaction.into()).await?; - } + tx_processor_addr.send(Command::SendTx(transaction)).await?; } - - let accounts: Vec = stop_actors(&addr_map).await?; - Ok(accounts) + + let accounts = send_stop(&tx_processor_addr).await?; + display_accounts(&accounts); + Ok(()) } -async fn stop_actors(actors: &AccountActors) -> Result> { - let mut accounts = vec![]; - for addr in actors.values() { - let (resp_tx, resp_rx) = oneshot::channel(); - addr.send(Command::Stop(resp_tx)).await?; - let account = resp_rx.await; - accounts.push(account.expect("Error fetching account from actor")); - } - Ok(accounts) +async fn send_stop(processor: &Sender) -> Result> { + let (resp_tx, resp_rx) = oneshot::channel(); + processor.send(Command::Stop(resp_tx)).await?; + Ok(resp_rx.await?) } -fn fetch_and_display_accounts(accounts: &[Account]) { +fn display_accounts(accounts: &[Account]) { println!("client,available,held,total,locked"); for account in accounts.iter() { println!( diff --git a/src/models/actor.rs b/src/models/actor.rs index 7929da3..5743e60 100644 --- a/src/models/actor.rs +++ b/src/models/actor.rs @@ -1,3 +1,13 @@ +use anyhow::Result; pub trait Actor { - fn handle(&mut self, cmd: T); + type Output; + + fn handle(&mut self, cmd: T) -> Result; +} + +#[async_trait] +pub trait AsyncActor { + type Output; + + async fn handle(&mut self, cmd: T) -> Result; } \ No newline at end of file diff --git a/src/models/transaction.rs b/src/models/transaction.rs index b515b79..1e53e9c 100644 --- a/src/models/transaction.rs +++ b/src/models/transaction.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, PartialEq, Debug)] pub enum TransactionType { #[serde(alias = "deposit")] Deposit,