v0.11.2

Pulsar

Pulsar installation

To install Pulsar, refer to the Pulsar documentation.

Infinitic can run on a managed Pulsar cluster. It has been tested on StreamNative, DataStax, and Clever Cloud. We recommend using one of them if you are new to Pulsar.

Pulsar setup

Infinitic does not require specific Pulsar settings. Nevertheless, we recommend setting up retention policies to avoid losing messages while workers are not connected.

We recommend running Infinitic in a dedicated Pulsar tenant and using namespaces to separate the production environment from dev or staging. For example, we may want to create one namespace per developer, plus one for staging and one for production.

If they do not already exist, the tenant and namespace are automatically created by Infinitic workers at launch time.

Connecting to a Pulsar cluster

Infinitic clients and workers need to know how to connect to our Pulsar cluster. This is done through a pulsar entry within their configuration file.

Minimal configuration

The minimal configuration - typically needed for development - contains:

pulsar:
  brokerServiceUrl: pulsar://localhost:6650
  webServiceUrl: http://localhost:8080
  tenant: infinitic
  namespace: dev
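
Following the recommendation above to use one namespace per environment, a staging configuration could differ from the development one only in its connection URLs and namespace. This is a sketch, not a prescribed layout: the host name `pulsar.staging.example.com` and the namespace name `staging` are hypothetical placeholders.

```yaml
# Hypothetical staging configuration: same tenant, different namespace.
pulsar:
  brokerServiceUrl: pulsar://pulsar.staging.example.com:6650  # placeholder host
  webServiceUrl: http://pulsar.staging.example.com:8080       # placeholder host
  tenant: infinitic
  namespace: staging  # one namespace per environment (dev, staging, production)
```

Keeping the tenant fixed and varying only the namespace means that clients and workers from different environments never share topics.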

Transport encryption

Transport encryption using TLS can be configured with these additional parameters:

pulsar:
  ...
  useTls: true
  tlsAllowInsecureConnection: false
  tlsTrustCertsFilePath: /path/to/ca.cert.pem
  tlsEnableHostnameVerification: false
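
When TLS is enabled, the service URLs usually need to use Pulsar's TLS schemes as well. As an illustration only, assuming a hypothetical host `pulsar.example.com` that exposes Pulsar's conventional TLS ports (6651 for the binary protocol, 8443 for the HTTPS admin API), a complete entry might look like the following. Enabling hostname verification here is a suggestion for production use, not a requirement stated by Infinitic.

```yaml
# Sketch of a TLS-enabled configuration; host and certificate path are placeholders.
pulsar:
  brokerServiceUrl: pulsar+ssl://pulsar.example.com:6651  # TLS scheme for the binary protocol
  webServiceUrl: https://pulsar.example.com:8443          # HTTPS for the admin API
  tenant: infinitic
  namespace: production
  useTls: true
  tlsAllowInsecureConnection: false    # reject untrusted certificates
  tlsTrustCertsFilePath: /path/to/ca.cert.pem
  tlsEnableHostnameVerification: true  # check that the certificate matches the host
```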

If we use a KeyStore, it can be configured with:

pulsar:
  ...
  useKeyStoreTls: true
  tlsTrustStoreType: JKS
  tlsTrustStorePath: /var/private/tls/client.truststore.jks
  tlsTrustStorePassword: clientpw

Authentication

Using JSON Web Token:

pulsar:
  ...
  authentication:
    token: our_token

Using Athenz:

pulsar:
  ...
  authentication:
    tenantDomain: shopping
    tenantService: some_app
    providerDomain: pulsar
    privateKey: file:///path/to/private.pem
    keyId: v1

Using OAuth2:

pulsar:
  ...
  authentication:
    privateKey: file:///path/to/key/file.json
    issuerUrl: https://dev-kt-aa9ne.us.auth0.com
    audience: https://dev-kt-aa9ne.us.auth0.com/api/v2/

Default producer settings

We can provide default settings for all producers. All settings are optional. Unless noted otherwise, Pulsar's default value is used for any setting that is not provided.

pulsar:
  ...
  producer:
    autoUpdatePartitions: # Boolean
    autoUpdatePartitionsIntervalSeconds: # Double
    batchingMaxBytes: # Int
    batchingMaxMessages: # Int
    batchingMaxPublishDelaySeconds: # Double
    blockIfQueueFull: # Boolean (Infinitic default: true)
    compressionType: # CompressionType
    cryptoFailureAction: # ProducerCryptoFailureAction
    defaultCryptoKeyReader: # String
    encryptionKey: # String
    enableBatching: # Boolean
    enableChunking: # Boolean
    enableLazyStartPartitionedProducers: # Boolean
    enableMultiSchema: # Boolean
    hashingScheme: # HashingScheme
    messageRoutingMode: # MessageRoutingMode
    properties: # Map<String, String>
    roundRobinRouterBatchingPartitionSwitchFrequency: # Int
    sendTimeoutSeconds: # Double
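
As a hedged example of how a few of these settings could be filled in, the fragment below enables batching with LZ4 compression. The values are illustrative rather than recommended, and it assumes that enum-typed settings such as `compressionType` are written using the names of the corresponding Pulsar client constants (for example `LZ4`).

```yaml
# Illustrative producer defaults; values are examples, not tuning advice.
pulsar:
  # connection settings (brokerServiceUrl, webServiceUrl, tenant, namespace) go here
  producer:
    enableBatching: true
    batchingMaxMessages: 1000             # flush a batch after 1000 messages...
    batchingMaxPublishDelaySeconds: 0.01  # ...or after 10 ms, whichever comes first
    compressionType: LZ4                  # assumed spelling, after Pulsar's CompressionType
    sendTimeoutSeconds: 30
```

Settings left out of the `producer` entry keep their defaults.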

Default consumer settings

We can provide default settings for all consumers. All settings are optional. Unless noted otherwise, Pulsar's default value is used for any setting that is not provided.

pulsar:
  ...
  consumer:
    loadConf: # Map<String, String>
    subscriptionProperties: # Map<String, String>
    ackTimeoutSeconds: # Double
    isAckReceiptEnabled: # Boolean
    ackTimeoutTickTimeSeconds: # Double
    negativeAckRedeliveryDelaySeconds: # Double
    defaultCryptoKeyReader: # String
    cryptoFailureAction: # ConsumerCryptoFailureAction
    receiverQueueSize: # Int
    acknowledgmentGroupTimeSeconds: # Double
    replicateSubscriptionState: # Boolean
    maxTotalReceiverQueueSizeAcrossPartitions: # Int
    priorityLevel: # Int
    properties: # Map<String, String>
    autoUpdatePartitions: # Boolean
    autoUpdatePartitionsIntervalSeconds: # Double
    enableBatchIndexAcknowledgment: # Boolean
    maxPendingChunkedMessage: # Int
    autoAckOldestChunkedMessageOnQueueFull: # Boolean
    expireTimeOfIncompleteChunkedMessageSeconds: # Double
    startPaused: # Boolean
    maxRedeliverCount: # Int (Infinitic default: 3)
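
For instance, to give failed messages more redelivery attempts and to slow redelivery down, a consumer entry could look like the sketch below. The values are illustrative assumptions, not recommendations.

```yaml
# Illustrative consumer defaults; values are examples, not tuning advice.
pulsar:
  # connection settings (brokerServiceUrl, webServiceUrl, tenant, namespace) go here
  consumer:
    receiverQueueSize: 1000                # messages prefetched per consumer
    negativeAckRedeliveryDelaySeconds: 30  # wait 30 s before redelivering a nacked message
    maxRedeliverCount: 5                   # overrides the Infinitic default of 3
```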