Migrating Data From Many Sources With Change Data Capture on GCP (Part 1)


Log-Based Change Data Capture. Source: https://www.striim.com/blog/change-data-capture-cdc-what-it-is-and-how-it-works

CDC

Change Data Capture is a software process that identifies and tracks changes to data in a database.

CDC enables real-time or near-real-time movement of data by capturing and processing changes continuously as new database events occur.

In high-velocity data environments where time-sensitive decisions are made, Change Data Capture is an excellent fit for low-latency, reliable, and scalable data replication.

CDC for ETL

ETL (extract, transform, load) is a data integration process in which data is extracted from various sources and delivered to a data warehouse, database, or data lake. Data can be extracted using database queries (batch-based) or Change Data Capture (near-real-time).

During the transformation phase, data is processed and converted into the appropriate format for the target destination. While legacy ETL has a slow transformation step, modern ETL platforms replace disk-based processing with in-memory processing to allow for real-time data processing, enrichment, and analysis; this is where CDC fits in, since it supplies changes continuously instead of in periodic batch extracts. The final step of ETL is loading the data into the target destination.

How

There are multiple common Change Data Capture methods that you can implement depending on your application requirements and tolerance for performance overhead (Audit Columns, Table Deltas, Trigger-based CDC, Log-Based Change Data Capture, …). But in this blog, which is aimed at data migration, I want to focus on Log-Based Change Data Capture.

Tools for Change Data Capture

I compare two tools (Datastream vs Debezium) that I feel are suitable for my data migration purposes. Debezium is a widely used open-source platform for change data capture, while Datastream is a managed CDC service provided by GCP.

Datastream is a convenient solution if you are already on GCP and want CDC; its limitation is that it only supports a small number of source databases at the moment.

As for Debezium, because it is open source, the developer community has extended it to support many different types of databases, but if you want to use it, you need to spend the effort to install and maintain it on your own system.

Because there are many different data sources (including RDBMS and NoSQL), I use both tools, each where it is appropriate. For data sources already supported by Datastream, we make use of that service; otherwise, if the data source is not supported, we use Debezium.

The data migration strategy is: for each database/table, we first take a snapshot (using Spark over JDBC) to load all data from the beginning up to time A, and then from time A onwards we use CDC to keep the data up to date. A minimal sketch of the snapshot step is shown below.
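For illustration, here is a minimal PySpark sketch of that snapshot step. The hostnames, credentials, table name, bounds, and GCS paths are placeholders I made up for this example, not values from a real setup.

from pyspark.sql import SparkSession

# Minimal snapshot sketch: read the full table over JDBC and land it on GCS.
# All connection details, table names, and paths are hypothetical placeholders.
spark = SparkSession.builder.appName("initial-snapshot-ph_table_test").getOrCreate()

snapshot_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-dev.example.internal:3306/ph")
    .option("dbtable", "ph_table_test")
    .option("user", "datastream_user")
    .option("password", "********")
    # Split the read into parallel partitions on the numeric primary key.
    .option("partitionColumn", "id")
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "8")
    .load()
)

# This snapshot covers everything up to "time A"; CDC takes over from there.
snapshot_df.write.mode("overwrite").parquet("gs://stream-cdc/snapshots/ph_table_test/")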

In Part 1 of this blog, we will go through the steps to get a complete stream configured in Datastream for CDC.

Datastream

Source: https://cloud.google.com/datastream

Here are the steps I follow to configure Datastream to capture CDC binlog events from MySQL and deliver them to GCS.

1/ Enable Datastream on GCP.

2/ Configure the data source.

3/ Allow network connectivity from Datastream to the data source (whitelisting, IP filtering, etc.); check this doc for details. You also need to create a username/password so that Datastream can access MySQL. (A quick way to sanity-check the source settings is sketched after this list.)

4/ Create a connection profile (Source) and check the network connection from Datastream to the data source.

5/ Create a Cloud Storage connection profile (Destination) and grant Datastream permission to write data to the GCS bucket/path; check this doc for details.

6/ Create a stream in Datastream (connect the Source to the Destination).
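As a quick sanity check for step 3, the sketch below connects to the source and prints the binary log settings that Datastream's MySQL source relies on (binary logging enabled, row-based format). The connection details are placeholders, and the exact list of prerequisites should be confirmed against the Datastream documentation linked above.

import mysql.connector  # pip install mysql-connector-python

# Hypothetical connection details for the user created for Datastream in step 3.
conn = mysql.connector.connect(
    host="mysql-dev.example.internal",
    port=3306,
    user="datastream_user",
    password="********",
)
cursor = conn.cursor()

# Datastream reads the MySQL binary log, so it must be enabled and row-based.
for variable in ("log_bin", "binlog_format", "binlog_row_image"):
    cursor.execute(f"SHOW VARIABLES LIKE '{variable}'")
    print(cursor.fetchone())  # e.g. ('binlog_format', 'ROW')

cursor.close()
conn.close()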

Output

The data for a given stream is written (in 1-minute batches) to the provided bucket or file prefix at:

[bucket]/[prefix]/[objectname]/yyyy/mm/dd/hh/mm/[filename(idempotent)]

The object name in the path for database sources is the schema name followed by the table name (separated by an underscore ‘_’). To understand more details about the data written in GCS, you can read the details here.

Then you can leverage this data to update your data warehouse with near-real-time latency (a sketch of applying these records is shown after the example record below). Congratulations!

gs://stream-cdc/mysql-dev/ph_table_test/2023/05/17/12/22/mysql_ph_table_test_2023_05_17_12_22_…_mysql-cdc-binlog_-...avro
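To see what lands in the bucket, you can list the objects under a table's prefix with the Cloud Storage client library. The bucket name and prefix below mirror the example path above and are placeholders from my setup.

from google.cloud import storage  # pip install google-cloud-storage

# List the Avro files Datastream wrote for one table during one minute.
# Bucket name and prefix are placeholders mirroring the example path above.
client = storage.Client()
prefix = "mysql-dev/ph_table_test/2023/05/17/12/22/"
for blob in client.list_blobs("stream-cdc", prefix=prefix):
    print(blob.name, blob.size)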

Schema:   
 |-- uuid: string (nullable = true)  
 |-- read_timestamp: timestamp (nullable = true)  
 |-- source_timestamp: timestamp (nullable = true)  
 |-- object: string (nullable = true)  
 |-- read_method: string (nullable = true)  
 |-- stream_name: string (nullable = true)  
 |-- schema_key: string (nullable = true)  
 |-- sort_keys: array (nullable = true)  
 |    |-- element: struct (containsNull = true)  
 |    |    |-- member0: string (nullable = true)  
 |    |    |-- member1: long (nullable = true)  
 |-- source_metadata: struct (nullable = true)  
 |    |-- table: string (nullable = true)  
 |    |-- database: string (nullable = true)  
 |    |-- primary_keys: array (nullable = true)  
 |    |    |-- element: string (containsNull = true)  
 |    |-- log_file: string (nullable = true)  
 |    |-- log_position: long (nullable = true)  
 |    |-- change_type: string (nullable = true)  
 |    |-- is_deleted: boolean (nullable = true)  
 |-- payload: struct (nullable = true)  
 |    |-- field_name_1: field_type_1 (nullable = true)  
 |    |-- field_name_2: field_type_2 (nullable = true)  
 |    |-- ...
{  
   "uuid":"k8h654b3-6569-...",  
   "read_timestamp":"2023-04-18 23:57:32",  
   "source_timestamp":"2023-04-18 23:57:31",  
   "object":"ph_table_test",  
   "read_method":"mysql-cdc-binlog",  
   "stream_name":"projects/.../locations/asia-southeast1/streams/mysql-dev-to-gcs-bucket",  
   "schema_key":"ba0d2c5518da691382...",  
   "sort_keys":[  
      "1676433451000",  
      "mysql-bin.002136",  
      "80553258"  
   ],  
   "source_metadata":{  
      "table":"test_table",  
      "database":"ph",  
      "primary_keys":[  
         "id"  
      ],  
      "log_file":"mysql-bin.002136",  
      "log_position":80553258,  
      "change_type":"INSERT",  
      "is_deleted":false  
   },  
   "payload":{  
      "id":58727,  
      "name":"Hung Nguyen",  
      "status":1,  
      "created_date":"2023-04-19 07:57:31.000000",  
      "updated_date":null,  
      "deleted_at":null  
   }  
}
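As a rough illustration of how this data can be leveraged, the sketch below reads the Avro change records for one table and keeps only the latest state of each primary key, which could then be merged into a warehouse table. The paths, the primary-key column, and the deduplication logic are assumptions for this example, not something Datastream does for you.

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.appName("apply-cdc-ph_table_test").getOrCreate()

# Read all change records written for this table (requires the spark-avro package;
# the path is a placeholder matching the layout shown above).
cdc_df = spark.read.format("avro").load("gs://stream-cdc/mysql-dev/ph_table_test/*/*/*/*/*/")

# Keep only the latest record per primary key, using the binlog position as a tiebreaker.
w = Window.partitionBy("payload.id").orderBy(
    F.col("source_timestamp").desc(),
    F.col("source_metadata.log_position").desc(),
)
latest = (
    cdc_df.withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .filter(~F.col("source_metadata.is_deleted"))  # drop rows whose last change was a delete
    .select("payload.*")
)

# Write the reconstructed current state; a real pipeline would MERGE this into the warehouse.
latest.write.mode("overwrite").parquet("gs://dwh-staging/ph_table_test/")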

A few things to keep in mind

Reference