Debezium 개요
Debezium 개요

Debezium 개요

Tags
Debezium
MySQL
Embedded
Published
September 11, 2025
Author
lkdcode
 
언제 발생할지 모르는 데이터의 처리, 실시간 적재 등 어떻게 해결할 수 있을까?
CDC 도구로 풀어낼 수 있다.
 

🔥 Debezium?

대표적인 오픈소스 CDC(Change Data Capture) 플랫폼으로 데이터베이스의 변경(INSERT,UPDATE,DELETE)을 실시간 이벤트 스트림으로 뽑아 다른 시스템으로 보내는 도구이다. 실시간 ETL, CQRS 읽기 모델, MSA 이벤트화(Outbox 패턴) 등에 활용된다.
 
https://debezium.io/documentation/reference/3.2/architecture.html
 
로그 기반 캡처(binlog/WAL 등)는 DB 의 부하를 줄이고 데이터 유실 방지에 효과적이다. 초기 스냅샷을 통해 실시간 스트리밍을 지원하며 키 단위 순서 보장, 오프셋/재처리도 지원한다. 스키마의 변경(DDL)을 전파하며 필터링, 마스킹, 변환(SMT)도 가능하다.
 

🚀 필터, 마스킹, 변환(SMT)?

SMT(Single Message Transform) 는 Kafka connect(=Debezium이 올라가는 런타임)에서 토픽으로 내보내기 전에 이벤트 1건을 즉석에서 가공하는 작은 필터이다. 특정 조건의 이벤트를 핸들링하거나 민감한 데이터를 마스킹 처리하거나 값을 재구성할 수 있다.
 

🚀 Embedded mode?

기본적으로 Kafka connect 위에서 소스 커넥터로 돌아가 Kafka 토픽에 이벤트를 쏘는데, Embedded 모드는 Kafka Connect 없이 애플리케이션 프로세스 안에서 커넥터를 라이브러리처럼 실행할 수 있다. 카프카 인프라까지 운영하지 않는 선에서 Debezium 을 운영할 수 있다.
 

🔥 시나리오

시나리오를 수립하고 Debezium과 Polling 방식에 대해 차이를 알아본다.
시나리오의 요구 사항은 다음과 같다.
 
특정 테이블로 INSERT 되는 데이터를 처리해야 한다. 데이터 간의 최소 간격은 약 수 초 내외지만 최대 간격은 몇 시간, 몇 달이 될 수도 있는 불규칙적인 주기를 가지는 데이터 발생 건에 대해 처리해야 한다.
 
어떻게 가공할지는 다루지 않고, 어떤 방법으로 해당 데이터를 가져오냐까지만 알아본다.
CDC 외에도 해결할 수 있는 여러 방법들이 있는데, 대표적으로 폴링과 비교해보자.
 

🚀 Polling

주기적으로 SELECT 쿼리를 던지는 방식이다. 데이터 간의 최소 간격을 측정하거나 혹은 real-time(soft/hard)에서 허락하는 지연 시간 범위로 설정하는 것이다. 어쨋든 주기적으로 SELECT 쿼리를 던진다는 것은 변함이 없는데 인덱스,캐시,락에 부담된다.
 

🚀 CDC

데이터베이스가 내부적으로 사용하는 트랜잭션 로그(binlog/WAL)를 복제 프로토콜로 스트리밍해서 읽는다. 테이블을 직접 스캔하거나 쿼리를 날리지 않는다. 트리거도 추가하지 않는다. 이는 CPU/락/쿼리 부하가 매우 적은 것을 의미하고 CDC 에서의 부하는 log 를 더 오래 보관해야 하는 디스크/네트워크 I/O 문제 정도가 있다. 위의 두 방식과 비교했을 때 더 적은 부하다.
 

🚀 Polling vs CDC

항목
폴링(주기적 SELECT)
Debezium Embedded (CDC)
동작 방식
특정 주기 마다 SELECT 쿼리 실행
트랜잭션 로그를 읽어 변경 사항 전달
지연(latency)
폴링 주기만큼(예: 10초/1분)
보통 수백 ms ~ 수 초
부하/효율
주기마다 인덱스 스캔(빈번한 불필요 조회)
쓰기 1건당 로그 한 번 읽기
트랜잭션 일관성
테이블 단위로 부분/역전 가능
트랜잭션 경계/순서 보존
삭제 감지
불가(소프트 삭제 컬럼 없으면 못 잡음)
DELETE 이벤트 제공
스키마 변경
직접 감지
DDL 이벤트 제공
중복/유실
timestamp,id 이슈로 중복/유실 가능성
오프셋으로 재처리 용이 (멱등성 필요)
운영 난이도
쉬움 (읽기 권한과 인덱스만 있으면 됨)
트랜잭션 로그 권한 및 오프셋/히스토리 저장 필요
장애 복원
어플리케이션에서 마지막으로 읽는 위치 관리
오프셋으로 마지막 위치 복원
규모 확장
변경 비율↑면 폴링 비용 급증
트래픽↑에도 효율적(로그 스트림)
의존
앱 코드만(쿼리/인덱스)
Debezium 라이브러리 + DB binlog 설정
 
