References
Pulsar
Pulsar installation
To install Pulsar, refer to the Pulsar documentation.
Infinitic can run on managed Pulsar cluster. It has been tested on StreamNative, Datastax and CleverCloud. We recommend using them if you are new to Pulsar.
Pulsar setup
Infinitic does not require specific settings, nevertheless, it's recommended to set up retention policies to avoid losing messages when workers are not connected.
We recommend using Infinitic using a dedicated Pulsar tenant and using namespaces to distinguish the production environment from dev or staging. For example, we may want to create one namespace per developer, plus one for staging and one for production.
If they do not exist already, tenant and namespace are automatically created by Infinitic workers at launch time.
Connecting to a Pulsar cluster
Infinitic clients and workers need to know how to connect to our Pulsar cluster. This is done through a pulsar
entry within their configuration file.
Minimal configuration
The minimal configuration - typically needed for development - contains:
pulsar:
brokerServiceUrl: pulsar://localhost:6650
webServiceUrl: http://localhost:8080
tenant: infinitic
namespace: dev
Transport encryption
Transport Encryption using TLS can be configured with those additional parameters:
pulsar:
...
useTls: true
tlsAllowInsecureConnection: false
tlsTrustCertsFilePath: /path/to/ca.cert.pem
tlsEnableHostnameVerification: false
If we use a KeyStore, it can be configured with:
pulsar:
...
useKeyStoreTls: true
tlsTrustStoreType: JKS
tlsTrustStorePath: /var/private/tls/client.truststore.jks
tlsTrustStorePassword: clientpw
Authentication
Using Json Web Token:
pulsar:
...
authentication:
token: our_token
Using Athen:
pulsar:
...
authentication:
tenantDomain: shopping
tenantService: some_app
providerDomain: pulsar
privateKey: file:///path/to/private.pem
keyId: v1
Using OAuth2
pulsar:
...
authentication:
privateKey: file:///path/to/key/file.json
issuerUrl: https://dev-kt-aa9ne.us.auth0.com
audience: https://dev-kt-aa9ne.us.auth0.com/api/v2/
Default producer settings
We can provide default settings for all producers. All are optional. Pulsar default will be used if not provided.
pulsar:
...
producer:
autoUpdatePartitions: # Boolean
autoUpdatePartitionsIntervalSeconds: # Double
batchingMaxBytes: # Int
batchingMaxMessages: # Int
batchingMaxPublishDelaySeconds: # Double
blockIfQueueFull: # Boolean (Infinitic default: true)
compressionType: # CompressionType
cryptoFailureAction: # ProducerCryptoFailureAction
defaultCryptoKeyReader: # String
encryptionKey: # String
enableBatching: # Boolean
enableChunking: # Boolean
enableLazyStartPartitionedProducers: # Boolean
enableMultiSchema: # Boolean
hashingScheme: # HashingScheme
messageRoutingMode: # MessageRoutingMode
properties: # Map<String, String>
roundRobinRouterBatchingPartitionSwitchFrequency: # Int
sendTimeoutSeconds: # Double
Default consumer settings
We can provide default settings for all consumers. All are optional. Pulsar default will be used if not provided.
pulsar:
...
consumer:
loadConf: # Map<String, String>
subscriptionProperties: # Map<String, String>
ackTimeoutSeconds: # Double
isAckReceiptEnabled: # Boolean
ackTimeoutTickTimeSeconds: # Double
negativeAckRedeliveryDelaySeconds: # Double
defaultCryptoKeyReader: # String
cryptoFailureAction: # ConsumerCryptoFailureAction
receiverQueueSize: # Int
acknowledgmentGroupTimeSeconds: # Double
replicateSubscriptionState: # Boolean
maxTotalReceiverQueueSizeAcrossPartitions: # Int
priorityLevel: # Int
properties: # Map<String, String>
autoUpdatePartitions: # Boolean
autoUpdatePartitionsIntervalSeconds: # Double
enableBatchIndexAcknowledgment: # Boolean
maxPendingChunkedMessage: # Int
autoAckOldestChunkedMessageOnQueueFull: # Boolean
expireTimeOfIncompleteChunkedMessageSeconds: # Double
startPaused: # Boolean
maxRedeliverCount: # Int (Infinitic default: 3)