커스텀 소스 커넥터를 사용하려면 SourceConnector, SourceTask 클래스를 통해 구현하고 jar 파일로 만들어 실행 시 플러그인으로 추가해야 한다.
구현해보기
의존성 추가
connect-api 라이브러리를 추가한다.
implementation 'org.apache.kafka:connect-api'
Config
AbstractConfig 클래스를 구현하여 커넥터 실행 시 필요한 설정 값들을 정의할 수 있다.
ConfigDef 클래스를 생성해 여러 옵션값을 세밀하게 지정할 수 있다.
생성자에서는 상위 클래스 생성자에 설정값들을 담아 호출한다.
public class SingleFileSourceConnectorConfig extends AbstractConfig {
public static final String DIR_FILE_NAME = "file";
private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
private static final String DIR_FILE_NAME_DOC = "읽을 파일 경로와 이름";
public static final String TOPIC_NAME = "topic";
private static final String TOPIC_DEFAULT_VALUE = "test";
private static final String TOPIC_DOC = "보낼 토픽명";
public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME,
Type.STRING,
DIR_FILE_NAME_DEFAULT_VALUE,
Importance.HIGH,
DIR_FILE_NAME_DOC)
.define(TOPIC_NAME,
Type.STRING,
TOPIC_DEFAULT_VALUE,
Importance.HIGH,
TOPIC_DOC);
public SingleFileSourceConnectorConfig(Map<String, String> props) {
super(CONFIG, props);
}
}
SourceConnector
SourceConnector는 태스크 실행 전 커넥터 설정 파일을 초기화하고 어떤 태스크 클래스를 사용할 것인지 정의할 때 사용한다.
아래는 SourceConnector 구현체 예시이며 하나의 파일을 소스로 사용하여 토픽에 저장하기 위한 커넥터이다.
version 메서드는 커넥터의 버전을 반환한다.
start 메서드는 입력된 속성을 통해 설정값을 초기화한다. 만약 올바른 값이 입력되지 않았다면 ConnectException을 발생시킨다.
taskClass 메서드는 이 커넥터에서 사용할 태스크 클래스를 지정한다.
taskConfigs 메서드는 태스크가 여러 개 일때 태스크마다 각기 다른 옵션을 적용하도록 한다.
config 메서드는 커넥터가 사용할 설정값 정보를 ConfigDef 타입으로 반환한다.
stop 메서드는 커넥터가 종료될 때 호출되므로 자원을 해제하는 등의 작업을 수행하도록 한다.
public class SingleFileSourceConnector extends SourceConnector {
private final Logger logger = LoggerFactory.getLogger(SingleFileSourceConnector.class);
private Map<String, String> configProperties;
@Override
public String version() {
return "1.0";
}
@Override
public void start(Map<String, String> props) {
this.configProperties = props;
try {
new SingleFileSourceConnectorConfig(props);
} catch (ConfigException e) {
throw new ConnectException(e.getMessage(), e);
}
}
@Override
public Class<? extends Task> taskClass() {
return SingleFileSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(configProperties);
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(taskProps);
}
return taskConfigs;
}
@Override
public ConfigDef config() {
return SingleFileSourceConnectorConfig.CONFIG;
}
@Override
public void stop() {
}
}
SourceTask
SourceTask는 소스 애플리케이션 혹은 파일로부터 데이터를 가져와 토픽으로 보낸다.
자체적으로 소스 애플리케이션 혹은 파일을 어디까지 읽었는지에 대한 오프셋을 가진다. 이를 통해 토픽에 중복으로 데이터를 보내는 것을 방지한다.
아래는 SourceTask의 구현체이다.
version 메서드에서는 태스크의 버전을 지정한다. 보통 커넥터의 버전과 동일하게 둔다.
start 메서드에서는 태스크 시작 시 필요한 로직을 작성한다. 데이터 처리에 대한 모든 리소스를 여기서 초기화하면 좋다.
poll 메서드에서는 소스 애플리케이션 또는 파일로부터 데이터를 읽어오는 로직을 작성한다. 데이터를 토픽으로 내보내기 위해 SourceRecord 타입으로 데이터를 반환해야 한다.
stop 메서드에서는 태스크 종료 시 필요한 로직을 작성한다.
public class SingleFileSourceTask extends SourceTask {
private Logger logger = LoggerFactory.getLogger(SingleFileSourceTask.class);
public final String FILENAME_FIELD = "filename";
public final String POSITION_FIELD = "position";
private Map<String, String> fileNamePartition;
private Map<String, Object> offset;
private String topic;
private String file;
private long position = -1;
@Override
public String version() {
return "1.0";
}
@Override
public void start(Map<String, String> props) {
try {
// Init variables
SingleFileSourceConnectorConfig config = new SingleFileSourceConnectorConfig(props);
topic = config.getString(SingleFileSourceConnectorConfig.TOPIC_NAME);
file = config.getString(SingleFileSourceConnectorConfig.DIR_FILE_NAME);
fileNamePartition = Collections.singletonMap(FILENAME_FIELD, file);
offset = context.offsetStorageReader().offset(fileNamePartition);
// Get file offset from offsetStorageReader
if (offset != null) {
Object lastReadFileOffset = offset.get(POSITION_FIELD);
if (lastReadFileOffset != null) {
position = (Long) lastReadFileOffset;
}
} else {
position = 0;
}
} catch (Exception e) {
throw new ConnectException(e.getMessage(), e);
}
}
@Override
public List<SourceRecord> poll() {
List<SourceRecord> results = new ArrayList<>();
try {
Thread.sleep(1000);
List<String> lines = getLines(position);
if (lines.size() > 0) {
lines.forEach(line -> {
Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FIELD, ++position);
SourceRecord sourceRecord = new SourceRecord(fileNamePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line);
results.add(sourceRecord);
});
}
return results;
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ConnectException(e.getMessage(), e);
}
}
private List<String> getLines(long readLine) throws Exception {
BufferedReader reader = Files.newBufferedReader(Paths.get(file));
return reader.lines().skip(readLine).collect(Collectors.toList());
}
@Override
public void stop() {
}
}
싱크 커넥터
토픽의 데이터를 타깃 애플리케이션이나 파일에 저장한다.
SinkConnector와 SinkTask 클래스를 사용해 싱크 커넥터를 구현할 수 있으며 jar 파일로 만들어 플러그인으로 추가해 사용할 수 있다.
구현해보기
의존성 추가
connect-api 라이브러리를 추가한다.
implementation 'org.apache.kafka:connect-api'
Config
토픽의 데이터를 저장할 대상의 정보를 담는다.
public class SingleFileSinkConnectorConfig extends AbstractConfig {
public static final String DIR_FILE_NAME = "file";
private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
private static final String DIR_FILE_NAME_DOC = "저장할 디렉토리와 파일 이름";
public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME,
Type.STRING,
DIR_FILE_NAME_DEFAULT_VALUE,
Importance.HIGH,
DIR_FILE_NAME_DOC);
public SingleFileSinkConnectorConfig(Map<String, String> props) {
super(CONFIG, props);
}
}
SinkConnector
태스크 실행 전 사용자로부터 입력받은 설정값을 초기화하고 어떤 태스크 클래스를 사용할 지 정의한다.
아래는 SinkConnector 구현체이다. 메서드들의 역할은 SourceConnector와 동일하다.
public class SingleFileSinkConnector extends SinkConnector {
private Map<String, String> configProperties;
@Override
public String version() {
return "1.0";
}
@Override
public void start(Map<String, String> props) {
this.configProperties = props;
try {
new SingleFileSinkConnectorConfig(props);
} catch (ConfigException e) {
throw new ConnectException(e.getMessage(), e);
}
}
@Override
public Class<? extends Task> taskClass() {
return SingleFileSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(configProperties);
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(taskProps);
}
return taskConfigs;
}
@Override
public ConfigDef config() {
return SingleFileSinkConnectorConfig.CONFIG;
}
@Override
public void stop() {
}
}
SinkTask
토픽으로부터 데이터를 가져와 애플리케이션이나 파일에 저장하는 로직이 담긴다.
아래는 SinkTask의 구현체이다.
version 메서드에서는 태스크의 버전을 지정한다. 보통 커넥터의 버전과 동일하게 둔다.
start 메서드에서는 태스크 시작 시 필요한 로직을 작성한다. 데이터 처리에 대한 모든 리소스를 여기서 초기화하면 좋다.
put 메서드에서는 토픽 데이터를 타겟 애플리케이션 또는 파일로 저장하는 로직을 작성한다. SinkRecord는 토픽의 레코드이며, 토픽, 파티션, 타임스탬프 정보를 담고 있다.
put 메서드에서는 데이터를 insert하고, flush 메서드에서 커밋하도록 한다면 트랜잭션 형태로 관리할 수 있다.
stop 메서드에서는 태스크 종료 시 필요한 로직을 작성한다.
public class SingleFileSinkTask extends SinkTask {
private SingleFileSinkConnectorConfig config;
private File file;
private FileWriter fileWriter;
@Override
public String version() {
return "1.0";
}
@Override
public void start(Map<String, String> props) {
try {
config = new SingleFileSinkConnectorConfig(props);
file = new File(config.getString(config.DIR_FILE_NAME));
fileWriter = new FileWriter(file, true);
} catch (Exception e) {
throw new ConnectException(e.getMessage(), e);
}
}
@Override
public void put(Collection<SinkRecord> records) {
try {
for (SinkRecord record : records) {
fileWriter.write(record.value().toString() + "\n");
}
} catch (IOException e) {
throw new ConnectException(e.getMessage(), e);
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
fileWriter.flush();
} catch (IOException e) {
throw new ConnectException(e.getMessage(), e);
}
}
@Override
public void stop() {
try {
fileWriter.close();
} catch (IOException e) {
throw new ConnectException(e.getMessage(), e);
}
}
}