## Base API and schedulers
The main worker functions are wrapped in an `R6` class with the name of
`QSys`. This provides a standardized API to the lower-level messages that
are sent via ZeroMQ.
The base class itself is subclassed by scheduler classes that add the
functions required for submitting and cleaning up jobs:
```
+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
    |- etc.
```
The user-visible object is a worker `Pool` that wraps this class, and will
eventually allow managing different types of workers.
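To make this pattern concrete, here is a minimal sketch of such a hierarchy;
the class and method names below are hypothetical and only illustrate how a
scheduler class overrides the submission logic of a common base, not how
clustermq's actual classes are written:

```r
library(R6)

# Hypothetical sketch: the base class owns the messaging API, while
# subclasses add scheduler-specific job submission and cleanup.
QSysSketch = R6Class("QSysSketch",
    public = list(
        submit_jobs = function(n) stop("implemented by scheduler subclasses"),
        cleanup = function() invisible(TRUE)
    )
)

LSFSketch = R6Class("LSFSketch", inherit = QSysSketch,
    public = list(
        # a real LSF class would fill a submission template and call `bsub`
        submit_jobs = function(n) message("would submit ", n, " jobs via bsub")
    )
)
```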
## Workers

### Creating a worker pool
A pool of workers can be created using the `workers()` function, which
instantiates a `Pool` object of the corresponding `QSys`-derived scheduler
class. See `?workers` for details.
```r
# start up a pool of three workers using the default scheduler
w = workers(n_jobs=3)

# if we make an unclean exit for whatever reason, clean up the jobs
on.exit(w$cleanup())
```
### Worker startup
For workers that are started up via a scheduler, we do not know in advance
which machine they will run on. This is why every worker is started with the
TCP/IP address of the master socket that will distribute work. This is
achieved by an R call common to all schedulers:
```sh
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```
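For illustration, with the master socket listening on a hypothetical host and
port, the filled-in template would look like this:

```sh
# host name and port are made up for this example
R --no-save --no-restore -e 'clustermq:::worker("tcp://headnode:6124")'
```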
### Worker communication
On the master’s side, we wait until a worker connects:
```r
msg = w$recv() # this will block until a worker is ready
```
We can then send any expression to be evaluated on the worker using the
`send` method:
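```r
# general form: the expression to evaluate, then any variables it needs
w$send(expression, ...)
```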
After the expression (in `...`), any variables that should be passed along
with the call can be added. For the batch processing that `clustermq`
usually does, this command is `work_chunk`, where the `chunk` data is added:
```r
w$send(clustermq:::work_chunk(chunk, fun, const, rettype, common_seed),
       chunk = chunk(iter, submit_index))
```
### Worker environment
We can add any number of objects to a worker environment using the `env`
method:
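```r
# `object` and `value` are placeholders for any named objects to export
w$env(object = value, ...)
```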
This will also invisibly return a `data.frame` with all objects currently in
the environment. If a user wants to inspect the environment without changing
it, they can call `w$env()` without arguments. The environment will be
propagated to all workers automatically in a greedy fashion.
## Main event loop
Putting the above together in an event loop, we get what is essentially
implemented in `master`. `w$send` invisibly returns an identifier to track
which call was submitted, and `w$current()` matches the same to `w$recv()`.
```r
w = workers(3)
on.exit(w$cleanup())
w$env(...)

while (we have new work to send || jobs pending) {
    res = w$recv() # the result of the call, or NULL for a new worker
    w$current()$call_ref # matches answer to request, -1 otherwise
    # handle result

    if (more work)
        call_ref = w$send(expression, ...) # call_ref tracks request identity
    else
        w$send_shutdown()
}
```
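To show the shape of this loop concretely, here is a minimal sketch that
fills in the pseudocode with simple bookkeeping; the counters and the
`sqrt()` task are illustrative and not part of the clustermq API:

```r
library(clustermq)

# Illustrative only: compute sqrt() of 1..3 on two workers.
n_tasks = 3
submitted = 0
received = 0
results = list()

w = workers(n_jobs = 2)
on.exit(w$cleanup())

while (submitted < n_tasks || received < submitted) {
    res = w$recv()               # blocks until a worker answers or checks in
    ref = w$current()$call_ref   # request this answer belongs to, -1 otherwise
    if (ref != -1) {
        results[[as.character(ref)]] = res
        received = received + 1
    }

    if (submitted < n_tasks) {
        submitted = submitted + 1
        w$send(sqrt(x), x = submitted)  # invisibly returns a call_ref
    } else {
        w$send_shutdown()
    }
}
```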
A loop of a similar structure can be used to extend `clustermq`. As an
example, this was done by the `targets` package.