Workflow Workers

Subscribe here to follow Infinitic's development. Please ⭐️ us on Github!

Infinitic workers can be configured to orchestrate workflows. The roles of workflow workers are:

  • to listen to Pulsar for messages intended for this workflow
  • update workflow history
  • dispatch tasks or sub-workflows based on the workflow definition

Do not start multiple workers with the same configuration on one machine, instead increase the concurrency settings. To increases throughput and resilience, just start multiple workers on multiple servers

Workflow workers can be scaled horizontally.

Starting a workflow worker

We can start a worker with:

import io.infinitic.factory.InfiniticWorkerFactory;
import io.infinitic.worker.InfiniticWorker;

public class App {
    public static void main(String[] args) {
        try(InfiniticWorker worker = InfiniticWorkerFactory.fromConfigFile("infinitic.yml")) {
            worker.start();
        }
    }
}
import io.infinitic.factory.InfiniticWorkerFactory

fun main(args: Array<String>) {
    InfiniticWorkerFactory.fromConfigFile("infinitic.yml").use { worker ->
        worker.start()
    }
}

We can also use .fromConfigResource("/infinitic.yml") if the configuration file is located in the resource folder.

Workflow worker configuration file

Here is an example of a valid infinitic.yml file:

name: optional_worker_name

stateStorage: redis

redis:
  host: localhost
  port: 6379
  user:
  password:
  database: 0

pulsar:
  brokerServiceUrl: pulsar://localhost:6650
  webServiceUrl: http://localhost:8080
  tenant: infinitic
  namespace: dev

workflows:
  - name: example.booking.workflows.BookingWorkflow
    class: example.booking.workflows.BookingWorkflowImpl
    concurrency: 10

This configuration contains

  • a worker name (optional) - when provided, this name must be unique among all our workers and clients connected to the same Pulsar namespace
  • the state storage settings
  • the Pulsar settings
  • and for each workflow:
NameTypeDescription
namestringname of the workflow (its interface per default)
classstringname of the class to instantiate
concurrencyintegermaximum number of messages processed in parallel

A worker must be able to instantiate any class described in its configuration file.

Redis state storage

Example of a configuration for using Redis for state storage:

stateStorage: redis

redis:
  host: 127.0.0.1
  port: 6379
  timeout: 2000
  user: 
  password: 
  database: 0
  ssl: false
  poolConfig:
    maxTotal: -1
    maxIdle: 8
    minIdle: 0

MySQL state storage configuration

Example of a configuration for using MySQL for state storage:

stateStorage: mysql

mysql:
  host: 127.0.0.1
  port: 3306
  user: root
  password: 
  database: infinitic

Caffeine state cache

Per default, Infinitic uses Caffeine as a cache when requesting state storage:

Here is the equivalent default configuration:

stateCache: caffeine

caffeine:
  maximumSize: 10000
  expireAfterAccess: 3600
  expireAfterWrite:

No state cache

Example of a configuration for removing cache when requesting state storage:

stateCache: none
Edit this page on GitHub Updated at Sun, Jul 3, 2022