A rush network can consist of thousands of workers running on different machines. This article explains how to start workers in a rush network. Rush offers three ways to start workers: local workers with processx, remote workers with mirai, and script workers.
Local Workers
We use the random search example from the Rush article to demonstrate how workers are started. The worker loop, the rush controller, and the branin objective function are defined as follows.
library(rush)

wl_random_search = function(rush, branin) {
  while(TRUE) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

rush = rsh(
  network = "test-network",
  config = redux::redis_config())

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
Start Workers
Workers can be started locally or remotely. Local workers run on the same machine as the controller, whereas remote workers run on separate machines. The $start_local_workers() method starts local workers using the processx package. The n_workers parameter specifies the number of workers to launch. The worker_loop parameter defines the function executed by each worker. If the worker loop depends on additional objects, these can be passed as named arguments to $start_local_workers() and are forwarded to worker_loop. Packages required by the worker loop can be specified with the packages parameter.
worker_ids = rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)
Worker information is accessible through the $worker_info method. Each worker is identified by a worker_id. The pid field denotes the process identifier, and the hostname field indicates the machine name. The remote column specifies whether the worker is remote, and the heartbeat column indicates the presence of a heartbeat process. The state column indicates the current state of the worker. Possible states include "running", "terminated", "killed", and "lost". Heartbeat mechanisms are discussed in the Error Handling section.
rush$worker_info
worker_id pid remote hostname heartbeat state
<char> <int> <lgcl> <char> <lgcl> <char>
1: loopy_quin... 10010 FALSE runnervmmt... FALSE running
2: entertaini... 10021 FALSE runnervmmt... FALSE running
Additional workers may be added to the network at any time.
rush$start_local_workers(
  worker_loop = wl_random_search,
  branin = branin,
  n_workers = 2)
rush$worker_info
worker_id pid remote hostname heartbeat state
<char> <int> <lgcl> <char> <lgcl> <char>
1: loopy_quin... 10010 FALSE runnervmmt... FALSE running
2: entertaini... 10021 FALSE runnervmmt... FALSE running
3: selfevolve... 10084 FALSE runnervmmt... FALSE running
4: unpronounc... 10086 FALSE runnervmmt... FALSE running
Rush Plan
When rush is integrated into a third-party package, the starting of workers is typically managed by the package itself. In such cases, users may configure worker options by invoking the rush_plan() function. This function allows explicit specification of the number of workers, the type of workers, and the configuration for connecting to the Redis database.
rush_plan(n_workers = 2, config = redux::redis_config(), worker_type = "local")
Passing Data to Workers
Objects required by the worker loop can be passed as arguments to $start_local_workers() / $start_remote_workers(). These arguments are serialized and stored in the Redis database as part of the worker configuration. Upon initialization, each worker retrieves and unserializes the worker configuration before invoking the worker loop.
Note
The maximum size of a Redis string is 512 MiB. If the serialized worker configuration exceeds this limit, Rush will raise an error. In scenarios where both the controller and the workers have access to a shared file system, Rush will instead write large objects to disk. The large_objects_path argument of rush_plan() specifies the directory used for storing such large objects.
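A shared directory can be configured when planning the workers. A minimal sketch, assuming /shared/rush is a directory that both the controller and all workers can read and write:
# store objects that exceed the Redis string limit on a shared file system
# (the path is a placeholder; use any directory visible to all machines)
rush_plan(
  n_workers = 2,
  config = redux::redis_config(),
  large_objects_path = "/shared/rush")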
Stop Workers
Workers can be stopped individually or all at once. To terminate a specific worker, the $stop_workers() method is invoked with the corresponding worker_ids argument.
rush$stop_workers(worker_ids = worker_ids[1])
This command terminates the selected worker process.
rush$worker_info
worker_id pid remote hostname heartbeat state
<char> <int> <lgcl> <char> <lgcl> <char>
1: entertaini... 10021 FALSE runnervmmt... FALSE running
2: selfevolve... 10084 FALSE runnervmmt... FALSE running
3: unpronounc... 10086 FALSE runnervmmt... FALSE running
4: loopy_quin... 10010 FALSE runnervmmt... FALSE killed
To stop all workers and reset the network, the $reset() method is used.
rush$reset()
Instead of killing the worker processes, it is also possible to send a terminate signal to the workers. A worker then terminates after it has finished its current task. For this to work, the worker loop must check the rush$terminated flag, which allows the controller to end the optimization gracefully.
wl_random_search = function(rush, branin) {
  while(!rush$terminated) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}
rush = rsh(
  network = "test-random-search-terminate",
  config = redux::redis_config())

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)
The random search proceeds as usual.
rush$fetch_finished_tasks()
x1 x2 y pid worker_id keys
<num> <num> <num> <int> <char> <char>
1: 7.0888916 0.07065129 17.947437 10159 exilable_v... 4dd3f8df-a...
2: -0.6849254 2.77981585 36.541105 10159 exilable_v... bde40e60-e...
3: 7.5997890 12.31104774 132.212092 10159 exilable_v... 52e30de7-a...
4: 8.8313747 0.45771748 4.479984 10159 exilable_v... 17ede0b4-a...
5: -0.4790338 0.73898474 55.160909 10159 exilable_v... 94763e07-1...
---
578: 4.8946469 10.31117015 92.853754 10159 exilable_v... 9a8bca88-1...
579: 3.7721624 14.69661569 167.675632 10157 mad_montan... d3d7b914-e...
580: 0.6445911 9.16144724 34.762633 10159 exilable_v... f0a027f5-c...
581: 8.5444915 6.51198198 25.781063 10157 mad_montan... 376a8120-4...
582: 3.4271851 9.43639706 55.156537 10159 exilable_v... 67cdb02d-0...
To terminate the optimization, the following command is used.
rush$stop_workers(type = "terminate")
The workers are terminated.
rush$worker_info
worker_id pid remote hostname heartbeat state
<char> <int> <lgcl> <char> <lgcl> <char>
1: mad_montan... 10157 FALSE runnervmmt... FALSE terminated
2: exilable_v... 10159 FALSE runnervmmt... FALSE terminated
Failed Workers
We simulate a segfault on the worker by killing the worker process.
rush = rsh(network = "test-failed-workers")
wl_failed_worker = function(rush) {
tools::pskill(Sys.getpid(), tools::SIGKILL)
}Workers are then started using the faulty worker loop.
worker_ids = rush$start_local_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)
The $detect_lost_workers() method is used to identify failed workers. When a lost worker is detected, its state is updated to "lost".
rush$detect_lost_workers()
rush$worker_info
worker_id pid remote hostname heartbeat state
<char> <int> <lgcl> <char> <lgcl> <char>
1: bacteriolo... 10235 FALSE runnervmmt... FALSE lost
2: light_grea... 10233 FALSE runnervmmt... FALSE lost
Restart Workers
Workers that have failed can be restarted using the $restart_workers() method. This method accepts the worker_ids of the workers to be restarted.
rush$restart_workers(worker_ids = worker_ids[1])
The first worker is restarted and its state is updated to "running".
rush$worker_info
worker_id pid remote hostname heartbeat state
<char> <int> <lgcl> <char> <lgcl> <char>
1: light_grea... 10305 FALSE runnervmmt... FALSE running
2: bacteriolo... 10235 FALSE runnervmmt... FALSE lost
Log Messages
Workers write all log messages generated with the lgr package to the Redis database. The lgr_thresholds argument of $start_local_workers() specifies the logging level for each logger, e.g. c("mlr3/rush" = "debug"). While storing log messages introduces a minor performance overhead, it is valuable for debugging. By default, log messages are not stored. To enable logging, workers are started with the desired logging threshold.
rush = rsh(network = "test-log-messages")
wl_log_message = function(rush) {
lg = lgr::get_logger("mlr3/rush")
lg$info("This is an info message from worker %s", rush$worker_id)
}
rush$start_local_workers(
worker_loop = wl_log_message,
n_workers = 2,
lgr_thresholds = c(rush = "info"))The most recent log messages can be printed as follows.
Sys.sleep(1)
rush$print_log()
To retrieve all log entries, use the $read_log() method.
rush$read_log()
Null data.table (0 rows and 0 cols)
We reset the network.
rush$reset()
Remote Workers
The mirai package provides a straightforward mechanism for launching rush workers on remote machines. mirai manages daemons, which are persistent background processes capable of executing arbitrary R code in parallel. These daemons communicate with the main session.
Start Workers
Usually, mirai is used to start daemons on remote machines, but it can also start local daemons, as sketched below.
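A minimal sketch of starting local daemons on the current machine:
library(mirai)
# start two persistent background R processes on the local machine
daemons(n = 2L)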
Daemons may also be launched on a remote machine via SSH.
daemons(
  n = 2L,
  url = host_url(port = 5555),
  remote = ssh_config(remotes = "ssh://10.75.32.90")
)
On high performance computing clusters, daemons can be started using a scheduler.
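A sketch of launching daemons through a Slurm scheduler via mirai's remote_config(); the sbatch options and Rscript path are placeholders that depend on the cluster setup:
daemons(
  n = 2L,
  url = host_url(),
  remote = remote_config(
    command = "sbatch",
    args = c("--mem 512", "-n 1", "--wrap", "."),
    rscript = file.path(R.home("bin"), "Rscript"),
    quote = TRUE
  )
)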
We define a worker loop.
wl_random_search = function(rush, branin) {
  while(TRUE) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

rush = rsh(
  network = "test-network",
  config = redux::redis_config())
We start two new daemons.
daemons(2)
After the daemons are started, we can start the remote workers.
worker_ids = rush$start_remote_workers(
  worker_loop = wl_random_search,
  n_workers = 2,
  branin = branin)
rush$worker_info
worker_id pid remote hostname heartbeat state
<char> <int> <lgcl> <char> <lgcl> <char>
1: carnivales... 10455 TRUE runnervmmt... FALSE running
2: symbiotic_... 10457 TRUE runnervmmt... FALSE running
We reset the network and stop the daemons.
rush$reset()
daemons(0)
Failed Workers
Failed workers started with mirai are also detected by the controller. We simulate a segfault on the worker by killing the worker process.
rush = rsh(network = "test-failed-mirai-workers")
wl_failed_worker = function(rush) {
tools::pskill(Sys.getpid(), tools::SIGKILL)
}Start two new daemons.
daemons(2)
Start two remote workers.
worker_ids = rush$start_remote_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)

rush$detect_lost_workers()
A segmentation fault also terminates the associated mirai daemon. Therefore, it is necessary to restart the daemon before restarting the workers.
daemons(2)
Workers can then be restarted using the $restart_workers() method.
rush$restart_workers(worker_ids)
Script Workers
The most flexible method for starting workers is to use a script generated with the $worker_script() method. This script can be executed either on the local machine or on a remote machine. The only requirement is that the machine is capable of running R scripts and has access to the Redis database.
rush = rsh(
  network = "test-script-workers",
  config = redux::redis_config())

rush$worker_script(
  worker_loop = wl_random_search)
Error Handling
Workers started with processx or mirai are monitored by these packages. The heartbeat is a mechanism to monitor the status of script workers. The mechanism consists of a heartbeat key with a set expiration timeout and a dedicated heartbeat process that refreshes the timeout periodically. The heartbeat process is started with callr and is linked to the main process of the worker. In the event of a worker’s failure, the associated heartbeat process also ceases to function, thus halting the renewal of the timeout. The absence of the heartbeat key acts as an indicator to the controller that the worker is no longer operational. Consequently, the controller updates the worker’s status to "lost".
Heartbeats are initiated upon worker startup by specifying the heartbeat_period and heartbeat_expire parameters. The heartbeat_period defines the frequency at which the heartbeat process will update the timeout. The heartbeat_expire sets the duration, in seconds, before the heartbeat key expires. The expiration time should be set to a value greater than the heartbeat period to ensure that the heartbeat process has sufficient time to refresh the timeout.
rush$worker_script(
  worker_loop = wl_random_search,
  heartbeat_period = 1,
  heartbeat_expire = 3)
The heartbeat process is also the only way to kill a script worker. The $stop_workers(type = "kill") method pushes a kill signal to the heartbeat process. The heartbeat process terminates the main process of the worker.
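A minimal sketch, using the worker information stored in the database to address all registered script workers:
# push a kill signal to the heartbeat process of each registered worker
rush$stop_workers(worker_ids = rush$worker_info$worker_id, type = "kill")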