Logstash 및 Beats 활용 (ELK Stack 구성)
📌 ELK Stack은 Elasticsearch, Logstash, Kibana를 포함하는 로그 및 데이터 분석 솔루션입니다.
- Logstash: 다양한 데이터 소스로부터 데이터를 수집 및 변환하여 Elasticsearch에 전달
- Beats: 경량 데이터 수집기(Filebeat, Metricbeat, Heartbeat)
- Kafka 연동: 대용량 데이터 스트리밍을 위한 Apache Kafka와 Elasticsearch 연결
Logstash를 이용한 데이터 수집 및 변환
Logstash는 다양한 소스로부터 데이터를 수집하여 필터링, 변환 후 Elasticsearch로 전송하는 도구입니다.
✅ (1) Logstash 설치
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.x.x-linux-x86_64.tar.gz
tar -xzf logstash-8.x.x-linux-x86_64.tar.gz
cd logstash-8.x.x-linux-x86_64
✅ Logstash 다운로드 및 압축 해제 후 실행 준비.
✅ (2) Logstash 기본 구성 (logstash.conf)
Logstash의 설정 파일은 Input → Filter → Output 구조를 가집니다.
input {
file {
path => "/var/log/syslog"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:host} %{GREEDYDATA:log}" }
}
date {
match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
target => "@timestamp"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "syslog-%{+YYYY.MM.dd}"
}
}
✅ 파일 로그(/var/log/syslog)를 읽고 변환하여 Elasticsearch에 저장.
✅ grok을 사용하여 timestamp, host, log 필드로 분리.
✅ 데이터를 날짜별 인덱스로 저장 (syslog-YYYY.MM.dd).
✅ (3) Logstash 실행
bin/logstash -f logstash.conf
✅ Logstash 실행 후 데이터를 Elasticsearch로 전송.
✅ (4) Elasticsearch에서 데이터 확인
GET syslog-*/_search
{
"query": {
"match_all": {}
}
}
✅ Elasticsearch에서 Logstash를 통해 수집된 로그 데이터 조회.
Beats 활용 (Filebeat, Metricbeat, Heartbeat)
Beats는 경량 데이터 수집기로, 개별 서버에서 실행되며 Logstash 또는 Elasticsearch로 데이터를 전송할 수 있습니다.
✅ (1) Filebeat - 로그 파일 수집
Filebeat는 파일 로그를 모니터링하고 전송하는 경량 데이터 수집기입니다.
📌 Filebeat 설정 (filebeat.yml)
filebeat.inputs:
- type: log
paths:
- /var/log/syslog
output.elasticsearch:
hosts: ["http://localhost:9200"]
index: "filebeat-%{+YYYY.MM.dd}"
✅ 파일 로그(/var/log/syslog)를 모니터링하고 Elasticsearch에 저장.
✅ (2) Metricbeat - 시스템 메트릭 수집
Metricbeat는 CPU, 메모리, 네트워크 등의 시스템 메트릭을 수집합니다.
📌 Metricbeat 설정 (metricbeat.yml)
metricbeat.modules:
- module: system
metricsets:
- cpu
- memory
- network
enabled: true
period: 10s
output.elasticsearch:
hosts: ["http://localhost:9200"]
index: "metricbeat-%{+YYYY.MM.dd}"
✅ CPU, 메모리, 네트워크 메트릭을 10초마다 수집하여 Elasticsearch에 저장.
✅ (3) Heartbeat - 서비스 상태 모니터링
Heartbeat는 웹 서비스 및 TCP, ICMP 등의 연결 상태를 모니터링합니다.
📌 Heartbeat 설정 (heartbeat.yml)
heartbeat.monitors:
- type: http
schedule: '@every 10s'
urls:
- "http://localhost:5601"
- "http://localhost:9200"
output.elasticsearch:
hosts: ["http://localhost:9200"]
index: "heartbeat-%{+YYYY.MM.dd}"
✅ Elasticsearch와 Kibana 상태를 10초마다 체크하여 Elasticsearch에 저장.
Kafka와 Elasticsearch 연동
대량의 로그 및 이벤트 데이터를 실시간으로 처리할 때 Kafka를 중간 버퍼로 사용하여 Elasticsearch로 전송할 수 있습니다.
✅ (1) Kafka 설치 및 실행
wget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
# Zookeeper 실행
bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafka 실행
bin/kafka-server-start.sh config/server.properties
✅ Zookeeper와 Kafka를 실행하여 메시지 큐 준비.
✅ (2) Kafka에 로그 데이터 전송
bin/kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic logs --bootstrap-server localhost:9092
✅ Kafka 토픽(logs)을 생성하고 로그 메시지를 입력하여 테스트.
✅ (3) Logstash를 사용하여 Kafka → Elasticsearch 데이터 전송
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["logs"]
group_id => "logstash-group"
}
}
filter {
json {
source => "message"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "kafka-logs-%{+YYYY.MM.dd}"
}
}
✅ Kafka에서 데이터를 읽어 Elasticsearch로 저장.
✅ (4) 데이터 확인
GET kafka-logs-*/_search
{
"query": {
"match_all": {}
}
}
✅ Kafka를 통해 전달된 로그 데이터가 Elasticsearch에 정상적으로 저장되었는지 확인.
Logstash 및 Beats 활용 정리
기능 |
설명 |
Logstash |
다양한 데이터 소스를 수집하여 Elasticsearch로 전달 |
Filebeat |
로그 파일을 모니터링하고 전송 |
Metricbeat |
시스템 메트릭(CPU, 메모리 등) 수집 |
Heartbeat |
서비스 및 네트워크 상태 모니터링 |
Kafka 연동 |
대용량 로그를 Kafka → Elasticsearch로 스트리밍 |