🐾
개발자국
  • 🐶ABOUT
  • 🚲프로그래밍
    • 객체 지향 프로그래밍
    • 오브젝트
      • 1장: 객체, 설계
      • 2장: 객체지향 프로그래밍
      • 3장: 역할, 책임, 협력
      • 4장: 설계 품질과 트레이드오프
      • 5장: 책임 할당하기
      • 6장: 메시지와 인터페이스
      • 7장: 객체 분해
      • 8장: 의존성 관리하기
      • 9장: 유연한 설계
      • 10장: 상속과 코드 재사용
      • 11장: 합성과 유연한 설계
      • 12장: 다형성
      • 13장: 서브클래싱과 서브타이핑
      • 14장: 일관성 있는 협력
      • 15장: 디자인 패턴과 프레임워크
    • 도메인 주도 개발 시작하기
      • 1장: 도메인 모델 시작하기
      • 2장: 아키텍처 개요
      • 3장: 애그리거트
      • 4장: 리포지토리와 모델 구현
      • 5장: 스프링 데이터 JPA를 이용한 조회 기능
      • 6장: 응용 서비스와 표현 영역
      • 7장: 도메인 서비스
      • 8장: 애그리거트 트랜잭션 관리
      • 9장: 도메인 모델과 바운디드 컨텍스트
      • 10장: 이벤트
      • 11장: CQRS
    • 클린 아키텍처
      • 만들면서 배우는 클린 아키텍처
        • 계층형 아키텍처의 문제와 의존성 역전
        • 유스케이스
        • 웹 어댑터
        • 영속성 어댑터
        • 아키텍처 요소 테스트
        • 경계 간 매핑 전략
        • 애플리케이션 조립
        • 아키텍처 경계 강제하기
        • 지름길 사용하기
        • 아키텍처 스타일 결정하기
    • 디자인 패턴
      • 생성(Creational) 패턴
        • 팩토리 패턴
        • 싱글톤 패턴
        • 빌더 패턴
        • 프로토타입 패턴
      • 행동(Behavioral) 패턴
        • 전략 패턴
        • 옵저버 패턴
        • 커맨드 패턴
        • 템플릿 메서드 패턴
        • 반복자 패턴
        • 상태 패턴
        • 책임 연쇄 패턴
        • 인터프리터 패턴
        • 중재자 패턴
        • 메멘토 패턴
        • 비지터 패턴
      • 구조(Structural) 패턴
        • 데코레이터 패턴
        • 어댑터 패턴
        • 퍼사드 패턴
        • 컴포지트 패턴
        • 프록시 패턴
        • 브리지 패턴
        • 플라이웨이트 패턴
      • 복합 패턴
  • 시스템 설계
    • 1. 사용자 수에 따른 규모 확장성
    • 2. 개략적 규모 추정
    • 3. 시스템 설계 접근법
    • 4. 처리율 제한 장치
    • 5. 안정 해시
    • 6. 키-값 저장소
    • 7. 유일한 ID 생성기
    • 8. URL 단축기
    • 9. 웹 크롤러
    • 10. 알림 시스템
    • 11. 뉴스 피드
    • 12. 채팅 시스템
    • 13. 검색어 자동완성
    • 14. 유튜브 스트리밍
    • 15. 구글 드라이브
    • ⭐️. 캐싱 전략
    • ⭐️. 재고 시스템으로 알아보는 동시성이슈 해결방법
    • ⭐️. 실습으로 배우는 선착순 이벤트 시스템
  • 🏝️자바
    • 자바의 내부 속으로
      • Java 언어의 특징
      • JDK
      • JVM
        • 메모리 관리
        • Garbage Collector
          • 기본 동작
          • Heap 영역을 제외한 GC 처리 영역
          • (WIP) GC 알고리즘
        • 클래스 로더
      • 자바 실행 방식
      • 메모리 모델과 관리
      • 바이트 코드 조작
      • 리플렉션
      • 다이나믹 프록시
      • 어노테이션 프로세서
    • 자바의 기본
      • 데이터 타입, 변수, 배열
    • 이펙티브 자바
      • 2장: 객체의 생성과 파괴
        • item 1) 생성자 대신 정적 팩토리 메서드를 고려하라
        • item2) 생성자에 매개변수가 많다면 빌더를 고려하라
        • item3) private 생성자나 열거 타입으로 싱글톤임을 보증하라
        • item4) 인스턴스화를 막으려면 private 생성자를 사용
        • item5) 자원을 직접 명시하는 대신 의존 객체 주입 사용
        • item6) 불필요한 객체 생성 지양
        • item7) 다 쓴 객체는 참조 해제하라
        • item8) finalizer와 cleaner 사용 자제
        • item9) try-with-resources를 사용하자
      • 3장: 모든 객체의 공통 메서드
        • item 10) equals는 일반 규약을 지켜 재정의 하자
        • item 11) equals 재정의 시 hashCode도 재정의하라
        • item 12) 항상 toString을 재정의할 것
        • item 13) clone 재정의는 주의해서 진행하라
        • item 14) Comparable 구현을 고려하라
      • 4장: 클래스와 인터페이스
        • item 15) 클래스와 멤버의 접근 권한을 최소화하라
        • item 16) public 클래스에서는 public 필드가 아닌 접근자 메서드를 사용하라
        • item 17) 변경 가능성을 최소화하라
        • item 18) 상속보다는 컴포지션을 사용하라
        • item 19) 상속을 고려해 설계하고 문서화하고, 그러지 않았다면 상속을 금지하라
        • item 20) 추상 클래스보다는 인터페이스를 우선하라
        • item 21) 인터페이스는 구현하는 쪽을 생각해 설계하라
        • item 22) 인터페이스는 타입을 정의하는 용도로만 사용하라
        • item 23) 태그 달린 클래스보다는 클래스 계층구조를 활용하라
        • item 24) 멤버 클래스는 되도록 static으로 만들라
        • item 25) 톱레벨 클래스는 한 파일에 하나만 담으라
      • 5장: 제네릭
        • item 26) 로 타입은 사용하지 말 것
        • item 27) unchecked 경고를 제거하라
        • item 28) 배열보다 리스트를 사용하라
        • item 29) 이왕이면 제네릭 타입으로 만들라
        • item 30) 이왕이면 제네릭 메서드로 만들라
        • item 31) 한정적 와일드카드를 사용해 API 유연성을 높이라
        • item 32) 제네릭과 가변 인수를 함께 사용
        • item 33) 타입 안전 이종 컨테이너를 고려하라
      • 6장: 열거 타입과 어노테이션
        • item 34) int 상수 대신 열거 타입을 사용하라
        • item 35) ordinal 메서드 대신 인스턴스 필드를 사용하라
        • item 36) 비트 필드 대신 EnumSet을 사용하라
        • item 37) ordinal 인덱싱 대신 EnumMap을 사용하라
        • item 38) 확장할 수 있는 열거 타입이 필요하면 인터페이스를 사용하라
        • item 39) 명명 패턴보다 어노테이션을 사용하라
        • item 40) @Override 어노테이션을 일관되게 사용하라
        • item 41) 정의하려는 것이 타입이라면 마커 인터페이스를 사용하라
      • 7장: 람다와 스트림
        • item 42) 익명 클래스보다는 람다를 사용하라
        • item 43) 람다보다는 메서드 참조를 사용하라
        • item 44) 표준 함수형 인터페이스를 사용하라
        • item 45) 스트림은 주의해서 사용하라
        • item 46) 스트림에서는 부작용 없는 함수를 사용하라
        • item 47) 반환 타입으로는 스트림보다 컬렉션이 낫다
        • item 48) 스트림 병렬화는 주의해서 적용하라
      • 8장: 메서드
        • item 49) 매개변수가 유효한지 검사하라
        • item 50) 적시에 방어적 복사본을 만들라
        • item 51) 메서드 시그니처를 신중히 설계하라
        • item 52) 다중정의는 신중히 사용하라
        • item 53) 가변인수는 신중히 사용하라
        • item 54) null이 아닌, 빈 컬렉션이나 배열을 반환하라
        • item 55) 옵셔널 반환은 신중히 하라
        • item 56) 공개된 API 요소에는 항상 문서화 주석을 작성하라
      • 9장: 일반적인 프로그래밍 원칙
        • item 57) 지역 변수의 범위를 최소화하라
        • item 58) 전통적인 for문보다 for-each문을 사용하기
        • item 59) 라이브러리를 익히고 사용하라
        • item 60) 정확한 답이 필요하다면 float, double은 피하라
        • item 61) 박싱된 기본타입보단 기본 타입을 사용하라
        • item 62) 다른 타입이 적절하다면 문자열 사용을 피하라
        • item 63) 문자열 연결은 느리니 주의하라
        • item 64) 객체는 인터페이스를 사용해 참조하라
        • item 65) 리플렉션보단 인터페이스를 사용
        • item 66) 네이티브 메서드는 신중히 사용하라
        • item 67) 최적화는 신중히 하라
        • item 68) 일반적으로 통용되는 명명 규칙을 따르라
      • 10장: 예외
        • item 69) 예외는 진짜 예외 상황에만 사용하라
        • item 70) 복구할 수 있는 상황에서는 검사 예외를, 프로그래밍 오류에는 런타임 예외를 사용하라
        • item 71) 필요 없는 검사 예외 사용은 피하라
        • item 72) 표준 예외를 사용하라
        • item 73) 추상화 수준에 맞는 예외를 던지라
        • item 74) 메서드가 던지는 모든 예외를 문서화하라
        • item 75) 예외의 상세 메시지에 실패 관련 정보를 담으라
        • item 76) 가능한 한 실패 원자적으로 만들라
        • item 77) 예외를 무시하지 말라
      • 11장: 동시성
        • item 78) 공유 중인 가변 데이터는 동기화해 사용하라
        • item 79) 과도한 동기화는 피하라
        • item 80) 스레드보다는 실행자, 태스크, 스트림을 애용하라
        • item 81) wait와 notify보다는 동시성 유틸리티를 애용하라
        • item 82) 스레드 안전성 수준을 문서화하라
        • item 83) 지연 초기화는 신중히 사용하라
        • item 84) 프로그램의 동작을 스레드 스케줄러에 기대지 말라
      • 12장: 직렬화
        • item 85) 자바 직렬화의 대안을 찾으라
        • item 86) Serializable을 구현할지는 신중히 결정하라
        • item 87) 커스텀 직렬화 형태를 고려해보라
        • item 88) readObject 메서드는 방어적으로 작성하라
        • item 89) 인스턴스 수를 통제해야 한다면 readResolve보다는 열거 타입을 사용하라
        • item 90) 직렬화된 인스턴스 대신 직렬화 프록시 사용을 검토하라
    • 모던 자바 인 액션
      • 1장: 자바의 역사
      • 2장: 동작 파라미터화
      • 3장: 람다
      • 4장: 스트림
      • 5장: 스트림 활용
      • 6장: 스트림으로 데이터 수집
      • 7장: 병렬 데이터 처리와 성능
      • 8장: 컬렉션 API 개선
      • 9장: 람다를 이용한 리팩토링, 테스팅, 디버깅
      • 10장: 람다를 이용한 DSL
      • 11장: null 대신 Optional
      • 12장: 날짜와 시간 API
      • 13장: 디폴트 메서드
      • 14장: 자바 모듈 시스템
      • 15장: CompletableFuture와 Reactive 개요
      • 16장: CompletableFuture
      • 17장: 리액티브 프로그래밍
      • 18장: 함수형 프로그래밍
      • 19장: 함수형 프로그래밍 기법
      • 20장: 스칼라 언어 살펴보기
    • 자바의 이모저모
      • Javax
      • Objects
      • NIO
      • Thread
      • Concurrent
        • Atomic
        • Executor, ExecutorService
        • Interrupt
      • Assertions
    • Netty
      • 네티 맛보기
      • 네티의 주요 특징
      • 채널 파이프라인
      • 이벤트 루프
      • 바이트 버퍼
      • 부트스트랩
      • 네티 테스트
      • 코덱
      • 다양한 ChannelHandler와 코덱
      • 웹소켓
      • UDP 브로드캐스팅
    • 자바 병렬 프로그래밍
      • 2장: 스레드 안전성
      • 15장: 단일 연산 변수와 논블로킹 동기화
  • 🏖️코틀린
    • 코틀린 인 액션
      • 코틀린 언어의 특징
      • 코틀린 기초
      • 함수 정의와 호출
      • 클래스, 객체, 인터페이스
      • 람다
      • 타입 시스템
      • 연산자 오버로딩과 기타 관례
      • 고차 함수
      • 제네릭스
      • 어노테이션과 리플렉션
      • DSL 만들기
  • 🌸스프링
    • Spring Core
      • Cron Expression
      • Bean
        • Lifecycle
        • Aware
    • Spring MVC
    • Spring Security
      • 로그인 처리
      • 로그아웃 처리
      • JWT 인증 방식
      • 메소드별 인가 처리
    • Spring Data
      • Pageable
      • Spring Data Couchbase
      • Spring Data Redis
        • Serializer
    • Spring REST Docs
    • Spring Annotations
    • Spring Cloud
      • Service Discovery
      • API Gateway
      • Spring Cloud Config
      • MicroService Communication
      • Data Synchronization
    • Test
      • 테스트 용어 정리
      • JUnit
      • Spring Boot Test
      • Mockito
    • QueryDSL
      • 프로젝트 환경설정
      • 기본 문법
      • 중급 문법
      • 순수 JPA와 QueryDSL
      • 스프링 데이터 JPA와 QueryDSL
    • Lombok
      • @Data
      • @Builder
      • Log Annotations
  • 🕋DB
    • MySQL
      • CentOS7에서 MySQL 8 버전 설치하기
    • MongoDB
      • 
    • Redis
      • Sentinel
      • Cluster
      • Transaction
      • 자료구조
        • String
        • List
        • Set
        • Hash
        • Bitmaps
        • SortedSet
      • Lettuce 단일 서버, 클러스터 서버, 풀링 사용 방법
  • 📽️인프라
    • 리눅스
      • 주요 명령어 모음
    • Docker
      • Docker
      • Docker Compose
      • Docker Swarm
      • Docker Network
      • Linux에서 root 아닌 유저로 docker 실행하기
    • Kubernetes
      • 기초 개념
      • Pod
      • Configuration
      • ReplicationSet
      • Network
      • ConfigMap & Secret
      • Volume, Mount, Claim
      • Controller
      • Multi Container Pod
      • StatefulSet & Job
      • Rollout & Rollback
      • Helm
      • 개발 워크플로우와 CI/CD
      • Container Probes
      • Resource Limit
      • Logging & Monitoring
      • Ingress
      • Security
      • Multi Node/Architecture Cluster
      • Workload & Pod management
      • CRD & Operator
      • Serverless Function
      • K8S Cheat Sheet
    • Kafka
      • 카프카 개요
      • 카프카 설치 및 실습
      • Kafka Broker
      • Topic, Partition, Record
      • Producer
      • Consumer
      • Kafka Streams
      • Kafka Connect
      • MirrorMaker
  • AWS
    • AWS Console / CLI / SDK
    • IAM
    • EC2
      • EC2 Advanced
    • ELB / ASG
    • RDS / Aurora / ElastiCache
    • DynamoDB
    • DocumentDB / Neptune / Keyspaces / QLDB / Timestream
    • Route 53
    • Beanstalk
    • Solution Architect
    • S3
      • 보안
    • CloudFront
    • Global Accelerator
    • AWS Storage
    • Messaging
    • Container
    • Serverless
    • Data Analysis
    • Machine Learning
    • Monitoring
    • Security
    • VPC
    • Data Migration
    • 기타 서비스
  • 🏔️CS
    • 운영 체제
      • Introduction
      • System Structures
      • Process
      • Synchronization
      • Muitithreaded Programming
      • Process Scheduling
      • Memory Management
      • Virtual Memory
    • 네트워크
      • 네트워크 기초
      • 네트워크 통신 방식
      • OSI 7계층
        • 1계층: 물리계층
        • 2계층: 데이터 링크 계층
        • 3계층: 네트워크 계층
        • 4계층: 전송 계층
        • 5계층: 세션 계층
        • 6계층: 표현 계층
        • 7계층: 응용 계층
      • TCP/IP 스택
      • ARP
      • 데이터 크기 조절
      • WDM
      • NAT
      • DNS
      • DHCP
      • VPN
      • 네이글 알고리즘
      • 서버 네트워크
      • 네트워크 보안
        • 보안의 기본
        • 보안 장비
      • 이중화
    • 데이터베이스
      • 트랜잭션
    • 컴퓨터 구조
      • 개요
      • Instruction Set Architecture
      • Procedure Call & Return
      • Linking
      • Pipeline
      • Memory Hierarchy
      • Virtual Memory
      • Interrupt / Exception, IO
    • 자료 구조
      • Array
      • List
      • Map
      • Set
      • Queue
      • PriorityQueue
      • Stack
    • 웹 기술
      • HTTP
        • 쿠키와 세션
  • 🪂Big Data
    • Apache Hadoop
  • 🕹️ETC
    • Git
      • 내부 구조
      • 내가 자주 사용하는 명령어 모음
      • Commit Convention
    • 이력서 작성하기
    • Embedded
      • 라즈베리파이에서 네오픽셀 적용기
    • 기술블로그 모음집
