This commit is contained in:
JasterV 2021-07-29 19:11:07 +02:00
parent 3e2723289f
commit 4f6c47899b
11 changed files with 210 additions and 54 deletions

12
Cargo.lock generated
View file

@ -8,6 +8,17 @@ version = "1.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "595d3cfa7a60d4555cb5067b99f07142a08ea778de5cf993f7b75c7d8fabc486" checksum = "595d3cfa7a60d4555cb5067b99f07142a08ea778de5cf993f7b75c7d8fabc486"
[[package]]
name = "async-trait"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.1" version = "1.0.1"
@ -484,6 +495,7 @@ name = "txcmp"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"csv-async", "csv-async",
"serde", "serde",
"thiserror", "thiserror",

View file

@ -11,4 +11,5 @@ tokio-stream = "0.1.7"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
csv-async = { version = "1.2.1", features = ["with_serde", "tokio"]} csv-async = { version = "1.2.1", features = ["with_serde", "tokio"]}
thiserror = "1.0.26" thiserror = "1.0.26"
anyhow = "1.0.40" anyhow = "1.0.40"
async-trait = "0.1.51"

View file

@ -1,10 +1,18 @@
use crate::{models::{account::Account, actor::Actor, transaction::Transaction}}; use anyhow::Result;
use crate::{errors::AccountError, models::{account::Account, actor::Actor, transaction::TransactionType}};
use std::{collections::HashMap}; use std::{collections::HashMap};
use crate::actors::account::messages::Command; use crate::actors::account::messages::Command;
struct TransactionData {
pub ty: TransactionType,
pub amount: f32,
pub disputed: bool
}
pub struct AccountActor { pub struct AccountActor {
account: Account, account: Account,
transactions: HashMap<u32, Transaction>, transactions: HashMap<u32, TransactionData>,
} }
impl AccountActor { impl AccountActor {
@ -14,13 +22,87 @@ impl AccountActor {
transactions: HashMap::new(), transactions: HashMap::new(),
} }
} }
fn withdraw(&mut self, tx_id: u32, amount: f32) -> Result<()> {
self.account.withdraw(amount)?;
self.transactions.insert(tx_id, TransactionData {
ty: TransactionType::Withdrawal,
amount,
disputed: false
});
Ok(())
}
fn deposit(&mut self, tx_id: u32, amount: f32) -> Result<()> {
self.account.deposit(amount)?;
self.transactions.insert(tx_id, TransactionData {
ty: TransactionType::Deposit,
amount,
disputed: false
});
Ok(())
}
fn dispute(&mut self, tx_id: u32) -> Result<()> {
let tx = self.transactions
.get_mut(&tx_id)
.ok_or(AccountError::TxNotFound(
tx_id,
self.account.get_client()
))?;
if tx.ty == TransactionType::Deposit && !tx.disputed {
self.account.held(tx.amount)?;
tx.disputed = true;
}
Ok(())
}
fn resolve(&mut self, tx_id: u32) -> Result<()> {
let tx = self.transactions
.get_mut(&tx_id)
.ok_or(AccountError::TxNotFound(
tx_id,
self.account.get_client()
))?;
if tx.disputed {
self.account.free(tx.amount)?;
tx.disputed = false;
}
Ok(())
}
fn chargeback(&mut self, tx_id: u32) -> Result<()> {
let tx = self.transactions
.get_mut(&tx_id)
.ok_or(AccountError::TxNotFound(
tx_id,
self.account.get_client()
))?;
if tx.disputed {
self
.account
.chargeback(tx.amount)?;
tx.disputed = false;
}
Ok(())
}
} }
impl Actor<Command> for AccountActor { impl Actor<Command> for AccountActor {
fn handle(&mut self, command: Command) { type Output = ();
println!("command: {:#?}", command);
if let Command::Stop(rx) = command { fn handle(&mut self, command: Command) -> Result<Self::Output> {
let _ = rx.send(self.account.clone()); let _ = match command {
} Command::Withdraw(tx_id, amount) => self.withdraw(tx_id, amount),
Command::Deposit(tx_id, amount) => self.deposit(tx_id, amount),
Command::Dispute(tx_id) => self.dispute(tx_id),
Command::Resolve(tx_id) => self.resolve(tx_id),
Command::Chargeback(tx_id) => self.chargeback(tx_id),
Command::Stop(rx) => {
let _ = rx.send(self.account.clone());
Ok(())
},
};
Ok(())
} }
} }

View file

