Kafka 실전 개발 및 운영 학습

Kafka의 기본 개념을 익혔다면, 이제 Java(Spring Boot) 기반 Kafka 연동, 메시지 처리 설정, 데이터 직렬화 및 연동 기술을 학습할 차례입니다.


📌 1. Kafka Java Client (Java + Spring Boot 연동)

Kafka를 Java(Spring Boot) 애플리케이션에서 사용하려면 Kafka Java Client 라이브러리를 활용합니다.

🔹 Spring Boot 프로젝트 설정

Maven 의존성 추가 (pom.xml)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.7</version>
</dependency>

🔹 Producer 코드 (메시지 전송)

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

🔹 Consumer 코드 (메시지 소비)

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

이제 KafkaProducerService.sendMessage("test-topic", "Hello Kafka") 를 호출하면 Kafka에 메시지가 전송되고 Consumer가 이를 소비합니다.


📌 2. Producer 설정 (Acknowledgment, Key 기반 메시지 전송)

🔹 Producer 설정 옵션

Kafka Producer는 다양한 설정을 지원합니다.

옵션 설명
acks all: 모든 리플리케이션에 메시지가 저장될 때까지 대기
key.serializer 메시지 키 직렬화 방식 (예: StringSerializer)
value.serializer 메시지 값 직렬화 방식 (예: JsonSerializer)
retries 실패 시 재시도 횟수
batch.size 메시지 배치 크기
linger.ms 배치 전송 대기 시간

🔹 Key 기반 메시지 전송

Kafka에서는 Partition을 결정할 때 Key를 활용할 수 있습니다.

public void sendMessageWithKey(String topic, String key, String message) {
    kafkaTemplate.send(topic, key, message);
}

✅ 같은 Key를 가진 메시지는 같은 Partition으로 전송됩니다.


📌 3. Consumer 설정 (Offset 관리, Consumer Group 활용)

Kafka Consumer는 Offset 관리가 중요합니다.

🔹 Offset 자동/수동 커밋

설정 설명
enable.auto.commit=true 자동으로 Offset을 커밋
enable.auto.commit=false 수동으로 Offset을 커밋

🔹 수동 Offset 관리 예제

@KafkaListener(topics = "test-topic", groupId = "my-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    System.out.println("Received: " + record.value());
    ack.acknowledge(); // 수동 커밋
}

✅ ack.acknowledge(); 를 호출해야 Offset이 저장됩니다.


📌 4. Serialization / Deserialization (JSON, Avro, Protocol Buffers 활용)

Kafka 메시지는 JSON, Avro, Protocol Buffers 등 다양한 포맷을 사용할 수 있습니다.

🔹 JSON 직렬화/역직렬화 설정

@Configuration
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }
}

🔹 Avro 사용 예시

@KafkaListener(topics = "avro-topic", groupId = "group")
public void consumeAvro(AvroMessage avroMessage) {
    System.out.println("Received Avro message: " + avroMessage);
}

Avro를 사용하면 메시지 크기를 줄일 수 있어 성능 최적화에 유리합니다.


📌 5. Kafka Connect (DB, File, HTTP 등 외부 시스템 연동)

Kafka Connect를 사용하면 데이터베이스, 파일, HTTP API 등과 Kafka를 쉽게 연동할 수 있습니다.

🔹 Kafka Connect 주요 플러그인

플러그인 설명
JDBC Connector MySQL, PostgreSQL 데이터를 Kafka로
FileStream Connector 파일 로그를 Kafka로
Elasticsearch Connector Kafka 데이터를 Elasticsearch로

🔹 MySQL → Kafka 연동 (JDBC Connector 예제)

{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/testdb",
    "topic.prefix": "mysql-",
    "mode": "incrementing",
    "incrementing.column.name": "id"
  }
}

✅ Kafka Connect를 이용하면 코드 없이 데이터 연동 가능


📌 6. Kafka Streams (데이터 스트리밍 및 실시간 변환 처리)

Kafka Streams는 Kafka 데이터를 실시간으로 처리하는 라이브러리입니다.

🔹 Kafka Streams 기본 예제

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
stream.mapValues(value -> value.toUpperCase()).to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

✅ input-topic에서 받은 메시지를 대문자로 변환하여 output-topic으로 전송


📌 7. Monitoring (Kafka UI 도구 활용)

Kafka를 운영할 때 모니터링이 중요합니다.

🔹 Kafka 모니터링 도구

도구 설명
Kafka Manager Kafka 상태 및 Topic 모니터링
Confluent Control Center Kafka 클러스터 관리
Prometheus & Grafana Kafka 메트릭 시각화
Kafka UI 웹 기반 Kafka 관리 도구

🔹 Kafka UI 실행

docker run -d -p 8080:8080 provectuslabs/kafka-ui

✅ http://localhost:8080 에서 Kafka 메시지를 시각적으로 확인 가능


✅ 정리

학습 주제 설명
Kafka Java Client Java(Spring Boot)에서 Kafka 연동
Producer 설정 Acknowledgment, Key 기반 메시지 전송
Consumer 설정 Offset 관리, Consumer Group 활용
Serialization JSON, Avro, Protocol Buffers 활용
Kafka Connect DB, File, HTTP 등 외부 시스템과 연동
Kafka Streams 데이터 스트리밍 및 실시간 변환 처리
Monitoring Kafka UI 도구(Kafka Manager, Confluent Control Center)

 

이제 Kafka를 실무 프로젝트에서 활용할 준비가 되었습니다! 🚀
다음 단계에서는 Kafka 운영 및 성능 최적화를 학습하면 더욱 강력한 Kafka 시스템을 구축할 수 있습니다.

+ Recent posts