공부/Kafka

Strimzi Kafka Operator의 kafka connect로 Mysql to Postgres migration

토고미 2022. 3. 18. 15:01

kafka connect란 일종의 api 서버로서, 카프카의 클라이언트를 대신하여 데이터를 브로커에게 pub/sub 한다.

connect와 connector, schema registry에 관한 것은 스킵하겠다.

모르겠으면 구글링! 아니면 조만간 내가 글을 쓰고 여기에 링크를 걸든가..

 

쉽게 생각하면 여러 개의 클라이언트가 난잡하게 카프카 브로커에 접근하는 것을

카프카 커넥트라는 api 서버가 통합 관리하는 것이라 생각하면 된다.

그리고 그 api 서버에 뚫려있는 api가 connector이다!

 

=======

 

이 글의 목적은 k8s 환경에서 strimzi kafka operator를 이용하여 kafka 및 kafka connect를 프로비저닝 한 뒤,

같은 k8s 환경에 존재하는 mysql로부터 postgres로 데이터를 pub/sub 하는 것이다.

 

관련된 모든 파일은 아래 깃헙 레포에 있다.

https://github.com/aldlfkahs/strimzi-kafka-example

 

GitHub - aldlfkahs/strimzi-kafka-example

Contribute to aldlfkahs/strimzi-kafka-example development by creating an account on GitHub.

github.com

 

참고로 모든 리소스를 kafka 네임스페이스에 배포하도록 되어있다.

 

다음 단계로 넘어가기 전에 이전 단계의 pod가 running 된 것을 확인하고 진행하자!

1. Strimzi Kafka Operator 배포

아래 링크의 strimzi 공식 사이트에서 다운받거나

https://strimzi.io/downloads/

 

Downloads

Downloads Strimzi releases are available for download on our GitHub. The release artifacts contain documentation and example YAML files for deployment on Kubernetes.

strimzi.io

 

아니면 내 레포에서 0_strimzi-cluster-operator-0.26.0.yaml 파일을 사용하면 된다.

kubectl apply -f 0_strimzi-cluster-operator-0.26.0.yaml

파일 이름에서 알 수있듯이 0.26.0 버전을 사용했다.

 

 

2. Kafka Cluster  배포

operator의 배포가 완료되었다면 이제 진짜 카프카 인스턴스를 배포할 차례이다.

1_kafka-simple-clutser.yaml 파일을 참고하면 되며, 몇 가지 설정사항을 살펴보자.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster # 클러스터 이름
  namespace: kafka
spec:
  kafka:
    version: 3.0.0
    replicas: 3
    resources:
      limits:
        cpu: 1000m
        memory: 1536Mi
      requests:
        cpu: 100m
        memory: 100Mi
    listeners:
      - name: plain
        port: 9092
        type: internal # 서비스 타입
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.8"
    storage:
      type: ephemeral # 스토리지 타입
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral # 스토리지 타입

.metadata.name : 클러스터 이름

.spec.kafka.listeners[].type : 서비스 타입 (internal=외부노출X, nodeport=노드포트로 노출 등)

.spec.kafka.storage.type : 스토리지 타입 (ephemeral=pv 사용 안함, persistent-claim=pvc 사용 등)

.spec.zookeeper.storage.type : 위와 동일

 

적절히 설정한 뒤 배포하자

kubectl apply -f 1_kafka-simple-cluster.yaml

 

3. Schema Registry 배포

schema registry는 strimzi kafka에서 제공하지 않는다. strimzi kafka는 오픈소스인 반면, schema registry를 개발한 회사들은 자신들에게 라이센스가 있어서 strimzi의 철학과 맞지 않다나 뭐라나...

 

아무튼 그런 사정으로 인해 schema registry를 사용하기 위해서는 strimzi kafka operator의 도움을 받지 않고 직접 배포해야 한다.

 

schema registry도 여러 회사에서 만든 제품들이 있는데, 그중에서도 잘 나가는 건 confluent사에서 만든 것이다.

