
RushWorker evaluates tasks and writes results to the database. The worker inherits from Rush.

Value

Object of class R6::R6Class and RushWorker with worker methods.

Note

The worker registers itself in the database of the rush network.

Super class

rush::Rush -> RushWorker

Public fields

worker_id

(character(1))
Identifier of the worker.

remote

(logical(1))
Whether the worker is on a remote machine.

heartbeat

(r_process)
Background process for the heartbeat.

Active bindings

terminated

(logical(1))
Whether to shut down the worker. Used in the worker loop to determine whether to continue.

terminated_on_idle

(logical(1))
Whether to shut down the worker if no tasks are queued. Used in the worker loop to determine whether to continue.

Methods


Method new()

Creates a new instance of this R6 class.

Usage

RushWorker$new(
  network_id,
  config = NULL,
  remote,
  worker_id = NULL,
  heartbeat_period = NULL,
  heartbeat_expire = NULL,
  lgr_thresholds = NULL,
  lgr_buffer_size = 0,
  seed = NULL
)

Arguments

network_id

(character(1))
Identifier of the rush network. Controller and workers must use the same network id. Keys in Redis are prefixed with the network id.

config

(redux::redis_config)
Redis configuration options. If NULL, configuration set by rush_plan() is used. If rush_plan() has not been called, the REDIS_URL environment variable is parsed. If REDIS_URL is not set, a default configuration is used. See redux::redis_config for details.

remote

(logical(1))
Whether the worker is started on a remote machine. See Rush for details.

worker_id

(character(1))
Identifier of the worker. Keys in Redis specific to the worker are prefixed with the worker id.

heartbeat_period

(integer(1))
Period of the heartbeat in seconds.

heartbeat_expire

(integer(1))
Time to live of the heartbeat in seconds.

lgr_thresholds

(named character() | named numeric())
Logger thresholds on the workers, e.g. c(rush = "debug").

lgr_buffer_size

(integer(1))
By default (lgr_buffer_size = 0), log messages are saved directly in the Redis data store. If lgr_buffer_size > 0, log messages are buffered and saved in the Redis data store once the buffer is full, which improves logging performance.

seed

(integer())
Initial seed for the random number generator. Either a L'Ecuyer-CMRG seed (integer(7)) or a regular RNG seed (integer(1)). The latter is converted to a L'Ecuyer-CMRG seed. If NULL, no seed is used for the random number generator.
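
As a rough illustration of the two accepted seed forms, the base R sketch below derives a L'Ecuyer-CMRG seed from a regular integer seed. How RushWorker performs the conversion internally is not stated on this page. Using set.seed() with kind = "L'Ecuyer-CMRG" is an assumption, and to_cmrg_seed is a hypothetical helper name.

```r
# Hypothetical helper: derive an integer(7) L'Ecuyer-CMRG seed from integer(1).
# Assumption: this mirrors the idea of the conversion, not rush's actual code.
to_cmrg_seed = function(seed) {
  old_kind = RNGkind()[1]        # remember the current RNG kind
  on.exit(RNGkind(old_kind))     # restore the kind (not the old state) on exit
  set.seed(seed, kind = "L'Ecuyer-CMRG")
  get(".Random.seed", envir = globalenv())
}

cmrg_seed = to_cmrg_seed(123L)
length(cmrg_seed)  # 7
```

Either cmrg_seed or the plain 123L could then be passed as the seed argument.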


Method push_running_tasks()

Pushes tasks directly to the running tasks, bypassing the queue.

Usage

RushWorker$push_running_tasks(xss, extra = NULL)

Arguments

xss

(list of named list())
Lists of arguments for the function, e.g. list(list(x1, x2), list(x1, x2)).

extra

(list)
List of additional information stored along with the task, e.g. list(list(timestamp), list(timestamp)).

Returns

(character())
Keys of the tasks.


Method pop_task()

Pops a task from the queue. The task is moved to the running tasks.

Usage

RushWorker$pop_task(timeout = 1, fields = "xs")

Arguments

timeout

(numeric(1))
Time to wait for task in seconds.

fields

(character())
Fields to be returned.


Method push_results()

Pushes results to the data base.

Usage

RushWorker$push_results(keys, yss, extra = NULL)

Arguments

keys

(character(1))
Keys of the associated tasks.

yss

(named list())
List of lists of named results.

extra

(named list())
List of lists of additional information stored along with the results.
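
The interplay of push_running_tasks() and push_results() can be sketched as follows. This is a minimal sketch, assuming worker is a connected RushWorker. evaluate_direct is a hypothetical helper, and the objective x1 + x2 is taken from the example at the end of this page.

```r
# Sketch: evaluate points without going through the queue.
# `worker` is assumed to be a RushWorker; `evaluate_direct` is hypothetical.
evaluate_direct = function(worker, xss) {
  # Register the points as running tasks; one key is returned per task.
  keys = worker$push_running_tasks(xss = xss)
  # Evaluate each point; every result is a named list.
  yss = lapply(xss, function(xs) list(y = xs$x1 + xs$x2))
  # Optional extra information, one named list per task.
  extra = lapply(xss, function(xs) list(timestamp = Sys.time()))
  worker$push_results(keys, yss = yss, extra = extra)
  invisible(keys)
}

# Two tasks, each a named list of arguments.
xss = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4))
```

With a running Redis server and a worker object, evaluate_direct(worker, xss) would store both tasks together with their results.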


Method set_terminated()

Mark the worker as terminated. Last step in the worker loop before the worker terminates.

Usage

RushWorker$set_terminated()
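
A worker loop built from the methods and active bindings above might look like the sketch below. The return value of pop_task() is not documented on this page, so its shape here is an assumption: NULL when no task arrives within the timeout, otherwise a list with the task key in key and the arguments in xs. worker_loop is a hypothetical name.

```r
# Sketch of a worker loop. `worker` is assumed to be a RushWorker.
# Assumption: pop_task() returns NULL on timeout, otherwise a list
# with elements `key` (task key) and `xs` (named list of arguments).
worker_loop = function(worker, fun) {
  while (!worker$terminated && !worker$terminated_on_idle) {
    task = worker$pop_task(timeout = 1, fields = "xs")
    if (is.null(task)) next          # nothing queued yet, poll again
    ys = do.call(fun, task$xs)       # evaluate the task
    worker$push_results(task$key, yss = list(ys))
  }
  worker$set_terminated()            # last step before the worker exits
}
```

Checking terminated and terminated_on_idle at the top of each iteration lets the controller stop the worker between tasks rather than in the middle of an evaluation.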


Method clone()

The objects of this class are cloneable with this method.

Usage

RushWorker$clone(deep = FALSE)

Arguments

deep

Whether to make a deep clone.

Examples

# This example requires a running Redis server
config_local = redux::redis_config()
rush = rsh(network_id = "test_network", config = config_local)

fun = function(x1, x2, ...) list(y = x1 + x2)
rush$start_local_workers(fun = fun)

rush$stop_workers()