RabbitMQ can be used as storage for jobs. Instead of internal processing, all jobs are routed into a common RabbitMQ queue and dequeued as needed, potentially by multiple consumers/servers.
This is an advanced guide for complex data processing, intended mostly for long-running jobs and large data sets. Do not use it by default.
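RabbitMQ support is disabled by default. To enable it, set --jobserver.rabbitmq.enable=1 and point --jobserver.rabbitmq.url at your broker. For example (the URL shown is just the built-in default; the assetlinker job is only an illustration):
fcpc --jobserver.rabbitmq.enable=1 --jobserver.rabbitmq.url=amqp://guest:guest@localhost/%2f job execute assetlinker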
Consumers are used to run jobs from the queue (the actual workers).
Generators are used to insert jobs into the queue (they prepare and manage the jobs to be enqueued).
There are commands for managing the queues. Use 'queue' for standard jobs and 'equeue' for error jobs.
To run consumers:
fcpc job queue run
To run the first job in the queue without deleting it from the queue (used mostly for testing):
fcpc job queue run1
To see the first job in the queue (used mostly for testing):
fcpc job queue pop
To purge the queue:
fcpc job queue purge
To see queue statistics
fcpc job queue stats
Note that error job handling can be more complex.
By default, when an error occurs, the consumer finishes and the parent process exits with an error. With --jobserver.queue.onjoberror=to-equeue, failed jobs are rerouted to the error queue instead, and you can process or test them later with the job management commands.
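For example, to keep processing and collect failed jobs in the error queue instead of exiting on the first error (the job name is only an illustration, any job works the same way):
fcpc --jobserver.queue.onjoberror=to-equeue job execute assetlinker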
To check whether there are messages in the error queue, run:
fcpc job equeue stats
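Since the same management commands apply to the error queue (the 'queue'/'equeue' split noted above), failed jobs can presumably be inspected or re-run the same way, for example:
fcpc job equeue pop
fcpc job equeue run1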
By default, generators and consumers are started for every command that needs job creation, and all of them are stopped when the fcpc process exits, so this is transparent to the user and requires no extra configuration.
Set --jobserver.consumers to the number of simultaneous processes that will be spawned automatically.
To run 2 consumers and 2 generators, for example:
fcpc --jobserver.consumers=2 --jobserver.max_jobs=4 job execute assetlinker
It is possible to separate generators and consumers. In theory, the generator can run on a different host than the consumers, and there can even be multiple hosts, each running consumers. Note that all nodes need to have the same configuration, and if job processing relies on external files, those files need to be accessible from all consumers.
Set --jobserver.consumers=0 on the generator.
Run 'fcpc job queue run' on the consumers.
Run consumers in the background:
fcpc --jobserver.consumers=10 job queue run &
Then execute the jobs:
fcpc --jobserver.consumers=0 job execute assetlinker
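Because the default queue name is temporary and per-process, separate generator and consumer processes most likely need to share an explicitly named queue on a common broker. A sketch, assuming a broker reachable as 'rabbit-host' and a queue named 'fcpc-jobs' (both placeholders):
On the consumer host(s):
fcpc --jobserver.rabbitmq.enable=1 --jobserver.rabbitmq.url=amqp://guest:guest@rabbit-host/%2f --jobserver.rabbitmq.queue=fcpc-jobs --jobserver.consumers=10 job queue run &
On the generator host:
fcpc --jobserver.rabbitmq.enable=1 --jobserver.rabbitmq.url=amqp://guest:guest@rabbit-host/%2f --jobserver.rabbitmq.queue=fcpc-jobs --jobserver.consumers=0 job execute assetlinker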
jobserver: Job server for managing background jobs

Usage (option summary):
[-h] [--jobserver.max_jobs JOBSERVER.MAX_JOBS] [--jobserver.consumers JOBSERVER.CONSUMERS]
[--jobserver.sleep JOBSERVER.SLEEP] [--jobserver.chunksize JOBSERVER.CHUNKSIZE]
[--jobserver.jobs JOBSERVER.JOBS] [--jobserver.rabbitmq.url JOBSERVER.RABBITMQ.URL]
[--jobserver.rabbitmq.enable {0,1}] [--jobserver.rabbitmq.queue JOBSERVER.RABBITMQ.QUEUE]
[--jobserver.rabbitmq.equeue JOBSERVER.RABBITMQ.EQUEUE]
[--jobserver.queue.onjoberror {skip,exit,to-equeue}]

optional arguments:
-h, --help show this help message and exit
--jobserver.max_jobs JOBSERVER.MAX_JOBS
Max simultaneous jobs to execute in parallel. Use 1 for no forking. Use 0 for the actual number of
processors.[Defined:fcplib.job.core] (default: 1)
--jobserver.consumers JOBSERVER.CONSUMERS
Number of consumer (worker) jobs. Defaults to max_jobs - 1[Defined:fcplib.job.core] (default: None)
--jobserver.sleep JOBSERVER.SLEEP
Sleep time in seconds between attempts to start a new job.[Defined:fcplib.job.core] (default: 1)
--jobserver.chunksize JOBSERVER.CHUNKSIZE
Chunk size for jobs.[Defined:fcplib.job.core] (default: 1000)
--jobserver.jobs JOBSERVER.JOBS
List of jobs to enable/disable, separated by commas, or 'all'. Prefix a job with '-' to disable it. For example
'all,-simpletagger' enables all jobs except simpletagger.[Defined:fcplib.job.core] (default: all,-linktagger)
--jobserver.rabbitmq.url JOBSERVER.RABBITMQ.URL
RabbitMQ URL[Defined:fcplib.job.core] (default: amqp://guest:guest@localhost/%2f)
--jobserver.rabbitmq.enable {0,1}
Use RabbitMQ as job server[Defined:fcplib.job.core] (default: 0)
Choices:
0
1
--jobserver.rabbitmq.queue JOBSERVER.RABBITMQ.QUEUE
RabbitMQ queue name. Default is temporary, per process[Defined:fcplib.job.core] (default: None)
--jobserver.rabbitmq.equeue JOBSERVER.RABBITMQ.EQUEUE
RabbitMQ error queue name. Default is temporary, per process[Defined:fcplib.job.core] (default: None)
--jobserver.queue.onjoberror {skip,exit,to-equeue}
What to do when an error occurs while processing a job[Defined:fcplib.job.core] (default: exit)
Choices:
skip Skip error jobs
exit Exit on first error
to-equeue Reroute error to error queue