정리하자면, 소규모에서는 Polling 이 낫다. 가볍고 단순하며 필요한 권한 요구도 낮다. 소량의 트래픽과 DB 설정 변경이 어려운 환경에서도 동작하기 때문이다. CDC 는 위의 장점과 정 반대로 권한과 관리에 대한 비용이 더 들지만 그만큼 실시간으로 반응하거나 정확한 추적 등에서 장점이 있다.
 
위의 요구 사항은 Polling 방식을 선택할 수도 있지만, ‘불규칙적인 주기’를 가지는 데이터는 소량의 트래픽이라도 Polling 보다는 CDC 가 더 유리하다.
 
 

🔥 Debezium Embedded

 
notion image
 
앞서 Debezium 은 kafka connect 위에서 돌아간다. kafka 까지 운영하기 싫다면 Embedded 모드로 실행하면 된다. Embedded 모드는 앱(스프링 부트) DebeziumEngine을 시작하고 커넥터가 DB 로그를 읽는다. 이벤트가 발생할 때마다 콜백으로 ChangeEvent 가 들어오며 저장된 오프셋/히스토리는 파일/DB에 저장한다. 앱이 재시작되면 저장된 오프셋부터 이어서 처리하게 된다.
 

🚀 의존성

MySQL 과 스프링 부트로 Debezium 시나리오를 달성해본다. 아래의 Debezium 의존성을 추가해준다.
# 임베디드 모드 (앱 내 Kafka Connect 없이 커넥터 구동) implementation("io.debezium:debezium-embedded:3.1.0.Final") # 임베디드 실행을 위한 API 인터페이스 (DebeziumEngine, ChangeEvent) implementation("io.debezium:debezium-api:3.1.0.Final") # MySQL 용 소스 커넥터(binlog 를 읽어 이벤트 전달) implementation("io.debezium:debezium-connector-mysql:3.1.0.Final") # 오프셋, 스키마 히스토리를 DB에서 관리하기 위한 스토리지 모듈 implementation("io.debezium:debezium-storage-jdbc:3.1.0.Final")
 

🚀 Config

Debezium 설정은 공식문서에 잘 나와있다.
  • Debezium connector for MySQL :: Debezium Documentation
  • Storing state of a Debezium connector :: Debezium Documentation
 
오프셋/스키마 히스토리는 같은 MySQL 에서 관리할 예정이며 자세한 관련 설정 역시 공식 문서를 참고하면 된다.
공식 문서에 상세한 설정들이 나와있으므로 여기서는 간단한 설정만 진행한다.
@Configuration class DebeziumConfig { @Bean("debeziumConnector") fun debeziumConnector(): Properties = Properties() .apply { put("name", "lkdcode-mysql") put("connector.class", "io.debezium.connector.mysql.MySqlConnector") put("topic.prefix", "lkdcode") put("snapshot.mode", "initial") put("snapshot.locking.mode", "none") put("database.hostname", "localhost") put("database.port", "13306") put("database.user", "root") put("database.password", "lkdcode") put("database.server.id", "1234") put("database.connectionTimeZone", "Asia/Seoul") put("database.include.list", "lkdcode") put("table.include.list", "lkdcode.banana") put("offset.storage", "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore") put("offset.storage.jdbc.connection.url", "jdbc:mysql://localhost:13306/lkdcode") put("offset.storage.jdbc.connection.user", "root") put("offset.storage.jdbc.connection.password", "lkdcode") put("offset.storage.jdbc.table.name", "debezium_offsets") put("schema.history.internal", "io.debezium.storage.jdbc.history.JdbcSchemaHistory") put("schema.history.internal.jdbc.connection.url", "jdbc:mysql://localhost:13306/lkdcode") put("schema.history.internal.jdbc.connection.user", "root") put("schema.history.internal.jdbc.connection.password", "lkdcode") put("schema.history.internal.jdbc.table.name", "debezium_schema_history") } }
 
민감한 설정들은 필요에 따라 잘 관리해주면 된다. 대표적인 주요 설정들만 알아보고 상세 설정은 공식 문서를 추천한다.
 

🎯 name

커넥터(or Embedded 엔진) 인스턴스의 고유 이름이다. Offset 저장소가 해당 이름을 기준으로 상태를 묶어 보관하므로 같은 name 인 경우 기존 위치에서 이어받을 수 있다. 운영 중에 변경하게 되면 새로운 커넥터로 인식된다. 동시에 두 개의 커넥터가 같은 name 을 사용하면 충돌한다.
 

