A technical sneak peek into inbound Connectors alongside Camunda 8.1

When Camunda Platform 8 launched earlier this year, we introduced Connectors. Now, we’ve published the source code for our out-of-the-box Connectors with the latest 8.1 release. This opens the door to run those Connectors in Self-Managed environments, and also allows users to dive into their source code while building their own Connectors. 

However, the existing out-of-the-box Connectors are outbound Connectors. As discussed in my first technical sneak peek into Connectors, outbound Connectors are helpful if something needs to happen in the third-party system as soon as a process reaches a service task. For example, calling a REST endpoint or publishing a message to Slack.

Therefore, there is also a need for inbound Connectors. With inbound Connectors, something needs to happen within the workflow engine because of an external event in the third-party system. For example, a published Slack message or a called REST endpoint starts a new process instance. There are three different types of inbound Connectors:

  1. Webhook: An HTTP endpoint is made available to the outside, which when called, can start a process instance, for example.
  2. Subscription: A subscription is opened on the third-party system, like messaging or Apache Kafka, and new entries are then received and correlated to a waiting process instance in Camunda, for example.
  3. Polling: Some external API needs to be regularly queried for new entries, such as a drop folder on Google Drive or FTP.

To implement inbound Connectors, we first had to create a bit of infrastructure in Camunda Platform 8. This occurred with the latest 8.1 release by adding generic extension points to BPMN.

Generic properties in BPMN

Let’s briefly explore what a generic property is, and how we can leverage it to build inbound Connectors. Interestingly enough, this feature also allows our users to build their own extensions to Camunda Platform 8.

The end-to-end story looks like this:

  1. You can add custom properties (basically key-value pairs) to any BPMN symbol.
  2. Those properties are passed on by the workflow engine, even if it does not use it by itself.
  3. The properties can be read by third-party components to do whatever they want to do with it.

For inbound Connectors, this allows one to define and store properties, e.g. on the start event of a process model, as shown in the following BPMN XML file:

<bpmn:startEvent id="StartEvent" name="Order received">
  <bpmn:extensionElements>
    <zeebe:properties>
      <zeebe:property name="inbound.type" value="webhook" />
      <zeebe:property name="inbound.context" value="MY_CONTEXT" />
      ...
    </zeebe:properties>
  </bpmn:extensionElements>
  ...

Those properties do not need to be edited on an XML level, but you can leverage element templates in Camunda Modeler to provide an easy-to-use modeling experience for your properties:

This metadata can be read from the BPMN process models later to recognize when a new process with Connector properties is deployed. Then, you could open a new webhook, for example. 

Currently, we poll for new process definitions using the public Operate API. To improve efficiency in obtaining new process definitions, there is a public API on the roadmap to properly notify of Zeebe events, like a process deployment. However, this existing design does provide the most flexibility in running Connectors.

For example, you can run the Connector runtime next to your own Kafka, Vault, or whatever system you don’t want to expose to the outside world. As soon as a more efficient public API for deployment events becomes available in Camunda Platform, we will replace the polling mechanism under the hood, without the need to adjust the Connector architecture itself.

