Logstash 및 Beats 활용 (ELK Stack 구성)

📌 ELK Stack은 Elasticsearch, Logstash, Kibana를 포함하는 로그 및 데이터 분석 솔루션입니다.

  • Logstash: 다양한 데이터 소스로부터 데이터를 수집 및 변환하여 Elasticsearch에 전달
  • Beats: 경량 데이터 수집기(Filebeat, Metricbeat, Heartbeat)
  • Kafka 연동: 대용량 데이터 스트리밍을 위한 Apache Kafka와 Elasticsearch 연결

Logstash를 이용한 데이터 수집 및 변환

Logstash는 다양한 소스로부터 데이터를 수집하여 필터링, 변환 후 Elasticsearch로 전송하는 도구입니다.


(1) Logstash 설치

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.x.x-linux-x86_64.tar.gz
tar -xzf logstash-8.x.x-linux-x86_64.tar.gz
cd logstash-8.x.x-linux-x86_64

Logstash 다운로드 및 압축 해제 후 실행 준비.


(2) Logstash 기본 구성 (logstash.conf)

Logstash의 설정 파일은 Input → Filter → Output 구조를 가집니다.

input {
  file {
    path => "/var/log/syslog"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:host} %{GREEDYDATA:log}" }
  }
  date {
    match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}

파일 로그(/var/log/syslog)를 읽고 변환하여 Elasticsearch에 저장.
grok을 사용하여 timestamp, host, log 필드로 분리.
데이터를 날짜별 인덱스로 저장 (syslog-YYYY.MM.dd).


(3) Logstash 실행

bin/logstash -f logstash.conf

Logstash 실행 후 데이터를 Elasticsearch로 전송.


(4) Elasticsearch에서 데이터 확인

GET syslog-*/_search
{
  "query": {
    "match_all": {}
  }
}

Elasticsearch에서 Logstash를 통해 수집된 로그 데이터 조회.


Beats 활용 (Filebeat, Metricbeat, Heartbeat)

Beats는 경량 데이터 수집기로, 개별 서버에서 실행되며 Logstash 또는 Elasticsearch로 데이터를 전송할 수 있습니다.


(1) Filebeat - 로그 파일 수집

Filebeat는 파일 로그를 모니터링하고 전송하는 경량 데이터 수집기입니다.

📌 Filebeat 설정 (filebeat.yml)

filebeat.inputs:
  - type: log
    paths:
      - /var/log/syslog

output.elasticsearch:
  hosts: ["http://localhost:9200"]
  index: "filebeat-%{+YYYY.MM.dd}"

파일 로그(/var/log/syslog)를 모니터링하고 Elasticsearch에 저장.


(2) Metricbeat - 시스템 메트릭 수집

Metricbeat는 CPU, 메모리, 네트워크 등의 시스템 메트릭을 수집합니다.

📌 Metricbeat 설정 (metricbeat.yml)

metricbeat.modules:
  - module: system
    metricsets:
      - cpu
      - memory
      - network
    enabled: true
    period: 10s

output.elasticsearch:
  hosts: ["http://localhost:9200"]
  index: "metricbeat-%{+YYYY.MM.dd}"

CPU, 메모리, 네트워크 메트릭을 10초마다 수집하여 Elasticsearch에 저장.


(3) Heartbeat - 서비스 상태 모니터링

Heartbeat는 웹 서비스 및 TCP, ICMP 등의 연결 상태를 모니터링합니다.

📌 Heartbeat 설정 (heartbeat.yml)

heartbeat.monitors:
  - type: http
    schedule: '@every 10s'
    urls:
      - "http://localhost:5601"
      - "http://localhost:9200"

output.elasticsearch:
  hosts: ["http://localhost:9200"]
  index: "heartbeat-%{+YYYY.MM.dd}"

Elasticsearch와 Kibana 상태를 10초마다 체크하여 Elasticsearch에 저장.


Kafka와 Elasticsearch 연동

대량의 로그 및 이벤트 데이터를 실시간으로 처리할 때 Kafka를 중간 버퍼로 사용하여 Elasticsearch로 전송할 수 있습니다.


(1) Kafka 설치 및 실행

wget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0

# Zookeeper 실행
bin/zookeeper-server-start.sh config/zookeeper.properties

# Kafka 실행
bin/kafka-server-start.sh config/server.properties

Zookeeper와 Kafka를 실행하여 메시지 큐 준비.


(2) Kafka에 로그 데이터 전송

bin/kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic logs --bootstrap-server localhost:9092

Kafka 토픽(logs)을 생성하고 로그 메시지를 입력하여 테스트.


(3) Logstash를 사용하여 Kafka → Elasticsearch 데이터 전송

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logs"]
    group_id => "logstash-group"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "kafka-logs-%{+YYYY.MM.dd}"
  }
}

Kafka에서 데이터를 읽어 Elasticsearch로 저장.


(4) 데이터 확인

GET kafka-logs-*/_search
{
  "query": {
    "match_all": {}
  }
}

Kafka를 통해 전달된 로그 데이터가 Elasticsearch에 정상적으로 저장되었는지 확인.


Logstash 및 Beats 활용 정리

기능 설명
Logstash 다양한 데이터 소스를 수집하여 Elasticsearch로 전달
Filebeat 로그 파일을 모니터링하고 전송
Metricbeat 시스템 메트릭(CPU, 메모리 등) 수집
Heartbeat 서비스 및 네트워크 상태 모니터링
Kafka 연동 대용량 로그를 Kafka → Elasticsearch로 스트리밍

 

+ Recent posts