Powered by GitBook
On this page
  • 개념
  • 실행 모드
  • 소스 커넥터
  • 구현해보기
  • 싱크 커넥터
  • 구현해보기
  1. 인프라
  2. Kafka

Kafka Connect

PreviousKafka StreamsNextMirrorMaker

Last updated 8 months ago

개념

  • 반복적인 파이프라인 생성 작업이 필요할 때 프로듀서, 컨슈머 애플리케이션을 개발하고 배포하고 운영하는 것은 비효율적이다. 커넥트를 이용하면 특정 작업 형태를 템플릿으로 만들어 둔 커넥터를 실행하여 반복 작업을 줄일 수 있다.

  • 파일의 데이터를 토픽으로 보내는 커넥터는 파일이 존재하는 디렉토리 위치, 파일 이름 등 고유한 설정값을 입력받은 후 실행할 수 있을 것이다.

  • 소스 커넥터는 프로듀서 역할을 하며, 싱크 커넥터는 컨슈머 역할을 한다.

  • MySQL, S3, MongoDB 등과 같은 저장소가 싱크/소스 애플리케이션에 해당한다.

  • 커넥트에 커넥터 생성을 요청하면 내부적으로 커넥터와 태스크가 생성된다. 태스크는 커넥터에 종속되어 실질적인 데이터를 처리한다.

  • 커넥터를 사용한 파이프라인에 컨버터, 트랜스폼 기능을 추가할 수 있다.

    • 컨버터는 데이터 처리 전 스키마를 변경하도록 도와준다. 예를 들어 JsonConverter, ByteArrayConverter 등이 제공된다.

    • 트랜스폼은 데이터 처리 시 메시지 단위로 데이터를 간단히 변환하려는 용도로 사용된다. 예를 들어 JSON 데이터에서 일부 키 값을 제거/추가할 수 있다.

