rush provides error-handling mechanisms for two failure modes: standard R errors raised during task evaluation, and unexpected worker failures such as crashes or lost connections. When an error cannot be handled by either mechanism, the worker loop can be run and debugged locally.
Simple R Errors
We use the random search example from the tutorial and introduce a random error with 50% probability. Within the worker loop, users are responsible for catching errors and marking the corresponding task as "failed" with the $fail_tasks() method.
library(rush)
# Branin test function
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
wl_random_search = function(rush, branin) {
  # Keep sampling until 100 tasks have finished successfully
  while (rush$n_finished_tasks < 100) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    # Register the task as running before evaluating it
    key = rush$push_running_tasks(xss = list(xs))
    tryCatch({
      # Simulate a failure in 50% of the evaluations
      if (runif(1) < 0.5) stop("Random Error")
      ys = list(y = branin(xs$x1, xs$x2))
      rush$finish_tasks(key, yss = list(ys))
    }, error = function(e) {
      # Store the error message and mark the task as failed
      condition = list(message = e$message)
      rush$fail_tasks(key, conditions = list(condition))
    })
  }
}
We initialize the network and start the workers.
rush = rsh(
  network = "test-simple-error",
  config = redux::redis_config())
mirai::daemons(4)
rush$start_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  branin = branin)
When an error occurs, the task is marked as "failed" and the error message is stored in the "message" column. Because the error is caught inside the worker loop, it does not interrupt the overall run. Failed tasks remain available for later inspection and reevaluation.
rush$fetch_failed_tasks()
x1 x2 worker_id message keys
<num> <num> <char> <char> <char>
1: 9.1833830 14.960407 unremarkab... Random Err... 91768041-6...
2: -3.7727678 2.083436 empathic_p... Random Err... 6c6325bf-f...
3: 0.8354427 12.896917 empirical_... Random Err... 16e9f051-b...
4: -1.4372524 9.702121 empathic_p... Random Err... 9baa5e5e-2...
5: 6.9375512 12.234767 unremarkab... Random Err... 40518bd5-5...
---
92: 6.8012699 13.904621 empirical_... Random Err... 68a7390a-4...
93: -2.0780650 5.762801 bumpy_hyen... Random Err... 7e16ab7c-2...
94: 4.3184172 9.609530 unremarkab... Random Err... 74257469-c...
95: 4.2898619 12.938496 empathic_p... Random Err... 2b4d3f47-4...
96: -2.6527788 6.925925 unremarkab... Random Err... e19df063-c...
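The following sketch isolates the catch-and-record pattern of wl_random_search() in base R, so it runs without Redis or rush. make_store() and run_tasks() are hypothetical stand-ins for the rush instance and are used only for illustration. Each caught error is recorded as a failed task together with its message, and the loop keeps running.

```r
# Hypothetical in-memory task store (a stand-in for rush, not part of it)
make_store = function() {
  store = new.env()
  store$finished = list()
  store$failed = list()
  store
}
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 +
    10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
# Hypothetical helper: evaluate n tasks; errors are caught and recorded
# as failed tasks, mirroring the tryCatch() block in wl_random_search()
run_tasks = function(store, n, p_error = 0.5) {
  for (i in seq_len(n)) {
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    tryCatch({
      if (runif(1) < p_error) stop("Random Error")
      task = c(xs, y = branin(xs$x1, xs$x2))
      store$finished[[length(store$finished) + 1]] = task
    }, error = function(e) {
      # Keep the inputs and the error message instead of aborting
      task = c(xs, message = conditionMessage(e))
      store$failed[[length(store$failed) + 1]] = task
    })
  }
  invisible(store)
}
set.seed(1)
store = run_tasks(make_store(), n = 20)
# Every task ends up either finished or failed; the loop never aborts
length(store$finished) + length(store$failed)
# [1] 20
```

In the real worker loop, the store's role is taken by rush$finish_tasks() and rush$fail_tasks(). These write to Redis, so the manager can fetch the failed tasks as shown above.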
Handling Failing Workers
When a worker fails due to a crash or a lost connection, the tasks it was evaluating may remain in the "running" state indefinitely. We simulate such a crash (for example, a segmentation fault) by killing the worker process with SIGKILL.
wl_failed_worker = function(rush) {
  xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
  # Register a running task, then kill the worker before it can finish
  key = rush$push_running_tasks(xss = list(xs))
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}
rush = rsh(
  network = "test-failed-workers",
  config = redux::redis_config())
mirai::daemons(2)
worker_ids = rush$start_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)
The $detect_lost_workers() method identifies such workers and updates their state to "terminated". For workers started with $start_local_workers() or $start_workers(), lost workers are detected automatically by checking the status of the worker processes. Workers started via $worker_script() additionally require a heartbeat mechanism (see the manager vignette).
rush$detect_lost_workers()
[1] "parasiticidal_stoat" "ethnological_blackrhino"
When a worker is detected as lost, any task it was evaluating is marked as "failed".
rush$fetch_failed_tasks()
x1 x2 worker_id message keys
<num> <num> <char> <char> <char>
1: -3.338709 2.027362 parasitici... Worker has... 68effdda-a...
2: -1.053197 12.201687 ethnologic... Worker has... 3ae07b97-c...
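Workers started via $worker_script() need a heartbeat mechanism. The idea behind it is a timeout check: a worker whose last heartbeat is older than a chosen timeout is treated as lost. The function find_lost_workers() below is hypothetical and only illustrates this idea. It is not how rush implements detection, and the 30-second timeout is an arbitrary assumption.

```r
# Hypothetical helper, not part of rush: a worker counts as lost when its
# last heartbeat is older than `timeout` seconds
find_lost_workers = function(last_heartbeat, now = Sys.time(), timeout = 30) {
  age = as.numeric(difftime(now, last_heartbeat, units = "secs"))
  names(last_heartbeat)[age > timeout]
}
now = as.POSIXct("2024-01-01 12:00:00", tz = "UTC")
last_heartbeat = c(
  worker_a = now - 5,    # heartbeat 5 s ago: alive
  worker_b = now - 120   # heartbeat 2 min ago: considered lost
)
find_lost_workers(last_heartbeat, now = now, timeout = 30)
# [1] "worker_b"
```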
Debugging
When the worker loop fails due to an uncaught error, it can be executed locally to reproduce and inspect the failure. Consider a worker loop wl_error() that raises an error for large values of x1.
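This text does not include the definition of wl_error(). The sketch below is consistent with the description and with the error message shown further down. Three details are assumptions: inputs drawn uniformly from [0, 1], a threshold of 0.9, and x1 + x2 as a placeholder objective. It uses only the rush methods that appear elsewhere in this document.

```r
# Sketch of wl_error(): sampling range, threshold and objective are assumptions
wl_error = function(rush) {
  repeat {
    xs = list(x1 = runif(1), x2 = runif(1))
    key = rush$push_running_tasks(xss = list(xs))
    # No tryCatch(): this error escapes and stops the worker loop
    if (xs$x1 > 0.9) stop("Unexpected error")
    rush$finish_tasks(key, yss = list(list(y = xs$x1 + xs$x2)))
  }
}
```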
To debug the worker loop locally, create a RushWorker instance manually and pass it as the argument to the worker loop.
rush_worker = RushWorker$new(network_id = "test-error")
wl_error(rush_worker)
Error in `wl_error()`:
! Unexpected error
Because the error is raised in the main R process, traceback() can be called to examine the call stack, and breakpoints can be set within the worker loop to inspect the program state. Some errors, such as packages that are missing on the worker, may not be reproducible locally. These can be identified by running the worker loop in a separate process and calling $detect_lost_workers().
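If the loop is run from a script rather than interactively, an uncaught error ends the session before traceback() can be called. One alternative is to record the call stack at the moment the error is signalled, using base R's withCallingHandlers() and sys.calls(). In the sketch below, run_with_stack(), inner() and toy_loop() are hypothetical names used only for this example.

```r
# Hypothetical helper: evaluate `expr`, catch any error, and keep the call
# stack as it was when the error was signalled
run_with_stack = function(expr) {
  calls = NULL
  result = tryCatch(
    withCallingHandlers(expr, error = function(e) {
      # Calling handlers run before the stack unwinds, so the failing
      # frames are still visible to sys.calls()
      calls <<- as.list(sys.calls())
    }),
    error = function(e) e
  )
  list(
    error = if (inherits(result, "error")) conditionMessage(result) else NULL,
    calls = calls
  )
}
# Toy loop that fails on the third iteration
inner = function(x) if (x > 0.9) stop("Unexpected error") else x
toy_loop = function() for (x in c(0.1, 0.5, 0.95)) inner(x)
res = run_with_stack(toy_loop())
res$error
# [1] "Unexpected error"
# Print the recorded stack, one call per line
writeLines(vapply(res$calls, function(cl) deparse(cl)[1], character(1)))
```

With a RushWorker instance, the same pattern would wrap the real loop, e.g. run_with_stack(wl_error(rush_worker)).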
rush = rsh(
  network = "test-error",
  config = redux::redis_config())
mirai::daemons(1)
rush$start_workers(
  worker_loop = wl_error,
  n_workers = 1)
rush$detect_lost_workers()
[1] "antimedicative_skink"
The output and messages of each worker can be written to log files via the message_log and output_log arguments of $start_workers().
rush = rsh(
  network = "test-error",
  config = redux::redis_config())
# Directories in which the log files are written
message_log = tempdir()
output_log = tempdir()
mirai::daemons(1)
worker_ids = rush$start_workers(
  worker_loop = wl_error,
  n_workers = 1,
  message_log = message_log,
  output_log = output_log)
# Give the worker time to start and write its logs
Sys.sleep(5)
readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))
[1] "Debug message logging on worker hopping_vipersquid started"
[1] "[1] \"Debug output logging on worker hopping_vipersquid started\""
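With several workers, each worker writes its own message log, and a small helper can collect all of them at once. read_message_logs() is hypothetical. It assumes the message_<worker_id>.log file naming used in the readLines() call above, and the demo writes two fake log files instead of using real worker output.

```r
# Hypothetical helper: read all message logs in `dir` into a list named by
# worker id, assuming the message_<worker_id>.log naming
read_message_logs = function(dir) {
  files = list.files(dir, pattern = "^message_.*\\.log$", full.names = TRUE)
  logs = lapply(files, readLines)
  names(logs) = sub("^message_(.*)\\.log$", "\\1", basename(files))
  logs
}
# Demo with two fake log files in a fresh temporary directory
log_dir = tempfile("rush-log-demo-")
dir.create(log_dir)
writeLines("worker a started", file.path(log_dir, "message_a.log"))
writeLines("worker b started", file.path(log_dir, "message_b.log"))
logs = read_message_logs(log_dir)
names(logs)
# [1] "a" "b"
```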