Workers

Infinitic is still in active development.

Infinitic provides a generic worker in charge of task or workflow execution based on its configuration. The roles of workers are:

  • to listen to Pulsar for messages
  • to process tasks or workflows accordingly and send back the return value
  • to maintain the state of tasks or workflows.
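
The first two roles can be pictured as a simple loop. The sketch below is only an illustration under stated assumptions: in-memory queues stand in for Pulsar topics, messages are plain strings, and `WorkerLoopSketch`, `drain` and the handler are hypothetical names that are not part of Infinitic. State management is not covered.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical stand-in for a worker: in-memory queues replace Pulsar topics.
class WorkerLoopSketch {
    // Processes every message currently waiting in `inbox` and puts each
    // return value on `outbox`. Returns the number of messages processed.
    static int drain(Queue<String> inbox,
                     Queue<String> outbox,
                     Function<String, String> handler) {
        int processed = 0;
        String message;
        while ((message = inbox.poll()) != null) {   // 1. listen for a message
            String result = handler.apply(message);  // 2. process it
            outbox.add(result);                      // 3. send back the return value
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<String> inbox = new ArrayDeque<>();
        Queue<String> outbox = new ArrayDeque<>();
        inbox.add("book-car");
        inbox.add("book-hotel");
        drain(inbox, outbox, m -> m + ":done");
        System.out.println(outbox); // prints [book-car:done, book-hotel:done]
    }
}
```

A real worker keeps listening instead of stopping when the queue is empty; the loop ends here only to keep the example finite.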

Worker Instantiation

A worker can be instantiated from a configuration file (or from a resource, using fromConfigResource("/infinitic.yml")):

Java:
PulsarInfiniticWorker worker = PulsarInfiniticWorker.fromConfigFile("infinitic.yml");

Kotlin:
val worker = PulsarInfiniticWorker.fromConfigFile("infinitic.yml")

Here is an example of a valid infinitic.yml file for a worker:

pulsar:
  serviceUrl: pulsar://localhost:6650
  serviceHttpUrl: http://localhost:8080
  tenant: infinitic
  namespace: dev

workflows:
  - name: example.booking.workflows.BookingWorkflow
    class: example.booking.workflows.BookingWorkflowImpl
    concurrency: 10

tasks:
  - name: example.booking.tasks.carRental.CarRentalService
    class: example.booking.tasks.carRental.CarRentalServiceFake
    concurrency: 5
  - name: example.booking.tasks.flight.FlightBookingService
    class: example.booking.tasks.flight.FlightBookingServiceFake
    concurrency: 5
  - name: example.booking.tasks.hotel.HotelBookingService
    class: example.booking.tasks.hotel.HotelBookingServiceFake
    concurrency: 5

This configuration contains an optional worker name (omitted in the example above), the Pulsar settings, and, for each task or workflow, the following properties:

Name         Type     Description
name         string   name of the task / workflow (its interface)
class        string   name of the class to instantiate
concurrency  integer  maximum number of messages processed in parallel

When a worker name is provided, it MUST be unique among all workers and clients connected to Pulsar.

A worker must contain the code of all classes described in its configuration file.
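
To see why the classes must be available to the worker, here is a minimal sketch of how a class name taken from a configuration file could be turned into an instance. It assumes plain Java reflection and a no-argument constructor; it is not Infinitic's actual loading code. `ClassLoadingSketch`, `instantiate` and the `book` method are hypothetical, and the interface and implementation are simplified, package-less stand-ins for the names used in the configuration above.

```java
// Hypothetical task interface (what the "name" property refers to).
interface CarRentalService {
    String book(String customerId);
}

// Hypothetical implementation (what the "class" property refers to).
class CarRentalServiceFake implements CarRentalService {
    @Override
    public String book(String customerId) {
        return "car booked for " + customerId;
    }
}

class ClassLoadingSketch {
    // Instantiates `className` through its no-argument constructor and
    // checks that the instance implements `type`.
    static <T> T instantiate(String className, Class<T> type) {
        try {
            Object instance = Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
            return type.cast(instance);
        } catch (ReflectiveOperationException e) {
            // Raised, for example, when the class is not on the classpath.
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        CarRentalService service =
                instantiate(CarRentalServiceFake.class.getName(), CarRentalService.class);
        System.out.println(service.book("customer-1")); // prints car booked for customer-1

        try {
            instantiate("example.Missing", CarRentalService.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Cannot instantiate example.Missing
        }
    }
}
```

The second call fails because no class with that name is on the classpath, which is the situation a worker would be in if its configuration referenced code it does not ship.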

As illustrated above, a worker can handle multiple tasks or workflows, but we can also specialize a worker per task or per workflow. We just have to update the infinitic.yml file accordingly, for example:

pulsar:
  serviceUrl: pulsar://localhost:6650
  serviceHttpUrl: http://localhost:8080
  tenant: infinitic
  namespace: dev

tasks:
  - name: example.booking.tasks.carRental.CarRentalService
    class: example.booking.tasks.carRental.CarRentalServiceFake
    concurrency: 5

Worker Start

Just use the start() method to start listening to Pulsar:

Java:
worker.start();

Kotlin:
worker.start()

Notes:

  • The worker catches any exception thrown by the class during its execution (see task failure). Use a logger to see the errors.

  • Do not start multiple workers with the same configuration on one machine; increase the concurrency setting instead.

  • Starting workers on multiple machines increases throughput and resilience: workers can be scaled horizontally.
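
As a rough illustration of what a concurrency setting bounds, the sketch below runs simulated messages on a fixed-size thread pool and records how many were processed at the same time. It assumes a standard Java `ExecutorService`; it is not how Infinitic implements concurrency internally, and `ConcurrencySketch` and `maxParallel` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencySketch {
    // Runs `messages` simulated jobs on a pool of `concurrency` threads and
    // returns the highest number of jobs observed running at the same time.
    static int maxParallel(int concurrency, int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < messages; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(10); // simulate the work done for one message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        pool.shutdown(); // accept no new jobs, let the queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        int peak = maxParallel(5, 20);
        System.out.println(peak <= 5); // prints true
    }
}
```

With a pool of 5 threads, the 20 simulated messages are never processed more than 5 at a time. Raising the pool size on one process plays the same role as raising the concurrency value rather than starting a second identical worker.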

Updated at Fri, May 28, 2021