실행 모드

단일 모드 커넥트

  • 커넥터를 정의하는 파일을 작성하고 해당 파일을 참조하는 단일 커넥트를 실행하여 파이프라인을 생성할 수 있다.

  • 단일 애플리케이션으로 실행되므로 SPOF(Single Point of Failure) 지점이 될 수 있어 고가용성 구성이 불가능하다.

  • 주로 개발환경이나 중요도가 낮은 파이프라인 운영 시 사용한다.

  • connect-standalone.properties 파일에 커넥트 속성을 설정하여 실행시킬 수 있다.

    • bootstrap.servers : 커넥트와 연동할 카프카 클러스터 주소를 입력한다. 2개 이상 브로커로 이뤄졌다면 , 로 구분해 입력한다.

    • key.converter / value.converter : 데이터를 카프카에 저장하거나 카프카에서 가져올 때 사용하는 컨버터 지정

    • key.converter.schemas.enable / value.converter.schemas.enable : 스키마 형태를 사용하는지 여부

    • offset.storage.file.filename : 오프셋을 저장할 로컬 파일의 경로와 이름 지정

    • offset.flush.intervals.ms : 태스크가 처리 완료한 오프셋을 커밋하는 주기 지정

    • plugin.path : 플러그인 형태로 추가할 커넥터의 디렉토리 주소 지정

  • connect-file-source.properties 파일에 커넥터 속성을 설정할 수 있다.

    • name : 커넥터 이름을 지정하며, 커넥트 내에서 중복되면 안된다.

    • connector.class : 사용할 커넥터 클래스 이름을 지정한다.

    • tasks.max : 커넥터로 실행할 테스크 개수를 지정한다. 태스크 개수를 늘려 병렬로 처리되도록 할 수 있다.

    • 만약 FileStreamSource 커넥터를 사용한다면 읽을 파일 위치, 데이터를 저장할 토픽의 이름을 지정하는 등, 커넥터마다 필요한 속성을 설정해주어야 한다.

  • 아래와 같은 명령으로 커넥트 프로세스를 실행시킬 수 있다.

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

