Storage
Storage is used to persist the state of workflows, as well as the relationships between workflow instances and tags, and between task instances and tags.
Here are the minimal configurations for each supported database. See the builders' methods for more details.
Databases
Redis
Mandatory Parameters
host
: The hostname of the Redis server
port
: The port number the Redis server is listening on
username
: The username to connect to the Redis server
password
: The password to connect to the Redis server
Configuration using a builder:
StorageConfig storageConfig = RedisStorageConfig.builder()
.setHost("localhost")
.setUsername("redis")
.setPassword("********")
.setPort(6379)
.build();
val storageConfig = RedisStorageConfig.builder()
.setHost("localhost")
.setPort(6379)
.setUsername("redis")
.setPassword("********")
.build()
Configuration using a YAML configuration:
storage:
redis:
host: localhost
port: 6379
username: redis
password: ********
Optional Parameters
If needed, you can use the following optional parameters:
compression
: (default: none) Enable compression of data stored in Redis to reduce storage space and network bandwidth
cache
: (default: none) Enable a caching mechanism to improve read performance
database
: The Redis database number to use
timeout
: Connection timeout in milliseconds
ssl
: Whether to use SSL/TLS for the connection
poolConfig
: Configuration for the Redis connection pool
maxTotal
: (default: -1) Maximum number of connections that can be allocated by the pool
maxIdle
: (default: 8) Maximum number of idle connections in the pool
minIdle
: (default: 0) Minimum number of idle connections to maintain in the pool
Infinitic uses the Jedis connection pool under the hood to manage database connections efficiently. If the optional parameters above are not specified, Jedis's default values will be used. You can refer to Jedis's documentation for more details about these parameters.
Configuration using a builder:
StorageConfig storageConfig = RedisStorageConfig.builder()
.setCompression(CompressionConfig.gzip)
.setCache(
CacheConfig.builder()
.setMaximumSize(10000L)
.setExpireAfterAccess(3600L)
.setExpireAfterWrite(3600L)
.build()
)
.setHost("localhost")
.setPort(6379)
.setUsername("redis")
.setPassword("********")
.setDatabase(0)
.setTimeout(2000)
.setSsl(true)
.setPoolConfig(
PoolConfig.builder()
.setMaxTotal(-1)
.setMaxIdle(8)
.setMinIdle(0)
.build()
)
.build();
val storageConfig = RedisStorageConfig.builder()
.setCompression(CompressionConfig.gzip)
.setCache(
CacheConfig.builder()
.setMaximumSize(10000L)
.setExpireAfterAccess(3600L)
.setExpireAfterWrite(3600L)
.build()
)
.setHost("localhost")
.setPort(6379)
.setUsername("redis")
.setPassword("********")
.setDatabase(0)
.setTimeout(2000)
.setSsl(true)
.setPoolConfig(
PoolConfig.builder()
.setMaxTotal(-1)
.setMaxIdle(8)
.setMinIdle(0)
.build()
)
.build()
Configuration using a YAML configuration:
storage:
compression: gzip
cache:
maximumSize: 10000
expireAfterAccess: 3600
expireAfterWrite: 3600
redis:
host: localhost
port: 6379
username: redis
password: ********
database: 0
timeout: 2000
ssl: true
poolConfig:
maxTotal: -1
maxIdle: 8
minIdle: 0
Postgres
Mandatory Parameters
host
: The hostname of the Postgres server
port
: The port number the Postgres server is listening on
username
: The username to connect to the Postgres server
password
: The password to connect to the Postgres server
Configuration using a builder:
StorageConfig storageConfig = PostgresStorageConfig.builder()
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.build();
val storageConfig = PostgresStorageConfig.builder()
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.build()
Configuration using a YAML configuration:
storage:
postgres:
host: localhost
port: 5432
username: postgres
password: ********
Optional Parameters
If needed, you can use the following optional parameters:
compression
: (default: none) Enable compression of data stored in Postgres to reduce storage space and network bandwidth
cache
: (default: none) Enable a caching mechanism to improve read performance
database
: (default: "postgres") The name of the database to use
schema
: (default: "infinitic") The name of the schema to use
keySetTable
: (default: "key_set_storage") The name of the table that stores key sets
keyValueTable
: (default: "key_value_storage") The name of the table that stores key-value pairs
maximumPoolSize
: Maximum size of the connection pool
minimumIdle
: Minimum number of idle connections in the pool
idleTimeout
: Maximum amount of time in milliseconds that a connection can remain idle
connectionTimeout
: Maximum time in milliseconds to wait for a connection from the pool
maxLifetime
: Maximum lifetime of a connection in milliseconds
Infinitic uses the HikariCP connection pool under the hood to manage database connections efficiently. If the optional parameters above are not specified, HikariCP's default values will be used. You can refer to HikariCP's documentation for more details about these parameters.
Configuration using a builder:
StorageConfig storageConfig = PostgresStorageConfig.builder()
.setCompression(CompressionConfig.gzip)
.setCache(
CacheConfig.builder()
.setMaximumSize(10000L)
.setExpireAfterAccess(3600L)
.setExpireAfterWrite(3600L)
.build()
)
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.setDatabase("postgres")
.setSchema("infinitic")
.setKeySetTable("key_set_storage")
.setKeyValueTable("key_value_storage")
.setMaximumPoolSize(10)
.setMinimumIdle(10)
.setIdleTimeout(600000L)
.setConnectionTimeout(30000L)
.setMaxLifeTime(1800000L)
.build();
val storageConfig = PostgresStorageConfig.builder()
.setCompression(CompressionConfig.gzip)
.setCache(
CacheConfig.builder()
.setMaximumSize(10000L)
.setExpireAfterAccess(3600L)
.setExpireAfterWrite(3600L)
.build()
)
.setHost("localhost")
.setPort(5432)
.setUsername("postgres")
.setPassword("********")
.setDatabase("postgres")
.setSchema("infinitic")
.setKeySetTable("key_set_storage")
.setKeyValueTable("key_value_storage")
.setMaximumPoolSize(10)
.setMinimumIdle(10)
.setIdleTimeout(600000L)
.setConnectionTimeout(30000L)
.setMaxLifeTime(1800000L)
.build()
Configuration using a YAML configuration:
storage:
compression: gzip
cache:
maximumSize: 10000
expireAfterAccess: 3600
expireAfterWrite: 3600
postgres:
host: localhost
port: 5432
username: postgres
password: ********
database: postgres
schema: infinitic
keySetTable: key_set_storage
keyValueTable: key_value_storage
maximumPoolSize: 10
minimumIdle: 10
idleTimeout: 600000
connectionTimeout: 30000
maxLifetime: 1800000
MySQL
Mandatory Parameters
host
: The hostname of the MySQL server
port
: The port number the MySQL server is listening on
username
: The username to connect to the MySQL server
password
: The password to connect to the MySQL server
Configuration using a builder:
StorageConfig storageConfig = MySQLStorageConfig.builder()
.setHost("localhost")
.setPort(3306)
.setUsername("root")
.setPassword("********")
.build();
val storageConfig = MySQLStorageConfig.builder()
.setHost("localhost")
.setPort(3306)
.setUsername("root")
.setPassword("********")
.build()
Configuration using a YAML configuration:
storage:
mysql:
host: localhost
port: 3306
username: root
password: ********
Optional Parameters
If needed, you can use the following optional parameters:
compression
: (default: none) Enable compression of data stored in MySQL to reduce storage space and network bandwidth
cache
: (default: none) Enable a caching mechanism to improve read performance
database
: (default: "infinitic") The name of the database to use
keySetTable
: (default: "key_set_storage") The name of the table that stores key sets
keyValueTable
: (default: "key_value_storage") The name of the table that stores key-value pairs
maximumPoolSize
: Maximum size of the connection pool
minimumIdle
: Minimum number of idle connections in the pool
idleTimeout
: Maximum amount of time in milliseconds that a connection can remain idle
connectionTimeout
: Maximum time in milliseconds to wait for a connection from the pool
maxLifetime
: Maximum lifetime of a connection in milliseconds
Infinitic uses the HikariCP connection pool under the hood to manage database connections efficiently. If the optional parameters above are not specified, HikariCP's default values will be used. You can refer to HikariCP's documentation for more details about these parameters.
Configuration using a builder:
StorageConfig storageConfig = MySQLStorageConfig.builder()
.setCompression(CompressionConfig.gzip)
.setCache(
CacheConfig.builder()
.setMaximumSize(10000L)
.setExpireAfterAccess(3600L)
.setExpireAfterWrite(3600L)
.build()
)
.setHost("localhost")
.setPort(3306)
.setUsername("root")
.setPassword("********")
.setDatabase("infinitic")
.setKeySetTable("key_set_storage")
.setKeyValueTable("key_value_storage")
.setMaximumPoolSize(10)
.setMinimumIdle(10)
.setIdleTimeout(600000L)
.setConnectionTimeout(30000L)
.setMaxLifeTime(1800000L)
.build();
val storageConfig = MySQLStorageConfig.builder()
.setCompression(CompressionConfig.gzip)
.setCache(
CacheConfig.builder()
.setMaximumSize(10000L)
.setExpireAfterAccess(3600L)
.setExpireAfterWrite(3600L)
.build()
)
.setHost("localhost")
.setPort(3306)
.setUsername("root")
.setPassword("********")
.setDatabase("infinitic")
.setKeySetTable("key_set_storage")
.setKeyValueTable("key_value_storage")
.setMaximumPoolSize(10)
.setMinimumIdle(10)
.setIdleTimeout(600000L)
.setConnectionTimeout(30000L)
.setMaxLifeTime(1800000L)
.build()
Configuration using a YAML configuration:
storage:
compression: gzip
cache:
maximumSize: 10000
expireAfterAccess: 3600
expireAfterWrite: 3600
mysql:
host: localhost
port: 3306
username: root
password: ********
database: infinitic
keySetTable: key_set_storage
keyValueTable: key_value_storage
maximumPoolSize: 10
minimumIdle: 10
idleTimeout: 600000
connectionTimeout: 30000
maxLifetime: 1800000
In Memory
Using a builder:
StorageConfig storageConfig = new InMemoryConfig();
val storageConfig = InMemoryConfig()
Or using a YAML configuration:
storage:
inMemory:
This storage should only be used for testing purposes, as it does not persist any data.
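For intuition, in-memory storage behaves like a plain map held by the worker process. The sketch below is purely illustrative (it is not Infinitic's actual implementation, and `InMemoryKeyValueDemo` is a hypothetical name): it shows why nothing survives a restart.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: an in-memory key-value storage is just a map, so all
// state disappears when the holding process stops.
public class InMemoryKeyValueDemo {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    public void put(String key, byte[] value) { store.put(key, value); }

    public byte[] get(String key) { return store.get(key); }

    public static void main(String[] args) {
        InMemoryKeyValueDemo storage = new InMemoryKeyValueDemo();
        storage.put("workflow:42:state", "RUNNING".getBytes());
        System.out.println(new String(storage.get("workflow:42:state"))); // prints RUNNING

        // A fresh instance (standing in for a restarted process) starts empty:
        InMemoryKeyValueDemo afterRestart = new InMemoryKeyValueDemo();
        System.out.println(afterRestart.get("workflow:42:state")); // prints null
    }
}
```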
Compression
Compression can be used to reduce the size of data stored in the database, which can help:
- Reduce storage costs
- Reduce network bandwidth usage
- Improve overall performance in cases where the network is the bottleneck
However, compression comes with a CPU overhead as data needs to be compressed before being stored and decompressed when retrieved. You should benchmark your specific use case to determine if compression provides a net benefit.
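To make the tradeoff concrete, the sketch below gzips a repetitive JSON-like payload. Note that it uses `java.util.zip` from the JDK purely for illustration (Infinitic itself relies on Apache Commons Compress); the payload and class name are made up.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Illustration of the size/CPU tradeoff using the JDK's gzip implementation.
public class CompressionTradeoffDemo {

    // Compress a byte array with gzip and return the compressed bytes.
    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // Workflow state is often repetitive (JSON field names, enum values...),
        // which compresses well.
        String state = "{\"step\":\"PAYMENT\",\"status\":\"COMPLETED\"}".repeat(100);
        byte[] raw = state.getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        System.out.println("raw: " + raw.length + " bytes, gzip: " + compressed.length + " bytes");
        // The compressed payload is far smaller here, but every write pays the
        // CPU cost of compressing: benchmark with your real payloads.
    }
}
```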
Infinitic uses Apache Commons Compress and supports the following compression algorithms:
- gzip
- bzip2
- deflate
You can change the compression settings at any time: Infinitic automatically detects which algorithm was used when the data was originally stored and decompresses it accordingly. This means you can safely switch between compression algorithms, or disable compression entirely, without losing access to previously stored data.
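One common way such detection works is by inspecting the magic bytes at the start of the stored payload (gzip begins with `0x1f 0x8b`, bzip2 with `'B' 'Z' 'h'`). Apache Commons Compress offers similar detection via `CompressorStreamFactory`; the standalone sketch below is illustrative only and not Infinitic's actual code.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative sketch: recognize a compression format from its leading magic bytes.
public class MagicByteDemo {

    public static String detect(byte[] data) {
        if (data.length >= 2 && (data[0] & 0xff) == 0x1f && (data[1] & 0xff) == 0x8b) return "gzip";
        if (data.length >= 3 && data[0] == 'B' && data[1] == 'Z' && data[2] == 'h') return "bzip2";
        // Raw deflate has no magic bytes, so it cannot be detected this way.
        return "unknown";
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("workflow state".getBytes());
        }
        System.out.println(detect(bos.toByteArray())); // prints gzip
        System.out.println(detect("plain text".getBytes())); // prints unknown
    }
}
```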
Cache
Caching can significantly improve read performance by storing frequently accessed data in memory, reducing the need to query the database repeatedly. This can help:
- Reduce database load
- Improve response times for frequently accessed data
- Reduce costs associated with database operations
The cache implementation uses Caffeine, a high-performance Java caching library. The cache can be configured with:
maximumSize
: Maximum number of entries the cache may contain
expireAfterAccess
: Duration after which an entry is automatically removed from the cache if it has not been accessed
expireAfterWrite
: Duration after which an entry is automatically removed from the cache after it was last written
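The difference between the two expiration policies can be subtle: assuming Caffeine's usual semantics, `expireAfterWrite` counts from the last write only, while `expireAfterAccess` is reset by every read as well. The sketch below models this with explicit timestamps (in seconds) rather than a real cache, purely for illustration.

```java
// Illustrative model of cache-entry expiration: one timestamp for the last
// write, one for the last access (a write also counts as an access).
public class ExpiryDemo {
    public long lastWrite;
    public long lastAccess;

    public void write(long now) { lastWrite = now; lastAccess = now; }
    public void read(long now) { lastAccess = now; }

    // Expired if either policy's window has elapsed (times in seconds).
    public boolean expired(long now, long afterWrite, long afterAccess) {
        return (now - lastWrite) > afterWrite || (now - lastAccess) > afterAccess;
    }

    public static void main(String[] args) {
        ExpiryDemo entry = new ExpiryDemo();
        // expireAfterWrite = 3600 s, expireAfterAccess = 3 s
        entry.write(0);
        entry.read(2);
        // At t=6 the entry was last accessed 4 s ago, exceeding expireAfterAccess:
        System.out.println(entry.expired(6, 3600, 3)); // prints true
        entry.read(5);
        // A read at t=5 resets the access clock, so the entry survives at t=6:
        System.out.println(entry.expired(6, 3600, 3)); // prints false
    }
}
```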
Potential issue
When deploying multiple workers, caching works effectively because messages for a specific workflow instance are always routed to the same worker. However, there is a potential issue to be aware of when starting and stopping workers in quick succession.
Let's consider this scenario:
- Worker 1 is handling messages for Workflow A and has cached its state
- Worker 2 starts up and begins processing messages previously handled by Worker 1, including those for Workflow A
- This works fine because Worker 2 has no cached state and will fetch from the database
- Worker 2 is quickly shut down
- Messages for Workflow A return to Worker 1
- This can cause issues if Worker 1 still has outdated state in its cache
When using caching, you should allow workers to run for at least the full cache expiration duration before shutting them down.
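The scenario above can be sketched with two workers each holding a local cache in front of a shared database (all names here are illustrative, not Infinitic APIs): after Worker 2 briefly advances the workflow, Worker 1 still serves its stale cached state.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the stale-cache scenario with two workers sharing a database.
public class StaleCacheDemo {
    public static Map<String, String> database = new HashMap<>();

    public static class Worker {
        public final Map<String, String> cache = new HashMap<>();

        // Cache-aside read: use the cached value if present, else fetch from the database.
        public String read(String key) {
            return cache.computeIfAbsent(key, database::get);
        }

        // Write through to the database and update the local cache.
        public void write(String key, String value) {
            database.put(key, value);
            cache.put(key, value);
        }
    }

    public static void main(String[] args) {
        Worker worker1 = new Worker();
        Worker worker2 = new Worker();
        worker1.write("workflowA", "step-1"); // Worker 1 handles Workflow A and caches its state
        worker2.write("workflowA", "step-2"); // Worker 2 briefly takes over and advances it
        // Messages return to Worker 1, whose cache is now outdated:
        System.out.println(worker1.read("workflowA")); // prints step-1, but the database holds step-2
    }
}
```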