Managing Workflows

Infinitic is still in active development.

The Infinitic client lets us start and cancel workflows, usually from our web app controllers.

Starting New Workflows

New workflow stub

The Infinitic client manages workflows through stubs built from the workflow interface. Here is an example of a workflow interface from our Hello World app:

public interface HelloWorld {
    String greet(@Nullable String name);
}

interface HelloWorld {
    fun greet(name: String?): String
}

Using this interface, an Infinitic client can create a stub that behaves syntactically as an instance of this workflow:

HelloWorld helloWorld = client.newWorkflow(HelloWorld.class);
val helloWorld = client.newWorkflow<HelloWorld>()
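To make the idea of a stub more concrete, here is a minimal sketch of how an interface can be turned into an object that records method calls instead of executing them. It uses only the JDK's java.lang.reflect.Proxy. The names StubSketch, RecordedCall and newStub are hypothetical and exist only for this illustration; Infinitic's actual implementation may work differently.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of a recording stub; not Infinitic's implementation.
class StubSketch {
    // Same shape as the HelloWorld interface above (annotation omitted).
    interface HelloWorld {
        String greet(String name);
    }

    // What a client needs in order to dispatch a call: method name and arguments.
    static final class RecordedCall {
        final String methodName;
        final List<Object> args;

        RecordedCall(String methodName, List<Object> args) {
            this.methodName = methodName;
            this.args = args;
        }
    }

    static final List<RecordedCall> recorded = new ArrayList<>();

    // Builds an object that implements the interface but only records calls.
    @SuppressWarnings("unchecked")
    static <T> T newStub(Class<T> type) {
        return (T) Proxy.newProxyInstance(
            type.getClassLoader(),
            new Class<?>[] {type},
            (proxy, method, args) -> {
                List<Object> arguments = args == null ? List.of() : Arrays.asList(args);
                recorded.add(new RecordedCall(method.getName(), arguments));
                // A real client would dispatch the call here; this sketch returns null.
                return null;
            });
    }

    public static void main(String[] args) {
        HelloWorld helloWorld = newStub(HelloWorld.class);
        helloWorld.greet("Infinitic");
        RecordedCall call = recorded.get(0);
        System.out.println(call.methodName + call.args); // prints "greet[Infinitic]"
    }
}
```

The stub looks like a HelloWorld to the compiler, which is what lets a client capture a type-checked method call and its arguments.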

We can also add tags to this instance. Tags are useful for targeting the instance later:

Set<String> tags = new HashSet<>();
tags.add("foo");
tags.add("bar");

HelloWorld helloWorld = client.newWorkflow(HelloWorld.class, tags);
val helloWorld = client.newWorkflow<HelloWorld>(tags = setOf("foo", "bar"))

Synchronous start

This stub can be used to trigger a synchronous execution:

String greeting = helloWorld.greet("Infinitic");
val greeting = helloWorld.greet("Infinitic")

When dispatching a workflow, the client serializes the parameters and sends them through Pulsar to the workflow engine, which orchestrates the workflow. Eventually, the return value is serialized and sent back to the client through Pulsar.

Asynchronous start

Of course, the client can also trigger an asynchronous execution:

Deferred<String> deferred = client.async(helloWorld, w -> w.greet("Infinitic"));
val deferred: Deferred<String> = client.async(helloWorld) { greet("Infinitic") }

We can use the returned Deferred<T> to:

  • wait for the synchronous completion:

    T result = deferred.await();
    
    val result: T = deferred.await()
    

    where T is the actual return type.

    The await() method blocks the client's current thread until the workflow terminates. It throws an UnknownWorkflow exception if the workflow is already terminated.

  • retrieve the underlying workflow's id:

    java.util.UUID id = deferred.id;
    
    val id: java.util.UUID = deferred.id
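The two uses listed above can be illustrated with a small stand-in built on the JDK's CompletableFuture. DeferredSketch is a hypothetical class written for this example, not Infinitic's Deferred<T>, and the result string is made up to simulate a workflow completing on another thread.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Deferred<T>, for illustration only.
class DeferredSketch<T> {
    final UUID id = UUID.randomUUID();          // identifies the dispatched workflow
    private final CompletableFuture<T> result;  // completed when the result comes back

    DeferredSketch(CompletableFuture<T> result) {
        this.result = result;
    }

    // Blocks the calling thread until the result is available.
    T await() {
        return result.join();
    }

    public static void main(String[] args) {
        // Simulate a workflow that completes on another thread.
        DeferredSketch<String> deferred = new DeferredSketch<>(
            CompletableFuture.supplyAsync(() -> "Hello Infinitic!"));

        System.out.println("workflow id: " + deferred.id); // available immediately
        System.out.println(deferred.await());              // blocks until completion
    }
}
```

The id is available as soon as the workflow is dispatched, so it can be stored and used later to target the same instance.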
    

Managing Running Workflows

A workflow is said to be running as long as it is neither completed nor canceled.

Running workflow stub

We can create the stub of a running workflow from its id:

HelloWorld helloworld = client.getWorkflow(HelloWorld.class, id);
val helloworld: HelloWorld = client.getWorkflow<HelloWorld>(id)

Alternatively, we can create a stub targeting all running workflows that have a given tag:

HelloWorld helloworld = client.getWorkflow(HelloWorld.class, "foo");
val helloworld: HelloWorld = client.getWorkflow<HelloWorld>(tag = "foo")

Cancel a running workflow

Using this stub, we can cancel the targeted workflow(s):

client.cancel(helloworld);
client.cancel(helloworld)

Cancelling a workflow will cancel its child workflows as well.
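The targeting and cancellation rules described in this section can be modeled with a small in-memory registry. This is only a sketch of the semantics (running means neither completed nor canceled, a tag can match several instances, cancellation propagates to children). WorkflowRegistrySketch and all of its members are hypothetical and say nothing about how the Infinitic engine stores its state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

// Hypothetical in-memory model, for illustration only.
class WorkflowRegistrySketch {
    enum Status { RUNNING, COMPLETED, CANCELED }

    static class Instance {
        final UUID id = UUID.randomUUID();
        final Set<String> tags;
        final List<Instance> children = new ArrayList<>();
        Status status = Status.RUNNING;

        Instance(Set<String> tags) { this.tags = tags; }
    }

    private final Map<UUID, Instance> byId = new HashMap<>();

    // Registers a new running instance, optionally as a child of another one.
    Instance start(Set<String> tags, Instance parent) {
        Instance instance = new Instance(tags);
        byId.put(instance.id, instance);
        if (parent != null) parent.children.add(instance);
        return instance;
    }

    // Targets every running instance carrying the tag.
    List<Instance> runningWithTag(String tag) {
        return byId.values().stream()
            .filter(i -> i.status == Status.RUNNING && i.tags.contains(tag))
            .collect(Collectors.toList());
    }

    // Cancels a running instance and, recursively, its running children.
    void cancel(Instance instance) {
        if (instance.status != Status.RUNNING) return;
        instance.status = Status.CANCELED;
        instance.children.forEach(this::cancel);
    }

    public static void main(String[] args) {
        WorkflowRegistrySketch registry = new WorkflowRegistrySketch();
        Instance parent = registry.start(Set.of("foo", "bar"), null);
        Instance child = registry.start(Set.of(), parent);

        registry.runningWithTag("foo").forEach(registry::cancel);
        System.out.println(parent.status + " " + child.status); // prints "CANCELED CANCELED"
    }
}
```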

Send an object to a running workflow

If a running workflow exposes one or more SendChannel, we can send objects to it. These channels must be declared in the workflow interface, for example:

public interface HelloWorld {
    SendChannel<String> getNotificationChannel();

    String greet(@Nullable String name);
}

interface HelloWorld {
    val notificationChannel: SendChannel<String>

    fun greet(name: String?): String
}

Here, we have a SendChannel of type String, but it can be of any (serializable) type. We can send an object to a running instance like this:

helloworld.getNotificationChannel().send("foobar");
helloworld.notificationChannel.send("foobar")
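As a rough mental model of a typed channel, the sketch below uses an in-memory queue: the client side can only send, and the receiving side reads objects in the order they were sent. The SendChannel interface declared here is a simplified stand-in, and InMemoryChannel and receiveOrNull are hypothetical names. In Infinitic the object is serialized and delivered to the running workflow, which this sketch does not attempt to reproduce.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of a typed channel, for illustration only.
class ChannelSketch {
    // Simplified stand-in for the client-facing side of a channel: it can only send.
    interface SendChannel<T> {
        void send(T value);
    }

    // The receiving side consumes what clients send, in order.
    static final class InMemoryChannel<T> implements SendChannel<T> {
        private final BlockingQueue<T> inbox = new LinkedBlockingQueue<>();

        @Override
        public void send(T value) {
            inbox.add(value);
        }

        // Returns the oldest pending object, or null when nothing was sent.
        T receiveOrNull() {
            return inbox.poll();
        }
    }

    public static void main(String[] args) {
        InMemoryChannel<String> notificationChannel = new InMemoryChannel<>();
        notificationChannel.send("foobar");
        System.out.println(notificationChannel.receiveOrNull()); // prints "foobar"
    }
}
```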

@Name annotation

A workflow instance is internally described by both its fully qualified Java name (including its package) and the name of the method called.

In some situations we may want to avoid coupling those names with the underlying implementation, for example if we want to rename the class or method, or if we mix programming languages.

Infinitic provides a @Name annotation that lets us explicitly declare the names that Infinitic should use internally. For example:

package hello.world.workflows;

import io.infinitic.annotations.Name;

@Name("HelloWorkflow")
public interface HelloWorld {

    @Name("welcome")
    String greet(@Nullable String name);
}

package hello.world.workflows

import io.infinitic.annotations.Name

@Name("HelloWorkflow")
interface HelloWorld {

    @Name("welcome")
    fun greet(name: String?): String
}

When using this annotation, the name set in the workflow worker configuration file must be the one provided by the annotation:

workflows:
  - name: HelloWorkflow
    class: hello.world.workflows.HelloWorldImpl
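One way to picture what such an annotation makes possible is a name lookup that prefers the annotation value and falls back to the reflective name. The sketch below declares its own Name annotation as a stand-in for io.infinitic.annotations.Name so that it compiles on its own. The helpers workflowName and methodName are hypothetical, and the fallback rule is an assumption based on the description above, not a statement of Infinitic's internals.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical illustration of annotation-based naming.
class NameSketch {
    // Stand-in for io.infinitic.annotations.Name, readable at runtime.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Name {
        String value();
    }

    @Name("HelloWorkflow")
    interface HelloWorld {
        @Name("welcome")
        String greet(String name);
    }

    // Same interface without annotations, to show the fallback.
    interface Plain {
        String greet(String name);
    }

    // Uses the annotation value when present, otherwise the full class name.
    static String workflowName(Class<?> type) {
        Name name = type.getAnnotation(Name.class);
        return name != null ? name.value() : type.getName();
    }

    // Uses the annotation value when present, otherwise the method name.
    static String methodName(Method method) {
        Name name = method.getAnnotation(Name.class);
        return name != null ? name.value() : method.getName();
    }

    public static void main(String[] args) throws NoSuchMethodException {
        Method greet = HelloWorld.class.getMethod("greet", String.class);
        System.out.println(workflowName(HelloWorld.class) + "." + methodName(greet)); // prints "HelloWorkflow.welcome"
    }
}
```

With explicit names, the interface or method can be renamed in code without changing the name used to identify the workflow.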
Updated at Fri, May 28, 2021