분산 모드 커넥트

  • 두 대 이상의 서버에서 실행된 프로세스들을 클러스터 형태로 관리해 단일 모드보다 안전하게 운영할 수 있다.

  • 데이터 처리량 변화가 있다면 스케일 아웃할 수 있다.

  • 커넥터 실행 시 같은 그룹으로 지정된 여러 커넥트들에 분산되어 실행된다. 커넥트 중 한 대에 문제가 생겨도 나머지 커넥트에서 커넥터를 안전히 실행할 수 있다.

  • 소스 커넥터, 싱크 커넥터는 데이터 처리 시점을 저장하기 위해 오프셋 정보를 사용한다. 분산 모드 커넥트 이용 시 오프셋 정보는 카프카 내부 토픽에 저장된다. 유실을 막기 위해 복제 개수를 지정할 수 있다.

  • connect-distributed.properties 파일에 커넥트 속성을 설정하여 실행시킬 수 있다.

    • bootstrap.servers : 커넥트와 연동할 카프카 클러스터 주소를 입력한다.

    • group.id : 커넥터 프로세서 그룹의 이름을 지정한다.

    • key.converter / value.converter : 데이터를 카프카에 저장하거나 카프카에서 가져올 때 사용하는 컨버터 지정

    • key.converter.schemas.enable / value.converter.schemas.enable : 스키마 형태를 사용하는지 여부

    • offset.storage.topic / offset.storage.replication.factor : 오프셋 저장 토픽 이름, 복제 개수

    • config.storage.topic / config.storage.replication.factor

    • status.storage.topic / status.storage.replication.factor

    • offset.flush.intervals.ms : 태스크가 처리 완료한 오프셋을 커밋하는 주기 지정

    • plugin.path : 플러그인 형태로 추가할 커넥터의 디렉토리 주소 지정

  • 분산 모드 커넥트 실행 시 커넥트 설정 파일만 있으면 된다. 커넥터는 커넥트 실행 후 REST API를 통해 실행/중단/변경한다.

