ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • LogStash
    Tool 2022. 2. 24. 11:07
    반응형

    Log Stash

    • 실시간 파이프라인 기능을 가진 오픈소스 데이터 수집 엔진
    • 서로 다른 소스의 데이터를 탄력적으로 통합하고 사용자가 선택한 목적지로 데이터를 정규화하도록 도움
    • 다양한 입력, 필터, 출력 플러그인을 통해 다양한 유형의 이벤트를 수집
      • Apache 및 Application 로그 (log4J 등), Syslog, windows 이벤트 로그, 네트워킹 및 방화벽 로그 등을 쉽게 수집
    • Filebeat 와 연계하여 보충적인 보안 로그 전달 기능 활용 Ganglia, collectd, NetFlow, JMX, 기타 여러 인프라 및 애플리케이션 플랫폼의 메트릭을 TCPUDP를 통해 수집

    작동 방식 및 플러그인

    • Logstash 이벤트 처리 파이프라인에는 입력 → 필터 → 출력 세 단계
    • Input
      • file
        • Unix 명령어인 tail -0F와 유사하게 filesystem의 파일을 읽을 때 사용
      • syslog
        • 514 포트에서 syslog 메시지를 수신 대기하고 RFC3164 형식에 따라 구문 분석
      • redis
        • redis 서버에서 읽어오며 redis channels와 redis lists를 사용
      • beats
        • Beats에서 보낸 이벤트를 처리
      • http
        • Http나 Https로 부터 이벤트를 수신
      • udp
        • UDP를 통해 이벤트를 수신
      • azure_envent_hubs, cloudwatch(AWS), es(elasticsearch), exec 등 이외에도 다양한 플러그인 존재
    • Filter
      • grok
        • 임의의 텍스트를 분석하고 구조화하는 플러그인
        • 현재 Logstash에서 구조화하고 queryable하는 가장 좋은 방법 (Logstash 필터의 가장 기본적인 요소)
          • Logstash에 120개의 패턴이 내장되어 있어 필요에 맞는 패턴을 찾을 가능성이 크기 때문
      • kv
        • Key-Value 타입으로 이벤트를 파싱해주는 플러그인
      • mutate
        • 이벤트 필드에 대한 일반적인 변환 작업을 해주는 플러그인
          • 이벤트의 필드 이름 수정, 제거 등 수행
      • drop
        • 디버그 이벤트와 같은 이벤트를 삭제하는 플러그인
      • clone
        • 이벤트의 복사본을 만들고 필드를 추가하거나 제거하는 플러그인
      • ruby
        • ruby 코드를 사용하여 명령을 할 수 있도록 도와주는 플러그인
      • geoip, bytes, csv, date, json, elasticsearch, http 등 이외에도 다양한 플러그인 존재
    • Output
      • elasticsearch
        • es에 이벤트 데이터를 전송하는 플러그인
      • file
        • 이벤트 데이터를 디스크의 file에 쓰는 플러그인
      • email
        • email을 보내주는 플러그인
        • 발신자와 수신자, 제목, mailserver 주소, 통신 방식 등을 함께 넘겨 메일링
      • csv, cloudwatch, elasticsearch, email, http, kafka 등 이외에도 다양한 플러그인 존재
    • Codecs
      • 입출력의 일부로 동작가능한 stream filter로 메시지 전송과 serialization 프로세스를 분리하여 사용 가능
      • json, masgpack, plain text가 가장 널리 사용됨

    특징

    • Java (JVM) 11 or 17 을 이용하여 구동
    • Elasticsearch 및 Kibana 시너지 효과
    • 수평 확장이 가능한 데이터 처리 파이프라인
    • 플러그형 파이프라인 아키텍처
    • 패턴 일치, 지리적 맵핑, 동적 조회 기능과 함께 여러 집계 및 변이 기능을 즉시 사용할 수 있음
    • 보관(Stash) 선택 (Output)
      • 데이터를 저장, 분석하고 그에 대한 작업을 수행
      • File, 로그 전송 등

    파이프라인 (Pipe Line)

    • Logstash를 이용할 때, 동일한 Logstash 인스턴스 내에서 다중 파이프라인을 사용할 수 있음
      • 다른 인스턴스의 Logstash의 통신을 설정해야하는 경우에는 Kafka 또는 Redis 같은 중계 queue를 사용하거나 Lumberjack 출력을 Beats 입력에 연결하여 사용해야함
    •  

    사용 예시

    • 일반적인 설정 (하나의 configuration을 사용하는 예시)
    input {
      udp {
        port => 1234
        workers => 10
        queue_size => 25000
        add_field => { "@log_type" => "firewall"}
        type => "firewall"
      }
      http {
          type => "l7check"
      }
    }
    
    filter {
      if [type] == "firewall" {
          ruby {
            code => "event.set('@kst_date', event.timestamp.time.localtime('+09:00').strftime('%Y-%m-%d'))"
          }
          grok {
              pattern_definitions => { "CUSTOM_DATE" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}" }
              match => { "message" => "<%{NUMBER:pid}>date=%{CUSTOM_DATE}" }
          }
          kv {
              source => "message"
              allow_duplicate_values => false
              include_keys => ["memberNo", "name", "age"]
          }
          mutate {
              rename => {"name" => "memberName"}
              remove_filed => ["telephone"]
          }
      } else if [type] == "l7Check" {
          do Sth..
      }
    }
    
    output {
      if [type] == "firewall" {
        file {
              path => "logs/firewall-%{@kst_date}.log"
            codec => line { format => "Date : [%{@kst_timestamp}], Cause : %{tags}, Message : %{message}"}
          }
      } else if [type] == "l7Check" {
          do Sth..
      }
    }
    • 다중 파이프라인을 사용하는 예시
    input {
      udp {
        port => 7010
        workers => 10
        queue_size => 25000
        add_field => { "@log_type" => "firewall"}
        type => "firewall"
      }
      http {
          type => "l7check"
      }
    }
    
    filter {
      if [type] == "firewall" {
          ruby {
            code => "event.set('@kst_date', event.timestamp.time.localtime('+09:00').strftime('%Y-%m-%d'))"
          }
          grok {
              pattern_definitions => { "CUSTOM_DATE" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}" }
              match => { "message" => "<%{NUMBER:pid}>date=%{CUSTOM_DATE}" }
          }
          kv {
              source => "message"
              allow_duplicate_values => false
              include_keys => ["memberNo", "name", "age"]
          }
          mutate {
              rename => {"name" => "memberName"}
              remove_filed => ["telephone"]
          }
      } else if [type] == "l7Check" {
          do Sth..
      }
    }
    
    output {
      if [type] == "firewall" {
        pipeline {
            send_to => "file_pipeline"
        }
      } else if [type] == "l7Check" {
          do Sth..
      }
    }
    input {
        pipeline {
            address => "file_pipeline"
        }
    }
    
    output {
        file {
              path => "logs/firewall-%{@kst_date}.log"
            codec => line { format => "Date : [%{@kst_timestamp}], Cause : %{tags}, Message : %{message}"}
          }
    }
    
    • send_to
      • 전달하고자 하는 pipeline의 주소를 적어준다. 이 때, 전달하려는 주소는 받는 곳의 address와 같음
    • address
      • pipeline이 연결될 주소 (자신의 주소)
    - pipeline.id: main_pipeline
      queue.type: persisted
      pipeline.workers: 1
      pipeline.batch.size: 1
      path.config: "./logstash.conf"
    - pipeline.id: file_pipeline
      path.config: "./logstash-file.conf"
    • 다중 파이프라인을 사용하기 위해서는 pipeline.yml 파일에서 pipeline id와 config 위치를 설정해주어야 함

    Reference

    https://www.elastic.co/guide/en/logstash/current/index.html

    반응형

    댓글

Designed by Tistory.