Workflow Workers

Infinitic workers can be configured to orchestrate workflows. The roles of workflow workers are:
- listen to Pulsar for messages intended for their workflows
- update the workflow history
- dispatch tasks or sub-workflows based on the workflow definition

Do not start multiple workers with the same configuration on one machine; increase the concurrency setting instead. To increase throughput and resilience, start workers on multiple servers: workflow workers can be scaled horizontally.
Starting a workflow worker
We can start a worker with the following code.

Java:

    import io.infinitic.factory.InfiniticWorkerFactory;
    import io.infinitic.worker.InfiniticWorker;

    public class App {
        public static void main(String[] args) {
            try (InfiniticWorker worker = InfiniticWorkerFactory.fromConfigFile("infinitic.yml")) {
                worker.start();
            }
        }
    }

Kotlin:

    import io.infinitic.factory.InfiniticWorkerFactory

    fun main(args: Array<String>) {
        InfiniticWorkerFactory.fromConfigFile("infinitic.yml").use { worker ->
            worker.start()
        }
    }

We can also use .fromConfigResource("/infinitic.yml") if the configuration file is located in the resources folder.
Workflow worker configuration file
Here is an example of a valid infinitic.yml file:

    name: optional_worker_name

    stateStorage: redis

    redis:
      host: localhost
      port: 6379
      user:
      password:
      database: 0

    pulsar:
      brokerServiceUrl: pulsar://localhost:6650
      webServiceUrl: http://localhost:8080
      tenant: infinitic
      namespace: dev

    workflows:
      - name: example.booking.workflows.BookingWorkflow
        class: example.booking.workflows.BookingWorkflowImpl
        concurrency: 10

This configuration contains:
- an optional worker name; when provided, it must be unique among all workers and clients connected to the same Pulsar namespace
- the state storage settings
- the Pulsar settings
- for each workflow, the following settings:
Name | Type | Description |
---|---|---|
name | string | name of the workflow (by default, the name of its interface) |
class | string | name of the class to instantiate |
concurrency | integer | maximum number of messages processed in parallel |
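
The concurrency setting caps how many messages a worker processes at the same time. A minimal sketch of that idea in plain Java, assuming a semaphore-based limiter; `ConcurrencyLimitDemo` and `runWithLimit` are hypothetical names used for illustration and do not describe Infinitic's API or its actual implementation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: not part of Infinitic.
class ConcurrencyLimitDemo {

    // Processes `messages` simulated messages with at most `concurrency`
    // in flight at once, and returns the highest parallelism observed.
    static int runWithLimit(int messages, int concurrency) throws InterruptedException {
        Semaphore permits = new Semaphore(concurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxObserved = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < messages; i++) {
                permits.acquire(); // blocks while `concurrency` messages are in flight
                pool.submit(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        maxObserved.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // stand-in for handling one message
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                });
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return maxObserved.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int max = runWithLimit(50, 10);
        System.out.println("max parallel messages: " + max);
    }
}
```

Raising the limit lets one process handle more messages in parallel, which is why a higher concurrency value is preferable to a second identical worker on the same machine.
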
A worker must be able to instantiate any class described in its configuration file.
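
To illustrate what "able to instantiate" implies, here is a sketch of creating an object from a class name given as a string, like the value of `class` in the configuration. It assumes reflection with a public no-argument constructor; that mechanism and the names `InstantiationDemo`, `instantiate` and `Greeter` are assumptions made for illustration, not a description of Infinitic's internals:

```java
// Hypothetical illustration: not part of Infinitic.
class InstantiationDemo {

    // Stand-in for a workflow implementation class listed in the configuration.
    public static class Greeter {
        public Greeter() {}
        public String greet() { return "hello"; }
    }

    // Loads the class by name and calls its no-argument constructor.
    // Fails if the class is not on the classpath or has no such constructor.
    static Object instantiate(String className) throws ReflectiveOperationException {
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Object ok = instantiate(Greeter.class.getName());
        System.out.println(ok.getClass().getSimpleName());

        try {
            instantiate("example.missing.WorkflowImpl"); // not on the classpath
        } catch (ClassNotFoundException e) {
            System.out.println("cannot instantiate: " + e.getMessage());
        }
    }
}
```

Under this assumption, a class name that is misspelled or missing from the worker's classpath only fails when the lookup is attempted, so it is worth checking the configured names against the deployed artifact.
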
Redis state storage
Example of a configuration for using Redis for state storage:

    stateStorage: redis

    redis:
      host: 127.0.0.1
      port: 6379
      timeout: 2000
      user:
      password:
      database: 0
      ssl: false
      poolConfig:
        maxTotal: -1
        maxIdle: 8
        minIdle: 0

MySQL state storage configuration
Example of a configuration for using MySQL for state storage:

    stateStorage: mysql

    mysql:
      host: 127.0.0.1
      port: 3306
      user: root
      password:
      database: infinitic

Caffeine state cache
By default, Infinitic uses Caffeine as a cache in front of the state storage. Here is the equivalent default configuration:

    stateCache: caffeine

    caffeine:
      maximumSize: 10000
      expireAfterAccess: 3600
      expireAfterWrite:

No state cache
Example of a configuration that disables the cache in front of the state storage:
stateCache: none
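
The difference between a cached and an uncached setup can be pictured with a read-through cache in front of a store. The sketch below is a simplification in plain Java under stated assumptions: a `HashMap` stands in for both the cache and the state storage, there is no eviction or expiry, and `CachedStateStore` and its methods are hypothetical names rather than Infinitic or Caffeine classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: not part of Infinitic or Caffeine.
class CachedStateStore {
    private final Map<String, String> storage = new HashMap<>(); // stand-in for Redis or MySQL
    private final Map<String, String> cache;                     // null when caching is disabled
    private int storageReads = 0;

    CachedStateStore(boolean cacheEnabled) {
        this.cache = cacheEnabled ? new HashMap<>() : null;
    }

    // Writes go to storage and, when caching is enabled, refresh the cached copy.
    void put(String key, String state) {
        storage.put(key, state);
        if (cache != null) cache.put(key, state);
    }

    // Reads are served from the cache when possible, otherwise from storage.
    String get(String key) {
        if (cache != null && cache.containsKey(key)) return cache.get(key);
        storageReads++;
        String state = storage.get(key);
        if (cache != null && state != null) cache.put(key, state);
        return state;
    }

    int storageReads() { return storageReads; }

    public static void main(String[] args) {
        for (boolean enabled : new boolean[] {true, false}) {
            CachedStateStore store = new CachedStateStore(enabled);
            store.put("workflow-1", "state-A");
            store.get("workflow-1");
            store.get("workflow-1");
            System.out.println("cache=" + enabled + " storage reads=" + store.storageReads());
        }
    }
}
```

In this toy model the cached store answers both reads without touching storage, while the uncached one reads storage twice. That is the trade-off the setting controls: fewer storage round trips versus no in-memory copy of the state.
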