bin/connect-distributed.sh config/connect-distributed.properties

소스 커넥터

  • 소스 애플리케이션 또는 파일로부터 데이터를 가져와 토픽에 적재하는 역할을 한다.

  • 커스텀 소스 커넥터를 사용하려면 SourceConnector, SourceTask 클래스를 통해 구현하고 jar 파일로 만들어 실행 시 플러그인으로 추가해야 한다.

구현해보기

의존성 추가

  • connect-api 라이브러리를 추가한다.

implementation 'org.apache.kafka:connect-api'

Config

  • AbstractConfig 클래스를 구현하여 커넥터 실행 시 필요한 설정 값들을 정의할 수 있다.

    • ConfigDef 클래스를 생성해 여러 옵션값을 세밀하게 지정할 수 있다.

    • 생성자에서는 상위 클래스 생성자에 설정값들을 담아 호출한다.

public class SingleFileSourceConnectorConfig extends AbstractConfig {

    public static final String DIR_FILE_NAME = "file";
    private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
    private static final String DIR_FILE_NAME_DOC = "읽을 파일 경로와 이름";

    public static final String TOPIC_NAME = "topic";
    private static final String TOPIC_DEFAULT_VALUE = "test";
    private static final String TOPIC_DOC = "보낼 토픽명";

