rpfm_server/comms.rs
1//---------------------------------------------------------------------------//
2// Copyright (c) 2017-2026 Ismael Gutiérrez González. All rights reserved.
3//
4// This file is part of the Rusted PackFile Manager (RPFM) project,
5// which can be found here: https://github.com/Frodo45127/rpfm.
6//
7// This file is licensed under the MIT license, which can be found here:
8// https://github.com/Frodo45127/rpfm/blob/master/LICENSE.
9//---------------------------------------------------------------------------//
10
11//! Inter-thread communication primitives.
12//!
13//! Each [`session::Session`](crate::session::Session) owns a dedicated background
14//! thread that processes [`Command`]s serially. The HTTP / WebSocket /
15//! MCP-side handlers run on the `tokio` runtime and need to ship a
16//! `Command` into that thread plus get a stream of [`Response`]s back.
17//!
18//! [`CentralCommand`] is the thin abstraction that wires that up: a single
19//! [`tokio::sync::mpsc::unbounded_channel`] from handlers to the background
20//! loop, plus a per-request reply channel attached to each command. This
21//! keeps every request's responses isolated from every other request's,
22//! while still serialising work inside a session.
23//!
24//! [`Response`]: rpfm_ipc::messages::Response
25
26use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
27
28use std::fmt::Debug;
29use std::sync::Mutex;
30
31use rpfm_ipc::messages::Command;
32
33/// Panic message used when a receiver yields `None` (the background thread
34/// dropped the sender, which means it crashed or exited unexpectedly).
35pub const THREADS_COMMUNICATION_ERROR: &str = "Error in thread communication system. Response received: ";
36
37/// Panic message used when a `send` into the inter-thread channel fails (the
38/// receiver was dropped before we could deliver the message).
39pub const THREADS_SENDER_ERROR: &str = "Error in thread communication system. Sender failed to send message.";
40
41//-------------------------------------------------------------------------------//
42// Enums & Structs
43//-------------------------------------------------------------------------------//
44
45/// Custom type for the thread receiver, so clippy doesn't complain about long types.
46type ThreadReceiver<T> = Mutex<Option<UnboundedReceiver<(UnboundedSender<T>, Command)>>>;
47
48/// This struct contains the senders and receivers necessary to communicate between both threads.
49///
50/// You can use them by using the send/recv functions implemented for it.
51pub struct CentralCommand<T: Send + Sync + Debug> {
52 sender: UnboundedSender<(UnboundedSender<T>, Command)>,
53 receiver: ThreadReceiver<T>,
54}
55
56//-------------------------------------------------------------------------------//
57// Implementations
58//-------------------------------------------------------------------------------//
59
60impl<T: Send + Sync + Debug> Default for CentralCommand<T> {
61 fn default() -> Self {
62 let (sender, receiver) = unbounded_channel();
63 Self {
64 sender,
65 receiver: Mutex::new(Some(receiver)),
66 }
67 }
68}
69
70impl<T: Send + Sync + Debug> CentralCommand<T> {
71
72 /// This function serves as a generic way for commands to be sent to the backend.
73 ///
74 /// It returns the receiver which will receive the answers for the command, if any.
75 pub fn send(&self, data: Command) -> UnboundedReceiver<T> {
76 let (sender_back, receiver_back) = unbounded_channel();
77 if let Err(error) = self.sender.send((sender_back, data)) {
78 panic!("{THREADS_SENDER_ERROR}: {error}");
79 }
80
81 receiver_back
82 }
83
84 /// This function serves to send a message back through a generated channel.
85 pub fn send_back(sender: &UnboundedSender<T>, data: T) {
86 if let Err(error) = sender.send(data) {
87 panic!("{THREADS_SENDER_ERROR}: {error}");
88 }
89 }
90
91 /// This functions serves to take the receiver from the central command.
92 pub fn take_receiver(&self) -> Option<UnboundedReceiver<(UnboundedSender<T>, Command)>> {
93 self.receiver.lock().unwrap().take()
94 }
95
96 /// This functions serves to receive messages from a generated channel.
97 ///
98 /// This function does only try once, and it locks the thread. Panics if the response fails.
99 pub async fn recv(receiver: &mut UnboundedReceiver<T>) -> T {
100 let response = receiver.recv().await;
101 match response {
102 Some(data) => data,
103 None => panic!("{THREADS_COMMUNICATION_ERROR}{response:?}"),
104 }
105 }
106}