Using this basis, an inbound Connector runtime can start the required inbound Connector for process definitions. In the example above, the runtime would provide a new endpoint under a specific URL (e.g. http://cluster-address/inbound/MY_CONTEXT). Whenever there is a call to it, a process instance will kick off, taking the various other configuration parameters into account. 

Example inbound Connector: Github webhook

Let’s get to something more specific: starting a process instance if something happens on GitHub. As you can see in the following screenshot, you will need to set a couple of properties in your Camunda process model on the start event. For example, you need to define a path to be used to create a URL endpoint, provide a secret (normally to be looked up from the secret store used with Connectors), variable mapping, and so forth.

Next, register this webhook within GitHub.

Pretty straightforward, isn’t it? 

In the code, the magic to make it happen is basically contained in two pieces. First, the Connector runtime needs to query process definitions and scan for Connector properties on them, which looks roughly like this:

@Scheduled(fixedDelay = 5000)
public void scheduleImport() throws OperateException {
  List<ProcessDefinition> processDefinitions = camundaOperateClient
    .searchProcessDefinitions();
  for (ProcessDefinition processDefinition: processDefinitions) {
    if (!registry.processDefinitionChecked(processDefinition.getKey())) {
      processBpmnXml(
        processDefinition,
        camundaOperateClient.getProcessDefinitionXml(processDefinition.getKey()));
      registry.markProcessDefinitionChecked(processDefinition.getKey());
    }
  }
}

private void processBpmnXml(ProcessDefinition processDefinition, String resource) {
  final BpmnModelInstance bpmnModelInstance = Bpmn.readModelFromStream(
    new ByteArrayInputStream(resource.getBytes()));
  bpmnModelInstance.getDefinitions()
    .getChildElementsByType(Process.class)
    .stream().flatMap(
       process -> process.getChildElementsByType(StartEvent.class).stream()
    )
    .map(startEvent -> startEvent.getSingleExtensionElement(ZeebeProperties.class))
    .filter(Objects::nonNull)
    .forEach(zeebeProperties -> processZeebeProperties(processDefinition, zeebeProperties));
  // TODO: Also process intermediate catching message events and Receive Tasks
}

private void processZeebeProperties(ProcessDefinition processDefinition, ZeebeProperties zeebeProperties) {
  InboundConnectorProperties properties = new InboundConnectorProperties(
    processDefinition.getBpmnProcessId(),
    processDefinition.getVersion().intValue(),
    processDefinition.getKey(),
    zeebeProperties.getProperties().stream()
      .collect(Collectors.toMap(ZeebeProperty::getName, ZeebeProperty::getValue)));

  if (InboundConnectorProperties.TYPE_WEBHOOK.equals(properties.getType())) {
      registry.registerWebhookConnector(properties);
  } 
  // ...

Now, another part of the runtime can provide an endpoint, and if called, check if there is a Connector registered for the called endpoint. If so, the configuration is used to start a new process instance:

// Part of the runtime that provides webhook endpoints
@PostMapping("/inbound/{context}")
public ResponseEntity<ProcessInstanceEvent> inbound(
    @PathVariable String context,
    @RequestBody Map<String, Object> body,
    @RequestHeader Map<String, String> headers) {

  if (!registry.containsContextPath(context)) {
    throw new ResponseStatusException(HttpStatus.NOT_FOUND, "No webhook found for context: " + context);
  }
  WebhookConnectorProperties connectorProperties = registry.getWebhookConnectorByContextPath(context);
  boolean valid = validateSecret(connectorProperties, webhookContext);
  if (!valid) {
    return ResponseEntity.status(400).build();
  }
  Map<String, Object> variables = extractVariables(connectorProperties, webhookContext);
  ProcessInstanceEvent processInstanceEvent = zeebeClient
    .newCreateInstanceCommand()
    .bpmnProcessId(connectorProperties.bpmnProcessId())
    .version(connectorProperties.version())
    .variables(variables)
    .send()
    .join(); // TODO: Switch to rective HTTP client
  return ResponseEntity.status(HttpStatus.CREATED).body(processInstanceEvent);
}

Of course, the code above is a bit simplified and taken from the first increment of the code (read: not yet using all coding best practices :-)), but it should give you an idea of how an inbound webhook Connector will generally work.

You can see a quick walkthrough of this Connector in action here: 

What’s next?

Important infrastructure to build extensions landed in Camunda 8.1, but we will further improve this infrastructure in the upcoming release. On this basis, we are currently building inbound Connectors, specifically REST webhooks end to end. As part of this effort, we will add bits and pieces to the Connector SDK that allows everybody, including you, to build your own inbound Connectors. At the same time, we also plan to prototype subscription Connectors, so stay tuned!

Want to get started with the new Camunda connectors today? Be sure to check out our Connector SDK to learn more, and watch that space for updates. If you’re new to Camunda, you can always sign up for a free SaaS account here.

Ready to get started?

Still have questions?