    public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME,
                                                    Type.STRING,
                                                    DIR_FILE_NAME_DEFAULT_VALUE,
                                                    Importance.HIGH,
                                                    DIR_FILE_NAME_DOC)
                                                    .define(TOPIC_NAME,
                                                            Type.STRING,
                                                            TOPIC_DEFAULT_VALUE,
                                                            Importance.HIGH,
                                                            TOPIC_DOC);

    public SingleFileSourceConnectorConfig(Map<String, String> props) {
        super(CONFIG, props);
    }
}

SourceConnector

  • SourceConnector는 태스크 실행 전 커넥터 설정 파일을 초기화하고 어떤 태스크 클래스를 사용할 것인지 정의할 때 사용한다.

  • 아래는 SourceConnector 구현체 예시이며 하나의 파일을 소스로 사용하여 토픽에 저장하기 위한 커넥터이다.

    • version 메서드는 커넥터의 버전을 반환한다.

    • start 메서드는 입력된 속성을 통해 설정값을 초기화한다. 만약 올바른 값이 입력되지 않았다면 ConnectException을 발생시킨다.

    • taskClass 메서드는 이 커넥터에서 사용할 태스크 클래스를 지정한다.

    • taskConfigs 메서드는 태스크가 여러 개 일때 태스크마다 각기 다른 옵션을 적용하도록 한다.

    • config 메서드는 커넥터가 사용할 설정값 정보를 ConfigDef 타입으로 반환한다.

    • stop 메서드는 커넥터가 종료될 때 호출되므로 자원을 해제하는 등의 작업을 수행하도록 한다.

