Kafka 실전 개발 및 운영 학습
Kafka의 기본 개념을 익혔다면, 이제 Java(Spring Boot) 기반 Kafka 연동, 메시지 처리 설정, 데이터 직렬화 및 연동 기술을 학습할 차례입니다.
📌 1. Kafka Java Client (Java + Spring Boot 연동)
Kafka를 Java(Spring Boot) 애플리케이션에서 사용하려면 Kafka Java Client 라이브러리를 활용합니다.
🔹 Spring Boot 프로젝트 설정
Maven 의존성 추가 (pom.xml)
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.7</version>
</dependency>
🔹 Producer 코드 (메시지 전송)
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
🔹 Consumer 코드 (메시지 소비)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test-topic", groupId = "my-group")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
✅ 이제 KafkaProducerService.sendMessage("test-topic", "Hello Kafka") 를 호출하면 Kafka에 메시지가 전송되고 Consumer가 이를 소비합니다.
📌 2. Producer 설정 (Acknowledgment, Key 기반 메시지 전송)
🔹 Producer 설정 옵션
Kafka Producer는 다양한 설정을 지원합니다.
옵션 | 설명 |
acks | all: 모든 리플리케이션에 메시지가 저장될 때까지 대기 |
key.serializer | 메시지 키 직렬화 방식 (예: StringSerializer) |
value.serializer | 메시지 값 직렬화 방식 (예: JsonSerializer) |
retries | 실패 시 재시도 횟수 |
batch.size | 메시지 배치 크기 |
linger.ms | 배치 전송 대기 시간 |
🔹 Key 기반 메시지 전송
Kafka에서는 Partition을 결정할 때 Key를 활용할 수 있습니다.
public void sendMessageWithKey(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
✅ 같은 Key를 가진 메시지는 같은 Partition으로 전송됩니다.
📌 3. Consumer 설정 (Offset 관리, Consumer Group 활용)
Kafka Consumer는 Offset 관리가 중요합니다.
🔹 Offset 자동/수동 커밋
설정 | 설명 |
enable.auto.commit=true | 자동으로 Offset을 커밋 |
enable.auto.commit=false | 수동으로 Offset을 커밋 |
🔹 수동 Offset 관리 예제
@KafkaListener(topics = "test-topic", groupId = "my-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println("Received: " + record.value());
ack.acknowledge(); // 수동 커밋
}
✅ ack.acknowledge(); 를 호출해야 Offset이 저장됩니다.
📌 4. Serialization / Deserialization (JSON, Avro, Protocol Buffers 활용)
Kafka 메시지는 JSON, Avro, Protocol Buffers 등 다양한 포맷을 사용할 수 있습니다.
🔹 JSON 직렬화/역직렬화 설정
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
}
🔹 Avro 사용 예시
@KafkaListener(topics = "avro-topic", groupId = "group")
public void consumeAvro(AvroMessage avroMessage) {
System.out.println("Received Avro message: " + avroMessage);
}
✅ Avro를 사용하면 메시지 크기를 줄일 수 있어 성능 최적화에 유리합니다.
📌 5. Kafka Connect (DB, File, HTTP 등 외부 시스템 연동)
Kafka Connect를 사용하면 데이터베이스, 파일, HTTP API 등과 Kafka를 쉽게 연동할 수 있습니다.
🔹 Kafka Connect 주요 플러그인
플러그인 | 설명 |
JDBC Connector | MySQL, PostgreSQL 데이터를 Kafka로 |
FileStream Connector | 파일 로그를 Kafka로 |
Elasticsearch Connector | Kafka 데이터를 Elasticsearch로 |
🔹 MySQL → Kafka 연동 (JDBC Connector 예제)
{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/testdb",
"topic.prefix": "mysql-",
"mode": "incrementing",
"incrementing.column.name": "id"
}
}
✅ Kafka Connect를 이용하면 코드 없이 데이터 연동 가능
📌 6. Kafka Streams (데이터 스트리밍 및 실시간 변환 처리)
Kafka Streams는 Kafka 데이터를 실시간으로 처리하는 라이브러리입니다.
🔹 Kafka Streams 기본 예제
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
stream.mapValues(value -> value.toUpperCase()).to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
✅ input-topic에서 받은 메시지를 대문자로 변환하여 output-topic으로 전송
📌 7. Monitoring (Kafka UI 도구 활용)
Kafka를 운영할 때 모니터링이 중요합니다.
🔹 Kafka 모니터링 도구
도구 | 설명 |
Kafka Manager | Kafka 상태 및 Topic 모니터링 |
Confluent Control Center | Kafka 클러스터 관리 |
Prometheus & Grafana | Kafka 메트릭 시각화 |
Kafka UI | 웹 기반 Kafka 관리 도구 |
🔹 Kafka UI 실행
docker run -d -p 8080:8080 provectuslabs/kafka-ui
✅ http://localhost:8080 에서 Kafka 메시지를 시각적으로 확인 가능
✅ 정리
학습 주제 | 설명 |
Kafka Java Client | Java(Spring Boot)에서 Kafka 연동 |
Producer 설정 | Acknowledgment, Key 기반 메시지 전송 |
Consumer 설정 | Offset 관리, Consumer Group 활용 |
Serialization | JSON, Avro, Protocol Buffers 활용 |
Kafka Connect | DB, File, HTTP 등 외부 시스템과 연동 |
Kafka Streams | 데이터 스트리밍 및 실시간 변환 처리 |
Monitoring | Kafka UI 도구(Kafka Manager, Confluent Control Center) |
이제 Kafka를 실무 프로젝트에서 활용할 준비가 되었습니다! 🚀
다음 단계에서는 Kafka 운영 및 성능 최적화를 학습하면 더욱 강력한 Kafka 시스템을 구축할 수 있습니다.