This commit is contained in:
Víctor Martínez 2021-03-07 18:49:30 +01:00
parent 7bef1322a3
commit a7b482be4a
16 changed files with 236 additions and 75 deletions

1
Cargo.lock generated
View file

@ -125,6 +125,7 @@ dependencies = [
"actix-web",
"actix-web-actors",
"derive_more",
"serde",
"serde_json",
"uuid",
]

View file

@ -5,9 +5,6 @@ authors = ["Víctor Martínez <victorcoder2@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "lib"
path = "src/lib.rs"
[dependencies]
actix = "0.10.0"
@ -15,4 +12,5 @@ actix-web = "3"
actix-web-actors = "3.0.0"
uuid = { version = "0.8", features = ["serde", "v4"] }
derive_more = "0.99.11"
serde_json = "1.0.64"
serde_json = "1.0.64"
serde = "1.0.124"

View file

@ -1,12 +1,11 @@
use actix::{Actor, Context, Handler, MessageResult, Recipient};
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
use crate::messages::{
chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom},
chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
chat_session::Message,
};
use crate::models::{RoomId, SessionId};
use actix::{Actor, Context, Handler, MessageResult, Recipient};
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
pub struct ChatServer {
sessions: HashMap<SessionId, Recipient<Message>>,
@ -32,6 +31,20 @@ impl ChatServer {
});
});
}
pub fn leave_rooms(&mut self, session_id: &Uuid) {
let mut rooms = Vec::new();
// remove session from all rooms
for (id, sessions) in &mut self.rooms {
if sessions.remove(&session_id) {
rooms.push(id.to_owned());
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", &session_id);
}
}
}
impl Actor for ChatServer {
@ -63,12 +76,19 @@ impl Handler<Disconnect> for ChatServer {
}
}
impl Handler<Leave> for ChatServer {
type Result = ();
fn handle(&mut self, Leave { session }: Leave, _ctx: &mut Self::Context) -> Self::Result {
self.leave_rooms(&session);
}
}
impl Handler<ClientMessage> for ChatServer {
type Result = ();
fn handle(&mut self, msg: ClientMessage, _ctx: &mut Self::Context) -> Self::Result {
let ClientMessage { session, room, msg } = msg;
self.send_message(&room, &msg, &session);
}
}
@ -79,37 +99,32 @@ impl Handler<CreateRoom> for ChatServer {
fn handle(&mut self, msg: CreateRoom, _ctx: &mut Self::Context) -> Self::Result {
let CreateRoom { session } = msg;
let room_id = RoomId::new_v4();
self.leave_rooms(&session);
self.rooms.insert(
room_id,
vec![session].into_iter().collect::<HashSet<Uuid>>(),
);
println!("{:?}", self.rooms);
MessageResult(room_id)
}
}
impl Handler<JoinRoom> for ChatServer {
type Result = ();
type Result = MessageResult<JoinRoom>;
fn handle(
&mut self,
JoinRoom { session, room }: JoinRoom,
_ctx: &mut Self::Context,
) -> Self::Result {
let mut rooms = Vec::new();
self.leave_rooms(&session);
// remove session from all rooms
for (id, sessions) in &mut self.rooms {
if sessions.remove(&session) {
rooms.push(id.to_owned());
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", &session);
}
self.rooms
let result: Result<(), String> = self
.rooms
.get_mut(&room)
.map(|sessions| sessions.insert(session))
.map(|_| self.send_message(&room, "Someone connected", &session));
.map(|_| self.send_message(&room, "Someone connected", &session))
.ok_or("The room doesn't exists".into());
MessageResult(result)
}
}

View file

@ -1,13 +1,14 @@
use std::str::FromStr;
use crate::messages::{
chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
chat_session::Message,
};
use crate::{
actors::chat_server::ChatServer,
commands::Command,
messages::{
chat_server::{ClientMessage, Connect, Disconnect},
chat_session::Message,
models::{
commands::Command,
ws::{MessageType, WsMessage},
RoomId, SessionId,
},
models::{RoomId, SessionId},
};
use actix::{
fut, ActorContext, ActorFuture, ContextFutureSpawner, Handler, Running, StreamHandler,
@ -15,6 +16,8 @@ use actix::{
};
use actix::{Actor, Addr, AsyncContext};
use actix_web_actors::ws::{self, WebsocketContext};
use std::str::FromStr;
use uuid::Uuid;
pub struct WsChatSession {
pub id: Option<SessionId>,
@ -27,7 +30,18 @@ impl WsChatSession {
WsChatSession {
id: None,
room: None,
addr: addr,
addr,
}
}
pub fn handle_msg(&self, msg: WsMessage, ctx: &mut WebsocketContext<Self>) {
let data = msg.data.unwrap_or("".into());
match msg.ty {
MessageType::Create => self.create(ctx),
MessageType::Join => self.join(data, ctx),
MessageType::Msg => self.msg(data, ctx),
MessageType::Leave => self.leave(ctx),
MessageType::Err => (),
}
}
@ -37,11 +51,106 @@ impl WsChatSession {
self.addr.do_send(ClientMessage {
session: self.id.clone().unwrap(),
room: self.room.clone().unwrap(),
msg: msg,
msg,
});
}
}
}
fn create(&self, ctx: &mut WebsocketContext<Self>) {
self.addr
.send(CreateRoom {
session: self.id.clone().unwrap(),
})
.into_actor(self)
.then(|res, act, ctx| {
match res {
Ok(res) => {
act.room = Some(res.clone());
ctx.text(WsMessage {
ty: MessageType::Create,
data: Some(res.to_string()),
});
}
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
fn join(&self, room_id: String, ctx: &mut WebsocketContext<Self>) {
match Uuid::from_str(&room_id) {
Ok(uuid) => {
self.addr
.send(JoinRoom {
room: uuid,
session: self.id.clone().unwrap(),
})
.into_actor(self)
.then(move |res, act, ctx| {
match res {
Ok(res) => match res {
Ok(_) => {
act.room = Some(uuid.clone());
ctx.text(WsMessage {
ty: MessageType::Msg,
data: Some("Joined!".into()),
})
}
Err(err) => ctx.text(WsMessage::err(err.to_string())),
},
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
Err(err) => ctx.text(WsMessage::err(err.to_string())),
}
}
fn msg(&self, msg: String, ctx: &mut WebsocketContext<Self>) {
match Command::from_str(&msg) {
Ok(cmd) if self.room.is_some() => self.execute(cmd, ctx),
Ok(_) => ctx.text(WsMessage::err("You are not in a room yet".into())),
Err(err) => ctx.text(WsMessage::err(err.to_string())),
}
}
fn leave(&self, ctx: &mut WebsocketContext<Self>) {
self.addr
.send(Leave {
session: self.id.clone().unwrap(),
})
.into_actor(self)
.then(move |res, act, ctx| {
match res {
Ok(_) => {
act.room = None;
ctx.text(WsMessage {
ty: MessageType::Leave,
data: Some("Room leaved".into()),
})
}
// something is wrong with chat server
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx);
}
}
impl Actor for WsChatSession {
@ -58,7 +167,10 @@ impl Actor for WsChatSession {
match res {
Ok(res) => act.id = Some(res),
// something is wrong with chat server
_ => ctx.stop(),
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
}
}
fut::ready(())
})
@ -86,17 +198,17 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match item {
Ok(msg) => msg,
_ => {
Err(err) => {
ctx.text(WsMessage::err(err.to_string()));
ctx.stop();
return;
}
};
match msg {
// TODO: Deserialize string to json first, then check action type
ws::Message::Text(msg) => match Command::from_str(&msg) {
Ok(cmd) => self.execute(cmd, ctx),
Err(err) => ctx.text(err.to_string()),
ws::Message::Text(msg) => match serde_json::from_str::<WsMessage>(&msg) {
Ok(content) => self.handle_msg(content, ctx),
Err(err) => ctx.text(WsMessage::err(err.to_string())),
},
ws::Message::Close(reason) => {
ctx.close(reason);

View file

@ -1,2 +1,2 @@
pub mod chat_server;
pub mod chat_session;
pub mod chat_session;

View file

@ -1,10 +0,0 @@
use actix_web::{App, HttpServer};
use lib::server::init;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| App::new().configure(init))
.bind("127.0.0.1:8080")?
.run()
.await
}

View file

@ -1,6 +0,0 @@
pub mod routes;
pub mod models;
pub mod messages;
pub mod commands;
pub mod server;
pub mod actors;

22
src/main.rs Normal file
View file

@ -0,0 +1,22 @@
mod actors;
mod messages;
mod models;
mod routes;
mod server;
use crate::{actors::chat_server::ChatServer, models::AppState, server::init};
use actix::Actor;
use actix_web::{App, HttpServer};
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let chat = ChatServer::new().start();
HttpServer::new(move || {
App::new()
.data(AppState { chat: chat.clone() })
.configure(init)
})
.bind("127.0.0.1:8080")?
.run()
.await
}

View file

@ -1,10 +1,8 @@
use super::chat_session::Message;
use crate::models::{RoomId, SessionId};
use actix::{Message as ActixMessage, Recipient};
use uuid::Uuid;
use crate::models::{RoomId, SessionId};
use super::chat_session::Message;
#[derive(ActixMessage)]
#[rtype(result = "()")]
pub struct ClientMessage {
@ -19,7 +17,7 @@ pub struct CreateRoom {
pub session: SessionId,
}
#[derive(ActixMessage)]
#[rtype(result = "()")]
#[rtype(result = "Result<(), String>")]
pub struct JoinRoom {
pub session: SessionId,
pub room: RoomId,
@ -36,3 +34,9 @@ pub struct Connect {
pub struct Disconnect {
pub session: SessionId,
}
#[derive(ActixMessage)]
#[rtype(result = "()")]
pub struct Leave {
pub session: SessionId,
}

View file

@ -1,2 +1,2 @@
pub mod chat_server;
pub mod chat_session;
pub mod chat_session;

View file

@ -1,13 +1,12 @@
pub mod ws_messages;
pub mod commands;
pub mod ws;
use crate::actors::chat_server::ChatServer;
use actix::Addr;
use uuid::Uuid;
use crate::actors::chat_server::ChatServer;
pub type SessionId = Uuid;
pub type RoomId = Uuid;
pub struct AppState {
pub chat: Addr<ChatServer>,
}

31
src/models/ws.rs Normal file
View file

@ -0,0 +1,31 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct WsMessage {
pub ty: MessageType,
pub data: Option<String>,
}
impl WsMessage {
pub fn err(msg: String) -> Self {
WsMessage {
ty: MessageType::Err,
data: Some(msg),
}
}
}
#[derive(Serialize, Deserialize)]
pub enum MessageType {
Join,
Create,
Leave,
Msg,
Err,
}
impl Into<String> for WsMessage {
fn into(self) -> String {
serde_json::to_string(&self).unwrap()
}
}

View file

@ -1,8 +1,7 @@
use crate::{actors::chat_session::WsChatSession, models::AppState};
use actix_web::{web, HttpRequest, Responder};
use actix_web_actors::ws;
use crate::{actors::chat_session::WsChatSession, models::AppState};
pub async fn connect(
req: HttpRequest,
stream: web::Payload,

View file

@ -1,10 +1,6 @@
use crate::{actors::chat_server::ChatServer, models::AppState, routes::ws::connect};
use actix::Actor;
use crate::routes::ws::connect;
use actix_web::web;
pub fn init(app: &mut web::ServiceConfig) {
let chat = ChatServer::new().start();
app.data(AppState { chat })
.service(web::resource("/ws/").to(connect));
app.service(web::resource("/ws").to(connect));
}