Implementing the Outbox Pattern with CDC using Debezium




A microservice architecture makes the implementation of a single service easier but introduces new challenges as soon as you need to share data between services. Reliably sharing data and keeping your services independent of each other isn’t as easy as most developers expect. But with the right patterns, it’s not an impossible task either. Using the outbox pattern and a Change Data Capture (CDC) implementation provided by Debezium, it actually becomes quite simple.

In one of my previous articles, I already explained the outbox pattern in great detail and showed you how to fill the outbox table with Hibernate. So, in this article, I will only provide a quick introduction to the pattern. After that, I will show you how to use Debezium and CDC to get the data from the outbox table and send it to Apache Kafka.

Requirements when sharing data between services

But before we dive into the implementation details, let’s quickly summarize why updating multiple services gets so complicated. Why can’t we update all databases, or at least persist an update in our database and send a message to Apache Kafka?

The main answer to all these questions is: Because we want to be able to develop, deploy, and scale our services independently!

You can’t do that if one service has to know the internal database structure of multiple other services. That introduces strong dependencies between these services and makes it extremely hard to implement and deploy any changes.

And you would need to use global transactions to avoid dual writes and to ensure data consistency when writing to multiple systems, e.g., your database and Apache Kafka. But global transactions are complex and negatively affect the performance and scalability of your system.

Friends don’t let friends do dual writes!

Gunnar Morling

So, you need a pattern that enables you to use a local transaction to update your database and to trigger a message that gets sent to Apache Kafka. That might sound impossible, but it’s possible if you use the outbox pattern to split the communication into 2 parts and accept an eventually consistent approach.

The Outbox Pattern

I already explained the outbox pattern in great detail on this blog, so let’s keep this section short. The general idea is simple:

Similar to an outbox that was used in paper-based offices, you add an outbox table to the database of your microservice. The service writes all the messages it wants to send to the message broker into this table. A message relay service then collects these messages and sends them to the message broker, e.g., Apache Kafka.

The outbox table used in this article consists of the columns id, aggregatetype, aggregateid, type, and payload:

[Figure: Outbox table]

The type, aggregatetype, and aggregateid columns provide metadata about the event or message. They are useful for processing your event within Apache Kafka or for allowing event consumers to filter the events they want to handle.

The payload column contains the information that should be sent to the broker. This can be specific information for an event, or it can be the current state of the aggregate on which the event occurred. I prefer to send the current state of the aggregate. That way, you don’t share any internal details of the microservice that created the event, and the consuming services get all the information they might need.
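If you model this table in Java, each row can be represented by a simple value type. The following is a minimal sketch, not the entity class from my Hibernate article: the OutboxEvent record and its of factory method are hypothetical names, the fields mirror the columns of the outboxevent table used in this article, and records require Java 16 or newer.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical value type for one row of the store.outboxevent table (requires Java 16+ for records)
record OutboxEvent(UUID id, String aggregateType, String aggregateId, String type, String payload) {

    // Every column is mandatory in this sketch
    OutboxEvent {
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(aggregateType, "aggregateType");
        Objects.requireNonNull(aggregateId, "aggregateId");
        Objects.requireNonNull(type, "type");
        Objects.requireNonNull(payload, "payload");
    }

    // Creates a new outbox entry with a random UUID;
    // payloadJson is expected to hold the aggregate's current state as JSON
    static OutboxEvent of(String aggregateType, String aggregateId, String type, String payloadJson) {
        return new OutboxEvent(UUID.randomUUID(), aggregateType, aggregateId, type, payloadJson);
    }
}
```

Your service would persist such a record in the same local transaction as its business change, e.g., OutboxEvent.of("Book", "1", "CREATE", bookJson), where bookJson is a hypothetical JSON representation of the book.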

Here you can see an overview diagram of the complete system:

[Figure: Microservice architecture]

OK, I already explained the left side of the graphic in a previous post. Let’s focus on the message relay service. It gets the message from the outbox table and sends it to the message broker.

You could, of course, implement this service yourself. The easiest implementation polls the outbox table in a defined interval, creates an event for each new record and publishes it to a topic in Apache Kafka.
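For comparison, a hand-written relay could look roughly like the following sketch. It keeps everything in memory so that it runs without a database or broker: the outbox table is a plain List, the publish callback stands in for a Kafka producer, and the seq column used to detect new records is an assumption (the outbox table in this article uses a UUID id, which has no natural order). PollingRelay and its members are hypothetical names.

```java
import java.util.List;
import java.util.function.BiConsumer;

// Minimal in-memory sketch of a hand-written polling relay; all names are hypothetical
class PollingRelay {

    // Stand-in for an outbox row; seq is an assumed, monotonically increasing column
    record OutboxRow(long seq, String aggregateType, String aggregateId, String type, String payload) {}

    private final List<OutboxRow> outboxTable;           // assumed to be ordered by seq
    private final BiConsumer<String, OutboxRow> publish; // receives (topic, row); a Kafka producer in practice
    private long lastPublishedSeq = 0;                   // position of the last published record

    PollingRelay(List<OutboxRow> outboxTable, BiConsumer<String, OutboxRow> publish) {
        this.outboxTable = outboxTable;
        this.publish = publish;
    }

    // One polling cycle: publishes every record added since the previous cycle
    // to a topic named after its aggregate type and returns the number of published records
    int pollOnce() {
        int published = 0;
        for (OutboxRow row : outboxTable) {
            if (row.seq() > lastPublishedSeq) {
                publish.accept(row.aggregateType(), row);
                lastPublishedSeq = row.seq();
                published++;
            }
        }
        return published;
    }
}
```

In a real service, you would call pollOnce at a fixed interval, e.g., with executor.scheduleAtFixedRate(relay::pollOnce, 0, 1, TimeUnit.SECONDS) on a ScheduledExecutorService, and you would also have to persist lastPublishedSeq. A crash between publishing a record and persisting the position leads to duplicate messages, the interval adds latency, and every poll puts load on the database. Debezium avoids the polling by reading the changes from the database’s transaction log.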

But Debezium provides a much better way to set up your message relay service. It doesn’t require you to write any code, and it doesn’t need to poll the outbox table.

Setting up the message relay service using Debezium

Debezium provides an implementation of the change data capture (CDC) pattern. It’s built on top of Apache Kafka and provides Kafka connectors that monitor your database and pick up any changes. The approach and implementation of these connectors depend on the database. You can find out more about it in the Debezium FAQ.

Setting up Debezium

In this example, I want to use a PostgreSQL database. The Debezium connector takes a snapshot of the database when you start it for the first time. After that is done, it automatically streams all changes to an Apache Kafka instance. You can do that for all the tables in your database, and the changes in each table get streamed to their own topic. To be able to monitor the changes in your PostgreSQL database, you need to install a logical decoding output plugin, such as decoderbufs or wal2json, on your PostgreSQL server.

Or, if you just want to give it a try, you can use the Docker example images provided by the Debezium team. That’s what I’m using for this article. The following docker-compose.yaml file provides the required configuration to start Docker containers for ZooKeeper, Apache Kafka, a PostgreSQL database, and Kafka Connect.

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    container_name: kafka
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    container_name: postgres
    image: debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
     - 5432:5432
    environment:
     - POSTGRES_USER=postgres
     - POSTGRES_PASSWORD=postgres
  connect:
    container_name: connect
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - postgres
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

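The image tags in this file reference the DEBEZIUM_VERSION environment variable, so you need to set it first. The change event shown later in this article was created with Debezium 0.9.5.Final, so this example uses the 0.9 images. After that, you can start all 4 required containers by executing the following commands on your command line:

export DEBEZIUM_VERSION=0.9
docker-compose up -d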

Setting up the Outbox Event Router

After starting the docker containers, you can configure the connector with an optional outbox event router.

The connector connects your database with Apache Kafka. It’s responsible for getting the changes from your database and publishing an event for each of them to Apache Kafka. You can add a new connector by sending a POST request to the /connectors endpoint of your Kafka Connect instance. If you want to implement your own event transformation and routing, this is the only part you need.

But I highly recommend taking a look at Debezium’s outbox event router. It provides a set of configurable transformations and event routing features that enable you to implement the most common outbox use cases. The only thing you need to do to use the outbox event router is to include it in your connector configuration.

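So, let’s take a look at a simple connector and outbox event router configuration. It connects as the user postgres to the bookstore database in the postgres container on port 5432, which docker-compose also exposes as localhost:5432. The database.server.name property defines a logical name for the database server, which Debezium uses as a prefix for topic and schema names.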

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "bookstore-outbox-connector", 
"config": {
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "postgres",
  "database.dbname" : "bookstore",
  "database.server.name": "localhost",
  "tombstones.on.delete" : "false",
  "table.whitelist" : "store.outboxevent",
  "transforms" : "outbox",
  "transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter"}
}'
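If you prefer to register the connector from Java code instead of curl, you could send the same request with the java.net.http client that ships with Java 11 and newer. This is a sketch, not part of Debezium: ConnectorRegistration is a hypothetical class name, the text block requires Java 15 or newer, and it assumes Kafka Connect listens on localhost:8083 as in the docker-compose file above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that registers the outbox connector via Kafka Connect's REST API
class ConnectorRegistration {

    // Same connector configuration as in the curl command above
    static final String CONFIG = """
        {
          "name": "bookstore-outbox-connector",
          "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": "bookstore",
            "database.server.name": "localhost",
            "tombstones.on.delete": "false",
            "table.whitelist": "store.outboxevent",
            "transforms": "outbox",
            "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
          }
        }
        """;

    // Builds the POST request for the /connectors endpoint without sending it
    static HttpRequest buildRequest(String connectUrl) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/"))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(CONFIG))
                .build();
    }

    // Sends the request and returns the HTTP status code reported by Kafka Connect
    static int register(String connectUrl) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(connectUrl), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Calling ConnectorRegistration.register("http://localhost:8083") should return 201 if Kafka Connect created the connector.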

By default, the PostgreSQL connector would monitor all database tables, create a topic in Apache Kafka for each of them, and publish an event for each database record that got changed.

That’s obviously not what you want if you’re implementing the outbox pattern. One of the main ideas of this pattern is that you only expose 1 table as part of your API. In this example, that table is called outboxevent, and it’s part of the store schema. By setting the table.whitelist property to store.outboxevent, you tell the connector to only monitor that table. And by setting tombstones.on.delete to false, you tell it not to emit an additional tombstone event after a delete event.

As promised, this configuration also sets up a transformation step. The transformation is named outbox, and it uses the io.debezium.transforms.outbox.EventRouter class. It transforms a message that looks like this:

{
  "schema":
    {
      "type": "struct",
      "fields":
        [
          {
            "type": "struct",
            "fields":
              [
                {
                  "type": "string",
                  "optional": false,
                  "name": "io.debezium.data.Uuid",
                  "version": 1,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": false,
                  "field": "aggregatetype"
                },
                { "type": "string", "optional": false, "field": "aggregateid" },
                { "type": "string", "optional": false, "field": "type" },
                { "type": "string", "optional": false, "field": "payload" }
              ],
            "optional": true,
            "name": "localhost.store.outboxevent.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields":
              [
                {
                  "type": "string",
                  "optional": false,
                  "name": "io.debezium.data.Uuid",
                  "version": 1,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": false,
                  "field": "aggregatetype"
                },
                { "type": "string", "optional": false, "field": "aggregateid" },
                { "type": "string", "optional": false, "field": "type" },
                { "type": "string", "optional": false, "field": "payload" }
              ],
            "optional": true,
            "name": "localhost.store.outboxevent.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields":
              [
                { "type": "string", "optional": true, "field": "version" },
                { "type": "string", "optional": true, "field": "connector" },
                { "type": "string", "optional": false, "field": "name" },
                { "type": "string", "optional": false, "field": "db" },
                { "type": "int64", "optional": true, "field": "ts_usec" },
                { "type": "int64", "optional": true, "field": "txId" },
                { "type": "int64", "optional": true, "field": "lsn" },
                { "type": "string", "optional": true, "field": "schema" },
                { "type": "string", "optional": true, "field": "table" },
                {
                  "type": "boolean",
                  "optional": true,
                  "default": false,
                  "field": "snapshot"
                },
                {
                  "type": "boolean",
                  "optional": true,
                  "field": "last_snapshot_record"
                },
                { "type": "int64", "optional": true, "field": "xmin" }
              ],
            "optional": false,
            "name": "io.debezium.connector.postgresql.Source",
            "field": "source"
          },
          { "type": "string", "optional": false, "field": "op" },
          { "type": "int64", "optional": true, "field": "ts_ms" }
        ],
      "optional": false,
      "name": "localhost.store.outboxevent.Envelope"
    },
  "payload":
    {
      "before": null,
      "after":
        {
          "id": "49fcc56a-326d-4e63-acdc-6bb6761b0c7e",
          "aggregatetype": "Book",
          "aggregateid": "1",
          "type": "CREATE",
          "payload": "{\"id\": 1, \"title\": \"Hibernate Tips - More than 70 solutions to common Hibernate problems\", \"chapters\": [{\"id\": 2, \"content\": \"How to map natural IDs\"}, {\"id\": 3, \"content\": \"How to map a bidirectional one-to-one association\"}]}"
        },
      "source":
        {
          "version": "0.9.5.Final",
          "connector": "postgresql",
          "name": "localhost",
          "db": "bookstore",
          "ts_usec": 1567054021909000,
          "txId": 579,
          "lsn": 24053160,
          "schema": "store",
          "table": "outboxevent",
          "snapshot": true,
          "last_snapshot_record": false,
          "xmin": null
        },
      "op": "r",
      "ts_ms": 1567054021910
    }
}

into an event with this key:

{"schema":{"type":"string","optional":false},"payload":"1"}

and this value:

{
  "schema":
    {
      "type": "struct",
      "fields":
        [
          { "type": "string", "optional": false, "field": "payload" },
          { "type": "string", "optional": false, "field": "eventType" }
        ],
      "optional": false
    },
  "payload":
    {
      "payload": "{\"id\": 1, \"title\": \"Hibernate Tips - More than 70 solutions to common Hibernate problems\", \"chapters\": [{\"id\": 2, \"content\": \"How to map natural IDs\"}, {\"id\": 3, \"content\": \"How to map a bidirectional one-to-one association\"}]}",
      "eventType": "CREATE"
    }
}

As you can see, the transformation drastically simplified the structure of the event. It removed the information about the before state, which is always empty because we’re adding a new record.

The transformation also removed the source metadata from the message and added the value of the type column as the eventType field. The aggregateid becomes the key of the event and part of the event header. The aggregatetype is used to identify the topic in Apache Kafka to which the event gets published. By default, each aggregatetype gets its own topic. The default topic name starts with outbox.event. followed by the aggregatetype, so the event shown above gets published to the outbox.event.Book topic.
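To make these default routing rules more tangible, here is a simplified model of them in plain Java. It is not Debezium’s EventRouter implementation, only a sketch with hypothetical names (DefaultOutboxRouting, OutboxRow, RoutedEvent) that maps an outbox row to the topic, key, and value of the transformed event described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the default outbox routing described above (not Debezium's actual code)
class DefaultOutboxRouting {

    record OutboxRow(String id, String aggregateType, String aggregateId, String type, String payload) {}

    record RoutedEvent(String topic, String key, Map<String, String> value) {}

    static final String DEFAULT_TOPIC_PREFIX = "outbox.event.";

    static RoutedEvent route(OutboxRow row) {
        // The value only keeps the payload and the event type, in that order
        Map<String, String> value = new LinkedHashMap<>();
        value.put("payload", row.payload());
        value.put("eventType", row.type());
        // aggregatetype selects the topic, aggregateid becomes the record key
        return new RoutedEvent(DEFAULT_TOPIC_PREFIX + row.aggregateType(), row.aggregateId(), value);
    }
}
```

For the Book row from the example, route returns the topic outbox.event.Book, the key 1, and a value with the payload and eventType fields.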

Customizing the Event Routing

You can customize the default behavior by using a set of configuration parameters when creating the connector. There are way too many configuration parameters to show all of them in this article. So, let’s take a look at an example configuration that publishes all events to a topic called bookstore.events and includes the id of the aggregate in the event. If you want to dive deeper into the configuration options, please take a look at the Debezium documentation.

You can do that by providing the name of the topic as the route.topic.replacement configuration parameter and by defining the additional field in the table.fields.additional.placement parameter. When you specify the handling of an additional field, you need to provide three strings separated by a colon (:). The first one is the name of the table column, the second one specifies whether the field shall be part of the envelope or the header of the event, and the third one defines the alias that’s used in the event.
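The following sketch shows how such a table.fields.additional.placement value could be read and applied to an outbox row. It’s a hypothetical illustration, not Debezium’s parser: AdditionalPlacement and its members are made-up names, it expects exactly the three colon-separated parts described above, it assumes that several entries are separated by commas, and it only builds the fields for the envelope placement.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical parser for placement values like "aggregateid:envelope:id"
class AdditionalPlacement {

    enum Placement { ENVELOPE, HEADER }

    record FieldPlacement(String column, Placement placement, String alias) {}

    // Splits the value into entries (assumed comma-separated) and each entry into column:placement:alias
    static List<FieldPlacement> parse(String config) {
        List<FieldPlacement> result = new ArrayList<>();
        for (String entry : config.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Expected column:placement:alias but got: " + entry);
            }
            Placement placement = Placement.valueOf(parts[1].toUpperCase(Locale.ROOT));
            result.add(new FieldPlacement(parts[0], placement, parts[2]));
        }
        return result;
    }

    // Returns the extra value fields: each ENVELOPE column of the row, stored under its alias
    static Map<String, String> envelopeFields(Map<String, String> rowColumns, List<FieldPlacement> placements) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (FieldPlacement p : placements) {
            if (p.placement() == Placement.ENVELOPE) {
                fields.put(p.alias(), rowColumns.get(p.column()));
            }
        }
        return fields;
    }
}
```

For a row whose aggregateid column contains 16, envelopeFields(Map.of("aggregateid", "16"), parse("aggregateid:envelope:id")) returns a map with the single entry id=16, which corresponds to the additional id field in the resulting event.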

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "bookstore-outbox-connector", 
"config": {
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "postgres",
  "database.dbname" : "bookstore",
  "database.server.name": "localhost",
  "tombstones.on.delete" : "false",
  "table.whitelist" : "store.outboxevent",
  "transforms" : "outbox",
  "transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.route.topic.replacement" : "bookstore.events",
  "transforms.outbox.table.fields.additional.placement" : "aggregateid:envelope:id"}
}'

When you use this connector, you will see that Debezium now publishes the events to the bookstore.events topic and that the event contains the additional id field.

{
  "schema":
    {
      "type": "struct",
      "fields":
        [
          { "type": "string", "optional": false, "field": "payload" },
          { "type": "string", "optional": false, "field": "eventType" },
          { "type": "string", "optional": false, "field": "id" }
        ],
      "optional": false
    },
  "payload":
    {
      "payload": "{\"id\": 16, \"title\": \"Hibernate Tips - More than 70 solutions to common Hibernate problems\", \"chapters\": [{\"id\": 17, \"content\": \"How to map natural IDs\"}, {\"id\": 18, \"content\": \"How to map a bidirectional one-to-one association\"}]}",
      "eventType": "CREATE",
      "id": "16"
    }
}

As you can see, Debezium’s outbox event router provides a pretty flexible solution for publishing your outbox messages as events to Apache Kafka. The only thing you now need to do is connect your consumers to the topics and react to the events. But that’s a topic for another article.

Conclusion

We have used the outbox pattern to update the database and publish events in Apache Kafka. As you have seen, Debezium provides a set of connectors and transformations that you just need to configure to publish an event whenever you write a new record to the outbox table. That makes the implementation of the outbox pattern pretty simple.

But this pattern also requires a lot of additional infrastructure. In a highly available production environment, you need to run and manage multiple instances of ZooKeeper, Apache Kafka, and Kafka Connect. That makes the architecture of your system way more complex than it would have been for a monolithic application.

4 Comments

  1. Mitchell Pronschinske says:

    Hi Thorben. Great article!
    Do you think Debezium will add support for HashiCorp Consul in place of Zookeeper in the future? It’s really surpassed Zookeeper over the past few years in my opinion.

    1. Thorben Janssen says:

      I don’t know. But I forwarded the question to Gunnar Morling. He’s the project lead for Debezium. Let’s see what he says …

    2. Debezium itself doesn’t depend on ZooKeeper, but Apache Kafka does. The Kafka community has plans to replace ZK, but I don’t think it’d be Consul, but rather a custom implementation. Note you can use Debezium also with other messaging brokers such as Apache Pulsar and via its embedded engine (where you use it as JAR in your own application) with any other kind of broker.

      1. Thorben Janssen says:

        Thanks for clarifying, Gunnar!
