Signals

Subscribe here to follow Infinitic's development. Please ⭐️ us on Github!

We can send external signals to workflows. Typically, a signal will convey an information such as "a document has been validated" or "an order was just shipped"...

We have described how a client can send a signal to a running workflow. Here, we will describe how the workflow can handle them.

Implementing channels

Signals are sent through channels. Our client can send a signal to a running workflow like this:

// stub targeting a running HelloWorld workflow with a specific id
HelloWorld helloworld = client.getWorkflowById(HelloWorld.class, "05694902-5aa4-469f-824c-7015b0df906c");

// send a signal to this instance through a channnel
helloworld.getNotificationChannel().send("foobar");
// stub targeting a running HelloWorld workflow with a specific id
val helloworld: HelloWorld = client.getWorkflowById(HelloWorld::class.java, "05694902-5aa4-469f-824c-7015b0df906c")

// send a signal to this instance through a channnel
helloworld.notificationChannel.send("foobar")

To allow this, channels need to be present in a workflow interface as below:

public interface HelloWorld {
    SendChannel<String> getNotificationChannel();

    ...
}
interface HelloWorld {
    val notificationChannel: SendChannel<String>

    ...
}

Our workflow implements this interface using the provided channel workflow method:

public class HelloWorldImpl extends Workflow implements HelloWorld {
    final Channel<String> notificationChannel = channel();

    @Override
    public Channel<String> getNotificationChannel() {
        return notificationChannel;
    }

   ...
}
class HelloWorldImpl : Workflow(), HelloWorld {
    val notificationChannel = channel<String>()

    ...
}

In the examples above, Channel<String> is used as an example. But Channel<T> supports any serializable T type, not only String.

Receiving signals

A workflow only receives signals that it is waiting for. Per default, all signals sent to a workflow are discarded. To receive some signals, we need to explicitly ask for them with the receive method:

...
Deferred<String> deferredSignal = getNotificationChannel().receive();
...
...
val deferredSignal: Deferred<String> = notificationChannel.receive()
...

All signals sent to the workflow before reaching the receive command will be discarded. The receive command is non-blocking and will return immediately, but from there Infinitic will buffer any received signal.

Per default, a receive() call tells Infinitic to buffer all new incoming signals, but if the workflow uses only the next n signal, we can specificy that with a receive(n) call. After the next n signals, the other ones will be discarded again.

Each await() call to this Deferred returns the next signal buffered if already been received, or waits for it if not.

Deferred<String> deferredSignal = getNotificationChannel().receive();
...
String signal1 = deferredSignal.await();
...
String signal2 = deferredSignal.await();
val deferredSignal: Deferred<String> = notificationChannel.receive()
...
val signal1: String = deferredSignal.await()
...
val signal2: String = deferredSignal.await()

Filtering events by type

Let's say we have a Channel<Event> channel receiving objects of type Event. If we want our workflow to wait only for a sub-type ValidationEvent:

Deferred<ValidationEvent> deferred = getEventChannel().receive(ValidationEvent.class);
val deferred: Deferred<ValidationEvent> = eventChannel.receive(ValidationEvent::class)

Filtering events by attributes

If we want our workflow to wait only for an Event with specific attributes, we can write a requirement using a JSONPath predicate that will be applied on the serialized event. For example, if we want to receive an Event with a specific ef20b7a9-849b-41f8-89e9-9c5492efb098 userId, we can do:

Deferred<Event> deferred =
    getEventChannel().receive("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]");
val deferred: Deferred<Event> =
    eventChannel.receive("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]")

or using a filter predicate (after adding com.jayway.jsonpath:json-path:2.5.0 to our project)

Deferred<Event> deferred =
    getEventChannel().receive("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"));
val deferred: Deferred<Event> =
    eventChannel.receive("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))

Filtering events by type and attributes

At last, if we want to receive an event having both a specific type and specific attributes:

Deferred<ValidationEvent> deferred =
    getEventChannel().receive(ValidationEvent.class, "[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"));
val deferred: Deferred<ValidationEvent> =
    eventChannel.receive(ValidationEvent::class, "[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))

Unit testing predicates

In our unit tests, we would like to check if an event is correctly filtered by a JSONPath predicate - below is an example of statements that should be true if event has the right userId:

import io.infinitic.common.workflows.data.channels.ChannelEventFilter;
import io.infinitic.common.workflows.data.channels.ChannelEvent;
import com.jayway.jsonpath.Criteria.where;
...

ChannelEventFilter
  .from("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]")
  .check(ChannelEvent.from(event));

// or

ChannelEventFilter
  .from("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))
  .check(ChannelEvent.from(event));
import io.infinitic.common.workflows.data.channels.ChannelEventFilter
import io.infinitic.common.workflows.data.channels.ChannelEvent
import com.jayway.jsonpath.Criteria.where
...

ChannelEventFilter
  .from("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]")
  .check(ChannelEvent.from(event))

// or

ChannelEventFilter
  .from("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))
  .check(ChannelEvent.from(event))
Edit this page on GitHub Updated at Sun, Jul 3, 2022