diff --git a/src/actors/chat_session.rs b/src/actors/chat_session.rs index 9ca4feb..7a12092 100644 --- a/src/actors/chat_session.rs +++ b/src/actors/chat_session.rs @@ -1,18 +1,22 @@ -use crate::messages::{ - chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave}, - chat_session::Message, -}; use crate::{ actors::chat_server::ChatServer, + constants::CLIENT_TIMEOUT, models::{ commands::Command, ws::{MessageType, WsMessage}, RoomId, SessionId, }, }; +use crate::{ + constants::HEARTBEAT_INTERVAL, + messages::{ + chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave}, + chat_session::Message, + }, +}; use actix::{ - fut, ActorContext, ActorFuture, ContextFutureSpawner, Handler, Running, StreamHandler, - WrapFuture, + clock::Instant, fut, ActorContext, ActorFuture, ContextFutureSpawner, Handler, Running, + StreamHandler, WrapFuture, }; use actix::{Actor, Addr, AsyncContext}; use actix_web_actors::ws::{self, WebsocketContext}; @@ -23,6 +27,7 @@ pub struct WsChatSession { pub id: SessionId, pub room: Option, pub addr: Addr, + pub hb: Instant, } impl WsChatSession { @@ -30,10 +35,27 @@ impl WsChatSession { WsChatSession { id: Uuid::new_v4(), room: None, + hb: Instant::now(), addr, } } + fn hb(&self, ctx: &mut WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + // notify chat server + act.addr.do_send(Disconnect { session: act.id }); + // stop actor + ctx.stop(); + // don't try to send a ping + return; + } + ctx.ping(b""); + }); + } + pub fn handle_msg(&self, msg: WsMessage, ctx: &mut WebsocketContext) { let data = msg.data.unwrap_or("".into()); match msg.ty { @@ -157,6 +179,7 @@ impl Actor for WsChatSession { type Context = WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { + self.hb(ctx); let addr = ctx.address(); self.addr .send(Connect { @@ -207,6 +230,13 @@ impl StreamHandler> for WsChatSession { Ok(content) => self.handle_msg(content, ctx), Err(err) => ctx.text(WsMessage::err(err.to_string())), }, + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.hb = Instant::now(); + } ws::Message::Close(reason) => { ctx.close(reason); ctx.stop(); diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..037d388 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,6 @@ +use std::time::Duration; + +/// How often heartbeat pings are sent +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a timeout +pub const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 1324d21..e3d5d1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod messages; mod models; mod routes; mod server; +mod constants; use crate::{actors::chat_server::ChatServer, models::AppState, server::init}; use actix::Actor;