rush provides error-handling mechanisms for two failure modes: standard R errors during task evaluation and unexpected worker failures such as crashes or lost connections. If errors cannot be caught automatically, the worker loop can be debugged locally.
Simple R Errors
We use the random search example from the tutorial and introduce a random error with 50% probability. Within the worker loop, users are responsible for catching errors and marking the corresponding task as "failed" using the $fail_tasks() method.
library(rush)
# Branin test function used as the objective
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
wl_random_search = function(rush, branin) {
  while (rush$n_finished_tasks < 100) {
    # Sample a configuration and register it as a running task
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))
    tryCatch({
      # Fail in roughly half of the evaluations
      if (runif(1) < 0.5) stop("Random Error")
      ys = list(y = branin(xs$x1, xs$x2))
      rush$finish_tasks(key, yss = list(ys))
    }, error = function(e) {
      # Store the error message and mark the task as "failed"
      condition = list(message = e$message)
      rush$fail_tasks(key, conditions = list(condition))
    })
  }
}
We initialize the network and start the workers.
rush = rsh(
  network = "test-simple-error",
  config = redux::redis_config())
mirai::daemons(4)
rush$start_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  branin = branin)
When an error occurs, the task is marked as "failed" and the error message is stored in the "message" column. This ensures that errors do not interrupt the overall execution and allows subsequent inspection and reevaluation of failed tasks.
rush$fetch_failed_tasks()
             x1        x2     worker_id       message          keys
          <num>     <num>        <char>        <char>        <char>
  1:  7.6917408  1.771299 laudable_j... Random Err... d943116b-2...
  2: -1.6571573  8.807278 finespun_b... Random Err... 7c447322-c...
  3:  0.0157243 13.025460 open_swall... Random Err... 67166bfc-a...
  4:  8.6915715 12.272576 unoutlawed... Random Err... 7a48bbf8-8...
  5:  8.4977428  9.029268 finespun_b... Random Err... 67a8bf79-6...
 ---
122:  3.0551111  9.512381 unoutlawed... Random Err... 00f9758e-0...
123:  9.6231552 14.944322 open_swall... Random Err... 32429319-8...
124:  4.2680911 14.593730 laudable_j... Random Err... ef01f21f-2...
125: -1.8105981  1.026905 unoutlawed... Random Err... aae8d484-5...
126:  5.5433422  2.592839 open_swall... Random Err... f0b9e696-0...
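Re-evaluating failed tasks can be sketched in base R. This is only an illustration: the `failed` data frame is a hand-made stand-in that copies the x1 and x2 values of the first two rows above rather than a real `$fetch_failed_tasks()` result, and `reevaluate()` is a hypothetical helper, not part of rush.

```r
# Branin test function, as defined earlier in this section.
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# Stand-in for the table of failed tasks (assumption: only the x1 and x2
# columns are needed to run a task again).
failed = data.frame(
  x1 = c(7.6917408, -1.6571573),
  x2 = c(1.771299, 8.807278))

# Hypothetical helper: run the objective again on every failed configuration.
reevaluate = function(tasks, fun) {
  tasks$y = mapply(fun, tasks$x1, tasks$x2)
  tasks
}

retried = reevaluate(failed, branin)
print(retried)
```

In a real run, the re-evaluated configurations would presumably be pushed back to the network by a worker loop rather than computed in the main session.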
Handling Failing Workers
When a worker fails due to a crash or lost connection, its tasks may remain in the "running" state indefinitely. We simulate a segmentation fault by terminating the worker process.
wl_failed_worker = function(rush) {
  xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
  key = rush$push_running_tasks(xss = list(xs))
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}
rush = rsh(
  network = "test-failed-workers",
  config = redux::redis_config())
mirai::daemons(2)
worker_ids = rush$start_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)
The $detect_lost_workers() method identifies such workers and updates their state to "terminated". For workers started with $start_local_workers() or $start_workers(), lost worker detection works automatically by checking process status. Workers started via $worker_script() require an additional heartbeat mechanism (see the manager vignette).
rush$detect_lost_workers()
[1] "nonangelic_germanpinscher" "strongminded_cub"
When a worker fails, the state of any task it was evaluating is set to "failed".
rush$fetch_failed_tasks()
          x1       x2     worker_id       message          keys
       <num>    <num>        <char>        <char>        <char>
1: -1.492087 1.170886 nonangelic... Worker has... 5a7e7bb8-8...
2:  3.591012 2.315620 strongmind... Worker has... f6d5fdb8-f...
Debugging
When the worker loop fails due to an uncaught error, the loop can be executed locally to reproduce and inspect the failure. Consider the following worker loop, which generates an error for large values of x1.
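The definition of wl_error is not shown in this section. A minimal sketch consistent with the description might look as follows. The 0.9 threshold, the uniform sampling on [0, 1], and the toy objective x1 + x2 are assumptions made for illustration; only the error message is taken from the output shown in this section.

```r
# Hypothetical reconstruction of wl_error: the 0.9 cutoff, the sampling
# range, and the toy objective x1 + x2 are assumptions.
wl_error = function(rush) {
  repeat {
    xs = list(x1 = runif(1), x2 = runif(1))
    key = rush$push_running_tasks(xss = list(xs))
    # Deliberately not wrapped in tryCatch(), so the error escapes the loop.
    if (xs$x1 > 0.9) stop("Unexpected error")
    ys = list(y = xs$x1 + xs$x2)
    rush$finish_tasks(key, yss = list(ys))
  }
}
```

Because the error is not caught, the task pushed just before the failure is never marked as finished or failed by the loop itself.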
To debug the worker loop locally, a RushWorker instance is instantiated manually and passed as an argument to the worker loop.
rush_worker = RushWorker$new(network_id = "test-error")
wl_error(rush_worker)
Error in `wl_error()`:
! Unexpected error
When an error is raised in the main process, traceback() can be called to examine the stack trace and breakpoints can be set within the worker loop to inspect the program state. Note that certain errors such as missing packages may not be reproducible locally but can be identified by running the worker loop in a separate process and using $detect_lost_workers().
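When the session is not interactive, the call stack can also be recorded programmatically. The following is a base R sketch of that idea, not a rush feature: `wl_demo()` is a hypothetical stand-in for a failing worker loop, and the stack is captured with `withCallingHandlers()` and `sys.calls()` before `tryCatch()` recovers from the error.

```r
# Hypothetical stand-in for a failing worker loop.
wl_demo = function() {
  inner = function() stop("Unexpected error")
  inner()
}

# Record the call stack at the moment the error is signalled, then recover
# and keep only the error message.
stack = NULL
msg = tryCatch(
  withCallingHandlers(
    wl_demo(),
    error = function(e) stack <<- sys.calls()),
  error = function(e) conditionMessage(e))

print(msg)
print(length(stack) > 0)
```

The calling handler runs before the stack is unwound, which is why the frames of `wl_demo()` and `inner()` are still visible to `sys.calls()`.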
rush = rsh(
  network = "test-error",
  config = redux::redis_config())
mirai::daemons(1)
rush$start_workers(
  worker_loop = wl_error,
  n_workers = 1)
rush$detect_lost_workers()
[1] "chummy_upupa"
Output and message logs can be written to files via the message_log and output_log arguments.
rush = rsh(
  network = "test-error",
  config = redux::redis_config())
message_log = tempdir()
output_log = tempdir()
mirai::daemons(1)
worker_ids = rush$start_workers(
  worker_loop = wl_error,
  n_workers = 1,
  message_log = message_log,
  output_log = output_log)
Sys.sleep(5)
readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))
[1] "Debug message logging on worker unacceptable_blackbuck started"
[1] "[1] \"Debug output logging on worker unacceptable_blackbuck started\""
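With several workers, it can be convenient to read all message logs in one pass. The sketch below uses only base R and a dummy log file, so it runs without a Redis server. The directory name, the worker id `demo_worker`, and the log line are made up for illustration; only the `message_%s.log` file name pattern is taken from the call above.

```r
# Temporary directory standing in for the message_log directory.
log_dir = file.path(tempdir(), "rush-log-demo")
dir.create(log_dir, showWarnings = FALSE)

# Dummy log file; the worker id and the log line are made up for this sketch.
writeLines(
  "Debug message logging on worker demo_worker started",
  file.path(log_dir, sprintf("message_%s.log", "demo_worker")))

# Read every message log in the directory into a list named by file.
log_files = list.files(log_dir, pattern = "^message_.*\\.log$", full.names = TRUE)
logs = lapply(log_files, readLines)
names(logs) = basename(log_files)
print(logs)
```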