New in 0.16.2

v0.16.4

Infinitic Workers

Creating a Worker

To build a Worker you need first to add the infinitic-worker dependency into your project:

An Infinitic Worker can be set up throuh builders or using YAML.

Whatever the chosen method, you'll need to:

  1. Specify how to connect to the event broker.

  2. (Optionally) Specify how to connect to the database.

  3. Provide the configuration for the components you want to use.

Builder-based Configuration

An Infinitic Worker can be created using the builder pattern:

It can be configured by adding the different components you want to use.

Transport

A worker requires a transport configuration that specifies the event broker to use and the connection details. This configuration is essential for the worker to communicate with the message-driven infrastructure.

Infinitic currently supports:

  • Pulsar
  • An in-memory implementation for testing purposes

Pulsar

Here is the minimal configuration to create a worker connecting to a Pulsar cluster:

Look at the builders' methods for more options.

In Memory

For testing purposes, you can use an in-memory transport:

This transport should only be used for testing purposes, as it does not persist any messages. It should be used with one worker instance only (embedding all Services and Workflows components), and with the client worker.client.

Databases

Here are the different minimal configurations for the databases. Look at the builders' methods for more details.

Storage is used by the following components: Service Tag Engines, Workflow State Engines, and Workflow Tag Engines.

Redis

Postgres

MySQL

In Memory

This storage should only be used for testing purposes, as it does not persist any data.

Service Executor

Here is the configuration to create a Service Executor associated to a Service named MyService.

This configuration creates a Worker embedding a Service Executor for the MyService service that will process up to 10 tasks in parallel, with a retry policy and a timeout of 100 seconds.

Service Tag Engine

Here is the configuration to create a Service Tag Engine associated to a Service named MyService.

This configuration creates a Worker embedding a Service Tag Engine for the MyService service. The setConcurrency(5) method sets the concurrency to 5, meaning it will process up to five messages at a time.

To prevent race conditions in the database, Infinitic ensures not to have 2 messages with the same tag (and service name) processed at the same time. Therefore, setting a concurrency higher than your number of tags is not beneficial, as it won't increase processing speed for messages with the same tag.

Workflow Executor

Here is the configuration to create a Workflow Executor associated to a Workflow named MyWorkflow.

This configuration creates a Worker embedding a Workflow Executor for the MyWorkflow workflow that will process steps of up to 10 different instances in parallel.

It's also possible to set a retry policy and a timeout for the workflow executor. But this is not recommended, as a workflow implemention is expected to be deterministic.

Workflow State Engine

Here is the configuration to create a Workflow State Engine associated to a Workflow named MyWorkflow.

This configuration creates a Worker embedding a Workflow State Engine for the MyWorkflow workflow. The setConcurrency(10) method sets the concurrency to 10, meaning it will process ten messages at a time.

Infinitic ensures that for a given workflow instance, only one message will be processed at a time, regardless of how many Workflow State Engines are running or their concurrency settings. This guarantees the consistency of the workflow state, and prevents race conditions in the database.

Workflow Tag Engine

Here is the configuration to create a Workflow Tag Engine associated to a Workflow named MyWorkflow.

This configuration creates a Worker embedding a Workflow Tag Engine for the MyWorkflow workflow. The setConcurrency(5) method sets the concurrency to 5, meaning it will process up to five messages at a time.

To prevent race conditions in the database, Infinitic ensures not to have 2 messages with the same tag (and workflow name) processed at the same time. Therefore, setting a concurrency higher than your number of tags is not beneficial, as it won't increase processing speed for messages with the same tag.

Event Listener

This configuration creates a Worker embedding an Event Listener. The setConcurrency(50) method sets the concurrency to 50, meaning it will process up to fifty batches at a time. A batch will be sent to the listener:

  • after 100 received events (default is 1000)
  • or 10 seconds after the first event was received (default is 1 second)

All components

Here is an example of a valid configuration containing all components for a service and a workflow:

YAML-based Configuration

A ServiceExecutor can be created directly from a YAML string, a YAML file or a YAML resource:

Transport

Pulsar

Here is the minimal configuration to create a worker connecting to a Pulsar cluster:

transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650/
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

In Memory

For testing purposes, you can use an in-memory transport:

transport:
  inMemory:

This transport should only be used for testing purposes, as it does not persist any messages. It should be used with one worker instance only (embedding all Services and Workflows components), and with the client worker.client.

Databases

Here are the different minimal configurations for the databases. Look at the configuration files for more details.

Storage is used by the following components: Service Tag Engines, Workflow State Engines, and Workflow Tag Engines.

Redis

storage:
  redis:
    host: localhost
    port: 6379
    username: redis
    password: myRedisPassword

Postgres

storage:
  postgres:
    host: localhost
    port: 5432
    username: postgres
    password: myPostgresPassword

MySQL

storage:
  mysql:
    host: localhost
    port: 3306
    username: root
    password: myMysqlPassword

In Memory

storage:
  inMemory:

This storage should only be used for testing purposes, as it does not persist any data.

Service Executor

Here is the configuration to create a Service Executor associated to a Service named MyService.

transport:
  ... # Transport configuration here

# Configuration of a Service executor
services:
  - name: MyService
    executor:
      class: example.MyServiceImpl
      concurrency: 10
      timeoutSeconds: 100
      retry:
        minimumSeconds: 1  
        maximumSeconds: 1000 
        backoffCoefficient: 2  
        randomFactor: 0.5   
        maximumRetries: 11
        ignoredExceptions:   
          - # fully qualified name of an exception to ignore
          - # fully qualified name of an second exception to ignore