public class SingleFileSourceConnector extends SourceConnector {

    private final Logger logger = LoggerFactory.getLogger(SingleFileSourceConnector.class);

    private Map<String, String> configProperties;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        this.configProperties = props;
        try {
            new SingleFileSourceConnectorConfig(props);
        } catch (ConfigException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SingleFileSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        Map<String, String> taskProps = new HashMap<>();
        taskProps.putAll(configProperties);
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(taskProps);
        }
        return taskConfigs;
    }

    @Override
    public ConfigDef config() {
        return SingleFileSourceConnectorConfig.CONFIG;
    }

    @Override
    public void stop() {
    }
}

SourceTask

  • SourceTask는 소스 애플리케이션 혹은 파일로부터 데이터를 가져와 토픽으로 보낸다.

  • 자체적으로 소스 애플리케이션 혹은 파일을 어디까지 읽었는지에 대한 오프셋을 가진다. 이를 통해 토픽에 중복으로 데이터를 보내는 것을 방지한다.

  • 아래는 SourceTask의 구현체이다.

    • version 메서드에서는 태스크의 버전을 지정한다. 보통 커넥터의 버전과 동일하게 둔다.

    • start 메서드에서는 태스크 시작 시 필요한 로직을 작성한다. 데이터 처리에 대한 모든 리소스를 여기서 초기화하면 좋다.

    • poll 메서드에서는 소스 애플리케이션 또는 파일로부터 데이터를 읽어오는 로직을 작성한다. 데이터를 토픽으로 내보내기 위해 SourceRecord 타입으로 데이터를 반환해야 한다.

    • stop 메서드에서는 태스크 종료 시 필요한 로직을 작성한다.

public class SingleFileSourceTask extends SourceTask {
    private Logger logger = LoggerFactory.getLogger(SingleFileSourceTask.class);

    public final String FILENAME_FIELD = "filename";
    public final String POSITION_FIELD = "position";

    private Map<String, String> fileNamePartition;
    private Map<String, Object> offset;
    private String topic;
    private String file;
    private long position = -1;


    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        try {
            // Init variables
            SingleFileSourceConnectorConfig config = new SingleFileSourceConnectorConfig(props);
            topic = config.getString(SingleFileSourceConnectorConfig.TOPIC_NAME);
            file = config.getString(SingleFileSourceConnectorConfig.DIR_FILE_NAME);
            fileNamePartition = Collections.singletonMap(FILENAME_FIELD, file);
            offset = context.offsetStorageReader().offset(fileNamePartition);

            // Get file offset from offsetStorageReader
            if (offset != null) {
                Object lastReadFileOffset = offset.get(POSITION_FIELD);
                if (lastReadFileOffset != null) {
                    position = (Long) lastReadFileOffset;
                }
            } else {
                position = 0;
            }

        } catch (Exception e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public List<SourceRecord> poll() {
        List<SourceRecord> results = new ArrayList<>();
        try {
            Thread.sleep(1000);

            List<String> lines = getLines(position);

            if (lines.size() > 0) {
                lines.forEach(line -> {
                    Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FIELD, ++position);
                    SourceRecord sourceRecord = new SourceRecord(fileNamePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line);
                    results.add(sourceRecord);
                });
            }
            return results;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new ConnectException(e.getMessage(), e);
        }
    }

