Services
Service Syntax
Constraints
Upon receiving a message instructing to execute a task, a service worker instantiates the service class, deserializes the parameters, executes the requested method and returns the serialized result:
The parameters and return value of a method used as task must be serializable.
If we chose to have several executions in parallel (concurrency
> 1), they must not interfere with each other:
A method used as a task must be thread-safe.
If our method uses multi-threading, we should keep in mind that a task is considered completed when the method returns. Any exception occurring on another thread than the one that initiated the execution of the task will be ignored.
Recommendations
For an easier versioning of services, we recommend:
Each service should be given a simple name through the @Name annotation.
Methods used as tasks should have:
- One parameter of a dedicated type object
- A return value of a dedicated type object
For example,
package com.company.services;
import io.infinitic.annotations.Name;
@Name(name = "MyService")
public interface MyService {
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input);
MySecondTaskOutput mySecondTask(MySecondTaskInput input);
}
package com.company.services
import io.infinitic.annotations.Name
@Name("MyService")
interface MyService {
fun myFirstTask(input: MyFirstTaskInput): MyFirstTaskOutput
fun mySecondTask(input: MySecondTaskInput): MySecondTaskOutput
}
Task Failure
Technical failures should be distinguished from non-technical failures:
- A technical failure is triggered by an unexpected exception.
- A non-technical failure is the inability to fulfill the task due to "business" reasons. For example:
- A flight booking fails because no seats remain available.
- A bank wire fails because there is not enough money in the account.
Non-technical failures are better handled through a status in the return value. (That's another reason why having an object as return value is a good practice.)
When an exception is thrown during a task execution, the task will be automatically retried based on its retry policy.
When a task execution run for longer than a defined timeout, the task will also automatically fail and be retried.
Runtime Timeout
We can set a maximum duration of task execution inside a Service worker, by defining a runtime timeout in the Service implementation. By default, tasks do not have a runtime timeout defined.
Since 0.12.0, the runtime timeout must be defined on the class implementation. When defined in the interface, a timeout represents the maximal duration of the task dispatched by workflows (including retries and transportation) before a timeout is thrown at the workflow level.
There are multiple ways to define a runtime timeout:
- In the service implementation:
- By using the
WithTimeout
interface - By using the
@Timeout
annotation
- By using the
- In the worker:
- Through its configuration file
- Through service registration
The timeout policy used will be the first found in this order:
- Worker's configuration
@Timeout
method annotation@Timeout
class annotationWithTimeout
interface- No timeout
WithTimeout
Interface
The WithTimeout
interface requires a getTimeoutInSeconds
method. When present, Infinitic will call this method to know which timeout to apply for all tasks of the service:
package com.company.services;
import io.infinitic.tasks.WithTimeout;
public class MyServiceImpl implements MyService, WithTimeout {
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
public Double getTimeoutInSeconds() {
return 100.0;
}
}
package com.company.services
import io.infinitic.tasks.WithTimeout
class MyServiceImpl : MyService, WithTimeout {
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
override fun getTimeoutInSeconds() = 100.0
}
We can use the task context to personalize the timeout per task, for example:
package com.company.services;
import io.infinitic.tasks.Task;
import io.infinitic.tasks.WithTimeout;
public class MyServiceImpl implements MyService, WithTimeout {
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
public Double getTimeoutInSeconds() {
if(Task.getTaskName().equals("myFirstTask"))
return 200.0;
else
return 100.0;
}
}
package com.company.services
import io.infinitic.tasks.Task
import io.infinitic.tasks.WithTimeout
class MyServiceImpl : MyService, WithTimeout {
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
override fun getTimeoutInSeconds() = when (Task.taskName) {
::myFirstTask.name -> 200.0
else -> 100.0
}
}
@Timeout
Annotation
This annotation has a class implementing WithTimeout
as parameter.
It can also be used as a method annotation to define a timeout on a specific task:
package com.company.services;
import io.infinitic.annotations.Timeout;
public class MyServiceImpl implements MyService {
@Timeout(with = WithTimeoutImpl.class)
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
}
package com.company.services
import io.infinitic.annotations.Timeout
class MyServiceImpl : MyService {
@Timeout(WithTimeoutImpl::class.java)
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
}
Or as a class annotation to define a timeout on all tasks:
package com.company.services;
import io.infinitic.annotations.Timeout;
@Timeout(with = WithTimeoutImpl.class)
public class MyServiceImpl implements MyService {
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
}
package com.company.services
import io.infinitic.annotations.Timeout
@Timeout(WithTimeoutImpl::class.java)
class MyServiceImpl : MyService {
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
}
Retries Policy
Per default, all tasks have a truncated and randomized exponential backoff retry policy.
There are multiple ways to define another retry policy:
- In the service:
- In the worker:
- Through its configuration file
- Through service registration
The retry policy used will be the first found in this order:
- Worker's configuration
@Retry
method annotation@Retry
class annotationWithRetry
interface- Default retry policy
WithRetry
Interface
The WithRetry
interface requires a getSecondsBeforeRetry
method with 2 parameters:
retry
: retry index (starting at 0)exception
: the exception that triggered the failure
When present, Infinitic will call this method if an exception was thrown. The result of this call will tell Infinitc how many seconds it should wait before retrying the task.
package com.company.services;
import io.infinitic.tasks.WithRetry;
public class MyServiceImpl implements MyService, WithRetry {
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
public Double getSecondsBeforeRetry(int retry, Exception e) {
return 1.0;
}
}
package com.company.services
import io.infinitic.tasks.WithRetry
class MyServiceImpl : MyService, WithRetry {
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
override fun getSecondsBeforeRetry(retry: Int, exception: Exception) = 1.0
}
We can use the task context to personalize the delay before retry per task, for example:
package com.company.services;
import io.infinitic.tasks.Task;
import io.infinitic.tasks.WithRetry;
public class MyServiceImpl implements MyService, WithRetry {
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
public Double getSecondsBeforeRetry(int retry, Exception e) {
if(Task.getTaskName().equals("mySecondTask"))
return 2.0;
else
return 1.0;
}
}
package com.company.services
import io.infinitic.tasks.Task
import io.infinitic.tasks.WithRetry
class MyServiceImpl : MyService, WithRetry {
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
override fun getSecondsBeforeRetry(retry: Int, exception: Exception) =
when (Task.taskName) {
::mySecondTask.name -> 2.0
else -> 1.0
}
}
@Retry
Annotation
@Retry
annotation takes a class implementing WithRetry
as parameter.
It can be used as a method annotation to define a retry policy on a specific task:
package com.company.services;
import io.infinitic.annotations.Retry;
public class MyServiceImpl implements MyService {
@Retry(with = WithRetryImpl.class)
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
}
package com.company.services
import io.infinitic.annotations.Retry
class MyServiceImpl : MyService {
@Retry(WithRetryImpl::class.java)
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
}
Or as a class annotation to define a retry policy on all tasks
package com.company.services;
import io.infinitic.annotations.Retry;
@Retry(with = WithRetryImpl.class)
public class MyServiceImpl implements MyService {
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input) { /* */ }
MySecondTaskOutput mySecondTask(MySecondTaskInput input) { /* */ }
}
package com.company.services
import io.infinitic.annotations.Retry
@Retry(WithRetryImpl::class.java)
class MyServiceImpl : MyService {
override fun myFirstTask(MyFirstTaskInput input) { /* */ }
override mySecondTask(MySecondTaskInput input) { /* */ }
}
@Name Annotation
A task instance is internally described by both its full java name (package included) and the name of the method called.
We may want to avoid coupling this name with the underlying implementation, for example if we want to rename the class or method, or if we mix programming languages.
Infinitic provides a @Name
annotation that let us declare explicitly the names that Infinitic should use internally. For example:
package com.company.services;
import io.infinitic.annotations.Name;
@Name(name = "MyNewServiceName")
public interface MyService {
@Name(name = "FirstTask")
MyFirstTaskOutput myFirstTask(MyFirstTaskInput input);
@Name(name = "SecondTask")
MySecondTaskOutput mySecondTask(MySecondTaskInput input);
}
package com.company.services
import io.infinitic.annotations.Name
@Name("MyNewServiceName")
interface MyService {
@Name("FirstTask")
fun myFirstTask(input: MyFirstTaskInput): MyFirstTaskOutput
@Name("SecondTask")
fun mySecondTask(input: MySecondTaskInput): MySecondTaskOutput
}
When using this annotation, the Service name
setting in Service worker configuration file should be the one provided by the annotation:
services:
- name: MyNewServiceName
class: com.company.services.MyServiceImpl
Task Context
In some cases, it is useful to understand more about the context in which a task is executed.
The io.infinitic.tasks.Task
class provides the following static properties:
Name | Type | Description |
---|---|---|
taskId | String | Unique identifier of the task |
serviceName | String | Name of the Service (taken from the @Name annotation if provided, otherwise defaulting to the service's interface name) |
taskName | String | Name of the task (taken from the @Name annotation if provided, otherwise defaulting to the service's method name) |
workflowId | String? | Unique identifier of the workflow (if part of a workflow) |
workflowName | String? | Name of the workflow (if part of a workflow) |
tags | Set<String> | Tags provided when dispatching the task |
meta | MutableMap<String, ByteArray> | Metadata provided when dispatching the task |
retryIndex | Integer | Number of times the task was automatically retried |
retrySequence | Integer | Number of times the task was manually retried |
lastError | ExecutionError? | If any, the error during the previous attempt |
client | InfiniticClient | An InfiniticClient that can be used inside the task |
The task context is only accessible from the thread that initiated:
- the method (task) execution
- the
getTimeoutInSeconds
execution - the
getSecondsBeforeRetry
execution
In tests, io.infinitic.tasks.TaskContext
can be mocked and injected through Task.setContext(mockedTaskContext)
before running a test that uses the task's context.
tags
The tags
property of the task context is an immutable set of strings. Its value is defined when dreating the Service stub before dispatching the task:
final HelloService helloService = newService(
HelloService.class,
tags = Set.of("userId" + userId, "companyId" + companyId)
);
val helloService = newService(
HelloService::class.java,
tags = setOf("userId:$userId", "companyId:$companyId")
)
meta
The meta
property of the task context is a mutable map of strings to arrays of bytes. Its value is defined when creating the Service stub before dispatching the task:
final HelloService helloService = newService(
HelloService.class,
tags = null,
meta = Map.of(
"foo", "bar".getBytes(),
"baz", "qux".getBytes()
)
);
private val helloService = newService(
HelloService::class.java,
meta = mapOf(
"foo" to "bar".toByteArray(),
"baz" to "qux".toByteArray()
)
)
The meta
property is mutable and can be read and write during the task execution and its retries.
retryIndex
The retryIndex
property of the task context indicates the current number of retry attempts. It starts at 0 and is incremented when a task is automatically retried:
When a task is manually retried, the retryIndex
property is reset to 0.
retrySequence
The retrySequence
property of the task context indicates the current number of manual retries. It starts at 0 and is incremented when a task is manually retried:
lastError
If not null (when retryIndex
>= 1), the lastError property is a data object representing the exception thrown during the last task attempt. This ExecutionError
instance has the following properties:
Name | Type | Description |
---|---|---|
workerName | String | Name of the worker where the previous Exception was thrown |
name | String | Name of the previous Exception |
message | String | Message of the previous Exception |
stackTraceToString | String | Stringified stack trace of the previous Exception |
cause | ExecutionError? | Cause of the previous Exception |