The Rush controller manages workers in a rush network.
Value
Object of class R6::R6Class and Rush with controller methods.
Local Workers
A local worker runs on the same machine as the controller. Local workers are spawned with the $start_local_workers() method via the processx package.
Remote Workers
A remote worker runs on a different machine than the controller. Remote workers are spawned with the $start_remote_workers() method via the mirai package.
Script Workers
Workers can be started with a script anywhere.
The only requirement is that the worker can connect to the Redis database.
The script is created with the $worker_script() method.
Public fields
network_id
(character(1))
Identifier of the rush network.
config
(redux::redis_config)
Redis configuration options.
connector
(redux::redis_api)
Returns a connection to Redis.
processes_processx
(processx::process)
List of processes started with $start_local_workers().
processes_mirai
(mirai::mirai)
List of mirai processes started with $start_remote_workers().
Active bindings
n_workers
(integer(1))
Number of workers.
n_running_workers
(integer(1))
Number of running workers.
n_terminated_workers
(integer(1))
Number of terminated workers.
n_killed_workers
(integer(1))
Number of killed workers.
n_lost_workers
(integer(1))
Number of lost workers. Run $detect_lost_workers() to update the number of lost workers.
n_pre_workers
(integer(1))
Number of workers that are not yet completely started.
worker_ids
(character())
Ids of workers.
running_worker_ids
(character())
Ids of running workers.
terminated_worker_ids
(character())
Ids of terminated workers.
killed_worker_ids
(character())
Ids of killed workers.
lost_worker_ids
(character())
Ids of lost workers.
pre_worker_ids
(character())
Ids of workers that are not yet completely started.
tasks
(character())
Keys of all tasks.
queued_tasks
(character())
Keys of queued tasks.
running_tasks
(character())
Keys of running tasks.
finished_tasks
(character())
Keys of finished tasks.
failed_tasks
(character())
Keys of failed tasks.
n_queued_tasks
(integer(1))
Number of queued tasks.
n_queued_priority_tasks
(integer(1))
Number of queued priority tasks.
n_running_tasks
(integer(1))
Number of running tasks.
n_finished_tasks
(integer(1))
Number of finished tasks.
n_failed_tasks
(integer(1))
Number of failed tasks.
n_tasks
(integer(1))
Number of all tasks.
worker_info
(data.table::data.table())
Contains information about the workers.
worker_states
(data.table::data.table())
Contains the states of the workers.
all_workers_terminated
(logical(1))
Whether all workers are terminated.
all_workers_lost
(logical(1))
Whether all workers are lost. Run $detect_lost_workers() to detect lost workers.
priority_info
(data.table::data.table)
Contains the number of tasks in the priority queues.
snapshot_schedule
(character())
Set a snapshot schedule to periodically save the database on disk. For example, c(60, 1000) saves the database every 60 seconds if there are at least 1000 changes. Overwrites the Redis configuration file. Set to NULL to disable snapshots. For more details see redis.io.
redis_info
(list())
Information about the Redis server.
Methods
Method new()
Creates a new instance of this R6 class.
Usage
Rush$new(network_id = NULL, config = NULL, seed = NULL)
Arguments
network_id
(character(1))
Identifier of the rush network. Controller and workers must have the same instance id. Keys in Redis are prefixed with the instance id.
config
(redux::redis_config)
Redis configuration options. If NULL, the configuration set by rush_plan() is used. If rush_plan() has not been called, the REDIS_URL environment variable is parsed. If REDIS_URL is not set, a default configuration is used. See redux::redis_config for details.
seed
(integer())
Initial seed for the random number generator. Either a L'Ecuyer-CMRG seed (integer(7)) or a regular RNG seed (integer(1)). The latter is converted to a L'Ecuyer-CMRG seed. If NULL, no seed is used for the random number generator.
Method reconnect()
Reconnect to Redis. The connection breaks when the Rush object is saved to disk. Call this method to reconnect after loading the object.
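As a minimal sketch (assuming a Redis server is reachable with the default configuration), reconnecting after serialization looks like this:

```r
library(rush)

# the Redis connection breaks when the controller is serialized to disk
rush = rsh(network_id = "test_network")
saveRDS(rush, "rush.rds")

# after loading the object, the connection must be re-established
rush = readRDS("rush.rds")
rush$reconnect()
```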
Method start_local_workers()
Start workers locally with processx.
The processx::process objects are stored in $processes_processx.
Alternatively, use $start_remote_workers() to start workers on remote machines with mirai.
Parameters set by rush_plan() take precedence over the parameters set here.
Usage
Rush$start_local_workers(
worker_loop = NULL,
...,
n_workers = 1,
globals = NULL,
packages = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = NULL,
supervise = TRUE
)
Arguments
worker_loop
(function)
Loop run on the workers.
...
(any)
Arguments passed to worker_loop.
n_workers
(integer(1))
Number of workers to be started. Default is 1.
globals
(character())
Global variables to be loaded into the workers' global environment.
packages
(character())
Packages to be loaded by the workers.
lgr_thresholds
(named character() | named numeric())
Logger thresholds on the workers, e.g. c(rush = "debug").
lgr_buffer_size
(integer(1))
By default (lgr_buffer_size = 0), log messages are written directly to the Redis data store. If lgr_buffer_size > 0, log messages are buffered and written to the Redis data store when the buffer is full. This improves logging performance.
supervise
(logical(1))
Whether to kill the workers when the main R process is shut down.
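A minimal sketch of starting local workers, assuming a running Redis server and using worker_loop_default() from the rush package, which pops tasks from the queue and applies fun to each:

```r
library(rush)

# function evaluated for each task; must return a named list
fun = function(x1, x2, ...) list(y = x1 + x2)

rush = rsh(network_id = "test_network")

# spawn two local R processes via processx that run the default worker loop
rush$start_local_workers(
  worker_loop = worker_loop_default,
  fun = fun,
  n_workers = 2
)
```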
Method start_remote_workers()
Start workers on remote machines with mirai.
The mirai::mirai objects are stored in $processes_mirai.
Parameters set by rush_plan() take precedence over the parameters set here.
Usage
Rush$start_remote_workers(
worker_loop,
...,
n_workers = 1,
globals = NULL,
packages = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = NULL
)
Arguments
worker_loop
(function)
Loop run on the workers.
...
(any)
Arguments passed to worker_loop.
n_workers
(integer(1))
Number of workers to be started. Default is 1.
globals
(character())
Global variables to be loaded into the workers' global environment.
packages
(character())
Packages to be loaded by the workers.
lgr_thresholds
(named character() | named numeric())
Logger thresholds on the workers, e.g. c(rush = "debug").
lgr_buffer_size
(integer(1))
By default (lgr_buffer_size = 0), log messages are written directly to the Redis data store. If lgr_buffer_size > 0, log messages are buffered and written to the Redis data store when the buffer is full. This improves logging performance.
Method worker_script()
Generate a script to start workers.
Usage
Rush$worker_script(
worker_loop,
...,
globals = NULL,
packages = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = NULL,
heartbeat_period = NULL,
heartbeat_expire = NULL
)
Arguments
worker_loop
(function)
Loop run on the workers.
...
(any)
Arguments passed to worker_loop.
globals
(character())
Global variables to be loaded into the workers' global environment.
packages
(character())
Packages to be loaded by the workers.
lgr_thresholds
(named character() | named numeric())
Logger thresholds on the workers, e.g. c(rush = "debug").
lgr_buffer_size
(integer(1))
By default (lgr_buffer_size = 0), log messages are written directly to the Redis data store. If lgr_buffer_size > 0, log messages are buffered and written to the Redis data store when the buffer is full. This improves logging performance.
heartbeat_period
(integer(1))
Period of the heartbeat in seconds. The heartbeat is updated every heartbeat_period seconds.
heartbeat_expire
(integer(1))
Time to live of the heartbeat in seconds. The heartbeat key is set to expire after heartbeat_expire seconds.
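A sketch of generating the script, assuming a controller and a task function as in the package examples and worker_loop_default() as the worker loop:

```r
library(rush)

fun = function(x1, x2, ...) list(y = x1 + x2)
rush = rsh(network_id = "test_network")

# prints the command that starts a worker; run it on any machine
# that can connect to the Redis database
rush$worker_script(worker_loop = worker_loop_default, fun = fun)
```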
Method restart_workers()
Restart workers. If a worker is still running, it is killed and restarted.
Arguments
worker_ids
(character())
Worker ids to be restarted.
supervise
(logical(1))
Whether to kill the workers when the main R process is shut down.
Method stop_workers()
Stop workers.
Arguments
type
(character(1))
Type of stopping. Either "terminate" or "kill". If "kill", the workers are stopped immediately. If "terminate", the workers evaluate the currently running task and then terminate. The "terminate" option must be implemented in the worker loop.
worker_ids
(character())
Worker ids to be stopped. Remote workers must all be killed together. If NULL, all workers are stopped.
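A short sketch of both stopping modes, assuming a controller with running workers:

```r
# let the currently running tasks finish, then shut down;
# requires the worker loop to implement the "terminate" option
rush$stop_workers(type = "terminate")

# or stop all workers immediately
rush$stop_workers(type = "kill")
```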
Method read_log()
Read log messages written with the lgr package from a worker.
Arguments
worker_ids
(character(1))
Worker ids. If NULL, all worker ids are used.
time_difference
(logical(1))
Whether to calculate the time difference between log messages.
Returns
(data.table::data.table())
A table with level, timestamp, logger, caller, and message, and optionally the time difference.
Method push_tasks()
Pushes tasks to the queue. The tasks are added to the queued tasks.
Usage
Rush$push_tasks(
xss,
extra = NULL,
seeds = NULL,
timeouts = NULL,
max_retries = NULL,
terminate_workers = FALSE
)
Arguments
xss
(list of named list())
Lists of arguments for the function, e.g. list(list(x1, x2), list(x1, x2)).
extra
(list())
List of additional information stored along with the task, e.g. list(list(timestamp), list(timestamp)).
seeds
(list())
List of L'Ecuyer-CMRG seeds for each task, e.g. list(list(c(104071, 490840688, 1690070564, -495119766, 503491950, 1801530932, -1629447803))). If NULL but an initial seed is set, L'Ecuyer-CMRG seeds are generated from the initial seed. If NULL and no initial seed is set, no seeds are used for the random number generator.
timeouts
(integer())
Timeouts for each task in seconds, e.g. c(10, 15). A single number is used as the timeout for all tasks. If NULL, no timeout is set.
max_retries
(integer())
Number of retries for each task. A single number is used as the number of retries for all tasks. If NULL, tasks are not retried.
terminate_workers
(logical(1))
Whether to stop the workers after evaluating the tasks.
Returns
(character())
Keys of the tasks.
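A minimal sketch of pushing tasks and collecting the results, assuming a controller whose workers evaluate a function of x1 and x2 (as in the package examples):

```r
# push two tasks; each inner list holds the arguments for one evaluation
keys = rush$push_tasks(xss = list(
  list(x1 = 3, x2 = 5),
  list(x1 = 4, x2 = 6)
))

# block until the two tasks are finished, then fetch the results
rush$wait_for_tasks(keys)
rush$fetch_finished_tasks()
```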
Method push_priority_tasks()
Pushes a task to the queue of a specific worker.
The task is added to the queued priority tasks.
A worker evaluates the tasks in its priority queue before the shared queue.
If priority is NA, the task is added to the shared queue.
If the worker is lost or the worker id is not known, the task is added to the shared queue.
Arguments
xss
(list of named list())
Lists of arguments for the function, e.g. list(list(x1, x2), list(x1, x2)).
extra
(list)
List of additional information stored along with the task, e.g. list(list(timestamp), list(timestamp)).
priority
(character())
Worker ids to which the tasks should be pushed.
Returns
(character())
Keys of the tasks.
Method push_failed()
Pushes failed tasks to the database.
Arguments
keys
(character(1))
Keys of the associated tasks.
conditions
(named list())
List of lists of conditions.
Method fetch_queued_tasks()
Fetch queued tasks from the database.
Usage
Rush$fetch_queued_tasks(
fields = c("xs", "xs_extra"),
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "xs_extra").
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method fetch_priority_tasks()
Fetch queued priority tasks from the database.
Usage
Rush$fetch_priority_tasks(
fields = c("xs", "xs_extra"),
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "xs_extra").
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method fetch_running_tasks()
Fetch running tasks from the database.
Usage
Rush$fetch_running_tasks(
fields = c("xs", "xs_extra", "worker_extra"),
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "xs_extra", "worker_extra").
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method fetch_finished_tasks()
Fetch finished tasks from the database. Finished tasks are cached.
Usage
Rush$fetch_finished_tasks(
fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"),
reset_cache = FALSE,
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition").
reset_cache
(logical(1))
Whether to reset the cache.
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method wait_for_finished_tasks()
Block process until a new finished task is available.
Returns all finished tasks or NULL if no new task is available after timeout seconds.
Usage
Rush$wait_for_finished_tasks(
fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"),
timeout = Inf,
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "ys", "xs_extra", "worker_extra", "ys_extra").
timeout
(numeric(1))
Time to wait for a result in seconds.
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method fetch_new_tasks()
Fetch finished tasks from the database that finished after the last fetch. Updates the cache of the finished tasks.
Usage
Rush$fetch_new_tasks(
fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"),
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes.
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method wait_for_new_tasks()
Block process until a new finished task is available.
Returns new tasks or NULL if no new task is available after timeout seconds.
Usage
Rush$wait_for_new_tasks(
fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"),
timeout = Inf,
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition").
timeout
(numeric(1))
Time to wait for new results in seconds.
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
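A sketch of a polling loop built on this method, assuming a controller with workers processing a non-empty queue:

```r
# print results as they arrive; give up after 10 seconds of silence
while (rush$n_queued_tasks > 0 || rush$n_running_tasks > 0) {
  new_results = rush$wait_for_new_tasks(timeout = 10)
  if (is.null(new_results)) break  # timeout reached, no new task
  print(new_results)
}
```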
Method fetch_failed_tasks()
Fetch failed tasks from the database.
Usage
Rush$fetch_failed_tasks(
fields = c("xs", "worker_extra", "condition"),
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "worker_extra", "condition").
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method fetch_tasks()
Fetch all tasks from the database.
Usage
Rush$fetch_tasks(
fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"),
data_format = "data.table"
)
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition").
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method fetch_tasks_with_state()
Fetch tasks with different states from the database. If tasks with different states are to be queried at the same time, this function prevents tasks from appearing twice. This could happen if a worker changes the state of a task while the tasks are being fetched. Finished tasks are cached.
Arguments
fields
(character())
Fields to be read from the hashes. Defaults to c("xs", "ys", "xs_extra", "worker_extra", "ys_extra").
states
(character())
States of the tasks to be fetched. Defaults to c("queued", "running", "finished", "failed").
reset_cache
(logical(1))
Whether to reset the cache of the finished tasks.
data_format
(character())
Returned data format. Choose "data.table" or "list". The default is "data.table", but "list" is easier when list columns are present.
Method wait_for_tasks()
Wait until tasks are finished. The function also unblocks when no worker is running or all tasks failed.
Arguments
keys
(character())
Keys of the tasks to wait for.
detect_lost_workers
(logical(1))
Whether to detect lost workers while waiting. Comes with an overhead.
Method write_hashes()
Writes R objects to Redis hashes.
The function takes the vectors in ... as input and writes each element as a field-value pair to a new hash.
The name of the argument defines the field into which the serialized element is written.
For example, xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)) writes serialize(list(x1 = 1, x2 = 2)) at field xs into one hash and serialize(list(x1 = 3, x2 = 4)) at field xs into another hash.
The function can iterate over multiple vectors simultaneously.
For example, xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7)) creates two hashes with the fields xs and ys.
The vectors are recycled to the length of the longest vector.
Both lists and atomic vectors are supported.
Arguments that are NULL are ignored.
Usage
Rush$write_hashes(..., .values = list(), keys = NULL)
Returns
(character())
Keys of the hashes.
Method read_hashes()
Reads R objects from Redis hashes.
The function reads the field-value pairs of the hashes stored at keys.
The values of a hash are deserialized and combined into a list.
If flatten is TRUE, the values are flattened into a single list, e.g. list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)) becomes list(x1 = 1, x2 = 2, y = 3).
The reading functions combine the hashes into a table where the names of the inner lists are the column names.
For example, xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7)) becomes data.table(x1 = c(1, 3), x2 = c(2, 4), y = c(3, 7)).
Arguments
keys
(character())
Keys of the hashes.
fields
(character())
Fields to be read from the hashes.
flatten
(logical(1))
Whether to flatten the list.
Returns
(list of list())
The outer list contains one element for each key.
The inner list is the combination of the lists stored at the different fields.
Method read_hash()
Reads a single Redis hash and returns the values as a list named by the fields.
Arguments
key
(character(1))
Key of the hash.
fields
(character())
Fields to be read from the hash.
Returns
(list())
The values stored at the fields, named by the fields.
Method is_running_task()
Checks whether tasks have the status "running".
Arguments
keys
(character())
Keys of the tasks.
Method is_failed_task()
Checks whether tasks have the status "failed".
Arguments
keys
(character())
Keys of the tasks.
Method tasks_with_state()
Returns keys of requested states.
Arguments
states
(character())
States of the tasks.
Returns
(Named list of character()).
Examples
# This example is not executed since Redis must be installed
config_local = redux::redis_config()
rush = rsh(network_id = "test_network", config = config_local)
rush
#> <Rush>
#> * Running Workers: 0
#> * Queued Tasks: 0
#> * Queued Priority Tasks: 0
#> * Running Tasks: 0
#> * Finished Tasks: 0
#> * Failed Tasks: 0