Introduction

Change Data Capture (CDC) is a technique that identifies and records all changes (inserts, updates, and deletes) made to a data source, such as a relational database, and transmits these changes to other systems, enabling real-time synchronization. This approach is fundamental to many modern data architectures that require up-to-date and consistent information across disparate systems.
Benefits

CDC brings a series of benefits that go beyond simple data movement. The main benefits can be seen in the list below.
Reduced load on the source system by capturing only changes and avoiding massive data readings, resulting in lower infrastructure costs.
Real-time data replication, eliminating the latency that typically exists in traditional ETL processes.
Ability to track changes for auditing, backup, and reprocessing on other platforms.
Continuous updates to data lakes and data warehouses, feeding analytical systems with current data.
Main techniques

CDC implementation can be carried out in different ways, each with specific advantages and limitations. Knowing these alternatives helps you select the most appropriate solution for each organization's context. The most common techniques are presented below.
Reading transaction logs: captures changes directly from transaction logs, a method used by tools such as Debezium.
Timestamp columns: uses a column that records the last modification of each row; ideal for systems that do not have detailed transaction logs.
Triggers: uses database triggers to record changes in tables, although this may impact performance.
Snapshot differencing: compares snapshots taken periodically; a heavier, less precise approach.
Binlog Techniques (Transaction Log)

The binlog, or transaction log, is a file that records all operations performed on the database. Databases like MySQL (binlog) and PostgreSQL (WAL) have transactional logs that can be monitored in real time to capture changes. This approach is efficient and accurate because it captures every change made to the data, but it requires access to the logs and read permissions. The downside is that not all databases have such detailed transaction logs, and configuration may require infrastructure adjustments.
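As an intuition for log-based CDC, the sketch below (a toy model, not a real binlog/WAL decoder) replays an ordered list of change events, the way a transaction log presents them, to reconstruct the current table state:

```python
# Minimal illustration of log-based CDC: every change is an ordered
# log entry, so replaying the log reconstructs the table state exactly.
# This is a toy model; real tools decode the binlog/WAL wire formats.

def apply_log(log):
    """Replay (operation, primary_key, row) entries into a dict keyed by pk."""
    state = {}
    for op, pk, row in log:
        if op in ("insert", "update"):
            state[pk] = row
        elif op == "delete":
            state.pop(pk, None)
    return state

log = [
    ("insert", 1, {"name": "Ana"}),
    ("insert", 2, {"name": "Bruno"}),
    ("update", 1, {"name": "Ana Maria"}),
    ("delete", 2, None),
]
print(apply_log(log))  # {1: {'name': 'Ana Maria'}}
```

Note that deletes are captured naturally here, something the timestamp-column technique below cannot do.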
Timestamp Column

Consists of creating a column that records the last change made to each row in a table. By querying only the rows changed since the last run, the system captures incremental changes. This technique is simple and applicable to any database, but it does not capture data deletions and requires that all monitored tables include this column.
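A minimal sketch of this technique, using in-memory rows and an illustrative `updated_at` column in place of a real database query:

```python
from datetime import datetime

# Toy illustration of timestamp-based CDC: each row carries a
# last-modified timestamp, and each run captures only rows changed
# since the previous watermark. Note that deleted rows are invisible.

rows = [
    {"id": 1, "name": "Ana",   "updated_at": datetime(2024, 1, 1)},
    {"id": 2, "name": "Bruno", "updated_at": datetime(2024, 1, 5)},
    {"id": 3, "name": "Carla", "updated_at": datetime(2024, 1, 9)},
]

def capture_since(rows, watermark):
    """Return rows modified after the watermark (the last run's max timestamp)."""
    return [r for r in rows if r["updated_at"] > watermark]

changed = capture_since(rows, datetime(2024, 1, 3))
print([r["id"] for r in changed])  # [2, 3]
```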
Triggers

Triggers are functions defined in the database that perform actions when a specific event occurs (such as an insert, update, or delete). With CDC, triggers can be configured to record these changes in a history table. While effective, this method is often avoided due to its performance impact and maintenance complexity.
Diff between Snapshots

Consists of creating periodic snapshots of the data and comparing them to identify changes. It is a laborious method with a greater impact on the system, as it requires intensive processing, and is only used when other CDC approaches are not available.
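The comparison itself can be sketched in a few lines; in practice the cost comes from producing and scanning full snapshots, not from this logic:

```python
# Toy snapshot differencing: compare two snapshots keyed by primary key
# to derive inserts, updates, and deletes. Unlike log-based CDC, this
# misses intermediate changes between snapshots and scans every row.

def diff_snapshots(old, new):
    inserts = {k: v for k, v in new.items() if k not in old}
    deletes = {k: v for k, v in old.items() if k not in new}
    updates = {k: v for k, v in new.items() if k in old and old[k] != v}
    return inserts, updates, deletes

old = {1: "Ana", 2: "Bruno", 3: "Carla"}
new = {1: "Ana Maria", 3: "Carla", 4: "Diego"}
ins, upd, dele = diff_snapshots(old, new)
print(ins, upd, dele)  # {4: 'Diego'} {1: 'Ana Maria'} {2: 'Bruno'}
```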
Tools

Below are some of the main Change Data Capture tools on the market.
Debezium: Open-source tool that uses transaction logs to capture changes in real time. It supports databases like MySQL, PostgreSQL, MongoDB, and others, and is integrated with Kafka, which facilitates data transmission in distributed data pipelines.
Oracle GoldenGate: Oracle's proprietary tool for real-time data replication and integration, compatible with multiple database systems.
StreamSets: Integration platform that includes components for CDC in various databases and data sources.
Qlik Replicate (Attunity): Robust CDC solution used for replication and data synchronization between databases.
Amazon DMS and Azure Data Factory: CDC solutions offered on their respective cloud platforms for replication and data synchronization between services.
Airbyte: Airbyte is an open-source data integration platform that supports CDC through various connectors, including transaction log monitoring for databases such as MySQL and PostgreSQL. It's a flexible and scalable option, with a user-friendly interface that makes it easy to create data pipelines. Airbyte has a wide library of connectors and stands out for its active community and customization possibilities.
Striim: Striim is a real-time data integration platform that combines CDC with streaming analytics and event processing. It supports multiple data sources, such as relational databases, and can stream data to multiple platforms, including clouds like AWS, Azure, and Google Cloud. Its robust architecture enables large-scale data replication and real-time processing, making it popular for use cases that require instant insights from data.
While many tools exist, we'll focus in this article on Debezium, a popular open-source option built on top of Kafka Connect, to illustrate a practical implementation.
Kafka Connector

Connectors are the core feature of Kafka Connect, an Apache Kafka tool designed to simplify and automate integration between Kafka and other systems, such as databases, data warehouses, cloud storage systems, and more. Kafka Connect serves as a distributed, scalable integration layer that uses specialized connectors to reliably transfer data between systems.
How Kafka Connect works

Kafka Connect uses connectors, which are specialized plugins for each type of system or data source, divided into two categories:

Source Connectors: capture data from an external source and publish it to Kafka topics.
Sink Connectors: consume data from Kafka topics and write it to a target system.
Connectors are configured using JSON configuration files or REST APIs, where you can define the data source or destination, authentication information, replication settings, and optional transformations to conform the data to the desired format.
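As an illustration, the sketch below builds such a connector definition programmatically; the connector name and connection values are placeholders, and in practice the resulting JSON is POSTed to Kafka Connect's REST API (`POST /connectors`):

```python
import json

# Sketch of defining a source connector programmatically: this is the
# same JSON document you would POST to Kafka Connect's REST API.
# All names and connection values here are illustrative placeholders.

def build_source_connector(name, hostname, dbname, tables):
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": hostname,
            "database.dbname": dbname,
            "table.include.list": ",".join(tables),
            "topic.prefix": dbname,
        },
    }

cfg = build_source_connector("demo-connector", "postgres", "inventory",
                             ["public.clientes"])
print(json.dumps(cfg, indent=2))
```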
Kafka Connect Architecture

The Kafka Connect architecture is composed of workers, which are instances running connectors in a distributed manner. This allows Kafka Connect to:

Scale horizontally as data load increases.
Be fault-tolerant: if one worker fails, another can take over.
Centralize connector configuration and monitoring.
Data is processed in units of work called tasks, which distribute the workload among workers and enable parallel processing.
"Not everything is roses" (limitations)

Limited options

Relatively few connector types ship with Kafka Connect out of the box. If a generic connector is not suitable, you must develop a custom one. Similarly, Kafka Connect only provides a basic set of transformations, and you may have to write your own custom transformations as well. This can increase the time and effort required for implementation.
Configuration Complexity

Kafka Connect configurations quickly become complex, especially when dealing with multiple connectors, tasks, and transformations. This can make it difficult to manage, maintain, and troubleshoot the system. It's also challenging to integrate Kafka Connect with other tools your organization may already be using. This creates a barrier to adoption and slows down the implementation process.
Limited error handling support

Kafka's primary focus is streaming data, which results in limited built-in error handling capabilities. The distributed nature and potential interdependencies between various Kafka components increase complexity, making it difficult to discover the root cause of an error. You may have to implement custom error handling mechanisms, which can be time-consuming and not always as reliable as the built-in capabilities provided by other data integration and ETL tools.
Recovering gracefully from complex errors is also challenging in Kafka, as there may not be a clear path to resume data processing after an error occurs. This can lead to inefficiencies and require more manual intervention to restore normal operations.
Performance Issues

Most applications require high-speed data pipelines for real-time use cases. However, depending on the connector type and data volume, Kafka Connect may introduce some latency into your system. This may not be suitable if your application cannot tolerate delays.
"But there is a solution" (best practices)

Even with these limitations, it would be incorrect to say that Kafka Connect is not a good choice; it has numerous benefits that far outweigh its limitations.
Kafka Connect is a powerful tool for building scalable data pipelines, but ensuring successful implementation and operational adoption requires following best practices. Here are some tips and advice to avoid mistakes and achieve success with Kafka Connect.
Plan your pipeline

Before you begin the implementation process, determine the sources and destinations of your data and ensure your pipeline is scalable and can handle increasing data volumes. Also, make sure you provision the infrastructure properly. Kafka Connect requires a scalable and reliable infrastructure for optimal performance. Use a cluster-based architecture with sufficient resources to handle the data pipeline load and ensure your infrastructure is highly available.
Use a Schema Registry

Using a Schema Registry in Kafka Connect can be beneficial because it allows centralized storage and management of schemas. This helps maintain schema consistency across systems, reduces the likelihood of data corruption, and simplifies schema evolution.
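To illustrate the idea, here is a toy compatibility check, a deliberate simplification of what a real schema registry does for Avro/Protobuf/JSON Schema: consumers on the new schema can still read old records as long as shared fields keep their types and any newly added field is optional.

```python
# Toy model of schema evolution checking. A "schema" here is just a
# {field: type} dict; real registries apply equivalent rules to
# Avro/Protobuf/JSON Schema before a new schema version is registered.

def backward_compatible(old, new, new_optional=()):
    """True if consumers using `new` can still read records written with `old`."""
    for field, ftype in new.items():
        if field in old:
            if old[field] != ftype:     # shared field changed type: breaks readers
                return False
        elif field not in new_optional:  # added field must have a default
            return False
    return True

v1 = {"id": "int", "name": "string"}
v2 = {"id": "int", "name": "string", "email": "string"}
print(backward_compatible(v1, v2, new_optional=("email",)))  # True
print(backward_compatible(v1, v2))                           # False
```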
Configure for Performance

It's important to design and configure your Kafka Connect deployment for future scale and performance. For example, you can implement a suitable partitioning strategy to distribute your data evenly across partitions. This helps your Kafka Connect cluster handle high volumes of data effectively. Likewise, you can use an efficient serialization format, such as Avro or Protobuf, to transfer data faster over the network.
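To illustrate key-based partitioning: records with the same key always land in the same partition, and a reasonable hash spreads distinct keys evenly. Kafka's default partitioner hashes the key bytes with murmur2; the sketch below uses `zlib.crc32` only to stay dependency-free.

```python
import zlib

# Sketch of key-based partitioning: deterministic per key, and
# approximately uniform across partitions for many distinct keys.

def partition_for(key, num_partitions):
    return zlib.crc32(key.encode("utf-8")) % num_partitions

counts = [0] * 6
for i in range(10_000):
    counts[partition_for(f"customer-{i}", 6)] += 1
print(counts)  # roughly even, each bucket near 10000/6
```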
Monitoring

Monitoring is essential to ensure the smooth operation of your data pipeline. Use monitoring tools to track the performance of your Kafka Connect deployment and quickly identify and resolve any issues.
Kafka Connect monitoring options:

Prometheus + Grafana
Confluent Control Center
Lenses.io
Elastic Stack (ELK)
Kafka Connect metrics API

Dead Letter Topic (DLT)

You can also consider implementing a Dead Letter Topic (DLT) as a safety net to catch and handle any messages that repeatedly fail during retries, ensuring they are not lost, and on top of that build a reprocessing strategy for all DLT messages.
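A minimal sketch of the retry-then-dead-letter flow (in Kafka Connect itself this maps to settings such as `errors.tolerance=all` and `errors.deadletterqueue.topic.name` for sink connectors):

```python
# Sketch of a dead-letter strategy: retry each message a few times and
# divert persistent failures to a DLT instead of losing them or
# blocking the pipeline. Lists stand in for Kafka topics here.

def process_with_dlt(messages, handler, max_retries=3):
    delivered, dead_letter = [], []
    for msg in messages:
        for attempt in range(max_retries):
            try:
                handler(msg)
                delivered.append(msg)
                break
            except Exception:
                if attempt == max_retries - 1:
                    dead_letter.append(msg)  # park it for later reprocessing
    return delivered, dead_letter

def handler(msg):
    if msg == "bad":
        raise ValueError("cannot process")

ok, dlt = process_with_dlt(["a", "bad", "b"], handler)
print(ok, dlt)  # ['a', 'b'] ['bad']
```

A reprocessing job can later consume the DLT with a fixed handler and feed the messages back through the same function.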
Debezium

Debezium is an open-source CDC platform built on top of Kafka Connect. It reads directly from database transaction logs, capturing changes in real time and publishing these events to Kafka topics. This enables the integration of this data with multiple consumers and facilitates the development of real-time data pipelines.
Architecture

Debezium runs as a Kafka Connect connector and uses a specific connector for each database type. When configured, Debezium reads transaction logs and streams changes to topics in Kafka, where consumers can access these events. This process allows Debezium to be highly scalable and easy to integrate with other applications and systems.
Benefits of Debezium:

Open-source and flexible: free to use and extensible.
Scalable with Kafka: ideal for distributed, high-demand systems.
Consistency and reliability: captures changes without modifying the source data structure.
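To make the event model concrete: Debezium publishes each change with an `op` code (`c` = create, `u` = update, `d` = delete, `r` = snapshot read) and `before`/`after` row images. The sketch below applies such events to a local dict, as a simple consumer might; the payloads are trimmed illustrations, not complete Debezium envelopes.

```python
# Applying Debezium-style change events to local state. "after" carries
# the new row image for creates/updates/snapshot reads; "before"
# identifies the row to remove on deletes.

def apply_event(state, event):
    op, before, after = event["op"], event.get("before"), event.get("after")
    if op in ("c", "u", "r"):          # create, update, snapshot read
        state[after["id"]] = after
    elif op == "d":                    # delete
        state.pop(before["id"], None)
    return state

state = {}
events = [
    {"op": "c", "after": {"id": 1, "name": "Ana"}},
    {"op": "u", "before": {"id": 1, "name": "Ana"},
                "after": {"id": 1, "name": "Ana Maria"}},
    {"op": "d", "before": {"id": 1, "name": "Ana Maria"}},
]
for e in events:
    apply_event(state, e)
print(state)  # {} -- created, updated, then deleted
```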
Debezium Transformations (SMTs)

Single Message Transformations (SMTs) in Debezium are transformations applied to individual messages during Change Data Capture (CDC) processing. They occur after reading data from the source and before writing it to the Kafka topic or target system.
Transformation Types

Routing Transformations

Allow you to modify how messages are routed within the system.

RegexRouter: modifies the target topic using regular expressions
ByteBufferConverter: converts fields between different byte formats
TopicRouter: redirects messages to specific topics based on conditions

Content Transformations

Modify the content of messages.

ExtractField: extracts a specific field from the message
InsertField: adds static or dynamic fields
ReplaceField: replaces values of existing fields
MaskField: masks sensitive data
DropFields: removes specific fields

Value Transformations

Change values within messages.

ValueToKey: moves a value to the message key
ValueToTimestamp: converts a field to a timestamp
Cast: converts data types
TimestampConverter: converts timestamp formats

Structural Transformations

Modify the structure of messages.

Flatten: converts nested structures to a flat structure
HoistField: raises nested fields to the top level
WrapField: encapsulates fields in a new structure

Common Use Cases

1. Sensitive Data Masking

```json
{
  "transforms": "maskFields",
  "transforms.maskFields.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskFields.fields": "creditCard,ssn",
  "transforms.maskFields.replacement": "****"
}
```

2. Field Renaming

```json
{
  "transforms": "RenameField",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": "oldName:newName"
}
```

3. Metadata Addition

```json
{
  "transforms": "insertSourceDetails",
  "transforms.insertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertSourceDetails.static.field": "source_database",
  "transforms.insertSourceDetails.static.value": "production_db"
}
```

Performance Considerations

Execution Order: transformations are executed in the order in which they are listed.
Performance Impact: each transformation adds processing overhead.
Complexity: complex transformations can increase latency.

Best Practices

Minimize Transformations: use only the transformations you need.
Efficient Order: place the most selective transformations first.
Monitoring: monitor the performance impact of the transformations.
Testing: validate transformations in a development environment.
Documentation: maintain clear documentation of the applied transformations.

Limitations

Some transformations may not preserve original data types.
Complex transformations may affect message ordering guarantees.
Not all transformations are compatible with each other.
Some transformations may not work with all serialization formats.

Usage Recommendations

1. Requirements Analysis: clearly identify the transformation needs and assess the impact on the system as a whole.
2. Implementation: start with simple transformations, test each transformation individually, and document all applied transformations.
3. Maintenance: monitor performance regularly, periodically review the need for each transformation, and keep transformations updated with new versions of Debezium.

Use Cases

CDC can be applied in a variety of scenarios to optimize real-time data integration and replication, some of which are listed below.
Outbox Pattern: monitors an outbox table and forwards its events to a broker such as Kafka, ensuring messages are published even in the presence of failures and decoupling business logic from message delivery.
Data Mirroring: monolith-to-microservice migrations.
Data Replication: integration between microservices.
Data Auditing: keeping a history of every change.
Real-time Analytics: real-time dashboards and reports.
Data Caching: caching certain Postgres data in Redis, for example.
Data for Query: making data available in a search engine such as Elasticsearch.
CQRS: making Command-side data available to Query-side data sources.
Among others.
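The Outbox pattern in particular is easy to sketch: the business row and the outbound event are written in the same transaction, so an event exists if and only if the business change committed. The sketch below uses SQLite for self-containment, with illustrative table names; in production a relay (e.g. Debezium watching the outbox table) forwards the events to Kafka.

```python
import json
import sqlite3

# Outbox pattern sketch: one atomic transaction writes both the
# business row and the event row, so they can never diverge.

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL)")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, "
             "aggregate TEXT, type TEXT, payload TEXT)")

def place_order(order_id, total):
    with conn:  # commits both inserts together, or neither
        conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, total))
        conn.execute(
            "INSERT INTO outbox (aggregate, type, payload) VALUES (?, ?, ?)",
            ("order", "OrderPlaced", json.dumps({"id": order_id, "total": total})),
        )

place_order(1, 99.9)
events = conn.execute("SELECT type, payload FROM outbox").fetchall()
print(events)  # [('OrderPlaced', '{"id": 1, "total": 99.9}')]
```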
Practical Use of Debezium

Infrastructure Preparation

Ensure that Docker and Docker Compose are installed and running, as we will need to spin up Kafka, Zookeeper, the Debezium connector, and the database.
Creating Docker Compose

Create a docker-compose.yml file with the following content:

```yaml
version: '3.8'

services:
  postgres:
    image: debezium/postgres:16
    ports:
      - "5433:5432"
    environment:
      - POSTGRES_DB=inventory
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command: postgres -c wal_level=logical
    networks:
      debezium-network:
        aliases:
          - postgres

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      debezium-network:
        aliases:
          - zookeeper

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: cdc-kafka  # name used by the docker exec commands below
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9093,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      debezium-network:
        aliases:
          - kafka

  kafka-connect:
    image: debezium/connect:2.5
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:9093
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
    networks:
      debezium-network:
        aliases:
          - kafka-connect

networks:
  debezium-network:
    driver: bridge
```

Running the services

In the folder where docker-compose.yml was created, run in the terminal:

```shell
# depending on how your Docker was installed, you may need to
# prefix the command with 'sudo'
docker compose up -d
```

Debezium Connector Configuration

Send a JSON configuration to the Kafka Connect REST endpoint to configure the Debezium connector:

```shell
# 1. First, check if Debezium Connect is running:
curl -X GET http://localhost:8083/connectors

# 2. Creating the connector
# 2.1. Main command to create the connector (WITHOUT transformation / start with this):
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "inventory-clientes-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "inventory",
      "database.server.name": "dbserver1",
      "table.include.list": "public.clientes",
      "topic.prefix": "inventory",
      "schema.include.list": "public",
      "decimal.handling.mode": "precise",
      "plugin.name": "pgoutput"
    }
  }'
```

Note that database.port is 5432, the port PostgreSQL listens on inside the Docker network; 5433 is only the port mapped to the host.

```shell
# 2.2. Main command to create the connector (WITH transformation):
# connector names must be unique, so delete the connector from 2.1 first
# (curl -X DELETE http://localhost:8083/connectors/inventory-clientes-connector)
# or use a different name.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "inventory-clientes-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "inventory",
      "database.server.name": "dbserver1",
      "table.include.list": "public.clientes",
      "topic.prefix": "inventory",
      "schema.include.list": "public",
      "decimal.handling.mode": "precise",
      "plugin.name": "pgoutput",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite"
    }
  }'

# Example of a configuration of a Sink connector for Redis
# (requires a Redis service and the Redis sink connector plugin,
# neither of which is included in the Docker Compose above)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "redis-sink",
    "config": {
      "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
      "redis.hosts": "redis:6379",
      "topics": "inventory.public.clientes",
      "tasks.max": "1",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "true",
      "value.converter.schemas.enable": "true"
    }
  }'

# 3. To check the connector status:
curl -X GET http://localhost:8083/connectors/inventory-clientes-connector/status
```

Parameter Explanation:

name: Name of the connector, used for management (can be customized).
connector.class: Specifies the Debezium connector for PostgreSQL (io.debezium.connector.postgresql.PostgresConnector).
tasks.max: Maximum number of parallel tasks for the connector (may increase depending on load).
database.hostname: PostgreSQL server address.
database.port: Port on which PostgreSQL is running (default is 5432).
database.user and database.password: Username and password to access the database (the user needs permissions to read the transaction logs).
database.dbname: Name of the database to be monitored.
database.server.name: Logical name of the server; will be used as a prefix for Kafka topics created for the tables.
slot.name: Name of the replication slot used by PostgreSQL logical replication to stream changes to the connector in real time.
plugin.name: Defines the logical replication plugin. For PostgreSQL 10+ and Debezium, pgoutput is recommended.
table.include.list: List of tables to monitor (in schema.table format). If you want to monitor multiple tables, you can separate the names with commas.
publication.name: Name of the logical replication publication that captures changes to the specified tables.
database.history.kafka.bootstrap.servers: Kafka address where the schema history will be stored, essential for Debezium to understand the table structure and schema changes.
database.history.kafka.topic: Kafka topic where the schema history will be stored.
Monitoring Changes

kafka-console-consumer

A tool from the Kafka stack for consuming topics directly in the console.

```shell
# Console Consumer - check messages in the topic
sudo docker exec -it cdc-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic inventory.public.clientes \
  --from-beginning
```

Plugin Big Data Tools for IntelliJ

Provectus Kafka UI

GitHub - provectus/kafka-ui: Open-Source Web UI for Apache Kafka Management

Add the following service to docker-compose.yml to run it:

```yaml
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: cdc-kafka-ui
    ports:
      - 8080:8080
    depends_on:
      - kafka
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
    networks:
      debezium-network:
        aliases:
          - kafka-ui
```

"Seeing the magic happen"

Create the table

```sql
-- Create the table
CREATE TABLE inventory.public.clientes (
    id SERIAL PRIMARY KEY,
    name VARCHAR(50),
    email VARCHAR(50),
    data_criacao TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Check if there is data
SELECT * FROM inventory.public.clientes;
```

Connecting the Consumer

You can connect the default Kafka consumer to consume messages as soon as they enter the topic:

```shell
# Console Consumer - checks messages in the topic
sudo docker exec -it cdc-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic inventory.public.clientes \
  --from-beginning
```

Or use one of the tools listed above (IntelliJ plugin or Kafka UI).
Testing how it works

Inserting data

```sql
INSERT INTO inventory.public.clientes (name, email) VALUES ('João Silva 1', 'joao1@email.com');
INSERT INTO inventory.public.clientes (name, email) VALUES ('João Silva 2', 'joao2@email.com');
INSERT INTO inventory.public.clientes (name, email) VALUES ('João Silva 3', 'joao3@email.com');
```

Updating data

```sql
UPDATE inventory.public.clientes SET name = 'Pedro Silva', email = 'pedro.silva@email.com' WHERE id = 1;
UPDATE inventory.public.clientes SET name = 'José Silva', email = 'jose.silva@email.com' WHERE id = 2;
UPDATE inventory.public.clientes SET name = 'Jacó Silva', email = 'jaco.silva@email.com' WHERE id = 3;
```

Deleting data

```sql
-- Delete the records
DELETE FROM inventory.public.clientes WHERE id = 1;
DELETE FROM inventory.public.clientes WHERE id = 2;
DELETE FROM inventory.public.clientes WHERE id = 3;
```

Changing the amount of replication information

```sql
-- Changes the amount of information that is available for logical
-- decoding in case of UPDATE and DELETE events.
ALTER TABLE inventory.public.clientes REPLICA IDENTITY FULL;
```

Rerun the tests (insert, update, and delete) after adjusting the replication level.
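The practical effect of REPLICA IDENTITY FULL shows up in the `before` image of update and delete events: with the default identity, `before` carries essentially only the primary key, while FULL includes the entire old row. The payloads below are trimmed, hand-written illustrations of that difference.

```python
# Hand-written sample events (not complete Debezium envelopes) showing
# how REPLICA IDENTITY changes the "before" image on a delete.

default_delete = {"op": "d", "before": {"id": 3}, "after": None}
full_delete = {
    "op": "d",
    "before": {"id": 3, "name": "Jacó Silva", "email": "jaco.silva@email.com"},
    "after": None,
}

def old_row_fields(event):
    """Fields of the old row image available to consumers."""
    return sorted(event["before"]) if event["before"] else []

print(old_row_fields(default_delete))  # ['id']
print(old_row_fields(full_delete))     # ['email', 'id', 'name']
```

Consumers that need the full previous state on updates and deletes (auditing, cache invalidation by non-key fields) are the main reason to enable FULL, at the cost of larger WAL entries.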
Troubleshooting

```shell
# Viewing the logs
sudo docker compose --file debezium-docker-compose.yml logs kafka
sudo docker compose --file debezium-docker-compose.yml logs zookeeper
sudo docker compose --file debezium-docker-compose.yml logs postgres
sudo docker compose --file debezium-docker-compose.yml logs kafka-connect

# Inspecting the network
sudo docker network inspect debezium-network
```

Conclusion

Change Data Capture is a technique widely used in the industry, in many forms and through many tools. It is especially useful in data and system migrations, for example when gradually strangling a monolith, replicating data from one microservice to another, or implementing the Outbox pattern.
In this scenario, Debezium presents itself as a great tool for the job, built on Kafka's tried-and-tested Connect platform. As in any scenario, though, it has its caveats, risks, and disadvantages. It is essential that technical teams are aware of both the potential and the limitations of the tool, ensuring well-informed architectural decisions.