🎯 connector.class

Debezium 이 사용할 데이터베이스 커넥터의 클래스이다.
 

🎯 topic.prefix

생성할 토픽/스트림 이름의 접두어를 설정한다.
여러 커넥터가 같은 브로커를 써도 이름 충돌을 방지한다.
 

🎯 offset.storage

offset 을 파일로 저장할 수도 있지만 데이터베이스에서 관리할 수도 있다. DB에서 관리하도록 하는 설정들이다.
 

🎯 schema.history

스키마 히스토리 역시 파일대신 DB 에서 관리하도록 하는 설정들이다.
 

🎯 snapshot.mode

스냅샷은 커넥터가 시작하자마자 그 시점의 테이블 전체(스키마+데이터)를 한 번 읽어 READ 이벤트로 토픽에 내보내는데 엔진 시작 시 스냅샷으로 어떻게/얼마나 가져올지(or 생략할지) 결정하는 설정이다.
자세한 설정은 아래의 공식 문서 참고.
  • Debezium connector for MySQL :: Debezium Documentation
  • initial : 기본 설정. 오프셋이 없을 때 한 번만 전체 스냅샷 후 스티리밍 전환
  • no_data : 스키마만 캡처. 데이터 스냅샷 생략 후 바로 스트리밍(과거 풀데이터 불필요할 때)
  • when_needed : 시작 시 오프셋이 없거나 로그 위치가 유실된 경우에만 필요 시 스냅샷
  • initial_only : 스냅샷만 수행하고 종료(일회성 덤프 용도)
  • recovery : 스키마 히스토리 토픽/파일 복구용 스냅샷
 
위의 설정에서 offset 과 schema history 를 MySQL 에서 관리할 수 있도록 설정했으므로 해당 테이블들을 생성해준다.
 

🎯 offset storage DDL

  • Storing state of a Debezium connector :: Debezium Documentation
 

🎯 schema history DDL

  • Storing state of a Debezium connector :: Debezium Documentation
 

🚀 Listener

Debezium 이 던지는 이벤트를 받아보자. Config 설정도 있고 자바 코드로 작성되어 있어 참고하면 된다.
  • Debezium Engine :: Debezium Documentation
 
@Service class DebeziumListener( @Qualifier("debeziumConnector") private val properties: Properties ) { private val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "debezium-lkdcode").apply { isDaemon = false } } private lateinit var engine: DebeziumEngine<ChangeEvent<String, String>> @PostConstruct fun start() { engine = DebeziumEngine.create(Json::class.java) .using(properties) .notifying { records: List<ChangeEvent<String, String>>, committer: DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> -> records.forEach { record -> try { println("CDC payload: ${record.value()}") committer.markProcessed(record) } catch (e: Exception) { println("ERROR: $e") } } committer.markBatchFinished() } .build() executor.submit(engine) } @PreDestroy fun stop() { try { if (this::engine.isInitialized) engine.close() } finally { executor.shutdown() } } }
 
DebeziumEngine 을 실행하기 위한 쓰레드를 하나 만들고 DebeziumEngine 설정 후 해당 쓰레드에게 넘긴다. DebeziumEngine#run 은 기본적으로 블로킹 루프인데 실행하면 DB 변경을 폴링하고 콜백한다. 스프링 초기화 스레드에서 돌리게 되면 앱이 부팅을 마치지 못하니 전용 쓰레드를 만들어서 넘기는 것이다.
 
쓰레드를 쓰면 이벤트 순서가 보장되고(단일 소비자) 엔진 처리 지연이 애플리케이션의 다른 쓰레드에 영향을 주지 않는다. @PostConstruct, @PreDestroy 를 통해 스프링 빈을 모두 만들고 의존성 주입이 끝난 시점에 엔진시작, 그리고 종료 직전에 안전하게 정상 셧다운 한다. 그래야 Debezium 의 offset 이 커밋되고 커넥터가 해당 자원을 반납한다. 이를 생량하면 offset 이 미커밋되거나 커넥션 누수 등이 생길 수 있다. 다른 방법으로도 구현이 가능하므로 상황에 알맞게 구현하면 된다.
committer.markProcessed(record) , committer.markBatchFinished()
markProcessed 메서드를 통해 해당 레코드를 성공적으로 처리했다는 레코드 단위의 ACK이다.
내부적으로 offset으로 관리하게 되고 markBatchFinished 는 해당 offset 에 커밋하게 되는것이다.
 
이제 Debezium 셋팅이 끝났으니 banana 테이블에 INSERT 해보자!
콘솔에 출력되는 값을 JSONPath Online Evaluator 에서 JSONPath 로 검색해볼 수 있다.
객체로 받고 싶다면 적절히 역직렬화해서 받으면 된다.
[ { "id": 34, "name": "바나나", "sweetness_brix": "5.0", "brand": "lkdcode", "grade": "특급", "harvested_at": 20340 } ]
 

