RushWorker evaluates tasks and writes results to the database. The worker inherits from Rush.
Value
Object of class R6::R6Class and RushWorker with worker methods.
Super class
rush::Rush -> RushWorker
Public fields
worker_id
(character(1))
Identifier of the worker.
remote
(logical(1))
Whether the worker is on a remote machine.
heartbeat
(r_process)
Background process for the heartbeat.
Active bindings
terminated
(logical(1))
Whether to shut down the worker. Used in the worker loop to determine whether to continue.
terminated_on_idle
(logical(1))
Whether to shut down the worker if no tasks are queued. Used in the worker loop to determine whether to continue.
Methods
Inherited methods
rush::Rush$create_worker_script()
rush::Rush$detect_lost_workers()
rush::Rush$fetch_failed_tasks()
rush::Rush$fetch_finished_tasks()
rush::Rush$fetch_new_tasks()
rush::Rush$fetch_priority_tasks()
rush::Rush$fetch_queued_tasks()
rush::Rush$fetch_running_tasks()
rush::Rush$fetch_tasks()
rush::Rush$fetch_tasks_with_state()
rush::Rush$format()
rush::Rush$is_failed_task()
rush::Rush$is_running_task()
rush::Rush$print()
rush::Rush$print_log()
rush::Rush$push_failed()
rush::Rush$push_priority_tasks()
rush::Rush$push_tasks()
rush::Rush$read_hash()
rush::Rush$read_hashes()
rush::Rush$read_log()
rush::Rush$reconnect()
rush::Rush$reset()
rush::Rush$restart_local_workers()
rush::Rush$retry_tasks()
rush::Rush$start_local_workers()
rush::Rush$start_remote_workers()
rush::Rush$stop_workers()
rush::Rush$tasks_with_state()
rush::Rush$wait_for_finished_tasks()
rush::Rush$wait_for_new_tasks()
rush::Rush$wait_for_tasks()
rush::Rush$wait_for_workers()
rush::Rush$write_hashes()
Method new()
Creates a new instance of this R6 class.
Usage
RushWorker$new(
  network_id,
  config = NULL,
  remote,
  worker_id = NULL,
  heartbeat_period = NULL,
  heartbeat_expire = NULL,
  lgr_thresholds = NULL,
  lgr_buffer_size = 0,
  seed = NULL
)
Arguments
network_id
(character(1))
Identifier of the rush network. Controller and workers must have the same network id. Keys in Redis are prefixed with the network id.
config
(redux::redis_config)
Redis configuration options. If NULL, the configuration set by rush_plan() is used. If rush_plan() has not been called, the REDIS_URL environment variable is parsed. If REDIS_URL is not set, a default configuration is used. See redux::redis_config for details.
remote
(logical(1))
Whether the worker is started on a remote machine. See Rush for details.
worker_id
(character(1))
Identifier of the worker. Keys in Redis specific to the worker are prefixed with the worker id.
heartbeat_period
(integer(1))
Period of the heartbeat in seconds.
heartbeat_expire
(integer(1))
Time to live of the heartbeat in seconds.
lgr_thresholds
(named character() | named numeric())
Logger thresholds on the workers, e.g. c(rush = "debug").
lgr_buffer_size
(integer(1))
By default (lgr_buffer_size = 0), log messages are written directly to the Redis data store. If lgr_buffer_size > 0, log messages are buffered and written to the Redis data store when the buffer is full. This improves logging performance.
seed
(integer())
Initial seed for the random number generator. Either a L'Ecuyer-CMRG seed (integer(7)) or a regular RNG seed (integer(1)). The latter is converted to a L'Ecuyer-CMRG seed. If NULL, no seed is used for the random number generator.
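The conversion of a regular seed into a L'Ecuyer-CMRG seed described for the seed argument can be illustrated in base R. This is only a sketch of the idea: the helper name as_lecuyer_seed is hypothetical, and the conversion rush performs internally may differ.

```r
# Hypothetical helper (not part of rush): derive a L'Ecuyer-CMRG seed,
# an integer(7) vector, from a regular integer(1) seed.
as_lecuyer_seed = function(seed) {
  # switch back to the previous RNG kind when the function exits;
  # note that this still resets the RNG state of the session
  old_kind = RNGkind()[1]
  on.exit(RNGkind(old_kind), add = TRUE)

  RNGkind("L'Ecuyer-CMRG")
  set.seed(seed)
  get(".Random.seed", envir = globalenv())
}

seed = as_lecuyer_seed(123L)
length(seed) # 7

# an independent stream, e.g. for a second worker
seed_next = parallel::nextRNGStream(seed)
```

L'Ecuyer-CMRG seeds are useful with several workers because parallel::nextRNGStream() yields streams that do not overlap.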
Method push_running_tasks()
Push tasks directly to the running tasks, bypassing the queue.
Arguments
xss
(list of named list())
Lists of arguments for the function, e.g. list(list(x1, x2), list(x1, x2)).
extra
(list)
List of additional information stored along with the task, e.g. list(list(timestamp), list(timestamp)).
Returns
(character())
Keys of the tasks.
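The shape of the xss and extra arguments can be made concrete with a small sketch. The values are illustrative, and the call itself is left commented out because it needs a running Redis server and an existing RushWorker, here assumed to be stored in a variable named worker.

```r
# one named list of function arguments per task
xss = list(
  list(x1 = 1, x2 = 2),
  list(x1 = 3, x2 = 4)
)

# one list of additional information per task, in the same order as xss
extra = list(
  list(timestamp = Sys.time()),
  list(timestamp = Sys.time())
)

# both lists describe the same tasks, so their lengths must match
stopifnot(length(xss) == length(extra))

# assumes `worker` is a RushWorker connected to Redis
# keys = worker$push_running_tasks(xss, extra = extra)
```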
Method pop_task()
Pop a task from the queue. The task is moved to the running tasks.
Arguments
timeout
(numeric(1))
Time to wait for a task in seconds.
fields
(character())
Fields to be returned.
Method set_terminated()
Mark the worker as terminated. Last step in the worker loop before the worker terminates.
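The active bindings and methods documented above can be combined into a worker loop roughly as follows. This is a minimal sketch under stated assumptions, not the loop shipped with rush: the function name worker_loop_sketch is hypothetical, pop_task() is assumed to return NULL when no task arrives within the timeout and otherwise a list with an xs element, and writing the result back is left as a comment because the corresponding method is not documented on this page.

```r
# Hypothetical worker loop; `worker` is a RushWorker and `fun` is the
# function that evaluates a task.
worker_loop_sketch = function(worker, fun) {
  n_evaluated = 0L

  # continue until the worker is asked to shut down
  while (!worker$terminated && !worker$terminated_on_idle) {
    # assumption: NULL is returned if no task arrives within the timeout
    task = worker$pop_task(timeout = 1, fields = "xs")
    if (is.null(task)) next

    # assumption: the arguments of the task are stored in task$xs
    ys = do.call(fun, task$xs)

    # the result `ys` would be written to the database here
    n_evaluated = n_evaluated + 1L
  }

  # last step before the worker terminates
  worker$set_terminated()
  invisible(n_evaluated)
}
```

The function only defines the loop. Running it requires a RushWorker connected to a Redis server.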
Examples
# This example is not executed since Redis must be installed and running
library(rush)

# connect to a local Redis server
config_local = redux::redis_config()
rush = rsh(network_id = "test_network", config = config_local)

# function evaluated by the workers
fun = function(x1, x2, ...) list(y = x1 + x2)
rush$start_local_workers(fun = fun)

# shut down the workers
rush$stop_workers()