Writing an Apache Kafka Connector for Zeebe (And A Working Prototype)

Bernd Rücker is a co-founder and developer advocate at Camunda.

With Zeebe.io, we provide a horizontally scalable workflow engine that is completely open source. We see a lot of customer scenarios where Zeebe needs to be connected to Apache Kafka (or the Confluent Platform).

I’m currently working on an article about the various use cases, ranging from process-based monitoring to stateful orchestration. I will link it here when it’s ready; until then, my talk from Kafka Summit San Francisco might be helpful for you.

A relatively easy but clean way to integrate with Kafka is Kafka Connect. For a proof-of-concept, I implemented a prototypical connector:

An Apache Kafka connector for Zeebe.

Features:

  • Correlate messages from a Kafka topic with Zeebe workflows. This uses the Zeebe Message Correlation features, so, for example, if no matching workflow instance is found, the message is buffered for its time-to-live (TTL) and then discarded. You could simply ingest all messages from a Kafka topic and check whether they correlate to a workflow instance in Zeebe (see the sketch right after this list).
  • Send messages from a workflow in Zeebe to a Kafka topic.
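
To make the correlation concrete, here is a minimal sketch of publishing such a message with the Zeebe Java client. The message name, correlation key, and TTL are illustrative, and I assume the client version used here supports setting the time-to-live directly on the publish command:

import java.time.Duration;
import io.zeebe.client.ZeebeClient;

public class PublishMessageSketch {
  public static void main(String[] args) {
    // connect to a local broker, as in the connector code shown later
    try (ZeebeClient zeebe = ZeebeClient.newClient()) {
      zeebe.workflowClient().newPublishMessageCommand()
          .messageName("OrderPaid")           // illustrative message name
          .correlationKey("order-4711")       // illustrative correlation key
          .timeToLive(Duration.ofMinutes(10)) // how long Zeebe buffers the message if nothing matches yet
          .send().join();
    }
  }
}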

How to Use the Connector

You can find the source code here.

  • Build via mvn package
  • Put the resulting uber JAR into KAFKA_HOME/plugins
  • Run Kafka Connect in standalone mode, pointing it at the property files listed below: connect-standalone connect-standalone.properties zeebe-sink.properties zeebe-source.properties

An example can be found in the flowing-retail sample application.

Sink (Kafka => Zeebe)

The sink will forward all records on a Kafka topic to Zeebe:

name=ZeebeSinkConnector
connector.class=...ZeebeSinkConnector

correlationJsonPath=$.orderId
messageNameJsonPath=$.eventType
zeebeBrokerAddress=localhost:26500

topics=flowing-retail

In a workflow model, you can wait for certain events by name (extracted from the payload by messageNameJsonPath):

[Image: Message catch event to correlate a message from Kafka with a workflow instance in Zeebe]
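
Not copied from the sample project, but as a rough sketch, the corresponding message definition in the BPMN XML could look like the following (illustrative IDs and names; the exact syntax of the correlation-key expression depends on your Zeebe version):

<bpmn:message id="Message_OrderPaid" name="OrderPaid">
  <bpmn:extensionElements>
    <!-- matches the value extracted from the record via correlationJsonPath -->
    <zeebe:subscription correlationKey="orderId" />
  </bpmn:extensionElements>
</bpmn:message>

<bpmn:intermediateCatchEvent id="WaitForOrderPaid" name="Order paid">
  <bpmn:messageEventDefinition messageRef="Message_OrderPaid" />
</bpmn:intermediateCatchEvent>

The message name here ("OrderPaid") is what messageNameJsonPath extracts from the record, for example from an eventType field.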

Source (Zeebe => Kafka)

The source can send records to Kafka if a workflow instance flows through a certain activity:

name=ZeebeSourceConnector
connector.class=...ZeebeSourceConnector
zeebeBrokerAddress=localhost:26500

topics=flowing-retail

In a workflow, you can then add a Service Task with the task type “sendMessage”, which will create a record on the configured Kafka topic:

[Image: A service task in the workflow to send a message to Kafka]
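
Sketched in BPMN XML (again illustrative rather than copied from the sample), the service task sets the job type via Zeebe's task-definition extension:

<bpmn:serviceTask id="SendToKafka" name="Send message to Kafka">
  <bpmn:extensionElements>
    <zeebe:taskDefinition type="sendMessage" />
  </bpmn:extensionElements>
</bpmn:serviceTask>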

Understand the Source Code and Basic Design Decisions

The source code is available on GitHub.

Sink

Writing a sink is relatively straightforward once you’ve written all the boilerplate code to configure your connector.

One assumption for the prototype is that all messages contain plain JSON without a schema. Handling is easiest if we get that JSON as a String, so the connector must be configured to use the StringConverter, as shown below. For now, the connector cannot process anything else (such as Avro messages), but this would be easy to extend.
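
For reference, the relevant entries of connect-standalone.properties could look like this (a minimal sketch; the bootstrap server and plugin path are assumptions for a local setup):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
plugin.path=/path/to/kafka/plugins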

The main idea is that you get all messages (“records” in Kafka speak), convert the payload to a JSON string, extract correlation information via JsonPath and hand over the message to Zeebe.

public void put(final Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
      String payload = (String) record.value();
      DocumentContext jsonPathCtx = JsonPath.parse(payload);
      String correlationKey = jsonPathCtx.read(correlationKeyJsonPath);
      String messageName = jsonPathCtx.read(messageNameJsonPath);

      // the message id is used for idempotency: messages with the same id will not be processed twice by Zeebe
      String messageId = record.kafkaPartition() + ":" + record.kafkaOffset();

      zeebe.workflowClient().newPublishMessageCommand() //
          .messageName(messageName) //
          .correlationKey(correlationKey) //
          .messageId(messageId) //
          .payload(payload) //
          .send().join();
  }
}

One important detail is the message id for Zeebe, which is constructed from the Kafka partition and offset. This makes the id unique for every record in the system, and Zeebe uses it for idempotent message handling.

This means that whenever you resend a message to Zeebe, it will not be processed again. That lets me relax while writing the connector, as I do not have to bother with consistency: Kafka Connect assures at-least-once delivery (meaning that whenever there is an exception or crash during the put method, the record will be processed again later), and Zeebe can deal with the resulting duplicate messages.

Source

Kafka Connect requires polling new data to ingest into Kafka. Zeebe itself, however, is built on modern paradigms and provides a streaming API; it is not (yet) possible to poll for jobs.

So I opened a Zeebe subscription that collects all Zeebe jobs that need to be worked on in an in-memory queue:

private ConcurrentLinkedQueue<JobEvent> collectedJobs = new ConcurrentLinkedQueue<>();

public void start(final Map<String, String> props) {
  zeebe = ZeebeClient.newClient();

  // subscribe to Zeebe to collect new messages to be sent
  subscription = zeebe.jobClient().newWorker() //
      .jobType("sendMessage") //
      .handler(new JobHandler() {
          public void handle(JobClient jobClient, JobEvent jobEvent) {
            collectedJobs.add(jobEvent);
          }
        }) //
      .name("KafkaConnector") //
      .timeout(Duration.ofSeconds(5)) //
      .open();
}

This queue is then drained by Kafka Connect, and each job is completed right afterwards. Every job is locked for a short timeout (5 seconds at the moment). After Kafka Connect has created the record, it calls a commit method, which completes the job in Zeebe. If a job does not get processed because of a crash, it will be re-executed automatically, so we get at-least-once semantics for the creation of records.

public void commitRecord(SourceRecord record) throws InterruptedException {
  // look up the Zeebe job key remembered for this record in poll()
  Long jobKey = jobKeyForRecord.remove(record);
  zeebe.jobClient() //
      .newCompleteCommand(jobKey) //
      .send().join();
}

Each record gets as its value a JSON string with the payload of the Zeebe workflow instance, which is configurable via the input mappings of the workflow definition:

public List<SourceRecord> poll() {
  List<SourceRecord> records = new LinkedList<>();

  JobEvent jobEvent = null;
  while ((jobEvent = collectedJobs.poll()) != null) {
      for (String topic : kafkaTopics) {
        final SourceRecord record = new SourceRecord(null, null, topic,
            Schema.BYTES_SCHEMA, //
            jobEvent.getPayload().getBytes(StandardCharsets.UTF_8));
        records.add(record);
        // remember the job key so that commitRecord() can complete the job in Zeebe later
        // (the exact accessor for the key depends on the Zeebe client version)
        jobKeyForRecord.put(record, jobEvent.getKey());
      }
  }
  return records;
}

That’s it. Have fun.

Status and Feedback

This connector is just a proof of concept, and the code might serve as a starting point for your own project. We regularly discuss having a proper connector as part of the Zeebe project. If this would help you, don’t hesitate to contact me and describe your use case at hand; this might help us prioritize accordingly.


Bernd Ruecker is co-founder and developer advocate at Camunda. He is passionate about developer-friendly workflow automation technology. Follow him on Twitter or check out his homepage. As always, he loves getting your feedback. Send him an email!
