library(rush)

# Worker loop: random search that stops after 100 finished tasks.
# branin() is the objective function from the random search example in the Rush article.
wl_random_search = function(rush) {
  while(rush$n_finished_tasks < 100) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

rush = rsh(
  network = "test-network",
  config = redux::redis_config())
A rush network can consist of thousands of workers started on different machines. This article explains how to start workers in a rush network. Rush offers three ways to start workers: local workers with processx, remote workers with mirai, and script workers.
Local Workers
We use the random search example from the Rush article to demonstrate how workers are started and managed.
Start Workers
Workers may be started locally or remotely. Local workers run on the same machine as the controller, whereas remote workers operate on separate machines. The $start_local_workers() method starts local workers using the processx package. The n_workers parameter specifies the number of workers to launch. The worker_loop parameter defines the function executed by each worker. If the worker_loop function depends on global variables, these can be provided via the globals parameter. Required packages for the worker_loop can be specified using the packages parameter.
worker_ids = rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 4)
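The globals argument is discussed further in the Globals section below. As a quick sketch (not executed here, with a purely illustrative package name), loading an additional package on each worker could look like this:

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  packages = "data.table")  # illustrative: loaded on each worker before the loop starts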
Worker information is accessible through the $worker_info method. Each worker is identified by a worker_id. The pid field denotes the process identifier, and the hostname field indicates the machine name. The remote column specifies whether the worker is remote, and the heartbeat column indicates the presence of a heartbeat process. Heartbeat mechanisms are discussed in the Error Handling section.
rush$worker_info
       worker_id   pid remote      hostname heartbeat
          <char> <int> <lgcl>        <char>    <lgcl>
1: expanded_c...  9071  FALSE pkrvmf6wy0...     FALSE
2: pseudolegi...  9084  FALSE pkrvmf6wy0...     FALSE
3: halfround_...  9057  FALSE pkrvmf6wy0...     FALSE
4: idiosyncra...  9072  FALSE pkrvmf6wy0...     FALSE
The $worker_states method returns the current state of each worker. Possible states include "running", "terminated", "killed", and "lost".
rush$worker_states
state worker_id
<char> <char>
1: running expanded_c...
2: running pseudolegi...
3: running halfround_...
4: running idiosyncra...
Additional workers may be added to the network at any time.
rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 2)
rush$worker_states
state worker_id
<char> <char>
1: running expanded_c...
2: running pseudolegi...
3: running halfround_...
4: running idiosyncra...
5: running duskish_ko...
6: running puppyish_m...
Rush Plan
When rush is integrated into a third-party package, the starting of workers is typically managed by the package itself. In such cases, users may configure worker options by invoking the rush_plan() function. This function allows explicit specification of the number of workers, the type of workers, and the configuration for connecting to the Redis database.
rush_plan(n_workers = 4, config = redux::redis_config(), worker_type = "local")
Globals
Global variables are those defined in the global environment that must also be accessible to workers. They are specified by name in the $start_local_workers() method. These variables are serialized and stored in the Redis database. Upon initialization, each worker retrieves the serialized globals from the database and assigns them to its own global environment.
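As a minimal sketch (not executed as part of the running example, and with a purely illustrative helper), suppose the worker loop depends on a function objective() defined in the controller's global environment:

# Hypothetical helper defined in the controller's global environment.
objective = function(x1, x2) (x1 - 2)^2 + (x2 - 3)^2

wl_with_globals = function(rush) {
  while(rush$n_finished_tasks < 10) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = objective(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

# Pass the helper by name so the workers can retrieve it from Redis.
rush$start_local_workers(
  worker_loop = wl_with_globals,
  n_workers = 2,
  globals = "objective")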
Note
The maximum size of a Redis string is 512 MiB. If the serialized globals required by the worker loop exceed this limit, Rush will raise an error. In scenarios where both the controller and the workers have access to a shared file system, Rush will instead write large objects to disk. The large_objects_path argument of rush_plan() specifies the directory used for storing such large objects.
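A hedged sketch of such a configuration (the directory path is illustrative and assumes a file system shared by the controller and the workers):

rush_plan(
  n_workers = 4,
  config = redux::redis_config(),
  worker_type = "local",
  # Illustrative path on a shared file system for large serialized objects.
  large_objects_path = "/shared/rush/large_objects")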
Stop Worker
Workers can be stopped individually or all at once. To terminate a specific worker, the $stop_workers() method is invoked with the corresponding worker_ids argument.
rush$stop_workers(worker_ids = worker_ids[1])
ERROR [10:02:37.641] [rush] Failed to kill worker 'halfround_iaerismetalmark'
This command terminates the selected worker process.
rush$worker_states
state worker_id
<char> <char>
1: running expanded_c...
2: running pseudolegi...
3: running idiosyncra...
4: running duskish_ko...
5: running puppyish_m...
6: killed halfround_...
To stop all workers and reset the network, the $reset() method is used.
rush$reset()
ERROR [10:02:38.743] [rush] Failed to kill worker 'expanded_cassowary'
ERROR [10:02:38.750] [rush] Failed to kill worker 'pseudolegislative_vicuna'
ERROR [10:02:38.751] [rush] Failed to kill worker 'idiosyncratic_indianpalmsquirrel'
ERROR [10:02:38.752] [rush] Failed to kill worker 'duskish_kouprey'
ERROR [10:02:38.753] [rush] Failed to kill worker 'puppyish_murrelet'
In the preceding example, the optimization stopped after 100 iterations. Alternatively, the optimization can be terminated by sending a terminate signal. For this to work, the worker loop must check the rush$terminated flag; the rush controller can then terminate the optimization.
wl_random_search = function(rush) {
  # Run until the controller sends the terminate signal.
  while(!rush$terminated) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}
rush = rsh(
  network = "test-random-search-terminate",
  config = redux::redis_config())

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 2)
The random search proceeds as usual.
rush$fetch_finished_tasks()
Null data.table (0 rows and 0 cols)
To terminate the optimization, the following command is used.
rush$stop_workers(type = "terminate")
The workers terminate after finishing their current iteration; the worker states may still show "running" immediately after the signal is sent.
rush$worker_states
state worker_id
<char> <char>
1: running leather_pi...
2: running trypanopho...
Failed Workers
We simulate a segfault on the worker by killing the worker process.
rush = rsh(network = "test-failed-workers")

# Simulate a crash: the worker loop kills its own process with SIGKILL.
wl_failed_worker = function(rush) {
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}
Workers are then started using the faulty worker loop.
worker_ids = rush$start_local_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)
The $detect_lost_workers() method is used to identify failed workers. When a lost worker is detected, its state is updated to "lost".
rush$detect_lost_workers()
ERROR [10:02:41.905] [rush] Lost worker 'unanguished_dunnart'
ERROR [10:02:41.906] [rush] Lost worker 'thalassophilic_boaconstrictor'
rush$worker_states
state worker_id
<char> <char>
1: lost unanguishe...
2: lost thalassoph...
Restart Workers
Workers that have failed can be restarted using the $restart_workers() method. This method accepts the worker_ids of the workers to be restarted.
rush$restart_workers(worker_ids = worker_ids[1])
The first worker is restarted and its state is updated to "running".
rush$worker_states
state worker_id
<char> <char>
1: running unanguishe...
2: lost thalassoph...
Log Messages
Workers record all messages generated with the lgr package to the database. The lgr_thresholds argument of $start_local_workers() specifies the logging level for each logger, e.g. c(rush = "debug"). While enabling log message storage introduces a minor performance overhead, it is valuable for debugging purposes. By default, log messages are not stored. To enable logging, workers are started with the desired logging threshold.
rush = rsh(network = "test-log-messages")

wl_log_message = function(rush) {
  lg = lgr::get_logger("rush")
  lg$info("This is an info message from worker %s", rush$worker_id)
}

rush$start_local_workers(
  worker_loop = wl_log_message,
  n_workers = 2,
  lgr_thresholds = c(rush = "info"))
The most recent log messages can be printed as follows.
# Give the workers a moment to write their log messages to the database.
Sys.sleep(1)
rush$print_log()
To retrieve all log entries, use the $read_log() method.
rush$read_log()
Key: <timestamp>
       worker_id level           timestamp logger        caller         msg
          <char> <int>              <POSc> <char>        <char>      <char>
1: homebrewed...   400 2025-05-30 10:02:43   rush start_args... [rush] Thi...
2: captivatin...   400 2025-05-30 10:02:43   rush start_args... [rush] Thi...
We reset the network.
rush$reset()
Remote Workers
The mirai package provides a straightforward mechanism for launching rush workers on remote machines. mirai manages daemons, which are persistent background processes capable of executing arbitrary R code in parallel. These daemons communicate with the main session.
Start Workers
Usually mirai is used to start daemons on remote machines, but it can also be used to start local daemons.
status()
$connections
[1] 0

$daemons
[1] "tcp://10.1.0.131:39297"

$mirai
 awaiting executing completed
        0         0         0
Daemons may also be launched on a remote machine via SSH.
daemons(
  n = 2L,
  url = host_url(port = 5555),
  remote = ssh_config(remotes = "ssh://10.75.32.90")
)
On high-performance computing clusters, daemons can be started through a scheduler, as sketched below.
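A hedged sketch for a Slurm cluster, assuming a recent mirai version that provides the cluster_config() launcher; the scheduler options are illustrative and depend on the cluster setup:

daemons(
  n = 2L,
  url = host_url(),
  remote = cluster_config(
    command = "sbatch",                     # submit daemons as Slurm jobs
    options = "#SBATCH --mem-per-cpu=4G"))  # illustrative resource request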
We define a worker loop.
wl_random_search = function(rush) {
  while(rush$n_finished_tasks < 100) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

rush = rsh(
  network = "test-network",
  config = redux::redis_config())
We start the daemons; calling daemons(0) stops all daemons again.
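The call that starts the daemons depends on the setup; a minimal sketch for two daemons on the local machine is:

library(mirai)

# Start two local daemons; daemons(0) shuts them down again.
daemons(2)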
After the daemons are started, we can start the remote workers.
worker_ids = rush$start_remote_workers(
  worker_loop = wl_random_search,
  n_workers = 2)
rush$worker_info
       worker_id   pid remote      hostname heartbeat
          <char> <int> <lgcl>        <char>    <lgcl>
1: matricidal...  9602   TRUE pkrvmf6wy0...     FALSE
2: trueblue_s...  9604   TRUE pkrvmf6wy0...     FALSE
Failed Workers
Failed workers started with mirai are also detected by the controller. We simulate a segfault on the worker by killing the worker process.
daemons(0)
[1] 0
daemons(2)
[1] 2
rush = rsh(network = "test-failed-mirai-workers")

wl_failed_worker = function(rush) {
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}

worker_ids = rush$start_remote_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)
rush$detect_lost_workers()
ERROR [10:02:48.998] [rush] Lost worker 'halfsyllabled_nandoo'
ERROR [10:02:48.999] [rush] 19
ERROR [10:02:49.012] [rush] Lost worker 'cognitive_indiancow'
ERROR [10:02:49.013] [rush] 19
A segmentation fault also terminates the associated mirai daemon. Therefore, it is necessary to restart the daemons before restarting the workers.
daemons(0)
[1] 0
daemons(2)
[1] 2
status()
$connections
[1] 2

$daemons
[1] "abstract://fd6dfe6b3526fda55c1c4eb1"

$mirai
 awaiting executing completed
        0         0         0
Workers can then be restarted using the $restart_workers() method.
rush$restart_workers(worker_ids)
Script Workers
The most flexible method for starting workers is to use a script generated with the $worker_script() method. This script can be executed either on the local machine or on a remote machine. The only requirement is that the machine is capable of running R scripts and has access to the Redis database.
rush = rsh(
  network = "test-script-workers",
  config = redux::redis_config())

rush$worker_script(
  worker_loop = wl_random_search)
Error Handling
Workers started with processx or mirai are monitored by these packages. The heartbeat is a mechanism to monitor the status of script workers. The mechanism consists of a heartbeat key with a set expiration timeout and a dedicated heartbeat process that refreshes the timeout periodically. The heartbeat process is started with callr and is linked to the main process of the worker. In the event of a worker's failure, the associated heartbeat process also ceases to function, thus halting the renewal of the timeout. The absence of the heartbeat key acts as an indicator to the controller that the worker is no longer operational. Consequently, the controller updates the worker's status to "lost".
Heartbeats are initiated upon worker startup by specifying the heartbeat_period and heartbeat_expire parameters. The heartbeat_period defines the frequency at which the heartbeat process will update the timeout. The heartbeat_expire sets the duration, in seconds, before the heartbeat key expires. The expiration time should be set to a value greater than the heartbeat period to ensure that the heartbeat process has sufficient time to refresh the timeout.
rush$worker_script(
  worker_loop = wl_random_search,
  heartbeat_period = 1,
  heartbeat_expire = 3)
The heartbeat process is also the only way to kill a script worker. The $stop_workers(type = "kill") method pushes a kill signal to the heartbeat process. The heartbeat process then terminates the main process of the worker.
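A minimal sketch combining the two mechanisms described in this section (both methods are introduced above):

# Mark workers whose heartbeat key has expired as "lost".
rush$detect_lost_workers()

# Push a kill signal to the heartbeat processes, which then terminate the
# main processes of the script workers.
rush$stop_workers(type = "kill")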