@ -2,8 +2,8 @@ use crate::{models::{account::Account, responder::Responder, transaction::{Trans
#[derive(Debug)] #[derive(Debug)]
pub enum Command { pub enum Command {
Withdraw(f32), Withdraw(u32, f32),
Deposit(f32), Deposit(u32, f32),
Dispute(u32), Dispute(u32),
Resolve(u32), Resolve(u32),
Chargeback(u32), Chargeback(u32),
@ -14,8 +14,8 @@ impl From<Transaction> for Command {
fn from(tx: Transaction) -> Self { fn from(tx: Transaction) -> Self {
match tx.ty { match tx.ty {
TransactionType::Chargeback => Command::Chargeback(tx.tx), TransactionType::Chargeback => Command::Chargeback(tx.tx),
TransactionType::Deposit => Command::Deposit(tx.amount), TransactionType::Deposit => Command::Deposit(tx.tx, tx.amount),
TransactionType::Withdrawal => Command::Withdraw(tx.amount), TransactionType::Withdrawal => Command::Withdraw(tx.tx, tx.amount),
TransactionType::Dispute => Command::Dispute(tx.tx), TransactionType::Dispute => Command::Dispute(tx.tx),
TransactionType::Resolve => Command::Resolve(tx.tx) TransactionType::Resolve => Command::Resolve(tx.tx)
} }

View file

@ -1,6 +1,6 @@
use tokio::sync::mpsc::{self, Sender}; use tokio::sync::mpsc::{self, Sender};
use crate::models::actor::Actor; use crate::models::actor::{Actor, AsyncActor};
pub mod account; pub mod account;
pub mod tx_processor; pub mod tx_processor;
@ -12,7 +12,23 @@ where
{ {
let (tx, mut rx) = mpsc::channel(32); let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(msg) = rx.recv().await { actor.handle(msg) } while let Some(msg) = rx.recv().await {
let _ = actor.handle(msg);
}
});
tx
}
pub fn run_async_actor<T, E>(mut actor: E) -> Sender<T>
where
T: 'static + Send,
E: 'static + AsyncActor<T> + Send
{
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let _ = actor.handle(msg).await;
}
}); });
tx tx
} }

View file

@ -1,12 +1,57 @@
use crate::{actors::{account::{actor::AccountActor, messages::Command as AccountCommand}, run_actor}, models::{account::Account, actor::AsyncActor, transaction::Transaction}};
use tokio::sync::{mpsc::Sender, oneshot};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
use crate::actors::account::messages::Command as AccountCommand;
use super::messages::Command; use super::messages::Command;
use anyhow::Result;
pub struct TxProcessor { pub struct TxProcessor {
accounts: HashMap<u16, Sender<AccountCommand>> accounts: HashMap<u16, Sender<AccountCommand>>
} }
impl TxProcessor { impl TxProcessor {
pub fn handle(&mut self, command: Command) {}
pub fn new() -> Self {
Self { accounts: HashMap::new() }
}
async fn send_tx(&mut self, transaction: Transaction) -> Result<()> {
if self.accounts.contains_key(&transaction.client) {
self.accounts.get(&transaction.client).unwrap().send(transaction.into()).await?;
} else {
let actor = AccountActor::new(transaction.client);
let addr = run_actor(actor);
self.accounts.insert(transaction.client, addr.clone());
addr.send(transaction.into()).await?;
}
Ok(())
}
async fn stop_actors(&self) -> Result<Vec<Account>> {
let mut accounts = vec![];
for addr in self.accounts.values() {
let (resp_tx, resp_rx) = oneshot::channel();
addr.send(AccountCommand::Stop(resp_tx)).await?;
let account = resp_rx.await;
accounts.push(account.expect("Error fetching account from actor"));
}
Ok(accounts)
}
}
#[async_trait]
impl AsyncActor<Command> for TxProcessor {
type Output = ();
async fn handle(&mut self, command: Command) -> Result<Self::Output> {
match command {
Command::SendTx(transaction ) => {
self.send_tx(transaction).await?;
},
Command::Stop(responder) => {
let accounts = self.stop_actors().await?;
let _ = responder.send(accounts);
}
}
Ok(())
}
} }

View file

@ -2,9 +2,6 @@ use crate::models::{account::Account, responder::Responder, transaction::Transac
#[derive(Debug)] #[derive(Debug)]
pub enum Command { pub enum Command {
SendTx { SendTx(Transaction),
client: u16,
transaction: Transaction
},
Stop(Responder<Vec<Account>>) Stop(Responder<Vec<Account>>)
} }

View file

@ -11,3 +11,9 @@ pub enum TransactionError {
#[error("Account `{0}` is locked")] #[error("Account `{0}` is locked")]
AccountLocked(u16), AccountLocked(u16),
} }
#[derive(Error, Debug)]
pub enum AccountError {
#[error("Transaction `{0}` not found for client {1}")]
TxNotFound(u32, u16)
}

View file

@ -1,60 +1,47 @@
#[macro_use]
extern crate async_trait;
mod errors; mod errors;
mod models; mod models;
mod actors; mod actors;
use anyhow::Result; use anyhow::Result;
use csv_async::DeserializeRecordsStream; use csv_async::DeserializeRecordsStream;
use std::{collections::HashMap, env}; use std::env;
use tokio::{fs::File, sync::{mpsc::Sender, oneshot}}; use tokio::{fs::File, sync::{mpsc::Sender, oneshot}};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use crate::{models::{account::Account, transaction::Transaction}}; use crate::{models::{account::Account, transaction::Transaction}};
use actors::{account::messages::Command, account::{actor::AccountActor}, run_actor}; use actors::{run_async_actor, tx_processor::{actor::TxProcessor, messages::Command}};
type AccountActors = HashMap<u16, Sender<Command>>;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let file_path = env::args().nth(1).expect("CSV path required"); let file_path = env::args().nth(1).expect("CSV path required");
let mut rdr = csv_async::AsyncDeserializer::from_reader(File::open(file_path).await?); let mut rdr = csv_async::AsyncDeserializer::from_reader(File::open(file_path).await?);
let records = rdr.deserialize::<Transaction>(); let records = rdr.deserialize::<Transaction>();
_main(records).await?;
let accounts = _main(records).await?;
fetch_and_display_accounts(&accounts);
Ok(()) Ok(())
} }
async fn _main(mut records: DeserializeRecordsStream<'_, File, Transaction>) -> Result<Vec<Account>> { async fn _main(mut records: DeserializeRecordsStream<'_, File, Transaction>) -> Result<()> {
let mut addr_map: AccountActors = HashMap::new(); let tx_processor_addr = run_async_actor(TxProcessor::new());
while let Some(record) = records.next().await { while let Some(record) = records.next().await {
let transaction = record?; let transaction = record?;
if addr_map.contains_key(&transaction.client) { tx_processor_addr.send(Command::SendTx(transaction)).await?;
addr_map.get(&transaction.client).unwrap().send(transaction.into()).await?;
} else {
let actor = AccountActor::new(transaction.client);
let addr = run_actor(actor);
addr_map.insert(transaction.client, addr.clone());
addr.send(transaction.into()).await?;
}
} }
let accounts: Vec<Account> = stop_actors(&addr_map).await?; let accounts = send_stop(&tx_processor_addr).await?;
Ok(accounts) display_accounts(&accounts);
Ok(())
} }
async fn stop_actors(actors: &AccountActors) -> Result<Vec<Account>> { async fn send_stop(processor: &Sender<Command>) -> Result<Vec<Account>> {
let mut accounts = vec![]; let (resp_tx, resp_rx) = oneshot::channel();
for addr in actors.values() { processor.send(Command::Stop(resp_tx)).await?;
let (resp_tx, resp_rx) = oneshot::channel(); Ok(resp_rx.await?)
addr.send(Command::Stop(resp_tx)).await?;
let account = resp_rx.await;
accounts.push(account.expect("Error fetching account from actor"));
}
Ok(accounts)
} }
fn fetch_and_display_accounts(accounts: &[Account]) { fn display_accounts(accounts: &[Account]) {
println!("client,available,held,total,locked"); println!("client,available,held,total,locked");
for account in accounts.iter() { for account in accounts.iter() {
println!( println!(

View file

@ -1,3 +1,13 @@
use anyhow::Result;
pub trait Actor<T> { pub trait Actor<T> {
fn handle(&mut self, cmd: T); type Output;
fn handle(&mut self, cmd: T) -> Result<Self::Output>;
}
#[async_trait]
pub trait AsyncActor<T> {
type Output;
async fn handle(&mut self, cmd: T) -> Result<Self::Output>;
} }

View file

@ -1,6 +1,6 @@
use serde::Deserialize; use serde::Deserialize;
#[derive(Deserialize, Debug)] #[derive(Deserialize, PartialEq, Debug)]
pub enum TransactionType { pub enum TransactionType {
#[serde(alias = "deposit")] #[serde(alias = "deposit")]
Deposit, Deposit,