Tool

LogStash

Zin0_0 2022. 2. 24. 11:07
반응형

Log Stash

  • 실시간 파이프라인 기능을 가진 오픈소스 데이터 수집 엔진
  • 서로 다른 소스의 데이터를 탄력적으로 통합하고 사용자가 선택한 목적지로 데이터를 정규화하도록 도움
  • 다양한 입력, 필터, 출력 플러그인을 통해 다양한 유형의 이벤트를 수집
    • Apache 및 Application 로그 (log4J 등), Syslog, windows 이벤트 로그, 네트워킹 및 방화벽 로그 등을 쉽게 수집
  • Filebeat 와 연계하여 보충적인 보안 로그 전달 기능 활용 Ganglia, collectd, NetFlow, JMX, 기타 여러 인프라 및 애플리케이션 플랫폼의 메트릭을 TCPUDP를 통해 수집

작동 방식 및 플러그인

  • Logstash 이벤트 처리 파이프라인에는 입력 → 필터 → 출력 세 단계
  • Input
    • file
      • Unix 명령어인 tail -0F와 유사하게 filesystem의 파일을 읽을 때 사용
    • syslog
      • 514 포트에서 syslog 메시지를 수신 대기하고 RFC3164 형식에 따라 구문 분석
    • redis
      • redis 서버에서 읽어오며 redis channels와 redis lists를 사용
    • beats
      • Beats에서 보낸 이벤트를 처리
    • http
      • Http나 Https로 부터 이벤트를 수신
    • udp
      • UDP를 통해 이벤트를 수신
    • azure_envent_hubs, cloudwatch(AWS), es(elasticsearch), exec 등 이외에도 다양한 플러그인 존재
  • Filter
    • grok
      • 임의의 텍스트를 분석하고 구조화하는 플러그인
      • 현재 Logstash에서 구조화하고 queryable하는 가장 좋은 방법 (Logstash 필터의 가장 기본적인 요소)
        • Logstash에 120개의 패턴이 내장되어 있어 필요에 맞는 패턴을 찾을 가능성이 크기 때문
    • kv
      • Key-Value 타입으로 이벤트를 파싱해주는 플러그인
    • mutate
      • 이벤트 필드에 대한 일반적인 변환 작업을 해주는 플러그인
        • 이벤트의 필드 이름 수정, 제거 등 수행
    • drop
      • 디버그 이벤트와 같은 이벤트를 삭제하는 플러그인
    • clone
      • 이벤트의 복사본을 만들고 필드를 추가하거나 제거하는 플러그인
    • ruby
      • ruby 코드를 사용하여 명령을 할 수 있도록 도와주는 플러그인
    • geoip, bytes, csv, date, json, elasticsearch, http 등 이외에도 다양한 플러그인 존재
  • Output
    • elasticsearch
      • es에 이벤트 데이터를 전송하는 플러그인
    • file
      • 이벤트 데이터를 디스크의 file에 쓰는 플러그인
    • email
      • email을 보내주는 플러그인
      • 발신자와 수신자, 제목, mailserver 주소, 통신 방식 등을 함께 넘겨 메일링
    • csv, cloudwatch, elasticsearch, email, http, kafka 등 이외에도 다양한 플러그인 존재
  • Codecs
    • 입출력의 일부로 동작가능한 stream filter로 메시지 전송과 serialization 프로세스를 분리하여 사용 가능
    • json, masgpack, plain text가 가장 널리 사용됨

특징

  • Java (JVM) 11 or 17 을 이용하여 구동
  • Elasticsearch 및 Kibana 시너지 효과
  • 수평 확장이 가능한 데이터 처리 파이프라인
  • 플러그형 파이프라인 아키텍처
  • 패턴 일치, 지리적 맵핑, 동적 조회 기능과 함께 여러 집계 및 변이 기능을 즉시 사용할 수 있음
  • 보관(Stash) 선택 (Output)
    • 데이터를 저장, 분석하고 그에 대한 작업을 수행
    • File, 로그 전송 등

파이프라인 (Pipe Line)

  • Logstash를 이용할 때, 동일한 Logstash 인스턴스 내에서 다중 파이프라인을 사용할 수 있음
    • 다른 인스턴스의 Logstash의 통신을 설정해야하는 경우에는 Kafka 또는 Redis 같은 중계 queue를 사용하거나 Lumberjack 출력을 Beats 입력에 연결하여 사용해야함
  •  

사용 예시

  • 일반적인 설정 (하나의 configuration을 사용하는 예시)
input {
  udp {
    port => 1234
    workers => 10
    queue_size => 25000
    add_field => { "@log_type" => "firewall"}
    type => "firewall"
  }
  http {
      type => "l7check"
  }
}

filter {
  if [type] == "firewall" {
      ruby {
        code => "event.set('@kst_date', event.timestamp.time.localtime('+09:00').strftime('%Y-%m-%d'))"
      }
      grok {
          pattern_definitions => { "CUSTOM_DATE" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}" }
          match => { "message" => "<%{NUMBER:pid}>date=%{CUSTOM_DATE}" }
      }
      kv {
          source => "message"
          allow_duplicate_values => false
          include_keys => ["memberNo", "name", "age"]
      }
      mutate {
          rename => {"name" => "memberName"}
          remove_filed => ["telephone"]
      }
  } else if [type] == "l7Check" {
      do Sth..
  }
}

output {
  if [type] == "firewall" {
    file {
          path => "logs/firewall-%{@kst_date}.log"
        codec => line { format => "Date : [%{@kst_timestamp}], Cause : %{tags}, Message : %{message}"}
      }
  } else if [type] == "l7Check" {
      do Sth..
  }
}
  • 다중 파이프라인을 사용하는 예시
input {
  udp {
    port => 7010
    workers => 10
    queue_size => 25000
    add_field => { "@log_type" => "firewall"}
    type => "firewall"
  }
  http {
      type => "l7check"
  }
}

filter {
  if [type] == "firewall" {
      ruby {
        code => "event.set('@kst_date', event.timestamp.time.localtime('+09:00').strftime('%Y-%m-%d'))"
      }
      grok {
          pattern_definitions => { "CUSTOM_DATE" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}" }
          match => { "message" => "<%{NUMBER:pid}>date=%{CUSTOM_DATE}" }
      }
      kv {
          source => "message"
          allow_duplicate_values => false
          include_keys => ["memberNo", "name", "age"]
      }
      mutate {
          rename => {"name" => "memberName"}
          remove_filed => ["telephone"]
      }
  } else if [type] == "l7Check" {
      do Sth..
  }
}

output {
  if [type] == "firewall" {
    pipeline {
        send_to => "file_pipeline"
    }
  } else if [type] == "l7Check" {
      do Sth..
  }
}
input {
    pipeline {
        address => "file_pipeline"
    }
}

output {
    file {
          path => "logs/firewall-%{@kst_date}.log"
        codec => line { format => "Date : [%{@kst_timestamp}], Cause : %{tags}, Message : %{message}"}
      }
}
  • send_to
    • 전달하고자 하는 pipeline의 주소를 적어준다. 이 때, 전달하려는 주소는 받는 곳의 address와 같음
  • address
    • pipeline이 연결될 주소 (자신의 주소)
- pipeline.id: main_pipeline
  queue.type: persisted
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./logstash.conf"
- pipeline.id: file_pipeline
  path.config: "./logstash-file.conf"
  • 다중 파이프라인을 사용하기 위해서는 pipeline.yml 파일에서 pipeline id와 config 위치를 설정해주어야 함

Reference

https://www.elastic.co/guide/en/logstash/current/index.html

반응형