This commit is contained in:
Víctor Martínez 2021-03-21 19:41:02 +01:00
commit 4e0ca5821e
11 changed files with 167 additions and 156 deletions

View file

@ -1,6 +1,6 @@
use crate::messages::{
chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
chat_session::Message,
server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
session::Message,
};
use crate::models::{RoomId, SessionId};
use actix::{Actor, Context, Handler, MessageResult, Recipient};
@ -63,7 +63,7 @@ impl Actor for ChatServer {
impl Handler<Connect> for ChatServer {
type Result = ();
fn handle(&mut self, msg: Connect, _ctx: &mut Self::Context) -> Self::Result {
let Connect { id, addr } = msg;
self.sessions.insert(id, addr);

View file

@ -1,17 +1,17 @@
use crate::{
actors::chat_server::ChatServer,
constants::CLIENT_TIMEOUT,
models::{
commands::Command,
ws::{MessageType, WsMessage},
RoomId, SessionId,
},
models::{RoomId, SessionId},
};
use crate::{
constants::HEARTBEAT_INTERVAL,
messages::{
chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
chat_session::Message,
server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
session::{
command::Command,
wsmessage::{MessageType, WsMessage},
Message,
},
},
};
use actix::{
@ -55,124 +55,6 @@ impl WsChatSession {
ctx.ping(b"");
});
}
pub fn handle_msg(&self, msg: WsMessage, ctx: &mut WebsocketContext<Self>) {
let data = msg.data.unwrap_or("".into());
match msg.ty {
MessageType::Create => self.create(ctx),
MessageType::Join => self.join(data, ctx),
MessageType::Msg => self.msg(data, ctx),
MessageType::Leave => self.leave(ctx),
MessageType::Err => (),
}
}
pub fn execute(&self, cmd: Command, _ctx: &mut WebsocketContext<Self>) {
match cmd {
Command::Msg(msg) => {
self.addr.do_send(ClientMessage {
session: self.id.clone(),
room: self.room.clone().unwrap(),
msg,
});
}
}
}
fn create(&self, ctx: &mut WebsocketContext<Self>) {
self.addr
.send(CreateRoom {
session: self.id.clone(),
})
.into_actor(self)
.then(|res, act, ctx| {
match res {
Ok(res) => {
act.room = Some(res.clone());
ctx.text(WsMessage {
ty: MessageType::Create,
data: Some(res.to_string()),
});
}
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
fn join(&self, room_id: String, ctx: &mut WebsocketContext<Self>) {
match Uuid::from_str(&room_id) {
Ok(uuid) => {
self.addr
.send(JoinRoom {
room: uuid,
session: self.id.clone(),
})
.into_actor(self)
.then(move |res, act, ctx| {
match res {
Ok(res) => match res {
Ok(_) => {
act.room = Some(uuid.clone());
ctx.text(WsMessage {
ty: MessageType::Msg,
data: Some("Joined!".into()),
})
}
Err(err) => ctx.text(WsMessage::err(err.to_string())),
},
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
Err(err) => ctx.text(WsMessage::err(err.to_string())),
}
}
fn msg(&self, msg: String, ctx: &mut WebsocketContext<Self>) {
match Command::from_str(&msg) {
Ok(cmd) if self.room.is_some() => self.execute(cmd, ctx),
Ok(_) => ctx.text(WsMessage::err("You are not in a room yet".into())),
Err(err) => ctx.text(WsMessage::err(err.to_string())),
}
}
fn leave(&self, ctx: &mut WebsocketContext<Self>) {
self.addr
.send(Leave {
session: self.id.clone(),
})
.into_actor(self)
.then(move |res, act, ctx| {
match res {
Ok(_) => {
act.room = None;
ctx.text(WsMessage {
ty: MessageType::Leave,
data: Some("Room leaved".into()),
})
}
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
}
impl Actor for WsChatSession {
@ -227,7 +109,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
match msg {
ws::Message::Text(msg) => match serde_json::from_str::<WsMessage>(&msg) {
Ok(content) => self.handle_msg(content, ctx),
Ok(content) => ctx.notify(content),
Err(err) => ctx.text(WsMessage::err(err.to_string())),
},
ws::Message::Ping(msg) => {
@ -245,3 +127,121 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
}
}
}
impl Handler<WsMessage> for WsChatSession {
type Result = ();
fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) -> Self::Result {
let data = msg.data.unwrap_or("".into());
match msg.ty {
MessageType::Create => self.create(ctx),
MessageType::Join => match Uuid::from_str(&data) {
Ok(uuid) => self.join(uuid, ctx),
Err(err) => ctx.text(WsMessage::err(err.to_string())),
},
MessageType::Msg => self.msg(data, ctx),
MessageType::Leave => self.leave(ctx),
_ => (),
}
}
}
impl WsChatSession {
fn create(&self, ctx: &mut WebsocketContext<Self>) {
let send_create = self.addr.send(CreateRoom {
session: self.id.clone(),
});
let send_create = send_create.into_actor(self);
send_create
.then(move |res, act, ctx| {
// Actor's state updated here
match res {
Ok(res) => {
act.room = Some(res.clone());
ctx.text(WsMessage::info(res.to_string()));
}
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
fn join(&self, room_id: Uuid, ctx: &mut WebsocketContext<Self>) {
let join_room = self.addr.send(JoinRoom {
room: room_id,
session: self.id.clone(),
});
let join_room = join_room.into_actor(self);
join_room
.then(move |response, act, ctx| {
match response {
Ok(res) if res.is_ok() => {
act.room = Some(room_id.clone());
ctx.text(WsMessage {
ty: MessageType::Msg,
data: Some("Joined!".into()),
})
}
Ok(res) => ctx.text(WsMessage::err(res.unwrap_err().to_string())),
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
fn msg(&self, msg: String, ctx: &mut WebsocketContext<Self>) {
match Command::from_str(&msg) {
Ok(cmd) if self.room.is_some() => ctx.notify(cmd),
Ok(_) => ctx.text(WsMessage::err("You are not in a room yet".into())),
Err(err) => ctx.text(WsMessage::err(err.to_string())),
}
}
fn leave(&self, ctx: &mut WebsocketContext<Self>) {
self.addr
.send(Leave {
session: self.id.clone(),
})
.into_actor(self)
.then(move |res, act, ctx| {
match res {
Ok(_) => {
act.room = None;
ctx.text(WsMessage::info("Room leaved".into()))
}
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
}
impl Handler<Command> for WsChatSession {
type Result = ();
fn handle(&mut self, msg: Command, _ctx: &mut Self::Context) -> Self::Result {
match msg {
Command::Msg(msg) => {
self.addr.do_send(ClientMessage {
session: self.id.clone(),
room: self.room.clone().unwrap(),
msg,
});
}
}
}
}

View file

@ -7,7 +7,7 @@ mod routes;
use crate::{actors::chat_server::ChatServer, models::AppState};
use actix::Actor;
use actix_web::{App, HttpServer};
use routes::ws::connect;
use routes::connect;
#[actix_web::main]
async fn main() -> std::io::Result<()> {

View file

@ -1,2 +1,2 @@
pub mod chat_server;
pub mod chat_session;
pub mod server;
pub mod session;

View file

@ -1,4 +1,4 @@
use super::chat_session::Message;
use super::session::Message;
use crate::models::{RoomId, SessionId};
use actix::{Message as ActixMessage, Recipient};
use uuid::Uuid;

View file

@ -1,17 +1,20 @@
use actix::Message as ActixMessage;
use derive_more::{Display, Error};
use std::convert::Into;
use std::str::FromStr;
#[derive(ActixMessage)]
#[rtype(result = "()")]
pub enum Command {
Msg(String),
}
#[derive(Debug, Display, Error)]
#[display(fmt = "Invalid command: {}", msg)]
pub struct CommandError {
msg: &'static str,
}
pub enum Command {
Msg(String),
}
// TODO: IMPLEMENT MORE COMMANDS
impl FromStr for Command {
type Err = CommandError;

View file

@ -1,3 +1,5 @@
pub mod command;
pub mod wsmessage;
use actix::Message as ActixMessage;
#[derive(ActixMessage)]

View file

@ -1,6 +1,9 @@
use actix::Message as ActixMessage;
use serde::{Deserialize, Serialize};
use std::convert::Into;
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, ActixMessage)]
#[rtype(result = "()")]
pub struct WsMessage {
pub ty: MessageType,
pub data: Option<String>,
@ -13,6 +16,13 @@ impl WsMessage {
data: Some(msg),
}
}
pub fn info(msg: String) -> Self {
WsMessage {
ty: MessageType::Info,
data: Some(msg),
}
}
}
#[derive(Serialize, Deserialize)]
@ -22,10 +32,11 @@ pub enum MessageType {
Leave,
Msg,
Err,
Info,
}
impl Into<String> for WsMessage {
fn into(self) -> String {
serde_json::to_string(&self).unwrap()
}
}
}

View file

@ -1,5 +1,3 @@
pub mod commands;
pub mod ws;
use crate::actors::chat_server::ChatServer;
use actix::Addr;
use uuid::Uuid;

View file

@ -1,3 +1,13 @@
pub mod ws;
use crate::{actors::chat_session::WsChatSession, models::AppState};
use actix_web::{get, web, HttpRequest, Responder};
use actix_web_actors::ws;
#[get("/")]
pub async fn connect(
req: HttpRequest,
stream: web::Payload,
state: web::Data<AppState>,
) -> impl Responder {
let chat = state.chat.clone();
ws::start(WsChatSession::new(chat), &req, stream)
}

View file

@ -1,13 +0,0 @@
use crate::{actors::chat_session::WsChatSession, models::AppState};
use actix_web::{get, web, HttpRequest, Responder};
use actix_web_actors::ws;
#[get("/ws")]
pub async fn connect(
req: HttpRequest,
stream: web::Payload,
state: web::Data<AppState>,
) -> impl Responder {
let chat = state.chat.clone();
ws::start(WsChatSession::new(chat), &req, stream)
}