Adithya V. Sastry

Robotics + Origami
Creator of Tin Can Linux
Lover of all things tiny


"A dream is not that which you see while
sleeping, it is that which does not let
you sleep."

-- Dr. APJ Abdul Kalam

Shared Memory IPC in Rust                                             06/23/2024
--------------------------------------------------------------------------------

I recently came across an interesting crate for Rust that enables interprocess
communication via shared memory. At the time I was looking for a faster method
of transferring large amounts of data between different processes, and this
crate proved to be an effective solution.

PSA: this involves the use of unsafe Rust. Yipee!


Initial Setup
-------------

Besides just this crate, you need a few other things to make this work. There
needs to be a way to transfer file descriptors between the two programs. The
examples provided in the repository use DBUS to achieve this, but this is
honestly overkill. Instead, we use good ol' Unix Sockets along with a nifty
crate called passfd to transfer the file descriptors and set up the shared
memory pipeline.

Start by creating a new project the usual way:

--------------------------------------------------------------------------------

  $ cargo new shmem-test
  $ cd shmem-test

--------------------------------------------------------------------------------


You will also need to modify Cargo.toml to include the two crates previously
mentioned and create separate client and server binaries:

--------------------------------------------------------------------------------

[package]
name = "shmem-test"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0"
shmem-ipc = "0.3.0"
passfd = "0.1.6"

[[bin]]
name = "client"
path = "src/client.rs"

[[bin]]
name = "server"
path = "src/server.rs"
--------------------------------------------------------------------------------


Basic Server-Client Interaction
-------------------------------

Let's modify the example from the shmem-ipc repo to use sockets and passfd
instead of dbus.

Starting with server.rs:

--------------------------------------------------------------------------------

use std::fs::{self, File};
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixListener;
use std::sync::{Arc, Mutex};
use std::thread;

use anyhow::Result;
use passfd::FdPassingExt;
use shmem_ipc::sharedring::Receiver;

const CAPACITY: usize = 500000;

struct State {
    listener: UnixListener,
    sum: Arc<Mutex<f64>>,
}

impl State {
    pub fn new() -> Result<Self> {
        // If the socket exists, remove it to avoid an error.
        if fs::metadata("/tmp/shmem-test.sock").is_ok() {
            fs::remove_file("/tmp/shmem-test.sock")?;
        }

        Ok(Self {
            listener: UnixListener::bind("/tmp/shmem-test.sock")?,
            sum: Arc::new(Mutex::new(0.0)),
        })
    }

    pub fn add_receiver(&mut self) -> Result<(File, File, File)> {
        // Create a receiver in shared memory.
        let mut r = Receiver::new(CAPACITY as usize)?;
        let m = r.memfd().as_file().try_clone()?;
        let e = r.empty_signal().try_clone()?;
        let f = r.full_signal().try_clone()?;
        // In this example, we spawn a thread for every ringbuffer. More complex
        // real-world scenarios might multiplex using non-block frameworks, as
        // well as having a mechanism to detect when a client is gone.
        let sum = self.sum.clone();
        thread::spawn(move || {
            loop {
                r.block_until_readable().unwrap(); 

                // Instead of using receive_raw like the example, we use
                // receive_trusted to work with normal slices. Receive_raw may
                // be slightly faster since it eliminates a call to
                // slice::from_raw_parts.
                unsafe {
                    r.receive_trusted(|p: &[f64]| {
                        let mut s = 0.0f64;
                        for i in p {
                            s += *i;
                        }

                        *sum.lock().unwrap() += s;
                        return p.len();
                    }).unwrap();
                }
            }
        });
        Ok((m, e, f))
    }

    pub fn send_fds(&mut self) -> Result<()> {
        // Add a new shared memory reciever and get the file descriptors.
        let (m, e, f) = self.add_receiver()?;
        // Wait for an incoming connection over the socket.
        let (stream, _) = self.listener.accept()?;

        // Once a client has connected, send the file descriptors.
        stream.send_fd(m.as_raw_fd())?;
        stream.send_fd(e.as_raw_fd())?;
        stream.send_fd(f.as_raw_fd())?;

        Ok(())
    }
}

fn main() -> Result<()> {
    // Wait for an incoming connection.
    let mut com = State::new()?;
    com.send_fds()?;

    // Once we have a connection, read the values and do something. A real
    // application may do something more interesting here.
    let mut last_sum = 0.0;
    loop {
        let sum = *com.sum.lock().unwrap();
        if sum != last_sum {
            last_sum = sum;
            println!("Sum: {}", sum);
        }
    }
}
--------------------------------------------------------------------------------


