Infinitic Workers
Creating a Worker
To build a Worker, first add the infinitic-worker dependency to your project:
dependencies {
...
implementation "io.infinitic:infinitic-worker:0.16.2"
...
}
dependencies {
...
implementation("io.infinitic:infinitic-worker:0.16.2")
...
}
An Infinitic Worker can be set up through builders or using YAML.
Whichever method you choose, you'll need to:
- Specify how to connect to the event broker.
- (Optionally) Specify how to connect to the database.
- Provide the configuration for the components you want to use.
Builder-based Configuration
An Infinitic Worker can be created using the builder pattern:
InfiniticWorker worker = InfiniticWorker.builder()
... // configuration here
.build();
val worker = InfiniticWorker.builder()
... // configuration here
.build()
It can be configured by adding the different components you want to use.
Transport
A worker requires a transport configuration that specifies the event broker to use and the connection details. This configuration is essential for the worker to communicate with the message-driven infrastructure.
Infinitic currently supports:
- Pulsar
- An in-memory implementation for testing purposes
Pulsar
Here is the minimal configuration to create a worker connecting to a Pulsar cluster:
TransportConfig transportConfig = PulsarTransportConfig.builder()
.setBrokerServiceUrl("pulsar://localhost:6650")
.setWebServiceUrl("http://localhost:8080")
.setTenant("infinitic")
.setNamespace("dev")
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
... // other components configuration here
.build();
val transportConfig = PulsarTransportConfig.builder()
.setBrokerServiceUrl("pulsar://localhost:6650")
.setWebServiceUrl("http://localhost:8080")
.setTenant("infinitic")
.setNamespace("dev")
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
... // other components configuration here
.build()
Look at the builders' methods for more options.
In Memory
For testing purposes, you can use an in-memory transport:
TransportConfig transportConfig = new InMemoryTransportConfig();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
... // other components configuration here
.build();
val transportConfig = InMemoryTransportConfig()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
... // other components configuration here
.build()
This transport should only be used for testing purposes, as it does not persist any messages. It should be used with a single worker instance (embedding all Service and Workflow components), and with the client available as worker.client.
Databases
Here are the different minimal configurations for the databases. Look at the builders' methods for more details.
Storage is used by the following components: Service Tag Engines, Workflow State Engines, and Workflow Tag Engines.
Redis
StorageConfig storageConfig = RedisStorageConfig.builder()
.setHost("localhost")
.setUsername("redis")
.setPassword("********")
.setPort(6379)
.build();
val storageConfig = RedisStorageConfig.builder()
.setHost("localhost")
.setPort(6379)
.setUsername("redis")
.setPassword("********")
.build()
Postgres
StorageConfig storageConfig = PostgresStorageConfig.builder()
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.build();
val storageConfig = PostgresStorageConfig.builder()
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.build()
MySQL
StorageConfig storageConfig = MySQLStorageConfig.builder()
.setHost("localhost")
.setPort(3306)
.setUsername("root")
.setPassword("********")
.build();
val storageConfig = MySQLStorageConfig.builder()
.setHost("localhost")
.setPort(3306)
.setUsername("root")
.setPassword("********")
.build()
In Memory
StorageConfig storageConfig = new InMemoryConfig();
val storageConfig = InMemoryConfig()
This storage should only be used for testing purposes, as it does not persist any data.
Service Executor
Here is the configuration to create a Service Executor associated with a Service named MyService.
WithRetry withRetry = ... // instance of WithRetry
ServiceExecutorConfig serviceExecutorConfig = ServiceExecutorConfig.builder()
.setServiceName("MyService")
.setFactory(() -> new MyServiceImpl(/* injections here*/))
.setConcurrency(10)
.withRetry(withRetry)
.setTimeoutSeconds(100.0)
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addServiceExecutor(serviceExecutorConfig)
.build();
val withRetry = ... // instance of WithRetry
val serviceExecutorConfig = ServiceExecutorConfig.builder()
.setServiceName("MyService")
.setFactory { MyServiceImpl(/* injections here*/) }
.setConcurrency(10)
.withRetry(withRetry)
.setTimeoutSeconds(100.0)
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addServiceExecutor(serviceExecutorConfig)
.build()
This configuration creates a Worker embedding a Service Executor for the MyService service that will process up to 10 tasks in parallel, with a retry policy and a timeout of 100 seconds.
Service Tag Engine
Here is the configuration to create a Service Tag Engine associated with a Service named MyService.
ServiceTagEngineConfig serviceTagEngineConfig = ServiceTagEngineConfig.builder()
.setServiceName("MyService")
.setStorage(storageConfig)
.setConcurrency(5)
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addServiceTagEngine(serviceTagEngineConfig)
.build();
val serviceTagEngineConfig = ServiceTagEngineConfig.builder()
.setServiceName("MyService")
.setStorage(storageConfig)
.setConcurrency(5)
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addServiceTagEngine(serviceTagEngineConfig)
.build()
This configuration creates a Worker embedding a Service Tag Engine for the MyService service. The setConcurrency(5) method sets the concurrency to 5, meaning it will process up to five messages at a time.
To prevent race conditions in the database, Infinitic ensures that two messages with the same tag (and service name) are never processed at the same time. Setting a concurrency higher than your number of tags therefore brings no benefit, as it won't speed up the processing of messages that share a tag.
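The effect of this guarantee on concurrency can be illustrated with a small, self-contained sketch. It is not how Infinitic implements the guarantee; it only assumes one possible technique, routing every message to a single-threaded lane chosen from its key, so that messages sharing a key run one after another. The names KeyedLanes, laneOf and submit are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: per-key serialization through single-threaded lanes.
final class KeyedLanes implements AutoCloseable {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedLanes(int concurrency) {
        // One single-threaded executor per unit of concurrency
        for (int i = 0; i < concurrency; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    // The same key always maps to the same lane
    int laneOf(String key) {
        return Math.floorMod(key.hashCode(), lanes.size());
    }

    // Tasks submitted with the same key run sequentially, in submission order
    void submit(String key, Runnable task) {
        lanes.get(laneOf(key)).submit(task);
    }

    @Override
    public void close() {
        for (ExecutorService lane : lanes) lane.shutdown();
        try {
            for (ExecutorService lane : lanes) lane.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        try (KeyedLanes lanes = new KeyedLanes(8)) {
            // With only two distinct tags, at most two of the eight lanes ever receive work
            Set<Integer> used = new HashSet<>();
            for (String tag : new String[] {"tagA", "tagB", "tagA", "tagB"}) {
                used.add(lanes.laneOf(tag));
                lanes.submit(tag, () -> { /* process the message here */ });
            }
            System.out.println("lanes used: " + used.size() + " of 8");
        }
    }
}
```

In this sketch, a concurrency of 8 with two tags leaves at least six lanes idle, which is the situation the paragraph above warns against.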
Workflow Executor
Here is the configuration to create a Workflow Executor associated with a Workflow named MyWorkflow.
WorkflowExecutorConfig workflowExecutorConfig = WorkflowExecutorConfig.builder()
.setWorkflowName("MyWorkflow")
.addFactory(() -> new MyWorkflowImpl(/* injections here*/))
.setConcurrency(10)
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addWorkflowExecutor(workflowExecutorConfig)
.build();
val workflowExecutorConfig = WorkflowExecutorConfig.builder()
.setWorkflowName("MyWorkflow")
.addFactory { MyWorkflowImpl(/* injections here*/) }
.setConcurrency(10)
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addWorkflowExecutor(workflowExecutorConfig)
.build()
This configuration creates a Worker embedding a Workflow Executor for the MyWorkflow workflow that will process steps of up to 10 different instances in parallel.
It's also possible to set a retry policy and a timeout for the workflow executor, but this is not recommended, as a workflow implementation is expected to be deterministic.
Workflow State Engine
Here is the configuration to create a Workflow State Engine associated with a Workflow named MyWorkflow.
WorkflowStateEngineConfig workflowStateEngineConfig = WorkflowStateEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(10)
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addWorkflowStateEngine(workflowStateEngineConfig)
.build();
val workflowStateEngineConfig = WorkflowStateEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(10)
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addWorkflowStateEngine(workflowStateEngineConfig)
.build()
This configuration creates a Worker embedding a Workflow State Engine for the MyWorkflow workflow. The setConcurrency(10) method sets the concurrency to 10, meaning it will process up to ten messages at a time.
Infinitic ensures that for a given workflow instance, only one message will be processed at a time, regardless of how many Workflow State Engines are running or their concurrency settings. This guarantees the consistency of the workflow state and prevents race conditions in the database.
Workflow Tag Engine
Here is the configuration to create a Workflow Tag Engine associated with a Workflow named MyWorkflow.
WorkflowTagEngineConfig workflowTagEngineConfig = WorkflowTagEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(5)
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addWorkflowTagEngine(workflowTagEngineConfig)
.build();
val workflowTagEngineConfig = WorkflowTagEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(5)
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addWorkflowTagEngine(workflowTagEngineConfig)
.build()
This configuration creates a Worker embedding a Workflow Tag Engine for the MyWorkflow workflow. The setConcurrency(5) method sets the concurrency to 5, meaning it will process up to five messages at a time.
To prevent race conditions in the database, Infinitic ensures that two messages with the same tag (and workflow name) are never processed at the same time. Setting a concurrency higher than your number of tags therefore brings no benefit, as it won't speed up the processing of messages that share a tag.
Event Listener
EventListenerConfig eventListenerConfig = EventListenerConfig.builder()
.setListener(new MyEventListener())
.setConcurrency(50)
.setBatch(100, 10.0)
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.setEventListener(eventListenerConfig)
.build();
val eventListenerConfig = EventListenerConfig.builder()
.setListener(MyEventListener())
.setConcurrency(50)
.setBatch(100, 10.0)
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.setEventListener(eventListenerConfig)
.build()
This configuration creates a Worker embedding an Event Listener. The setConcurrency(50) method sets the concurrency to 50, meaning it will process up to fifty batches at a time. A batch will be sent to the listener:
- after 100 received events (default is 1000)
- or 10 seconds after the first event was received (default is 1 second)
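The two batching conditions above (a size limit, or a time limit counted from the first event) can be sketched as follows. This is a minimal illustration of such a policy, not Infinitic's implementation: the BatchPolicy class and its add and tick methods are hypothetical, and timestamps are passed in explicitly (in milliseconds) to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a "max events or max seconds" batching policy.
final class BatchPolicy {
    private final int maxEvents;
    private final long maxMillis;
    private final List<String> buffer = new ArrayList<>();
    private long firstEventAtMillis;

    BatchPolicy(int maxEvents, double maxSeconds) {
        this.maxEvents = maxEvents;
        this.maxMillis = (long) (maxSeconds * 1000);
    }

    // Buffers an event received at nowMillis.
    // Returns the full batch once maxEvents is reached, otherwise an empty list.
    List<String> add(String event, long nowMillis) {
        if (buffer.isEmpty()) {
            firstEventAtMillis = nowMillis; // the time limit starts at the first event
        }
        buffer.add(event);
        return buffer.size() >= maxEvents ? drain() : List.of();
    }

    // Meant to be called periodically.
    // Returns the pending batch once maxSeconds have elapsed since its first event.
    List<String> tick(long nowMillis) {
        boolean expired = !buffer.isEmpty() && nowMillis - firstEventAtMillis >= maxMillis;
        return expired ? drain() : List.of();
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        BatchPolicy policy = new BatchPolicy(100, 10.0);
        policy.add("event-1", 0);
        policy.add("event-2", 1_000);
        System.out.println(policy.tick(5_000));  // prints [] (neither limit reached)
        System.out.println(policy.tick(10_000)); // prints [event-1, event-2] (10 seconds after the first event)
    }
}
```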
All components
Here is an example of a valid configuration containing all components for a service and a workflow:
TransportConfig transportConfig = PulsarTransportConfig.builder()
.setBrokerServiceUrl("pulsar://localhost:6650")
.setWebServiceUrl("http://localhost:8080")
.setTenant("infinitic")
.setNamespace("dev")
.build();
StorageConfig storageConfig = PostgresStorageConfig.builder()
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.build();
ServiceExecutorConfig serviceExecutorConfig = ServiceExecutorConfig.builder()
.setServiceName("MyService")
.setFactory(() -> new MyServiceImpl(/* injections here*/))
.setConcurrency(10)
.withRetry(withRetry)
.setTimeoutSeconds(100.0)
.build();
ServiceTagEngineConfig serviceTagEngineConfig = ServiceTagEngineConfig.builder()
.setServiceName("MyService")
.setStorage(storageConfig)
.setConcurrency(5)
.build();
WorkflowExecutorConfig workflowExecutorConfig = WorkflowExecutorConfig.builder()
.setWorkflowName("MyWorkflow")
.addFactory(() -> new MyWorkflowImpl(/* injections here*/))
.setConcurrency(10)
.build();
WorkflowStateEngineConfig workflowStateEngineConfig = WorkflowStateEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(10)
.build();
WorkflowTagEngineConfig workflowTagEngineConfig = WorkflowTagEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(5)
.build();
EventListenerConfig eventListener = EventListenerConfig.builder()
.setListener(new MyEventListener())
.setConcurrency(50)
.build();
InfiniticWorker worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addServiceExecutor(serviceExecutorConfig)
.addServiceTagEngine(serviceTagEngineConfig)
.addWorkflowExecutor(workflowExecutorConfig)
.addWorkflowStateEngine(workflowStateEngineConfig)
.addWorkflowTagEngine(workflowTagEngineConfig)
.setEventListener(eventListener)
.build();
val transportConfig = PulsarTransportConfig.builder()
.setBrokerServiceUrl("pulsar://localhost:6650")
.setWebServiceUrl("http://localhost:8080")
.setTenant("infinitic")
.setNamespace("dev")
.build()
val storageConfig = PostgresStorageConfig.builder()
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.build()
val serviceExecutorConfig = ServiceExecutorConfig.builder()
.setServiceName("MyService")
.setFactory { MyServiceImpl(/* injections here*/) }
.setConcurrency(10)
.withRetry(withRetry)
.setTimeoutSeconds(100.0)
.build()
val serviceTagEngineConfig = ServiceTagEngineConfig.builder()
.setServiceName("MyService")
.setStorage(storageConfig)
.setConcurrency(5)
.build()
val workflowExecutorConfig = WorkflowExecutorConfig.builder()
.setWorkflowName("MyWorkflow")
.addFactory { MyWorkflowImpl(/* injections here*/) }
.setConcurrency(10)
.build()
val workflowStateEngineConfig = WorkflowStateEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(10)
.build()
val workflowTagEngineConfig = WorkflowTagEngineConfig.builder()
.setWorkflowName("MyWorkflow")
.setStorage(storageConfig)
.setConcurrency(5)
.build()
val eventListenerConfig = EventListenerConfig.builder()
.setListener(MyEventListener())
.setConcurrency(50)
.build()
val worker = InfiniticWorker.builder()
.setTransport(transportConfig)
.addServiceExecutor(serviceExecutorConfig)
.addServiceTagEngine(serviceTagEngineConfig)
.addWorkflowExecutor(workflowExecutorConfig)
.addWorkflowStateEngine(workflowStateEngineConfig)
.addWorkflowTagEngine(workflowTagEngineConfig)
.setEventListener(eventListenerConfig)
.build()
YAML-based Configuration
An Infinitic Worker can be created directly from a YAML string, a YAML file or a YAML resource:
// From a YAML string
InfiniticWorker worker = InfiniticWorker.fromYamlString("yaml content here");
// From a YAML file
InfiniticWorker worker = InfiniticWorker.fromYamlFile("infinitic.yml");
// From a YAML resource
InfiniticWorker worker = InfiniticWorker.fromYamlResource("/path/to/infinitic.yml");
// From a YAML string
val worker = InfiniticWorker.fromYamlString("yaml content here")
// From a YAML file
val worker = InfiniticWorker.fromYamlFile("infinitic.yml")
// From a YAML resource
val worker = InfiniticWorker.fromYamlResource("/path/to/infinitic.yml")
Transport
Pulsar
Here is the minimal configuration to create a worker connecting to a Pulsar cluster:
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650/
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev
In Memory
For testing purposes, you can use an in-memory transport:
transport:
  inMemory:
This transport should only be used for testing purposes, as it does not persist any messages. It should be used with a single worker instance (embedding all Service and Workflow components), and with the client available as worker.client.
Databases
Here are the different minimal configurations for the databases. Look at the configuration files for more details.
Storage is used by the following components: Service Tag Engines, Workflow State Engines, and Workflow Tag Engines.
Redis
storage:
  redis:
    host: localhost
    port: 6379
    username: redis
    password: myRedisPassword
Postgres
storage:
  postgres:
    host: localhost
    port: 5432
    username: postgres
    password: myPostgresPassword
MySQL
storage:
  mysql:
    host: localhost
    port: 3306
    username: root
    password: myMysqlPassword
In Memory
storage:
  inMemory:
This storage should only be used for testing purposes, as it does not persist any data.
Service Executor
Here is the configuration to create a Service Executor associated with a Service named MyService.
transport:
  ... # Transport configuration here
# Configuration of a Service executor
services:
  - name: MyService
    executor:
      class: example.MyServiceImpl
      concurrency: 10
      timeoutSeconds: 100
      retry:
        minimumSeconds: 1
        maximumSeconds: 1000
        backoffCoefficient: 2
        randomFactor: 0.5
        maximumRetries: 11
        ignoredExceptions:
          - # fully qualified name of an exception to ignore
          - # fully qualified name of a second exception to ignore
Any class declared in this configuration file must have an empty constructor. If your service requires dependencies, consider using builders to create instances.
Additionally, ensure that the class is public and accessible from the worker's classpath. If the class is part of a module, make sure it's properly exported.
This configuration creates a Worker embedding a Service Executor for the MyService service that will process up to 10 tasks in parallel, with a timeout of 100 seconds and a retry policy based on a truncated, randomized exponential backoff strategy.
This retry strategy is designed to efficiently handle transient errors while avoiding overwhelming the system. Here's a breakdown of how it works:
- Exponential backoff: The delay between retries increases exponentially with each attempt.
- Randomization: A random factor is applied to prevent synchronized retries from parallel tasks.
- Truncation: The delay is capped at a maximum value to avoid excessively long waits.
If an exception occurs during task execution that is not listed in ignoredExceptions, and the maximumRetries limit has not been reached, the task will be retried after a calculated delay. The delay (in seconds) is determined by the following formula:
min(
  maximumSeconds,
  minimumSeconds * (backoffCoefficient ** attempt)
) * (1 + randomFactor * (2 * random() - 1))
where random() is a random value between 0 and 1.
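As a worked illustration, the formula can be transcribed directly into Java. This is a sketch under stated assumptions rather than Infinitic's own code: the RetryDelay class and delaySeconds method are hypothetical names, the random source is injected so results can be reproduced, and whether attempt starts at 0 or 1 is not specified above (the loop below starts at 0 as an assumption).

```java
import java.util.function.DoubleSupplier;

// Hypothetical helper transcribing the delay formula above.
final class RetryDelay {
    private RetryDelay() {}

    // Returns the delay in seconds for the given attempt.
    // `random` must supply values between 0 and 1.
    static double delaySeconds(
            double minimumSeconds,
            double maximumSeconds,
            double backoffCoefficient,
            double randomFactor,
            int attempt,
            DoubleSupplier random) {
        // Exponential backoff, truncated at maximumSeconds
        double capped = Math.min(
                maximumSeconds,
                minimumSeconds * Math.pow(backoffCoefficient, attempt));
        // Randomization: multiplier in [1 - randomFactor, 1 + randomFactor]
        double jitter = 1 + randomFactor * (2 * random.getAsDouble() - 1);
        return capped * jitter;
    }

    public static void main(String[] args) {
        // Values taken from the YAML example: 1 s minimum, 1000 s maximum,
        // backoff coefficient 2, random factor 0.5
        for (int attempt = 0; attempt <= 11; attempt++) {
            double delay = delaySeconds(1, 1000, 2, 0.5, attempt, Math::random);
            System.out.printf("attempt %d: %.2f s%n", attempt, delay);
        }
    }
}
```

With these values and a random draw of exactly 0.5, the multiplier is 1 and the delay for attempt 3 is 8 seconds. From attempt 10 onward the exponential term exceeds 1000, so the base delay stays at 1000 seconds before randomization. Because the random multiplier is applied after truncation, the final delay can range from 500 to 1500 seconds in that case.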
Service Tag Engine
Here is the configuration to create a Service Tag Engine associated with a Service named MyService.
transport:
  ... # Transport configuration here
# Configuration of a Service Tag Engine
services:
  - name: MyService
    tagEngine:
      concurrency: 5
      storage:
        ... # Storage configuration here
This configuration creates a Worker embedding a Service Tag Engine for the MyService service. The concurrency: 5 setting means it will process up to five messages at a time.
To prevent race conditions in the database, Infinitic ensures that two messages with the same tag (and service name) are never processed at the same time. Setting a concurrency higher than your number of tags therefore brings no benefit, as it won't speed up the processing of messages that share a tag.
Workflow Executor
Here is the configuration to create a Workflow Executor associated with a Workflow named MyWorkflow.
transport:
  ... # Transport configuration here
# Configuration of a Workflow Executor
workflows:
  - name: MyWorkflow
    executor:
      class: example.MyWorkflowImpl
      concurrency: 10
This configuration creates a Worker embedding a Workflow Executor for the MyWorkflow workflow that will process steps of up to 10 different instances in parallel.
It's also possible to set a retry policy and a timeout for the workflow executor, but this is not recommended, as a workflow implementation is expected to be deterministic.
Workflow State Engine
Here is the configuration to create a Workflow State Engine associated with a Workflow named MyWorkflow.
transport:
  ... # Transport configuration here
# Configuration of a Workflow State Engine
workflows:
  - name: MyWorkflow
    stateEngine:
      concurrency: 10
      storage:
        ... # Storage configuration here
This configuration creates a Worker embedding a Workflow State Engine for the MyWorkflow workflow. The concurrency: 10 setting means it will process up to ten messages at a time.
Infinitic ensures that for a given workflow instance, only one message will be processed at a time, regardless of how many Workflow State Engines are running or their concurrency settings. This guarantees the consistency of the workflow state and prevents race conditions in the database.
Workflow Tag Engine
Here is the configuration to create a Workflow Tag Engine associated with a Workflow named MyWorkflow.
transport:
  ... # Transport configuration here
# Configuration of a Workflow Tag Engine
workflows:
  - name: MyWorkflow
    tagEngine:
      concurrency: 5
      storage:
        ... # Storage configuration here
This configuration creates a Worker embedding a Workflow Tag Engine for the MyWorkflow workflow. The concurrency: 5 setting means it will process up to five messages at a time.
To prevent race conditions in the database, Infinitic ensures that two messages with the same tag (and workflow name) are never processed at the same time. Setting a concurrency higher than your number of tags therefore brings no benefit, as it won't speed up the processing of messages that share a tag.
Event Listener
transport:
  ... # Transport configuration here
# Configuration of an Event Listener
eventListener:
  class: example.MyEventListener
  concurrency: 50
  batch:
    maxEvents: 100
    maxSeconds: 10.0
This configuration creates a Worker embedding an Event Listener. The concurrency: 50 setting means it will process up to fifty batches at a time. A batch will be sent to the listener:
- after 100 received events (default is 1000)
- or 10 seconds after the first event was received (default is 1 second)
All components
Here is an example of a valid YAML configuration containing all components for a service and a workflow:
# Transport settings
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev
# We can define default Storage settings here to avoid repeating them
storage:
  redis:
    host: localhost
    port: 6379
    username: redis
    password: myRedisPassword
# Configuration of all components for Service MyService
services:
  - name: MyService
    executor:
      class: example.MyServiceImpl
      concurrency: 10
      timeoutSeconds: 100
      retry:
        minimumSeconds: 1
        maximumSeconds: 1000
        backoffCoefficient: 2
        randomFactor: 0.5
        maximumRetries: 11
    tagEngine:
      concurrency: 5
# Configuration of all components for Workflow MyWorkflow
workflows:
  - name: MyWorkflow
    executor:
      class: example.MyWorkflowImpl
      concurrency: 10
    stateEngine:
      concurrency: 10
    tagEngine:
      concurrency: 5
# Configuration of an Event Listener
eventListener:
listener:
class: example.MyEventListener
concurrency: 50
Starting a Worker
Once an Infinitic Worker is created, it can be started with the start() method.
import io.infinitic.workers.InfiniticWorker;
public class App {
    public static void main(String[] args) {
        // create the worker and start it
        try (InfiniticWorker worker = ...) {
            worker.start();
        }
    }
}
import io.infinitic.workers.InfiniticWorker
fun main() {
    // create the worker
    val worker = ...
    // start it
    worker.use { it.start() }
}