Ingest Pipeline 개요
📌 Ingest Pipeline은 Elasticsearch에 데이터를 색인하기 전에 가공하는 기능입니다.
- 데이터를 저장하기 전 변환, 정제, 필터링을 자동으로 수행.
- Logstash 없이도 데이터 변환을 직접 수행 가능.
- **Processors(프로세서)**를 이용하여 데이터를 변환 (set, rename, grok 등).
Ingest Pipeline을 활용한 데이터 변환
✅ (1) Ingest Pipeline 생성
아래 예제는 set, rename, grok을 활용하여 데이터를 변환하는 Ingest Pipeline을 생성합니다.
PUT _ingest/pipeline/log_pipeline
{
"description": "Log processing pipeline",
"processors": [
{
"set": {
"field": "log_level",
"value": "INFO"
}
},
{
"rename": {
"field": "old_field",
"target_field": "new_field"
}
},
{
"grok": {
"field": "message",
"patterns": ["%{IP:client_ip} - %{USERNAME:user} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:status}"]
}
}
]
}
✅ 설명
- "set": "log_level" 값을 "INFO"로 설정.
- "rename": "old_field"을 "new_field"로 변경.
- "grok": 정규식 패턴을 적용하여 로그 데이터를 IP, 사용자, 시간, HTTP 요청 등으로 파싱.
✅ (2) Ingest Pipeline 테스트
POST _ingest/pipeline/log_pipeline/_simulate
{
"docs": [
{
"_source": {
"old_field": "deprecated_value",
"message": "192.168.1.1 - johndoe [10/Feb/2024:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200"
}
}
]
}
🔹 응답 예시
{
"docs": [
{
"doc": {
"_source": {
"log_level": "INFO",
"new_field": "deprecated_value",
"client_ip": "192.168.1.1",
"user": "johndoe",
"timestamp": "10/Feb/2024:10:00:00 +0000",
"method": "GET",
"request": "/index.html",
"http_version": "1.1",
"status": "200"
}
}
}
]
}
✅ "message" 필드가 자동으로 파싱되어 개별 필드(client_ip, user, timestamp 등)로 변환됨.
✅ (3) Ingest Pipeline을 적용하여 데이터 색인
PUT /logs/_doc/1?pipeline=log_pipeline
{
"old_field": "deprecated_value",
"message": "192.168.1.1 - johndoe [10/Feb/2024:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200"
}
✅ 새로운 데이터가 색인될 때 자동으로 Ingest Pipeline이 실행되어 변환됨.
Reindex API를 사용한 데이터 마이그레이션
Reindex API는 기존 데이터를 새로운 인덱스로 복사할 때 사용합니다.
- 인덱스 구조 변경이 필요할 때 활용.
- 데이터를 새로운 인덱스로 이동하면서 Ingest Pipeline을 적용 가능.
✅ (1) 기존 데이터 조회
GET old_index/_search
{
"size": 5
}
✅ 기존 old_index 데이터 확인.
✅ (2) 새로운 인덱스 생성
PUT new_index
{
"mappings": {
"properties": {
"new_field": { "type": "keyword" },
"log_level": { "type": "keyword" },
"client_ip": { "type": "ip" },
"timestamp": { "type": "date" }
}
}
}
✅ 새로운 데이터 스키마 적용.
✅ (3) Reindex 실행
POST _reindex
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index",
"pipeline": "log_pipeline"
}
}
✅ 기존 old_index 데이터를 new_index로 복사하면서 log_pipeline을 적용.
✅ (4) 데이터 마이그레이션 확인
GET new_index/_search
{
"size": 5
}
✅ 데이터가 정상적으로 변환되어 저장되었는지 확인.
Ingest Pipeline & Reindex API 정리
기능 | 설명 |
Ingest Pipeline | 데이터 색인 전에 자동으로 변환/정제 |
Set Processor | 특정 필드의 값을 설정 |
Rename Processor | 필드 이름 변경 |
Grok Processor | 정규식 패턴을 사용하여 필드 추출 |
Reindex API | 기존 데이터를 새로운 인덱스로 복사 |
Reindex + Pipeline | 데이터를 이동하면서 자동 변환 적용 |
'Elastic Search' 카테고리의 다른 글
[LV 5] Kibana 연동 및 데이터 시각화 (0) | 2025.02.02 |
---|---|
[LV 4] Elasticsearch Scaling (확장 전략) (0) | 2025.02.02 |
[LV 2] 쿼리 DSL (Query DSL) 활용 (0) | 2025.02.02 |
[LV 2] 색인(Indexing)과 검색(Search) 기본 (0) | 2025.02.02 |
[LV 2] 인덱스(Index)와 매핑(Mapping) (0) | 2025.02.02 |