v0.13.0

Workflow Workers

Infinitic workers can be configured to orchestrate workflows. A workflow worker:

  • listens to Pulsar for messages intended for the workflows it handles
  • records workflow history in a database
  • dispatches tasks or sub-workflows based on the workflow definition

Workflow workers are horizontally scalable: to increase throughput and resilience, just start workers on multiple servers.

Starting a Workflow worker

First, let's add the infinitic-worker dependency into our project:

Then, we can start a worker with:

We can also use .fromConfigResource("/infinitic.yml") if the configuration file is located in the resource folder.

Configuration file

Here is an example of a valid infinitic.yml file:

name: optional_worker_name

storage:
  redis:
    host: localhost
    port: 6379
    user:
    password:
    database: 0

pulsar:
  brokerServiceUrl: pulsar://localhost:6650
  webServiceUrl: http://localhost:8080
  tenant: infinitic
  namespace: dev

# (Optional) default values for workflows
workflowDefault:
  concurrency: 10
  timeoutInSeconds: 400
  retry:
    maximumRetries: 6
  checkMode: strict

workflows:
  - name: example.booking.workflows.BookingWorkflow
    class: example.booking.workflows.BookingWorkflowImpl
    concurrency: 10

This configuration defines:

  • a worker name (optional)
  • which storage is used to store states
  • the Pulsar settings
  • optional default values for the workflows' concurrency, timeoutInSeconds, retry, and checkMode
  • the workflows

The worker name, when provided, must be unique among all workers and clients connected to the same Pulsar namespace.

Workflows

Name              Type               Description
name              string             name of the workflow (by default, the name of its interface)
class             string             name of the class to instantiate
concurrency       integer            maximum number of messages processed in parallel
timeoutInSeconds  double             maximum duration of a workflow task execution before timeout
retry             RetryPolicy        retry policy for the workflow tasks of this workflow
checkMode         WorkflowCheckMode  mode used to check if a workflow is modified while still running

Any class declared in this configuration file must have an empty constructor (to be instantiable by workers).

Concurrency

By default, workflow instances are executed one after the other for a given workflow. If we provide a value for concurrency, such as:

concurrency: 50

the workflow worker will process at most 50 workflow tasks in parallel for this workflow.

Whatever the concurrency value, we can have millions of workflows alive. The concurrency value describes how many workflows (at most) this worker moves one step forward at a given time.
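
The bound described above can be pictured with a small, self-contained Java sketch. It is only an illustration, not Infinitic's worker code: the class ConcurrencySketch, the method peakParallelism, and the 5 ms sleep standing in for "moving one workflow one step forward" are all assumptions made for this example. Many tasks are pending, but a fixed-size pool never runs more than the configured number at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: a hypothetical stand-in, not Infinitic's worker code.
final class ConcurrencySketch {

    // Runs `pending` simulated workflow tasks on a pool of `concurrency`
    // threads and returns the highest number observed running at once.
    static int peakParallelism(int concurrency, int pending) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < pending; i++) {
            pool.submit(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // stand-in for one workflow step
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 40 pending tasks, but never more than 4 in flight.
        System.out.println("peak parallelism: " + peakParallelism(4, 40));
    }
}
```

The number of pending tasks can grow freely; only the number processed at the same moment is capped, which is the distinction the concurrency setting draws.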

Timeout policy

By default, workflow tasks have a timeout of 60 seconds. Except in the case of a very long history with thousands of tasks and complex (de)serialization, there is no reason why a workflow task should take that long.

Nevertheless, as with services, it's possible to change this behavior through the timeoutInSeconds parameter, or directly from the workflow, through a WithTimeout interface or a @Timeout annotation.
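
To make the effect of a timeout concrete, here is a minimal Java sketch of a time-boxed execution. It does not use Infinitic's WithTimeout interface or @Timeout annotation, whose exact signatures are not shown on this page; TimeoutSketch, completesWithin, and sleeping are hypothetical names, and the Runnable merely simulates a workflow task.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: a hypothetical stand-in, not Infinitic's timeout handling.
final class TimeoutSketch {

    // Returns true if the task finishes within timeoutInSeconds, false if it is cut off.
    static boolean completesWithin(Runnable workflowTask, double timeoutInSeconds) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(workflowTask);
        try {
            future.get((long) (timeoutInSeconds * 1000), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task that ran too long
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    // Simulated workflow task that takes `millis` milliseconds.
    static Runnable sleeping(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(completesWithin(sleeping(10), 60.0));   // well within 60 seconds
        System.out.println(completesWithin(sleeping(2000), 0.05)); // exceeds 50 milliseconds
    }
}
```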

Retries policy

By default, workflow tasks are not retried. Since a workflow's implementation must be deterministic, a retry would result in the same failure.

Nevertheless, as with services, it's possible to change this behavior through the retry parameter, or directly from the workflow, through a WithRetry interface or a @Retry annotation.
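
The reasoning about determinism can be checked with a short Java sketch. It is an assumption-laden illustration rather than Infinitic's retry mechanism: RetrySketch and runWithRetries are hypothetical names, and maximumRetries simply mirrors the configuration key used above. A task that fails deterministically fails on every attempt, so the retries only add work.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: a hypothetical stand-in, not Infinitic's retry mechanism.
final class RetrySketch {

    // Runs the task once, then up to maximumRetries more times while it throws.
    // Returns true as soon as one attempt succeeds.
    static boolean runWithRetries(Runnable workflowTask, int maximumRetries) {
        for (int attempt = 0; attempt <= maximumRetries; attempt++) {
            try {
                workflowTask.run();
                return true;
            } catch (RuntimeException e) {
                // swallow and try again, as a naive retry policy would
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Deterministic failure: same input, same exception, every time.
        Runnable buggy = () -> {
            attempts.incrementAndGet();
            throw new IllegalStateException("deterministic bug");
        };
        boolean succeeded = runWithRetries(buggy, 6);
        // prints "succeeded=false attempts=7"
        System.out.println("succeeded=" + succeeded + " attempts=" + attempts.get());
    }
}
```

Retries remain useful when the failure comes from outside the workflow code, for example a transient infrastructure error, which is presumably why the option exists.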

Workflow Check Mode

The checkMode parameter lets us define how Infinitic checks that a workflow was not modified while running.

  • none: no verification is done
  • simple: verification that the current workflow execution is the same as the workflow's history, but without checking the values of tasks' parameters
  • strict: verification that the current workflow execution is the same as the workflow's history

The default value is simple. The check mode can also be defined directly from the workflow, through a @CheckMode annotation.
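
The difference between the three modes can be sketched in a few lines of Java. This is a simplified model built on assumptions, not Infinitic's actual check: CheckModeSketch, Command, and matchesHistory are hypothetical names, a workflow's history is reduced to a list of task names with serialized parameters, and a difference in length is treated as a modification.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: a simplified, hypothetical model of the three check modes.
final class CheckModeSketch {

    enum Mode { NONE, SIMPLE, STRICT }

    // Hypothetical view of one dispatched task: its name and serialized parameters.
    static final class Command {
        final String taskName;
        final String parameters;

        Command(String taskName, String parameters) {
            this.taskName = taskName;
            this.parameters = parameters;
        }
    }

    // Compares what the history recorded with what the current code produces.
    static boolean matchesHistory(Mode mode, List<Command> history, List<Command> current) {
        if (mode == Mode.NONE) return true; // no verification at all
        if (history.size() != current.size()) return false;
        for (int i = 0; i < history.size(); i++) {
            Command recorded = history.get(i);
            Command replayed = current.get(i);
            if (!recorded.taskName.equals(replayed.taskName)) return false;
            // Only strict mode also compares the values of the parameters.
            if (mode == Mode.STRICT && !recorded.parameters.equals(replayed.parameters)) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Command> history = Arrays.asList(new Command("bookHotel", "[\"Paris\"]"));
        List<Command> current = Arrays.asList(new Command("bookHotel", "[\"Rome\"]"));
        // prints "NONE -> true", "SIMPLE -> true", "STRICT -> false"
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + matchesHistory(mode, history, current));
        }
    }
}
```

In this model, a change that only affects a task's parameters goes unnoticed in simple mode and is reported in strict mode.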

Storage

Redis state storage

Redis is not recommended in production because, in case of a crash, the latest states may not have been saved correctly to disk.

Example of a configuration for using Redis for state storage:

storage: 
  redis:
    host: 127.0.0.1
    port: 6379
    timeout: 2000
    user: 
    password: 
    database: 0
    ssl: false
    poolConfig:
      maxTotal: -1
      maxIdle: 8
      minIdle: 0

MySQL state storage

Example of a configuration for using MySQL for state storage:

storage:
  mysql:
    host: 127.0.0.1
    port: 3306
    user: root
    password: 
    database: infinitic
    maxPoolSize: 5

State compression

By default, the states of workflows are stored as uncompressed Avro binaries. To compress them and save storage space at the cost of some CPU time, we can add a compression option:

storage:
  compression: "deflate"
  ...

The possible options are deflate, gzip, and bzip2. They use the Apache Commons Compress implementations of these algorithms.
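
To give a feel for the trade-off, the following Java sketch round-trips a payload through the deflate algorithm. It uses the JDK's java.util.zip classes as a stand-in for Apache Commons Compress so that it runs without extra dependencies. CompressionSketch and sampleState are hypothetical names, and the repetitive JSON-like payload is only a placeholder for a real Avro-encoded state.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustration only: JDK deflate as a stand-in for Apache Commons Compress.
final class CompressionSketch {

    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] inflate(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported input");
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not valid deflate data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    // Placeholder payload: repetitive text standing in for a serialized workflow state.
    static byte[] sampleState() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            sb.append("{\"task\":\"bookHotel\",\"status\":\"completed\"}");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] state = sampleState();
        byte[] compressed = deflate(state);
        System.out.println(state.length + " bytes -> " + compressed.length + " bytes");
    }
}
```

How much space is saved depends on the payload. Long histories with many similar entries tend to compress well, while small states may gain little.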

Cache

Caffeine cache

By default, Infinitic uses Caffeine as an in-memory cache in front of the state storage.

Here is the default configuration:

cache:
  caffeine:
    maximumSize: 10000
    expireAfterAccess: 3600
    expireAfterWrite:
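
The role of such a cache can be sketched with a small read-through cache in plain Java. This is not Caffeine and does not reproduce its eviction policy or its time-based expiry: ReadThroughCacheSketch is a hypothetical class that uses a LinkedHashMap in access order to evict the least recently used entry once maximumSize is exceeded, and the storage function stands in for a database read. It is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustration only: a simple LRU read-through cache, not Caffeine.
final class ReadThroughCacheSketch<K, V> {

    private final Map<K, V> entries;
    private final Function<K, V> storage; // stand-in for the state storage
    private int storageReads = 0;

    ReadThroughCacheSketch(int maximumSize, Function<K, V> storage) {
        this.storage = storage;
        // accessOrder = true keeps the least recently used entry first.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maximumSize;
            }
        };
    }

    // Returns the cached value, or loads it from storage on a miss.
    V get(K key) {
        V cached = entries.get(key);
        if (cached != null) return cached;
        storageReads++;
        V loaded = storage.apply(key);
        if (loaded != null) entries.put(key, loaded);
        return loaded;
    }

    int storageReads() {
        return storageReads;
    }

    public static void main(String[] args) {
        ReadThroughCacheSketch<String, String> cache =
            new ReadThroughCacheSketch<>(2, id -> "state-of-" + id);
        cache.get("a");
        cache.get("a"); // served from memory
        // prints "storage reads: 1"
        System.out.println("storage reads: " + cache.storageReads());
    }
}
```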

No cache

Here is the configuration for removing the cache:

cache:
  none:

Workflow registration

We can register a workflow directly with a worker. This can be useful if we need to inject some dependencies into our workflow:

Logging

Exceptions are caught within workflow workers. Let's not forget to add an SLF4J implementation to our project to be able to see errors.

For example, to use SimpleLogger, just add the dependency to our Gradle build file:

and this simplelogger.properties example file in our resources directory:

# SLF4J's SimpleLogger configuration file
# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err.

# Uncomment this line to use a log file
#org.slf4j.simpleLogger.logFile=infinitic.log

# Default logging detail level for all instances of SimpleLogger.
# Must be one of ("trace", "debug", "info", "warn", or "error").
# If not specified, defaults to "info".
org.slf4j.simpleLogger.defaultLogLevel=warn

# Set to true if you want the current date and time to be included in output messages.
# Default is false, and will output the number of milliseconds elapsed since startup.
org.slf4j.simpleLogger.showDateTime=true

# Set to true if you want to output the current thread name.
# Defaults to true.
org.slf4j.simpleLogger.showThreadName=false


# Set to true if you want the last component of the name to be included in output messages.
# Defaults to false.
org.slf4j.simpleLogger.showShortLogName=true