External Events

Infinitic is still in active development. Subscribe here to follow the progress.

Channels introduce a way to send "events" (any serializable object actually) to a running workflow.

In the Client section, we have described how to send events to a running workflow. Here, we will describe how to handle them.

In the examples below, Channel<String> is used as an example. But Channel supports any serializable type, not only String.

Channel definition

To use a channel, just add it to the workflow interface using the channel workflow's method. For example,

public interface HelloWorld {
    SendChannel<String> getNotificationChannel();

    ...
}
interface HelloWorld {
    val notificationChannel: SendChannel<String>

    ...
}

And in our workflow implementation:

public class HelloWorldImpl extends Workflow implements HelloWorld {
    final Channel<String> notificationChannel = channel();

    @Override
    public Channel<String> getNotificationChannel() {
        return notificationChannel;
    }

   ...
}
class HelloWorldImpl : Workflow(), HelloWorld {
    val notificationChannel = channel<String>()

    ...
}

Channel usage

We receive only the events that we are waiting for. Per default, events sent to a workflow are discarded. To receive an event, we need to explicitly ask for it, using the receive method:

...
Deferred<String> deferred = getNotificationChannel().receive();
...
...
val result: Deferred<String> = notificationChannel.receive()
...

All events sent to the workflow before it reaches the above line will be discarded. The first event sent to the workflow after it reaches this line will be caught. The following events will be discarded unless the receive() method is previously used again.

As all Deferred we use the await() method if we want to pause the workflow up to actually receiving an event:

Once received, the workflow will resume and the result variable will contain "success" in the above example.

If we do not care when an event has been received but only if it was received, then we can apply the receive method earlier, for example at workflow start:

Deferred<String> deferredNotification = getNotificationChannel().receive();
...
String result = deferredNotification.await();
val deferredNotification: Deferred<String> = notificationChannel.receive()
...
val result: String = deferredNotification.await()

The first String received during the period represented by the brace will be the await() method result.

Filtering events by type

Let's say we have a Channel<Event> channel receiving objects of type Event. If we want our workflow to wait only for a sub-type ValidationEvent:

Deferred<ValidationEvent> deferred = getEventChannel().receive(ValidationEvent.class);
val deferred: Deferred<ValidationEvent> = eventChannel.receive(ValidationEvent::class)

Filtering events by attributes

If we want our workflow to wait only for an Event with specific attributes, we can write a requirement using a JSONPath predicate that will be applied on the serialized event. For example, if we want to receive an Event with a specific ef20b7a9-849b-41f8-89e9-9c5492efb098 userId, we can do:

Deferred<Event> deferred =
    getEventChannel().receive("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]");
val deferred: Deferred<Event> =
    eventChannel.receive("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]")

or using a filter predicate (after adding com.jayway.jsonpath:json-path:2.5.0 to our project)

Deferred<Event> deferred =
    getEventChannel().receive("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"));
val deferred: Deferred<Event> =
    eventChannel.receive("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))

Filtering events by type and attributes

At last, if we want to receive an event having both a specific type and specific attributes:

Deferred<ValidationEvent> deferred =
    getEventChannel().receive(ValidationEvent.class, "[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"));
val deferred: Deferred<ValidationEvent> =
    eventChannel.receive(ValidationEvent::class, "[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))

Unit testing predicates

In our unit tests, we would like to check if an event is correctly filtered by a JSONPath predicate - below is an example of statements that should be true if event has the right userId:

import io.infinitic.common.workflows.data.channels.ChannelEventFilter;
import io.infinitic.common.workflows.data.channels.ChannelEvent;
import com.jayway.jsonpath.Criteria.where;
...

ChannelEventFilter
  .from("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]")
  .check(ChannelEvent.from(event));

// or

ChannelEventFilter
  .from("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))
  .check(ChannelEvent.from(event));
import io.infinitic.common.workflows.data.channels.ChannelEventFilter
import io.infinitic.common.workflows.data.channels.ChannelEvent
import com.jayway.jsonpath.Criteria.where
...

ChannelEventFilter
  .from("[?(\$.userId == \"ef20b7a9-849b-41f8-89e9-9c5492efb098\")]")
  .check(ChannelEvent.from(event))

// or

ChannelEventFilter
  .from("[?]", where("userId").eq("ef20b7a9-849b-41f8-89e9-9c5492efb098"))
  .check(ChannelEvent.from(event))
Edit this page on GitHub Updated at Fri, May 28, 2021