🔥 체크 사항

 

🚀 무한 락?

Debezium 설정에서 초기 스냅샷을 수행하게 설정하고, snapshot.locking.mode 옵션을 따로 설정하지 않으면 기본값인 minimal 로 동작하게 되는데 해당 설정은 아주 짧은 시간동안 FTWRL 을 획득한다. (실제 락은 MySQL 커넥터 설정에 의해 결정) 이후 REPEATABLE READ 트랜잭션으로 일관성을 보장한다.
 
FTWRL 을 잡고 있는 초반 단계에서 스키마 정보를 영속화하는데 이때 스키마 히스토리를 같은 MySQL 서버에 쓰도록 설정했다면 자기 자신이 잡고 있는 FTWRL 때문에 INSERT 가 막혀 마치 교착상태처럼 보이는 현상이 발생할 수 있다.
 
저장소를 분리하거나, snapshot.locking.mode 혹은 snapshot.mode 설정을 통해서 해결할 수 있다.
 

🚀 Listener 가 이벤트를 수신 못한다면 아래의 설정들을 살펴보자.

아래의 4개 변수는 CDC와 레플리케이션의 최소 요건이다.
  • log_bin : 바이너리 로그 사용
  • server_id : 서버 고유 ID
  • binlog_format : 행 단위 로깅
    • STATEMENT : 실행된 SQL 문 그 자체
    • ROW : 변경된 행 데이터
    • MIXED : 상황에 따라 혼합
  • binlog_row_image : 행 이미지의 상세도
    • FULL : 모든 컬럼을 기록
    • MINIMAL : 변경된 컬럼 및 키 등 필수값만
    • NOBLOB : BLOB/TEXT 등은 변경 시에만 포함
SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'server_id'; SHOW VARIABLES LIKE 'binlog_format'; SHOW VARIABLES LIKE 'binlog_row_image'; # 결과 log_bin = ON server_id ≠ 0 binlog_format = ROW binlog_row_image = FULL
server-id = 1 log-bin = /var/lib/mysql/mysql-bin binlog-format = ROW binlog-row-image = FULL expire_logs_days = 7
 
  • 직접 config 파일에서 설정하기
cat mariadb-server.cnf # # These groups are read by MariaDB server. # Use it for options that only the server (but not clients) should see # # See the examples of server my.cnf files in /usr/share/mysql/ # # this is read by the standalone daemon and embedded servers [server] # this is only for the mysqld standalone daemon # Settings user and group are ignored when systemd is used. # If you need to run mysqld under a different user or group, # customize your systemd unit file for mysqld/mariadb according to the # instructions in http://fedoraproject.org/wiki/Systemd [mysqld] datadir=/var/lib/mysql socket=/var/lib/mysql/mysql.sock log-error=/var/log/mariadb/mariadb.log pid-file=/run/mariadb/mariadb.pid lower_case_table_names = 1 # 직접 config 파일에서 설정할 수 있다. # ✅ Debezium/CDC 필수 설정들 server_id=1 log_bin=/var/lib/mysql/mysql-bin binlog_format=ROW binlog_row_image=FULL expire_logs_days=7 # # * Galera-related settings # [galera] # Mandatory settings #wsrep_on=ON #wsrep_provider= #wsrep_cluster_address= #binlog_format=row #default_storage_engine=InnoDB #innodb_autoinc_lock_mode=2 # # Allow server to accept connections on all interfaces. # #bind-address=0.0.0.0 # # Optional setting #wsrep_slave_threads=1 #innodb_flush_log_at_trx_commit=0 # this is only for embedded server [embedded] # This group is only read by MariaDB servers, not by MySQL. # If you use the same .cnf file for MySQL and MariaDB, # you can put MariaDB-only options here [mariadb] # This group is only read by MariaDB-10.5 servers. # If you use the same .cnf file for MariaDB of different versions, # use this group for options that older servers don't understand [mariadb-10.5]
 
MySQL/MariaDB 의 DCL(권한 관리) 쿼리를 추가해 Debezium 전용 계정을 만들어 CDC 에 필요한 권한을 줄 수 있다.
  • REPLICATION SLAVE : 바이너리 로그를 읽는 레플리카 권한 (MySQL 8에선 REPLICATION REPLICA)
  • REPLICATION CLIENT : SHOW MASTER/SLAVE STATUS, SHOW BINARY LOGS 등 상태 조회 권한.
  • SELECT ON : 스냅샷 시 테이블을 읽을 수 있는 권한
CREATE USER 'debezium'@'%' IDENTIFIED BY 'debezium_password'; GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'%'; FLUSH PRIVILEGES;