Workflow
Parallelization
When a workflow need to do different actions in parallel, it can:
- dispatch tasks asynchronously
- dispatch child-workflows
- run multiple methods in parallel
Asynchronous tasks
Instead of running tasks sequentially, we may want to run some of them asynchronously, as illustrates below:
This is often the case for tasks for which the returned value is not important, like sending an email.
To dispatch task asynchronously, we also use the stub, created by the newService
workflow function from a class interface:
public class MyWorkflow extends Workflow implements MyWorkflowInterface {
// create a stub of ServiceInterface
private final ServiceInterface service = newService(ServiceInterface.class);
@Override
public String start(String name, String email) {
...
// dispatching taskA without waiting for the result
Deferred<Boolean> deferredA = dispatch(service::taskA, parametersA);
// dispatching taskB and wait for its result
service.taskB(parametersB)
// dispatching taskC and wait for its result
service.taskC(parametersC)
}
}
class MyWorkflow : Workflow(), MyWorkflowInterface {
// create a stub of ServiceInterface
private val service : ServiceInterface = newService(ServiceInterface::class.java)
override fun start(name: String, email: String): String {
...
// dispatching taskA waiting for the result
val deferredA: Deferred<Boolean> = dispatch(service::taskA, parameterA)
// dispatching taskB and wait for its result
service.taskB(parameterB)
// dispatching taskC and wait for its result
deferred.taskC(parameterC)
}
}
When dispatching a task asynchronously, a Deferred<T>
object is created (T
being the return type of the task). We can use it to synchronously wait for the completion of taskA if needed:
public class MyWorkflow extends Workflow implements MyWorkflowInterface {
// create a stub of ServiceInterface
private final ServiceInterface service = newService(ServiceInterface.class);
@Override
public String start(String name, String email) {
...
// dispatching taskA without waiting for the result
Deferred<Boolean> deferredA = dispatch(service::taskA, parametersA);
// dispatching taskB and wait for its result
service.taskB(parametersB)
// wait and get result of the dispatched taskA
Boolean result = deferredA.await();
// dispatching taskC and wait for its result
service.taskC(parametersC)
}
}
class MyWorkflow : Workflow(), MyWorkflowInterface {
// create a stub of ServiceInterface
private val service : ServiceInterface = newService(ServiceInterface::class.java)
override fun start(name: String, email: String): String {
...
// dispatching taskA waiting for the result
val deferredA: Deferred<Boolean> = dispatch(service::taskA, parameterA)
// dispatching taskB and wait for its result
service.taskB(parameterB)
// wait and get result of the dispatched taskA
val result : Boolean = deferredA.await()
// dispatching taskC and wait for its result
deferred.taskC(parameterC)
}
}
Child workflows
If we want to run asynchrously more than a single task, we can use child-worlflows. Dispatching a child-workflow is as easy as dispatching a task. When the child-workflow completes, the return value is sent back to the parent workflow.
The illustration below illustrates this, with a child-workflow of 3 sequential tasks:
Similarly to tasks, we handle child-workflows through stubs created from their interface by the newWorkflow
function.
For example, a distributed (and inefficient) way to calculate the factorial of n is exposed below, using n workflow instances, each of them - excepted the last one - dispatching a child-workflow.
public class Calculate extends Workflow implements CalculateInterface {
private final Calculate calculate = newWorkflow(CalculateInterface.class);
@Override
public Long factorial(Long n) {
if (n > 1) {
return n * calculate.factorial(n - 1);
}
return 1;
}
}
class Calculate() : Workflow(), CalculateInterface {
private val calculate = workflow<CalculateInterface>()
override fun factorial(n: Long) = when {
n > 1 -> n * workflow.factorial(n - 1)
else -> 1
}
}
Parallel methods
When we dispatch a child-workflow, we create a new workflow instance. But it is also possible to run multiple methods within the same workflow instance, as illustrated below:
In this illustration,
- the
methodA
was the method used at the workflow start - the
methodB
was dispatched from a client or another workflow but run inside the same instance thanmethodA
.
The main raison to dispatch parallel methods instead of a child-workflow is related to the instance properties:
When multiple methods of the same workflow instance are running in parallel, they share the values of the workflow properties, and are able to read and update them to exchange information.
This can be very useful to get properties or trigger new actions on a running workflow, as described here.
To run a method on a running workflow, we can target
- a workflow by id using the
getWorkflowById
function - all workflows having a specific tag, using the
getWorkflowByTag
function Those functions are available both on the Infinitic client and within workflows:
// create a stub targeting by id a running workflow
HelloWorkflow w = client.getWorkflowById(HelloWorkflow.class, id);
or
// create a stub targeting by tag some running workflows
HelloWorkflow w = client.getWorkflowByTag(HelloWorkflow.class, "foo");
// create a stub targeting by id running workflow
val w : HelloWorkflow = client.getWorkflowById(HelloWorkflow::class.java, id)
or
// create a stub targeting by tag some running workflows
val w : HelloWorkflow = client.getWorkflowByTag(HelloWorkflow::class.java, tag = "foo")
then
// asynchronously dispach a method on targeted instances
client.dispatch(w::other, parameters);
or
// synchronously run a method on targeted instances
w.other(parameters);
// asynchronously dispach a method on targeted instances
client.dispatch(w::other, parameters)
or
// synchronously run a method on targeted instances
w.other(parameters)
To dispatch another method on the same workflow, we can define a stub targeting the very same workflow with:
// create a stub targeting the current workflow
@Ignore private HelloWorkflow self() {
return getWorkflowById(HelloWorkflow.class, getWorkflowId());
}
...
// asynchronously dispach a method on same instance
dispatch(self()::other, parameters);
// create a stub targeting the current workflow
@Ignore private val self by lazy {
getWorkflowById(HelloWorkflow::class.java, workflowId)
}
...
// asynchronously dispach a method on same instance
dispatch(self::other, parameters)