Service Workers

Infinitic provides a generic worker that executes tasks or workflows depending on its configuration. When configured to run a service, a worker will:

  • listen to Pulsar for messages intended for this service
  • deserialize parameters and process the requested task
  • serialize the return value and send it back.

Service workers

Service workers are horizontally scalable: to increase throughput and resilience, just start workers on multiple servers.

Service workers also catch any thrown exception to automatically retry the task (see task failure).

Starting a Service worker

First, let's add the infinitic-worker dependency into our project:

Then, we can start a worker with:

We can also use .fromConfigResource("/infinitic.yml") if the configuration file is located in the resource folder.

Configuration file

Here is an example of a valid infinitic.yml file:

# (Optional) Worker name
name: gilles_worker

# Pulsar settings
  brokerServiceUrl: pulsar://localhost:6650
  webServiceUrl: http://localhost:8080
  tenant: infinitic
  namespace: dev

# (Optional) default values for services
  concurrency: 10
  timeoutInSeconds: 400
    maximumRetries: 6

# Services definition
  - name: example.booking.services.carRental.CarRentalService
    class: example.booking.services.carRental.CarRentalServiceImpl
    concurrency: 5
    timeoutInSeconds: 100
  - name: example.booking.services.flight.FlightBookingService
    class: example.booking.services.flight.FlightBookingServiceImpl
  - name: example.booking.services.hotel.HotelBookingService
    class: example.booking.services.hotel.HotelBookingServiceImpl

This configuration contains

  • an optional worker name
  • optional default values for service's concurrency, timeoutInSeconds and retry
  • the Pulsar settings
  • the description of services:
namestringname of the service (its interface per default)
classstringname of the class to instantiate
concurrencyintegernumber of tasks executed in parallel1
timeoutInSecondsdoublemaximum duration of a task execution before timeoutnull
retryRetryPolicyretry policy for the tasks of this servicecf. below

Worker name (when provided) must be unique among all our workers and clients connected to the same Pulsar namespace.

Any class declared in this configuration file must have an empty constructor (to be instantiable by workers).


Per default, tasks are executed one after the other for a given service. If we provide a value for concurrency, like:

concurrency: 50

the Service worker will execute at most 50 tasks in parallel for this service.

Timeout policy

Per default, tasks have no timeout defined. If we provide a value for timeoutInSeconds:

timeoutInSeconds: 100

the Service worker will throw an io.infinitic.exceptions.tasks.TimeoutException if the task is still executing after timeoutInSeconds seconds.

The task will be then retried - or not - based on its retry policy.

The timeout can also be defined directly from the Service, through a WithTimeout interface or @Timeout annotation

Retries policy

The retry parameter lets us define a truncated randomized exponential backoff retry policy. If none is provided, this default setting is applied:

  minimumSeconds: 1      
  maximumSeconds: 1000   # default = 1000 * minimumSeconds
  backoffCoefficient: 2  
  randomFactor: 0.5     
  maximumRetries: 11    
    - # name of an first exception to ignore
    - # name of an second exception to ignore
    - # name of an third exception to ignore

If an exception - not listed in ignoredExceptions - is thrown during the task execution, and if maximumRetries is not reached yet, then the task will be retried after (seconds):

min(maximumSeconds, minimumSeconds * (backoffCoefficient ^ attempt)) * 
  (1 + randomFactor * (2 * random() - 1))

where random() is a random value between 0 and 1.

If we do not want any retries, the simplest configuration is:

  maximumRetries: 0

The retry policy can also be defined directly from the Service, through a WithRetry interface or @Retry annotation

Service registration

We can register a service directly with a worker. It can be useful if you need to inject some dependencies in our service:

(cf. withTimeout and withRetry)


Exceptions are caught within service workers. Let's not forget to add a Log4J implementation to our project to be able to see errors.

For example, to use SimpleLogger just add the dependency in our Gradle build file:

and this simplelogger.properties example file in our resources directory:

# SLF4J's SimpleLogger configuration file
# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err.

# Uncomment this line to use a log file

# Default logging detail level for all instances of SimpleLogger.
# Must be one of ("trace", "debug", "info", "warn", or "error").
# If not specified, defaults to "info".

# Set to true if you want the current date and time to be included in output messages.
# Default is false, and will output the number of milliseconds elapsed since startup.

# Set to true if you want to output the current thread name.
# Defaults to true.

# Set to true if you want the last component of the name to be included in output messages.
# Defaults to false.
Cancel Running Workflows

New version 0.11.2!