
rush is a package designed to solve large-scale problems asynchronously across a distributed network. Employing a database-centric model, rush enables workers to communicate tasks and their results over a shared Redis database. This article demonstrates how to use rush with three examples.

We begin with a simple random search to optimize the Branin function in parallel. Although random search does not require communication between workers, it is a good way to introduce the basic ideas behind rush. The classic Branin function (also called the Branin-Hoo function) is a well-known benchmark problem in global optimization. It is a two-dimensional function that is non-convex, multimodal, and has three global minima. The function is a toy example for optimization that is fast to evaluate but not trivial to solve.

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

The Branin function is usually evaluated on the domain \(x_1 \in [-5, 10]\) and \(x_2 \in [0, 15]\).
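As a quick sanity check of the definition above, the following sketch evaluates the function at the three commonly cited global minimizers of the Branin function. The coordinates and the minimum value of roughly 0.397887 come from the optimization literature rather than from this article, and the variable name `minimizers` is only illustrative.

```r
# Branin function as defined above
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# Commonly cited global minimizers (assumption: taken from the literature)
minimizers = data.frame(
  x1 = c(-pi, pi, 3 * pi),
  x2 = c(12.275, 2.275, 2.475)
)

# At these points the squared term vanishes and cos(x1) equals -1,
# so all three values should equal 10 / (8 * pi), about 0.397887
values = branin(minimizers$x1, minimizers$x2)
print(round(values, 6))
```

All three minimizers lie inside the domain given above, so a random search on that domain can in principle get arbitrarily close to each of them.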

Worker Loop

We define the worker_loop function, which runs on each worker. It repeatedly draws tasks, evaluates them, and sends the results to the Redis database. The function takes a single argument: a RushWorker object, which handles communication with Redis. In this example, each worker samples a random point, creates a task, evaluates it using the Branin function, and submits the result. The optimization stops after 100 tasks have been evaluated.

wl_random_search = function(rush) {

  while(rush$n_finished_tasks < 100) {

    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))

    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

The most important methods of the RushWorker are $push_running_tasks() and $push_results(). The $push_running_tasks() method creates a new task in the Redis database. Because the task is evaluated next, it is marked as running. The method returns a unique key that identifies the task. The $push_results() method pushes the results back to the Redis database. It takes the key of the task and a list of results. Marking the task as running is not important for a random search, but it is crucial for more sophisticated algorithms that use the tasks of other workers to decide which task to evaluate next. For example, a Bayesian optimization algorithm would sample the next point further away from the previous points to explore the search space. The $n_finished_tasks field shows how many tasks are finished and is used to stop the worker loop.

Tasks

Tasks are the unit in which workers exchange information. The main components of a task are the key, computational state, input (xs), and result (ys). The key is a unique identifier for the task in the Redis database. The four possible computational states are "running", "finished", "failed", and "queued". The $push_running_tasks() method marks a task as "running" and returns its key. The $push_results() method marks a task as "finished" and stores the result. Failed tasks can be marked as "failed" with the $push_failed() method. The error catching must be implemented in the worker loop (see Error Handling for more details). Tasks can also be pushed to a queue with the $push_tasks() method, which sets the state to "queued". The last example gives more details on the task queue and the different methods to push and pop tasks. The input xs and result ys are lists that can contain arbitrary data. Usually the methods of the RushWorker work on multiple tasks at once, so xss and yss are lists of inputs and results.
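The life cycle of a single task can be pictured with a small in-memory stand-in. The sketch below uses neither Redis nor the rush API; the function `evaluate_task()` and the fields of the returned list are hypothetical and only mirror the components described above (key, state, xs, ys). It illustrates where error catching would sit in a worker loop: a successful evaluation ends in the state "finished", an error ends in the state "failed".

```r
# Hypothetical helper: evaluate one task and record its final state.
# This mimics the task components described above, not rush's storage format.
evaluate_task = function(key, xs, fun) {
  tryCatch({
    ys = list(y = do.call(fun, xs))
    list(key = key, state = "finished", xs = xs, ys = ys)
  }, error = function(e) {
    # Keep the error message so the failure can be inspected later
    list(key = key, state = "failed", xs = xs, message = conditionMessage(e))
  })
}

add = function(x1, x2) x1 + x2

ok = evaluate_task("task-1", list(x1 = 1, x2 = 2), add)
bad = evaluate_task("task-2", list(x1 = 1, x2 = "a"), add)  # adding a string fails

print(ok$state)
print(bad$state)
```

In a real worker loop, the two branches would correspond to calling $push_results() and $push_failed() respectively.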

Controller

The Rush controller is responsible for starting, observing, and stopping workers within the network. It is initialized using the rsh() function, which requires a network ID and a config argument. The config argument is a configuration object, created with the redux package, that is used to connect to the Redis database.

library(rush)

config = redux::redis_config()

rush = rsh(
  network = "test-random-search",
  config = config)

Workers can be started using the $start_local_workers() method, which accepts the worker loop and the number of workers as arguments. The workers are started locally with the processx package but it is also possible to start workers on a remote machine (see Rush Controller). We need to export the branin function to the workers, so we set the globals argument to "branin". More on globals and the different worker types can be found in the Rush Controller vignette.

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  globals = "branin")

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0

The optimization finishes quickly and we retrieve the results. The $fetch_finished_tasks() method fetches all finished tasks from the database and returns a data.table with the key, input, and result of each task. The pid and worker_id columns hold additional information that is stored when the task is created. The worker_id is the ID of the worker that evaluated the task and the pid is the process ID of that worker. Further extra information can be passed as lists to the $push_running_tasks() and $push_results() methods via the extra argument.

rush$fetch_finished_tasks()[order(y)]
            x1         x2          y   pid     worker_id          keys
         <num>      <num>      <num> <int>        <char>        <char>
  1: -3.647598 13.1506460   1.740640 10447 grandiose_... 4d620d39-3...
  2:  9.852127  1.7911356   2.401915 10458 islandlike... e98f63c3-f...
  3:  9.363334  0.8563421   2.872488 10447 grandiose_... 61004f5f-5...
  4:  2.984817  4.0637092   3.282119 10447 grandiose_... aae03183-d...
  5: -3.933311 13.1516803   4.478744 10458 islandlike... bd97305d-5...
 ---
 99: -4.428579  2.8444521 169.554094 10447 grandiose_... 53c355e9-f...
100:  5.277117 13.8416638 174.982620 10447 grandiose_... f1f2f6aa-6...
101: -4.229373  1.6711407 184.321638 10447 grandiose_... d411913b-f...
102:  5.813253 14.0535984 186.005563 10458 islandlike... 6cae78bb-a...
103:  5.014075 14.4768169 187.334562 10447 grandiose_... f150eaa6-e...

The rush controller displays how many workers are running and how many tasks exist in each state. In this case, 103 tasks are marked as finished, and all workers have stopped. The number slightly exceeds 100 because workers check the stopping condition independently. If several workers evaluate the condition around the same time — when, for example, 99 tasks are finished — they may all create new tasks before detecting that the limit has been reached. Additionally, tasks may continue to be created while the 100th task is still being evaluated.
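The overshoot can be reproduced with a deliberately simplified model that involves neither Redis nor rush. In the sketch below, the hypothetical function `simulate_overshoot()` lets all workers read the shared counter at the same moment and then finish one task each. This lockstep behavior is an assumption made for illustration; real asynchronous workers do not follow it exactly.

```r
# Hypothetical lockstep model of workers sharing one stopping condition
simulate_overshoot = function(n_workers, limit) {
  n_finished = 0
  repeat {
    # All workers see the same counter value when they check the condition
    if (n_finished >= limit) break
    # Every worker passed the check, so each one creates and finishes a task
    n_finished = n_finished + n_workers
  }
  n_finished
}

simulate_overshoot(n_workers = 4, limit = 10)   # counter goes 0, 4, 8, 12
simulate_overshoot(n_workers = 3, limit = 100)  # counter goes 0, 3, ..., 99, 102
```

In this model the final count exceeds the limit by at most `n_workers - 1`. The actual excess in a run depends on timing, which is why the run shown in this article ended with 103 tasks.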

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 103
* Failed Tasks: 0

We can stop the workers and reset the database with the $reset() method.

rush$reset()

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0

To learn more about starting, stopping and observing workers, see the Rush Controller vignette.

Median Stopping

Random search is a simple example that doesn’t rely on information from previous tasks and therefore doesn’t require communication between workers. Now, let’s implement a more sophisticated algorithm that uses the results of completed tasks to decide whether to continue evaluating the current one. We tune an XGBoost model on the mtcars dataset and use the median stopping rule to stop the training early.

Worker Loop

The worker starts by sampling a random hyperparameter configuration with three parameters: maximum tree depth, lambda regularization, and alpha regularization. These parameters control how the XGBoost model learns from the data. The worker then trains the model incrementally, starting with 5 boosting rounds and adding one round at a time up to 20 rounds. After each round, the worker evaluates the model’s performance on a test set using root mean squared error (RMSE). At this point, the worker checks how well its model is doing compared to other workers by fetching their completed results and comparing its performance to the median score among all models with the same number of training rounds.

If the current model performs worse than the median, that is, if its RMSE is higher than the median RMSE, the worker stops this hyperparameter configuration and starts over with a new one. This early stopping mechanism prevents workers from wasting time on poor-performing configurations. If the model’s RMSE is at or below the median, the worker continues training for one more round. The process continues until the network has finished 1000 tasks across all workers, where each task corresponds to one training step of a configuration.
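The stopping decision itself is a one-line comparison and can be sketched in isolation. The helper name `should_stop()` and the example scores are hypothetical. The guard for an empty comparison set is an addition of this sketch; the worker loop in this article compares against the fetched tasks directly.

```r
# Hypothetical helper: decide whether to stop a configuration early.
# `rmse` is the score of the current model, `peer_rmse` holds the scores of
# all finished tasks with the same number of boosting rounds.
should_stop = function(rmse, peer_rmse) {
  # Without any reference scores there is nothing to compare against
  if (length(peer_rmse) == 0) return(FALSE)
  # Lower RMSE is better, so a score above the median is worse than the median
  rmse > median(peer_rmse)
}

peers = c(2.5, 2.9, 3.2, 4.1)  # median is (2.9 + 3.2) / 2 = 3.05

should_stop(3.2, peers)        # above the median, stop
should_stop(2.5, peers)        # below the median, continue
should_stop(3.2, numeric(0))   # no peers yet, continue
```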

wl_median_stopping = function(rush) {
  while(rush$n_finished_tasks < 1000) {

    params = list(
      max_depth = sample(1:20, 1),
      lambda = runif(1, 0, 1),
      alpha = runif(1, 0, 1)
    )

    model = NULL
    for (iteration in seq(5, 20, by = 1)) {

      key = rush$push_running_tasks(xss = list(c(params, list(nrounds = iteration))))

      model = xgboost(
        data = as.matrix(data[training_ids, ]),
        label = y[training_ids],
        nrounds = if (is.null(model)) 5 else 1,
        params = params,
        xgb_model = model,
        verbose = 0
      )

      pred = predict(model, as.matrix(data[test_ids, ]))
      rmse = sqrt(mean((pred - y[test_ids])^2))

      rush$push_results(key, yss = list(list(rmse = rmse)))

      tasks = rush$fetch_finished_tasks()
      if (rmse > median(tasks[nrounds == iteration, rmse])) break
    }
  }
}

The worker loop calls $fetch_finished_tasks() to fetch all finished tasks from the database, this time from inside the worker. Other methods like $fetch_running_tasks() and $fetch_failed_tasks() are also available.

We sample a training and test set from the mtcars dataset. The training set is used to fit the model and the test set is used to evaluate the model. Then we initialize the rush network and start the workers. This time we pass the training and test sets to the workers via the globals argument and use the packages argument to load the data.table and xgboost packages on the workers.

data(mtcars)

training_ids = sample(1:nrow(mtcars), 20)
test_ids = setdiff(1:nrow(mtcars), training_ids)
data = mtcars[, -1]
y = mtcars$mpg

config = redux::redis_config()

rush = rsh(
  network = "test-median-stopping",
  config = config)

rush$start_local_workers(
  worker_loop = wl_median_stopping,
  n_workers = 4,
  packages = c("data.table", "xgboost"),
  globals = c("training_ids", "test_ids", "data", "y"))

We fetch the finished tasks and sort them by the objective value.

rush$fetch_finished_tasks()[order(rmse)]
Null data.table (0 rows and 0 cols)

We stop the workers and reset the database.

rush$reset()

Bayesian Optimization

We implement Asynchronous Decentralized Bayesian Optimization (ADBO) [@egele_2023] next. This example shows how workers use information about running tasks and introduces task queues. ADBO runs sequential Bayesian optimization on multiple workers in parallel. Each worker maintains its own surrogate model (a random forest) and selects the next configuration by optimizing a confidence bound acquisition function. Because the Branin function is minimized, the worker loop below minimizes the lower confidence bound. To promote a varying exploration-exploitation tradeoff between the workers, the acquisition functions are initialized with different lambda values ranging from 0.01 to 10. When a worker completes an evaluation, it asynchronously sends the result to its peers via a Redis database; each worker then updates its local model with this shared information. This decentralized design enables workers to proceed independently, eliminating the need for a central coordinator that could become a bottleneck in large-scale optimization scenarios.
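The effect of lambda on the choice of the next point can be seen on a toy example. The sketch below uses made-up surrogate predictions for three hypothetical candidates and the minimization form of the confidence bound (predicted mean minus lambda times standard error), which is the form that appears in the worker loop of this example. The names `pred_mean`, `pred_se`, and `select_candidate()` are illustrative only.

```r
# Hypothetical surrogate output for three candidate points
pred_mean = c(a = 1.0, b = 1.5, c = 5.0)  # predicted objective value
pred_se   = c(a = 0.1, b = 0.5, c = 2.0)  # predictive standard error

# Lower confidence bound for minimization: small values are promising
select_candidate = function(lambda) {
  cb = pred_mean - lambda * pred_se
  names(which.min(cb))
}

select_candidate(0.1)  # small lambda favors the best predicted mean
select_candidate(2)    # medium lambda balances mean and uncertainty
select_candidate(10)   # large lambda favors the most uncertain point
```

With these numbers the three calls select candidates a, b, and c in turn, so workers with different lambda values would spread their evaluations between exploitation and exploration.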

We first create a new rush network.

config = redux::redis_config()

rush = rsh(
  network = "test-bayesian-optimization",
  config = config)

Queues

The queue system works by pushing and popping tasks from a queue. The $push_tasks() method creates new tasks and pushes them to the queue. In this example, we draw an initial design of 25 points and push them to the queue.

xss = replicate(25, list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15)), simplify = FALSE)

rush$push_tasks(xss = xss)

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 25
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0

We see 25 queued tasks in the database. To retrieve the tasks from the queue, we need to call the $pop_task() method in the worker loop.
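The pop-until-empty pattern can be tried without a Redis server. The sketch below builds a hypothetical in-memory queue with `make_queue()`; it is not part of rush and only imitates the behavior described here, namely that popping from an empty queue returns NULL.

```r
# Hypothetical in-memory queue, used only to illustrate the loop pattern
make_queue = function(xss) {
  items = xss
  list(
    pop = function() {
      if (length(items) == 0) return(NULL)
      item = items[[1]]
      items <<- items[-1]  # remove the popped item from the queue
      item
    },
    size = function() length(items)
  )
}

queue = make_queue(list(list(x1 = 0, x2 = 1), list(x1 = 2, x2 = 3)))

n_popped = 0
repeat {
  xs = queue$pop()
  if (is.null(xs)) break  # queue is empty, move on to the next phase
  n_popped = n_popped + 1
}
print(n_popped)
```

With several workers, each task is popped by exactly one of them, so the initial design is evaluated once and shared through the database.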

Worker Loop

The worker loop pops tasks from the queue with the $pop_task() method. Each task is evaluated and the results are pushed back to the database with the $push_results() method. If there are no more tasks in the queue, the $pop_task() method returns NULL and the worker loop starts the Bayesian optimization. First, a lambda value for the acquisition function is sampled between 0.01 and 10. Then all running and finished tasks are fetched from the database. Using $fetch_tasks_with_state() instead of separate calls to $fetch_running_tasks() and $fetch_finished_tasks() is important because it prevents tasks from appearing twice. A task could appear twice if a worker changes its state from "running" to "finished" between the two calls. The missing y values of the running tasks are imputed with the mean of the finished tasks. Then the surrogate random forest model is fitted to the data and the acquisition function is optimized to find the next task. Marking the task as running is important for the Bayesian optimization algorithm, as it uses the already sampled points of the other workers to decide which task to evaluate next. The task is evaluated and the results are pushed back to the database. We stop the optimization process after 100 evaluated tasks.
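The imputation step can be checked on a toy table. The sketch below uses a plain data.frame with made-up values instead of the data.table returned by rush, so the column layout is an assumption made for illustration.

```r
# Toy archive: two finished tasks and one running task without a result yet
xydt = data.frame(
  state = c("finished", "finished", "running"),
  x1 = c(-3, 9, 2),
  x2 = c(12, 2, 7),
  y = c(1.5, 2.5, NA)
)

# Impute the missing result of the running task with the mean of the finished ones
mean_y = mean(xydt$y, na.rm = TRUE)
xydt$y[xydt$state == "running"] = mean_y

print(xydt)
```

After imputation the running task carries the value 2, so the surrogate model can be fitted on all three rows and takes the location of the running task into account.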

wl_bayesian_optimization = function(rush) {
  repeat {
    task = rush$pop_task()
    if (is.null(task)) break
    ys = list(y = branin(task$xs$x1, task$xs$x2))
    rush$push_results(task$key, yss = list(ys))
  }

  lambda = runif(1, 0.01, 10)

  while(rush$n_finished_tasks < 100) {

    xydt = rush$fetch_tasks_with_state(states = c("running", "finished"))
    mean_y = mean(xydt$y, na.rm = TRUE)
    xydt["running", y := mean_y, on = "state"]

    surrogate = ranger::ranger(
      y ~ x1 + x2,
      data = xydt,
      num.trees = 100L,
      keep.inbag = TRUE)
    xdt = data.table::data.table(x1 = runif(1000, -5, 10), x2 = runif(1000, 0, 15))
    p = predict(surrogate, xdt, type = "se", se.method = "jack")
    cb = p$predictions - lambda * p$se
    xs = as.list(xdt[which.min(cb)])
    key = rush$push_running_tasks(xss = list(xs))

    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

We start the optimization process by starting 4 local workers that run the Bayesian optimization worker loop.

rush$start_local_workers(
  worker_loop = wl_bayesian_optimization,
  n_workers = 4,
  globals = "branin")

The optimization is quickly finished and we retrieve the results.

rush$fetch_finished_tasks()[order(y)]
            x1         x2          y   pid     worker_id          keys
         <num>      <num>      <num> <int>        <char>        <char>
 1: -2.7770693 11.7962754   1.173317 10764 leafed_ang... 25912adf-7...
 2:  9.0569540  2.9240614   1.590496 10764 leafed_ang... 78587175-f...
 3:  3.2263386  0.6147417   2.976679 10764 leafed_ang... baf05b03-3...
 4: -2.3339471  8.5592559   6.819048 10764 leafed_ang... b49804eb-8...
 5:  4.3884598  3.1319762   9.595773 10780 deviative_... 8499f9bb-4...
 6:  4.6552681  2.0687396   9.911778 10764 leafed_ang... 241cbe2a-8...
 7:  1.6355866  1.1878595  15.904347 10764 leafed_ang... 3ebc4a00-b...
 8:  4.7153259  4.2350725  18.250297 10766 unapproach... c21313e4-d...
 9: -0.7500097  8.6577791  18.961782 10764 leafed_ang... cd6ec548-5...
10:  0.2430548  6.0970320  19.546680 10764 leafed_ang... 61cf7973-1...
11:  2.2464979  7.6505639  24.916012 10766 unapproach... 1d9bc66b-1...
12:  0.2874400  3.0175138  25.637861 10764 leafed_ang... d0ac167d-4...
13:  1.0069334  8.2011462  28.620979 10764 leafed_ang... 4059fe2b-8...
14:  1.8257129  8.3836850  31.186585 10764 leafed_ang... 475b9a8b-6...
15: -1.0156122  3.2075491  35.691923 10764 leafed_ang... 62c0324f-a...
16:  7.4857002  6.0830605  36.095228 10764 leafed_ang... 94e79f6c-f...
17:  1.1732346 10.2103177  48.524816 10764 leafed_ang... 5e5d6c20-3...
18:  1.3275163 10.1453221  48.679573 10766 unapproach... 9fe7c811-6...
19:  6.9236242  7.1456599  53.367514 10790 temporary_... 6fdcb9c3-8...
20:  2.5525688 10.7979132  66.316148 10766 unapproach... 5e66130a-a...
21:  2.0202864 11.3248716  70.035723 10764 leafed_ang... 8a1afadb-6...
22:  2.7972474 12.4099060  98.004695 10764 leafed_ang... f435d795-9...
23:  5.3070486 10.2902131  98.156459 10766 unapproach... 40755efe-3...
24:  2.2612493 13.6021446 114.986309 10764 leafed_ang... db6a5ed0-5...
25:  6.5084223 11.1289048 119.663437 10780 deviative_... 94552c17-9...
            x1         x2          y   pid     worker_id          keys