actix added

This commit is contained in:
JasterV 2021-07-29 02:48:43 +02:00
parent d08286beb6
commit cc9eacbeec
7 changed files with 179 additions and 6 deletions

80
Cargo.lock generated
View file

@ -2,6 +2,51 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3720d0064a0ce5c0de7bd93bdb0a6caebab2a9b5668746145d7b3b0c5da02914"
dependencies = [
"actix-rt",
"actix_derive",
"bitflags",
"bytes",
"crossbeam-channel",
"futures-core",
"futures-sink",
"futures-task",
"futures-util",
"log",
"once_cell",
"parking_lot",
"pin-project-lite",
"smallvec",
"tokio",
"tokio-util",
]
[[package]]
name = "actix-rt"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7d7cd957c9ed92288a7c3c96af81fa5291f65247a76a34dac7b6af74e52ba0"
dependencies = [
"futures-core",
"tokio",
]
[[package]]
name = "actix_derive"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "anyhow"
version = "1.0.42"
@ -44,6 +89,26 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "csv-async"
version = "1.2.1"
@ -479,10 +544,25 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "txcmp"
version = "0.1.0"
dependencies = [
"actix",
"anyhow",
"csv-async",
"serde",

View file

@ -11,4 +11,5 @@ tokio-stream = "0.1.7"
serde = { version = "1.0", features = ["derive"] }
csv-async = { version = "1.2.1", features = ["with_serde", "tokio"]}
thiserror = "1.0.26"
actix = "0.12.0"
anyhow = "1.0.40"

44
src/actors/account.rs Normal file
View file

@ -0,0 +1,44 @@
use std::collections::HashMap;
use actix::{Actor, ActorContext, Context, Handler, MessageResult, dev::MessageResponse};
use crate::{messages::{Command, Stop}, models::{account::Account, transaction::Transaction}};
pub struct AccountActor {
account: Account,
transactions: HashMap<u32, Transaction>
}
impl AccountActor {
pub fn new(client_id: u16) -> Self {
Self {
account: Account::new(client_id),
transactions: HashMap::new()
}
}
}
impl Actor for AccountActor {
type Context = Context<Self>;
fn stopping(&mut self, _ctx: &mut Self::Context) -> actix::Running {
actix::Running::Stop
}
}
// now we need to implement `Handler` on `Calculator` for the `Sum` message.
impl Handler<Command> for AccountActor {
type Result = (); // <- Message response type
fn handle(&mut self, msg: Command, ctx: &mut Context<Self>) -> Self::Result {
// TODO: Implement commands
}
}
impl Handler<Stop> for AccountActor {
type Result = MessageResult<Stop>; // <- Message response type
fn handle(&mut self, _msg: Stop, ctx: &mut Context<Self>) -> Self::Result {
ctx.stop();
MessageResult(self.account.clone())
}
}

1
src/actors/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod account;

View file

@ -1,14 +1,17 @@
mod errors;
mod models;
mod actors;
mod messages;
use actix::{Addr, Actor};
use anyhow::Result;
use models::{account::Account, transaction::Transaction};
use std::{collections::HashMap, env};
use tokio::fs::File;
use tokio::{fs::File};
use tokio_stream::StreamExt;
use crate::{messages::{Command, Stop}, models::{account::Account, transaction::Transaction}};
use actors::account::AccountActor;
type TransactionsMap = HashMap<u32, Transaction>;
type AccountsMap = HashMap<u16, Account>;
type AccountActors = HashMap<u16, Addr<AccountActor>>;
#[tokio::main]
async fn main() -> Result<()> {
@ -16,9 +19,25 @@ async fn main() -> Result<()> {
let mut rdr = csv_async::AsyncDeserializer::from_reader(File::open(file_path).await?);
let mut records = rdr.deserialize::<Transaction>();
let mut addr_map: AccountActors = HashMap::new();
while let Some(record) = records.next().await {
let transaction = record?;
println!("{:#?}", transaction);
if addr_map.contains_key(&transaction.client) {
addr_map.get(&transaction.client).unwrap().do_send::<Command>(transaction.into());
} else {
let actor = AccountActor::new(transaction.client);
let addr = actor.start();
addr_map.insert(transaction.client, addr.clone());
addr.do_send::<Command>(transaction.into());
}
}
let mut result: Vec<Account> = vec![];
for addr in addr_map.values() {
let account: Account = addr.send(Stop).await?;
result.push(account);
}
Ok(())

28
src/messages/mod.rs Normal file
View file

@ -0,0 +1,28 @@
use crate::models::{transaction::{Transaction, TransactionType}, account::Account};
use actix::Message;
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub enum Command {
Withdraw(f32),
Deposit(f32),
Dispute(u32),
Resolve(u32),
Chargeback(u32),
}
#[derive(Message)]
#[rtype(result = "Account")]
pub struct Stop;
impl From<Transaction> for Command {
fn from(tx: Transaction) -> Self {
match tx.ty {
TransactionType::Chargeback => Command::Chargeback(tx.tx),
TransactionType::Deposit => Command::Deposit(tx.amount),
TransactionType::Withdrawal => Command::Withdraw(tx.amount),
TransactionType::Dispute => Command::Dispute(tx.tx),
TransactionType::Resolve => Command::Resolve(tx.tx)
}
}
}

View file

@ -3,7 +3,7 @@ use serde::Serialize;
use crate::errors::TransactionError;
#[derive(Serialize, Debug)]
#[derive(Serialize, Clone, Debug)]
pub struct Account {
client: u16,
available: f32,