A rush network can consist of thousands of workers started on different machines. This article explains how to start workers in a rush network. Rush offers three ways to start workers: local workers with processx, remote workers with mirai, and script workers.

Local Workers

We use the random search example from the Rush article to demonstrate how the controller works.

library(rush)

wl_random_search = function(rush, branin) {
  while(TRUE) {

    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))

    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

rush = rsh(
  network = "test-network",
  config = redux::redis_config())

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
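
Before handing the objective to workers, it can help to evaluate it once in the controller session. The following sketch repeats the Branin definition from above so that it runs on its own. It evaluates the function at (pi, 2.275), a known global minimizer of the standard Branin function, where the squared term vanishes and the value reduces to 10 / (8 * pi). It then performs one iteration of the worker loop's sampling step without rush. The seed is an arbitrary choice for reproducibility.

```r
# Same Branin definition as above, repeated so the snippet is self-contained.
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# At (pi, 2.275) the squared term is zero, so the value reduces to 10 / (8 * pi).
y_min = branin(pi, 2.275)
all.equal(y_min, 10 / (8 * pi))

# One iteration of the sampling step from the worker loop, without rush.
set.seed(1)  # arbitrary seed, only for reproducibility
xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
ys = list(y = branin(xs$x1, xs$x2))
str(ys)
```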

Start Workers

Workers may be initiated locally or remotely. Local workers run on the same machine as the controller, whereas remote workers operate on separate machines. The $start_local_workers() method initiates local workers using the processx package. The n_workers parameter specifies the number of workers to launch. The worker_loop parameter defines the function executed by each worker. If the worker_loop function depends on additional objects, these can be passed as arguments to worker_loop. Required packages for the worker_loop can be specified using the packages parameter.

worker_ids = rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)

Worker information is accessible through the $worker_info field. Each worker is identified by a worker_id. The pid column holds the process identifier, and the hostname column holds the machine name. The remote column specifies whether the worker is remote, and the heartbeat column indicates whether a heartbeat process is running. The state column gives the current state of the worker, which is one of "running", "terminated", "killed", or "lost". Heartbeat mechanisms are discussed in the Error Handling section.

rush$worker_info
       worker_id   pid remote      hostname heartbeat   state
          <char> <int> <lgcl>        <char>    <lgcl>  <char>
1: loopy_quin... 10010  FALSE runnervmmt...     FALSE running
2: entertaini... 10021  FALSE runnervmmt...     FALSE running

Additional workers may be added to the network at any time.

rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)
rush$worker_info
       worker_id   pid remote      hostname heartbeat   state
          <char> <int> <lgcl>        <char>    <lgcl>  <char>
1: loopy_quin... 10010  FALSE runnervmmt...     FALSE running
2: entertaini... 10021  FALSE runnervmmt...     FALSE running
3: selfevolve... 10084  FALSE runnervmmt...     FALSE running
4: unpronounc... 10086  FALSE runnervmmt...     FALSE running

Rush Plan

When rush is integrated into a third-party package, the starting of workers is typically managed by the package itself. In such cases, users may configure worker options by invoking the rush_plan() function. This function allows explicit specification of the number of workers, the type of workers, and the configuration for connecting to the Redis database.

rush_plan(n_workers = 2, config = redux::redis_config(), worker_type = "local")

Passing Data to Workers

Objects required by the worker loop can be passed as arguments to $start_local_workers() / $start_remote_workers(). These arguments are serialized and stored in the Redis database as part of the worker configuration. Upon initialization, each worker retrieves and unserializes the worker configuration before invoking the worker loop.

Note

The maximum size of a Redis string is 512 MiB. If the serialized worker configuration exceeds this limit, rush raises an error. When both the controller and the workers have access to a shared file system, rush instead writes large objects to disk. The large_objects_path argument of rush_plan() specifies the directory used for storing such large objects.
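
To judge in advance whether an object is likely to hit the limit, its serialized size can be estimated in the controller session. The sketch below uses base R's serialize() and a hypothetical helper named serialized_size(). How rush serializes the worker configuration internally may differ, so the number should be read as an estimate only.

```r
# Redis strings are limited to 512 MiB.
redis_limit = 512 * 1024^2

# Hypothetical helper (not part of rush): serialized size of an object in bytes,
# based on base R's serialize(). Treat the result as an estimate.
serialized_size = function(x) {
  length(serialize(x, connection = NULL))
}

# Example payload: a numeric matrix with 1e6 entries, roughly 8 MB of doubles.
payload = matrix(0, nrow = 1000, ncol = 1000)
size = serialized_size(payload)

size < redis_limit        # TRUE means the object fits into a single Redis string
round(size / 1024^2, 1)   # size in MiB
```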

Stop Workers

Workers can be stopped individually or all at once. To terminate a specific worker, the $stop_workers() method is invoked with the corresponding worker_ids argument.

rush$stop_workers(worker_ids = worker_ids[1])

This command terminates the selected worker process.

rush$worker_info
       worker_id   pid remote      hostname heartbeat   state
          <char> <int> <lgcl>        <char>    <lgcl>  <char>
1: entertaini... 10021  FALSE runnervmmt...     FALSE running
2: selfevolve... 10084  FALSE runnervmmt...     FALSE running
3: unpronounc... 10086  FALSE runnervmmt...     FALSE running
4: loopy_quin... 10010  FALSE runnervmmt...     FALSE  killed

To stop all workers and reset the network, the $reset() method is used.

rush$reset()

Instead of killing the worker processes, the controller can send a terminate signal to the workers. Each worker then stops after it has finished its current task. For this to work, the worker loop must check the rush$terminated flag, so that the loop ends once the controller has requested termination.

wl_random_search = function(rush, branin) {
  while(!rush$terminated) {

    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))

    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

rush = rsh(
  network = "test-random-search-terminate",
  config = redux::redis_config())

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)

The random search proceeds as usual.

rush$fetch_finished_tasks()
             x1          x2          y   pid     worker_id          keys
          <num>       <num>      <num> <int>        <char>        <char>
  1:  7.0888916  0.07065129  17.947437 10159 exilable_v... 4dd3f8df-a...
  2: -0.6849254  2.77981585  36.541105 10159 exilable_v... bde40e60-e...
  3:  7.5997890 12.31104774 132.212092 10159 exilable_v... 52e30de7-a...
  4:  8.8313747  0.45771748   4.479984 10159 exilable_v... 17ede0b4-a...
  5: -0.4790338  0.73898474  55.160909 10159 exilable_v... 94763e07-1...
 ---
578:  4.8946469 10.31117015  92.853754 10159 exilable_v... 9a8bca88-1...
579:  3.7721624 14.69661569 167.675632 10157 mad_montan... d3d7b914-e...
580:  0.6445911  9.16144724  34.762633 10159 exilable_v... f0a027f5-c...
581:  8.5444915  6.51198198  25.781063 10157 mad_montan... 376a8120-4...
582:  3.4271851  9.43639706  55.156537 10159 exilable_v... 67cdb02d-0...

To terminate the optimization, the following command is used.

rush$stop_workers(type = "terminate")

The workers are terminated.

rush$worker_info
       worker_id   pid remote      hostname heartbeat      state
          <char> <int> <lgcl>        <char>    <lgcl>     <char>
1: mad_montan... 10157  FALSE runnervmmt...     FALSE terminated
2: exilable_v... 10159  FALSE runnervmmt...     FALSE terminated
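
The cooperative shutdown can also be tried without Redis. The following sketch replaces the rush worker object with a hypothetical stand-in, mock_rush(), which is not part of rush. The stand-in offers only the two methods and the flag used by the worker loop and sets terminated to TRUE after a fixed number of results, mimicking a terminate signal from the controller.

```r
# Hypothetical stand-in for the rush worker object; it is NOT part of rush.
# It flips `terminated` to TRUE after `max_tasks` results have been pushed.
mock_rush = function(max_tasks) {
  self = new.env()
  self$terminated = FALSE
  self$results = list()
  self$push_running_tasks = function(xss) {
    paste0("task-", length(self$results) + 1)
  }
  self$push_results = function(key, yss) {
    self$results[[key]] = yss[[1]]
    if (length(self$results) >= max_tasks) self$terminated = TRUE
    invisible(key)
  }
  self
}

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# Same worker loop as above: it checks the flag before each new task.
wl_random_search = function(rush, branin) {
  while (!rush$terminated) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

mock = mock_rush(max_tasks = 5)
wl_random_search(mock, branin)
length(mock$results)  # 5: the loop finished its current task and then stopped
```

Because the flag is only checked at the top of the loop, a task that is already running is always completed before the worker stops.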

Failed Workers

We simulate a segfault on the worker by killing the worker process.

rush = rsh(network = "test-failed-workers")

wl_failed_worker = function(rush) {
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}

Workers are then started using the faulty worker loop.

worker_ids = rush$start_local_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)

The $detect_lost_workers() method is used to identify failed workers. When a lost worker is detected, its state is updated to "lost".

rush$detect_lost_workers()
rush$worker_info
       worker_id   pid remote      hostname heartbeat  state
          <char> <int> <lgcl>        <char>    <lgcl> <char>
1: bacteriolo... 10235  FALSE runnervmmt...     FALSE   lost
2: light_grea... 10233  FALSE runnervmmt...     FALSE   lost

Restart Workers

Workers that have failed can be restarted using the $restart_workers() method. This method accepts the worker_ids of the workers to be restarted.

rush$restart_workers(worker_ids = worker_ids[1])

The first worker is restarted and its state is updated to "running".

rush$worker_info
       worker_id   pid remote      hostname heartbeat   state
          <char> <int> <lgcl>        <char>    <lgcl>  <char>
1: light_grea... 10305  FALSE runnervmmt...     FALSE running
2: bacteriolo... 10235  FALSE runnervmmt...     FALSE    lost
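
Detection and restart can be combined into a small helper. The sketch below defines a hypothetical function, restart_lost_workers(), which is not part of rush. It relies only on $detect_lost_workers(), $worker_info, and $restart_workers() as described above, and assumes that $worker_info has the worker_id and state columns shown in the printed output.

```r
# Hypothetical helper, not part of rush: restart every worker marked "lost".
# `rush` is expected to be a connected controller object.
restart_lost_workers = function(rush) {
  # Update the worker states first so that failed workers are marked "lost".
  rush$detect_lost_workers()

  info = rush$worker_info
  lost_ids = info$worker_id[info$state == "lost"]

  if (length(lost_ids) > 0) {
    rush$restart_workers(worker_ids = lost_ids)
  }

  # Return the ids that were restarted, invisibly.
  invisible(lost_ids)
}

# Usage with a connected controller (requires a running Redis server):
# restart_lost_workers(rush)
```

Restarting blindly is only sensible when the failure is transient. With a worker loop that always crashes, such as wl_failed_worker above, the restarted workers would be lost again immediately.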

Log Messages

Workers record all messages generated using the lgr package to the database. The lgr_thresholds argument of $start_local_workers() specifies the logging level for each logger, e.g. c("mlr3/rush" = "debug"). While enabling log message storage introduces a minor performance overhead, it is valuable for debugging purposes. By default, log messages are not stored. To enable logging, workers are started with the desired logging threshold.

rush = rsh(network = "test-log-messages")

wl_log_message = function(rush) {
  lg = lgr::get_logger("mlr3/rush")
  lg$info("This is an info message from worker %s", rush$worker_id)
}

rush$start_local_workers(
  worker_loop = wl_log_message,
  n_workers = 2,
  lgr_thresholds = c("mlr3/rush" = "info"))

The most recent log messages can be printed as follows.

Sys.sleep(1)
rush$print_log()

To retrieve all log entries, use the $read_log() method.

rush$read_log()

We reset the network.

rush$reset()

Remote Workers

The mirai package provides a straightforward mechanism for launching rush workers on remote machines. mirai manages daemons, which are persistent background processes capable of executing arbitrary R code in parallel. These daemons communicate with the main session.

Start Workers

Usually mirai is used to start daemons on remote machines but it can also be used to start local daemons.

library(mirai)

daemons(
  n = 2,
  url = host_url()
)

Daemons may also be launched on a remote machine via SSH.

daemons(
  n = 2L,
  url = host_url(port = 5555),
  remote = ssh_config(remotes = "ssh://10.75.32.90")
)

On high performance computing clusters, daemons can be started using a scheduler.

daemons(
  n = 2L,
  url = host_url(),
  remote = remote_config(
    command = "sbatch",
    args = c("--mem 512", "-n 1", "--wrap", "."),
    rscript = file.path(R.home("bin"), "Rscript"),
    quote = TRUE
  )
)

We define a worker loop.

wl_random_search = function(rush, branin) {
  while(TRUE) {

    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))

    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

rush = rsh(
  network = "test-network",
  config = redux::redis_config())

We start two new daemons as shown above. Once the daemons are running, we can start the remote workers.

worker_ids = rush$start_remote_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)
rush$worker_info
       worker_id   pid remote      hostname heartbeat   state
          <char> <int> <lgcl>        <char>    <lgcl>  <char>
1: carnivales... 10455   TRUE runnervmmt...     FALSE running
2: symbiotic_... 10457   TRUE runnervmmt...     FALSE running

We stop the workers and reset the network.

rush$reset()

Failed Workers

Failed workers started with mirai are also detected by the controller. We simulate a segfault on the worker by killing the worker process.

rush = rsh(network = "test-failed-mirai-workers")

wl_failed_worker = function(rush) {
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}

We start two new daemons as before and then start two remote workers.

worker_ids = rush$start_remote_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)
rush$detect_lost_workers()

A segmentation fault also terminates the associated mirai daemon. Therefore, it is necessary to restart the daemon before restarting the workers.

Workers can then be restarted using the $restart_workers() method.

rush$restart_workers(worker_ids)

Script Workers

The most flexible method for starting workers is to use a script generated with the $worker_script() method. This script can be executed either on the local machine or on a remote machine. The only requirement is that the machine is capable of running R scripts and has access to the Redis database.

rush = rsh(
  network = "test-script-workers",
  config = redux::redis_config())

rush$worker_script(
  worker_loop = wl_random_search)

Error Handling

Workers started with processx or mirai are monitored by these packages. The heartbeat is a mechanism to monitor the status of script workers. The mechanism consists of a heartbeat key with a set expiration timeout and a dedicated heartbeat process that refreshes the timeout periodically. The heartbeat process is started with callr and is linked to the main process of the worker. In the event of a worker’s failure, the associated heartbeat process also ceases to function, thus halting the renewal of the timeout. The absence of the heartbeat key acts as an indicator to the controller that the worker is no longer operational. Consequently, the controller updates the worker’s status to "lost".

Heartbeats are enabled at worker startup by specifying the heartbeat_period and heartbeat_expire parameters. heartbeat_period defines the interval, in seconds, at which the heartbeat process refreshes the timeout. heartbeat_expire sets the duration, in seconds, before the heartbeat key expires. The expiration time should be greater than the heartbeat period so that the heartbeat process has sufficient time to refresh the timeout.

rush$worker_script(
  worker_loop = wl_random_search,
  heartbeat_period = 1,
  heartbeat_expire = 3)
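
The interplay of the two parameters can be illustrated with a toy model. The function heartbeat_alive() below is hypothetical and is not how rush implements heartbeats. It assumes that the key is refreshed at every multiple of period until the worker dies at death_time, and that a check at time now still finds the key if the last refresh is less than expire seconds old.

```r
# Toy model of a heartbeat key with a time-to-live; NOT rush's implementation.
# All times are in seconds since the worker was started.
heartbeat_alive = function(now, death_time, period, expire) {
  # The last refresh happened at the latest multiple of `period`
  # that lies before both the check and the death of the worker.
  last_refresh = floor(min(now, death_time) / period) * period
  (now - last_refresh) < expire
}

# The worker dies at t = 10.2 with heartbeat_period = 1 and heartbeat_expire = 3.
heartbeat_alive(now = 9.5, death_time = 10.2, period = 1, expire = 3)   # TRUE, still refreshing
heartbeat_alive(now = 12.5, death_time = 10.2, period = 1, expire = 3)  # TRUE, key not expired yet
heartbeat_alive(now = 13.5, death_time = 10.2, period = 1, expire = 3)  # FALSE, key expired

# If the expiration is shorter than the period, a healthy worker looks lost.
heartbeat_alive(now = 2, death_time = 100, period = 3, expire = 1)      # FALSE although alive
```

In this model a failure is noticed at most expire seconds after the last refresh, so a smaller heartbeat_expire gives faster detection, while the gap between the two values protects against false alarms.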

The heartbeat process is also the only way to kill a script worker. The $stop_workers(type = "kill") method pushes a kill signal to the heartbeat process, which then terminates the main process of the worker.