
The Rush class manages a rush network by starting, monitoring, and stopping workers. It shares all task-related methods (e.g., fetching results, pushing tasks) with RushWorker. A Rush instance is created with the rsh() function, which takes a network ID and a config argument used to connect to the Redis database via the redux package.

Value

Object of class R6::R6Class and Rush.

Tasks

Tasks are the unit in which workers exchange information. The main components of a task are the key, computational state, input (xs), and output (ys). The key is a unique identifier for the task in the Redis database. The four possible computational states are "running", "finished", "failed", and "queued". The input xs and output ys are lists that can contain arbitrary data.

Methods to create a task:

  • $push_running_tasks(xss): Create running tasks.

  • $push_finished_tasks(xss, yss): Create finished tasks.

  • $push_failed_tasks(xss, conditions): Create failed tasks.

  • $push_tasks(xss): Create queued tasks.

These methods return the keys of the created tasks. The methods work on multiple tasks at once, so xss and yss are lists of inputs and outputs. When tasks are fetched, the xss and yss are unpacked so that the names of their inner elements become the columns of the returned table. For example, an xss stored as list(list(x1 = 2, x2 = 3), list(x1 = 4, x2 = 5)) yields x1 and x2 columns, not an xs column. The inner element names must therefore be unique across these fields.
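The unpacking can be mimicked in plain R to see which columns a fetch produces. This is a minimal sketch, not rush's internal code: it assumes that each task's input and output lists are concatenated and the rows are bound with data.table::rbindlist(), which reproduces the column layout described above.

```r
library(data.table)

# Inputs and outputs of two tasks, in the shape passed to $push_finished_tasks().
xss = list(list(x1 = 2, x2 = 3), list(x1 = 4, x2 = 5))
yss = list(list(y = 5), list(y = 9))

# Concatenate each task's input and output into one flat named list ...
rows = Map(c, xss, yss)

# ... and bind the rows: the inner names x1, x2 and y become the columns.
tab = rbindlist(rows)
print(tab)
```

Because the columns are named after the inner elements, an input and an output that share a name (for example x1 in both xs and ys) would collide in the resulting table.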

Methods to change the state of an existing task:

  • $finish_tasks(keys, yss): Save the output of tasks and mark them as finished.

  • $fail_tasks(keys, conditions): Mark tasks as failed and optionally save the condition objects.

  • $pop_task(): Pop a task from the queue and mark it as running.

The methods $pop_task(), $push_running_tasks(xss), $finish_tasks(keys, yss), and $fail_tasks(keys, conditions) are only available on RushWorker.

The following methods are used to fetch tasks:

  • $fetch_tasks(): Fetch all tasks.

  • $fetch_finished_tasks(): Fetch finished tasks.

  • $fetch_failed_tasks(): Fetch failed tasks.

  • $fetch_tasks_with_state(): Fetch tasks with different states at once.

  • $fetch_new_tasks(): Fetch new tasks and optionally block until new tasks are available.

The methods return a data.table() with the tasks.
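A short round trip through the manager might look as follows. This sketch assumes a Redis server reachable with the default redux::redis_config(); the network id "example_fetch" is hypothetical and only used for illustration, and the data is deleted again with $reset().

```r
n_queued = NA_integer_

if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
  # Hypothetical network id used only for this example.
  rush = rush::rsh(network_id = "example_fetch", config = redux::redis_config())

  # Two queued tasks and one task created directly in the finished state.
  rush$push_tasks(xss = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)))
  rush$push_finished_tasks(
    xss = list(list(x1 = 5, x2 = 6)),
    yss = list(list(y = 11))
  )
  n_queued = rush$n_queued_tasks

  print(rush$fetch_queued_tasks())    # includes x1 and x2 columns
  print(rush$fetch_finished_tasks())  # includes x1, x2 and y columns

  # Query both states in one call so that no task is returned twice.
  print(rush$fetch_tasks_with_state(states = c("queued", "finished")))

  rush$reset()  # delete the example data from Redis
}
```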

Tasks have the following fields:

  • xs: The input of the task.

  • ys: The output of the task.

  • xs_extra: Metadata stored when the task is created.

  • ys_extra: Metadata stored when the task is finished.

  • condition: Condition object when the task failed.

  • worker_id: The id of the worker that created the task.

Workers

Workers are spawned on mirai daemons with the $start_workers() method. Use mirai::daemons() to start the daemons, which can run on the local machine or on remote machines.

Alternatively, workers can be started locally with the $start_local_workers() method via the processx package. A third option is the $worker_script() method, which generates a shell command that can be run on any machine. The only requirement is that the worker can connect to the Redis database.
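Starting workers on mirai daemons could look like the sketch below. It assumes a local Redis server with the default configuration, that the worker loop receives the RushWorker as its `rush` argument, and that RushWorker exposes a `terminated` flag the loop can poll, as in the rush vignettes. The loop `wl_square` and the network id "example_mirai" are hypothetical.

```r
# Hypothetical worker loop: evaluates one random point per iteration
# until the worker is asked to terminate.
wl_square = function(rush) {
  while (!rush$terminated) {
    xs = list(x = runif(1))
    key = rush$push_running_tasks(xss = list(xs))
    rush$finish_tasks(key, yss = list(list(y = xs$x^2)))
  }
}

if (requireNamespace("rush", quietly = TRUE) &&
    requireNamespace("mirai", quietly = TRUE) &&
    redux::redis_available()) {
  mirai::daemons(2)  # two local daemons
  rush = rush::rsh(network_id = "example_mirai", config = redux::redis_config())

  rush$start_workers(worker_loop = wl_square, n_workers = 2)
  rush$wait_for_workers(n = 2, timeout = 60)
  Sys.sleep(1)
  print(rush$n_finished_tasks)

  rush$reset()       # stop the workers and delete the data
  mirai::daemons(0)  # shut the daemons down
}
```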

Worker Loop

The worker loop is the main function that is run on the workers. It is defined by the user and passed to $start_workers(), $start_local_workers(), or $worker_script().
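A loop that works through queued tasks instead of generating its own might be sketched as follows. The name `wl_queue` is hypothetical, and the sketch assumes that `$pop_task()` returns a list with elements `key` and `xs` (or `NULL` when the queue is empty) and that `rush$terminated` becomes `TRUE` when the manager calls `$stop_workers(type = "terminate")`. Checking that flag at the top of each iteration is what makes the "terminate" stop type work.

```r
# Hypothetical queue-driven worker loop; `rush` is the RushWorker.
wl_queue = function(rush) {
  while (!rush$terminated) {
    task = rush$pop_task()  # assumed: NULL when the queue is empty
    if (is.null(task)) {
      Sys.sleep(0.1)        # avoid busy-waiting on an empty queue
      next
    }
    ys = list(y = task$xs$x1 + task$xs$x2)
    rush$finish_tasks(task$key, yss = list(ys))
  }
}
```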

Debugging

The mirai::mirai objects started with $start_workers() are stored in $processes_mirai. Standard output and error of the workers can be written to log files with the message_log and output_log arguments of $start_workers().

Public fields

processes_processx

(processx::process)
List of processes started with $start_local_workers().

processes_mirai

(mirai::mirai)
List of mirai processes started with $start_workers().

Active bindings

network_id

(character(1))
Identifier of the rush network.

config

(redux::redis_config)
Redis configuration options. Assigning a new configuration does not affect the live connection. Call $reconnect() afterwards to connect with the new configuration.

connector

(redux::redis_api)
Returns a connection to Redis.

n_workers

(integer(1))
Number of workers.

n_running_workers

(integer(1))
Number of running workers.

n_terminated_workers

(integer(1))
Number of terminated workers.

worker_ids

(character())
Ids of workers.

running_worker_ids

(character())
Ids of running workers.

terminated_worker_ids

(character())
Ids of terminated workers.

tasks

(character())
Keys of all tasks.

queued_tasks

(character())
Keys of queued tasks.

running_tasks

(character())
Keys of running tasks.

finished_tasks

(character())
Keys of finished tasks.

failed_tasks

(character())
Keys of failed tasks.

n_queued_tasks

(integer(1))
Number of queued tasks.

n_running_tasks

(integer(1))
Number of running tasks.

n_finished_tasks

(integer(1))
Number of finished tasks.

n_failed_tasks

(integer(1))
Number of failed tasks.

n_tasks

(integer(1))
Number of all tasks.

worker_info

(data.table::data.table())
Contains information about the workers.

Methods


Rush$new()

Creates a new instance of this R6 class.

Usage

Rush$new(network_id = NULL, config = NULL)

Arguments

network_id

(character(1))
Identifier of the rush network. Manager and workers must have the same id. Keys in Redis are prefixed with the instance id.

config

(redux::redis_config)
Redis configuration options. If NULL, configuration set by rush_plan() is used. If rush_plan() has not been called, the REDIS_URL environment variable is parsed. If REDIS_URL is not set, a default configuration is used. See redux::redis_config for details.
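An explicit configuration takes precedence over rush_plan() and REDIS_URL. The sketch below builds one from host and port and another from a URL of the same form REDIS_URL uses. The address 127.0.0.1:6379 and the network id "example_config" are assumptions for illustration.

```r
if (requireNamespace("redux", quietly = TRUE)) {
  # Explicit configuration from host and port.
  config_explicit = redux::redis_config(host = "127.0.0.1", port = 6379)

  # The same server described as a URL, as in the REDIS_URL variable.
  config_url = redux::redis_config(url = "redis://127.0.0.1:6379")
  print(config_explicit)

  if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
    # Passing config directly bypasses rush_plan() and REDIS_URL.
    rush = rush::rsh(network_id = "example_config", config = config_explicit)
    print(rush)
  }
}
```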


Rush$format()

Helper for print outputs.

Usage

Rush$format(...)

Arguments

...

(ignored).

Returns

(character()).


Rush$print()

Print method.

Usage

Rush$print()

Returns

(character()).


Rush$reconnect()

Reconnect to Redis. The connection breaks when the Rush object is saved to disk. Call this method to reconnect after loading the object.

Usage

Rush$reconnect()
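The save-and-restore case described above might look like this. It is a sketch that assumes a local Redis server with the default configuration; the network id "example_reconnect" is hypothetical.

```r
n_tasks_after = NA_integer_

if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
  rush = rush::rsh(network_id = "example_reconnect", config = redux::redis_config())

  # Saving the object to disk breaks its Redis connection ...
  path = tempfile(fileext = ".rds")
  saveRDS(rush, path)
  rush_loaded = readRDS(path)

  # ... so reconnect before querying the network again.
  rush_loaded$reconnect()
  n_tasks_after = rush_loaded$n_tasks
}
```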


Rush$start_workers()

Start workers to run the worker loop in mirai::daemons(). Initializes a RushWorker in each process and starts the worker loop.

Usage

Rush$start_workers(
  worker_loop,
  ...,
  n_workers = NULL,
  packages = NULL,
  lgr_thresholds = NULL,
  lgr_buffer_size = NULL,
  message_log = NULL,
  output_log = NULL
)

Arguments

worker_loop

(function)
Loop run on the workers.

...

(any)
Arguments passed to worker_loop.

n_workers

(integer(1))
Number of workers to be started.

packages

(character())
Packages to be loaded by the workers.

lgr_thresholds

(named character() | named numeric())
Logger threshold on the workers e.g. c("mlr3/rush" = "debug").

lgr_buffer_size

(integer(1))
By default (lgr_buffer_size = 0), the log messages are directly saved in the Redis data store. If lgr_buffer_size > 0, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.

message_log

(character(1))
Path to the message log files, e.g. /tmp/message_logs/. The message log files are named message_<worker_id>.log. If NULL, no messages, warnings, or errors are stored.

output_log

(character(1))
Path to the output log files, e.g. /tmp/output_logs/. The output log files are named output_<worker_id>.log. If NULL, no output is stored.


Rush$start_local_workers()

Start workers locally with processx. Initializes a RushWorker in each process and starts the worker loop. Use $wait_for_workers() to wait until the workers are registered in the network.

Usage

Rush$start_local_workers(
  worker_loop,
  ...,
  n_workers = NULL,
  packages = NULL,
  lgr_thresholds = NULL,
  lgr_buffer_size = NULL,
  supervise = TRUE,
  message_log = NULL,
  output_log = NULL
)

Arguments

worker_loop

(function)
Loop run on the workers.

...

(any)
Arguments passed to worker_loop.

n_workers

(integer(1))
Number of workers to be started.

packages

(character())
Packages to be loaded by the workers.

lgr_thresholds

(named character() | named numeric())
Logger threshold on the workers e.g. c("mlr3/rush" = "debug").

lgr_buffer_size

(integer(1))
By default (lgr_buffer_size = 0), the log messages are directly saved in the Redis data store. If lgr_buffer_size > 0, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.

supervise

(logical(1))
Whether to kill the workers when the main R process is shut down.

message_log

(character(1))
Path to the message log files, e.g. /tmp/message_logs/. The message log files are named message_<worker_id>.log. If NULL, no messages, warnings, or errors are stored.

output_log

(character(1))
Path to the output log files, e.g. /tmp/output_logs/. The output log files are named output_<worker_id>.log. If NULL, no output is stored.
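A local start-and-stop cycle could be sketched as below. It assumes a local Redis server with the default configuration and that the RushWorker passed to the loop exposes a `terminated` flag; the loop `wl_idle` and the network id "example_local" are hypothetical.

```r
# Hypothetical loop that idles until the worker is asked to terminate.
wl_idle = function(rush) {
  while (!rush$terminated) Sys.sleep(0.1)
}

if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
  rush = rush::rsh(network_id = "example_local", config = redux::redis_config())

  rush$start_local_workers(worker_loop = wl_idle, n_workers = 2)
  rush$wait_for_workers(n = 2, timeout = 60)
  print(rush$running_worker_ids)

  # "terminate" lets each worker leave its loop; "kill" ends the process at once.
  rush$stop_workers(type = "terminate")
  rush$reset()
}
```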


Rush$start_remote_workers()

Start workers to run the worker loop in mirai::daemons(). Initializes a RushWorker in each process and starts the worker loop.

Usage

Rush$start_remote_workers(
  worker_loop,
  ...,
  n_workers = NULL,
  packages = NULL,
  lgr_thresholds = NULL,
  lgr_buffer_size = NULL,
  message_log = NULL,
  output_log = NULL
)

Arguments

worker_loop

(function)
Loop run on the workers.

...

(any)
Arguments passed to worker_loop.

n_workers

(integer(1))
Number of workers to be started.

packages

(character())
Packages to be loaded by the workers.

lgr_thresholds

(named character() | named numeric())
Logger threshold on the workers e.g. c("mlr3/rush" = "debug").

lgr_buffer_size

(integer(1))
By default (lgr_buffer_size = 0), the log messages are directly saved in the Redis data store. If lgr_buffer_size > 0, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.

message_log

(character(1))
Path to the message log files, e.g. /tmp/message_logs/. The message log files are named message_<worker_id>.log. If NULL, no messages, warnings, or errors are stored.

output_log

(character(1))
Path to the output log files, e.g. /tmp/output_logs/. The output log files are named output_<worker_id>.log. If NULL, no output is stored.


Rush$worker_script()

Generate a script to start workers. Run this script n times to start n workers. The logged variant of the script redacts the Redis password.

Usage

Rush$worker_script(
  worker_loop,
  ...,
  packages = NULL,
  lgr_thresholds = NULL,
  lgr_buffer_size = NULL,
  heartbeat_period = NULL,
  heartbeat_expire = NULL,
  message_log = NULL,
  output_log = NULL
)

Arguments

worker_loop

(function)
Loop run on the workers.

...

(any)
Arguments passed to worker_loop.

packages

(character())
Packages to be loaded by the workers.

lgr_thresholds

(named character() | named numeric())
Logger threshold on the workers e.g. c("mlr3/rush" = "debug").

lgr_buffer_size

(integer(1))
By default (lgr_buffer_size = 0), the log messages are directly saved in the Redis data store. If lgr_buffer_size > 0, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.

heartbeat_period

(integer(1))
Period of the heartbeat in seconds. The heartbeat is updated every heartbeat_period seconds. Must be at least 1 second.

heartbeat_expire

(integer(1))
Time to live of the heartbeat in seconds. The heartbeat key is set to expire after heartbeat_expire seconds. Must be at least heartbeat_period, otherwise a live worker is reaped as lost between two heartbeats. Set it larger than the longest pause a worker may experience, for example from garbage collection or swapping, because a live worker wrongly declared lost can leave a task in an inconsistent state.

message_log

(character(1))
Path to the message log files, e.g. /tmp/message_logs/. The message log files are named message_<worker_id>.log. If NULL, no messages, warnings, or errors are stored.

output_log

(character(1))
Path to the output log files, e.g. /tmp/output_logs/. The output log files are named output_<worker_id>.log. If NULL, no output is stored.

Returns

(character(1))
Shell command to start a worker.
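Generating a script with explicit heartbeat settings might look like this. It is a sketch under the assumptions of a local Redis server with the default configuration and a RushWorker `terminated` flag. The loop `wl_wait` and the network id "example_script" are hypothetical, and the values 5 and 30 are only an example of keeping heartbeat_expire well above heartbeat_period.

```r
# Hypothetical loop that idles until the worker is asked to terminate.
wl_wait = function(rush) {
  while (!rush$terminated) Sys.sleep(0.1)
}

if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
  rush = rush::rsh(network_id = "example_script", config = redux::redis_config())

  # Heartbeat refreshed every 5 s; the worker counts as lost once the key
  # has not been refreshed for 30 s.
  cmd = rush$worker_script(
    worker_loop = wl_wait,
    heartbeat_period = 5,
    heartbeat_expire = 30
  )
  cat(cmd, "\n")  # run this command once per worker, on any machine

  # Later, the manager can look for workers whose heartbeat expired.
  print(rush$detect_lost_workers())
  rush$reset()
}
```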


Rush$wait_for_workers()

Wait until workers are registered in the network. Either n, worker_ids or both must be provided.

Usage

Rush$wait_for_workers(n = NULL, worker_ids = NULL, timeout = NULL)

Arguments

n

(integer(1))
Number of workers to wait for. If NULL, wait for all workers in worker_ids.

worker_ids

(character())
Worker ids to wait for. If NULL, wait for any n workers to be registered.

timeout

(numeric(1))
Timeout in seconds. Defaults to the start_worker_timeout set with rush_plan(), or Inf if none is set. A timeout of 0 checks once and errors immediately if the workers are not yet registered.


Rush$stop_workers()

Stop workers.

Usage

Rush$stop_workers(type = "kill", worker_ids = NULL)

Arguments

type

(character(1))
Type of stopping. Either "terminate" or "kill". With "kill", the workers are stopped immediately. With "terminate", the workers finish the currently running task and then terminate. The "terminate" option must be handled by the worker loop.

worker_ids

(character())
Worker ids to be stopped. If NULL, all workers are stopped. Ids that are not currently running are skipped with a warning.


Rush$detect_lost_workers()

Detect lost workers. The state of the worker is changed to "terminated".

Workers started with mirai or processx are monitored through their process handle, so a worker is only declared lost after its process has actually terminated. Workers started from $worker_script() are monitored through a heartbeat and are declared lost when the heartbeat key expires. Because this is a timeout, heartbeat_expire must be larger than the longest pause a worker may experience, for example from garbage collection or swapping. If a live worker is wrongly declared lost, a task it is processing can be recorded in two states at once, for example failed and finished. Set heartbeat_expire conservatively to avoid false positives.

Usage

Rush$detect_lost_workers()

Returns

(character())
Worker ids of detected lost workers.


Rush$reset()

Stop workers and delete the data stored in Redis.

Usage

Rush$reset(workers = TRUE)

Arguments

workers

(logical(1))
Whether to stop the workers or only delete the data. Default is TRUE.


Rush$read_log()

Read log messages written with the lgr package by the workers.

Usage

Rush$read_log(worker_ids = NULL, time_difference = FALSE)

Arguments

worker_ids

(character())
Ids of the workers whose log messages are read. Defaults to all worker ids.

time_difference

(logical(1))
Whether to calculate the time difference between log messages.

Returns

data.table()
Table with columns worker_id, level, timestamp, logger, caller and msg, and optionally time_difference.


Rush$print_log()

Print log messages written with the lgr package by the workers. Log messages are printed with the original logger.

Usage

Rush$print_log()

Returns

(Rush)
Invisible self.


Rush$push_tasks()

Create tasks and add them to the queue.

Usage

Rush$push_tasks(xss, xss_extra = NULL, extra = NULL)

Arguments

xss

(list of named list())
Lists of arguments for the function e.g. list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)). If xss is empty, no tasks are created and the method returns an empty character().

xss_extra

(list of named list())
List of additional information stored along with the task e.g. list(list(timestamp_xs = Sys.time()), list(timestamp_xs = Sys.time())).

extra

(list())
Deprecated argument for additional information stored along with the task. Use xss_extra instead.

Returns

(character())
Keys of the tasks.


Rush$push_finished_tasks()

Create finished tasks. See $finish_tasks() for moving existing tasks from running to finished.

Usage

Rush$push_finished_tasks(xss, yss, xss_extra = NULL, yss_extra = NULL)

Arguments

xss

(list of named list())
Lists of arguments for the function e.g. list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)). If xss is empty, no tasks are created and the method returns an empty character().

yss

(list of named list())
Lists of results for the function e.g. list(list(y1 = 1, y2 = 2), list(y1 = 3, y2 = 4)).

xss_extra

(list of named list())
List of additional information stored along with the task e.g. list(list(timestamp_xs = Sys.time()), list(timestamp_xs = Sys.time())).

yss_extra

(list of named list())
List of additional information stored along with the results e.g. list(list(timestamp_ys = Sys.time()), list(timestamp_ys = Sys.time())).

Returns

(character())
Keys of the tasks.


Rush$push_failed_tasks()

Create failed tasks. See $fail_tasks() for moving existing tasks from queued and running to failed.

Usage

Rush$push_failed_tasks(xss, xss_extra = NULL, conditions = NULL)

Arguments

xss

(list of named list())
Lists of arguments for the function e.g. list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)). If xss is empty, no tasks are created and the method returns an empty character().

xss_extra

(list of named list())
List of additional information stored along with the task e.g. list(list(timestamp_xs = Sys.time()), list(timestamp_xs = Sys.time())).

conditions

(list())
List of conditions, e.g. list(simpleError("Error"), simpleError("Error")). Defaults to list(message = "Task failed").

Returns

(character())
Keys of the tasks.


Rush$empty_queue()

Remove all tasks from the queue. The state of the tasks is set to failed. The condition message is set to "Removed from queue".

Usage

Rush$empty_queue()

Returns

(Rush)
Invisible self.
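The two ways of producing failed tasks from the manager can be combined as in the sketch below. It assumes a local Redis server with the default configuration; the network id "example_queue" is hypothetical.

```r
n_failed = NA_integer_

if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
  rush = rush::rsh(network_id = "example_queue", config = redux::redis_config())
  rush$push_tasks(xss = list(list(x = 1), list(x = 2), list(x = 3)))

  # Tasks known to be invalid can be recorded directly as failed.
  rush$push_failed_tasks(
    xss = list(list(x = -1)),
    conditions = list(simpleError("x must be positive"))
  )

  # Drop the remaining queue: the three queued tasks become failed as well,
  # with the condition message "Removed from queue".
  rush$empty_queue()
  n_failed = rush$n_failed_tasks
  print(rush$fetch_failed_tasks())

  rush$reset()
}
```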


Rush$fail_tasks()

Deprecated method to move tasks from queued and running to failed.

Usage

Rush$fail_tasks(keys, conditions = NULL)

Arguments

keys

(character())
Keys of the tasks to be moved. Defaults to all queued tasks.

conditions

(list())
List of conditions, e.g. list(simpleError("Error"), simpleError("Error")). Defaults to list(message = "Task failed").

Returns

(Rush)
Invisible self.


Rush$fetch_tasks()

Fetch all tasks from the database.

Usage

Rush$fetch_tasks(
  fields = c("xs", "ys", "xs_extra", "worker_id", "ys_extra", "condition")
)

Arguments

fields

(character())
Fields to be read from the hashes. Defaults to c("xs", "ys", "xs_extra", "worker_id", "ys_extra", "condition").

Returns

data.table()
Table of all tasks.


Rush$fetch_queued_tasks()

Fetch queued tasks from the database.

Usage

Rush$fetch_queued_tasks(fields = c("xs", "xs_extra"))

Arguments

fields

(character())
Fields to be read from the hashes. Defaults to c("xs", "xs_extra").

Returns

data.table()
Table of queued tasks.


Rush$fetch_running_tasks()

Fetch running tasks from the database.

Usage

Rush$fetch_running_tasks(fields = c("xs", "xs_extra", "worker_id"))

Arguments

fields

(character())
Fields to be read from the hashes. Defaults to c("xs", "xs_extra", "worker_id").

Returns

data.table()
Table of running tasks.


Rush$fetch_failed_tasks()

Fetch failed tasks from the database.

Usage

Rush$fetch_failed_tasks(fields = c("xs", "xs_extra", "worker_id", "condition"))

Arguments

fields

(character())
Fields to be read from the hashes. Defaults to c("xs", "xs_extra", "worker_id", "condition").

Returns

data.table()
Table of failed tasks.


Rush$fetch_finished_tasks()

Fetch finished tasks from the database. Finished tasks are cached.

Usage

Rush$fetch_finished_tasks(
  fields = c("worker_id", "xs", "ys", "xs_extra", "ys_extra", "condition")
)

Arguments

fields

(character())
Fields to be read from the hashes. Defaults to c("worker_id", "xs", "ys", "xs_extra", "ys_extra", "condition"). If the fields change between calls, fields requested only by a later call remain NA for the already cached tasks. Use $reset_cache() to reset the cache in this case.

Returns

data.table()
Table of finished tasks.


Rush$fetch_tasks_with_state()

Fetch tasks with different states from the database. If tasks with different states are to be queried at the same time, this function prevents tasks from appearing twice. This could be the case if a worker changes the state of a task while the tasks are being fetched. Finished tasks are cached.

Usage

Rush$fetch_tasks_with_state(
  fields = c("worker_id", "xs", "ys", "xs_extra", "ys_extra", "condition"),
  states = c("queued", "running", "finished", "failed")
)

Arguments

fields

(character())
Fields to be read from the hashes. Defaults to c("worker_id", "xs", "ys", "xs_extra", "ys_extra", "condition"). If the fields change between calls, fields requested only by a later call remain NA for the already cached tasks. Use $reset_cache() to reset the cache in this case.

states

(character())
States of the tasks to be fetched. Defaults to c("queued", "running", "finished", "failed").


Rush$fetch_new_tasks()

Fetch new tasks that finished after the last call of this function. Updates the cache of the finished tasks. If timeout is set, blocks until new tasks are available or the timeout is reached.

"New" is tracked relative to previous calls of this method only. $fetch_finished_tasks() grows the same cache but does not advance this counter, so a task already returned by $fetch_finished_tasks() is returned again by the next $fetch_new_tasks().

Usage

Rush$fetch_new_tasks(
  fields = c("xs", "ys", "xs_extra", "worker_id", "ys_extra", "condition"),
  timeout = 0
)

Arguments

fields

(character())
Fields to be read from the hashes. If the fields change between calls, fields requested only by a later call remain NA for the already cached tasks. Use $reset_cache() to reset the cache in this case.

timeout

(numeric(1))
Time to wait for new results in seconds. Defaults to 0 (no waiting).

Returns

data.table()
Table of latest results.
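The interaction between the cache and the "new" counter can be checked with a sketch like the following. It assumes a local Redis server with the default configuration; the network id "example_new" is hypothetical. NROW() is used so that an empty result counts as zero rows.

```r
n_new = c(first = NA_integer_, second = NA_integer_)

if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
  rush = rush::rsh(network_id = "example_new", config = redux::redis_config())
  rush$push_finished_tasks(
    xss = list(list(x = 1), list(x = 2)),
    yss = list(list(y = 1), list(y = 4))
  )

  # Fills the cache but does not advance the "new" counter ...
  rush$fetch_finished_tasks()

  # ... so both tasks are still returned as new here ...
  n_new["first"] = NROW(rush$fetch_new_tasks())

  # ... and nothing is new on the following call.
  n_new["second"] = NROW(rush$fetch_new_tasks())

  rush$reset()
}
```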


Rush$reset_cache()

Reset the cache of the finished tasks.

Usage

Rush$reset_cache()

Returns

(Rush)
Invisible self.


Rush$wait_for_tasks()

Wait until tasks are finished. The function also unblocks when no worker is running or all tasks failed.

Usage

Rush$wait_for_tasks(keys, detect_lost_workers = FALSE)

Arguments

keys

(character())
Keys of the tasks to wait for.

detect_lost_workers

(logical(1))
Whether to detect lost workers while waiting, so that tasks of lost workers are recognized as failed. Comes with an overhead.


Rush$write_hashes()

Writes R objects to Redis hashes. The function takes the vectors in ... as input and writes each element as a field-value pair to a new hash. The name of the argument defines the field into which the serialized element is written. For example, xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)) writes serialize(list(x1 = 1, x2 = 2)) at field xs into a hash and serialize(list(x1 = 3, x2 = 4)) at field xs into another hash. The function can iterate over multiple vectors simultaneously. For example, xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7)) creates two hashes with the fields xs and ys. All value lists must either have the same length (the number of hashes) or length 1, in which case the value is broadcast across all hashes. Other length mismatches raise an error. Both lists and atomic vectors are supported. Arguments that are NULL are ignored.

Usage

Rush$write_hashes(..., .values = list(), keys = NULL)

Arguments

...

(named list())
Lists to be written to the hashes. The names of the arguments are used as fields.

.values

(named list())
Lists to be written to the hashes. The names of the list are used as fields.

keys

(character())
Keys of the hashes. If NULL new keys are generated.

Returns

(character())
Keys of the hashes.
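The length rule can be written down as a small plain-R helper. `broadcast_values()` is hypothetical: it only mirrors the documented rule (equal lengths, length-1 values broadcast, NULL arguments ignored) and neither serializes values nor talks to Redis.

```r
# Hypothetical helper: expand the values passed to $write_hashes()
# into one named list per hash, following the documented length rule.
broadcast_values = function(...) {
  values = Filter(Negate(is.null), list(...))  # NULL arguments are ignored
  lens = lengths(values)
  n = max(lens)
  if (any(lens != n & lens != 1L)) {
    stop("All values must have the same length or length 1")
  }
  lapply(seq_len(n), function(i) {
    # Length-1 values are broadcast to every hash.
    lapply(values, function(v) if (length(v) == 1L) v[[1L]] else v[[i]])
  })
}

hashes = broadcast_values(
  xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)),
  ys = list(list(y = 3), list(y = 7)),
  worker_id = "worker_1",
  condition = NULL
)
str(hashes)
```

Each element of `hashes` corresponds to one hash; its names are the fields that would be written.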


Rush$read_hashes()

Reads R objects from Redis hashes. The function reads the field-value pairs of the hashes stored at keys. The values of a hash are deserialized and combined into a list. If flatten is TRUE, the values are flattened to a single list, e.g. list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)) becomes list(x1 = 1, x2 = 2, y = 3). The reading functions combine the hashes into a table where the names of the inner lists are the column names. For example, xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7)) becomes data.table(x1 = c(1, 3), x2 = c(2, 4), y = c(3, 7)). Names must be unique across the flattened fields. Colliding names produce duplicate columns, of which only the first is reachable by name.

Usage

Rush$read_hashes(keys, fields, flatten = TRUE)

Arguments

keys

(character())
Keys of the hashes.

fields

(character())
Fields to be read from the hashes.

flatten

(logical(1))
Whether to flatten the list.

Returns

(list of list())
The outer list contains one element for each key. The inner list is the combination of the lists stored at the different fields.
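The flattening and table-building steps can be reproduced on already deserialized values. This sketch does not read from Redis and is not rush's internal code; it assumes the deserialized hashes have the shape described above and uses data.table::rbindlist() to build the table.

```r
library(data.table)

# Two hashes after deserialization: one list per key, one element per field.
hashes = list(
  list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)),
  list(xs = list(x1 = 3, x2 = 4), ys = list(y = 7))
)

# flatten = TRUE: drop the field level so that only the inner names remain.
flat = lapply(hashes, function(h) unlist(unname(h), recursive = FALSE))

# Combine the flattened hashes into a table with columns x1, x2 and y.
tab = rbindlist(flat)
print(tab)
```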


Rush$read_hash()

Reads a single Redis hash and returns the values as a list named by the fields.

Usage

Rush$read_hash(key, fields)

Arguments

key

(character(1))
Key of the hash.

fields

(character())
Fields to be read from the hash.

Returns

(list())
Values of the hash, named by the fields.


Rush$is_running_task()

Checks whether tasks have the status "running".

Usage

Rush$is_running_task(keys)

Arguments

keys

(character())
Keys of the tasks.


Rush$is_failed_task()

Checks whether tasks have the status "failed".

Usage

Rush$is_failed_task(keys)

Arguments

keys

(character())
Keys of the tasks.


Rush$tasks_with_state()

Returns the keys of the tasks in the requested states.

Usage

Rush$tasks_with_state(states)

Arguments

states

(character())
States of the tasks.

Returns

(Named list of character()).
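Looking up keys by state might look like the sketch below. It assumes a local Redis server with the default configuration and that the returned list is named by the requested states; the network id "example_states" is hypothetical.

```r
keys_by_state = NULL

if (requireNamespace("rush", quietly = TRUE) && redux::redis_available()) {
  rush = rush::rsh(network_id = "example_states", config = redux::redis_config())
  rush$push_tasks(xss = list(list(x = 1)))
  rush$push_finished_tasks(xss = list(list(x = 2)), yss = list(list(y = 4)))

  # One element per requested state, each holding the keys of its tasks.
  keys_by_state = rush$tasks_with_state(states = c("queued", "finished"))
  str(keys_by_state)

  # Per-key check for a specific state.
  print(rush$is_failed_task(keys_by_state$queued))

  rush$reset()
}
```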


Rush$clone()

The objects of this class are cloneable with this method.

Usage

Rush$clone(deep = FALSE)

Arguments

deep

Whether to make a deep clone.

Examples

if (redux::redis_available()) {
  config_local = redux::redis_config()
  rush = rsh(network_id = "test_network", config = config_local)
  rush
}
#> 
#> ── <Rush> ──────────────────────────────────────────────────────────────────────
#> • Running Workers: 0
#> • Queued Tasks: 0
#> • Running Tasks: 0
#> • Finished Tasks: 0
#> • Failed Tasks: 0