add heartbeat to websocket session

This commit is contained in:
Víctor Martínez 2021-03-09 00:26:39 +01:00
parent 90771c7c26
commit f636422043
3 changed files with 43 additions and 6 deletions

View file

@ -1,18 +1,22 @@
use crate::messages::{
chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
chat_session::Message,
};
use crate::{
actors::chat_server::ChatServer,
constants::CLIENT_TIMEOUT,
models::{
commands::Command,
ws::{MessageType, WsMessage},
RoomId, SessionId,
},
};
use crate::{
constants::HEARTBEAT_INTERVAL,
messages::{
chat_server::{ClientMessage, Connect, CreateRoom, Disconnect, JoinRoom, Leave},
chat_session::Message,
},
};
use actix::{
fut, ActorContext, ActorFuture, ContextFutureSpawner, Handler, Running, StreamHandler,
WrapFuture,
clock::Instant, fut, ActorContext, ActorFuture, ContextFutureSpawner, Handler, Running,
StreamHandler, WrapFuture,
};
use actix::{Actor, Addr, AsyncContext};
use actix_web_actors::ws::{self, WebsocketContext};
@ -23,6 +27,7 @@ pub struct WsChatSession {
pub id: SessionId,
pub room: Option<RoomId>,
pub addr: Addr<ChatServer>,
pub hb: Instant,
}
impl WsChatSession {
@ -30,10 +35,27 @@ impl WsChatSession {
WsChatSession {
id: Uuid::new_v4(),
room: None,
hb: Instant::now(),
addr,
}
}
fn hb(&self, ctx: &mut WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");
// notify chat server
act.addr.do_send(Disconnect { session: act.id });
// stop actor
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(b"");
});
}
pub fn handle_msg(&self, msg: WsMessage, ctx: &mut WebsocketContext<Self>) {
let data = msg.data.unwrap_or("".into());
match msg.ty {
@ -157,6 +179,7 @@ impl Actor for WsChatSession {
type Context = WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
let addr = ctx.address();
self.addr
.send(Connect {
@ -207,6 +230,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
Ok(content) => self.handle_msg(content, ctx),
Err(err) => ctx.text(WsMessage::err(err.to_string())),
},
ws::Message::Ping(msg) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
ws::Message::Pong(_) => {
self.hb = Instant::now();
}
ws::Message::Close(reason) => {
ctx.close(reason);
ctx.stop();

6
src/constants.rs Normal file
View file

@ -0,0 +1,6 @@
use std::time::Duration;
/// How often heartbeat pings are sent
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
pub const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

View file

@ -3,6 +3,7 @@ mod messages;
mod models;
mod routes;
mod server;
mod constants;
use crate::{actors::chat_server::ChatServer, models::AppState, server::init};
use actix::Actor;