New in 0.16.2

Workflow Deployment

This guide covers three essential components for deploying Infinitic Workflows:

  1. Workflow Executors: These are stateless components that execute workflow logic. They contain the implementation of your workflow interface and determine the sequence of tasks and sub-workflows to run. When triggered by the Workflow State Engine, a Workflow Executor:

    • Replays the workflow method from the beginning
    • Uses the existing history to skip steps that are already completed, injecting their recorded results into the current execution
    • Determines what new steps should be executed next
    • Dispatches new tasks or sub-workflows as needed
    • Handles workflow completion and failures
  2. Workflow State Engines: These are stateless components that maintain workflow state in a database. They coordinate the overall workflow execution by:

    • Listening for events from Service and Workflow Executors
    • Recording workflow history and state changes
    • Dispatching commands to Workflow Executors to advance workflow instances
  3. Workflow Tag Engines: These are stateless components that manage workflow tags in a database. They:

    • Track relationships between a workflow tag and the workflow IDs of workflow instances that share this tag
    • Enable bulk operations on existing workflows based on a tag
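
The replay behavior described for Workflow Executors can be illustrated with a small, self-contained sketch. This is not Infinitic's implementation or API: the names ReplayContext, Suspend, execute and my_workflow are hypothetical, and the history is assumed to be a simple ordered list of step results.

```python
class Suspend(Exception):
    """Raised when the workflow reaches a step with no recorded result."""

    def __init__(self, step_index, task_name, args):
        super().__init__(f"step {step_index} ({task_name}) is pending")
        self.step_index = step_index
        self.task_name = task_name
        self.args = args


class ReplayContext:
    """Hands out recorded results in order; suspends at the first new step."""

    def __init__(self, history):
        self.history = history  # results of completed steps, in order
        self.position = 0       # index of the next step to replay

    def task(self, name, *args):
        index = self.position
        self.position += 1
        if index < len(self.history):
            return self.history[index]    # completed step: inject its result
        raise Suspend(index, name, args)  # new step: it must be dispatched


def my_workflow(ctx, order_id):
    """Hypothetical deterministic workflow: reserve, charge, then ship."""
    reservation = ctx.task("reserve", order_id)
    payment = ctx.task("charge", order_id, reservation)
    return ctx.task("ship", order_id, payment)


def execute(workflow, history, *args):
    """Replay from the beginning; report the next step or the final result."""
    ctx = ReplayContext(history)
    try:
        return ("completed", workflow(ctx, *args))
    except Suspend as pending:
        return ("dispatch", pending.task_name, pending.args)
```

Each call to execute restarts the workflow method from its first line. Because the workflow is deterministic, the same history always leads to the same next step, which is what allows the executor itself to stay stateless.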

Each component can be configured through either builders or YAML configuration files. The following sections detail how to set up and configure each component type.

Workflow Executors

You can set up an Infinitic Worker to run a Workflow Executor through builders or using a YAML configuration. Whichever method you choose, you'll need:

  1. The transport configuration, which describes how to connect to the event broker.

  2. The Workflow Executor configuration, which describes how to instantiate Workflow classes for a given workflow name, and defines optional parameters such as the concurrency level, batching policy, retry policy, and execution timeout.

Once an Infinitic Worker is created and configured to run a Workflow Executor, it can be started with the start() method.

Workflow Executors have the following optional configuration parameters:

  • concurrency: the number of steps that can be executed concurrently by the Workflow Executor.
  • batch: the batching policy for receiving and sending messages from and to the message broker.
  • retry: the retry policy to use when a task fails.
  • timeout: the execution timeout for a task.

Prerequisites

To build a Worker, you first need to add the infinitic-worker dependency to your project.

Minimal Example Using Builders

Minimal Example Using YAML Configuration

Here is a minimal infinitic.yml configuration file to create a Workflow Executor for a Workflow MyWorkflow.

# Transport configuration for a local Pulsar
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650/
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

# Workflow Executor configuration for workflow MyWorkflow
workflows:
  - name: MyWorkflow
    executor:
      class: example.MyWorkflowImpl

This Workflow Executor can be started with:

Note: Infinitic provides multiple ways to create a worker from a YAML configuration.

Concurrency

By default, messages are executed sequentially, one after another, within the same Workflow Executor. However, we can increase the level of parallelism with the concurrency parameter.

With concurrency = 50, a Workflow Executor will execute up to 50 messages concurrently. If 50 messages are already running, the worker will stop consuming messages until a slot becomes available.

By design, Infinitic guarantees that only one Workflow Executor can process a given workflow instance at a given time.
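
The slot behavior behind the concurrency parameter can be pictured with a bounded semaphore. The sketch below is only an illustration of the idea, not how Infinitic is implemented; run_with_concurrency and its arguments are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_with_concurrency(messages, handler, concurrency):
    """Process messages with at most `concurrency` handlers running at once.

    Returns the highest number of handlers observed running together.
    """
    slots = threading.BoundedSemaphore(concurrency)
    lock = threading.Lock()
    running = 0
    peak = 0

    def guarded(message):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        try:
            handler(message)
        finally:
            with lock:
                running -= 1
            slots.release()  # free the slot so consumption can resume

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        for message in messages:
            slots.acquire()  # blocks: stop consuming until a slot is free
            pool.submit(guarded, message)
    return peak
```

The consuming loop blocks on slots.acquire() as soon as all slots are taken, which mirrors a worker that stops consuming messages until a slot becomes available.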

Configuration Using Builders

Configuration Using YAML

executor:
  class: example.MyWorkflowImpl
  concurrency: 50

Batching (Beta)

Batching refers to the process of grouping multiple messages together into a single batch:

  • while receiving messages from the broker;
  • while processing messages;
  • while sending messages to the broker.

This technique improves efficiency and reduces latency, especially for high-throughput applications, by reducing the number of network calls required.

Batching can be configured with 2 parameters:

  • maxMessages (int): the maximal number of messages in a batch.
  • maxSeconds (double): the maximal duration of a batch in seconds.

When batching, the concurrency setting indicates how many batches are processed in parallel.
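
To make the interplay of maxMessages and maxSeconds concrete, here is a minimal sketch of a receive loop that closes a batch as soon as either limit is reached. It assumes an in-memory queue in place of the broker; receive_batch is a hypothetical helper, and Infinitic's actual batching logic may differ.

```python
import queue
import time


def receive_batch(source, max_messages, max_seconds):
    """Collect up to max_messages, waiting at most max_seconds in total."""
    batch = []
    deadline = time.monotonic() + max_seconds
    while len(batch) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # the time limit closes the batch
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # nothing more arrived before the deadline
    return batch
```

With maxMessages: 1000 and maxSeconds: 0.1, such a loop would return early under heavy load (1000 messages collected) and after about 0.1 seconds under light load, so maxSeconds bounds the extra latency that batching can add.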

Configuration Using Builders

Configuration Using YAML

executor:
  class: example.MyWorkflowImpl
  batch: 
    maxMessages: 1000
    maxSeconds: 0.1

Retry Policy

It's possible to set a retry policy for the Workflow Executor, in the same way as for the Service Executor. However, this is not recommended, as a workflow implementation is expected to be deterministic and should not have transient errors.

Execution Timeout

It's possible to set an execution timeout for the Workflow Executor, in the same way as for the Service Executor. However, this is not recommended, as workflow processing is expected to be short.

Workflow State Engines

You can set up an Infinitic Worker to run a Workflow State Engine through builders or using a YAML configuration. Whichever method you choose, you'll need:

  1. The transport configuration, which describes how to connect to the event broker.

  2. The Workflow State Engine configuration, which describes where to store the state of workflows, and defines optional parameters.

Once an Infinitic Worker is created and configured to run a Workflow State Engine, it can be started with the start() method.

Workflow State Engines have the following configuration parameters:

  • storage: the storage configuration to use to store the state of workflows.
  • batch: the batching policy when receiving and sending messages from and to the message broker.
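
The following sketch illustrates why storage is central to a Workflow State Engine: the engine keeps nothing in memory between messages, so any engine instance can handle the next event for a workflow. InMemoryStorage and StateEngine are hypothetical stand-ins, and the load, append, save cycle is a simplifying assumption rather than Infinitic's actual logic.

```python
class InMemoryStorage:
    """Stand-in for the configured database (assumed key-value access)."""

    def __init__(self):
        self._states = {}

    def load(self, workflow_id):
        return list(self._states.get(workflow_id, []))

    def save(self, workflow_id, history):
        self._states[workflow_id] = list(history)


class StateEngine:
    """Holds no workflow state itself: each event is load, append, save."""

    def __init__(self, storage, dispatch):
        self.storage = storage
        self.dispatch = dispatch  # callable sending a command to an executor

    def on_event(self, workflow_id, event):
        history = self.storage.load(workflow_id)
        history.append(event)                    # record the state change
        self.storage.save(workflow_id, history)
        # Ask a Workflow Executor to advance the instance with this history.
        self.dispatch({"workflow_id": workflow_id, "history": history})
```

Two StateEngine objects that share the same storage behave as one logical engine, which is why several workers can run the State Engine for the same workflow.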

Minimal Example Using Builders

Here is a minimal configuration to create and start a Workflow State Engine associated with a Workflow MyWorkflow.

Minimal Example Using YAML

Here is a minimal infinitic.yml configuration file to create a Workflow State Engine associated with a Workflow MyWorkflow.

# Transport configuration for a local Pulsar
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650/
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

# Workflow State Engine Configuration for workflow MyWorkflow
workflows:
  - name: MyWorkflow
    stateEngine:
      storage:
        postgres:
          host: localhost
          port: 5432
          username: postgres
          password: ********

The Infinitic Worker embedding this Workflow State Engine can be created and started with:

Storage

Configuring storage is mandatory to run a Workflow State Engine. See Storage for more details.

Concurrency

By default, messages are executed sequentially, one after another, within the same Workflow State Engine. However, we can increase the level of parallelism with the concurrency parameter.

With concurrency = 10, a Workflow State Engine will execute up to 10 messages concurrently. If 10 messages are already running, the worker will stop consuming messages until a slot becomes available.

By design, Infinitic guarantees that only one Workflow State Engine can process a given workflow instance at a given time.
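
This page does not describe how the guarantee is enforced. One common way to obtain this kind of exclusivity is key-based routing, where every message carrying the same workflow ID is delivered to the same consumer. The sketch below shows that general idea only, as an assumption; assign is a hypothetical function and is not part of Infinitic.

```python
import zlib
from collections import defaultdict


def assign(workflow_ids, engine_count):
    """Map each workflow ID to exactly one engine index using a stable hash."""
    assignment = defaultdict(list)
    for workflow_id in workflow_ids:
        # crc32 is stable across processes, unlike Python's built-in hash().
        index = zlib.crc32(workflow_id.encode("utf-8")) % engine_count
        assignment[index].append(workflow_id)
    return dict(assignment)
```

Under this scheme, messages for different workflow instances can be processed in parallel, while messages for a single instance are always handled by one engine.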

Configuration Using Builders

Configuration Using YAML

workflows:
  - name: MyWorkflow
    stateEngine:
      concurrency: 10
      storage:
        # storage configuration

Batching (Beta)

Batching refers to the process of grouping multiple messages together into a single batch:

  • while receiving messages from the broker;
  • while processing messages;
  • while sending messages to the broker.

This technique improves efficiency and reduces latency, especially for high-throughput applications, by reducing the number of network calls required.

Batching can be configured with 2 parameters:

  • maxMessages (int): the maximal number of messages in a batch.
  • maxSeconds (double): the maximal duration of a batch in seconds.

When batching, the concurrency setting indicates how many batches are processed in parallel.

Configuration Using Builders

Configuration Using YAML

stateEngine:
  batch: 
    maxMessages: 1000
    maxSeconds: 0.1
  storage:
    # storage configuration

Workflow Tag Engines

You can set up an Infinitic Worker to run a Workflow Tag Engine through builders or using a YAML configuration. Whichever method you choose, you'll need:

  1. The transport configuration, which describes how to connect to the event broker.

  2. The Workflow Tag Engine configuration, which describes where to store the relationship between workflow IDs and workflow tags, and defines optional parameters.

Once an Infinitic Worker is created and configured to run a Workflow Tag Engine, it can be started with the start() method.

Workflow Tag Engines have the following configuration parameters:

  • storage: the storage configuration to use to store the relationship between workflow IDs and workflow tags.
  • batch: the batching policy when receiving and sending messages from and to the message broker.
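
The relationship that a Workflow Tag Engine stores can be pictured as a mapping from each tag to the set of workflow IDs sharing it. The sketch below uses an in-memory dictionary in place of the configured storage; TagIndex and its methods are hypothetical names, not Infinitic's API.

```python
class TagIndex:
    """Tracks which workflow IDs share a tag (in-memory stand-in for storage)."""

    def __init__(self):
        self._ids_by_tag = {}

    def add(self, tag, workflow_id):
        self._ids_by_tag.setdefault(tag, set()).add(workflow_id)

    def remove(self, tag, workflow_id):
        ids = self._ids_by_tag.get(tag)
        if ids is None:
            return
        ids.discard(workflow_id)
        if not ids:
            del self._ids_by_tag[tag]  # drop tags with no workflow left

    def ids(self, tag):
        return set(self._ids_by_tag.get(tag, set()))

    def for_each(self, tag, operation):
        """Apply a bulk operation to every workflow instance with this tag."""
        return [operation(workflow_id) for workflow_id in sorted(self.ids(tag))]
```

A bulk operation such as cancelling all workflows for one customer then becomes a lookup by tag followed by one command per workflow ID.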

Minimal Example Using Builders

Here is a minimal configuration to create and start a Workflow Tag Engine associated with a Workflow MyWorkflow.

Minimal Example Using YAML

Here is a minimal infinitic.yml configuration file to create a Workflow Tag Engine associated with a Workflow MyWorkflow.

# Transport configuration for a local Pulsar
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650/
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

# Workflow Tag Engine Configuration for workflow MyWorkflow
workflows:
  - name: MyWorkflow
    tagEngine:
      storage:
        postgres:
          host: localhost
          port: 5432
          username: postgres
          password: ********

The Infinitic Worker embedding this Workflow Tag Engine can be created and started with:

Storage

Configuring storage is mandatory to run a Workflow Tag Engine, which uses it to store the relationship between workflow IDs and workflow tags. See Storage for more details.

Concurrency

By default, messages are executed sequentially, one after another, within the same Workflow Tag Engine. However, we can increase the level of parallelism with the concurrency parameter.

With concurrency = 10, a Workflow Tag Engine will execute up to 10 messages concurrently. If 10 messages are already running, the worker will stop consuming messages until a slot becomes available.

By design, Infinitic guarantees that only one Workflow Tag Engine can process a message for a specific workflow tag at a given time.

Configuration Using Builders

Configuration Using YAML

tagEngine:
  concurrency: 10
  storage:
    # storage configuration

Batching (Beta)

Batching refers to the process of grouping multiple messages together into a single batch:

  • while receiving messages from the broker;
  • while processing messages;
  • while sending messages to the broker.

This technique improves efficiency and reduces latency, especially for high-throughput applications, by reducing the number of network calls required.

Batching can be configured with 2 parameters:

  • maxMessages (int): the maximal number of messages in a batch.
  • maxSeconds (double): the maximal duration of a batch in seconds.

When batching, the concurrency setting indicates how many batches are processed in parallel.

Configuration Using Builders

Configuration Using YAML

tagEngine:
  batch: 
    maxMessages: 1000
    maxSeconds: 0.1
  storage:
    # storage configuration

Mixing Components

It's possible to mix different components from different services or workflows in a single Infinitic Worker.

This capability is useful during development to have a single worker running all components. But in production, we recommend having one worker per component.

Example Using Builders

Example Using YAML

# Transport settings
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

# We can define default Storage settings here to avoid repeating them
storage:
  redis:
    host: localhost
    port: 6379
    username: redis
    password: myRedisPassword

# Configuration of all components for Workflow MyWorkflow
workflows:
  - name: MyWorkflow
    executor:
      class: example.MyWorkflowImpl
      concurrency: 10
    stateEngine:
      concurrency: 10
    tagEngine:
      concurrency: 5
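
The effect of the default storage block can be sketched as a simple fallback rule. The example assumes the YAML has already been parsed into dictionaries, and that a storage entry set on a component takes precedence over the top-level default; that precedence and the resolve_storage function are assumptions made for illustration, not documented behavior.

```python
def resolve_storage(config):
    """Return the storage settings each engine would use, keyed by (name, component)."""
    default = config.get("storage")
    resolved = {}
    for workflow in config.get("workflows", []):
        for component in ("stateEngine", "tagEngine"):
            settings = workflow.get(component)
            if settings is None:
                continue  # this component is not run by the worker
            storage = settings.get("storage", default)
            if storage is None:
                raise ValueError(
                    f"{workflow['name']}.{component} has no storage configured"
                )
            resolved[(workflow["name"], component)] = storage
    return resolved


# Dictionary form of the YAML example above (password omitted).
config = {
    "storage": {"redis": {"host": "localhost", "port": 6379}},
    "workflows": [
        {
            "name": "MyWorkflow",
            "executor": {"class": "example.MyWorkflowImpl", "concurrency": 10},
            "stateEngine": {"concurrency": 10},
            "tagEngine": {"concurrency": 5},
        }
    ],
}
```

Calling resolve_storage(config) gives both engines the shared Redis settings. The executor is skipped because it does not require storage.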