The Rush manager class is responsible for starting, monitoring, and stopping workers within the network. This vignette describes the three mechanisms for starting workers: mirai daemons, local processx processes, and portable R scripts. We advise reading the tutorial first, because the random search example from that tutorial is used here to demonstrate the manager.

library(rush)

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 +
    10 * (1 - 1 / (8 * pi)) * cos(x1) +
    10
}

wl_random_search = function(rush, branin) {
  while (TRUE) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$finish_tasks(key, yss = list(ys))
  }
}

config = redux::redis_config()

rush = rsh(
  network_id = "random-search-network",
  config = config)
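
As a quick sanity check of the objective defined above, the Branin function has a known global minimum value of 10 / (8 * pi), roughly 0.398, attained for example at (pi, 2.275). The following sketch redefines branin so that it runs on its own and performs a plain random search without rush over the same sampling ranges as the worker loop. The seed and the sample size of 1000 are arbitrary choices made for illustration.

```r
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 +
    10 * (1 - 1 / (8 * pi)) * cos(x1) +
    10
}

# Value at one of the known minimizers, (pi, 2.275)
y_min = branin(pi, 2.275)

# Plain random search without rush, using the ranges from the worker loop
set.seed(1)
x1 = runif(1000, -5, 10)
x2 = runif(1000, 0, 15)
ys = branin(x1, x2)

# Best value found; it cannot fall below the global minimum
best = min(ys)
```

Comparing best against y_min gives a rough idea of how close a random search gets to the optimum for a given budget.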

Starting Workers with mirai

The mirai package provides a mechanism for launching rush workers on local and remote machines. mirai daemons are persistent background processes that execute arbitrary R code in parallel. Daemons are started with mirai::daemons(); for local daemons, the n argument sets the number of daemons.
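
A minimal sketch of starting local daemons, assuming the mirai package is installed. The count of two is chosen to match the number of workers used in this vignette, and mirai::status() is only called here to inspect the connections.

```r
n_daemons = 2L

if (requireNamespace("mirai", quietly = TRUE)) {
  # Start two persistent local daemons in the background
  mirai::daemons(n = n_daemons)

  # Inspect the current daemon connections
  print(mirai::status())
}
```

Calling mirai::daemons(0) resets the daemons once they are no longer needed.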

After the daemons are started, workers are launched with the $start_workers() method. The $wait_for_workers() method blocks until all workers have registered in the network.

worker_ids = rush$start_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)

rush$wait_for_workers(worker_ids = worker_ids)

Worker information is accessible through the $worker_info field. Each worker is identified by a worker_id. The pid column contains the process identifier and the hostname column contains the machine name. The heartbeat column indicates whether the worker runs a heartbeat process. The state column shows the current worker state, which can be "running" or "terminated".

rush$worker_info
       worker_id   pid      hostname heartbeat   state
          <char> <int>        <char>    <lgcl>  <char>
1: pseudodemo...  9171 runnervmmk...     FALSE running
2: toy_waters...  9169 runnervmmk...     FALSE running

Stopping Workers

Workers can be stopped individually or all at once. To terminate a specific worker, the $stop_workers() method is called with the corresponding worker_ids.

rush$stop_workers(worker_ids = worker_ids[1])
rush$worker_info
       worker_id   pid      hostname heartbeat      state
          <char> <int>        <char>    <lgcl>     <char>
1: toy_waters...  9169 runnervmmk...     FALSE    running
2: pseudodemo...  9171 runnervmmk...     FALSE terminated

To stop all workers and reset the network, the $reset() method is used.

rush$reset()

Instead of killing the worker processes, the manager can send a terminate signal. Each worker then stops after completing its current task. This only works if the worker loop checks the rush$terminated flag.

wl_random_search = function(rush, branin) {
  while (!rush$terminated) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$finish_tasks(key, yss = list(ys))
  }
}

rush = rsh(
  network_id = "random-search-network",
  config = redux::redis_config())

rush$start_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)

rush$wait_for_workers(2)
rush$fetch_finished_tasks()
         worker_id         x1        x2          y          keys
            <char>      <num>     <num>      <num>        <char>
  1: achluophob... -4.2242665  9.118888  40.418686 5be3307f-2...
  2: achluophob...  5.7049550 10.225472 100.863565 c1644181-7...
  3: achluophob...  1.6188337 10.569969  55.886133 f6c0504a-3...
  4: achluophob...  4.2983776  9.215189  64.957679 fbe5f3c4-2...
  5: achluophob...  3.6801938  3.394105   4.012277 287bcc11-6...
 ---
107: achluophob...  7.9427811  2.937133  11.189069 8c42f29e-4...
108: hazardous_... -2.0685907  1.445732  75.963698 6b8d8e64-d...
109: achluophob...  4.1088898 11.212871  96.161348 b8c4b8b8-7...
110: hazardous_... -0.2911462  8.600941  23.720511 0a6daba3-4...
111: achluophob...  6.2416310  3.472982  25.230089 1d8fa495-5...

The $stop_workers() method with type = "terminate" sends the terminate signal.

rush$stop_workers(type = "terminate")
rush$worker_info
       worker_id   pid      hostname heartbeat      state
          <char> <int>        <char>    <lgcl>     <char>
1: achluophob...  9171 runnervmmk...     FALSE terminated
2: hazardous_...  9169 runnervmmk...     FALSE terminated
rush$reset()
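
The cooperative shutdown described in this section can be illustrated without Redis. The sketch below uses a hypothetical mock object (an environment with a terminated flag and a task counter, not part of rush) in place of the worker object, and flips the flag after five tasks to stand in for the manager's terminate signal.

```r
# Hypothetical stand-in for the rush worker object (not part of rush)
mock_rush = new.env()
mock_rush$terminated = FALSE
mock_rush$n_finished = 0L

# Stand-in for the manager: request termination once 5 tasks have finished
request_terminate_after = 5L

wl_mock = function(rush) {
  while (!rush$terminated) {
    # A real worker loop would evaluate and finish one task here
    rush$n_finished = rush$n_finished + 1L

    # The flag is only read at the top of the loop, so a task that has
    # started always runs to completion before the worker exits
    if (rush$n_finished >= request_terminate_after) rush$terminated = TRUE
  }
  invisible(rush$n_finished)
}

n_done = wl_mock(mock_rush)
```

Because the flag is checked only between iterations, a long-running task delays shutdown until that task has finished.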

Failed Workers

The manager can detect failed workers that were started with mirai: the $detect_lost_workers() method reports the workers that are no longer running. We simulate a worker crash by killing the worker process.

rush = rsh(network_id = "random-search-network")

wl_failed_worker = function(rush) {
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}
mirai::daemons(n = 2L)
worker_ids = rush$start_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2L)
rush$detect_lost_workers()
[1] "catlike_megalosaurus" "prolific_panda"      
rush$reset()

Remote Workers

Daemons can also be launched on remote machines via SSH.

mirai::daemons(
  n = 2L,
  url = mirai::host_url(port = 5555),
  remote = mirai::ssh_config(remotes = "ssh://10.75.32.90")
)

On high-performance computing clusters, daemons can be started using a scheduler.

mirai::daemons(
  n = 2L,
  url = mirai::host_url(),
  remote = mirai::remote_config(
    command = "sbatch",
    args = c("--mem 512", "-n 1", "--wrap", "."),
    rscript = file.path(R.home("bin"), "Rscript"),
    quote = TRUE
  )
)

Rush Plan

When rush is integrated into a third-party package, worker startup is typically managed by the package itself. Users can configure worker options by calling the rush_plan() function, which specifies the number of workers, the worker type, and the Redis configuration.

rush_plan(n_workers = 2, config = redux::redis_config(), worker_type = "mirai")

Passing Data to Workers

Arguments required by the worker loop are passed as named arguments to $start_workers(). These arguments are serialized and stored in the Redis database as part of the worker configuration. Upon initialization, each worker retrieves and deserializes the configuration before executing the worker loop.

Note

The maximum size of a Redis string is 512 MiB. If the serialized worker configuration exceeds this limit, rush raises an error. When both the manager and the workers share access to a file system, rush will instead write large objects to disk. The large_objects_path argument of rush_plan() specifies the directory used for storing such objects.
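
To estimate whether an argument is likely to approach this limit, its serialized size can be measured in base R. This is only a rough sketch: serialized_size() is a hypothetical helper, and the way rush serializes the full worker configuration may differ from a plain serialize() call.

```r
# Redis string limit stated above: 512 MiB, expressed in bytes
redis_limit_bytes = 512 * 1024^2

# Hypothetical helper (not part of rush): size of an object after serialization
serialized_size = function(x) {
  length(serialize(x, connection = NULL))
}

# Example payload: a numeric matrix with one million entries (about 8 MB)
payload = list(data = matrix(0, nrow = 1000, ncol = 1000))
size_bytes = serialized_size(payload)

# TRUE if the payload alone stays below the limit
fits_in_redis = size_bytes < redis_limit_bytes
```

Objects well below the limit can be passed directly as named arguments; for larger ones, the large_objects_path option described above is the alternative.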

Log Messages

Workers can write log messages generated via the lgr package to the database. The lgr_thresholds argument of $start_local_workers() sets the logging threshold for each logger by name, e.g. c("mlr3/rush" = "debug"). Logging introduces a minor performance overhead and is disabled by default.

rush = rsh(network_id = "random-search-network")

wl_log_message = function(rush) {
  lg = lgr::get_logger("mlr3/rush")
  lg$info("This is an info message from worker %s", rush$worker_id)
}

rush$start_local_workers(
  worker_loop = wl_log_message,
  n_workers = 2,
  lgr_thresholds = c("mlr3/rush" = "info"))

The most recent log messages can be retrieved as follows.

Sys.sleep(1)
rush$print_log()

To retrieve all log entries, use the $read_log() method.

rush$read_log()
Null data.table (0 rows and 0 cols)
rush$reset()

Starting Local Workers

The $start_local_workers() method launches workers using the processx package on the local machine. The n_workers argument specifies the number of workers to launch and worker_loop defines the function executed by each worker. Additional arguments required by the worker loop are passed as named arguments to $start_local_workers().

rush = rsh(
  network_id = "random-search-network",
  config = redux::redis_config())

worker_ids = rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)

rush$wait_for_workers(worker_ids = worker_ids)
rush$worker_info
       worker_id   pid      hostname heartbeat   state
          <char> <int>        <char>    <lgcl>  <char>
1: foliaceous...  9421 runnervmmk...     FALSE running
2:     edgy_degu  9423 runnervmmk...     FALSE running

Additional workers can be added to the network at any time.

rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)

rush$wait_for_workers(worker_ids = worker_ids)
rush$worker_info
       worker_id   pid      hostname heartbeat   state
          <char> <int>        <char>    <lgcl>  <char>
1: foliaceous...  9421 runnervmmk...     FALSE running
2:     edgy_degu  9423 runnervmmk...     FALSE running
rush$reset()

Script Workers

The $worker_script() method generates an R script that can be executed on any machine with access to the Redis database. This is the most flexible mechanism for starting workers, as it imposes no constraints on the execution environment.

rush = rsh(
  network_id = "random-search-network",
  config = redux::redis_config())

rush$worker_script(
  worker_loop = wl_random_search)
[1] "Rscript -e \"rush::start_worker(network_id = 'random-search-network', config = list(scheme = 'redis', host = '127.0.0.1', port = '6379'))\""

Error Handling

Workers started with processx and mirai are monitored automatically by the respective packages. Script workers require an explicit heartbeat mechanism to detect failures. The heartbeat consists of a Redis key with a set expiration timeout, refreshed periodically by a background process linked to the main worker process. If the worker fails, the heartbeat process also ceases, the key expires, and the manager marks the worker as "terminated".

The heartbeat_period and heartbeat_expire arguments configure the heartbeat at startup. The heartbeat_period defines the refresh interval in seconds; heartbeat_expire sets the expiration duration, which must exceed the heartbeat period.

rush$worker_script(
  worker_loop = wl_random_search,
  heartbeat_period = 1,
  heartbeat_expire = 3)
[1] "Rscript -e \"rush::start_worker(network_id = 'random-search-network', config = list(scheme = 'redis', host = '127.0.0.1', port = '6379'), heartbeat_period = 1, heartbeat_expire = 3)\""
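
The expiry logic of the heartbeat can be sketched in base R without Redis. Both helpers below are hypothetical and not part of rush; they only model the rule that the key disappears once heartbeat_expire seconds pass without a refresh.

```r
# Hypothetical helper: the expiration must exceed the refresh interval
check_heartbeat = function(heartbeat_period, heartbeat_expire) {
  stopifnot(heartbeat_period > 0, heartbeat_expire > heartbeat_period)
  invisible(TRUE)
}

# Hypothetical helper: a key refreshed at `last_refresh` still exists at time
# `now` if fewer than `heartbeat_expire` seconds have elapsed since then
heartbeat_alive = function(now, last_refresh, heartbeat_expire) {
  (now - last_refresh) < heartbeat_expire
}

heartbeat_period = 1
heartbeat_expire = 3
check_heartbeat(heartbeat_period, heartbeat_expire)

# The worker refreshes the key at t = 0, 1, 2 and then crashes
refresh_times = c(0, 1, 2)
last_refresh = max(refresh_times)

alive_at_4 = heartbeat_alive(4, last_refresh, heartbeat_expire)  # 2 s since refresh
alive_at_6 = heartbeat_alive(6, last_refresh, heartbeat_expire)  # 4 s since refresh
```

In this model the crash at t = 2 goes unnoticed until t = 5, so a shorter heartbeat_expire detects failures sooner at the cost of more frequent refreshes.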

To kill a script worker, call $stop_workers(type = "kill"). The method pushes a kill signal to the heartbeat process, which then terminates the main worker process.