branin = function(x1, x2) {
  (x2 - 5.1 / (4 * pi^2) * x1^2 + 5 / pi * x1 - 6)^2 + 10 * (1 - 1 / (8 * pi)) * cos(x1) + 10
}
rush is a package designed to solve large-scale problems asynchronously across a distributed network. Employing a database-centric model, rush enables workers to communicate tasks and their results over a shared Redis database. This article demonstrates how to use rush with three different examples.
Random Search
We begin with a simple random search to optimize the Branin function in parallel. Although random search does not require communication between workers, it is a good way to introduce the basic ideas behind rush. The classic Branin function (also called the Branin-Hoo function) is a well-known benchmark problem in global optimization. It is a two-dimensional function that is non-convex, multimodal, and has three global minima. It serves as a toy example that is fast to evaluate but not trivial to solve.
The Branin function is usually evaluated on the domain \(x_1 \in [-5, 10]\) and \(x_2 \in [0, 15]\).
Worker Loop
We define the worker_loop function, which runs on each worker. It repeatedly draws tasks, evaluates them, and sends the results to the Redis database. The function takes a single argument: a RushWorker object, which handles communication with Redis. In this example, each worker samples a random point, creates a task, evaluates it using the Branin function, and submits the result. The optimization stops after 100 tasks have been evaluated.
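The loop described above can be sketched as follows. This is a minimal sketch that uses only the RushWorker members named in this article ($n_finished_tasks, $push_running_tasks(), and $push_results()) and assumes the branin function defined at the top of this article is available on the worker. The name wl_random_search matches the worker loop passed to the controller later in this example.

```r
# Sketch of a random search worker loop.
# Assumes `branin` (defined at the top of this article) is exported to the worker.
wl_random_search = function(rush) {
  # stop once the network as a whole has finished 100 tasks
  while (rush$n_finished_tasks < 100) {
    # sample a random point from the usual Branin domain
    xs = list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15))

    # register the task as running and keep its key
    key = rush$push_running_tasks(xss = list(xs))

    # evaluate the point and push the result under the same key
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}
```

Both methods take lists of tasks, which is why the single input xs and the single result ys are each wrapped in list().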
The most important methods of the RushWorker are $push_running_tasks() and $push_results(). The first, $push_running_tasks(), creates a new task in the Redis database. Since the task is evaluated next, it is marked as running. The method returns a unique key that identifies the task. The second, $push_results(), pushes the results back to the Redis database. It takes the key of the task and a list of results. Marking the task as running is not important for a random search, but it is crucial for more sophisticated algorithms that use the tasks of other workers to decide which task to evaluate next. For example, a Bayesian optimization algorithm would sample the next point further away from the points already sampled by other workers in order to explore the search space. The $n_finished_tasks field reports how many tasks are finished and is used to stop the worker loop.
Tasks
Tasks are the unit in which workers exchange information. The main components of a task are the key, computational state, input (xs), and result (ys). The key is a unique identifier for the task in the Redis database. The four possible computational states are "running", "finished", "failed", and "queued". The $push_running_tasks() method marks a task as "running" and returns its key. The $push_results() method marks a task as "finished" and stores the result. Failed tasks can be marked as "failed" with the $push_failed() method. Error catching must be implemented in the worker loop (see Error Handling for more details). Tasks can also be pushed to a queue with the $push_tasks() method, which sets the state to "queued". The last example gives more details on the task queue and the different methods to push and pop tasks. The input xs and result ys are lists that can contain arbitrary data. The methods of the RushWorker usually work on multiple tasks at once, so xss and yss are lists of inputs and results.
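To make the shape of xss and yss concrete, the following sketch builds both for two tasks in plain R, without a Redis connection. The function toy_objective is a hypothetical stand-in used only for this illustration and is not part of rush.

```r
# hypothetical stand-in objective, used only for this illustration
toy_objective = function(x1, x2) x1^2 + x2^2

# xss: one named list of inputs (xs) per task
xss = list(
  list(x1 = 1, x2 = 2),
  list(x1 = 3, x2 = 4)
)

# yss: one named list of results (ys) per task, in the same order as xss
yss = lapply(xss, function(xs) list(y = toy_objective(xs$x1, xs$x2)))

str(yss)
```

A single task is therefore passed as a list of length one, for example xss = list(xs).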
Controller
The Rush controller is responsible for starting, observing, and stopping workers within the network. It is initialized using the rsh() function, which requires a network ID and a config argument. The config argument is a configuration object, created with redux::redis_config(), that is used to connect to the Redis database via the redux package.
library(rush)
config = redux::redis_config()
rush = rsh(
  network = "test-random-search",
  config = config)
Workers can be started using the $start_local_workers() method, which accepts the worker loop and the number of workers as arguments. The workers are started locally with the processx package, but it is also possible to start workers on a remote machine (see Rush Controller). We need to export the branin function to the workers, so we set the globals argument to "branin". More on globals and the different worker types can be found in the Rush Controller vignette.
rush$start_local_workers(
  worker_loop = wl_random_search,
  n_workers = 4,
  globals = "branin")
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
The optimization finishes quickly and we retrieve the results. The $fetch_finished_tasks() method fetches all finished tasks from the database. It returns a data.table() with the key, input, and result of each task. The pid and worker_id columns hold additional information that is stored when the task is created. The worker_id is the ID of the worker that evaluated the task and the pid is the process ID of that worker. Further extra information can be passed as lists to the $push_running_tasks() and $push_results() methods via the extra argument.
rush$fetch_finished_tasks()[order(y)]
x1 x2 y pid worker_id keys
<num> <num> <num> <int> <char> <char>
1: 9.281184 2.5949004 0.5535283 10605 relevant_g... d07bf8c6-d...
2: -3.354282 12.5625198 0.6669108 10625 conspirato... 26394de7-a...
3: 3.441884 1.1116389 1.7127303 10616 refundable... 1fdcde41-6...
4: 8.998173 3.0972817 2.1774250 10605 relevant_g... 80bab376-e...
5: 9.712449 3.9284190 2.2326368 10605 relevant_g... 014621f2-a...
---
99: 5.805702 13.4348553 170.3253720 10635 expecting_... 4abb06b0-6...
100: 4.766506 14.0035503 170.6598748 10625 conspirato... 9458ebcd-8...
101: 4.614975 14.4389772 178.9139046 10605 relevant_g... bd51e865-f...
102: 5.203284 14.1780076 182.5329975 10616 refundable... cb22cfab-d...
103: -4.625576 0.5670821 251.2429495 10616 refundable... a1ec1422-7...
The rush controller displays how many workers are running and how many tasks exist in each state. In this case, 103 tasks are marked as finished, and all workers have stopped. The number slightly exceeds 100 because workers check the stopping condition independently. If several workers evaluate the condition around the same time — when, for example, 99 tasks are finished — they may all create new tasks before detecting that the limit has been reached. Additionally, tasks may continue to be created while the 100th task is still being evaluated.
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 103
* Failed Tasks: 0
We can stop the workers and reset the database with the $reset() method.
rush$reset()
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 0
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
To learn more about starting, stopping and observing workers, see the Rush Controller vignette.
Median Stopping
Random search is a simple example that doesn’t rely on information from previous tasks and therefore doesn’t require communication between workers. Now, let’s implement a more sophisticated algorithm that uses the results of completed tasks to decide whether to continue evaluating the current one. We tune an XGBoost model on the mtcars dataset and use the median stopping rule to stop the training early.
Worker Loop
The worker starts by sampling a random hyperparameter configuration with three parameters: maximum tree depth, lambda regularization, and alpha regularization. These parameters control how the XGBoost model learns from the data. The worker then trains the model incrementally, starting with 5 boosting rounds and adding one round at a time up to 20 rounds. After each round, the worker evaluates the model’s performance on a test set using root mean squared error (RMSE). At this point, the worker checks how well its model is doing compared to other workers by fetching their completed results and comparing its performance to the median score among all models with the same number of training rounds.
If the current model performs worse than the median, that is, its RMSE is higher, the worker stops this hyperparameter configuration and starts over with a new one. This early stopping mechanism prevents workers from wasting time on poorly performing configurations. If the model's RMSE is at or below the median, the worker continues training for one more round. The process continues until 1000 tasks have finished across all workers, where each task corresponds to one training step of a configuration.
wl_median_stopping = function(rush) {
  while (rush$n_finished_tasks < 1000) {
    # sample a random hyperparameter configuration
    params = list(
      max_depth = sample(1:20, 1),
      lambda = runif(1, 0, 1),
      alpha = runif(1, 0, 1)
    )

    model = NULL
    for (iteration in seq(5, 20, by = 1)) {
      # each training step is registered as its own task
      key = rush$push_running_tasks(xss = list(c(params, list(nrounds = iteration))))

      # fit 5 rounds initially, then continue the existing model for 1 more round
      model = xgboost(
        data = as.matrix(data[training_ids, ]),
        label = y[training_ids],
        nrounds = if (is.null(model)) 5 else 1,
        params = params,
        xgb_model = model,
        verbose = 0
      )

      # evaluate on the test set and push the RMSE as the result
      pred = predict(model, as.matrix(data[test_ids, ]))
      rmse = sqrt(mean((pred - y[test_ids])^2))
      rush$push_results(key, yss = list(list(rmse = rmse)))

      # stop this configuration if it is worse than the median at the same number of rounds
      tasks = rush$fetch_finished_tasks()
      if (rmse > median(tasks[nrounds == iteration, rmse])) break
    }
  }
}
The worker loop calls $fetch_finished_tasks(), this time from within a worker, to fetch all finished tasks from the database. Other methods like $fetch_running_tasks() and $fetch_failed_tasks() are also available.
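The median stopping check can also be looked at in isolation. The sketch below uses a plain data.frame with made-up RMSE values in place of the fetched tasks, and should_stop is a hypothetical helper introduced only for this illustration. It applies the same comparison as the worker loop: a configuration is stopped when its RMSE is strictly greater than the median RMSE of the finished tasks with the same number of rounds.

```r
# made-up finished tasks, standing in for the result of $fetch_finished_tasks()
tasks = data.frame(
  nrounds = c(5, 5, 5, 6, 6),
  rmse    = c(4.0, 3.0, 5.0, 2.5, 3.5)
)

# hypothetical helper: TRUE if the current model should be stopped
should_stop = function(rmse, iteration, tasks) {
  peers = tasks$rmse[tasks$nrounds == iteration]
  rmse > median(peers)
}

should_stop(4.5, iteration = 5, tasks)  # median at 5 rounds is 4.0
should_stop(3.0, iteration = 6, tasks)  # median at 6 rounds is 3.0
```

The first call returns TRUE because 4.5 exceeds the median of 4.0, and the second returns FALSE because a model that ties with the median is allowed to continue.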
We sample a training and test set from the mtcars dataset. The training set is used to fit the model and the test set is used to evaluate the model. Then we initialize the rush network and start the workers. This time we have to pass the training and test set to the workers via the globals argument, and we use the packages argument to load the data.table and xgboost packages.
data(mtcars)
training_ids = sample(1:nrow(mtcars), 20)
test_ids = setdiff(1:nrow(mtcars), training_ids)
data = mtcars[, -1]
y = mtcars$mpg
config = redux::redis_config()
rush = rsh(
  network = "test-median-stopping",
  config = config)
rush$start_local_workers(
  worker_loop = wl_median_stopping,
  n_workers = 4,
  packages = c("data.table", "xgboost"),
  globals = c("training_ids", "test_ids", "data", "y"))
We fetch the finished tasks and sort them by the objective value.
rush$fetch_finished_tasks()[order(rmse)]
Null data.table (0 rows and 0 cols)
We stop the workers and reset the database.
rush$reset()
Bayesian Optimization
Next, we implement Asynchronous Distributed Bayesian Optimization (ADBO) [@egele_2023]. This example shows how workers use information about running tasks and introduces task queues. ADBO runs sequential Bayesian optimization on multiple workers in parallel. Each worker maintains its own surrogate model (a random forest) and selects the next hyperparameter configuration by optimizing a confidence bound acquisition function. In the code below, the lower confidence bound is minimized because the Branin function is minimized. To promote a varying exploration-exploitation tradeoff between the workers, the acquisition functions are initialized with different lambda values ranging from 0.01 to 10. When a worker completes an evaluation, it asynchronously sends the result to its peers via a Redis database; each worker then updates its local model with this shared information. This decentralized design enables workers to proceed independently, eliminating the need for a central coordinator that could become a bottleneck in large-scale optimization scenarios.
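The effect of lambda on the tradeoff can be illustrated with a small sketch. The predictions and standard errors below are made up, and select_candidate is a hypothetical helper. It scores candidates with the same `prediction - lambda * se` rule that the worker loop of this example applies, and picks the smallest value.

```r
# made-up surrogate predictions and standard errors for three candidates
pred = c(a = 10, b = 12, c = 8)
se   = c(a = 0.5, b = 3, c = 0.2)

# hypothetical helper: name of the candidate with the lowest confidence bound
select_candidate = function(lambda) {
  cb = pred - lambda * se
  names(which.min(cb))
}

select_candidate(0.01)  # small lambda: trusts the prediction, picks "c"
select_candidate(10)    # large lambda: rewards uncertainty, picks "b"
```

With a small lambda the worker exploits the candidate with the best predicted value, whereas a large lambda steers it toward the candidate the surrogate is least certain about.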
We first create a new rush network.
config = redux::redis_config()
rush = rsh(
  network = "test-bayesian-optimization",
  config = config)
Queues
The queue system works by pushing and popping tasks from a queue. The $push_tasks() method creates new tasks and pushes them to the queue. In this example, we draw an initial design of 25 points and push them to the queue.
xss = replicate(25, list(x1 = runif(1, -5, 10), x2 = runif(1, 0, 15)), simplify = FALSE)
rush$push_tasks(xss = xss)
rush
<Rush>
* Running Workers: 0
* Queued Tasks: 25
* Queued Priority Tasks: 0
* Running Tasks: 0
* Finished Tasks: 0
* Failed Tasks: 0
We see 25 queued tasks in the database. To retrieve the tasks from the queue, we need to call the $pop_task() method in the worker loop.
Worker Loop
The worker loop pops tasks from the queue with the $pop_task() method. Each task is evaluated and the results are pushed back to the database with the $push_results() method. If there are no more tasks in the queue, $pop_task() returns NULL and the worker loop starts the Bayesian optimization. First, a lambda value for the acquisition function is sampled between 0.01 and 10. Then all running and finished tasks are fetched from the database. Using rush$fetch_tasks_with_state() instead of separate calls to $fetch_running_tasks() and $fetch_finished_tasks() is important because it prevents tasks from appearing twice. This could happen if a worker changes the state of a task from "running" to "finished" while the tasks are being fetched. The missing y values of the running tasks are imputed with the mean of the finished tasks. Then the surrogate random forest model is fitted to the data and the acquisition function is optimized to find the next task. Marking the task as running is important for the Bayesian optimization algorithm, as it uses the points already sampled by the other workers to decide which task to evaluate next. The task is evaluated and the results are pushed back to the database. We stop the optimization process after 100 evaluated tasks.
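wl_bayesian_optimization = function(rush) {
  # evaluate the initial design: pop queued tasks until the queue is empty
  repeat {
    task = rush$pop_task()
    if (is.null(task)) break
    ys = list(y = branin(task$xs$x1, task$xs$x2))
    rush$push_results(task$key, yss = list(ys))
  }

  # each worker draws its own exploration-exploitation tradeoff
  lambda = runif(1, 0.01, 10)

  while (rush$n_finished_tasks < 100) {
    # fetch running and finished tasks in a single call
    xydt = rush$fetch_tasks_with_state(states = c("running", "finished"))

    # impute the missing y of running tasks with the mean of the finished tasks
    mean_y = mean(xydt$y, na.rm = TRUE)
    xydt["running", y := mean_y, on = "state"]

    # fit the random forest surrogate; keep.inbag is required for jackknife standard errors
    surrogate = ranger::ranger(
      y ~ x1 + x2,
      data = xydt,
      num.trees = 100L,
      keep.inbag = TRUE)

    # score 1000 random candidates with the lower confidence bound and pick the best
    xdt = data.table::data.table(x1 = runif(1000, -5, 10), x2 = runif(1000, 0, 15))
    p = predict(surrogate, xdt, type = "se", se.method = "jack")
    cb = p$predictions - lambda * p$se
    xs = as.list(xdt[which.min(cb)])

    # mark the chosen point as running, evaluate it, and push the result
    key = rush$push_running_tasks(xss = list(xs))
    ys = list(y = branin(xs$x1, xs$x2))
    rush$push_results(key, yss = list(ys))
  }
}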
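The imputation step in the loop above can be tried out without a Redis connection. The following sketch uses a plain data.frame with made-up values in place of the table returned by $fetch_tasks_with_state(). Running tasks have no result yet, so their y is NA and is replaced by the mean of the finished tasks.

```r
# made-up tasks: two finished, one still running (no result yet)
tasks = data.frame(
  state = c("finished", "finished", "running"),
  x1    = c(1.0, 2.0, 3.0),
  x2    = c(4.0, 5.0, 6.0),
  y     = c(2, 4, NA)
)

# mean of the finished results, ignoring the missing value
mean_y = mean(tasks$y, na.rm = TRUE)

# impute the running task so that the surrogate can be fitted on all rows
tasks$y[tasks$state == "running"] = mean_y

tasks
```

After imputation the running point enters the surrogate's training data with an average outcome, so the surrogate knows that the location is already being evaluated.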
We start the optimization process by starting 4 local workers that run the Bayesian optimization worker loop.
rush$start_local_workers(
  worker_loop = wl_bayesian_optimization,
  n_workers = 4,
  globals = "branin")
The optimization is quickly finished and we retrieve the results.
rush$fetch_finished_tasks()[order(y)]
x1 x2 y pid worker_id keys
<num> <num> <num> <int> <char> <char>
1: -3.5465988 14.1715292 1.988325 10906 individual... c765a914-4...
2: -3.9561508 11.7905122 9.800803 10904 striking_c... 11325115-f...
3: 2.6404297 5.6187441 10.107806 10916 dorky_acor... 0595f68e-d...
4: 5.3597145 0.4184338 16.371765 10916 dorky_acor... 3ed70618-e...
5: 5.6166865 1.5888851 17.752124 10916 dorky_acor... 4ce882e0-1...
6: 9.0259147 6.4810999 19.831284 10916 dorky_acor... 7cad870a-1...
7: -1.0757907 10.3900680 20.954088 10916 dorky_acor... 799ca06e-8...
8: -1.3234278 11.2689326 20.973379 10916 dorky_acor... 132c7162-a...
9: -0.6985079 4.6479156 23.738168 10916 dorky_acor... e0e5647f-5...
10: 3.5086668 7.9087126 35.877924 10904 striking_c... 7cf18129-6...
11: 8.7670259 7.7804853 36.092517 10916 dorky_acor... 37d42cbf-1...
12: 3.7168819 9.4890074 60.006327 10916 dorky_acor... ddaab5af-c...
13: 6.2118489 7.7034025 63.203924 10916 dorky_acor... 048f4073-e...
14: 0.6806039 11.8698920 64.979806 10904 striking_c... 683776dd-2...
15: -3.2085643 3.8999348 73.292843 10916 dorky_acor... 924fee5c-f...
16: 5.8035557 9.3600843 86.509134 10916 dorky_acor... 52503e40-b...
17: 1.3524335 13.1851155 94.913635 10916 dorky_acor... 9df8cb7f-6...
18: -2.1635719 0.1369908 102.866709 10906 individual... 97ec407f-5...
19: -3.7838975 3.3876986 112.230078 10904 striking_c... 6b45a232-e...
20: 1.7150231 13.9214246 114.113309 10916 dorky_acor... 124c7b8e-c...
21: 6.5700628 10.9525911 115.894196 10906 individual... b51b2852-0...
22: 8.2259535 12.3668575 121.373387 10930 wellplease... 9958913d-6...
23: 1.1916306 14.7560253 123.156746 10904 striking_c... e518c347-6...
24: -4.9514762 5.8009244 138.765038 10916 dorky_acor... 557d7910-6...
25: 8.8754930 14.6388090 160.272295 10904 striking_c... c38d85a2-2...
x1 x2 y pid worker_id keys