
rush provides an error-handling mechanism for managing and mitigating errors that occur while tasks are executed. It handles a range of error scenarios, from standard R errors to more severe failures such as segmentation faults and network errors. If all of this fails, the user can debug the worker loop manually.

Simple R Errors

To illustrate the error-handling mechanism in rush, we use the random search example from the main vignette, this time raising a random error with a probability of 50%. Within the worker loop, users are responsible for catching errors and marking the corresponding task as "failed" with the $push_failed() method.

library(rush)

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

wl_random_search = function(rush) {

  # evaluate until 100 tasks have finished
  while(rush$n_finished_tasks < 100) {

    # sample a point and register it as a running task
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))

    tryCatch({
      # simulate an evaluation that fails half of the time
      if (runif(1) < 0.5) stop("Random Error")
      ys = list(y = branin(xs$x1, xs$x2))
      rush$push_results(key, yss = list(ys))
    }, error = function(e) {
      # store the error message and mark the task as "failed"
      condition = list(message = e$message)
      rush$push_failed(key, conditions = list(condition))
    })
  }
}
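The tryCatch() part of the loop can be factored into a small helper and checked without a Redis server. This is a sketch under our own assumptions, not part of the rush API: the hypothetical evaluate_task() returns either a results list or a condition list, in the same shape that the loop above passes to $push_results() and $push_failed().

```r
# Hypothetical helper (not part of rush): evaluate one task and return
# either a result list or a condition list, mirroring the loop above.
evaluate_task = function(fun, xs) {
  tryCatch(
    list(state = "finished", ys = list(y = do.call(fun, xs))),
    error = function(e) {
      list(state = "failed", condition = list(message = conditionMessage(e)))
    }
  )
}

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# (pi, 2.275) is one of the global minima of the Branin function
ok = evaluate_task(branin, list(x1 = pi, x2 = 2.275))
# an objective that always fails, standing in for stop("Random Error")
bad = evaluate_task(function(x1, x2) stop("Random Error"), list(x1 = 0, x2 = 0))

ok$state               # "finished"
bad$state              # "failed"
bad$condition$message  # "Random Error"
```

Inside a worker loop, the returned list could then be dispatched with if (res$state == "finished") rush$push_results(key, yss = list(res$ys)) else rush$push_failed(key, conditions = list(res$condition)). Keeping the error handling in a plain function makes it easy to unit test separately from the worker.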

We create a rush controller connected to Redis and start four local workers.

rush = rsh(
  network = "test-simply-error",
  config = redux::redis_config())

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  globals = "branin")

When an error occurs, the task is marked as "failed" and the error message is stored in the "message" column. The error therefore does not stop the worker, which simply moves on to the next task. The failed tasks can be inspected afterwards and reevaluated if necessary.

rush$fetch_failed_tasks()
            x1          x2   pid     worker_id       message          keys
         <num>       <num> <int>        <char>        <char>        <char>
 1:  5.3306905 13.34125971 10263 shaded_ane... Random Err... 51b8feab-0...
 2:  4.6248816 14.03182793 10263 shaded_ane... Random Err... d7bf28a5-b...
 3: -3.7980307 14.81245595 10263 shaded_ane... Random Err... c91f90cd-7...
 4:  5.2517295  3.74385157 10274 skyblue_bl... Random Err... 8ef9214f-c...
 5: -1.0890001 14.16882488 10289 glazed_bac... Random Err... 1e42567a-5...
 6:  8.8137591  5.16665979 10263 shaded_ane... Random Err... cd2933d1-c...
 7:  1.8933140  9.25528622 10263 shaded_ane... Random Err... 9d926d08-3...
 8:  1.8758475 10.34500553 10263 shaded_ane... Random Err... cb900e23-3...
 9:  1.6627131  2.86706344 10263 shaded_ane... Random Err... 9f296a79-d...
10:  2.1547699  0.81432617 10263 shaded_ane... Random Err... 83b2061b-7...
11: -3.3906523  8.71414121 10274 skyblue_bl... Random Err... 37fd32ad-b...
12: -1.9625533  0.14588584 10263 shaded_ane... Random Err... 23740d7c-0...
13:  0.3581200 13.76013633 10274 skyblue_bl... Random Err... fa25d49f-3...
14: -0.5029045 12.96263149 10279 dunderhead... Random Err... 98cddd79-9...
15:  7.8999051  0.07507688 10263 shaded_ane... Random Err... 472cab9f-b...
16:  1.6267545  7.19040913 10274 skyblue_bl... Random Err... ae5ed90a-d...
17: -0.4395247 10.93476297 10274 skyblue_bl... Random Err... 344bbb26-e...
18:  9.6633177  6.01652526 10263 shaded_ane... Random Err... 2b1ce82a-e...
19: -1.1491126  8.53421091 10274 skyblue_bl... Random Err... bab3ecec-7...
20:  8.5985205  4.24345337 10289 glazed_bac... Random Err... 6ea53980-d...
21:  6.2427895  3.57835047 10279 dunderhead... Random Err... 61bcad41-2...
22:  5.2690097 14.64578461 10274 skyblue_bl... Random Err... a273f92b-a...
23: -0.2384118  4.78702079 10263 shaded_ane... Random Err... ba842f57-9...
24:  9.4717935  5.45352554 10289 glazed_bac... Random Err... 6772641c-8...
25: -1.9212540 14.99319278 10263 shaded_ane... Random Err... 2d9786ca-d...
26:  2.5693082  6.36700058 10279 dunderhead... Random Err... 84a2ba94-4...
27:  5.5095419  9.96424221 10289 glazed_bac... Random Err... e9ebcfd6-9...
28: -3.3753937 14.35489359 10279 dunderhead... Random Err... 95bf4ff7-b...
29:  0.4198621  3.54809090 10274 skyblue_bl... Random Err... 1aae52b1-5...
30:  0.2783702  8.76949143 10279 dunderhead... Random Err... 86f4eda1-3...
31:  7.4539628  8.82947285 10274 skyblue_bl... Random Err... e8947a11-9...
32:  7.9936219  4.60347461 10279 dunderhead... Random Err... fd2276a0-5...
33:  5.8792902  6.64972266 10289 glazed_bac... Random Err... b1d1069a-4...
            x1          x2   pid     worker_id       message          keys
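Because failed tasks keep their inputs, they can be summarised and evaluated again. In this sketch a plain data.frame holding the first two rows printed above (reduced to the x1, x2 and message columns) stands in for the table returned by $fetch_failed_tasks(); the points are re-evaluated in the main R session, not resubmitted to rush.

```r
# Stand-in for rush$fetch_failed_tasks(): first two rows of the output above
failed = data.frame(
  x1 = c(5.3306905, 4.6248816),
  x2 = c(13.34125971, 14.03182793),
  message = c("Random Error", "Random Error")
)

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# count failures per error message
table(failed$message)

# the inputs are still available, so the failed points can be evaluated again
failed$y = mapply(branin, failed$x1, failed$x2)
failed
```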

Handling Failing Workers

The rush package provides mechanisms for situations in which workers fail because they crash or lose their connection. Such failures can leave tasks in the "running" state indefinitely. To illustrate this, we define a worker loop that registers a task and then simulates a crash, such as a segmentation fault, by killing its own process with SIGKILL.

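wl_failed_worker = function(rush) {
  # register a task so that it is in the "running" state
  xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
  key = rush$push_running_tasks(xss = list(xs))

  # kill the worker before a result or a failure can be pushed
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}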

rush = rsh(network = "test-failed-workers")

worker_ids = rush$start_local_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)

The package provides the $detect_lost_workers() method to identify such lost workers and handle their tasks.

rush$detect_lost_workers()

This method works for workers started with $start_local_workers() and $start_remote_workers(). Workers started with $worker_script() can only be detected as lost if they were started with a heartbeat mechanism (see vignette).
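The idea behind a heartbeat can be sketched in a few lines: each worker regularly records a timestamp, and a worker whose last heartbeat is older than an expiry period is considered lost. This is a conceptual sketch under our own assumptions (timestamps in seconds, hypothetical worker ids), not the way rush stores or checks heartbeats.

```r
# Conceptual sketch: a worker counts as lost when its last heartbeat is
# older than `expire` seconds. `last_heartbeat` is a named numeric vector.
find_lost_workers = function(last_heartbeat, now, expire) {
  names(last_heartbeat)[now - last_heartbeat > expire]
}

# hypothetical workers and timestamps
heartbeats = c(worker_a = 100, worker_b = 88, worker_c = 97)
find_lost_workers(heartbeats, now = 101, expire = 10)  # "worker_b"
```

A worker killed with SIGKILL stops updating its timestamp and is therefore reported once the expiry period has passed, even though it never had a chance to report the crash itself.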

The $detect_lost_workers() method also supports automatic restarting of lost workers when the option restart_workers = TRUE is specified. Alternatively, lost workers may be restarted manually using the $restart_workers() method. Automatic restarting is only available for local workers. When a worker fails, the status of the task that caused the failure is set to "failed".

rush$fetch_failed_tasks()
         x1       x2   pid     worker_id       message          keys
      <num>    <num> <int>        <char>        <char>        <char>
1: 5.181922 14.30088 10412 extralegal... Worker has... efc17dc1-e...
2: 7.142554 13.71919 10414 selfdestro... Worker has... f334b02e-2...
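What happens to the tasks of a lost worker can be sketched with an in-memory table: every task that is still "running" on a lost worker is switched to "failed" and given an error message. The table layout, worker ids, and message text below are hypothetical; rush itself keeps this state in Redis.

```r
# hypothetical task table
tasks = data.frame(
  key = c("k1", "k2", "k3", "k4"),
  worker_id = c("w1", "w1", "w2", "w2"),
  state = c("finished", "running", "finished", "running"),
  message = NA_character_,
  stringsAsFactors = FALSE
)

# mark every running task of a lost worker as failed
mark_lost_tasks_failed = function(tasks, lost_workers, message) {
  idx = tasks$state == "running" & tasks$worker_id %in% lost_workers
  tasks$state[idx] = "failed"
  tasks$message[idx] = message
  tasks
}

tasks = mark_lost_tasks_failed(tasks, lost_workers = "w1", message = "worker lost")
tasks$state  # "finished" "failed" "finished" "running"
```

Tasks on healthy workers (here w2) are left untouched, so a task that is legitimately still running is not marked as failed.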

Debugging

When the worker loop stops unexpectedly because of an uncaught error, the worker loop itself has to be debugged. Consider the following example, in which the worker loop raises an error whenever x1 exceeds 0.90, which happens with a probability of 10% per iteration.

wl_error = function(rush) {

  repeat {
    x1 = runif(1)
    x2 = runif(1)

    xss = list(list(x1 = x1, x2 = x2))

    key = rush$push_running_tasks(xss = xss)

    if (x1 > 0.90) {
      stop("Unexpected error")
    }

    rush$push_results(key, yss = list(list(y = x1 + x2)))
  }
}

To begin debugging, we run the worker loop locally. This requires a RushWorker instance. The worker instance is usually created when a worker process starts, but it can also be created manually and then passed to the worker loop as its argument.

rush_worker = RushWorker$new("test", remote = FALSE)

wl_error(rush_worker)
Error in wl_error(rush_worker): Unexpected error
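The worker loop can also be exercised without a Redis server by passing it a mock object. This is our own sketch, not part of rush: make_mock_rush() is a hypothetical stand-in that implements only the two methods wl_error() calls, $push_running_tasks() and $push_results(), and keeps the tasks in a list. After the error, the task that triggered it is left in the "running" state, the same situation that leaves tasks running on a crashed worker.

```r
# Hypothetical mock (not part of rush) that implements only the methods
# wl_error() uses. Tasks are stored in a list inside an environment.
make_mock_rush = function() {
  self = new.env()
  self$tasks = list()
  self$push_running_tasks = function(xss) {
    keys = paste0("task_", length(self$tasks) + seq_along(xss))
    for (i in seq_along(xss)) {
      self$tasks[[keys[i]]] = list(xs = xss[[i]], state = "running")
    }
    keys
  }
  self$push_results = function(keys, yss) {
    for (i in seq_along(keys)) {
      task = self$tasks[[keys[i]]]
      task$ys = yss[[i]]
      task$state = "finished"
      self$tasks[[keys[i]]] = task
    }
    invisible(NULL)
  }
  self
}

# worker loop from above, repeated so that this block runs on its own
wl_error = function(rush) {
  repeat {
    x1 = runif(1)
    x2 = runif(1)
    xss = list(list(x1 = x1, x2 = x2))
    key = rush$push_running_tasks(xss = xss)
    if (x1 > 0.90) {
      stop("Unexpected error")
    }
    rush$push_results(key, yss = list(list(y = x1 + x2)))
  }
}

set.seed(1)
mock = make_mock_rush()
err = tryCatch(wl_error(mock), error = function(e) e)

conditionMessage(err)  # "Unexpected error"
# state of every task; only the last one is still "running"
states = vapply(mock$tasks, function(task) task$state, character(1))
states
```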

Because the error is now raised in the main process, traceback() can be called to examine the stack trace, and breakpoints can be set within the worker loop to inspect the program state. This gives substantial control over the debugging process. However, some errors, such as missing packages or undefined global variables, may not appear when running locally, because the main session already has the packages loaded and the globals defined. Such issues can be identified with the $detect_lost_workers() method.

rush = rsh("test-error")

rush$start_local_workers(
  worker_loop = wl_error,
  n_workers = 1
)

The $detect_lost_workers() method can be used to identify lost workers.

rush$detect_lost_workers()

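Output and message logs can be written to files by passing the message_log and output_log arguments to $start_local_workers(). Each worker writes its logs to message_<worker_id>.log and output_<worker_id>.log in these directories.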

rush = rsh("test-error")

message_log = tempdir()
output_log = tempdir()

worker_ids = rush$start_local_workers(
  worker_loop = wl_error,
  n_workers = 1,
  message_log = message_log,
  output_log = output_log
)

Sys.sleep(5) # give the worker time to start and fail

readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))
[1] "Debug message logging on worker homebrewed_gossamerwingedbutterfly started"
[2] "Error in start_args$worker_loop(rush = rush) : Unexpected error"
[3] "Calls: <Anonymous> ... <Anonymous> -> eval.parent -> eval -> eval -> <Anonymous>"
[4] "Execution halted"                                                                
readLines(file.path(output_log, sprintf("output_%s.log", worker_ids[1])))
[1] "[1] \"Debug output logging on worker homebrewed_gossamerwingedbutterfly started\""
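With many workers, it can help to scan all message logs for error lines at once. The hypothetical helper below (not part of rush) relies only on the file naming used above, message_<worker_id>.log, and assumes that every line starting with "Error" is an error report. The demo writes a small log file with lines copied from the output above under a made-up worker id.

```r
# Hypothetical helper: return, per message log file, the lines that start
# with "Error". Files without such lines are dropped.
find_log_errors = function(log_dir) {
  files = list.files(log_dir, pattern = "^message_.*\\.log$", full.names = TRUE)
  hits = lapply(files, function(f) {
    lines = readLines(f, warn = FALSE)
    lines[startsWith(lines, "Error")]
  })
  names(hits) = basename(files)
  Filter(length, hits)
}

# demo log with a hypothetical worker id
log_dir = file.path(tempdir(), "rush-log-demo")
dir.create(log_dir, showWarnings = FALSE)
writeLines(c(
  "Debug message logging on worker example_worker started",
  "Error in start_args$worker_loop(rush = rush) : Unexpected error",
  "Execution halted"
), file.path(log_dir, "message_example_worker.log"))

find_log_errors(log_dir)
```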