diff --git a/src/actors/chat_server.rs b/src/actors/chat_server.rs index 4a202b3..eb245ab 100644 --- a/src/actors/chat_server.rs +++ b/src/actors/chat_server.rs @@ -1,6 +1,6 @@ use crate::messages::{ - chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave}, - chat_session::Message, + server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave}, + session::Message, }; use crate::models::{RoomId, SessionId}; use actix::{Actor, Context, Handler, MessageResult, Recipient}; @@ -63,7 +63,7 @@ impl Actor for ChatServer { impl Handler for ChatServer { type Result = (); - + fn handle(&mut self, msg: Connect, _ctx: &mut Self::Context) -> Self::Result { let Connect { id, addr } = msg; self.sessions.insert(id, addr); diff --git a/src/actors/chat_session.rs b/src/actors/chat_session.rs index 7a12092..a36511a 100644 --- a/src/actors/chat_session.rs +++ b/src/actors/chat_session.rs @@ -1,17 +1,17 @@ use crate::{ actors::chat_server::ChatServer, constants::CLIENT_TIMEOUT, - models::{ - commands::Command, - ws::{MessageType, WsMessage}, - RoomId, SessionId, - }, + models::{RoomId, SessionId}, }; use crate::{ constants::HEARTBEAT_INTERVAL, messages::{ - chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave}, - chat_session::Message, + server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave}, + session::{ + command::Command, + wsmessage::{MessageType, WsMessage}, + Message, + }, }, }; use actix::{ @@ -55,124 +55,6 @@ impl WsChatSession { ctx.ping(b""); }); } - - pub fn handle_msg(&self, msg: WsMessage, ctx: &mut WebsocketContext) { - let data = msg.data.unwrap_or("".into()); - match msg.ty { - MessageType::Create => self.create(ctx), - MessageType::Join => self.join(data, ctx), - MessageType::Msg => self.msg(data, ctx), - MessageType::Leave => self.leave(ctx), - MessageType::Err => (), - } - } - - pub fn execute(&self, cmd: Command, _ctx: &mut WebsocketContext) { - match cmd { - Command::Msg(msg) => { - self.addr.do_send(ClientMessage { - session: self.id.clone(), - room: self.room.clone().unwrap(), - msg, - }); - } - } - } - - fn create(&self, ctx: &mut WebsocketContext) { - self.addr - .send(CreateRoom { - session: self.id.clone(), - }) - .into_actor(self) - .then(|res, act, ctx| { - match res { - Ok(res) => { - act.room = Some(res.clone()); - ctx.text(WsMessage { - ty: MessageType::Create, - data: Some(res.to_string()), - }); - } - // something is wrong with chat server - Err(err) => { - ctx.text(WsMessage::err(err.to_string())); - ctx.stop(); - } - } - fut::ready(()) - }) - .wait(ctx); - } - - fn join(&self, room_id: String, ctx: &mut WebsocketContext) { - match Uuid::from_str(&room_id) { - Ok(uuid) => { - self.addr - .send(JoinRoom { - room: uuid, - session: self.id.clone(), - }) - .into_actor(self) - .then(move |res, act, ctx| { - match res { - Ok(res) => match res { - Ok(_) => { - act.room = Some(uuid.clone()); - ctx.text(WsMessage { - ty: MessageType::Msg, - data: Some("Joined!".into()), - }) - } - Err(err) => ctx.text(WsMessage::err(err.to_string())), - }, - // something is wrong with chat server - Err(err) => { - ctx.text(WsMessage::err(err.to_string())); - ctx.stop(); - } - } - fut::ready(()) - }) - .wait(ctx); - } - Err(err) => ctx.text(WsMessage::err(err.to_string())), - } - } - - fn msg(&self, msg: String, ctx: &mut WebsocketContext) { - match Command::from_str(&msg) { - Ok(cmd) if self.room.is_some() => self.execute(cmd, ctx), - Ok(_) => ctx.text(WsMessage::err("You are not in a room yet".into())), - Err(err) => ctx.text(WsMessage::err(err.to_string())), - } - } - - fn leave(&self, ctx: &mut WebsocketContext) { - self.addr - .send(Leave { - session: self.id.clone(), - }) - .into_actor(self) - .then(move |res, act, ctx| { - match res { - Ok(_) => { - act.room = None; - ctx.text(WsMessage { - ty: MessageType::Leave, - data: Some("Room leaved".into()), - }) - } - // something is wrong with chat server - Err(err) => { - ctx.text(WsMessage::err(err.to_string())); - ctx.stop(); - } - } - fut::ready(()) - }) - .wait(ctx); - } } impl Actor for WsChatSession { @@ -227,7 +109,7 @@ impl StreamHandler> for WsChatSession { match msg { ws::Message::Text(msg) => match serde_json::from_str::(&msg) { - Ok(content) => self.handle_msg(content, ctx), + Ok(content) => ctx.notify(content), Err(err) => ctx.text(WsMessage::err(err.to_string())), }, ws::Message::Ping(msg) => { @@ -245,3 +127,121 @@ impl StreamHandler> for WsChatSession { } } } + +impl Handler for WsChatSession { + type Result = (); + + fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) -> Self::Result { + let data = msg.data.unwrap_or("".into()); + match msg.ty { + MessageType::Create => self.create(ctx), + MessageType::Join => match Uuid::from_str(&data) { + Ok(uuid) => self.join(uuid, ctx), + Err(err) => ctx.text(WsMessage::err(err.to_string())), + }, + MessageType::Msg => self.msg(data, ctx), + MessageType::Leave => self.leave(ctx), + _ => (), + } + } +} + +impl WsChatSession { + fn create(&self, ctx: &mut WebsocketContext) { + let send_create = self.addr.send(CreateRoom { + session: self.id.clone(), + }); + let send_create = send_create.into_actor(self); + send_create + .then(move |res, act, ctx| { + // Actor's state updated here + match res { + Ok(res) => { + act.room = Some(res.clone()); + ctx.text(WsMessage::info(res.to_string())); + } + // something is wrong with chat server + Err(err) => { + ctx.text(WsMessage::err(err.to_string())); + ctx.stop(); + } + } + fut::ready(()) + }) + .wait(ctx); + } + + fn join(&self, room_id: Uuid, ctx: &mut WebsocketContext) { + let join_room = self.addr.send(JoinRoom { + room: room_id, + session: self.id.clone(), + }); + let join_room = join_room.into_actor(self); + join_room + .then(move |response, act, ctx| { + match response { + Ok(res) if res.is_ok() => { + act.room = Some(room_id.clone()); + ctx.text(WsMessage { + ty: MessageType::Msg, + data: Some("Joined!".into()), + }) + } + Ok(res) => ctx.text(WsMessage::err(res.unwrap_err().to_string())), + Err(err) => { + ctx.text(WsMessage::err(err.to_string())); + ctx.stop(); + } + } + fut::ready(()) + }) + .wait(ctx); + } + + fn msg(&self, msg: String, ctx: &mut WebsocketContext) { + match Command::from_str(&msg) { + Ok(cmd) if self.room.is_some() => ctx.notify(cmd), + Ok(_) => ctx.text(WsMessage::err("You are not in a room yet".into())), + Err(err) => ctx.text(WsMessage::err(err.to_string())), + } + } + + fn leave(&self, ctx: &mut WebsocketContext) { + self.addr + .send(Leave { + session: self.id.clone(), + }) + .into_actor(self) + .then(move |res, act, ctx| { + match res { + Ok(_) => { + act.room = None; + ctx.text(WsMessage::info("Room leaved".into())) + } + // something is wrong with chat server + Err(err) => { + ctx.text(WsMessage::err(err.to_string())); + ctx.stop(); + } + } + fut::ready(()) + }) + .wait(ctx); + } +} + +impl Handler for WsChatSession { + type Result = (); + + fn handle(&mut self, msg: Command, _ctx: &mut Self::Context) -> Self::Result { + match msg { + Command::Msg(msg) => { + self.addr.do_send(ClientMessage { + session: self.id.clone(), + room: self.room.clone().unwrap(), + msg, + }); + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 5db9b47..3041b81 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ mod routes; use crate::{actors::chat_server::ChatServer, models::AppState}; use actix::Actor; use actix_web::{App, HttpServer}; -use routes::ws::connect; +use routes::connect; #[actix_web::main] async fn main() -> std::io::Result<()> { diff --git a/src/messages/mod.rs b/src/messages/mod.rs index feae37d..1c7562d 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -1,2 +1,2 @@ -pub mod chat_server; -pub mod chat_session; +pub mod server; +pub mod session; diff --git a/src/messages/chat_server.rs b/src/messages/server/mod.rs similarity index 96% rename from src/messages/chat_server.rs rename to src/messages/server/mod.rs index fb0051c..815a119 100644 --- a/src/messages/chat_server.rs +++ b/src/messages/server/mod.rs @@ -1,4 +1,4 @@ -use super::chat_session::Message; +use super::session::Message; use crate::models::{RoomId, SessionId}; use actix::{Message as ActixMessage, Recipient}; use uuid::Uuid; diff --git a/src/models/commands.rs b/src/messages/session/command.rs similarity index 83% rename from src/models/commands.rs rename to src/messages/session/command.rs index 85d83d2..a352f79 100644 --- a/src/models/commands.rs +++ b/src/messages/session/command.rs @@ -1,17 +1,20 @@ +use actix::Message as ActixMessage; use derive_more::{Display, Error}; use std::convert::Into; use std::str::FromStr; +#[derive(ActixMessage)] +#[rtype(result = "()")] +pub enum Command { + Msg(String), +} + #[derive(Debug, Display, Error)] #[display(fmt = "Invalid command: {}", msg)] pub struct CommandError { msg: &'static str, } -pub enum Command { - Msg(String), -} - // TODO: IMPLEMENT MORE COMMANDS impl FromStr for Command { type Err = CommandError; diff --git a/src/messages/chat_session.rs b/src/messages/session/mod.rs similarity index 76% rename from src/messages/chat_session.rs rename to src/messages/session/mod.rs index 348699c..0d823a0 100644 --- a/src/messages/chat_session.rs +++ b/src/messages/session/mod.rs @@ -1,3 +1,5 @@ +pub mod command; +pub mod wsmessage; use actix::Message as ActixMessage; #[derive(ActixMessage)] diff --git a/src/models/ws.rs b/src/messages/session/wsmessage.rs similarity index 63% rename from src/models/ws.rs rename to src/messages/session/wsmessage.rs index 5196761..8fcd9e8 100644 --- a/src/models/ws.rs +++ b/src/messages/session/wsmessage.rs @@ -1,6 +1,9 @@ +use actix::Message as ActixMessage; use serde::{Deserialize, Serialize}; +use std::convert::Into; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, ActixMessage)] +#[rtype(result = "()")] pub struct WsMessage { pub ty: MessageType, pub data: Option, @@ -13,6 +16,13 @@ impl WsMessage { data: Some(msg), } } + + pub fn info(msg: String) -> Self { + WsMessage { + ty: MessageType::Info, + data: Some(msg), + } + } } #[derive(Serialize, Deserialize)] @@ -22,10 +32,11 @@ pub enum MessageType { Leave, Msg, Err, + Info, } impl Into for WsMessage { fn into(self) -> String { serde_json::to_string(&self).unwrap() } -} +} \ No newline at end of file diff --git a/src/models/mod.rs b/src/models/mod.rs index 24e68a4..5873e17 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,5 +1,3 @@ -pub mod commands; -pub mod ws; use crate::actors::chat_server::ChatServer; use actix::Addr; use uuid::Uuid; diff --git a/src/routes/mod.rs b/src/routes/mod.rs index e9b7289..69c80fb 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,3 +1,13 @@ -pub mod ws; - +use crate::{actors::chat_session::WsChatSession, models::AppState}; +use actix_web::{get, web, HttpRequest, Responder}; +use actix_web_actors::ws; +#[get("/")] +pub async fn connect( + req: HttpRequest, + stream: web::Payload, + state: web::Data, +) -> impl Responder { + let chat = state.chat.clone(); + ws::start(WsChatSession::new(chat), &req, stream) +} diff --git a/src/routes/ws.rs b/src/routes/ws.rs deleted file mode 100644 index a2aa223..0000000 --- a/src/routes/ws.rs +++ /dev/null @@ -1,13 +0,0 @@ -use crate::{actors::chat_session::WsChatSession, models::AppState}; -use actix_web::{get, web, HttpRequest, Responder}; -use actix_web_actors::ws; - -#[get("/ws")] -pub async fn connect( - req: HttpRequest, - stream: web::Payload, - state: web::Data, -) -> impl Responder { - let chat = state.chat.clone(); - ws::start(WsChatSession::new(chat), &req, stream) -}