To a man with a hammer, everything looks like a nail.

An in-development POC project to draw streams of data from Rust microservices around the network and combine them multi-threadedly (new expression) needed a way of discovering the microservices so that the work could be allocated across them. To help with that, each microservice gathers information about the hardware it is running on.

We put an endpoint on the microservice at /sysinfo. The endpoint reports various stats about the system; the example below comes from an 8GB Raspberry Pi 5. We’re using our wolves-cli-helper for this.

{
    "system_name": "Debian GNU/Linux",
    "kernel_version": "6.12.20+rpt-rpi-2712",
    "os_version": "12",
    "hostname": "rpi5",
    "cpu_cores": 4,
    "cpu_virtual_cores": 4,
    "total_memory": 8442871808,
    "used_memory": 3387604992,
    "total_swap": 209698816,
    "used_swap": 0,
    "disks": [
      {
        "disk_type": "HDD",
        "file_system": "ext4",
        "free_space": "337572405248"
      },
      {
        "disk_type": "HDD",
        "file_system": "vfat",
        "free_space": "466360320"
      },
      {
        "disk_type": "Unknown",
        "file_system": "overlay",
        "free_space": "0"
      }
    ]
  }
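Note that in the sample above the memory and swap figures arrive as JSON numbers while each disk's free_space arrives as a string, and everything is in bytes. As a rough, std-only sketch of how a client might normalise those values before comparing machines (the helper names bytes_to_gib and parse_free_space are ours for illustration, not part of the project):

```rust
/// Bytes in one gibibyte (2^30).
const GIB: f64 = 1024.0 * 1024.0 * 1024.0;

/// Convert a raw byte count into gibibytes for display or comparison.
fn bytes_to_gib(bytes: u64) -> f64 {
    bytes as f64 / GIB
}

/// `free_space` is a string in the sample payload, so parse it into bytes.
/// Returns None if the value is not a plain unsigned integer.
fn parse_free_space(raw: &str) -> Option<u64> {
    raw.trim().parse().ok()
}

fn main() {
    // Values copied from the Raspberry Pi 5 sample above.
    let total_memory: u64 = 8_442_871_808;
    let used_memory: u64 = 3_387_604_992;
    let free_memory = total_memory - used_memory;

    println!(
        "memory: {:.2} GiB total, {:.2} GiB free",
        bytes_to_gib(total_memory),
        bytes_to_gib(free_memory)
    );

    // The ext4 disk from the sample.
    if let Some(bytes) = parse_free_space("337572405248") {
        println!("ext4 free: {:.2} GiB", bytes_to_gib(bytes));
    }
}
```

Returning an Option from the parser means a malformed value from one machine can be skipped rather than aborting the whole scan.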

This information from all of the endpoints is used by the client to stitch together a multi-threaded data stream.

Combing the network

We specify a range, 192.168.0.2 to 192.168.0.240, and set off the reqwest client to search for endpoints. It’s all on a local network, so we cut the request timeout down to 1s. When the scan has completed, we persist the endpoints to a file for future reference.


    // A range is already an iterator, so there is no .iter() to call.
    // u8 bounds keep it an ExactSizeIterator, which progress() needs.
    let allrange = 2u8..=240;
    for num in allrange.progress() {
        let url = format!(
            "{}{}:{}/sysinfo",
            conf.application_config.network_mask, num,
            conf.application_config.server_port
        );

        // Each probe is awaited before the next one is sent
        if let Ok(info) = check_and_parse_endpoint(&client, &url).await {
            servers.push(info);
        }
    }
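The URL construction in that loop can be pulled out and checked on its own. This is only a sketch: the prefix "http://192.168.0." and port 8080 are placeholder values we have assumed for illustration (the real ones come from the application config), and candidate_urls is a hypothetical helper name.

```rust
use std::ops::RangeInclusive;

/// Build one /sysinfo URL per host number in `hosts`.
/// `prefix` is everything up to the last octet, e.g. "http://192.168.0.".
fn candidate_urls(prefix: &str, hosts: RangeInclusive<u8>, port: u16) -> Vec<String> {
    hosts
        .map(|host| format!("{}{}:{}/sysinfo", prefix, host, port))
        .collect()
}

fn main() {
    // Assumed values; the article reads these from its config.
    let urls = candidate_urls("http://192.168.0.", 2..=240, 8080);

    println!("{} candidate endpoints", urls.len());
    println!("first: {}", urls[0]);
    println!("last:  {}", urls[urls.len() - 1]);
}
```

Using u8 for the host number means an out-of-range octet such as 300 is rejected at compile time.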

Good, straightforward async/await code. Here’s the check_and_parse_endpoint() function:


use reqwest::{Client, StatusCode};

// Probe one URL and, on a 200 response, parse the body as SystemInfo
async fn check_and_parse_endpoint(
    client: &Client,
    url: &str,
) -> Result<SystemInfo, Box<dyn std::error::Error>> {
    let response = client.get(url).send().await?;

    if response.status() == StatusCode::OK {
        // Parse JSON into a struct
        let sys_info: SystemInfo = response.json().await?;
        Ok(sys_info)
    } else {
        Err(format!("{} Unexpected status code: {}", url, response.status()).into())
    }
}

If we get a 200 (OK) response, then we have found our running microservice and we load the result into our SystemInfo struct.

This all worked well, but the speed was appalling - 213 seconds for the scan. Perhaps for a POC we might overlook it, as it’ll be a rarely triggered function, but we have pride, don’t we? As we really do bang on about speed and efficiency, it would be bad form to gloss over it.

A screenshot containing the results of a slow network scan; 213 seconds.

What’s the hammer?

Rayon! The thread parallelism crate. Multi-thread the monkeys out of it and we’ll be good to go! This reveals, in a somewhat glib way, the starting point for many a tuning journey in the cloud era. Clouds have “meh” I/O and “ugh” processor speed so lateral scaling across processors is a good start, as long as you can feed them fast enough; did we mention the I/O is “meh”?

One late-night re-work later, Rayon was ready to go, using the par_iter function. It didn’t help very much.

Why didn’t it work?

Poor quality thinking. The fact that something is ‘go-to’ doesn’t make it right. (Seasoned campaigners will enjoy this gosub reference.)

The issue here wasn’t a lack of computing power to thrust through the requests - the single thread was quite relaxed about the whole thing, almost to the point of being horizontal. The problem was how we were using async. Each request was awaited to completion, or to its timeout, before the next one was sent, so this relatively trivial operation spent nearly all of its time waiting.
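A back-of-envelope check makes the point. The sketch below assumes, as a simplification, that every probe runs to the full 1s timeout; the function names are ours for illustration. On that assumption the one-at-a-time scan is bounded by 239 seconds, which sits comfortably with the 213 seconds we measured, since a few addresses answer or refuse quickly.

```rust
use std::time::Duration;

/// Worst case when each probe is awaited before the next one starts:
/// the waits simply add up.
fn sequential_worst_case(addresses: u32, timeout: Duration) -> Duration {
    timeout * addresses
}

/// Worst case when up to `limit` probes are in flight at once and every
/// probe takes the full timeout: the waits overlap, `limit` at a time.
fn concurrent_worst_case(addresses: u32, limit: u32, timeout: Duration) -> Duration {
    let rounds = (addresses + limit - 1) / limit; // ceiling division
    timeout * rounds
}

fn main() {
    let addresses = 240 - 2 + 1; // hosts 2..=240
    let timeout = Duration::from_secs(1);

    println!(
        "one at a time:     up to {:?}",
        sequential_worst_case(addresses, timeout)
    );
    println!(
        "255 in flight:     up to {:?}",
        concurrent_worst_case(addresses, 255, timeout)
    );
    println!(
        "4 in flight:       up to {:?}",
        concurrent_worst_case(addresses, 4, timeout)
    );
}
```

None of these figures depends on processor speed; only the number of requests allowed to wait at the same time changes the total.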

And to fix it?

Launch the lot in an asynchronous manner and await their return. That’s what for_each_concurrent is doing here: streaming through all the values in the IP range without waiting for individual endpoints to return.

    // Uses futures::stream for concurrent async processing
    stream::iter(allrange)
        .for_each_concurrent(Some(255), |num| {
            let ip = format!(
                "{}{}:{}/sysinfo",
                conf.application_config.network_mask, num, conf.application_config.server_port
            );
            let client = client.clone();
            let servers = Arc::clone(&servers);

            async move {
                if let Ok(info) = check_and_parse_endpoint(&client, &ip).await {
                    servers.lock().unwrap().push(info);
                }
            }
        })
        .await;

And the result?

1 second. With every request in flight at once, the scan can’t finish until the addresses that never answer hit the 1s timeout we gave the reqwest client, so that timeout is now the whole cost.

If our servers were more widely spread across t’Internet, then the minimal timeout would need tweaking, but for our purposes, Bingo!

A screenshot containing the results of the fast network scan; 1 second.

What did we learn?

A bit of technical humility, not necessarily a bad thing.