Skip to main content

rpfm_server/
session.rs

1//---------------------------------------------------------------------------//
2// Copyright (c) 2017-2026 Ismael Gutiérrez González. All rights reserved.
3//
4// This file is part of the Rusted PackFile Manager (RPFM) project,
5// which can be found here: https://github.com/Frodo45127/rpfm.
6//
7// This file is licensed under the MIT license, which can be found here:
8// https://github.com/Frodo45127/rpfm/blob/master/LICENSE.
9//---------------------------------------------------------------------------//
10
11//! Per-client session state and lifecycle.
12//!
13//! Each WebSocket connection (and each MCP client) is wrapped in a [`Session`]
14//! managed by a [`SessionManager`]. Sessions are isolated: open packs in one
15//! session aren't visible from another, and each one owns a dedicated
16//! background thread (see [`crate::background_thread`]) that processes its
17//! commands serially.
18//!
19//! ## Lifecycle
20//!
21//! 1. **Create.** A new session gets a unique [`SessionId`] from the manager
22//!    plus a fresh background thread spawned on the `tokio` runtime.
//! 2. **Connect / disconnect.** Clients call [`Session::connect`] on attach
//!    and [`Session::disconnect`] on detach; these raise and lower the
//!    session's connection count, which is what the timeout logic watches.
26//! 3. **Reconnect.** A client can pass its previous `session_id` back on the
27//!    next WebSocket handshake to adopt the same session and recover its
28//!    in-memory state. See [`SessionManager::get_or_create_session`].
29//! 4. **Timeout.** When the connection count drops to zero, the session
30//!    enters a [`DEFAULT_SESSION_TIMEOUT_SECS`]-long grace period. Reconnects
31//!    cancel the timeout; otherwise the cleanup task removes the session and
32//!    its background thread exits.
33//! 5. **Empty manager → process exit.** When the last session is removed the
34//!    server process terminates, so no orphaned backend lingers in the
35//!    background.
36
37use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
38use tokio::time::{Duration, Instant};
39
40use std::collections::HashMap;
41use std::sync::{Arc, Mutex, RwLock, atomic::{AtomicBool, AtomicU32, Ordering}};
42
43use rpfm_ipc::helpers::SessionInfo;
44use rpfm_ipc::messages::{Command, Response};
45use rpfm_telemetry::info;
46
47use crate::background_thread;
48
/// Message used when a command cannot be handed over to a session's
/// background loop because the sending half of the channel failed.
pub const SESSION_SENDER_ERROR: &str = "Error in session communication system. Sender failed to send message.";
51
/// Grace period, in seconds, that a session is kept by default once it has
/// no connected clients: five minutes.
pub const DEFAULT_SESSION_TIMEOUT_SECS: u64 = 5 * 60;
54
55//-------------------------------------------------------------------------------//
56//                              Enums & Structs
57//-------------------------------------------------------------------------------//
58
/// Numeric handle that uniquely identifies a session within a
/// [`SessionManager`].
pub type SessionId = u64;
61
62/// Manages all active sessions.
63///
64/// Provides thread-safe access to create, retrieve, and remove sessions.
65/// Sessions persist for a configurable timeout after all clients disconnect.
66pub struct SessionManager {
67
68    /// Map of session IDs to managed sessions.
69    sessions: Mutex<HashMap<SessionId, ManagedSession>>,
70
71    /// Counter for generating unique session IDs.
72    next_id: Mutex<SessionId>,
73
74    /// Session timeout duration.
75    timeout: Duration,
76}
77
78/// Internal state for a managed session.
79struct ManagedSession {
80
81    /// The session itself.
82    session: Arc<Session>,
83
84    /// When the last client disconnected (None if clients are connected).
85    disconnected_at: Option<Instant>,
86}
87
88/// A session represents a single client's connection state.
89///
90/// Each session has its own background thread for processing commands,
91/// ensuring complete isolation between clients.
92pub struct Session {
93
94    /// Unique identifier for this session.
95    id: SessionId,
96
97    /// Sender to communicate with this session's background thread.
98    sender: UnboundedSender<(UnboundedSender<Response>, Command)>,
99
100    /// Number of active connections using this session.
101    connection_count: AtomicU32,
102
103    /// Whether this session has been marked for shutdown.
104    shutdown_requested: AtomicBool,
105
106    /// Names of the pack files currently open in this session.
107    pack_names: RwLock<Vec<String>>,
108}
109
110//-------------------------------------------------------------------------------//
111//                             Implementations
112//-------------------------------------------------------------------------------//
113
114impl Session {
115
116    /// Create a new session with its own background thread.
117    pub fn new(id: SessionId) -> Arc<Self> {
118        let (sender, receiver) = unbounded_channel();
119
120        let session = Arc::new(Self {
121            id,
122            sender,
123            connection_count: AtomicU32::new(0),
124            shutdown_requested: AtomicBool::new(false),
125            pack_names: RwLock::new(Vec::new()),
126        });
127
128        // Spawn a dedicated background thread for this session.
129        let session_clone = session.clone();
130        tokio::spawn(async move {
131            info!("Session {} background thread starting...", id);
132            background_thread::background_loop(receiver, session_clone).await;
133            info!("Session {} background thread terminated.", id);
134        });
135
136        session
137    }
138
139    /// Get the session ID.
140    pub fn id(&self) -> SessionId {
141        self.id
142    }
143
144    /// Increment the connection count.
145    pub fn connect(&self) {
146        self.connection_count.fetch_add(1, Ordering::SeqCst);
147    }
148
149    /// Decrement the connection count.
150    pub fn disconnect(&self) {
151        self.connection_count.fetch_sub(1, Ordering::SeqCst);
152    }
153
154    /// Get the current connection count.
155    pub fn connection_count(&self) -> u32 {
156        self.connection_count.load(Ordering::SeqCst)
157    }
158
159    /// Check if shutdown has been requested.
160    pub fn is_shutdown_requested(&self) -> bool {
161        self.shutdown_requested.load(Ordering::SeqCst)
162    }
163
164    /// Get the pack names for this session.
165    pub fn pack_names(&self) -> Vec<String> {
166        self.pack_names.read().unwrap().clone()
167    }
168
169    /// Add a pack name to this session.
170    pub fn add_pack_name(&self, name: &str) {
171        let mut names = self.pack_names.write().unwrap();
172        if !names.contains(&name.to_string()) {
173            names.push(name.to_string());
174        }
175    }
176
177    /// Remove a pack name from this session.
178    pub fn remove_pack_name(&self, name: &str) {
179        let mut names = self.pack_names.write().unwrap();
180        names.retain(|n| n != name);
181    }
182
183    /// Shutdown this session by sending an Exit command.
184    pub fn shutdown(&self) {
185        info!("Session {} shutting down...", self.id);
186
187        if self.shutdown_requested.swap(true, Ordering::SeqCst) {
188            info!("Session {} already marked for shutdown before...", self.id);
189            return;
190        }
191
192        // Send exit command - ignore errors if channel is already closed.
193        let (sender_back, _) = unbounded_channel();
194        let _ = self.sender.send((sender_back, Command::Exit));
195    }
196
197    /// Send a command to this session's background thread.
198    ///
199    /// Returns a receiver to get the response.
200    pub fn send(&self, command: Command) -> UnboundedReceiver<Response> {
201        let (sender_back, receiver_back) = unbounded_channel();
202        if let Err(error) = self.sender.send((sender_back, command)) {
203            panic!("{SESSION_SENDER_ERROR}: {error}");
204        }
205        receiver_back
206    }
207}
208
209impl Default for SessionManager {
210    fn default() -> Self {
211        Self {
212            sessions: Mutex::new(HashMap::new()),
213            next_id: Mutex::new(1),
214            timeout: Duration::from_secs(DEFAULT_SESSION_TIMEOUT_SECS),
215        }
216    }
217}
218
219impl SessionManager {
220
221    /// Create a new session and return a reference to it.
222    pub fn create_session(&self) -> Arc<Session> {
223        let id = {
224            let mut next_id = self.next_id.lock().unwrap();
225            let id = *next_id;
226            *next_id += 1;
227            id
228        };
229
230        let session = Session::new(id);
231        session.connect();
232
233        self.sessions.lock().unwrap().insert(id, ManagedSession {
234            session: session.clone(),
235            disconnected_at: None,
236        });
237
238        info!("Created new session with ID: {}", id);
239        session
240    }
241
242    /// Get an existing session by ID, or create a new one if the ID doesn't exist.
243    ///
244    /// If `session_id` is `Some`, attempts to retrieve that session.
245    /// If the session doesn't exist or `session_id` is `None`, creates a new session.
246    ///
247    /// Returns the session and whether it was newly created.
248    pub fn get_or_create_session(&self, session_id: Option<SessionId>) -> (Arc<Session>, bool) {
249        if let Some(id) = session_id {
250            let mut sessions = self.sessions.lock().unwrap();
251            if let Some(managed) = sessions.get_mut(&id) {
252
253                // Check if the session is still valid (not shut down).
254                if !managed.session.is_shutdown_requested() {
255                    managed.session.connect();
256                    managed.disconnected_at = None;
257                    info!("Client reconnected to existing session {}", id);
258                    return (managed.session.clone(), false);
259                }
260            }
261        }
262
263        // Either no session_id provided, or session not found/invalid.
264        // Create a new session.
265        (self.create_session(), true)
266    }
267
268    /// Get a session by ID without incrementing the connection count.
269    pub fn get_session(&self, id: SessionId) -> Option<Arc<Session>> {
270        let sessions = self.sessions.lock().unwrap();
271        sessions.get(&id).map(|m| m.session.clone())
272    }
273
274    /// Mark a session as disconnected by a client.
275    ///
276    /// If no more clients are connected, starts the timeout countdown.
277    /// The session will be removed after the timeout unless a client reconnects.
278    pub fn client_disconnected(manager: Arc<Self>, id: SessionId) {
279        let should_schedule_cleanup = {
280            let mut sessions = manager.sessions.lock().unwrap();
281            if let Some(managed) = sessions.get_mut(&id) {
282                managed.session.disconnect();
283
284                if managed.session.connection_count() == 0 {
285                    managed.disconnected_at = Some(Instant::now());
286                    info!("Session {} has no active connections, will timeout in {:?}", id, manager.timeout);
287                    true
288                } else {
289                    false
290                }
291            } else {
292                false
293            }
294        };
295
296        if should_schedule_cleanup {
297            Self::schedule_cleanup(manager.clone(), id);
298        }
299    }
300
301    /// Schedule a cleanup check for a session after the timeout period.
302    fn schedule_cleanup(manager: Arc<Self>, id: SessionId) {
303        let timeout = manager.timeout;
304        let manager = manager.clone();
305
306        tokio::spawn(async move {
307            tokio::time::sleep(timeout).await;
308            info!("Session {} timeout check triggered (cleanup handled by manager)", id);
309            manager.remove_session(id);
310
311            // Check if this was the last session and shutdown the server if so.
312            if manager.session_count() == 0 {
313                info!("No more active sessions, shutting down server...");
314                std::process::exit(0);
315            }
316        });
317    }
318
319    /// Perform cleanup of expired sessions.
320    ///
321    /// This should be called periodically or after timeout events.
322    pub fn cleanup_expired_sessions(&self) {
323        let now = Instant::now();
324        let mut to_remove = Vec::new();
325
326        {
327            let sessions = self.sessions.lock().unwrap();
328            for (id, managed) in sessions.iter() {
329                if let Some(disconnected_at) = managed.disconnected_at {
330                    if now.duration_since(disconnected_at) >= self.timeout
331                        && managed.session.connection_count() == 0
332                    {
333                        to_remove.push(*id);
334                    }
335                }
336            }
337        }
338
339        for id in to_remove {
340            self.remove_session(id);
341        }
342    }
343
344    /// Remove a session immediately.
345    pub fn remove_session(&self, id: SessionId) -> Option<Arc<Session>> {
346        let mut sessions = self.sessions.lock().unwrap();
347        if let Some(managed) = sessions.remove(&id) {
348            info!("Removing session {}", id);
349            managed.session.shutdown();
350            Some(managed.session)
351        } else {
352            None
353        }
354    }
355
356    /// Get the number of active sessions.
357    pub fn session_count(&self) -> usize {
358        let sessions = self.sessions.lock().unwrap();
359        sessions.len()
360    }
361
362    /// Get all active session IDs.
363    pub fn session_ids(&self) -> Vec<SessionId> {
364        let sessions = self.sessions.lock().unwrap();
365        sessions.keys().cloned().collect()
366    }
367
368    /// Get information about all active sessions.
369    ///
370    /// Returns a vector of [`SessionInfo`] structs containing session state snapshots
371    /// for use by session management tools.
372    pub fn get_sessions_info(&self) -> Vec<SessionInfo> {
373        let sessions = self.sessions.lock().unwrap();
374        let now = Instant::now();
375
376        sessions.values().map(|managed| {
377            let timeout_remaining_secs = managed.disconnected_at.map(|disconnected_at| {
378                let elapsed = now.duration_since(disconnected_at);
379                if elapsed < self.timeout {
380                    (self.timeout - elapsed).as_secs()
381                } else {
382                    0
383                }
384            });
385
386            SessionInfo::new(
387                managed.session.id(),
388                managed.session.connection_count(),
389                timeout_remaining_secs,
390                managed.session.is_shutdown_requested(),
391                managed.session.pack_names(),
392            )
393        }).collect()
394    }
395
396    /// Start a background task that periodically cleans up expired sessions.
397    pub fn start_cleanup_task(manager: Arc<Self>) {
398        let cleanup_interval = manager.timeout / 2; // Check twice per timeout period.
399
400        tokio::spawn(async move {
401            loop {
402                tokio::time::sleep(cleanup_interval).await;
403                manager.cleanup_expired_sessions();
404            }
405        });
406    }
407}
408
409/// Helper function to receive a response from a session.
410///
411/// This is async and will wait for the response.
412pub async fn recv_response(receiver: &mut UnboundedReceiver<Response>) -> Response {
413    match receiver.recv().await {
414        Some(response) => response,
415        None => panic!("Session response channel closed unexpectedly"),
416    }
417}