rush is a package designed to solve large-scale problems asynchronously across a distributed network. Employing a database-centric model, rush enables workers to communicate tasks and their results over a shared Redis database. This article demonstrates how to use rush with three different examples.
Random Search
We begin with a simple random search to optimize the Branin function in parallel. Although random search does not require communication between workers, it is a good way to introduce the basic ideas behind rush. The classic Branin function (also called the Branin-Hoo function) is a well-known benchmark problem in global optimization. It is a two-dimensional function that is non-convex, multimodal, and has three global minima. As a toy example for optimization, it is fast to evaluate but not trivial to solve.
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
The Branin function is usually evaluated on the domain \(x_1 \in [-5, 10]\) and \(x_2 \in [0, 15]\).
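As a quick sanity check of the definition, the sketch below evaluates the function at the three commonly cited global minima of the Branin function, which all lie inside the domain given above. The variable names `minima` and `values` are only illustrative; the function itself is repeated so that the snippet runs on its own.

```r
# Branin function as defined in this article
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# The three global minima: (-pi, 12.275), (pi, 2.275), (3 * pi, 2.475)
minima = list(c(-pi, 12.275), c(pi, 2.275), c(3 * pi, 2.475))
values = vapply(minima, function(p) branin(p[1], p[2]), numeric(1))

# All three values equal 10 / (8 * pi), roughly 0.3979
print(round(values, 4))
```

Any objective value reported by the workers later in this article should therefore be no smaller than about 0.3979.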
Worker Loop
We define the worker_loop function, which runs on each worker. It repeatedly draws tasks, evaluates them, and sends the results to the Redis database. The function takes a single argument: a RushWorker object, which handles communication with Redis. In this example, each worker samples a random point, creates a task, evaluates it using the Branin function, and submits the result. The optimization stops after 100 tasks have been evaluated.
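The description above can be sketched as follows. The name `wl_random_search` matches the worker loop that is passed to the controller later in this article, but the body is a reconstruction from the prose and from the calls used in the later examples, not necessarily the verbatim original. To allow a dry run without a Redis server, the snippet also defines `mock_worker()`, a purely hypothetical in-memory stand-in that mimics only the three RushWorker members used here.

```r
# Branin function as defined in this article
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# Sketch of the random search worker loop (reconstructed from the text)
wl_random_search = function(rush) {
  while (rush$n_finished_tasks < 100) {
    # sample a random point from the usual Branin domain
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))
    # create the task, mark it as running, and receive its key
    key = rush$push_running_tasks(xss = list(xs))
    # evaluate the task and push the result
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}

# Hypothetical stand-in for a RushWorker (assumption: NOT part of rush)
mock_worker = function() {
  self = new.env()
  self$n_finished_tasks = 0L
  self$results = list()
  self$push_running_tasks = function(xss) {
    paste0("task-", length(self$results) + seq_along(xss))
  }
  self$push_results = function(keys, yss) {
    self$results[keys] = yss
    self$n_finished_tasks = length(self$results)
    invisible(keys)
  }
  self
}

set.seed(1)
worker = mock_worker()
wl_random_search(worker)
print(worker$n_finished_tasks)
```

With a single mock worker the loop stops at exactly 100 finished tasks. With several real workers the count can end slightly higher, because each worker checks the stopping condition independently.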
The most important methods of the RushWorker are $push_running_tasks() and $push_results(). The first, $push_running_tasks(), creates a new task in the Redis database. Since the task is evaluated next, it is marked as running. The method returns a unique key that identifies the task. The second, $push_results(), pushes the results back to the Redis database. It takes the key of the task and a list of results. Marking the task as running is not important for a random search, but it is crucial for more sophisticated algorithms that use the tasks of other workers to decide which task to evaluate next. For example, a Bayesian optimization algorithm would sample the next point further away from the previous points to explore the search space. The $n_finished_tasks field reports how many tasks are finished and is used to stop the worker loop.
Tasks
Tasks are the unit in which workers exchange information. The main components of a task are the key, the computational state, the input (xs), and the result (ys). The key is a unique identifier for the task in the Redis database. The four possible computational states are "running", "finished", "failed", and "queued". The $push_running_tasks() method marks a task as "running" and returns its key. The $push_results() method marks a task as "finished" and stores the result. Failed tasks can be marked as "failed" with the $push_failed() method. The error catching must be implemented in the worker loop (see Error Handling for more details). Tasks can also be pushed to a queue with the $push_tasks() method, which sets the state to "queued". The last example gives more details on the task queue and the different methods to push and pop tasks. The input xs and the result ys are lists that can contain arbitrary data. The methods of the RushWorker usually work on multiple tasks at once, so xss and yss are lists of inputs and results.
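To make the shapes of xss and yss concrete, the following sketch builds them as plain R lists for two tasks. It does not talk to Redis; the input values are arbitrary illustrations, and the Branin function is repeated so that the snippet runs on its own.

```r
# Branin function as defined in this article
branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}

# xss: a list with one named list of inputs (xs) per task
xss = list(
  list(x1 = -3, x2 = 12),
  list(x1 = 9, x2 = 2)
)

# yss: a list with one named list of results (ys) per task, in the same order
yss = lapply(xss, function(xs) list(y = branin(xs$x1, xs$x2)))

str(xss)
str(yss)
```

A single task is therefore passed as a list of length one, for example `xss = list(xs)`, which is the pattern used in the worker loops of this article.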
Controller
The Rush controller is responsible for starting, observing, and stopping workers within the network. It is initialized using the rsh() function, which requires a network ID and a config argument. The config argument is a configuration object, created with the redux package, that is used to connect to the Redis database.
library(rush)
config = redux::redis_config()
rush = rsh(
network = "test-random-search",
config = config)
Workers can be started using the $start_local_workers() method, which accepts the worker loop and the number of workers as arguments. The workers are started locally with the processx package, but it is also possible to start workers on a remote machine (see Rush Controller). We need to export the branin function to the workers, so we set the globals argument to "branin". More on globals and the different worker types can be found in the Rush Controller vignette.
rush$start_local_workers(
worker_loop = wl_random_search,
n_workers = 4,
globals = "branin")
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
The optimization finishes quickly, and we retrieve the results. The $fetch_finished_tasks() method fetches all finished tasks from the database and returns a data.table() with the key, input, and result of each task. The pid and worker_id columns hold additional information that is stored when the task is created. The worker_id is the ID of the worker that evaluated the task, and the pid is the process ID of that worker. Further extra information can be passed as lists to the $push_running_tasks() and $push_results() methods via the extra argument.
rush$fetch_finished_tasks()[order(y)]
x1 x2 y pid worker_id keys
<num> <num> <num> <int> <char> <char>
1: -3.006676 12.5781937 0.8758706 10265 foliaged_g... f03a7e03-5...
2: 9.393793 1.2539774 1.8305450 10265 foliaged_g... cd0a7ffe-b...
3: -2.515989 10.1796893 2.6291066 10256 ribbony_hu... 1e8fa0fa-2...
4: 3.060233 0.8436511 2.6666291 10265 foliaged_g... af534fbd-b...
5: -2.605258 12.2258812 3.1925355 10275 plantsemio... 703fdd28-9...
---
97: 7.667304 13.6300392 161.5627846 10256 ribbony_hu... 67ecf45a-a...
98: 3.723589 14.9876064 174.1846034 10245 diffractiv... 9f6aa063-b...
99: 5.868251 13.9575386 183.8713748 10265 foliaged_g... cf9e915a-e...
100: 4.835928 14.9302730 196.2998395 10265 foliaged_g... 203b7031-9...
101: 5.280994 14.9338636 203.8483776 10265 foliaged_g... c3433e2c-3...
The rush controller displays how many workers are running and how many tasks exist in each state. In this case, 101 tasks are marked as finished, and all workers have stopped. The number slightly exceeds 100 because workers check the stopping condition independently. If several workers evaluate the condition around the same time (for example, when 99 tasks are finished), they may all create new tasks before detecting that the limit has been reached. Additionally, tasks may continue to be created while the 100th task is still being evaluated.
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 101
* Failed Tasks: 0
We can stop the workers and reset the database with the $reset()
method.
rush$reset()
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
To learn more about starting, stopping and observing workers, see the Rush Controller vignette.
Median Stopping
Random search is a simple example that doesn’t rely on information from previous tasks and therefore doesn’t require communication between workers. Now, let’s implement a more sophisticated algorithm that uses the results of completed tasks to decide whether to continue evaluating the current one. We tune an XGBoost model on the mtcars dataset and use the median stopping rule to stop the training early.
Worker Loop
The worker starts by sampling a random hyperparameter configuration with three parameters: maximum tree depth, lambda regularization, and alpha regularization. These parameters control how the XGBoost model learns from the data. The worker then trains the model incrementally, starting with 5 boosting rounds and adding one round at a time up to 20 rounds. After each round, the worker evaluates the model’s performance on a test set using root mean squared error (RMSE). At this point, the worker checks how well its model is doing compared to other workers by fetching their completed results and comparing its performance to the median score among all models with the same number of training rounds.
If the current model performs worse than the median (that is, its RMSE is higher), the worker abandons this hyperparameter configuration and starts over with a new one. This early stopping mechanism prevents workers from wasting time on poorly performing configurations. If the RMSE is at or below the median, the worker continues training for one more round. The process continues until the network has finished 1000 tasks across all workers, where each task corresponds to one evaluated training step of a configuration.
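The stopping decision itself can be isolated in a few lines. The sketch below uses a plain data.frame as a stand-in for the table of finished tasks; the helper name `exceeds_median` and the example numbers are illustrative assumptions. Unlike the worker loop, the helper also guards against the case where no results exist yet for a given number of rounds.

```r
# finished: stand-in for the fetched task table, with columns nrounds and rmse
# Returns TRUE if the current RMSE is worse than the median of the finished
# tasks that were evaluated at the same number of boosting rounds
exceeds_median = function(rmse, finished, iteration) {
  peers = finished$rmse[finished$nrounds == iteration]
  length(peers) > 0 && rmse > median(peers)
}

finished = data.frame(
  nrounds = c(5, 5, 5, 6, 6),
  rmse = c(3.0, 4.0, 5.0, 2.5, 3.5)
)

exceeds_median(4.5, finished, iteration = 5)  # TRUE: 4.5 is above the median of 4.0
exceeds_median(3.0, finished, iteration = 6)  # FALSE: 3.0 equals the median of 3.0
exceeds_median(9.9, finished, iteration = 7)  # FALSE: nothing to compare against yet
```

Because lower RMSE is better, a result exactly at the median is kept, which matches the strict comparison used in the worker loop.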
wl_median_stopping = function(rush) {
  while (rush$n_finished_tasks < 1000) {
    # sample a random hyperparameter configuration
    params = list(
      max_depth = sample(1:20, 1),
      lambda = runif(1, 0, 1),
      alpha = runif(1, 0, 1)
    )
    model = NULL
    for (iteration in seq(5, 20, by = 1)) {
      # register one task per training step and mark it as running
      key = rush$push_running_tasks(xss = list(c(params, list(nrounds = iteration))))
      # train 5 rounds initially, then continue the existing model by 1 round
      model = xgboost(
        data = as.matrix(data[training_ids, ]),
        label = y[training_ids],
        nrounds = if (is.null(model)) 5 else 1,
        params = params,
        xgb_model = model,
        verbose = 0
      )
      # evaluate on the test set and push the result
      pred = predict(model, as.matrix(data[test_ids, ]))
      rmse = sqrt(mean((pred - y[test_ids])^2))
      rush$push_results(key, yss = list(list(rmse = rmse)))
      # abandon this configuration if it is worse than the median at the same number of rounds
      tasks = rush$fetch_finished_tasks()
      if (rmse > median(tasks[nrounds == iteration, rmse])) break
    }
  }
}
The worker loop uses a new method called $fetch_finished_tasks()
to fetch all finished tasks from the database. Other methods like $fetch_running_tasks()
and $fetch_failed_tasks()
are also available.
We sample a training and test set from the mtcars dataset. The training set is used to fit the model and the test set is used to evaluate the model. Then we initialize the rush network and start the workers. This time, we pass the training and test sets to the workers via the globals argument and use the packages argument to load the data.table and xgboost packages.
data(mtcars)
training_ids = sample(1:nrow(mtcars), 20)
test_ids = setdiff(1:nrow(mtcars), training_ids)
data = mtcars[, -1]
y = mtcars$mpg
config = redux::redis_config()
rush = rsh(
network = "test-median-stopping",
config = config)
rush$start_local_workers(
worker_loop = wl_median_stopping,
n_workers = 4,
packages = c("data.table", "xgboost"),
globals = c("training_ids", "test_ids", "data", "y"))
We fetch the finished tasks and sort them by the objective value.
rush$fetch_finished_tasks()[order(rmse)]
Null data.table (0 rows and 0 cols)
We stop the workers and reset the database.
rush$reset()
Bayesian Optimization
Next, we implement Asynchronous Distributed Bayesian Optimization (ADBO) [@egele_2023]. This example shows how workers use information about running tasks and introduces task queues. ADBO runs sequential Bayesian optimization on multiple workers in parallel. Each worker maintains its own surrogate model (a random forest) and selects the next hyperparameter configuration by optimizing a confidence bound acquisition function. ADBO is described in terms of maximizing the upper confidence bound; because we minimize the Branin function here, the worker loop below instead minimizes the lower confidence bound, that is, the predicted mean minus lambda times the standard error. To promote a varying exploration-exploitation tradeoff between the workers, each worker initializes its acquisition function with its own lambda value, sampled between 0.01 and 10. When a worker completes an evaluation, it asynchronously shares the result with its peers via the Redis database; each worker then updates its local model with this shared information. This decentralized design lets workers proceed independently, eliminating the need for a central coordinator that could become a bottleneck in large-scale optimization scenarios.
We first create a new rush network.
config = redux::redis_config()
rush = rsh(
network = "test-bayesian-optimization",
config = config)
Queues
The queue system works by pushing and popping tasks from a queue. The $push_tasks() method creates new tasks and pushes them to the queue. In this example, we draw an initial design of 25 points and push them to the queue.
xss = replicate(25, list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15)), simplify = FALSE)
rush$push_tasks(xss = xss)
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 25
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
We see 25 queued tasks in the database. To retrieve the tasks from the queue, the worker loop needs to call the $pop_task() method.
Worker Loop
The worker loop pops tasks from the queue with the $pop_task() method. Each task is evaluated and the results are pushed back to the database with the $push_results() method. If there are no more tasks in the queue, the $pop_task() method returns NULL and the worker loop starts the Bayesian optimization. First, a lambda value for the acquisition function is sampled between 0.01 and 10. Then all running and finished tasks are fetched from the database. Using rush$fetch_tasks_with_state() instead of separate calls to $fetch_running_tasks() and $fetch_finished_tasks() is important because it prevents tasks from appearing twice. This could happen if a worker changes the state of a task from "running" to "finished" while the tasks are being fetched. The missing y values of the running tasks are imputed with the mean of the finished tasks. Then the surrogate random forest model is fitted to the data and the acquisition function is optimized over a set of random candidate points to find the next task. Marking the task as running is important for the Bayesian optimization algorithm, as it uses the already sampled points of the other workers to decide which task to evaluate next. The task is evaluated and the results are pushed back to the database. We stop the optimization process after 100 evaluated tasks.
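The two data-handling steps described above, imputing the running tasks and picking the candidate with the best confidence bound, can be sketched without Redis or a fitted surrogate. In the snippet below, the data.frame `xydt` stands in for the fetched task table, and `pred` and `se` are made-up surrogate predictions for three candidate points; the helper name `select_candidate` is likewise only illustrative.

```r
# Stand-in for the table of running and finished tasks
xydt = data.frame(
  state = c("finished", "finished", "running"),
  x1 = c(-3, 9, 2),
  x2 = c(12, 2, 5),
  y = c(1.0, 3.0, NA)
)

# Impute the unknown results of running tasks with the mean of the finished ones
mean_y = mean(xydt$y, na.rm = TRUE)
xydt$y[xydt$state == "running"] = mean_y

# Made-up surrogate output for three candidates: predicted mean and standard error
pred = c(5.0, 4.0, 6.0)
se = c(0.5, 0.1, 2.0)

# Lower confidence bound for minimization: mean minus lambda times standard error
select_candidate = function(pred, se, lambda) {
  which.min(pred - lambda * se)
}

select_candidate(pred, se, lambda = 0.01)  # 2: nearly pure exploitation
select_candidate(pred, se, lambda = 10)    # 3: strong preference for uncertain points
```

A small lambda favors the candidate with the best predicted value, while a large lambda favors the candidate with the highest uncertainty, which is why giving each worker its own lambda spreads the workers across exploration and exploitation.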
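wl_bayesian_optimization = function(rush) {
  # evaluate the initial design until the queue is empty
  repeat {
    task = rush$pop_task()
    if (is.null(task)) break
    ys = list(y = branin(task$xs$x1, task$xs$x2))
    rush$push_results(task$key, yss = list(ys))
  }
  # each worker draws its own exploration-exploitation tradeoff
  lambda = runif(1, 0.01, 10)
  while (rush$n_finished_tasks < 100) {
    # fetch running and finished tasks in one call to avoid duplicates
    xydt = rush$fetch_tasks_with_state(states = c("running", "finished"))
    # impute the missing results of running tasks with the mean of the finished tasks
    mean_y = mean(xydt$y, na.rm = TRUE)
    xydt["running", y := mean_y, on = "state"]
    # fit the random forest surrogate; keep.inbag is required for standard errors
    surrogate = ranger::ranger(
      y ~ x1 + x2,
      data = xydt,
      num.trees = 100L,
      keep.inbag = TRUE)
    # minimize the lower confidence bound over 1000 random candidates
    xdt = data.table::data.table(x1 = runif(1000, -5, 10), x2 = runif(1000, 0, 15))
    p = predict(surrogate, xdt, type = "se", se.method = "jack")
    cb = p$predictions - lambda * p$se
    xs = as.list(xdt[which.min(cb)])
    # mark the chosen point as running, evaluate it, and push the result
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}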
We start the optimization process by starting 4 local workers that run the Bayesian optimization worker loop.
rush$start_local_workers(
worker_loop = wl_bayesian_optimization,
n_workers = 4,
globals = "branin")
The optimization is quickly finished and we retrieve the results.
rush$fetch_finished_tasks()[order(y)]
x1 x2 y pid worker_id keys
<num> <num> <num> <int> <char> <char>
1: 9.4789970 3.9707666 2.513488 10545 homebrewed... 82551593-3...
2: -3.5432983 12.0414037 2.650262 10545 homebrewed... 587d5cce-6...
3: 2.4457046 0.5958724 7.848923 10561 nonliterar... dc8c3990-9...
4: 1.6067889 5.7283325 13.465132 10561 nonliterar... 3ba7c5d3-8...
5: 1.1983627 3.2872288 14.476201 10561 nonliterar... a023f1b1-0...
6: 0.8837007 5.6722481 17.046695 10570 sloppy_ado... 1733e496-f...
7: 1.6876056 6.6047979 17.423614 10561 nonliterar... 68623e8d-6...
8: 6.3805064 1.6719879 19.878921 10561 nonliterar... d6b0c26d-0...
9: 9.3637869 7.2365480 23.576038 10561 nonliterar... c8e4657b-d...
10: 8.4544151 6.3538970 25.512548 10561 nonliterar... eb196808-5...
11: 9.4777738 7.6647949 26.879605 10561 nonliterar... 16982292-f...
12: -1.3962873 4.5530701 27.041801 10545 homebrewed... 584f56da-f...
13: 8.6980170 7.4120048 32.874292 10545 homebrewed... 88779ade-8...
14: -1.5632584 3.2513731 40.900726 10561 nonliterar... 132a3343-8...
15: 4.1019229 8.1324210 46.579424 10545 homebrewed... 34cbda86-3...
16: 6.8506884 6.5179567 46.808185 10545 homebrewed... 15ca3c71-e...
17: -1.0466517 14.1849046 55.479219 10545 homebrewed... 90c974ac-2...
18: -4.4170727 8.5940883 55.596318 10545 homebrewed... 9aa25b24-c...
19: -1.4530629 1.0405958 68.051658 10547 dihedral_s... 4389dbb3-0...
20: 1.6499133 12.6549530 88.971782 10570 sloppy_ado... b8c310e7-d...
21: 4.2811050 10.7582904 90.703348 10561 nonliterar... 09898598-a...
22: 4.6139028 12.1159477 123.740879 10547 dihedral_s... 8eae7433-0...
23: 7.0865246 12.2125126 137.745409 10545 homebrewed... 0427acff-0...
24: 3.2155564 14.0604033 140.666053 10545 homebrewed... 8ae816e5-6...
25: 9.0973894 14.9547970 163.269261 10561 nonliterar... 5c160110-3...
x1 x2 y pid worker_id keys