Skip to contents

rush provides error-handling mechanisms for two failure modes: standard R errors raised during task evaluation, and unexpected worker failures such as crashes or lost connections. When an error cannot be handled by either mechanism, the worker loop can be run and debugged locally.

Simple R Errors

We take the random search example from the tutorial and make each evaluation fail with a probability of 50%. Errors must be caught inside the worker loop, which then marks the corresponding task as "failed" using the $fail_tasks() method.

library(rush)

branin = function(x1, x2) {
  # Branin test function, evaluated element-wise on x1 and x2.
  # The coefficients are named once and combined in the same order as the
  # textbook formula: (x2 - b*x1^2 + c*x1 - 6)^2 + 10*(1 - t)*cos(x1) + 10.
  b_coef = 5.1 / (4 * pi^2)
  c_coef = 5 / pi
  t_coef = 1 / (8 * pi)
  quadratic_part = (x2 - b_coef * x1^2 + c_coef * x1 - 6)^2
  quadratic_part + 10 * (1 - t_coef) * cos(x1) + 10
}

wl_random_search = function(rush, branin) {
  # Worker loop for random search on `branin`. It keeps sampling until the
  # network reports 100 finished tasks. About half of the evaluations fail on
  # purpose; those tasks are reported via $fail_tasks() instead of
  # $finish_tasks().
  repeat {
    if (rush$n_finished_tasks >= 100) break

    # Draw a point uniformly from the search domain and mark it as running.
    point = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    task_key = rush$push_running_tasks(xss = list(point))

    # Any error raised while evaluating or reporting the task marks it as
    # failed, storing the error message as the task's condition.
    report_failure = function(err) {
      rush$fail_tasks(task_key, conditions = list(list(message = err$message)))
    }

    tryCatch({
      if (runif(1) < 0.5) {
        stop("Random Error")
      }
      result = list(y = branin(point$x1, point$x2))
      rush$finish_tasks(task_key, yss = list(result))
    }, error = report_failure)
  }
}

We initialize the network and start the workers.

# Connect to the "test-simple-error" network using redux's default Redis
# configuration.
rush = rsh(network = "test-simple-error", config = redux::redis_config())

# Start four mirai daemons to host the four workers.
mirai::daemons(4)

# Launch the workers; the extra `branin` argument is handed to the worker loop,
# whose signature is wl_random_search(rush, branin).
rush$start_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  branin = branin
)

When an error occurs, the task is marked as "failed" and the error message is stored in the "condition" column. This ensures that errors do not interrupt the overall execution and allows failed tasks to be inspected and re-evaluated later.

# Failed tasks keep their inputs, the id of the worker that ran them and the
# stored condition.
rush$fetch_failed_tasks()
            x1         x2     worker_id condition          keys
         <num>      <num>        <char>    <list>        <char>
  1:  3.003862 13.9916757 acidformin... <list[1]> 1ab85463-0...
  2: -2.603896 10.4074370 unheeding_... <list[1]> 0a113b36-d...
  3:  6.827018  8.4938548 acidformin... <list[1]> 90c995c7-d...
  4:  5.441806  9.6496765 unheeding_... <list[1]> d372bf23-7...
  5:  7.991742  0.8324553 acidformin... <list[1]> a68dcd2d-5...
 ---
101:  8.180984  8.0348462 pure_grosb... <list[1]> b5df3316-0...
102: -2.827458  7.4691366 acidformin... <list[1]> e83dce33-8...
103:  8.875858  9.7157809 unheeding_... <list[1]> 48d34fb7-5...
104:  9.735627 10.8418376 pure_grosb... <list[1]> a9224fdc-6...
105:  1.315268 14.1246089 pure_grosb... <list[1]> e88abace-f...

Handling Failing Workers

When a worker fails due to a crash or a lost connection, its tasks may remain in the "running" state indefinitely. We simulate such a crash (for example a segmentation fault) by having the worker kill its own process with SIGKILL.

wl_failed_worker = function(rush) {
  # Register a single running task, then kill this worker's own R process with
  # SIGKILL. The task is never finished or failed by the worker itself, which
  # simulates a hard crash.
  rush$push_running_tasks(xss = list(list(
    x1 = runif(1, -5, 10),
    x2 = runif(1, 0, 15)
  )))

  tools::pskill(Sys.getpid(), signal = tools::SIGKILL)
}

