Skip to contents

rush is a package designed to solve large-scale problems asynchronously across a distributed network. Employing a database centric model, rush enables workers to communicate tasks and their results over a shared Redis database. This article demonstrates how to use rush with 3 different examples.

We begin with a simple random search to optimize the Branin function in parallel. Although random search does not require communication between workers, it is a good way to introduce the basic ideas behind rush. The classic Branin function (also called the Branin-Hoo function) is a well-known benchmark problem in global optimization. It is a two-dimensional function that is non-convex, multimodal, and has three global minima. The function is a toy example for optimization thats fast to evaluate but not too simple to be solved.

branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

The Branin function is usually evaluated on the domain \(x_1 \in [-5, 10]\) and \(x_2 \in [0, 15]\).

Worker Loop

We define the worker_loop function, which runs on each worker. It repeatedly draws tasks, evaluates them, and sends the results to the Redis database. The function takes a single argument: a RushWorker object, which handles communication with Redis. In this example, each worker samples a random point, creates a task, evaluates it using the Branin function, and submits the result. The optimization stops after 100 tasks have been evaluated.

wl_random_search = function(rush) {

  while(rush$n_finished_tasks < 100) {

    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    key = rush$push_running_tasks(xss = list(xs))

    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

The most important methods of the RushWorker are the $push_running_tasks() and $push_results() methods. The first method $push_running_tasks() creates a new task in the Redis database. Since it is evaluated next, the task is marked as running. The $push_running_tasks() method returns a unique key that is used to identify the task. The second method $push_results() is used to push the results back to the Redis database. It takes the key of the task and a list of results. To mark the task as running is not important for a random search, but it is crucial for more sophisticated algorithms that use the tasks of other workers to decide which task to evaluate next. For example, Bayesian optimization algorithms would sample the next point further away from the previous points to explore the search space. The $n_finished_tasks shows how many tasks are finished and is used to stop the worker loop.

Tasks

Tasks are the unit in which workers exchange information. The main components of a task are the key, computational state, input (xs), and result (ys). The key is a unique identifier for the task. It identifies the task in the Redis database. The four possible computational states are "running", "finished", "failed", and "queued". The $push_running_tasks() method marks it as "running" and returns the key of the task. The $push_results() method marks a task as "finished" and stores the result. Failed tasks can be marked as "failed" with the $push_failed() method. The error catching must be implemented in the worker loop (see Error Handling for more details). Tasks can also be pushed to a queue with the $push_tasks() method which sets the state to "queued". The last example gives more details on the task queue and the different methods to push and pop tasks. The input xs and result ys are lists that can contain arbitrary data. Usually the methods of the RushWorker work on multiple tasks at once, so xxs and yss are lists of inputs and results.

Controller

The Rush controller is responsible for starting, observing, and stopping workers within the network. It is initialized using the rsh() function, which requires a network ID and a config argument. The config argument is a configuration file used to connect to the Redis database via the redux package.

library(rush)

config = redux::redis_config()

rush = rsh(
  network = "test-random-search",
  config = config)

Workers can be started using the $start_local_workers() method, which accepts the worker loop and the number of workers as arguments. The workers are started locally with the processx package but it is also possible to start workers on a remote machine (see Rush Controller). We need to export the branin function to the workers, so we set the globals argument to "branin". More on globals and the different worker types can be found in the Rush Controller vignette.

rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  globals = "branin")

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0

The optimization is quickly finished and we retrieve the results. The $fetch_finished_tasks() method fetches all finished tasks from the database. The method returns a data.table() with the key, input, and result. The pid and worker_id column are additional information that are stored when the task is created. The worker_id is the id of the worker that evaluated the task and the pid is the process id of that worker. Further extra information can be passed as lists to the $push_running_tasks() and $push_results() methods via the extra argument.

rush$fetch_finished_tasks()[order(y)]
            x1        x2           y   pid     worker_id          keys
         <num>     <num>       <num> <int>        <char>        <char>
  1:  3.010453  2.492120   0.4930212 10201 divorceabl... c10be881-8...
  2: -2.985163 11.541051   0.6455761 10201 divorceabl... 4f43d867-1...
  3:  2.903184  2.385636   0.6763106 10210 carbonifer... 16f2790b-3...
  4: -3.226419 11.948513   0.7146667 10190 unexplaina... 07740f72-6...
  5:  3.141650  3.515229   1.9361662 10190 unexplaina... b4ff0a76-9...
 ---
 99: -4.557463  3.437565 164.7454679 10210 carbonifer... b073e10d-8...
100:  6.181671 13.168862 165.2559675 10190 unexplaina... 6813cf6a-d...
101: -4.894677  4.335995 169.2208533 10210 carbonifer... d16649a2-4...
102:  6.116174 13.655933 177.1629865 10224 nondemocra... f6763dca-b...
103:  5.631527 14.845578 205.6386336 10201 divorceabl... 0f315a47-1...

The rush controller displays how many workers are running and how many tasks exist in each state. In this case, 103 tasks are marked as finished, and all workers have stopped. The number slightly exceeds 100 because workers check the stopping condition independently. If several workers evaluate the condition around the same time — when, for example, 99 tasks are finished — they may all create new tasks before detecting that the limit has been reached. Additionally, tasks may continue to be created while the 100th task is still being evaluated.

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 103
* Failed Tasks: 0

We can stop the workers and reset the database with the $reset() method.

rush$reset()

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0

To learn more about starting, stopping and observing workers, see the Rush Controller vignette.

Median Stopping

Random search is a simple example that doesn’t rely on information from previous tasks and therefore doesn’t require communication between workers. Now, let’s implement a more sophisticated algorithm that uses the results of completed tasks to decide whether to continue evaluating the current one. We tune an XGBoost model on the mtcars dataset and use the median stopping rule to stop the training early.

Worker Loop

The worker starts by sampling a random hyperparameter configuration with three parameters: maximum tree depth, lambda regularization, and alpha regularization. These parameters control how the XGBoost model learns from the data. The worker then trains the model incrementally, starting with 5 boosting rounds and adding one round at a time up to 20 rounds. After each round, the worker evaluates the model’s performance on a test set using root mean squared error (RMSE). At this point, the worker checks how well its model is doing compared to other workers by fetching their completed results and comparing its performance to the median score among all models with the same number of training rounds.

If the current model performs worse than the median, the worker stops this hyperparameter configuration and starts over with a new one. This early stopping mechanism prevents workers from wasting time on poor-performing configurations. If the model performs at or above the median, the worker continues training for one more round. The process continues until the network has evaluated 1000 complete models across all workers.

wl_median_stopping = function(rush) {
  while(rush$n_finished_tasks < 1000) {

    params = list(
      max_depth = sample(1:20, 1),
      lambda = runif(1, 0, 1),
      alpha = runif(1, 0, 1)
    )

    model = NULL
    for (iteration in seq(5, 20, by = 1)) {

      key = rush$push_running_tasks(xss = list(c(params, list(nrounds = iteration))))

      model = xgboost(
        data = as.matrix(data[training_ids, ]),
        label = y[training_ids],
        nrounds = if (is.null(model)) 5 else 1,
        params = params,
        xgb_model = model,
        verbose = 0
      )

      pred = predict(model, as.matrix(data[test_ids, ]))
      rmse = sqrt(mean((pred - y[test_ids])^2))

      rush$push_results(key, yss = list(list(rmse = rmse)))

      tasks = rush$fetch_finished_tasks()
      if (rmse > median(tasks[nrounds == iteration, rmse])) break
    }
  }
}

The worker loop uses a new method called $fetch_finished_tasks() to fetch all finished tasks from the database. Other methods like $fetch_running_tasks() and $fetch_failed_tasks() are also available.

We sample a training and test set from the mtcars dataset. The training set is used to fit the model and the test set is used to evaluate the model. Then we initialize the rush network and start the workers. This time we have to pass the training and test set to the workers via the globals argument and the packages argument to load the data.table and xgboost packages.

data(mtcars)

training_ids = sample(1:nrow(mtcars), 20)
test_ids = setdiff(1:nrow(mtcars), training_ids)
data = mtcars[, -1]
y = mtcars$mpg

config = redux::redis_config()

rush = rsh(
  network = "test-median-stopping",
  config = config)

rush$start_local_workers(
  worker_loop = wl_median_stopping,
  n_workers = 4,
  packages = c("data.table", "xgboost"),
  globals = c("training_ids", "test_ids", "data", "y"))

We fetch the finished tasks and sort them by the objective value.

rush$fetch_finished_tasks()[order(y)]
Null data.table (0 rows and 0 cols)

We stop the workers and reset the database.ch

rush$reset()

Bayesian Optimization

We implement Asynchronous Distributed Bayesian Optimization (ADBO) [@egele_2023] next. This example shows how workers use information about running tasks and introduces task queues. ADBO runs sequential Bayesian optimization on multiple workers in parallel. Each worker maintains its own surrogate model (a random forest) and selects the next hyperparameter configuration by maximizing the upper confidence bounds acquisition function. To promote a varying exploration-exploitation tradeoff between the workers, the acquisition functions are initialized with different lambda values ranging from 0.1 to 10. When a worker completes an evaluation, it asynchronously sends the result to its peers via a Redis data base; each worker then updates its local model with this shared information. This decentralized design enables workers to proceed independently; eliminating the need for a central coordinator that could become a bottleneck in large-scale optimization scenarios.

We first create a new rush network.

config = redux::redis_config()

rush = rsh(
  network = "test-bayesian-optimization",
  config = config)

Queues

The queue system works by pushing and popping tasks from a queue. The $push_task() method creates new tasks and pushes them to the queue. In this example, we draw an initial design of 25 points and push them to the queue.

xss = replicate(25, list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15)), simplify = FALSE)

