branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
rush is a package designed to solve large-scale problems asynchronously across a distributed network. Employing a database-centric model, rush enables workers to communicate tasks and their results over a shared Redis database. This article demonstrates how to use rush with three different examples.
Random Search
We begin with a simple random search to optimize the Branin function in parallel. Although random search does not require communication between workers, it is a good way to introduce the basic ideas behind rush. The classic Branin function (also called the Branin-Hoo function) is a well-known benchmark problem in global optimization. It is a two-dimensional function that is non-convex, multimodal, and has three global minima. The function is a toy example for optimization that is fast to evaluate but not trivial to solve.
The Branin function is usually evaluated on the domain \(x_1 \in [-5, 10]\) and \(x_2 \in [0, 15]\).
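As a quick sanity check, the function can be evaluated at the three global minimizers commonly reported for this benchmark: (-pi, 12.275), (pi, 2.275), and (3 * pi, 2.475). These coordinates are not stated in this article and are taken as an assumption from the standard definition of the benchmark. At each of them the function value is 10 / (8 * pi), roughly 0.398.

```r
# Branin function as defined in this article
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# commonly cited global minimizers (assumption: standard benchmark definition)
minimizers = data.frame(
  x1 = c(-pi, pi, 3 * pi),
  x2 = c(12.275, 2.275, 2.475)
)

# evaluate the function at each minimizer
minima = mapply(branin, minimizers$x1, minimizers$x2)
round(minima, 4)
# all three values are 0.3979
```

All three points lie inside the domain given above, so a search over that domain has three equally good targets.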
Worker Loop
We define the worker_loop function, which runs on each worker. It repeatedly draws tasks, evaluates them, and sends the results to the Redis database. The function takes a single argument: a RushWorker object, which handles communication with Redis. In this example, each worker samples a random point, creates a task, evaluates it using the Branin function, and submits the result. The optimization stops after 100 tasks have been evaluated.
The most important methods of the RushWorker are $push_running_tasks() and $push_results(). The first method, $push_running_tasks(), creates a new task in the Redis database. Since the task is evaluated next, it is marked as running. The method returns a unique key that identifies the task. The second method, $push_results(), pushes the results back to the Redis database. It takes the key of the task and a list of results. Marking the task as running is not important for a random search, but it is crucial for more sophisticated algorithms that use the tasks of other workers to decide which task to evaluate next. For example, Bayesian optimization algorithms would sample the next point further away from the previously sampled points to explore the search space. The $n_finished_tasks field returns the number of finished tasks and is used to stop the worker loop.
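A worker loop matching this description could look like the following minimal sketch. It uses only the RushWorker members named above and redefines branin so that the block is self-contained. The name wl_random_search matches the function passed to the controller later in this article; the sampling ranges are an assumption taken from the domain of the Branin function.

```r
# Branin function as defined in this article
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

wl_random_search = function(rush) {
  # stop once the network has finished 100 tasks
  while (rush$n_finished_tasks < 100) {
    # sample a random point from the Branin domain
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))

    # create the task in the database, marked as running, and keep its key
    key = rush$push_running_tasks(xss = list(xs))

    # evaluate the task
    ys = list(y = branin(xs$x1, xs$x2))

    # store the result, which marks the task as finished
    rush$push_results(key, yss = list(ys))
  }
}
```

Both methods receive a list of tasks, which is why the single input xs and the single result ys are each wrapped in list().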
Tasks
Tasks are the unit in which workers exchange information. The main components of a task are the key, computational state, input (xs), and result (ys). The key is a unique identifier for the task in the Redis database. The four possible computational states are "running", "finished", "failed", and "queued". The $push_running_tasks() method creates a task, marks it as "running", and returns its key. The $push_results() method marks a task as "finished" and stores the result. Failed tasks can be marked as "failed" with the $push_failed() method. Error catching must be implemented in the worker loop (see Error Handling for more details). Tasks can also be pushed to a queue with the $push_tasks() method, which sets the state to "queued". The last example gives more details on the task queue and the different methods to push and pop tasks. The input xs and result ys are lists that can contain arbitrary data. Most methods of the RushWorker operate on multiple tasks at once, so xss and yss are lists of inputs and results.
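The difference between a single input xs and a list of inputs xss can be seen in plain R, without a Redis connection. The values and the toy_objective helper below are hypothetical and only illustrate the shape of the data.

```r
# input of a single task: a named list
xs = list(x1 = 1, x2 = 2)

# inputs of several tasks: a list of named lists
xss = list(
  list(x1 = 1, x2 = 2),
  list(x1 = 3, x2 = 4)
)

# hypothetical objective that returns the result of one task as a named list
toy_objective = function(xs) list(y = xs$x1 + xs$x2)

# results of several tasks: again a list of named lists, one entry per task
yss = lapply(xss, toy_objective)
str(yss)
```

A single task is passed to the multi-task methods by wrapping it, for example xss = list(xs).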
Controller
The Rush controller is responsible for starting, observing, and stopping workers within the network. It is initialized using the rsh() function, which requires a network ID and a config argument. The config argument is a configuration object, created with the redux package, that is used to connect to the Redis database.
library(rush)
config = redux::redis_config()
rush = rsh(
network = "test-random-search",
config = config)
Workers can be started using the $start_local_workers() method, which accepts the worker loop and the number of workers as arguments. The workers are started locally with the processx package, but it is also possible to start workers on a remote machine (see Rush Controller). We need to export the branin function to the workers, so we set the globals argument to "branin". More on globals and the different worker types can be found in the Rush Controller vignette.
rush$start_local_workers(
worker_loop = wl_random_search,
n_workers = 4,
globals = "branin")
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
The optimization finishes quickly and we retrieve the results. The $fetch_finished_tasks() method fetches all finished tasks from the database and returns a data.table() with the key, input, and result of each task. The pid and worker_id columns contain additional information that is stored when the task is created. The worker_id is the ID of the worker that evaluated the task and the pid is the process ID of that worker. Further extra information can be passed as lists to the $push_running_tasks() and $push_results() methods via the extra argument.
rush$fetch_finished_tasks()[order(y)]
x1 x2 y pid worker_id keys
<num> <num> <num> <int> <char> <char>
1: 9.563748 3.0283004 0.678452 10270 raptorial_... 32d5a149-b...
2: 8.855728 2.2311239 1.948806 10250 sinking_co... a359b933-4...
3: 9.572234 1.3549158 2.057785 10261 departed_a... 66a077d7-2...
4: 3.105742 3.7085081 2.379161 10261 departed_a... bf74720d-2...
5: 2.665641 1.6032677 2.614644 10250 sinking_co... e3fe1f8f-f...
---
99: 4.773450 13.9620633 169.741073 10250 sinking_co... 1365f1cd-c...
100: 5.325860 13.6893392 171.813473 10250 sinking_co... 58163adb-b...
101: 7.023869 13.6822190 173.030837 10276 flowered_c... d131f9eb-2...
102: -3.927431 0.4562485 193.297033 10270 raptorial_... 170f858a-6...
103: -4.799375 1.4149137 241.848810 10250 sinking_co... 7a350b06-0...
The rush controller displays how many workers are running and how many tasks exist in each state. In this case, 103 tasks are marked as finished, and all workers have stopped. The number slightly exceeds 100 because workers check the stopping condition independently. If several workers evaluate the condition at around the same time (for example, when 99 tasks are finished), they may all create new tasks before detecting that the limit has been reached. Additionally, tasks may continue to be created while the 100th task is still being evaluated.
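The arithmetic behind the overshoot can be illustrated with one possible interleaving. This is a hypothetical scenario, not a trace of the run above: all four workers read the counter while it still shows 99.

```r
n_workers = 4
limit = 100
finished = 99

# every worker checks the stopping condition before any other worker has pushed its result
starts_new_task = rep(finished < limit, n_workers)

# each worker that passed the check evaluates one more task
finished = finished + sum(starts_new_task)
finished
# 103
```

In general, the number of finished tasks can exceed the limit by up to the number of workers minus one under this kind of interleaving.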
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 103
* Failed Tasks: 0
We can stop the workers and reset the database with the $reset() method.
rush$reset()
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
To learn more about starting, stopping and observing workers, see the Rush Controller vignette.
Median Stopping
Random search is a simple example that doesn’t rely on information from previous tasks and therefore doesn’t require communication between workers. Now, let’s implement a more sophisticated algorithm that uses the results of completed tasks to decide whether to continue evaluating the current one. We tune an XGBoost model on the mtcars dataset and use the median stopping rule to stop the training early.
Worker Loop
The worker starts by sampling a random hyperparameter configuration with three parameters: maximum tree depth, lambda regularization, and alpha regularization. These parameters control how the XGBoost model learns from the data. The worker then trains the model incrementally, starting with 5 boosting rounds and adding one round at a time up to 20 rounds. After each round, the worker evaluates the model’s performance on a test set using root mean squared error (RMSE). At this point, the worker checks how well its model is doing compared to other workers by fetching their completed results and comparing its performance to the median score among all models with the same number of training rounds.
If the current model's RMSE is higher (worse) than the median, the worker stops this hyperparameter configuration and starts over with a new one. This early stopping mechanism prevents workers from wasting time on poorly performing configurations. If the RMSE is at or below the median, the worker continues training for one more round. The process continues until 1000 tasks have finished across all workers; each task corresponds to one evaluation of a configuration at a given number of boosting rounds.
wl_median_stopping = function(rush) {
  while (rush$n_finished_tasks < 1000) {
    # sample a random hyperparameter configuration
    params = list(
      max_depth = sample(1:20, 1),
      lambda = runif(1, 0, 1),
      alpha = runif(1, 0, 1)
    )
    model = NULL
    for (iteration in seq(5, 20, by = 1)) {
      # register the configuration at the current number of rounds as a running task
      key = rush$push_running_tasks(xss = list(c(params, list(nrounds = iteration))))
      # train 5 rounds initially, then continue the existing model by 1 round
      model = xgboost(
        data = as.matrix(data[training_ids, ]),
        label = y[training_ids],
        nrounds = if (is.null(model)) 5 else 1,
        params = params,
        xgb_model = model,
        verbose = 0
      )
      # evaluate the model on the test set
      pred = predict(model, as.matrix(data[test_ids, ]))
      rmse = sqrt(mean((pred - y[test_ids])^2))
      rush$push_results(key, yss = list(list(rmse = rmse)))
      # stop this configuration if it is worse than the median at the same number of rounds
      tasks = rush$fetch_finished_tasks()
      if (rmse > median(tasks[nrounds == iteration, rmse])) break
    }
  }
}
The worker loop calls the $fetch_finished_tasks() method to fetch all finished tasks from the database. Other methods like $fetch_running_tasks() and $fetch_failed_tasks() are also available.
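The stopping decision in the worker loop above can be isolated into a small helper to make the rule explicit. The following is a sketch under stated assumptions: the function name should_stop is hypothetical, the example values are made up, and a base R data.frame stands in for the data.table returned by $fetch_finished_tasks().

```r
# hypothetical helper: TRUE if the current RMSE is worse than the median
# RMSE of all finished tasks with the same number of boosting rounds
should_stop = function(rmse, tasks, iteration) {
  peers = tasks$rmse[tasks$nrounds == iteration]
  # without comparable results there is nothing to compare against
  if (length(peers) == 0) return(FALSE)
  rmse > median(peers)
}

# made-up finished tasks: three at 5 rounds, one at 6 rounds
tasks = data.frame(
  nrounds = c(5, 5, 5, 6),
  rmse = c(3.0, 4.0, 5.0, 2.5)
)

should_stop(4.5, tasks, iteration = 5)  # TRUE, 4.5 is above the median of 4.0
should_stop(3.5, tasks, iteration = 5)  # FALSE, 3.5 is below the median of 4.0
should_stop(9.9, tasks, iteration = 7)  # FALSE, no finished task with 7 rounds
```

In the worker loop the empty case does not occur, because the worker pushes its own result before fetching the finished tasks; the guard is only there to keep the helper safe on its own.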
We sample a training and test set from the mtcars dataset. The training set is used to fit the model and the test set is used to evaluate the model. Then we initialize the rush network and start the workers. This time we pass the data and the training and test indices to the workers via the globals argument and use the packages argument to load the data.table and xgboost packages on the workers.
data(mtcars)
training_ids = sample(1:nrow(mtcars), 20)
test_ids = setdiff(1:nrow(mtcars), training_ids)
data = mtcars[, -1]
y = mtcars$mpg
config = redux::redis_config()
rush = rsh(
network = "test-median-stopping",
config = config)
rush$start_local_workers(
worker_loop = wl_median_stopping,
n_workers = 4,
packages = c("data.table", "xgboost"),
globals = c("training_ids", "test_ids", "data", "y"))
We fetch the finished tasks and sort them by the objective value.
rush$fetch_finished_tasks()[order(rmse)]
Null data.table (0 rows and 0 cols)
We stop the workers and reset the database.
rush$reset()
Bayesian Optimization
We implement Asynchronous Distributed Bayesian Optimization (ADBO) [@egele_2023] next. This example shows how workers use information about running tasks and introduces task queues. ADBO runs sequential Bayesian optimization on multiple workers in parallel. Each worker maintains its own surrogate model (a random forest) and selects the next hyperparameter configuration by optimizing a confidence bound acquisition function; because the Branin function is minimized here, the worker loop below minimizes the lower confidence bound. To promote a varying exploration-exploitation tradeoff between the workers, the acquisition functions are initialized with different lambda values ranging from 0.01 to 10. When a worker completes an evaluation, it asynchronously sends the result to its peers via a Redis database; each worker then updates its local model with this shared information. This decentralized design enables workers to proceed independently, eliminating the need for a central coordinator that could become a bottleneck in large-scale optimization scenarios.
We first create a new rush network.
config = redux::redis_config()
rush = rsh(
network = "test-bayesian-optimization",
config = config)
Queues
The queue system works by pushing tasks to and popping tasks from a queue. The $push_tasks() method creates new tasks and pushes them to the queue. In this example, we draw an initial design of 25 points and push them to the queue.
xss = replicate(25, list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15)), simplify = FALSE)
rush$push_tasks(xss = xss)
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 25
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
We see 25 queued tasks in the database. To retrieve the tasks from the queue, the worker loop calls the $pop_task() method.
Worker Loop
The worker loop pops tasks from the queue with the $pop_task() method. Each task is evaluated and the results are pushed back to the database with the $push_results() method. If there are no more tasks in the queue, $pop_task() returns NULL and the worker loop starts the Bayesian optimization. First, a lambda value for the acquisition function is sampled between 0.01 and 10. Then all running and finished tasks are fetched from the database. Using rush$fetch_tasks_with_state() instead of separate calls to $fetch_running_tasks() and $fetch_finished_tasks() is important because it prevents tasks from appearing twice, which could happen if a worker changes the state of a task from "running" to "finished" while the tasks are being fetched. The missing y values of the running tasks are imputed with the mean of the finished tasks. Then the surrogate random forest model is fitted to the data and the acquisition function is optimized over a random sample of candidate points to find the next task. Marking the task as running is important for the Bayesian optimization algorithm, as it uses the points already sampled by the other workers to decide which task to evaluate next. The task is evaluated and the results are pushed back to the database. We stop the optimization process after 100 evaluated tasks.
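The imputation step described above can be tried on a toy table. The values are made up, and a base R data.frame is used here instead of the data.table returned by rush, so the syntax differs from the worker loop.

```r
# made-up tasks: two finished with known y, one still running with missing y
xydt = data.frame(
  state = c("finished", "finished", "running"),
  x1 = c(-3, 2, 8),
  x2 = c(12, 3, 1),
  y = c(10, 20, NA)
)

# mean of the finished tasks; the running task is ignored via na.rm
mean_y = mean(xydt$y, na.rm = TRUE)

# fill in the missing result of the running task
xydt$y[xydt$state == "running"] = mean_y
xydt$y
# 10 20 15
```

After imputation the surrogate can be fitted on all rows, so points that other workers are currently evaluating also influence the model.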
wl_bayesian_optimization = function(rush) {
  # evaluate the initial design until the queue is empty
  repeat {
    task = rush$pop_task()
    if (is.null(task)) break
    ys = list(y = branin(task$xs$x1, task$xs$x2))
    rush$push_results(task$key, yss = list(ys))
  }
  # each worker draws its own exploration-exploitation tradeoff
  lambda = runif(1, 0.01, 10)
  while (rush$n_finished_tasks < 100) {
    # fetch running and finished tasks in one call to avoid duplicates
    xydt = rush$fetch_tasks_with_state(states = c("running", "finished"))
    # impute the missing results of running tasks with the mean of the finished tasks
    mean_y = mean(xydt$y, na.rm = TRUE)
    xydt["running", y := mean_y, on = "state"]
    # fit the random forest surrogate
    surrogate = ranger::ranger(
      y ~ x1 + x2,
      data = xydt,
      num.trees = 100L,
      keep.inbag = TRUE)
    # minimize the lower confidence bound over random candidate points
    xdt = data.table::data.table(x1 = runif(1000, -5, 10), x2 = runif(1000, 0, 15))
    p = predict(surrogate, xdt, type = "se", se.method = "jack")
    cb = p$predictions - lambda * p$se
    xs = as.list(xdt[which.min(cb)])
    # evaluate the selected point
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}
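The effect of lambda in the worker loop above can be seen on a toy example. The predicted means and standard errors below are made up; the point is only that a small lambda favors the candidate with the best predicted value, while a large lambda favors the candidate with the highest uncertainty.

```r
# made-up surrogate predictions for three candidate points
mu = c(10, 12, 15)   # predicted mean
se = c(0.5, 2, 6)    # predicted standard error

# lower confidence bound, as computed in the worker loop
lcb = function(lambda) mu - lambda * se

# exploitation: almost only the predicted mean counts
which.min(lcb(0.01))
# 1

# exploration: the most uncertain candidate wins
which.min(lcb(10))
# 3
```

Because every worker samples its own lambda, the network as a whole mixes exploiting and exploring workers without any coordination.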
We start the optimization process by starting 4 local workers that run the Bayesian optimization worker loop.
rush$start_local_workers(
worker_loop = wl_bayesian_optimization,
n_workers = 4,
globals = "branin")
The optimization is quickly finished and we retrieve the results.
rush$fetch_finished_tasks()[order(y)]
x1 x2 y pid worker_id keys
<num> <num> <num> <int> <char> <char>
1: 8.0712790 1.231383 8.044499 10551 unpaid_ind... 4230972a-7...
2: -2.0393063 12.249731 11.749324 10551 unpaid_ind... ce54cecb-0...
3: -4.4033048 12.877581 14.023798 10577 noncultura... eebe67b4-e...
4: 4.4950509 3.957099 14.184282 10551 unpaid_ind... 18d37620-5...
5: -1.0760614 8.166737 14.651817 10554 befuddled_... 0b3d12d0-c...
6: -1.5775285 11.096980 15.064574 10563 rheophilic... 6c657db7-f...
7: -0.7558544 6.392415 17.769431 10551 unpaid_ind... 16836aee-0...
8: 2.2254901 7.031750 19.628700 10551 unpaid_ind... c19fb978-7...
9: 0.5684921 3.328960 21.360723 10551 unpaid_ind... c89225d4-6...
10: 8.5481046 6.090872 21.971627 10554 befuddled_... 0fcc9f02-c...
11: -4.5436088 12.079039 22.973884 10551 unpaid_ind... 705494ec-c...
12: 3.8540248 6.715065 27.039188 10551 unpaid_ind... 848b762b-c...
13: -2.9288936 6.605772 27.280195 10551 unpaid_ind... fe9f74a4-f...
14: 0.4332007 2.253862 28.207207 10551 unpaid_ind... efa2280e-c...
15: 5.3267880 4.829386 28.797029 10551 unpaid_ind... 1bcd3e87-c...
16: -1.9333967 14.877685 34.871934 10551 unpaid_ind... dce08e15-5...
17: 0.2941682 9.779349 37.136330 10563 rheophilic... 96f1055f-b...
18: -3.6032269 6.323225 51.652968 10554 befuddled_... cd49aa7c-3...
19: 9.4477388 10.374830 62.501034 10554 befuddled_... d17396ac-7...
20: -1.4675507 1.402210 62.998160 10554 befuddled_... e1a85238-3...
21: 8.3002161 9.532786 67.368740 10554 befuddled_... 309f5f02-b...
22: 6.6562701 8.853999 78.603925 10577 noncultura... 4e33ed87-8...
23: -2.3447623 1.542840 82.484284 10551 unpaid_ind... 10ad7dfe-c...
24: 1.9802777 12.629618 92.197705 10551 unpaid_ind... 35e09243-6...
25: 3.1255416 12.373290 102.121266 10551 unpaid_ind... 08a701b7-a...
x1 x2 y pid worker_id keys