# Connect to the "test-failed-workers" network using redux's default Redis
# configuration.
rush = rsh(
  network = "test-failed-workers",
  config = redux::redis_config())

mirai::daemons(2)

# Start two workers that each push one task and then kill themselves.
worker_ids = rush$start_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)

# Give the workers time to start, push their task and crash before
# $detect_lost_workers() is called; otherwise the check can run before the
# crash and find nothing. The duration is a heuristic.
Sys.sleep(2)

The $detect_lost_workers() method identifies such workers and updates their state to "terminated". For workers started with $start_local_workers() or $start_workers(), lost worker detection works automatically by checking process status. Workers started via $worker_script() require an additional heartbeat mechanism (see the manager vignette).

# Marks crashed workers as "terminated" and returns their ids.
rush$detect_lost_workers()
[1] "flannel_angelwingmussel" "bloodsucking_murrelet"  

When a lost worker is detected, the state of any task it was evaluating is set to "failed".

# The tasks left "running" by the crashed workers are now listed as failed.
rush$fetch_failed_tasks()
           x1       x2     worker_id condition          keys
        <num>    <num>        <char>    <list>        <char>
1: -1.6568655 11.26278 flannel_an... <list[1]> 747b099c-8...
2:  0.1342466 11.20979 bloodsucki... <list[1]> 0fc8b839-4...

Debugging

When the worker loop fails due to an uncaught error, the loop can be executed locally to reproduce and inspect the failure. Consider the following worker loop, which raises an error whenever x1 exceeds 0.9.

wl_error = function(rush) {
  # Worker loop with a deliberate bug: it evaluates y = x1 + x2 on uniform
  # random points indefinitely, but stops with an uncaught error as soon as
  # x1 > 0.90. The offending task has already been pushed as running and is
  # never finished.
  while (TRUE) {
    point = list(x1 = runif(1), x2 = runif(1))
    task_key = rush$push_running_tasks(xss = list(point))

    if (point$x1 > 0.90) stop("Unexpected error")

    rush$finish_tasks(task_key, yss = list(list(y = point$x1 + point$x2)))
  }
}

To debug the worker loop locally, a RushWorker instance is created manually and passed as an argument to the worker loop.

# Create a worker object in the main R session so the loop runs locally, where
# traceback() and breakpoints are available.
rush_worker = RushWorker$new(network_id = "test-error")

# Runs until a draw with x1 > 0.90 triggers the error.
wl_error(rush_worker)
Error in `wl_error()`:
! Unexpected error

When the error is raised in the main process, traceback() can be called to examine the stack trace, and breakpoints can be set within the worker loop to inspect the program state. Note that certain errors, such as missing packages, may not be reproducible locally; these can be identified by running the worker loop in a separate process and calling $detect_lost_workers().

# Connect to the "test-error" network using redux's default Redis
# configuration.
rush = rsh(
  network = "test-error",
  config = redux::redis_config())

mirai::daemons(1)

# Run the buggy loop in a separate worker process.
rush$start_workers(
  worker_loop = wl_error,
  n_workers = 1)

# The worker only fails once it draws x1 > 0.90, and it must have stopped
# before it can be reported as lost. Wait briefly instead of racing the worker;
# the duration is a heuristic.
Sys.sleep(1)

rush$detect_lost_workers()
[1] "breezy_starling"

Message and output logs can be written to files via the message_log and output_log arguments of $start_workers().

rush = rsh(network = "test-error", config = redux::redis_config())

# Both logs go to the session's temporary directory; the files are named
# message_<worker_id>.log and output_<worker_id>.log.
message_log = output_log = tempdir()

mirai::daemons(1)

worker_ids = rush$start_workers(
  worker_loop = wl_error,
  n_workers = 1,
  message_log = message_log,
  output_log = output_log
)

# Give the worker time to start and write to its logs.
Sys.sleep(5)

readLines(file.path(message_log, paste0("message_", worker_ids[1], ".log")))
[1] "Debug message logging on worker paramilitaristic_kestrel started"
# The output log holds what the worker printed.
readLines(file.path(output_log, sprintf("output_%s.log", worker_ids[1])))
[1] "[1] \"Debug output logging on worker paramilitaristic_kestrel started\""