Any class declared in this configuration file must have an empty constructor. If your service requires dependencies, consider using builders to create instances.

Additionally, ensure that the class is public and accessible from the worker's classpath. If the class is part of a module, make sure it's properly exported.

This configuration creates a Worker embedding a Service Executor for the MyService service that will process up to 10 tasks in parallel, with a timeout of 100 seconds and retry policy using a truncated randomized exponential backoff retry strategy.

This retry strategy is designed to efficiently handle transient errors while avoiding overwhelming the system. Here's a breakdown of how it works:

  1. Exponential backoff: The delay between retries increases exponentially with each attempt.
  2. Randomization: A random factor is applied to prevent synchronized retries from parallel tasks.
  3. Truncation: The delay is capped at a maximum value to avoid excessively long waits.

If an exception occurs during task execution that is not listed in ignoredExceptions, and the maximumRetries limit has not been reached, the task will be retried after a calculated delay. The delay (in seconds) is determined by the following formula:

min(
  maximumSeconds, 
  minimumSeconds * (backoffCoefficient ** attempt)) * 
    (1 + randomFactor * (2 * random() - 1)
)

where random() is a random value between 0 and 1.

Service Tag Engine

Here is the configuration to create a Service Tag Engine associated to a Service named MyService.

transport:
  ... # Transport configuration here

# Configuration of a Service Tag Engine
services:
  - name: MyService
    tagEngine:
      concurrency: 5
      storage:
        ... # Storage configuration here

This configuration creates a Worker embedding a Service Tag Engine for the MyService service. The setConcurrency(5) method sets the concurrency to 5, meaning it will process up to five messages at a time.

To prevent race conditions in the database, Infinitic ensures not to have 2 messages with the same tag (and service name) processed at the same time. Therefore, setting a concurrency higher than your number of tags is not beneficial, as it won't increase processing speed for messages with the same tag.

Workflow Executor

Here is the configuration to create a Workflow Executor associated to a Workflow named MyWorkflow.

transport:
  ... # Transport configuration here

# Configuration of a Workflow Executor
workflows:
  - name: MyWorkflow
    executor:
      class: example.MyWorkflowImpl
      concurrency: 10

This configuration creates a Worker embedding a Workflow Executor for the MyWorkflow workflow that will process steps of up to 10 different instances in parallel.

It's also possible to set a retry policy and a timeout for the workflow executor. But this is not recommended, as a workflow implemention is expected to be deterministic.

Workflow State Engine

Here is the configuration to create a Workflow State Engine associated to a Workflow named MyWorkflow.

transport:
  ... # Transport configuration here

# Configuration of a Workflow State Engine
workflows:
  - name: MyWorkflow
    stateEngine:
      concurrency: 10
      storage:
        ... # Storage configuration here

This configuration creates a Worker embedding a Workflow State Engine for the MyWorkflow workflow. The setConcurrency(10) method sets the concurrency to 10, meaning it will process ten messages at a time.

Infinitic ensures that for a given workflow instance, only one message will be processed at a time, regardless of how many Workflow State Engines are running or their concurrency settings. This guarantees the consistency of the workflow state, and prevents race conditions in the database.

Workflow Tag Engine

Here is the configuration to create a Workflow Tag Engine associated to a Workflow named MyWorkflow.

transport:
  ... # Transport configuration here

# Configuration of a Workflow Tag Engine
workflows:
  - name: MyWorkflow
    tagEngine:
      concurrency: 5
      storage:
        ... # Storage configuration here

This configuration creates a Worker embedding a Workflow Tag Engine for the MyWorkflow workflow. The setConcurrency(5) method sets the concurrency to 5, meaning it will process up to five messages at a time.

To prevent race conditions in the database, Infinitic ensures not to have 2 messages with the same tag (and workflow name) processed at the same time. Therefore, setting a concurrency higher than your number of tags is not beneficial, as it won't increase processing speed for messages with the same tag.

Event Listener

transport:
  ... # Transport configuration here

# Configuration of an Event Listener
eventListener:
  class: example.MyEventListener
  concurrency: 50
  batch:
    maxEvents: 100
    maxSeconds: 10.0

This configuration creates a Worker embedding an Event Listener. The setConcurrency(50) method sets the concurrency to 50, meaning it will process up to fifty batches at a time. A batch will be sent to the listener:

  • after 100 received events (default is 1000)
  • or 10 seconds after the first event was received (default is 1 second)

All components

Here is an example of a valid yaml configuration containing all components for a service and a workflow:

# Transport settings
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

# We can define default Storage settings here to avoid repeating it
storage:
  redis:
    host: localhost
    port: 6379
    username: redis
    password: myRedisPassword

# Configuration of all components for Services MyService
services:
  - name: MyService
    executor:
      class: example.MyServiceImpl
      concurrency: 10
      timeoutSeconds: 100
      retry:
        minimumSeconds: 1  
        maximumSeconds: 1000
        backoffCoefficient: 2  
        randomFactor: 0.5   
        maximumRetries: 11
    tagEngine:
      concurrency: 5

# Configuration of all components for Workflow MyWorkflow
workflows:
  - name: MyWorkflow
    executor:
      class: example.MyWorkflowImpl
      concurrency: 10
    stateEngine:
      concurrency: 10
    tagEngine:
      concurrency: 5

# Configuration of an Event Listener
eventListener:
  listener:
    class: example.MyEventListener
    concurrency: 50

Starting a Worker

Once an Infinitic Worker is created, it can be started with the start() method.