SDK overview

Infinitic is still in active development.

The abstract class io.infinitic.workflows.Workflow exposes a set of functions useful to code workflows:

  • dispatch a new task with newTask and dispatch
  • dispatch a child-workflow with newWorkflow and dispatch
  • inline a simple task with inline
  • receive signals with channel
  • manage time with timer
  • interact with other workflows with getWorkflowById or getWorkflowByTag

Dispatch a new task

A workflow only needs to know the interface of a remote service to be able to use it.

By using the newTask function on the service interface, we create a stub that behaves syntactically as an instance of the remote service, but actually sends a message to Pulsar that will trigger the remote execution of the service.
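To make the stub idea concrete, here is a minimal sketch built on a JDK dynamic proxy. It is not Infinitic's implementation: `GreetingService`, `StubSketch`, `newStub`, and the in-memory `outbox` list (standing in for messages sent to Pulsar) are hypothetical names introduced for illustration only.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical service interface, used only for this illustration.
interface GreetingService {
    Boolean handle(String name, String email);
}

class StubSketch {
    // Stands in for the messages a real stub would send to the broker.
    static final List<String> outbox = new ArrayList<>();

    // Builds an object that looks like the service but only records each call.
    @SuppressWarnings("unchecked")
    static <T> T newStub(Class<T> serviceInterface) {
        return (T) Proxy.newProxyInstance(
            serviceInterface.getClassLoader(),
            new Class<?>[] {serviceInterface},
            (proxy, method, args) -> {
                outbox.add(method.getName() + Arrays.toString(args));
                // A real stub would wait for the remote result.
                // This sketch returns a placeholder that only fits Boolean methods.
                return Boolean.TRUE;
            });
    }
}
```

Calling `handle` twice on a stub created with `StubSketch.newStub(GreetingService.class)` adds two entries to `outbox`, one per call, without running any service code locally.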

Each call of a method will dispatch a new task. For example:

public class MyWorkflow extends Workflow implements MyWorkflowInterface {
    // create a stub of ServiceInterface
    private final ServiceInterface service = newTask(ServiceInterface.class);

    @Override
    public String start(String name, String email) {
        // the code below triggers 2 calls of `ServiceInterface::handle(name, email)`,
        // expecting a boolean return type.
        // both tasks will be processed in parallel as the first one is dispatched without waiting

        // dispatching a new task without waiting for the result
        Deferred<Boolean> deferred = dispatch(service::handle, name, email);

        // dispatching a new task and waiting for its result
        Boolean result2 = service.handle(name, email);

        // wait and get result of the first call
        Boolean result1 = deferred.await();

        // return value chosen for illustration only
        return String.valueOf(result1 && result2);
    }
}
class MyWorkflow : Workflow(), MyWorkflowInterface {
    // create a stub of ServiceInterface
    private val service : ServiceInterface = newTask(ServiceInterface::class.java)

    override fun start(name: String, email: String): String {
        // the code below triggers 2 calls of `ServiceInterface::handle(name, email)`,
        // expecting a boolean return type.
        // both tasks will be processed in parallel as the first one is dispatched without waiting

        // dispatching a new task without waiting for the result
        val deferred: Deferred<Boolean> = dispatch(service::handle, name, email)

        // dispatching a new task and waiting for its result
        val result2: Boolean = service.handle(name, email)

        // wait and get result of the first call
        val result1: Boolean = deferred.await()

        // return value chosen for illustration only
        return (result1 && result2).toString()
    }
}
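The difference between dispatching a call and invoking the stub directly can be pictured with a plain `CompletableFuture`. This is an analogy only: `DispatchSketch` and its local `handle` method are hypothetical stand-ins for a remote task, and Infinitic's `Deferred` is a different type.

```java
import java.util.concurrent.CompletableFuture;

class DispatchSketch {
    // Hypothetical stand-in for a remote task.
    static Boolean handle(String name, String email) {
        return email.contains("@");
    }

    static boolean run(String name, String email) {
        // Like dispatch(...): start the first call without waiting for it.
        CompletableFuture<Boolean> deferred =
            CompletableFuture.supplyAsync(() -> handle(name, email));

        // Like service.handle(...): the second call blocks until its result is available.
        Boolean result2 = handle(name, email);

        // Like deferred.await(): block until the first call has completed.
        Boolean result1 = deferred.join();

        return result1 && result2;
    }
}
```

Because the first call is started before the second one blocks, both can make progress at the same time, which is the point of the examples above.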

A newTask stub needs to be defined only once. It can then be used multiple times to dispatch multiple new tasks.

If the return type of the task is void, we need to use the dispatchVoid function instead of dispatch.

We can also add tags to this stub. If we do, every task dispatched with it will be tagged as well. This is useful for targeting these tasks later by tag:

Set<String> tags = new HashSet<>();
tags.add("foo");
tags.add("bar");

Service service = newTask(Service.class, tags);
val service: Service = newTask(Service::class.java, tags = setOf("foo", "bar"))
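One way to picture what tagging provides is a lookup from a tag to the ids of the instances that carry it. The sketch below is a toy in-memory index, not Infinitic's storage; `TagIndex`, `register`, and `findByTag` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class TagIndex {
    private final Map<String, Set<String>> idsByTag = new HashMap<>();

    // Records that the instance `id` carries every tag of the stub that dispatched it.
    void register(String id, Set<String> tags) {
        for (String tag : tags) {
            idsByTag.computeIfAbsent(tag, t -> new TreeSet<>()).add(id);
        }
    }

    // Returns the ids of all instances carrying `tag` (empty if there are none).
    Set<String> findByTag(String tag) {
        return idsByTag.getOrDefault(tag, Collections.emptySet());
    }
}
```

With such an index, every instance dispatched from a stub tagged "foo" and "bar" can later be found under either tag.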

Dispatch a child-workflow

By using the newWorkflow function on a workflow interface, we create a stub that behaves syntactically as an instance of the workflow, but actually sends a message to Pulsar that will trigger the remote execution of the workflow.

Each call of a method will dispatch a new child-workflow. For example:

public class MyWorkflowImpl extends Workflow implements MyWorkflowInterface {
    // create a stub of OtherWorkflowInterface
    private final OtherWorkflowInterface otherWorkflow = newWorkflow(OtherWorkflowInterface.class);

    @Override
    public String start(String name, UUID userId) {
        // the code below triggers 2 calls of `OtherWorkflowInterface::start(name, userId)`,
        // expecting a boolean return type.
        // both workflows will be processed in parallel as the first one is dispatched without waiting

        // dispatching a new workflow without waiting for the result
        Deferred<Boolean> deferred = dispatch(otherWorkflow::start, name, userId);

        // dispatching a new workflow and waiting for its result
        Boolean result2 = otherWorkflow.start(name, userId);

        // get result of the first workflow
        Boolean result1 = deferred.await();

        // return value chosen for illustration only
        return String.valueOf(result1 && result2);
    }
}
class MyWorkflowImpl : Workflow(), MyWorkflowInterface {
    // create a stub of OtherWorkflowInterface
    private val otherWorkflow: OtherWorkflowInterface = newWorkflow(OtherWorkflowInterface::class.java)

    override fun start(name: String, userId: UUID): String {
        // the code below triggers 2 calls of `OtherWorkflowInterface::start(name, userId)`,
        // expecting a boolean return type.
        // both workflows will be processed in parallel as the first one is dispatched without waiting

        // dispatching a new workflow without waiting for the result
        val deferred: Deferred<Boolean> = dispatch(otherWorkflow::start, name, userId)

        // dispatching a new workflow and waiting for its result
        val result2: Boolean = otherWorkflow.start(name, userId)

        // get result of the first workflow
        val result1: Boolean = deferred.await()

        // return value chosen for illustration only
        return (result1 && result2).toString()
    }
}

A newWorkflow stub needs to be defined only once. It can then be used multiple times to dispatch multiple new workflows.

If the return type of the method used is void, we need to use the dispatchVoid function instead of dispatch.

We can also add tags to this stub. If we do, every workflow dispatched with it will be tagged as well. This is useful for targeting these workflows later by tag:

Set<String> tags = new HashSet<>();
tags.add("foo");
tags.add("bar");

OtherWorkflow otherWorkflow = newWorkflow(OtherWorkflow.class, tags);
val otherWorkflow: OtherWorkflow = newWorkflow(OtherWorkflow::class.java, setOf("foo", "bar"))

Inline task

As described here, any non-deterministic instruction, or any instruction with side effects, should be in a task, not in a workflow. Writing a full task for a very simple instruction can be tedious. For those cases, we can use inline tasks:

// get (non-deterministic) current date
Date now = inline(() -> new Date());

// get (non-deterministic) env variable
String home = inline(() -> System.getenv("JAVA_HOME"));

// display (side-effect)
inlineVoid(() -> System.out.println("log"));
// get (non-deterministic) current date
val now = inline { Date() }

// get (non-deterministic) env variable
val home = inline { System.getenv("JAVA_HOME") }

// display (side-effect)
inline { println("log") }

If the return type of the lambda describing the inline task is void, we need to use the inlineVoid function instead of inline.
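Why wrap such a small instruction at all? A common reason in workflow engines is replay: the workflow code may be executed again from the start, and each non-deterministic value then has to come out the same as the first time. The sketch below assumes that model. `ReplaySketch` is a hypothetical toy that records inline results in a list, and it says nothing about how Infinitic actually stores them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class ReplaySketch {
    private final List<Object> history = new ArrayList<>();
    private int position = 0;

    // First run: evaluates the lambda and records its value.
    // Replay: returns the recorded value without evaluating the lambda again.
    @SuppressWarnings("unchecked")
    <T> T inline(Supplier<T> supplier) {
        if (position < history.size()) {
            return (T) history.get(position++);
        }
        T value = supplier.get();
        history.add(value);
        position++;
        return value;
    }

    // Rewinds to the start, as if the workflow code were executed again.
    void replay() {
        position = 0;
    }
}
```

After `replay()`, the same sequence of `inline` calls returns the recorded values, so a date or environment variable read on the first run stays stable on later runs.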

Receive signal

A workflow can receive signals from "outside". Signals are typed and are sent through "channels". The workflow interface must have a getter method returning a SendChannel<Type>. For example:

public interface Process {
    ...
    SendChannel<Boolean> getDecisionChannel();
}
interface Process {
    val decisionChannel: SendChannel<Boolean>
}

Workflows implement channels with the channel function:

public class ProcessImpl extends Workflow implements Process {
    // create typed channel
    private final Channel<Boolean> decisionChannel = channel();

    // channel getter
    @Override
    public Channel<Boolean> getDecisionChannel() { return decisionChannel; }

    @Override
    public void start() {
        // the workflow can asynchronously receive a signal as soon as receive() is applied
        Deferred<Boolean> deferredDecision = decisionChannel.receive();
        ...
        // wait for the decision
        if(deferredDecision.await()) {
            ...
        } else {
            ...
        }
    }
}
class ProcessImpl : Workflow(), Process {
    // create typed channel
    override val decisionChannel = channel<Boolean>()

    override fun start() {
        // the workflow can asynchronously receive a signal as soon as receive() is applied
        val deferredDecision: Deferred<Boolean> = decisionChannel.receive()
        ...
        // wait for the decision
        when(deferredDecision.await()) {
            true -> ...
            false -> ...
        }
    }
}

Channels can be of any serializable type.

By default, a signal sent to a running workflow is discarded. Before a workflow can receive a signal, it must first declare that it is waiting for it by calling the receive method on the channel.
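The discard rule can be illustrated with a toy channel. This is a sketch of the behaviour described above, not Infinitic's `Channel`: `ToyChannel` is hypothetical, it uses `CompletableFuture` in place of `Deferred`, and delivering to the oldest waiting receiver is a simplification chosen for this example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class ToyChannel<T> {
    // Receivers that have declared they are waiting for a signal.
    private final Queue<CompletableFuture<T>> waiting = new ArrayDeque<>();

    // Declares that the workflow is now waiting for one signal.
    CompletableFuture<T> receive() {
        CompletableFuture<T> deferred = new CompletableFuture<>();
        waiting.add(deferred);
        return deferred;
    }

    // Delivers the signal to the oldest waiting receiver.
    // Returns false if nobody was waiting and the signal was dropped.
    boolean send(T signal) {
        CompletableFuture<T> deferred = waiting.poll();
        if (deferred == null) {
            return false;
        }
        deferred.complete(signal);
        return true;
    }
}
```

A `send` issued before any `receive` returns false and leaves no trace, while a `send` issued after `receive` completes the pending future.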

Manage time

Time can be managed using the timer function. A call to the timer function creates a Deferred<Instant> that will complete at the provided time:

// create a timer that will complete in 60 seconds
Deferred<Instant> timer = timer(Duration.ofSeconds(60));

...

// the workflow will stop here until the timer completion
timer.await();
// create a timer that will complete in 60 seconds
val timer: Deferred<Instant> = timer(Duration.ofSeconds(60))

...

// the workflow will stop here until the timer completion
timer.await()

We can also target a specific Instant:

// use inline because `LocalDate.now()` is non-deterministic
Instant mondayAt8 = inline(() ->
    LocalDate.now()
        .with(TemporalAdjusters.nextOrSame(DayOfWeek.MONDAY))
        .atTime(8,0)
        .toInstant(ZoneOffset.UTC)
);
// create a timer that will complete next Monday at 8:00 UTC
Deferred<Instant> timer = timer(mondayAt8);

...

// the workflow will stop here until the timer completion
timer.await();
// use inline because `LocalDate.now()` is non-deterministic
val mondayAt8 = inline {
    LocalDate.now()
        .with(TemporalAdjusters.nextOrSame(DayOfWeek.MONDAY))
        .atTime(8,0)
        .toInstant(ZoneOffset.UTC)
}
// create a timer that will complete next Monday at 8:00 UTC
val timer = timer(mondayAt8)

...

// the workflow will stop here until the timer completion
timer.await()
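Note that `nextOrSame` keeps the current date when today is already a Monday, so the computed instant can lie in the past if it is already after 8:00 UTC. The sketch below isolates the date arithmetic and takes an explicit date so the behaviour is easy to check. `MondaySketch` and `mondayAt8From` are hypothetical names, and how `timer` treats an instant in the past is not covered here.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjusters;

class MondaySketch {
    // Same computation as above, but from an explicit date instead of LocalDate.now().
    static Instant mondayAt8From(LocalDate today) {
        return today
            .with(TemporalAdjusters.nextOrSame(DayOfWeek.MONDAY))
            .atTime(8, 0)
            .toInstant(ZoneOffset.UTC);
    }
}
```

For Wednesday 2021-10-27 this yields 2021-11-01T08:00:00Z, while for Monday 2021-10-25 it yields 2021-10-25T08:00:00Z, the same day. Using `TemporalAdjusters.next` instead would always pick a strictly later Monday.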

When a workflow is waiting, no resources are consumed. Internally, a delayed Pulsar message is sent to wake up the workflow when the time is right.

Interacting with other workflows

It's possible to interact with another running workflow from a workflow. To do so, we create the stub of a running workflow from its id:

HelloWorld helloworld = getWorkflowById(HelloWorld.class, id);
val helloworld: HelloWorld = getWorkflowById(HelloWorld::class.java, id)

Alternatively, we can create a stub targeting all running workflows with a given tag:

HelloWorld helloworld = getWorkflowByTag(HelloWorld.class, "foo");
val helloworld: HelloWorld = getWorkflowByTag(HelloWorld::class.java, tag = "foo")

Using this stub, we can:

  • send a signal to it
  • start another method in parallel
  • get current properties

Sending a signal to another workflow

Once we have the stub of a running workflow, we can easily send a typed signal to it:

// create stub of a running Process workflow
Process process = getWorkflowById(Process.class, id);

// send a signal to this running workflow through a channel
process.getDecisionChannel().send(true);
// create stub of a running Process workflow
val process: Process = getWorkflowById(Process::class.java, id)

// send a signal to this running workflow through a channel
process.decisionChannel.send(true)

If we target running workflows by tag, the signal will be sent to all running workflows with this tag:

// create stub of all running Process workflows with this tag
Process process = getWorkflowByTag(Process.class, tag);

// send a signal to all those workflows through their channel
process.getDecisionChannel().send(true);
// create stub of all running Process workflows with this tag
val process: Process = getWorkflowByTag(Process::class.java, tag)

// send a signal to all those workflows through their channel
process.decisionChannel.send(true)

Starting another method for a running workflow

When we use the stub of a running workflow to start a method, we actually create another execution running in parallel to the main one.

// create stub of a running Process workflow
Process process = getWorkflowById(Process.class, id);

// dispatching a new method without waiting for the result
Deferred<T> deferred = dispatch(process::handle, name, email);
// create stub of a running Process workflow
val process: Process = getWorkflowById(Process::class.java, id)

// dispatching a new method without waiting for the result
val deferred: Deferred<T> = dispatch(process::handle, name, email)

Get or set current properties of another workflow

When multiple methods (of the same workflow instance) are running in parallel, they share the instance properties.

For example, dispatching a getter or a setter of a workflow is a way to read or update its properties from another workflow. In the example below, another workflow can call the getter of the points property, and the bonus method lets it add a bonus to the current value of points.

public class LoyaltyImpl extends Workflow implements Loyalty {
    private Integer points = 0;

    @Override
    public Integer getPoints() {
        return points;
    }

    @Override
    public Integer bonus(Integer value) {
        points += value;

        return points;
    }

    @Override
    public void start() {
        ...
    }
}
class LoyaltyImpl : Workflow(), Loyalty {
    // var so that bonus() can update it; assumes Loyalty declares `val points: Int`
    override var points: Int = 0

    override fun bonus(value: Int) : Int {
        points += value

        return points
    }

    override fun start() {
        ...
    }
}
Updated at Mon, Oct 25, 2021