파일은 2_schema-registry.yaml을 참고하면 된다.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: schema-registry
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
      - name: my-cluster-schema-registry
        image: confluentinc/cp-schema-registry:7.0.1
        ports:
        - containerPort: 8081
        env:
        - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
          value: my-cluster-kafka-bootstrap:9092 # 배포된 카프카 클러스터 서비스 주소
        - name: SCHEMA_REGISTRY_HOST_NAME
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: SCHEMA_REGISTRY_LISTENERS
          value: http://0.0.0.0:8081
        - name: SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL
          value: PLAINTEXT
---
apiVersion: v1
kind: Service
metadata:
  name: schema-registry
  namespace: kafka
spec:
  ports:
  - port: 8081
  clusterIP: None
  selector:
    app: schema-registry

여기선 다른 건 제쳐두고 가장 중요한 건 딱 하나이다.

 

.spec.template.spec.containers[0].env에서

SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS라는 이름의 환경변수 값이다.

이 키 값의 value를 2번에서 배포한 카프카 클러스터의 {bootstrap 서비스 주소}:9092로 설정해주어야 한다.

 

schema registry는 백엔드 스토리지로 카프카 브로커를 사용하기 때문인데,

사실 그래서 꼭 2번에서 배포한 카프카 클러스터일 필요는 없고, 다른 카프카 클러스터가 있다면 그것을 사용해도 좋다.

 

잘 설정했다면 배포하자

kubectl apply -f 2_schema-registry.yaml

 

4. Kafka Connect 배포

이제 카프카 커넥트까지 띄우면 기본적인 준비는 끝난다.

3_kafka-connect.yaml 파일을 참고하면 된다.

 

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster3
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: aldlfkahs/kafka-connect-jdbc:v1.0 # 사용할 커넥트 이미지
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092 # 배포한 카프카 클러스터 서비스 주소
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry.kafka.svc.cluster.local:8081 # 배포한 schema registry 서비스 주소
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry.kafka.svc.cluster.local:8081 # 배포한 schema registry 서비스 주소

 

여기서도 서비스 주소만 잘 적어주면 된다.

 

.spec.image : 사용할 커넥터의 jar 파일 등이 잘 포함된 이미지, 모르겠으면 예시에 적힌 내 이미지를 사용하면 된다.

.spec.bootstrapServers : 2번에서 배포한 카프카 클러스터 서비스 주소를 적어주면 된다.

.spec.config :

  key.converter.schema.registry.url : 배포한 schema registry 서비스 주소를 적어주면 된다. 반드시 앞에 http://를 붙이자.

  value.converter.schema.registry.url : 위와 동일

 

 

.spec.config에는 다양한 설정을 할 수 있지만 일단 목표가 msyql to postgres이니 이 예제대로 따라하면 된다.

참고로 mysql과 postgres를 위한 connector 및 AvroConverter 등은 내가 포함시켜서 이미지를 만들어놨으니

따로 이미지가 없다면 예제에 적힌 이미지를 그대로 사용하면 된다.

 

kubectl apply -f 3_kafka-connect.yaml

이제 카프카 커넥트의 기본적인 구조는 전부 배포했다!

 

아래 부터는 실제 DB to DB 마이그레이션을 위한 배포이다.

 

5. Mysql, Postgres 배포

sourceDB로 Mysql, sinkDB로 Postgres를 사용할 것이다.

깃 레포의 db-deploy 폴더 안에 mysql.yaml과 postgres.yaml을 참고하면 된다.

각 계정과 비밀번호는 아래와 같이 설정했다.

 

mysql의 계정/비번 : root/mypassword

postgres의 계정/비번 : postgres/mypassword

 

mysql의 컨피그맵 부분만 살펴보겠다.

apiVersion: v1
kind: ConfigMap
metadata:
  name: mysql-config
  namespace: kafka
