rush provides an error-handling mechanism for errors that occur while tasks are executed. It covers a range of scenarios, from standard R errors to more severe failures such as segmentation faults and network errors. If these mechanisms are not sufficient, the user can debug the worker loop manually.

Simple R Errors

To illustrate the error-handling mechanism in rush, we use the random search example from the main vignette. This time, each evaluation raises an error with a probability of 50%. Within the worker loop, users are responsible for catching errors and marking the corresponding task as "failed" using the $push_failed() method.

library(rush)

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

wl_random_search = function(rush) {

  while(rush$n_finished_tasks < 100) {

    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))

    tryCatch({
      if (runif(1) < 0.5) stop("Random Error")
      ys = list(y = branin(xs$x1, xs$x2))
      rush$push_results(key, yss = list(ys))
    }, error = function(e) {
      condition = list(message = e$message)
      rush$push_failed(key, conditions = list(condition))
    })
  }
}
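
The capture pattern inside the loop can be tried without a Redis instance. The sketch below is a minimal base-R illustration; evaluate_safely() and objective() are hypothetical helpers and not part of rush. The helper returns either a result or a condition list shaped like the one passed to $push_failed() above.

```r
# Hypothetical helper (not part of rush): evaluate `fun` on the arguments in
# `xs` and report either the result or the captured error message.
evaluate_safely = function(fun, xs) {
  tryCatch({
    list(state = "finished", ys = list(y = do.call(fun, xs)))
  }, error = function(e) {
    # Same shape as the condition list built in the worker loop
    list(state = "failed", condition = list(message = conditionMessage(e)))
  })
}

# A toy objective that fails for negative inputs
objective = function(x1, x2) {
  if (x1 < 0) stop("Random Error")
  x1 + x2
}

ok = evaluate_safely(objective, list(x1 = 1, x2 = 2))
bad = evaluate_safely(objective, list(x1 = -1, x2 = 2))

ok$state               # "finished"
bad$condition$message  # "Random Error"
```

Because the error is converted into a plain list, the calling loop keeps running and can decide what to do with the failed evaluation.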

We start the workers.

rush = rsh(
  network = "test-simply-error",
  config = redux::redis_config())

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  globals = "branin")

When an error occurs, the task is marked as "failed", and the error message is stored in the "message" column. This approach ensures that errors do not interrupt the overall execution process. It allows for subsequent inspection of errors and the reevaluation of failed tasks as necessary.

rush$fetch_failed_tasks()
             x1         x2   pid     worker_id       message          keys
          <num>      <num> <int>        <char>        <char>        <char>
 1:  3.80275299  9.9139174  9845 hemp_yorks... Random Err... eab695b3-4...
 2:  5.30620813 10.6300671  9860 intimate_d... Random Err... 73d9881b-2...
 3:  4.06315086  7.0793266  9834     lead_lobo Random Err... 74c5fa92-d...
 4: -3.29758422  5.6756233  9834     lead_lobo Random Err... 1b7e33a9-c...
 5:  4.48913989  7.7112356  9834     lead_lobo Random Err... d76037de-e...
 6: -4.81489508  9.7766883  9834     lead_lobo Random Err... 3fa3d072-d...
 7:  2.12394886  2.5450697  9834     lead_lobo Random Err... e6eb22c9-f...
 8:  2.43412536  2.2151950  9834     lead_lobo Random Err... 8167b485-a...
 9: -2.10910088 11.7196716  9834     lead_lobo Random Err... f55d83b0-3...
10:  9.31402385 13.5452307  9845 hemp_yorks... Random Err... 6e36331b-0...
11:  5.71899128 10.2681845  9834     lead_lobo Random Err... 16c6c85c-f...
12: -1.31837321 12.9489221  9834     lead_lobo Random Err... 271c381f-7...
13:  0.07745564  7.0477217  9860 intimate_d... Random Err... 364500ba-f...
14:  0.66451758  6.3847975  9834     lead_lobo Random Err... fdfbdd3a-c...
15: -4.57596818  8.8208195  9834     lead_lobo Random Err... c8d2b65c-c...
16:  6.08748589 12.1324671  9845 hemp_yorks... Random Err... d8e088ff-f...
17:  5.02008085 10.0652536  9834     lead_lobo Random Err... e8f92968-e...
18:  1.51163244 11.9584450  9850 hurried_le... Random Err... eb874215-e...
19:  6.35715813  3.0612034  9845 hemp_yorks... Random Err... 8868670a-4...
20:  9.06794333  8.2543310  9834     lead_lobo Random Err... c72f644a-4...
21:  2.04040886  2.0767009  9834     lead_lobo Random Err... 402d7568-0...
22:  5.09811161  3.2173444  9860 intimate_d... Random Err... 94f99809-e...
23:  5.52276649  5.1299771  9850 hurried_le... Random Err... 874cafd7-6...
24:  6.01616479  0.4659816  9845 hemp_yorks... Random Err... 277048ac-6...
25:  9.42805412 14.4499124  9850 hurried_le... Random Err... 4a40fb89-c...
26:  7.08247567 12.3140297  9845 hemp_yorks... Random Err... cf2bdf50-6...
27:  6.70092965  2.2525032  9845 hemp_yorks... Random Err... 156d0ff9-f...
28:  5.04929654  5.1012601  9845 hemp_yorks... Random Err... 6cc9af24-c...
29:  0.71573698 14.5786538  9850 hurried_le... Random Err... 186f3396-2...
30:  0.63308053  7.9358208  9834     lead_lobo Random Err... 67c913f1-2...
31: -0.98503399 12.5860349  9845 hemp_yorks... Random Err... 2df1fe6d-7...
32: -3.95994872 10.3323278  9860 intimate_d... Random Err... fbdb74a6-7...
33:  6.12797612  5.1882825  9834     lead_lobo Random Err... fcafe639-a...
34:  4.75206281 11.9498249  9845 hemp_yorks... Random Err... 2834c9b3-9...
35:  0.67506538  1.8407939  9860 intimate_d... Random Err... df7a3597-4...
36:  1.66289966  3.3571175  9850 hurried_le... Random Err... 8a9a0194-3...
37:  9.10195614 14.3547913  9860 intimate_d... Random Err... 3c3f8551-c...
38: -1.48749411  3.4034063  9850 hurried_le... Random Err... 47462b67-c...
             x1         x2   pid     worker_id       message          keys
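
Re-evaluating failed tasks can be sketched as follows. To keep the example runnable without Redis, a small data frame holding the first three rows of the output above stands in for the result of $fetch_failed_tasks(); the selected columns and the name retried are assumptions made for illustration. Pushing the new results back to the network is left out.

```r
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# Stand-in for rush$fetch_failed_tasks(): first three rows of the output above
failed = data.frame(
  x1 = c(3.80275299, 5.30620813, 4.06315086),
  x2 = c(9.9139174, 10.6300671, 7.0793266),
  message = "Random Error"
)

# Re-evaluate the objective for each failed configuration.
# branin() is vectorized, so all rows are evaluated in one call.
retried = failed[, c("x1", "x2")]
retried$y = branin(failed$x1, failed$x2)

retried
```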

Handling Failing Workers

The rush package provides mechanisms to address situations in which workers fail due to crashes or lost connections. Such failures may result in tasks remaining in the “running” state indefinitely. To illustrate this, we define a function that simulates a segmentation fault by terminating the worker process.

wl_failed_worker = function(rush) {
  xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
  key = rush$push_running_tasks(xss = list(xs))

  tools::pskill(Sys.getpid(), tools::SIGKILL)
}

rush = rsh(network = "test-failed-workers")

worker_ids = rush$start_local_workers(
  worker_loop = wl_failed_worker,
  n_workers = 2)

The package offers the $detect_lost_workers() method, which is designed to identify and manage these occurrences.

rush$detect_lost_workers()

This method works for workers started with $start_local_workers() and $start_remote_workers(). Workers started with $worker_script() must be started with a heartbeat mechanism (see vignette).

The $detect_lost_workers() method can also restart lost workers automatically when the option restart_workers = TRUE is specified; automatic restarting is only available for local workers. Alternatively, lost workers may be restarted manually using the $restart_workers() method. When a worker fails, the status of the task that caused the failure is set to "failed".

rush$fetch_failed_tasks()
         x1        x2   pid     worker_id       message          keys
      <num>     <num> <int>        <char>        <char>        <char>
1: 4.581689 14.727954  9983 impossible... Worker has... cbd3075d-3...
2: 1.601901  6.410424  9985 blind_gray... Worker has... 9d1e7960-1...
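
The idea behind a heartbeat can be illustrated without rush. In the conceptual sketch below, each worker is assumed to record the time of its last heartbeat, and a worker counts as lost once that timestamp is older than an expiry period. The function find_lost_workers(), the expire argument, the worker names, and the timestamps are all hypothetical; this is not a description of how rush implements the mechanism internally.

```r
# Conceptual sketch only; not rush's implementation.
# `last_heartbeat` is a named vector of POSIXct timestamps, one per worker.
find_lost_workers = function(last_heartbeat, now = Sys.time(), expire = 30) {
  age = as.numeric(difftime(now, last_heartbeat, units = "secs"))
  names(last_heartbeat)[age > expire]
}

now = as.POSIXct("2024-01-01 12:00:00", tz = "UTC")

# worker_a sent a heartbeat 5 seconds ago, worker_b 120 seconds ago
last_heartbeat = now - c(5, 120)
names(last_heartbeat) = c("worker_a", "worker_b")

find_lost_workers(last_heartbeat, now = now, expire = 30)  # "worker_b"
```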

Debugging

When the worker loop fails unexpectedly due to an uncaught error, it is necessary to debug the worker loop. Consider the following example, in which the worker loop randomly generates an error.

wl_error = function(rush) {

  repeat {
    x1 = runif(1)
    x2 = runif(1)

    xss = list(list(x1 = x1, x2 = x2))

    key = rush$push_running_tasks(xss = xss)

    if (x1 > 0.90) {
      stop("Unexpected error")
    }

    rush$push_results(key, yss = list(list(y = x1 + x2)))
  }
}

To begin debugging, the worker loop is executed locally. This requires the initialization of a RushWorker instance. Although the rush worker is typically created during worker initialization, it can also be instantiated manually. The worker instance is then passed as an argument to the worker loop.

rush_worker = RushWorker$new("test", remote = FALSE)

wl_error(rush_worker)
Error in wl_error(rush_worker): Unexpected error

When an error is raised in the main process, the traceback() function can be invoked to examine the stack trace. Breakpoints may also be set within the worker loop to inspect the program state. This approach provides substantial control over the debugging process. Certain errors, such as missing packages or undefined global variables, may not be encountered when running locally. However, such issues can be readily identified using the $detect_lost_workers() method.

rush = rsh("test-error")

rush$start_local_workers(
  worker_loop = wl_error,
  n_workers = 1
)

The $detect_lost_workers() method can be used to identify lost workers.

rush$detect_lost_workers()
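
Undefined globals can sometimes be spotted before any worker is started. The sketch below is one possible check and not a rush feature: it uses codetools::findGlobals() to list the functions a worker loop refers to and keeps those that no attached package provides. The helper unprovided_functions() and the simplified loop wl_example are hypothetical.

```r
library(codetools)

# Hypothetical helper: names of functions used by `fun` that no attached
# package provides. These are candidates for the `globals` argument.
unprovided_functions = function(fun) {
  used = findGlobals(fun, merge = FALSE)$functions
  pkgs = grep("^package:", search(), value = TRUE)
  in_pkg = vapply(used, function(name) {
    any(vapply(pkgs, function(p) {
      exists(name, envir = as.environment(p), inherits = FALSE)
    }, logical(1)))
  }, logical(1))
  used[!in_pkg]
}

# Simplified worker loop that depends on the user-defined function `branin`
wl_example = function(rush) {
  xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
  key = rush$push_running_tasks(xss = list(xs))
  rush$push_results(key, yss = list(list(y = branin(xs$x1, xs$x2))))
}

unprovided_functions(wl_example)
```

In this example the only user-defined dependency of the loop is branin, which corresponds to the globals = "branin" argument used earlier. A static check like this cannot see functions that are only attached on the main process via library(), so it complements rather than replaces a test run on a real worker.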

Output and message logs can be written to files by specifying the message_log and output_log arguments.

rush = rsh("test-error")

message_log = tempdir()
output_log = tempdir()

worker_ids = rush$start_local_workers(
  worker_loop = wl_error,
  n_workers = 1,
  message_log = message_log,
  output_log = output_log
)

Sys.sleep(5)

readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))
[1] "Debug message logging on worker differential_crow started"
[2] "Error in start_args$worker_loop(rush = rush) : Unexpected error"
[3] "Calls: <Anonymous> ... <Anonymous> -> eval.parent -> eval -> eval -> <Anonymous>"
[4] "Execution halted"
readLines(file.path(output_log, sprintf("output_%s.log", worker_ids[1])))
[1] "[1] \"Debug output logging on worker differential_crow started\""
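
A fixed Sys.sleep(5) may be too short or needlessly long, depending on how quickly the worker starts. One alternative is to poll until the log file exists. The helper wait_for_file() and its timeout and interval arguments are hypothetical and not part of rush; a temporary file stands in for a worker log so that the sketch runs on its own.

```r
# Hypothetical helper: poll until `path` exists or `timeout` seconds pass.
# Returns TRUE if the file appeared in time and FALSE otherwise.
wait_for_file = function(path, timeout = 10, interval = 0.1) {
  deadline = Sys.time() + timeout
  while (!file.exists(path)) {
    if (Sys.time() >= deadline) return(FALSE)
    Sys.sleep(interval)
  }
  TRUE
}

# Demonstration with a temporary file standing in for a worker log
log_file = tempfile(fileext = ".log")
writeLines("Debug message logging started", log_file)

if (wait_for_file(log_file, timeout = 1)) {
  print(readLines(log_file))
}
```

With real workers, the path would be the one built with file.path(message_log, ...) above. Note that the file may exist before the worker has written all of its lines, so a short additional wait can still be needed.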