Integrating Process and Task Events with Camunda Platform, AWS Kinesis, and Elasticsearch

Editor’s note: At Camunda Community Summit 2022, Fidelity Investments Director of Architecture Harish Malavade shared a behind-the-scenes look at Fidelity’s digital automation platform, which uses Camunda as its core workflow engine. During this Expert Session, we learned about Fidelity’s custom requirements for process and task management as well as reporting and analytics. Malavade also discussed how his team integrated BPMN process and task events to better maintain platform-level process and task status, improve analytics and reporting, and so much more.

Catch the full presentation here.  

In this guest blog post, Malavade shares some of the technical details of this integration and the Camunda documentation that was used along the way.


In a microservices architecture, events are often used to communicate between loosely coupled applications so they can scale and fail independently. These applications can consume and react to the events generated by producer applications where the state is changing or an update is occurring.

In a BPMN world, process and task lifecycle events are a powerful way to capture and react to state changes: a process being started, ended, suspended, or resumed, or reaching a particular state; an error being raised or resolved; or a task being created, assigned, completed, timed out, and so on. These events can also be used to loosely couple the core process engine with custom UIs, reporting, analytics, and machine learning.

Camunda Event Integration

Camunda provides multiple ways to integrate with events. These events can be real-time or historic, and the historic events that are available depend on the history level set while configuring the process engine.

Let us understand a few key concepts.

Event Listeners

Camunda supports defining two types of event listeners. More details can be found here.

Execution Listeners

  • Capture start and end events at every shape and at the process level. 
  • Can also execute custom logic from any Java class that is deployed along with the process engine.

Example

SingleTaskProcess is an execution listener that can be added at the process level to set firstname as a variable when the process has started and set lastname as a variable when the process has ended. The same can be achieved at every state change across all shapes within BPMN to track token movement.

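A global execution listener can be added as follows:

import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.ExecutionListener;

public class CustomProcessListener implements ExecutionListener {
    @Override
    public void notify(DelegateExecution execution) throws Exception {
        // ProcessEventTransformer is a custom class (not part of Camunda) that transforms the execution into an event
        ProcessEventTransformer processEventTransformer = new ProcessEventTransformer();
        processEventTransformer.transformtoCem(execution);
    }
}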
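
To make the transformation step more concrete, here is a minimal sketch of what a transformer such as ProcessEventTransformer might do. It is not the implementation used at Fidelity: ExecutionSnapshot, ProcessEventSketch, and the JSON field names are all hypothetical, and the snapshot class merely stands in for the values a listener could read from DelegateExecution so that the example runs without Camunda on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the values a listener could read from DelegateExecution.
final class ExecutionSnapshot {
    final String processInstanceId;
    final String activityId;
    final String eventName; // for example "start" or "end"

    ExecutionSnapshot(String processInstanceId, String activityId, String eventName) {
        this.processInstanceId = processInstanceId;
        this.activityId = activityId;
        this.eventName = eventName;
    }
}

// Hypothetical transformer; the field names of the event are assumptions.
final class ProcessEventSketch {
    // Copies the snapshot into an ordered map so the JSON field order is stable.
    static Map<String, String> toEvent(ExecutionSnapshot snapshot, long timestampMillis) {
        Map<String, String> event = new LinkedHashMap<>();
        event.put("eventType", "process." + snapshot.eventName);
        event.put("processInstanceId", snapshot.processInstanceId);
        event.put("activityId", snapshot.activityId);
        event.put("timestamp", Long.toString(timestampMillis));
        return event;
    }

    // Serializes the map as a flat JSON object with string values only.
    static String toJson(Map<String, String> event) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> field : event.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(escape(field.getKey())).append("\":\"")
                .append(escape(field.getValue())).append('"');
        }
        return json.append('}').toString();
    }

    // Escapes backslashes and double quotes; control characters are not handled in this sketch.
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

In a real listener, the snapshot would be filled from the DelegateExecution passed to notify, and a JSON library would normally replace the hand-written serializer.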

Task Listeners

In addition to execution listeners, user tasks can have custom task listeners that can:

  • Capture user task lifecycle events such as create, assignment, update, timeout, complete, and delete.
  • Execute custom logic from scripts, expressions, or Java classes deployed along with the process engine.

In the example below, there is a need to execute custom business logic on task create and assign:

Screenshot: custom business logic configured as scripts on the task create and assignment events.

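Global task listeners can be added as follows:

import org.camunda.bpm.engine.delegate.DelegateTask;
import org.camunda.bpm.engine.delegate.TaskListener;
import org.camunda.bpm.engine.impl.cfg.TransactionState;
import org.camunda.bpm.engine.impl.context.Context;

public class CustomTaskCreateListener implements TaskListener {
    @Override
    public void notify(DelegateTask delegateTask) {
        // Run the listener logic only after the transaction has committed
        Context.getCommandContext().getTransactionContext()
            .addTransactionListener(TransactionState.COMMITTED, commandContext -> {
                // TODO listener logic here, to avoid duplicate events when an optimistic locking failure causes a retry
            });
    }
}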
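
The reason for waiting until the transaction commits can be illustrated without Camunda. The sketch below is a simplified simulation, not engine code: SketchTransaction and AfterCommitSketch are hypothetical names, and a "failed attempt" stands in for a command that is rolled back (for example after an optimistic locking failure) and then retried.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a transaction; not a Camunda class.
final class SketchTransaction {
    private final List<Runnable> commitCallbacks = new ArrayList<>();

    void afterCommit(Runnable callback) {
        commitCallbacks.add(callback);
    }

    // Runs the registered callbacks once the work is durable.
    void commit() {
        for (Runnable callback : commitCallbacks) {
            callback.run();
        }
        commitCallbacks.clear();
    }

    // Discards the callbacks, so nothing is published for a failed attempt.
    void rollback() {
        commitCallbacks.clear();
    }
}

final class AfterCommitSketch {
    // Simulates a command that is rolled back `failedAttempts` times and then commits.
    // Returns the events that reached the (simulated) stream.
    static List<String> run(int failedAttempts, boolean deferUntilCommit) {
        List<String> published = new ArrayList<>();
        for (int attempt = 0; attempt <= failedAttempts; attempt++) {
            SketchTransaction tx = new SketchTransaction();
            if (deferUntilCommit) {
                tx.afterCommit(() -> published.add("task.create"));
            } else {
                published.add("task.create"); // published even if this attempt rolls back
            }
            if (attempt < failedAttempts) {
                tx.rollback();
            } else {
                tx.commit();
            }
        }
        return published;
    }
}
```

With two failed attempts, AfterCommitSketch.run(2, false) yields three events for a single task, while AfterCommitSketch.run(2, true) yields exactly one, which is the behavior the COMMITTED transaction listener is meant to achieve.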

History Event Handler

History event handlers provide audit information about executed process instances depending on the history level set during process engine configuration. 

There are two flavors of history event handlers:

  • DefaultHistoryHandler, which provides the default history for processes, tasks, activities, variables, and so on. It is used in reporting where data is pulled from the history tables.
  • CustomHistoryHandler, which can execute custom logic in addition to the events being stored in the history tables.

More details can be found here.

Figure: The process engine, with its core engine and history service.

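Let us also look at a few more key concepts that Camunda supports and that can help us build this event framework, starting with a custom history event handler:

import org.camunda.bpm.engine.impl.history.event.HistoryEvent;
import org.camunda.bpm.engine.impl.history.handler.HistoryEventHandler;
public class CamundaProcessHistoryEventHandler implements HistoryEventHandler {
    @Override
    public void handleEvent(HistoryEvent historyEvent) {
        // TODO handle state change events here
    }
    @Override
    public void handleEvents(java.util.List<HistoryEvent> historyEvents) {
        historyEvents.forEach(this::handleEvent); // batches reuse the single-event logic
    }
}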
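
The idea of running custom logic alongside the default history storage can be sketched as a composite handler that forwards each event to several delegates. The example below is only an illustration of that pattern under stated assumptions: SketchHistoryHandler, CompositeSketchHandler, and RecordingHandler are hypothetical types that do not use Camunda's interfaces, and an event is reduced to a type and a process instance ID.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a history event handler; not Camunda's interface.
interface SketchHistoryHandler {
    void handle(String eventType, String processInstanceId);
}

// Forwards every event to each delegate, in registration order.
final class CompositeSketchHandler implements SketchHistoryHandler {
    private final List<SketchHistoryHandler> delegates;

    CompositeSketchHandler(SketchHistoryHandler... delegates) {
        this.delegates = Arrays.asList(delegates);
    }

    @Override
    public void handle(String eventType, String processInstanceId) {
        for (SketchHistoryHandler delegate : delegates) {
            delegate.handle(eventType, processInstanceId);
        }
    }
}

// Records what it receives; stands in for "write to history tables" or "publish to a stream".
final class RecordingHandler implements SketchHistoryHandler {
    private final String name;
    private final List<String> log;

    RecordingHandler(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void handle(String eventType, String processInstanceId) {
        log.add(name + ":" + eventType + ":" + processInstanceId);
    }
}
```

Registering a "db" handler first and a "stream" handler second means every event is stored before the custom logic sees it. Whether that ordering is desirable depends on the use case.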

Engine Plugin

An engine plugin lets you extend and customize the process engine configuration by overriding certain behavior.

It can also:

  • Add behaviors before or after the process engine is initialized.
  • Add event parsers and handlers before engine initialization.

More details can be found here.

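Engine plugins can be added as follows:

import java.util.ArrayList;
import java.util.List;
import org.camunda.bpm.engine.impl.bpmn.parser.BpmnParseListener;
import org.camunda.bpm.engine.impl.cfg.AbstractProcessEnginePlugin;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;

public class EventPlugin extends AbstractProcessEnginePlugin {
    @Override
    public void preInit(ProcessEngineConfigurationImpl processEngineConfiguration) {
        // Reuse the list of pre-parse listeners if one is already configured
        List<BpmnParseListener> preParseListeners = processEngineConfiguration.getCustomPreBPMNParseListeners();
        if (preParseListeners == null) {
            preParseListeners = new ArrayList<BpmnParseListener>();
            processEngineConfiguration.setCustomPreBPMNParseListeners(preParseListeners);
        }
        // Register one custom parse listener per task lifecycle event
        preParseListeners.add(new CreateTaskParser());
        preParseListeners.add(new AssignTaskParser());
        preParseListeners.add(new CompleteTaskParser());
        preParseListeners.add(new DeleteTaskParser());
    }
}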

Parse Listeners

Parse listeners are registered to receive events during BPMN parsing. They can override the parse method of any element whose parsing you want to intercept, for example to attach listeners to it. More than one parse listener can be added to the process engine.

More details can be found here.

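Parse listeners can be added as follows:

import org.camunda.bpm.engine.delegate.TaskListener;
import org.camunda.bpm.engine.impl.bpmn.behavior.UserTaskActivityBehavior;
import org.camunda.bpm.engine.impl.bpmn.parser.AbstractBpmnParseListener;
import org.camunda.bpm.engine.impl.pvm.delegate.ActivityBehavior;
import org.camunda.bpm.engine.impl.pvm.process.ActivityImpl;
import org.camunda.bpm.engine.impl.pvm.process.ScopeImpl;
import org.camunda.bpm.engine.impl.util.xml.Element;

public class CreateTaskParser extends AbstractBpmnParseListener {
    @Override
    public void parseUserTask(Element userTaskElement, ScopeImpl scope, ActivityImpl activity) {
        ActivityBehavior activityBehavior = activity.getActivityBehavior();
        if (activityBehavior instanceof UserTaskActivityBehavior) {
            UserTaskActivityBehavior userTaskActivityBehavior = (UserTaskActivityBehavior) activityBehavior;
            // Attach the create listener to every user task as it is parsed
            userTaskActivityBehavior
                .getTaskDefinition()
                .addTaskListener(TaskListener.EVENTNAME_CREATE, new TaskCreateListener());
        }
    }
}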

Incident Handler

Incident handlers can be used to manage process failures, which can then be viewed in an operational dashboard or heatmap. They can also be configured to alert relevant stakeholders.

More details can be found here.

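Incident handlers can be added as follows:

import org.camunda.bpm.engine.impl.incident.DefaultIncidentHandler;
import org.camunda.bpm.engine.impl.incident.IncidentContext;
import org.camunda.bpm.engine.runtime.Incident;

public class IncidentHandler extends DefaultIncidentHandler {
    public IncidentHandler() {
        super(Incident.FAILED_JOB_HANDLER_TYPE); // assumption: this handler covers failed-job incidents
    }
    @Override
    public Incident handleIncident(IncidentContext context, String message) {
        // TODO custom logic to handle the incident
        return super.handleIncident(context, message);
    }
    @Override
    public void resolveIncident(IncidentContext context) {
        // TODO custom logic to resolve the incident
        super.resolveIncident(context);
    }
}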
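
As a rough illustration of how such a handler could feed an operational dashboard or heatmap, the sketch below keeps a count of open incidents per activity. IncidentBoardSketch and its method names are hypothetical and not part of Camunda. In a real setup, the two callbacks would be invoked from the custom logic in handleIncident and resolveIncident.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory view of open incidents, keyed by activity ID.
final class IncidentBoardSketch {
    private final Map<String, Integer> openByActivity = new TreeMap<>();

    // Called when an incident is created for the given activity.
    void onIncidentCreated(String activityId) {
        openByActivity.merge(activityId, 1, Integer::sum);
    }

    // Called when an incident is resolved; the entry is removed when the count reaches zero.
    void onIncidentResolved(String activityId) {
        openByActivity.computeIfPresent(activityId, (id, count) -> count > 1 ? count - 1 : null);
    }

    // Returns a copy, so callers cannot modify the internal state.
    Map<String, Integer> openIncidents() {
        return new TreeMap<>(openByActivity);
    }
}
```

The activities with the highest counts are the natural hot spots for a heatmap, and a threshold on the count could be used to trigger an alert.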

Global Event Handler

A global event handler can be built on top of Camunda. This process engine plugin can add default event behaviors to produce, transform, and publish process and task events throughout the process (BPMN) lifecycle within Camunda.

In the architecture below:

  • Clients (B2B or custom UI) call into Camunda through REST APIs to perform process or task level operations.
  • A custom event plugin adds default behavior through a combination of parsers, listeners, history event handlers, and incident handlers to fire events as the process or task lifecycle state changes.
  • These state change events are converted into a custom canonical format (XML or JSON) that acts as a contract between the producer (Camunda) and consumers (Reporting, Elasticsearch, and other clients).
  • These events can then be posted to any event stream, such as Kafka, MQ, or Kinesis (shown below), using client libraries.
  • Consumers can subscribe to whichever of these events they are interested in.
  • Elasticsearch indexes these events so that the custom UI can query task- and process-level data.
  • The incident handler helps manage failures and the resolution of failed processes.
  • Reporting and analytics are performed on top of these events at both the process and task levels.

Figure: The AWS Kinesis event stream within the overall architecture.
Figure: The architecture of the digital automation platform.
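
The producer and consumer sides of this architecture can be sketched with an in-memory stand-in for the event stream. This is an assumption-laden illustration only: InMemoryEventStream is a hypothetical class, it does not use the Kinesis, Kafka, or MQ client libraries, and it delivers events synchronously, which a real stream would not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for an event stream such as Kinesis.
final class InMemoryEventStream {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // A consumer registers for one event type; it can call this repeatedly for several types.
    void subscribe(String eventType, Consumer<String> consumer) {
        subscribers.computeIfAbsent(eventType, type -> new ArrayList<>()).add(consumer);
    }

    // Delivers the payload to every consumer subscribed to this event type.
    void publish(String eventType, String payload) {
        List<Consumer<String>> targets = subscribers.get(eventType);
        if (targets == null) {
            return; // nobody is interested in this event type
        }
        for (Consumer<String> target : targets) {
            target.accept(payload);
        }
    }
}
```

For example, a search-indexing consumer might subscribe to both "task.create" and "process.end", while a reporting consumer subscribes only to "process.end". The producer publishes without knowing which consumers exist, which is what keeps the process engine loosely coupled from the custom UI and reporting.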

Editor’s note: Harish Malavade’s full Camunda Community Summit 2022 Expert Session and all of our expert-led presentations are available here.