1use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
38use tokio::time::{Duration, Instant};
39
40use std::collections::HashMap;
41use std::sync::{Arc, Mutex, RwLock, atomic::{AtomicBool, AtomicU32, Ordering}};
42
43use rpfm_ipc::helpers::SessionInfo;
44use rpfm_ipc::messages::{Command, Response};
45use rpfm_telemetry::info;
46
47use crate::background_thread;
48
49pub const SESSION_SENDER_ERROR: &str = "Error in session communication system. Sender failed to send message.";
51
52pub const DEFAULT_SESSION_TIMEOUT_SECS: u64 = 300;
54
55pub type SessionId = u64;
61
62pub struct SessionManager {
67
68 sessions: Mutex<HashMap<SessionId, ManagedSession>>,
70
71 next_id: Mutex<SessionId>,
73
74 timeout: Duration,
76}
77
78struct ManagedSession {
80
81 session: Arc<Session>,
83
84 disconnected_at: Option<Instant>,
86}
87
88pub struct Session {
93
94 id: SessionId,
96
97 sender: UnboundedSender<(UnboundedSender<Response>, Command)>,
99
100 connection_count: AtomicU32,
102
103 shutdown_requested: AtomicBool,
105
106 pack_names: RwLock<Vec<String>>,
108}
109
110impl Session {
115
116 pub fn new(id: SessionId) -> Arc<Self> {
118 let (sender, receiver) = unbounded_channel();
119
120 let session = Arc::new(Self {
121 id,
122 sender,
123 connection_count: AtomicU32::new(0),
124 shutdown_requested: AtomicBool::new(false),
125 pack_names: RwLock::new(Vec::new()),
126 });
127
128 let session_clone = session.clone();
130 tokio::spawn(async move {
131 info!("Session {} background thread starting...", id);
132 background_thread::background_loop(receiver, session_clone).await;
133 info!("Session {} background thread terminated.", id);
134 });
135
136 session
137 }
138
139 pub fn id(&self) -> SessionId {
141 self.id
142 }
143
144 pub fn connect(&self) {
146 self.connection_count.fetch_add(1, Ordering::SeqCst);
147 }
148
149 pub fn disconnect(&self) {
151 self.connection_count.fetch_sub(1, Ordering::SeqCst);
152 }
153
154 pub fn connection_count(&self) -> u32 {
156 self.connection_count.load(Ordering::SeqCst)
157 }
158
159 pub fn is_shutdown_requested(&self) -> bool {
161 self.shutdown_requested.load(Ordering::SeqCst)
162 }
163
164 pub fn pack_names(&self) -> Vec<String> {
166 self.pack_names.read().unwrap().clone()
167 }
168
169 pub fn add_pack_name(&self, name: &str) {
171 let mut names = self.pack_names.write().unwrap();
172 if !names.contains(&name.to_string()) {
173 names.push(name.to_string());
174 }
175 }
176
177 pub fn remove_pack_name(&self, name: &str) {
179 let mut names = self.pack_names.write().unwrap();
180 names.retain(|n| n != name);
181 }
182
183 pub fn shutdown(&self) {
185 info!("Session {} shutting down...", self.id);
186
187 if self.shutdown_requested.swap(true, Ordering::SeqCst) {
188 info!("Session {} already marked for shutdown before...", self.id);
189 return;
190 }
191
192 let (sender_back, _) = unbounded_channel();
194 let _ = self.sender.send((sender_back, Command::Exit));
195 }
196
197 pub fn send(&self, command: Command) -> UnboundedReceiver<Response> {
201 let (sender_back, receiver_back) = unbounded_channel();
202 if let Err(error) = self.sender.send((sender_back, command)) {
203 panic!("{SESSION_SENDER_ERROR}: {error}");
204 }
205 receiver_back
206 }
207}
208
209impl Default for SessionManager {
210 fn default() -> Self {
211 Self {
212 sessions: Mutex::new(HashMap::new()),
213 next_id: Mutex::new(1),
214 timeout: Duration::from_secs(DEFAULT_SESSION_TIMEOUT_SECS),
215 }
216 }
217}
218
219impl SessionManager {
220
221 pub fn create_session(&self) -> Arc<Session> {
223 let id = {
224 let mut next_id = self.next_id.lock().unwrap();
225 let id = *next_id;
226 *next_id += 1;
227 id
228 };
229
230 let session = Session::new(id);
231 session.connect();
232
233 self.sessions.lock().unwrap().insert(id, ManagedSession {
234 session: session.clone(),
235 disconnected_at: None,
236 });
237
238 info!("Created new session with ID: {}", id);
239 session
240 }
241
242 pub fn get_or_create_session(&self, session_id: Option<SessionId>) -> (Arc<Session>, bool) {
249 if let Some(id) = session_id {
250 let mut sessions = self.sessions.lock().unwrap();
251 if let Some(managed) = sessions.get_mut(&id) {
252
253 if !managed.session.is_shutdown_requested() {
255 managed.session.connect();
256 managed.disconnected_at = None;
257 info!("Client reconnected to existing session {}", id);
258 return (managed.session.clone(), false);
259 }
260 }
261 }
262
263 (self.create_session(), true)
266 }
267
268 pub fn get_session(&self, id: SessionId) -> Option<Arc<Session>> {
270 let sessions = self.sessions.lock().unwrap();
271 sessions.get(&id).map(|m| m.session.clone())
272 }
273
274 pub fn client_disconnected(manager: Arc<Self>, id: SessionId) {
279 let should_schedule_cleanup = {
280 let mut sessions = manager.sessions.lock().unwrap();
281 if let Some(managed) = sessions.get_mut(&id) {
282 managed.session.disconnect();
283
284 if managed.session.connection_count() == 0 {
285 managed.disconnected_at = Some(Instant::now());
286 info!("Session {} has no active connections, will timeout in {:?}", id, manager.timeout);
287 true
288 } else {
289 false
290 }
291 } else {
292 false
293 }
294 };
295
296 if should_schedule_cleanup {
297 Self::schedule_cleanup(manager.clone(), id);
298 }
299 }
300
301 fn schedule_cleanup(manager: Arc<Self>, id: SessionId) {
303 let timeout = manager.timeout;
304 let manager = manager.clone();
305
306 tokio::spawn(async move {
307 tokio::time::sleep(timeout).await;
308 info!("Session {} timeout check triggered (cleanup handled by manager)", id);
309 manager.remove_session(id);
310
311 if manager.session_count() == 0 {
313 info!("No more active sessions, shutting down server...");
314 std::process::exit(0);
315 }
316 });
317 }
318
319 pub fn cleanup_expired_sessions(&self) {
323 let now = Instant::now();
324 let mut to_remove = Vec::new();
325
326 {
327 let sessions = self.sessions.lock().unwrap();
328 for (id, managed) in sessions.iter() {
329 if let Some(disconnected_at) = managed.disconnected_at {
330 if now.duration_since(disconnected_at) >= self.timeout
331 && managed.session.connection_count() == 0
332 {
333 to_remove.push(*id);
334 }
335 }
336 }
337 }
338
339 for id in to_remove {
340 self.remove_session(id);
341 }
342 }
343
344 pub fn remove_session(&self, id: SessionId) -> Option<Arc<Session>> {
346 let mut sessions = self.sessions.lock().unwrap();
347 if let Some(managed) = sessions.remove(&id) {
348 info!("Removing session {}", id);
349 managed.session.shutdown();
350 Some(managed.session)
351 } else {
352 None
353 }
354 }
355
356 pub fn session_count(&self) -> usize {
358 let sessions = self.sessions.lock().unwrap();
359 sessions.len()
360 }
361
362 pub fn session_ids(&self) -> Vec<SessionId> {
364 let sessions = self.sessions.lock().unwrap();
365 sessions.keys().cloned().collect()
366 }
367
368 pub fn get_sessions_info(&self) -> Vec<SessionInfo> {
373 let sessions = self.sessions.lock().unwrap();
374 let now = Instant::now();
375
376 sessions.values().map(|managed| {
377 let timeout_remaining_secs = managed.disconnected_at.map(|disconnected_at| {
378 let elapsed = now.duration_since(disconnected_at);
379 if elapsed < self.timeout {
380 (self.timeout - elapsed).as_secs()
381 } else {
382 0
383 }
384 });
385
386 SessionInfo::new(
387 managed.session.id(),
388 managed.session.connection_count(),
389 timeout_remaining_secs,
390 managed.session.is_shutdown_requested(),
391 managed.session.pack_names(),
392 )
393 }).collect()
394 }
395
396 pub fn start_cleanup_task(manager: Arc<Self>) {
398 let cleanup_interval = manager.timeout / 2; tokio::spawn(async move {
401 loop {
402 tokio::time::sleep(cleanup_interval).await;
403 manager.cleanup_expired_sessions();
404 }
405 });
406 }
407}
408
409pub async fn recv_response(receiver: &mut UnboundedReceiver<Response>) -> Response {
413 match receiver.recv().await {
414 Some(response) => response,
415 None => panic!("Session response channel closed unexpectedly"),
416 }
417}