library(rush)
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 +
    10 * (1 - 1 / (8 * pi)) * cos(x1) +
    10
}
wl_random_search = function(rush, branin) {
  while (TRUE) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$finish_tasks(key, yss = list(ys))
  }
}
config = redux::redis_config()
rush = rsh(
  network_id = "random-search-network",
  config = config)
The Rush manager class is responsible for starting, monitoring, and stopping workers within the network. This vignette describes the three mechanisms for starting workers: mirai daemons, local processx processes, and portable R scripts. We advise reading the tutorial first. We use the random search example from the tutorial, shown above, to demonstrate the manager.
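As a quick sanity check of the objective used throughout this vignette, the following sketch re-declares the Branin function and evaluates it at two of its known global minimizers, both of which lie inside the sampling domain of the worker loop. The names minimizers and values are illustrative only and are not part of rush.

```r
# Branin function, copied from the definition above
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 +
    10 * (1 - 1 / (8 * pi)) * cos(x1) +
    10
}

# Two known global minimizers of the Branin function (illustrative names)
minimizers = list(c(-pi, 12.275), c(pi, 2.275))

# Evaluate the objective at each minimizer; both values are close to 0.397887
values = vapply(minimizers, function(x) branin(x[1], x[2]), numeric(1))
print(values)
```

Values found by the random search should therefore never fall noticeably below 0.3979.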
Starting Workers with mirai
The mirai package provides a mechanism for launching rush workers on local and remote machines. mirai daemons are persistent background processes that execute arbitrary R code in parallel. Daemons are started using mirai::daemons(). For local daemons, the n argument specifies how many daemons are started.
After the daemons are started, workers are launched with the $start_workers() method. The $wait_for_workers() method blocks until all workers have registered in the network.
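A minimal sketch of the daemon setup described above, assuming the mirai package is installed. It starts two local daemons to match the two workers launched in the next step and prints the daemon status.

```r
library(mirai)

# Start two persistent local daemons; rush workers are later launched on them
daemons(n = 2L)

# Inspect the current daemon settings and connections
print(status())
```

Calling daemons(0) resets the daemons once the workers are no longer needed.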
worker_ids = rush$start_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)
rush$wait_for_workers(worker_ids = worker_ids)
Worker information is accessible through the $worker_info field. Each worker is identified by a worker_id. The pid column gives the process identifier and the hostname column gives the machine name. The remote column specifies whether the worker is remote and the heartbeat column indicates the presence of a heartbeat process. The state column reflects the current worker state, which can be "running" or "terminated".
rush$worker_info
       worker_id   pid      hostname heartbeat   state
          <char> <int>        <char>    <lgcl>  <char>
1: pseudodemo...  9171 runnervmmk...     FALSE running
2: toy_waters...  9169 runnervmmk...     FALSE running
Stopping Workers
Workers can be stopped individually or all at once. To terminate a specific worker, the $stop_workers() method is called with the corresponding worker_ids.
rush$stop_workers(worker_ids = worker_ids[1])
rush$worker_info
       worker_id   pid      hostname heartbeat      state
          <char> <int>        <char>    <lgcl>     <char>
1: toy_waters...  9169 runnervmmk...     FALSE    running
2: pseudodemo...  9171 runnervmmk...     FALSE terminated
To stop all workers and reset the network, the $reset() method is used.
rush$reset()
Instead of killing the worker processes, the manager can send a terminate signal. The worker then terminates after completing its current task. For this to work, the worker loop must check the rush$terminated flag.
wl_random_search = function(rush, branin) {
  # Leave the loop as soon as the manager has sent the terminate signal
  while (!rush$terminated) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$finish_tasks(key, yss = list(ys))
  }
}
rush = rsh(
  network_id = "random-search-network",
  config = redux::redis_config())
rush$start_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)
rush$wait_for_workers(2)
rush$fetch_finished_tasks()
         worker_id         x1        x2          y          keys
            <char>      <num>     <num>      <num>        <char>
  1: achluophob... -4.2242665  9.118888  40.418686 5be3307f-2...
  2: achluophob...  5.7049550 10.225472 100.863565 c1644181-7...
  3: achluophob...  1.6188337 10.569969  55.886133 f6c0504a-3...
  4: achluophob...  4.2983776  9.215189  64.957679 fbe5f3c4-2...
  5: achluophob...  3.6801938  3.394105   4.012277 287bcc11-6...
 ---
107: achluophob...  7.9427811  2.937133  11.189069 8c42f29e-4...
108: hazardous_... -2.0685907  1.445732  75.963698 6b8d8e64-d...
109: achluophob...  4.1088898 11.212871  96.161348 b8c4b8b8-7...
110: hazardous_... -0.2911462  8.600941  23.720511 0a6daba3-4...
111: achluophob...  6.2416310  3.472982  25.230089 1d8fa495-5...
The $stop_workers() method with type = "terminate" sends the terminate signal.
rush$stop_workers(type = "terminate")
rush$worker_info
       worker_id   pid      hostname heartbeat      state
          <char> <int>        <char>    <lgcl>     <char>
1: achluophob...  9171 runnervmmk...     FALSE terminated
2: hazardous_...  9169 runnervmmk...     FALSE terminated
rush$reset()
Failed Workers
Failed workers started with mirai are automatically detected by the manager. We simulate a worker crash by killing the worker process.
rush = rsh(network_id = "random-search-network")
wl_failed_worker = function(rush) {
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}
mirai::daemons(n = 2L)
worker_ids = rush$start_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2L)
rush$detect_lost_workers()
[1] "catlike_megalosaurus" "prolific_panda"
rush$reset()
Remote Workers
Daemons can also be launched on remote machines via SSH.
mirai::daemons(
  n = 2L,
  url = mirai::host_url(port = 5555),
  remote = mirai::ssh_config(remotes = "ssh://10.75.32.90")
)
On high-performance computing clusters, daemons can be started using a scheduler.
Rush Plan
When rush is integrated into a third-party package, worker startup is typically managed by the package itself. Users can configure worker options by calling the rush_plan() function, which specifies the number of workers, the worker type, and the Redis configuration.
rush_plan(n_workers = 2, config = redux::redis_config(), worker_type = "mirai")
Passing Data to Workers
Arguments required by the worker loop are passed as named arguments to $start_workers(). These arguments are serialized and stored in the Redis database as part of the worker configuration. Upon initialization, each worker retrieves and deserializes the configuration before executing the worker loop.
Note
The maximum size of a Redis string is 512 MiB. If the serialized worker configuration exceeds this limit, rush raises an error. When both the manager and the workers share access to a file system, rush will instead write large objects to disk. The large_objects_path argument of rush_plan() specifies the directory used for storing such objects.
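To judge in advance whether arguments are likely to approach the limit, their serialized size can be estimated in base R. The sketch below is only an approximation: serialized_size is a hypothetical helper, the payload is made up for illustration, and rush serializes the complete worker configuration, so the size it actually stores may differ.

```r
# Redis limits a single string value to 512 MiB
redis_string_limit = 512 * 1024^2

# Hypothetical helper: size in bytes of an object after base R serialization
serialized_size = function(x) length(serialize(x, connection = NULL))

# Example payload resembling arguments passed to a worker loop (made up)
payload = list(
  objective = function(x1, x2) x1 + x2,
  data = matrix(runif(1e4), ncol = 10)
)

# Compare the estimated size with the Redis limit
size = serialized_size(payload)
fits_in_redis = size < redis_string_limit
print(fits_in_redis)
```

If the estimate comes close to the limit and a shared file system is available, setting large_objects_path is the safer choice.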
Log Messages
Workers can record messages generated via the lgr package to the database. The lgr_thresholds argument of $start_local_workers() specifies the logging level for each logger, e.g. c("mlr3/rush" = "debug"). Logging introduces a minor performance overhead and is disabled by default.
rush = rsh(network_id = "random-search-network")
wl_log_message = function(rush) {
  lg = lgr::get_logger("mlr3/rush")
  lg$info("This is an info message from worker %s", rush$worker_id)
}
rush$start_local_workers(
  worker_loop = wl_log_message,
  n_workers = 2,
  lgr_thresholds = c(rush = "info"))
The most recent log messages can be retrieved as follows.
Sys.sleep(1)
rush$print_log()
To retrieve all log entries, use the $read_log() method.
rush$read_log()
Null data.table (0 rows and 0 cols)
rush$reset()
Starting Local Workers
The $start_local_workers() method launches workers using the processx package on the local machine. The n_workers argument specifies the number of workers to launch and worker_loop defines the function executed by each worker. Additional arguments required by the worker loop are passed as named arguments to $start_local_workers().
rush = rsh(
  network_id = "random-search-network",
  config = redux::redis_config())
worker_ids = rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)
rush$wait_for_workers(worker_ids = worker_ids)
rush$worker_info
       worker_id   pid      hostname heartbeat   state
          <char> <int>        <char>    <lgcl>  <char>
1: foliaceous...  9421 runnervmmk...     FALSE running
2:     edgy_degu  9423 runnervmmk...     FALSE running
Additional workers can be added to the network at any time.
rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)
rush$wait_for_workers(worker_ids = worker_ids)
rush$worker_info
       worker_id   pid      hostname heartbeat   state
          <char> <int>        <char>    <lgcl>  <char>
1: foliaceous...  9421 runnervmmk...     FALSE running
2:     edgy_degu  9423 runnervmmk...     FALSE running
rush$reset()
Script Workers
The $worker_script() method generates an R script that can be executed on any machine with access to the Redis database. This is the most flexible mechanism for starting workers, as it imposes no constraints on the execution environment.
rush = rsh(
  network_id = "random-search-network",
  config = redux::redis_config())
rush$worker_script(
  worker_loop = wl_random_search)
[1] "Rscript -e \"rush::start_worker(network_id = 'random-search-network', config = list(scheme = 'redis', host = '127.0.0.1', port = '6379'))\""
Error Handling
Workers started with processx and mirai are monitored automatically by the respective packages. Script workers require an explicit heartbeat mechanism to detect failures. The heartbeat consists of a Redis key with a set expiration timeout, refreshed periodically by a background process linked to the main worker process. If the worker fails, the heartbeat process also ceases, the key expires, and the manager marks the worker as "terminated".
The heartbeat_period and heartbeat_expire arguments configure the heartbeat at startup. The heartbeat_period defines the refresh interval in seconds; heartbeat_expire sets the expiration duration, which must exceed the heartbeat period.
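The relationship between the two settings can be made concrete with a small base R sketch. The helper heartbeat_detection_window is hypothetical and not part of rush. It assumes that the key is refreshed exactly every heartbeat_period seconds and that a crash becomes visible once the key expires, so the delay between a crash and the expiry lies between heartbeat_expire - heartbeat_period and heartbeat_expire seconds.

```r
# Hypothetical helper: validate heartbeat settings and return the approximate
# delay (in seconds) between a worker crash and the expiry of its heartbeat key
heartbeat_detection_window = function(heartbeat_period, heartbeat_expire) {
  stopifnot(
    is.numeric(heartbeat_period), heartbeat_period > 0,
    is.numeric(heartbeat_expire), heartbeat_expire > heartbeat_period
  )
  # The crash happens at most one period after the last refresh, and the key
  # expires heartbeat_expire seconds after that refresh
  c(min = heartbeat_expire - heartbeat_period, max = heartbeat_expire)
}

# Settings used in this vignette
window = heartbeat_detection_window(heartbeat_period = 1, heartbeat_expire = 3)
print(window)
```

A shorter expiration detects failures sooner, at the cost of a higher risk that a slow refresh is mistaken for a crash.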
rush$worker_script(
  worker_loop = wl_random_search,
  heartbeat_period = 1,
  heartbeat_expire = 3)
[1] "Rscript -e \"rush::start_worker(network_id = 'random-search-network', config = list(scheme = 'redis', host = '127.0.0.1', port = '6379'), heartbeat_period = 1, heartbeat_expire = 3)\""
To kill a script worker, call $stop_workers(type = "kill"). This pushes a kill signal to the heartbeat process, which then terminates the main worker process.