And client.rs:

--------------------------------------------------------------------------------

use std::fs::File;
use std::os::unix::net::UnixStream;
use std::os::unix::io::FromRawFd;
use std::time::Duration;
use std::thread::sleep;

use anyhow::Result;
use passfd::FdPassingExt;
use shmem_ipc::sharedring::Sender;

const CAPACITY: usize = 500000;

fn main() -> Result<()> {
    let fstream = UnixStream::connect("/tmp/shmem-test.sock")?;

    let mfd = fstream.recv_fd()?;
    let m = unsafe { File::from_raw_fd(mfd) };

    let efd = fstream.recv_fd()?;
    let e = unsafe { File::from_raw_fd(efd) };

    let ffd = fstream.recv_fd()?;
    let f = unsafe { File::from_raw_fd(ffd) };

    let mut r = Sender::open(CAPACITY, m, e, f)?;
    let mut items = 100000;

    loop {
        let item = 1.0f64 / (items as f64);

        // Again, we use send_trusted instead of send_raw to work with normal
        // slices instead of raw pointers. Send_raw may have a slight decrease
        // in latency since it eliminates the call to slice::from_raw_parts.
        unsafe {
            r.send_trusted(|p: &mut [f64]| {
                let num = if items < p.len() {
                    items
                } else {
                    p.len()
                };

                for i in 0..num {
                    p[i as usize] = item;
                }

                println!(
                    "Sending {} items of {}, in total {}",
                    num, item, (num as f64) * item
                );

                return num;
            }).unwrap();
        }

        items += 100000;
        sleep(Duration::from_millis(1000));
    }
}
--------------------------------------------------------------------------------


If you open two terminals and run these commands in separate terminals (starting
the server first), you should see some things start to happen.

--------------------------------------------------------------------------------

  # Run this first, in terminal 1
  $ cargo run --release --bin server

      Finished `release` profile [optimized] target(s) in 0.12s
       Running `target/release/server`
  Sum: 0.9999999999980838
  Sum: 2.0000000000004015
  Sum: 2.6673866666694828
  Sum: 3.667386666673917
  Sum: 3.867818666674076
  Sum: 4.701512000011892
  Sum: 5.416106285733997
  Sum: 6.041376285731472
  Sum: 6.597171841289496
  Sum: 7.097387841283043
  Sum: 7.552129659467069
  Sum: 7.968976326135977
  Sum: 8.353757864599249
  Sum: 8.7110550074603
  Sum: 9.044532340793014
  Sum: 9.35716734079175

--------------------------------------------------------------------------------

  # Run this after the server starts, in terminal 2:
  $ cargo run --release --bin client

      Finished `release` profile [optimized] target(s) in 0.01s
       Running `target/release/client`
  Sending 100000 items of 0.00001, in total 1
  Sending 200000 items of 0.000005, in total 1
  Sending 200216 items of 0.0000033333333333333333, in total 0.6673866666666667
  Sending 400000 items of 0.0000025, in total 1
  Sending 100216 items of 0.000002, in total 0.200432
  Sending 500216 items of 0.0000016666666666666667, in total 0.8336933333333333
  Sending 500216 items of 0.0000014285714285714286, in total 0.7145942857142857
  Sending 500216 items of 0.00000125, in total 0.6252700000000001
  Sending 500216 items of 0.000001111111111111111, in total 0.5557955555555555
  Sending 500216 items of 0.000001, in total 0.500216
  Sending 500216 items of 0.000000909090909090909, in total 0.45474181818181814
  Sending 500216 items of 0.0000008333333333333333, in total 0.41684666666666664
  Sending 500216 items of 0.0000007692307692307693, in total 0.38478153846153845
  Sending 500216 items of 0.0000007142857142857143, in total 0.35729714285714287
  Sending 500216 items of 0.0000006666666666666667, in total 0.33347733333333335
  Sending 500216 items of 0.000000625, in total 0.31263500000000005

--------------------------------------------------------------------------------


Cool!

============== Updates ==============
News - 12/01/2024 Arc: first release candidate News - 11/28/2024 Tin Can: 16 followers + 11 stars on GitHub Post - 11/19/2024 LibTorch in Rust using FFIs News - 11/07/2024 Arc: rusty package manager for Linux Post - 06/23/2024 Shared Memory IPC in Rust Post - 01/14/2024 Installing Kiss Linux