Utiliser l’envoi de messages pour transférer des données entre les tâches
Une approche de plus en plus populaire pour garantir la sécurité de la concurrence est l’envoi de messages, avec lequel les tâches ou les acteurs communiquent en envoyant aux autres des messages contenant des données. Voici l’idée résumée, tirée d’un slogan provenant de la documentation du langage Go : “Ne communiquez pas en partageant la mémoire ; partagez plutôt la mémoire en communiquant”.
Pour accomplir l’envoi simultané de messages, la bibliothèque standard de Rust fournit une implémentation de canaux. Un canal est un concept de programmation qui permet de transmettre des données d’une tâche à une autre.
Vous pouvez imaginer un canal de programmation comme étant un canal d’eau, comme un ruisseau ou une rivière. Si vous posez quelque chose comme un canard en plastique sur une rivière, il se déplacera vers l’aval jusqu’à la fin de la voie d’eau.
Un canal est divisé en deux parties : un émetteur et un récepteur. La partie de l’émetteur est le lieu en amont où vous déposez les canards en plastique sur la rivière, et la partie du récepteur est celle où les canards en plastique finissent leur voyage. Une partie de votre code appelle des méthodes de l’émetteur en lui passant les données que vous souhaitez envoyer, tandis qu’une autre partie attend que des messages arrivent. Un canal est déclaré fermé lorsque l’une des parties, l’émetteur ou le récepteur, est libérée.
Ici, nous allons concevoir un programme qui a une tâche pour générer des valeurs et les envoyer dans un canal, et une autre tâche qui va recevoir les valeurs et les afficher. Nous allons envoyer de simples valeurs entre les tâches en utilisant un canal pour illustrer cette fonctionnalité. Une fois que vous serez familier avec cette technique, vous pourrez utiliser les canaux pour n’importe quelles tâches qui ont besoin de communiquer entre elles, comme par exemple un système de dialogue en ligne ou un système où de nombreuses tâches font chacune une partie d’un gros calcul et envoient leur résultat à une tâche chargée de les agréger.
Pour commencer, dans l’encart 16-6, nous allons créer un canal mais nous n’allons rien faire avec. Remarquez qu’il ne se compilera pas encore car Rust ne peut pas savoir le type de valeurs que nous souhaitons envoyer dans le canal.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx and rxNous créons un nouveau canal en utilisant la fonction mpsc::channel ; mpsc signifie multiple producer, single consumer, c’est-à-dire plusieurs producteurs, un seul consommateur. En bref, la façon dont la bibliothèque standard de Rust a implémenté ces canaux permet d’avoir plusieurs extrémités émettrices qui produisent des valeurs, mais seulement une seule extrémité réceptrice qui consomme ces valeurs. Imaginez plusieurs ruisseaux qui se rejoignent en une seule grosse rivière : tout ce qui est déposé sur les ruisseaux va finir dans une seule rivière à la fin. Nous allons commencer avec un seul producteur pour le moment, mais nous allons ajouter d’autres producteurs lorsque notre exemple fonctionnera.
La fonction mpsc::channel retourne un tuple dont le premier élément est celui qui permet d’envoyer —l’émetteur— et dont le second est celui qui reçoit —le récepteur—. Les abréviations tx et rx sont utilisés traditionnellement dans de nombreux domaines pour signifier respectivement transmetteur (émetteur) et récepteur, nous avons donc nommé nos variables ainsi pour indiquer clairement le rôle de chaque élément. Nous utilisons une instruction let avec un motif qui déstructure les tuples ; nous verrons l’utilisation des motifs dans les instructions let et la déstructuration au chapitre 19. Pour le moment, retenez que l’utilisation d’une instruction let est une façon d’extraire facilement les éléments du tuple retourné par mpsc::channel.
Déplaçons maintenant l’élément de transmission dans une nouvelle tâche et faisons-lui envoyer une chaîne de caractères afin que la nouvelle tâche communique avec la tâche principale, comme dans l’encart 16-7. C’est comme poser un canard en plastique sur l’amont de la rivière ou envoyer un message instantané d’une tâche à une autre.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let valeur = String::from("salut");
tx.send(valeur).unwrap();
});
}
tx to a spawned thread and sending "hi"Nous utilisons à nouveau thread::spawn pour créer une nouvelle tâche et ensuite utiliser move pour déplacer tx dans la fermeture afin que la nouvelle tâche possède désormais tx. La nouvelle tâche a besoin de posséder l’émetteur pour être en capacité d’envoyer des messages dans ce canal.
L’émetteur a une méthode send qui prend en argument la valeur que nous souhaitons envoyer. La méthode send retourne un type Result<T, E>, donc si le récepteur a déjà été libéré et qu’il n’y a nulle part où envoyer la valeur, l’opération d’envoi va retourner une erreur. Dans cet exemple, nous faisons appel à unwrap pour paniquer en cas d’erreur. Mais dans un vrai programme, nous devrions gérer ce cas correctement : retournez au chapitre 9 pour revoir les stratégies permettant de gérer correctement les erreurs.
Dans l’encart 16-8, nous allons obtenir la valeur du récepteur dans la tâche principale. C’est comme récupérer le canard en plastique dans l’eau à la fin de la rivière, ou récupérer un message instantané.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let valeur = String::from("salut");
tx.send(valeur).unwrap();
});
let recu = rx.recv().unwrap();
println!("On a reçu : {recu}");
}
"hi" in the main thread and printing itLe récepteur a deux modes intéressants : recv et try_recv. Nous avons utilisé recv, un raccourci pour recevoir, qui va bloquer l’exécution de la tâche principale et attendre jusqu’à ce qu’une valeur soit envoyée dans le canal. Une fois qu’une valeur est envoyée, recv va la retourner dans un Result<T, E>. Lorsque l’émetteur se ferme, recv va retourner une erreur pour signaler qu’il n’y aura plus de valeurs qui arriveront.
La méthode try_recv ne bloque pas, mais va plutôt retourner immédiatement un Result<T, E> : une valeur Ok qui contiendra un message s’il y en a un de disponible, et une valeur Err s’il n’y a pas de message cette fois-ci. L’utilisation de try_recv est pratique si cette tâche a d’autres choses à faire pendant qu’elle attend les messages : nous pouvons ainsi écrire une boucle qui appelle régulièrement try_recv, gère le message s’il y en a un, et sinon fait d’autres choses avant de vérifier à nouveau.
Nous avons utilisé recv dans cet exemple pour des raisons de simplicité ; nous n’avons rien d’autre à faire dans la tâche principale que d’attendre les messages, donc bloquer la tâche principale est acceptable.
Lorsque nous exécutons le code de l’encart 16-8, nous allons voir la valeur s’afficher grâce à la tâche principale :
On a reçu : salut
C’est parfait ainsi !
Transfert de possession via les canaux
Les règles de possession jouent un rôle vital dans l’envoi de messages car elles vous aident à écrire du code sûr et concurrent. Réfléchir à la possession avec vos programmes Rust vous offre l’avantage d’éviter des erreurs de développement avec la concurrence. Faisons une expérience pour montrer comment la possession et les canaux fonctionnent ensemble pour éviter les problèmes : nous allons essayer d’utiliser la valeur dans la nouvelle tâche après que nous l’avons envoyée dans le canal. Essayez de compiler le code de l’encart 16-9 pour découvrir pourquoi ce code n’est pas autorisé.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let valeur = String::from("salut");
tx.send(valeur).unwrap();
println!("valeur vaut {valeur}");
});
let recu = rx.recv().unwrap();
println!("On a reçu : {recu}");
}
val after we’ve sent it down the channelIci, nous essayons d’afficher valeur après que nous l’avons envoyée dans le canal avec tx.send. Ce serait une mauvaise idée de permettre cela : une fois que la valeur a été envoyée à une autre tâche, cette tâche peut la modifier ou la libérer avant que nous essayions de l’utiliser à nouveau. Il est possible que des modifications faites par l’autre tâche puissent causer des erreurs ou des résultats inattendus à cause de données incohérentes ou manquantes. Toutefois, Rust nous affiche une erreur si nous essayons de compiler le code de l’encart 16-9 :
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `valeur`
--> src/main.rs:10:27
|
8 | let valeur = String::from("salut");
| ------ move occurs because `valeur` has type `String`, which does not implement the `Copy` trait
9 | tx.send(valeur).unwrap();
| ------ value moved here
10 | println!("valeur vaut {valeur}");
| ^^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
Notre erreur de concurrence a provoqué une erreur à la compilation. La fonction send prend possession de ses paramètres, et lorsque la valeur est déplacée, le récepteur en prend possession. Cela nous évite d’utiliser à nouveau accidentellement la valeur après l’avoir envoyée ; le système de possession vérifie que tout est en ordre.
Envoyer plusieurs valeurs
Le code de l’encart 16-8 s’est compilé et exécuté, mais il ne nous a pas clairement indiqué que deux tâches séparées communiquaient entre elles via le canal.
Dans l’encart 16-10, nous avons fait quelques modifications qui prouvent que le code de l’encart 16-8 est exécuté avec de la concurrence : la nouvelle tâche va maintenant envoyer plusieurs messages et faire une pause d’une seconde entre chaque message.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let valeurs = vec![
String::from("salut"),
String::from("à partir"),
String::from("de la"),
String::from("nouvelle tâche"),
];
for valeur in valeurs {
tx.send(valeur).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for recu in rx {
println!("On a reçu : {recu}");
}
}
Cette fois-ci, la nouvelle tâche a un vecteur de chaînes de caractères que nous souhaitons envoyer à la tâche principale. Nous itérons sur celui-ci, nous envoyons les chaînes une par une en faisant une pause entre chaque envoi en appelant la fonction thread::sleep avec une valeur Duration d’une seconde.
Dans la tâche principale, nous n’appelons plus explicitement la fonction recv : à la place, nous utilisons rx comme un itérateur. Pour chaque valeur reçue, nous l’affichons. Lorsque le canal se fermera, l’itération se terminera.
Lorsque nous exécutons le code de l’encart 16-10, nous devrions voir la sortie suivante, avec une pause d’une seconde entre chaque ligne :
On a reçu : salut
On a reçu : à partir
On a reçu : de la
On a reçu : nouvelle tâche
Comme nous n’avons pas de code qui met en pause ou retarde la boucle for de la tâche principale, nous pouvons dire que la tâche principale est en attente de réception des valeurs de la part de la nouvelle tâche.
Création de plusieurs producteurs
Précédemment, nous avions évoqué que mpsc était un acronyme pour multiple producer, single consumer. Mettons mpsc en œuvre en élargissant le code de l’encart 16-10 pour créer plusieurs tâches qui vont toutes envoyer des valeurs au même récepteur. Nous pouvons faire ceci en clonant l’émetteur, comme dans l’encart 16-11 :
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// -- partie masquée ici --
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let valeurs = vec![
String::from("salut"),
String::from("à partir"),
String::from("de la"),
String::from("nouvelle tâche"),
];
for valeur in valeurs {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let valeurs = vec![
String::from("encore plus"),
String::from("de messages"),
String::from("pour"),
String::from("vous"),
];
for valeur in valeurs {
tx.send(valeur).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for recu in rx {
println!("On a reçu : {recu}");
}
// -- partie masquée ici --
}
Cette fois-ci, avant de créer la première nouvelle tâche, nous appelons clone sur l’émetteur. Cela va nous donner un nouvel émetteur que nous pouvons passer à la première nouvelle tâche. Nous passons ensuite l’émetteur original à une seconde nouvelle tâche. Cela va nous donner deux tâches, chacune envoyant des messages différents au récepteur.
Lorsque vous exécuterez ce code, votre sortie devrait ressembler à ceci :
On a reçu : salut
On a reçu : encore plus
On a reçu : à partir
On a reçu : de messages
On a reçu : pour
On a reçu : de la
On a reçu : nouvelle tâche
On a reçu : pour vous
Vous pourrez peut-être constater que les valeurs sont dans un autre ordre chez vous, en fonction de votre système. C’est ce qui rend la concurrence aussi intéressante que difficile. Si vous jouez avec la valeur de thread::sleep en lui donnant différentes valeurs dans différentes tâches, chaque exécution sera encore moins déterministe et créera une sortie différente à chaque fois.
Maintenant que nous avons découvert le fonctionnement des canaux, examinons un autre genre de concurrence.