Ingest Pipeline 개요

📌 Ingest PipelineElasticsearch에 데이터를 색인하기 전에 가공하는 기능입니다.

  • 데이터를 저장하기 전 변환, 정제, 필터링을 자동으로 수행.
  • Logstash 없이도 데이터 변환을 직접 수행 가능.
  • **Processors(프로세서)**를 이용하여 데이터를 변환 (set, rename, grok 등).

Ingest Pipeline을 활용한 데이터 변환

(1) Ingest Pipeline 생성

아래 예제는 set, rename, grok을 활용하여 데이터를 변환하는 Ingest Pipeline을 생성합니다.

PUT _ingest/pipeline/log_pipeline
{
  "description": "Log processing pipeline",
  "processors": [
    {
      "set": {
        "field": "log_level",
        "value": "INFO"
      }
    },
    {
      "rename": {
        "field": "old_field",
        "target_field": "new_field"
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": ["%{IP:client_ip} - %{USERNAME:user} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:status}"]
      }
    }
  ]
}

설명

  • "set": "log_level" 값을 "INFO"로 설정.
  • "rename": "old_field"을 "new_field"로 변경.
  • "grok": 정규식 패턴을 적용하여 로그 데이터를 IP, 사용자, 시간, HTTP 요청 등으로 파싱.

(2) Ingest Pipeline 테스트

POST _ingest/pipeline/log_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "old_field": "deprecated_value",
        "message": "192.168.1.1 - johndoe [10/Feb/2024:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200"
      }
    }
  ]
}

🔹 응답 예시

{
  "docs": [
    {
      "doc": {
        "_source": {
          "log_level": "INFO",
          "new_field": "deprecated_value",
          "client_ip": "192.168.1.1",
          "user": "johndoe",
          "timestamp": "10/Feb/2024:10:00:00 +0000",
          "method": "GET",
          "request": "/index.html",
          "http_version": "1.1",
          "status": "200"
        }
      }
    }
  ]
}

"message" 필드가 자동으로 파싱되어 개별 필드(client_ip, user, timestamp 등)로 변환됨.


(3) Ingest Pipeline을 적용하여 데이터 색인

PUT /logs/_doc/1?pipeline=log_pipeline
{
  "old_field": "deprecated_value",
  "message": "192.168.1.1 - johndoe [10/Feb/2024:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200"
}

새로운 데이터가 색인될 때 자동으로 Ingest Pipeline이 실행되어 변환됨.


Reindex API를 사용한 데이터 마이그레이션

Reindex API는 기존 데이터를 새로운 인덱스로 복사할 때 사용합니다.

  • 인덱스 구조 변경이 필요할 때 활용.
  • 데이터를 새로운 인덱스로 이동하면서 Ingest Pipeline을 적용 가능.

(1) 기존 데이터 조회

GET old_index/_search
{
  "size": 5
}

기존 old_index 데이터 확인.


(2) 새로운 인덱스 생성

PUT new_index
{
  "mappings": {
    "properties": {
      "new_field": { "type": "keyword" },
      "log_level": { "type": "keyword" },
      "client_ip": { "type": "ip" },
      "timestamp": { "type": "date" }
    }
  }
}

새로운 데이터 스키마 적용.


(3) Reindex 실행

POST _reindex
{
  "source": {
    "index": "old_index"
  },
  "dest": {
    "index": "new_index",
    "pipeline": "log_pipeline"
  }
}

기존 old_index 데이터를 new_index로 복사하면서 log_pipeline을 적용.


(4) 데이터 마이그레이션 확인

GET new_index/_search
{
  "size": 5
}

데이터가 정상적으로 변환되어 저장되었는지 확인.


Ingest Pipeline & Reindex API 정리

기능 설명
Ingest Pipeline 데이터 색인 전에 자동으로 변환/정제
Set Processor 특정 필드의 값을 설정
Rename Processor 필드 이름 변경
Grok Processor 정규식 패턴을 사용하여 필드 추출
Reindex API 기존 데이터를 새로운 인덱스로 복사
Reindex + Pipeline 데이터를 이동하면서 자동 변환 적용

 

+ Recent posts