rush$push_tasks(xss = xss)

rush
<Rush>
* Running Workers: 0
* Queued Tasks: 25
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0

We see 25 queued tasks in the database. To retrieve the tasks from the queue, we need to implement the $pop_task() method in the worker loop.

Worker Loop

The worker loop pops tasks with the $pop_task() method from the queue. The task is evaluated and the results are pushed back to the database with the $push_results() method. If there are no more tasks in the queue, the $pop_task() method returns NULL and the worker loop starts the Bayesian optimization. First, a lambda value for the acquisition function is sampled between 0.01 and 10. Then all running and finished tasks are fetched from the database. Using rush$fetch_tasks_with_state() instead of using $fetch_running_tasks() and $fetch_finished_tasks() is important because it prevents tasks from appearing twice. This could be the case if a worker changes the state of a task from "running" to "finished" while the tasks are being fetched. The missing y values of the running tasks are imputed with the mean of the finished tasks. Then the surrogate random forest model is fitted to the data and the acquisition function is optimized to find the next task. Marking the task as running is important for the Bayesian optimization algorithm, as it uses the already sampled points of the other workers to decide which task to evaluate next. The task is evaluated and the results are pushed back to the database. We stop the optimization process after 100 evaluated tasks.

wl_bayesian_optimization = function(rush) {
  repeat {
    task = rush$pop_task()
    if (is.null(task)) break
    ys = list(y = branin(task$xs$x1, task$xs$x2))
    rush$push_results(task$key, yss = list(ys))
  }

  lambda = runif(1, 0.01, 10)

  while(rush$n_finished_tasks < 100) {

    xydt = rush$fetch_tasks_with_state(states = c("running", "finished"))
    mean_y = mean(xydt$y, na.rm = TRUE)
    xydt["running", y := mean_y, on = "state"]

    surrogate = ranger::ranger(
      y ~ x1 + x2,
      data = xydt,
      num.trees = 100L,
      keep.inbag = TRUE)
    xdt = data.table::data.table(x1 = runif(1000, -5, 10), x2 = runif(1000, 0, 15))
    p = predict(surrogate, xdt, type = "se", se.method = "jack")
    cb = p$predictions - lambda * p$se
    xs = as.list(xdt[which.min(cb)])
    key = rush$push_running_tasks(xss = list(xs))

    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

We start the optimization process by starting 4 local workers that run the Bayesian optimization worker loop.

rush$start_local_workers(
  worker_loop = wl_bayesian_optimization,
  n_workers = 4,
  globals = "branin")

The optimization is quickly finished and we retrieve the results.

rush$fetch_finished_tasks()[order(y)]
            x1        x2          y   pid     worker_id          keys
         <num>     <num>      <num> <int>        <char>        <char>
 1: -2.6769784 10.685317   1.666756 10503 cosey_tape... b9fff33a-f...
 2:  2.2496193  1.994806   5.134405 10488 nonpsychoa... 1fd3f7bb-b...
 3: -4.3935290 13.623118  10.460887 10488 nonpsychoa... 77b2f039-7...
 4: -3.5924812  9.752138  14.554161 10503 cosey_tape... 7a79adec-7...
 5:  5.3919460  2.353489  17.425054 10490 granite_st... f8830b71-5...
 6: -2.1369887  6.323539  18.300012 10490 granite_st... 5bee7738-c...
 7:  4.8647387  5.016571  25.160573 10513 authorized... f9ea289e-c...
 8: -0.8915666  4.030686  28.218887 10490 granite_st... 23f4e9ff-1...
 9: -3.7282986  8.470437  29.661019 10490 granite_st... 97a1ca59-b...
10: -0.8113646  3.568408  31.111726 10488 nonpsychoa... c32bd812-c...
11:  4.7546758  6.016665  32.154155 10490 granite_st... 5984b9c3-4...
12:  1.5507561  9.015283  36.949439 10490 granite_st... 90edf4d6-1...
13:  3.5187058  8.301880  40.795418 10488 nonpsychoa... 6dbb6b3f-7...
14:  1.2916129  9.705392  43.399119 10488 nonpsychoa... 7b11a199-3...
15:  1.5755062  9.975007  47.923029 10490 granite_st... d2505b21-3...
16:  4.6018589  7.779866  49.494782 10490 granite_st... 6cdd1595-2...
17:  6.3984892  7.136202  55.909074 10490 granite_st... a16e3bd6-7...
18:  4.5904442  9.440063  73.213072 10490 granite_st... 7eff03d2-a...
19: -2.1765962  1.479841  78.429428 10490 granite_st... 0a824ca7-7...
20:  6.0134514  9.498227  89.771473 10503 cosey_tape... 767715f5-e...
21:  6.0637882  9.903024  96.878638 10490 granite_st... b78e2ab5-3...
22: -3.0175291  1.503720 110.199709 10490 granite_st... 6c167d03-5...
23:  7.3403273 11.278734 114.732405 10490 granite_st... 9b188615-7...
24:  3.6509299 12.906753 122.516646 10490 granite_st... e5795e8f-9...
25:  3.9912582 14.385259 164.433010 10488 nonpsychoa... 78d151d8-1...
            x1        x2          y   pid     worker_id          keys