data:
  my.cnf: |
    [mysqld]
    log-bin=mysql-bin
    max_binlog_size=1G
    expire_logs_days=30 
  initdb.sql: |
    SET GLOBAL binlog_format = 'ROW';
    SET GLOBAL server_id = 184054;
    CREATE DATABASE test;
    use test;
    CREATE TABLE example (id INT(16) NOT NULL, name VARCHAR(32));
    INSERT INTO example VALUES (123, 'tmax');

 

initdb.sql은 mysql이 최초 실행될 때 수행할 명령어다.

 

test라는 데이터베이스를 새로 만들고

그 안에서 id와 name이라는 column을 갖는 example 테이블을 만드는 것을 확인할 수 있다.

 

그 이외의 설정은 내가 이리저리 테스트하며 커넥터와 연결하기 위한 설정들이다.

 

그럼 DB를 배포하도록 하자.

kubecl apply -f db-deploy/

 

6. Connector 적용

마지막 관문이다. connect에 mysql source connector와 postgres sink connector를 이용해 5번에서 배포한 DB를 연결하자.

4_kafka-mysql-source-connector.yaml, 5_kafka-postgres-sink-connector.yaml 파일이다.

 

먼저 mysql source connector이다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: jdbc-mysql-source-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster3
spec:
  class: io.confluent.connect.jdbc.JdbcSourceConnector
  tasksMax: 1
  config:
    mode: "incrementing"
    incrementing.column.name: "id"
    connection.url: "jdbc:mysql://mysql.kafka.svc.cluster.local:3306/test"
    connection.user: "root"
    connection.password: "mypassword"
    table.whitelist: "example"

connection과 관련된 설정을 띄운대로 맞게 적어주면 된다.

참고로 connection.url 끝 부분의 ~/test는 test 데이터베이스를 쓰겠다는 의미이다.

 

table.whitelist는 어떤 테이블에 대해 pub할지를 화이트리스트로 필터링하는 것이다.

 

mode는 어떤 방식으로 pub을 할지이고,

incrementing.column.name은 mode가 incrementing일 때 어떤 column이 증가하는 것을 기준으로 pub을 할지이다.

 

mode에는 세가지 설정이 있는데, 이는 아래 링크의 공식문서를 참고 바란다.

https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html

 

다음으로 postgres sink connector이다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-connector1
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster3
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    topics: "example"
    connection.url: "jdbc:postgresql://postgres-service.kafka.svc.cluster.local:5432/postgres"
    connection.user: "postgres"
    connection.password: "mypassword"
    auto.create: "true"

mysql source connector와 크게 다를 것 업다.

 

어떤 topic을 sub할지, 테이블을 자동으로 만들어줄지에 대한 auto.create 정도가 있다.

역시 자세한 설정 사항은 공식문서를 참고하자.

 

그럼 이제 적용하자.

kubectl apply -f 4_kafka-mysql-source-connector.yaml
kubectl apply -f 5_kafka-postgres-sink-connector.yaml

 

직접 postgres에 들어가서 확인을 해보자

postgres=# \dt
Did not find any relations.

처음 postgres 띄웠을 때는 이렇게 아무 테이블도 없지만,

source/sink connector를 배포하고나면

 

postgres=# \dt
          List of relations
 Schema |  Name   | Type  |  Owner
--------+---------+-------+----------
 public | example | table | postgres
(1 row)


postgres=# select * from example;
 id  | name
-----+------
 123 | tmax
(1 row)

이렇게 테이블과 데이터가 들어간 것을 확인할 수 있다!

 

새롭게 mysql에 데이터를 넣어도 동작하는지 확인해보자

mysql> insert into example values('125', 'dangan');
Query OK, 1 row affected (0.01 sec)

단 넣어줄때, 기존에 있던 데이터의 id 칼럼 중 최대값보다 큰 id값을 넣어주어야한다.

mode: incrementing이 그런 설정이기 때문이다.

 

그럼 이제 다시 postgres를 확인하면

postgres=# select * from example;
 id  |  name
-----+--------
 123 | tmax
 125 | dangan

 

짜잔!

 

참고

https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html

https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/

 

깃헙

https://github.com/aldlfkahs/strimzi-kafka-example