actors added

This commit is contained in:
JasterV 2021-07-29 14:17:17 +02:00
parent 86ae78bd83
commit 3e2723289f
14 changed files with 113 additions and 152 deletions

80
Cargo.lock generated
View file

@ -2,51 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3720d0064a0ce5c0de7bd93bdb0a6caebab2a9b5668746145d7b3b0c5da02914"
dependencies = [
"actix-rt",
"actix_derive",
"bitflags",
"bytes",
"crossbeam-channel",
"futures-core",
"futures-sink",
"futures-task",
"futures-util",
"log",
"once_cell",
"parking_lot",
"pin-project-lite",
"smallvec",
"tokio",
"tokio-util",
]
[[package]]
name = "actix-rt"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7d7cd957c9ed92288a7c3c96af81fa5291f65247a76a34dac7b6af74e52ba0"
dependencies = [
"futures-core",
"tokio",
]
[[package]]
name = "actix_derive"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "anyhow"
version = "1.0.42"
@ -89,26 +44,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "csv-async"
version = "1.2.1"
@ -544,25 +479,10 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "txcmp"
version = "0.1.0"
dependencies = [
"actix",
"anyhow",
"csv-async",
"serde",

View file

@ -11,5 +11,4 @@ tokio-stream = "0.1.7"
serde = { version = "1.0", features = ["derive"] }
csv-async = { version = "1.2.1", features = ["with_serde", "tokio"]}
thiserror = "1.0.26"
actix = "0.12.0"
anyhow = "1.0.40"

View file

@ -1,44 +0,0 @@
use std::collections::HashMap;
use actix::{Actor, ActorContext, Context, Handler, MessageResult, dev::MessageResponse};
use crate::{messages::{Command, Stop}, models::{account::Account, transaction::Transaction}};
pub struct AccountActor {
account: Account,
transactions: HashMap<u32, Transaction>
}
impl AccountActor {
pub fn new(client_id: u16) -> Self {
Self {
account: Account::new(client_id),
transactions: HashMap::new()
}
}
}
impl Actor for AccountActor {
type Context = Context<Self>;
fn stopping(&mut self, _ctx: &mut Self::Context) -> actix::Running {
actix::Running::Stop
}
}
// now we need to implement `Handler` on `Calculator` for the `Sum` message.
impl Handler<Command> for AccountActor {
type Result = (); // <- Message response type
fn handle(&mut self, msg: Command, ctx: &mut Context<Self>) -> Self::Result {
// TODO: Implement commands
}
}
impl Handler<Stop> for AccountActor {
type Result = MessageResult<Stop>; // <- Message response type
fn handle(&mut self, _msg: Stop, ctx: &mut Context<Self>) -> Self::Result {
ctx.stop();
MessageResult(self.account.clone())
}
}

View file

@ -0,0 +1,26 @@
use crate::{models::{account::Account, actor::Actor, transaction::Transaction}};
use std::{collections::HashMap};
use crate::actors::account::messages::Command;
pub struct AccountActor {
account: Account,
transactions: HashMap<u32, Transaction>,
}
impl AccountActor {
pub fn new(client_id: u16) -> Self {
Self {
account: Account::new(client_id),
transactions: HashMap::new(),
}
}
}
impl Actor<Command> for AccountActor {
fn handle(&mut self, command: Command) {
println!("command: {:#?}", command);
if let Command::Stop(rx) = command {
let _ = rx.send(self.account.clone());
}
}
}

View file

@ -1,20 +1,15 @@
use crate::models::{transaction::{Transaction, TransactionType}, account::Account};
use actix::Message;
use crate::{models::{account::Account, responder::Responder, transaction::{Transaction, TransactionType}}};
#[derive(Message, Debug)]
#[rtype(result = "()")]
#[derive(Debug)]
pub enum Command {
Withdraw(f32),
Deposit(f32),
Dispute(u32),
Resolve(u32),
Chargeback(u32),
Stop(Responder<Account>)
}
#[derive(Message)]
#[rtype(result = "Account")]
pub struct Stop;
impl From<Transaction> for Command {
fn from(tx: Transaction) -> Self {
match tx.ty {

View file

@ -0,0 +1,2 @@
pub mod messages;
pub mod actor;

View file

@ -1 +1,18 @@
pub mod account;
use tokio::sync::mpsc::{self, Sender};
use crate::models::actor::Actor;
pub mod account;
pub mod tx_processor;
pub fn run_actor<T, E>(mut actor: E) -> Sender<T>
where
T: 'static + Send,
E: 'static + Actor<T> + Send
{
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
while let Some(msg) = rx.recv().await { actor.handle(msg) }
});
tx
}

View file

@ -0,0 +1,12 @@
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
use crate::actors::account::messages::Command as AccountCommand;
use super::messages::Command;
pub struct TxProcessor {
accounts: HashMap<u16, Sender<AccountCommand>>
}
impl TxProcessor {
pub fn handle(&mut self, command: Command) {}
}

View file

@ -0,0 +1,10 @@
use crate::models::{account::Account, responder::Responder, transaction::Transaction};
#[derive(Debug)]
pub enum Command {
SendTx {
client: u16,
transaction: Transaction
},
Stop(Responder<Vec<Account>>)
}

View file

@ -0,0 +1,2 @@
pub mod actor;
pub mod messages;

View file

@ -1,47 +1,62 @@
mod errors;
mod models;
mod actors;
mod messages;
use actix::{Addr, Actor};
use anyhow::Result;
use csv_async::DeserializeRecordsStream;
use std::{collections::HashMap, env};
use tokio::{fs::File};
use tokio::{fs::File, sync::{mpsc::Sender, oneshot}};
use tokio_stream::StreamExt;
use crate::{messages::{Command, Stop}, models::{account::Account, transaction::Transaction}};
use actors::account::AccountActor;
use crate::{models::{account::Account, transaction::Transaction}};
use actors::{account::messages::Command, account::{actor::AccountActor}, run_actor};
type AccountActors = HashMap<u16, Addr<AccountActor>>;
type AccountActors = HashMap<u16, Sender<Command>>;
#[tokio::main]
async fn main() -> Result<()> {
let file_path = env::args().nth(1).expect("CSV path required");
let mut rdr = csv_async::AsyncDeserializer::from_reader(File::open(file_path).await?);
let mut records = rdr.deserialize::<Transaction>();
let records = rdr.deserialize::<Transaction>();
let accounts = _main(records).await?;
fetch_and_display_accounts(&accounts);
Ok(())
}
async fn _main(mut records: DeserializeRecordsStream<'_, File, Transaction>) -> Result<Vec<Account>> {
let mut addr_map: AccountActors = HashMap::new();
while let Some(record) = records.next().await {
let transaction = record?;
if addr_map.contains_key(&transaction.client) {
addr_map.get(&transaction.client).unwrap().do_send::<Command>(transaction.into());
addr_map.get(&transaction.client).unwrap().send(transaction.into()).await?;
} else {
let actor = AccountActor::new(transaction.client);
let addr = actor.start();
let addr = run_actor(actor);
addr_map.insert(transaction.client, addr.clone());
addr.do_send::<Command>(transaction.into());
addr.send(transaction.into()).await?;
}
}
fetch_and_display_accounts(&addr_map).await?;
Ok(())
let accounts: Vec<Account> = stop_actors(&addr_map).await?;
Ok(accounts)
}
async fn fetch_and_display_accounts(actors: &AccountActors) -> Result<()> {
println!("client,available,held,total,locked");
async fn stop_actors(actors: &AccountActors) -> Result<Vec<Account>> {
let mut accounts = vec![];
for addr in actors.values() {
let account: Account = addr.send(Stop).await?;
let (resp_tx, resp_rx) = oneshot::channel();
addr.send(Command::Stop(resp_tx)).await?;
let account = resp_rx.await;
accounts.push(account.expect("Error fetching account from actor"));
}
Ok(accounts)
}
fn fetch_and_display_accounts(accounts: &[Account]) {
println!("client,available,held,total,locked");
for account in accounts.iter() {
println!(
"{},{},{},{},{}",
account.get_client(),
@ -51,5 +66,4 @@ async fn fetch_and_display_accounts(actors: &AccountActors) -> Result<()> {
account.get_locked()
)
}
Ok(())
}

3
src/models/actor.rs Normal file
View file

@ -0,0 +1,3 @@
pub trait Actor<T> {
fn handle(&mut self, cmd: T);
}

View file

@ -1,2 +1,4 @@
pub mod account;
pub mod transaction;
pub mod responder;
pub mod actor;

3
src/models/responder.rs Normal file
View file

@ -0,0 +1,3 @@
use tokio::sync::oneshot;
pub type Responder<T> = oneshot::Sender<T>;