    private List<String> getLines(long readLine) throws Exception {
        BufferedReader reader = Files.newBufferedReader(Paths.get(file));
        return reader.lines().skip(readLine).collect(Collectors.toList());
    }

    @Override
    public void stop() {
    }
}

싱크 커넥터

  • 토픽의 데이터를 타깃 애플리케이션이나 파일에 저장한다.

  • SinkConnector와 SinkTask 클래스를 사용해 싱크 커넥터를 구현할 수 있으며 jar 파일로 만들어 플러그인으로 추가해 사용할 수 있다.

구현해보기

의존성 추가

  • connect-api 라이브러리를 추가한다.

implementation 'org.apache.kafka:connect-api'

Config

  • 토픽의 데이터를 저장할 대상의 정보를 담는다.

public class SingleFileSinkConnectorConfig extends AbstractConfig {

    public static final String DIR_FILE_NAME = "file";
    private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
    private static final String DIR_FILE_NAME_DOC = "저장할 디렉토리와 파일 이름";

    public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME,
                                                    Type.STRING,
                                                    DIR_FILE_NAME_DEFAULT_VALUE,
                                                    Importance.HIGH,
                                                    DIR_FILE_NAME_DOC);

    public SingleFileSinkConnectorConfig(Map<String, String> props) {
        super(CONFIG, props);
    }
}

SinkConnector

  • 태스크 실행 전 사용자로부터 입력받은 설정값을 초기화하고 어떤 태스크 클래스를 사용할 지 정의한다.

  • 아래는 SinkConnector 구현체이다. 메서드들의 역할은 SourceConnector와 동일하다.

public class SingleFileSinkConnector extends SinkConnector {

    private Map<String, String> configProperties;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        this.configProperties = props;
        try {
            new SingleFileSinkConnectorConfig(props);
        } catch (ConfigException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SingleFileSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        Map<String, String> taskProps = new HashMap<>();
        taskProps.putAll(configProperties);
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(taskProps);
        }
        return taskConfigs;
    }

    @Override
    public ConfigDef config() {
        return SingleFileSinkConnectorConfig.CONFIG;
    }

    @Override
    public void stop() {
    }
}

SinkTask

  • 토픽으로부터 데이터를 가져와 애플리케이션이나 파일에 저장하는 로직이 담긴다.

  • 아래는 SinkTask의 구현체이다.

    • version 메서드에서는 태스크의 버전을 지정한다. 보통 커넥터의 버전과 동일하게 둔다.

    • start 메서드에서는 태스크 시작 시 필요한 로직을 작성한다. 데이터 처리에 대한 모든 리소스를 여기서 초기화하면 좋다.

    • put 메서드에서는 토픽 데이터를 타겟 애플리케이션 또는 파일로 저장하는 로직을 작성한다. SinkRecord는 토픽의 레코드이며, 토픽, 파티션, 타임스탬프 정보를 담고 있다.

    • put 메서드에서는 데이터를 insert하고, flush 메서드에서 커밋하도록 한다면 트랜잭션 형태로 관리할 수 있다.

    • stop 메서드에서는 태스크 종료 시 필요한 로직을 작성한다.

public class SingleFileSinkTask extends SinkTask {
    private SingleFileSinkConnectorConfig config;
    private File file;
    private FileWriter fileWriter;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        try {
            config = new SingleFileSinkConnectorConfig(props);
            file = new File(config.getString(config.DIR_FILE_NAME));
            fileWriter = new FileWriter(file, true);
        } catch (Exception e) {
            throw new ConnectException(e.getMessage(), e);
        }

    }

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            for (SinkRecord record : records) {
                fileWriter.write(record.value().toString() + "\n");
            }
        } catch (IOException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        try {
            fileWriter.flush();
        } catch (IOException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public void stop() {
        try {
            fileWriter.close();
        } catch (IOException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }
}

📽️
https://medium.com/walmartglobaltech/kafka-connect-overview-a84782d96ab5