added pingpong to websockets
This commit is contained in:
@@ -36,7 +36,7 @@ pub struct Cli {
|
|||||||
port: String,
|
port: String,
|
||||||
|
|
||||||
/// Database URL
|
/// Database URL
|
||||||
#[arg(short, long, default_value = "localhost:5432")]
|
#[arg(short, long, default_value = "0.0.0.0:5432")]
|
||||||
database: String,
|
database: String,
|
||||||
|
|
||||||
/// Data directory path
|
/// Data directory path
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ use axum::{
|
|||||||
response::IntoResponse,
|
response::IntoResponse,
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::select;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::auth::{create_jwt, verify_jwt};
|
use crate::auth::{create_jwt, verify_jwt};
|
||||||
@@ -115,13 +117,56 @@ async fn handle_socket(
|
|||||||
mut socket: WebSocket,
|
mut socket: WebSocket,
|
||||||
mut receiver: tokio::sync::broadcast::Receiver<crate::routes::messages::Message>,
|
mut receiver: tokio::sync::broadcast::Receiver<crate::routes::messages::Message>,
|
||||||
) {
|
) {
|
||||||
while let Ok(msg) = receiver.recv().await {
|
let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
|
||||||
if socket
|
|
||||||
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap().into()))
|
loop {
|
||||||
.await
|
select! {
|
||||||
.is_err()
|
// Receive broadcast messages and send to client
|
||||||
{
|
msg = receiver.recv() => {
|
||||||
|
if let Ok(msg) = msg {
|
||||||
|
if let Ok(json) = serde_json::to_string(&msg) {
|
||||||
|
if socket.send(WsMessage::Text(json.into())).await.is_err() {
|
||||||
|
tracing::warn!("WebSocket send failed, closing socket");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send Ping
|
||||||
|
_ = ping_interval.tick() => {
|
||||||
|
if socket.send(WsMessage::Ping(vec![].into())).await.is_err() {
|
||||||
|
tracing::error!("Failed to send ping, closing connection");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!("Ping sent successfully");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get incoming messages from client
|
||||||
|
client_msg = socket.recv() => {
|
||||||
|
match client_msg {
|
||||||
|
Some(Ok(msg)) => {
|
||||||
|
match msg {
|
||||||
|
WsMessage::Pong(_) => {
|
||||||
|
tracing::info!("Received Pong");
|
||||||
|
}
|
||||||
|
WsMessage::Ping(_) => {
|
||||||
|
tracing::info!("Received Ping from client");
|
||||||
|
}
|
||||||
|
// WsMessage::Text(_) => {}
|
||||||
|
WsMessage::Close(_) => {
|
||||||
|
tracing::info!("Client disconnected");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user