I want to implement a Multiplexer/Demultiplexer in Rust. It should send the data of several 'upstream' DuplexStreams via one single 'downstream' DuplexStream by simply prepending a port_num identifier of the upstream DuplexStream to the packet. Of course this should also work the other way round: reading the port_num from a packet received from downstream and sending the packet to the correct upstream stream.
I started to implement such a MultiplexStream (the code below will not compile). However, I'm facing a problem: the open_ports variable, which maps port_num to the corresponding upstream DuplexStream, must be accessible from several tasks, which is not allowed in Rust.
What design pattern could be applied here to resolve my issue?
impl MultiplexStream<T,U> {
    fn new(downstream: DuplexStream<(u32,T), U>) -> MultiplexStream<T,U> {
        let mut open_ports = HashMap::<u32, DuplexStream<(u32,T), U>>::new();
        spawn do {
            let res = try {
                loop {
                    let (port_num, data) = downstream.recv();
                    match open_ports.find(port_num) {
                        Some(intermediate) => {
                            let res = try {
                                intermediate.send(data)
                            }
                            if res.is_err() {
                                open_ports.remove(port_num);
                            }
                        }
                        None => {}
                    }
                }
            }
            // downstream was closed => cleanup
            for intermediate in open_ports.values() {
                intermediate.close();
            }
            open_ports.clear();
        }
    }
    fn open<V: Send>(port_num: u32) -> Result<DuplexStream<V,T>, ()> {
        if open_ports.contains_key(port_num) {
            return Err(());
        }
        let (upstream, intermediate) = DuplexStream::<V,T>::new();
        open_ports.insert(port_num, intermediate);
        spawn do {
            let res = try {
                loop {
                    let data = intermediate.recv();
                    downstream.send(~(port_num, data));
                }
            }
            // upstream was closed => cleanup
            intermediate.close();
            open_ports.remove(port_num);
        }
        return Ok(upstream);
    }
}
In Rust, data is shared between tasks via Arc (from libsync). A plain Arc is for sharing immutable data; for mutable data there are MutexArc and RWArc. Sharing through an Arc does not copy the data.
I put together a small example:
extern mod sync;
use std::hashmap::HashMap;

fn main() {
    let arc = sync::RWArc::new(HashMap::<~str, int>::new());
    arc.write(|m| m.insert(~"a", 0));
    for num in range(1, 10) {
        let arc = arc.clone();
        spawn(proc() {
            println!("[task {}] Value before is: {}", num, arc.read(|m| m.get(&~"a").clone()));
            arc.write(|m| { m.insert_or_update_with(~"a", 0, |_, val| *val += 1); });
            println!("[task {}] Value after is: {}", num, arc.read(|m| m.get(&~"a").clone()));
        });
    }
}
For the latest version of Rust (0.10pre), use:
extern crate collections;
extern crate sync;
use collections::hashmap::HashMap;
use sync::RWArc;

fn main() {
    let arc = RWArc::new(HashMap::<~str, int>::new());
    arc.write(|m| m.insert(~"a", 0));
    for num in range(1, 10) {
        let arc = arc.clone();
        spawn(proc() {
            println!("[task {}] Value before is: {}", num, arc.read(|m| m.get(&~"a").clone()));
            arc.write(|m| { m.insert_or_update_with(~"a", 0, |_, val| *val += 1); });
            println!("[task {}] Value after is: {}", num, arc.read(|m| m.get(&~"a").clone()));
        });
    }
}
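RWArc and MutexArc no longer exist in current Rust. As a rough sketch of the same pattern with today's standard library, assuming std::sync::Arc wrapped around std::sync::RwLock and std::thread::spawn in place of tasks, the example could look roughly like this. The helper name run_counter is made up for illustration and is not part of the examples above.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical helper: spawns `tasks` threads that each increment the
/// entry for "a" exactly once, then returns the final value.
fn run_counter(tasks: u32) -> i32 {
    // Arc gives shared ownership; RwLock gives many readers or one writer.
    let shared = Arc::new(RwLock::new(HashMap::<String, i32>::new()));
    shared.write().unwrap().insert("a".to_string(), 0);

    let mut handles = Vec::new();
    for num in 1..=tasks {
        // Cloning the Arc copies only the pointer, not the map.
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || {
            let before = shared.read().unwrap().get("a").copied();
            println!("[task {}] Value before is: {:?}", num, before);

            // The write guard is dropped at the end of this statement.
            *shared.write().unwrap().entry("a".to_string()).or_insert(0) += 1;

            let after = shared.read().unwrap().get("a").copied();
            println!("[task {}] Value after is: {:?}", num, after);
        }));
    }

    // Wait for every thread so the final read sees all increments.
    for handle in handles {
        handle.join().unwrap();
    }

    let final_value = shared.read().unwrap().get("a").copied().unwrap_or(0);
    final_value
}

fn main() {
    // Nine tasks, matching range(1, 10) in the examples above.
    println!("final value: {}", run_counter(9)); // prints "final value: 9"
}
```

The order of the "before" and "after" lines varies between runs, but the final value does not, because each increment happens under the write lock. The open_ports map from the question could be wrapped the same way, with every task holding its own clone of the Arc.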