🔥 Debezium?
🚀 필터, 마스킹, 변환(SMT)?
🚀 Embedded mode?
🔥 시나리오
🚀 Polling
SELECT
쿼리를 던지는 방식이다. 데이터 간의 최소 간격을 측정하거나 혹은 real-time(soft/hard)에서 허락하는 지연 시간 범위로 설정하는 것이다. 어쨋든 주기적으로 SELECT
쿼리를 던진다는 것은 변함이 없는데 인덱스,캐시,락에 부담된다.🚀 CDC
🚀 Polling vs CDC
항목 | 폴링(주기적 SELECT) | Debezium Embedded (CDC) |
동작 방식 | 특정 주기 마다 SELECT 쿼리 실행 | 트랜잭션 로그를 읽어 변경 사항 전달 |
지연(latency) | 폴링 주기만큼(예: 10초/1분) | 보통 수백 ms ~ 수 초 |
부하/효율 | 주기마다 인덱스 스캔(빈번한 불필요 조회) | 쓰기 1건당 로그 한 번 읽기 |
트랜잭션 일관성 | 테이블 단위로 부분/역전 가능 | 트랜잭션 경계/순서 보존 |
삭제 감지 | 불가(소프트 삭제 컬럼 없으면 못 잡음) | DELETE 이벤트 제공 |
스키마 변경 | 직접 감지 | DDL 이벤트 제공 |
중복/유실 | timestamp,id 이슈로 중복/유실 가능성 | 오프셋으로 재처리 용이 (멱등성 필요) |
운영 난이도 | 쉬움 (읽기 권한과 인덱스만 있으면 됨) | 트랜잭션 로그 권한 및 오프셋/히스토리 저장 필요 |
장애 복원 | 어플리케이션에서 마지막으로 읽는 위치 관리 | 오프셋으로 마지막 위치 복원 |
규모 확장 | 변경 비율↑면 폴링 비용 급증 | 트래픽↑에도 효율적(로그 스트림) |
의존 | 앱 코드만(쿼리/인덱스) | Debezium 라이브러리 + DB binlog 설정 |
🔥 Debezium Embedded
ChangeEvent
가 들어오며 저장된 오프셋/히스토리는 파일/DB에 저장한다. 앱이 재시작되면 저장된 오프셋부터 이어서 처리하게 된다.🚀 의존성
# 임베디드 모드 (앱 내 Kafka Connect 없이 커넥터 구동) implementation("io.debezium:debezium-embedded:3.1.0.Final") # 임베디드 실행을 위한 API 인터페이스 (DebeziumEngine, ChangeEvent) implementation("io.debezium:debezium-api:3.1.0.Final") # MySQL 용 소스 커넥터(binlog 를 읽어 이벤트 전달) implementation("io.debezium:debezium-connector-mysql:3.1.0.Final") # 오프셋, 스키마 히스토리를 DB에서 관리하기 위한 스토리지 모듈 implementation("io.debezium:debezium-storage-jdbc:3.1.0.Final")
🚀 Config
Debezium connector for MySQL :: Debezium Documentation
Debezium connector for MySQL :: Debezium Documentation
MySQL has a binary log (binlog) that records all operations in the order in which they are committed to the database. This includes changes to table schemas as well as changes to the data in tables. MySQL uses the binlog for replication and recovery.
Storing state of a Debezium connector :: Debezium Documentation
Storing state of a Debezium connector :: Debezium Documentation
Debezium connectors require persistent storage to preserve their state between restarts. All connectors require a mechanism to provide persistent storage for the offsets. In addition, connectors such as Db2, MySQL, Oracle, and SQL Server, require additional storage for their so-called internal schema history, which records changes to table schema in the database.
@Configuration class DebeziumConfig { @Bean("debeziumConnector") fun debeziumConnector(): Properties = Properties() .apply { put("name", "lkdcode-mysql") put("connector.class", "io.debezium.connector.mysql.MySqlConnector") put("topic.prefix", "lkdcode") put("snapshot.mode", "initial") put("snapshot.locking.mode", "none") put("database.hostname", "localhost") put("database.port", "13306") put("database.user", "root") put("database.password", "lkdcode") put("database.server.id", "1234") put("database.connectionTimeZone", "Asia/Seoul") put("database.include.list", "lkdcode") put("table.include.list", "lkdcode.banana") put("offset.storage", "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore") put("offset.storage.jdbc.connection.url", "jdbc:mysql://localhost:13306/lkdcode") put("offset.storage.jdbc.connection.user", "root") put("offset.storage.jdbc.connection.password", "lkdcode") put("offset.storage.jdbc.table.name", "debezium_offsets") put("schema.history.internal", "io.debezium.storage.jdbc.history.JdbcSchemaHistory") put("schema.history.internal.jdbc.connection.url", "jdbc:mysql://localhost:13306/lkdcode") put("schema.history.internal.jdbc.connection.user", "root") put("schema.history.internal.jdbc.connection.password", "lkdcode") put("schema.history.internal.jdbc.table.name", "debezium_schema_history") } }
🎯 name
name
인 경우 기존 위치에서 이어받을 수 있다. 운영 중에 변경하게 되면 새로운 커넥터로 인식된다. 동시에 두 개의 커넥터가 같은 name
을 사용하면 충돌한다.🎯 connector.class
🎯 topic.prefix
🎯 offset.storage
🎯 schema.history
🎯 snapshot.mode
READ
이벤트로 토픽에 내보내는데 엔진 시작 시 스냅샷으로 어떻게/얼마나 가져올지(or 생략할지) 결정하는 설정이다.Debezium connector for MySQL :: Debezium Documentation
Debezium connector for MySQL :: Debezium Documentation
MySQL has a binary log (binlog) that records all operations in the order in which they are committed to the database. This includes changes to table schemas as well as changes to the data in tables. MySQL uses the binlog for replication and recovery.
initial
: 기본 설정. 오프셋이 없을 때 한 번만 전체 스냅샷 후 스티리밍 전환
no_data
: 스키마만 캡처. 데이터 스냅샷 생략 후 바로 스트리밍(과거 풀데이터 불필요할 때)
when_needed
: 시작 시 오프셋이 없거나 로그 위치가 유실된 경우에만 필요 시 스냅샷
initial_only
: 스냅샷만 수행하고 종료(일회성 덤프 용도)
recovery
: 스키마 히스토리 토픽/파일 복구용 스냅샷
🎯 offset storage DDL
Storing state of a Debezium connector :: Debezium Documentation
Storing state of a Debezium connector :: Debezium Documentation
Debezium connectors require persistent storage to preserve their state between restarts. All connectors require a mechanism to provide persistent storage for the offsets. In addition, connectors such as Db2, MySQL, Oracle, and SQL Server, require additional storage for their so-called internal schema history, which records changes to table schema in the database.
🎯 schema history DDL
Storing state of a Debezium connector :: Debezium Documentation
Storing state of a Debezium connector :: Debezium Documentation
Debezium connectors require persistent storage to preserve their state between restarts. All connectors require a mechanism to provide persistent storage for the offsets. In addition, connectors such as Db2, MySQL, Oracle, and SQL Server, require additional storage for their so-called internal schema history, which records changes to table schema in the database.
🚀 Listener
Debezium Engine :: Debezium Documentation
Debezium Engine :: Debezium Documentation
Debezium connectors are normally operated by deploying them to a Kafka Connect service, and configuring one or more connectors to monitor upstream databases and produce data change events for all changes that they see in the upstream databases. Those data change events are written to Kafka, where they can be independently consumed by many different applications. Kafka Connect provides excellent fault tolerance and scalability, since it runs as a distributed service and ensures that all registered and configured connectors are always running. For example, even if one of the Kafka Connect endpoints in a cluster goes down, the remaining Kafka Connect endpoints will restart any connectors that were previously running on the now-terminated endpoint, minimizing downtime and eliminating administrative activities.
@Service class DebeziumListener( @Qualifier("debeziumConnector") private val properties: Properties ) { private val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "debezium-lkdcode").apply { isDaemon = false } } private lateinit var engine: DebeziumEngine<ChangeEvent<String, String>> @PostConstruct fun start() { engine = DebeziumEngine.create(Json::class.java) .using(properties) .notifying { records: List<ChangeEvent<String, String>>, committer: DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> -> records.forEach { record -> try { println("CDC payload: ${record.value()}") committer.markProcessed(record) } catch (e: Exception) { println("ERROR: $e") } } committer.markBatchFinished() } .build() executor.submit(engine) } @PreDestroy fun stop() { try { if (this::engine.isInitialized) engine.close() } finally { executor.shutdown() } } }
@PostConstruct
, @PreDestroy
를 통해 스프링 빈을 모두 만들고 의존성 주입이 끝난 시점에 엔진시작, 그리고 종료 직전에 안전하게 정상 셧다운 한다. 그래야 Debezium 의 offset 이 커밋되고 커넥터가 해당 자원을 반납한다. 이를 생량하면 offset 이 미커밋되거나 커넥션 누수 등이 생길 수 있다. 다른 방법으로도 구현이 가능하므로 상황에 알맞게 구현하면 된다.committer.markProcessed(record)
, committer.markBatchFinished()
JSONPath Online Evaluator
[ { "id": 34, "name": "바나나", "sweetness_brix": "5.0", "brand": "lkdcode", "grade": "특급", "harvested_at": 20340 } ]
🔥 체크 사항
🚀 무한 락?
snapshot.locking.mode
옵션을 따로 설정하지 않으면 기본값인 minimal
로 동작하게 되는데 해당 설정은 아주 짧은 시간동안 FTWRL
을 획득한다. (실제 락은 MySQL 커넥터 설정에 의해 결정) 이후 REPEATABLE READ
트랜잭션으로 일관성을 보장한다.FTWRL
을 잡고 있는 초반 단계에서 스키마 정보를 영속화하는데 이때 스키마 히스토리를 같은 MySQL 서버에 쓰도록 설정했다면 자기 자신이 잡고 있는 FTWRL
때문에 INSERT
가 막혀 마치 교착상태처럼 보이는 현상이 발생할 수 있다.snapshot.locking.mode
혹은 snapshot.mode
설정을 통해서 해결할 수 있다.🚀 Listener 가 이벤트를 수신 못한다면 아래의 설정들을 살펴보자.
log_bin
: 바이너리 로그 사용
server_id
: 서버 고유 ID
binlog_format
: 행 단위 로깅STATEMENT
: 실행된 SQL 문 그 자체ROW
: 변경된 행 데이터MIXED
: 상황에 따라 혼합
binlog_row_image
: 행 이미지의 상세도FULL
: 모든 컬럼을 기록MINIMAL
: 변경된 컬럼 및 키 등 필수값만NOBLOB
: BLOB/TEXT 등은 변경 시에만 포함
SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'server_id'; SHOW VARIABLES LIKE 'binlog_format'; SHOW VARIABLES LIKE 'binlog_row_image'; # 결과 log_bin = ON server_id ≠ 0 binlog_format = ROW binlog_row_image = FULL
server-id = 1 log-bin = /var/lib/mysql/mysql-bin binlog-format = ROW binlog-row-image = FULL expire_logs_days = 7
- 직접 config 파일에서 설정하기
cat mariadb-server.cnf # # These groups are read by MariaDB server. # Use it for options that only the server (but not clients) should see # # See the examples of server my.cnf files in /usr/share/mysql/ # # this is read by the standalone daemon and embedded servers [server] # this is only for the mysqld standalone daemon # Settings user and group are ignored when systemd is used. # If you need to run mysqld under a different user or group, # customize your systemd unit file for mysqld/mariadb according to the # instructions in http://fedoraproject.org/wiki/Systemd [mysqld] datadir=/var/lib/mysql socket=/var/lib/mysql/mysql.sock log-error=/var/log/mariadb/mariadb.log pid-file=/run/mariadb/mariadb.pid lower_case_table_names = 1 # 직접 config 파일에서 설정할 수 있다. # ✅ Debezium/CDC 필수 설정들 server_id=1 log_bin=/var/lib/mysql/mysql-bin binlog_format=ROW binlog_row_image=FULL expire_logs_days=7 # # * Galera-related settings # [galera] # Mandatory settings #wsrep_on=ON #wsrep_provider= #wsrep_cluster_address= #binlog_format=row #default_storage_engine=InnoDB #innodb_autoinc_lock_mode=2 # # Allow server to accept connections on all interfaces. # #bind-address=0.0.0.0 # # Optional setting #wsrep_slave_threads=1 #innodb_flush_log_at_trx_commit=0 # this is only for embedded server [embedded] # This group is only read by MariaDB servers, not by MySQL. # If you use the same .cnf file for MySQL and MariaDB, # you can put MariaDB-only options here [mariadb] # This group is only read by MariaDB-10.5 servers. # If you use the same .cnf file for MariaDB of different versions, # use this group for options that older servers don't understand [mariadb-10.5]
REPLICATION SLAVE
: 바이너리 로그를 읽는 레플리카 권한 (MySQL 8에선REPLICATION REPLICA
)
REPLICATION CLIENT
:SHOW MASTER/SLAVE STATUS
,SHOW BINARY LOGS
등 상태 조회 권한.
SELECT ON
: 스냅샷 시 테이블을 읽을 수 있는 권한
CREATE USER 'debezium'@'%' IDENTIFIED BY 'debezium_password'; GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'%'; FLUSH PRIVILEGES;