Data Collection Overview
This chapter looks at how the ELK Stack collects and processes data: Beats ship logs, metrics, and network data from the sources; Logstash parses, enriches, and routes the events; and Elasticsearch stores the results for search and analysis.
Data Collection Architecture
graph TB
subgraph "数据源层"
A1[Web服务器日志]
A2[应用程序日志]
A3[系统指标]
A4[数据库日志]
A5[网络数据包]
A6[安全事件]
end
subgraph "收集层"
B1[Filebeat]
B2[Metricbeat]
B3[Packetbeat]
B4[Winlogbeat]
B5[Auditbeat]
B6[Heartbeat]
end
subgraph "处理层"
C1[Logstash Pipeline 1]
C2[Logstash Pipeline 2]
C3[Logstash Pipeline 3]
end
subgraph "存储层"
D[Elasticsearch]
end
A1 --> B1
A2 --> B1
A3 --> B2
A4 --> B1
A5 --> B3
A6 --> B5
B1 --> C1
B2 --> C2
B3 --> C3
B4 --> C1
B5 --> C3
B6 --> C2
C1 --> D
C2 --> D
C3 --> D
Data Flow Processing Patterns
1. Direct:
Beats → Elasticsearch
2. Buffered:
Beats → Logstash → Elasticsearch
3. Queued (a configuration sketch follows this list):
Beats → Message Queue → Logstash → Elasticsearch
4. Hybrid:
Beats → Logstash → Message Queue → Logstash → Elasticsearch
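As a minimal sketch of the queued pattern, assuming a Kafka cluster at kafka1/kafka2 and a topic named filebeat-logs (all names are illustrative), Filebeat publishes to Kafka and a Logstash pipeline consumes from it:
# filebeat.yml — Kafka output for the queued pattern
output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092"]
  topic: "filebeat-logs"
  required_acks: 1
  compression: gzip
# Logstash pipeline — Kafka input consuming the same topic
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["filebeat-logs"]
    group_id => "logstash-consumers"
    codec => "json"
  }
}
The queue decouples collection from processing: Kafka absorbs traffic bursts and lets Logstash be restarted without losing in-flight events.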
Data Collection with Beats
1. Filebeat Configuration in Detail
Basic configuration example:
# filebeat.yml
filebeat.inputs:
# Web server logs
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/nginx/error.log
fields:
log_type: nginx
environment: production
fields_under_root: true
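# Note: the multiline settings and include_lines below match the nginx error log
# format; plain access log lines would be dropped by include_lines, so in practice
# the access log is usually configured as a separate input.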
multiline.pattern: '^\d{4}/\d{2}/\d{2}'
multiline.negate: true
multiline.match: after
exclude_lines: ['^DEBUG']
include_lines: ['^ERR', '^WARN', '^INFO']
# Application logs
- type: log
enabled: true
paths:
- /var/log/app/*.log
fields:
log_type: application
service: web-api
fields_under_root: true
json.keys_under_root: true
json.add_error_key: true
json.message_key: message
# Container logs
- type: container
enabled: true
paths:
- '/var/lib/docker/containers/*/*.log'
processors:
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
# Module configuration
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
# Processor configuration
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/log/containers/"
# Output configuration
output.logstash:
hosts: ["logstash1:5044", "logstash2:5044"]
loadbalance: true
compression_level: 3
# Logging configuration
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
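Before starting the shipper, the configuration and the output connection can be verified with Filebeat's built-in test commands (paths assume a package installation):
# Validate the configuration file and check connectivity to the configured output
filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml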
Module configuration examples:
# modules.d/nginx.yml
- module: nginx
access:
enabled: true
var.paths: ["/var/log/nginx/access.log*"]
error:
enabled: true
var.paths: ["/var/log/nginx/error.log*"]
# modules.d/apache.yml
- module: apache
access:
enabled: true
var.paths: ["/var/log/apache2/access.log*"]
error:
enabled: true
var.paths: ["/var/log/apache2/error.log*"]
# modules.d/mysql.yml
- module: mysql
error:
enabled: true
var.paths: ["/var/log/mysql/error.log*"]
slowlog:
enabled: true
var.paths: ["/var/log/mysql/mysql-slow.log*"]
2. System Monitoring with Metricbeat
# metricbeat.yml
metricbeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
# System modules
metricbeat.modules:
# System metrics
- module: system
metricsets:
- cpu
- load
- memory
- network
- process
- process_summary
- socket_summary
- filesystem
- fsstat
- diskio
enabled: true
period: 10s
processes: ['.*']
cpu.metrics: ["percentages", "normalized_percentages"]
core.metrics: ["percentages"]
# Docker monitoring
- module: docker
metricsets:
- container
- cpu
- diskio
- healthcheck
- info
- memory
- network
hosts: ["unix:///var/run/docker.sock"]
period: 10s
enabled: true
# Kubernetes monitoring
- module: kubernetes
metricsets:
- node
- system
- pod
- container
- volume
period: 10s
host: ${NODE_NAME}
hosts: ["https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}"]
# Processors
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
# Output
output.elasticsearch:
hosts: ["elasticsearch1:9200", "elasticsearch2:9200"]
index: "metricbeat-%{+yyyy.MM.dd}"
# With a custom index name, Beats 6.x+ also requires setup.template.name and
# setup.template.pattern; template settings live under setup.template.* at the top level
setup.template.name: "metricbeat"
setup.template.pattern: "metricbeat-*"
setup.template.settings:
index.number_of_shards: 1
index.codec: best_compression
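Assuming Kibana is reachable (setup.kibana must be configured for the dashboards step), the index template and the bundled dashboards can be loaded once with the setup command, and the output connection checked:
# One-time setup of templates/dashboards, then verify the Elasticsearch output
metricbeat setup --dashboards
metricbeat test output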
3. Network Monitoring with Packetbeat
# packetbeat.yml
packetbeat.interfaces.device: any
packetbeat.interfaces.snaplen: 1514
packetbeat.interfaces.type: af_packet
packetbeat.interfaces.buffer_size_mb: 100
# Protocol configuration
packetbeat.protocols:
# HTTP monitoring
- type: http
ports: [80, 8080, 8000, 5000, 8002]
hide_keywords: ["pass", "password", "passwd"]
send_headers: true
send_all_headers: true
split_cookie: true
real_ip_header: "X-Forwarded-For"
# MySQL monitoring
- type: mysql
ports: [3306]
max_rows: 10
max_row_length: 1024
# Redis monitoring
- type: redis
ports: [6379]
# DNS monitoring
- type: dns
ports: [53]
include_authorities: true
include_additionals: true
# TLS monitoring
- type: tls
ports: [443, 993, 995, 5223, 8443, 8883, 9243]
# Flow configuration
packetbeat.flows:
timeout: 30s
period: 10s
# Processors
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- detect_mime_type:
field: http.request.body.content
target: http.request.mime_type
# Output
output.elasticsearch:
hosts: ["elasticsearch:9200"]
index: "packetbeat-%{+yyyy.MM.dd}"
Data Processing with Logstash
1. Input Plugin Configuration
Beats input:
input {
beats {
port => 5044
host => "0.0.0.0"
client_inactivity_timeout => 300
include_codec_tag => false
}
}
File input:
input {
file {
path => "/var/log/app/*.log"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
tags => ["application", "multiline"]
}
}
TCP/UDP input:
input {
tcp {
port => 5000
codec => json_lines
tags => ["tcp", "json"]
}
udp {
port => 5001
codec => plain
tags => ["udp", "syslog"]
}
}
HTTP input:
input {
http {
host => "0.0.0.0"
port => 8080
codec => json
additional_codecs => {
"application/json" => "json"
"text/plain" => "plain"
}
tags => ["http", "webhook"]
}
}
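A quick way to exercise this input is to post a JSON document to it (host, port, and payload are illustrative):
curl -X POST "http://localhost:8080" \
  -H "Content-Type: application/json" \
  -d '{"service": "demo", "level": "INFO", "message": "hello from webhook"}'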
2. Filter Plugins in Detail
Grok parsing:
filter {
if [log_type] == "nginx" {
grok {
match => {
"message" => "%{NGINXACCESS}"
}
patterns_dir => ["/etc/logstash/patterns"]
tag_on_failure => ["_grokparsefailure"]
}
}
if [log_type] == "apache" {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
# Custom Grok pattern
if [log_type] == "application" {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:log_message}"
}
}
}
}
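%{NGINXACCESS} is usually a custom pattern rather than one shipped with Logstash, which is why patterns_dir is set above. A patterns file simply maps a name to a regular expression built from other patterns; a sketch (field names are assumptions and must match your actual nginx log_format) could look like:
# /etc/logstash/patterns/nginx
NGINXACCESS %{IPORHOST:clientip} - %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:agent}"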
Date parsing:
filter {
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
timezone => "UTC"
}
# Multiple date formats
date {
match => [
"log_timestamp",
"yyyy-MM-dd HH:mm:ss",
"yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"MMM dd HH:mm:ss"
]
}
}
Field operations:
filter {
mutate {
# Add fields
add_field => {
"environment" => "production"
"processed_by" => "logstash"
}
# Rename fields
rename => {
"host" => "hostname"
"message" => "original_message"
}
# Remove fields
remove_field => [ "beat", "input", "prospector" ]
# Type conversion
convert => {
"response_time" => "float"
"status_code" => "integer"
"bytes_sent" => "integer"
}
# String operations
lowercase => [ "method", "protocol" ]
uppercase => [ "log_level" ]
strip => [ "user_agent" ]
# Split a field
split => { "tags" => "," }
# Merge fields
merge => { "all_errors" => "error_details" }
}
}
Conditional processing:
filter {
if [log_type] == "nginx" {
if [status] >= 400 {
mutate {
add_tag => [ "error" ]
}
}
if [status] >= 500 {
mutate {
add_tag => [ "server_error" ]
}
}
}
# Compound conditions
if [log_type] == "application" and [level] == "ERROR" {
mutate {
add_field => { "alert_required" => "true" }
}
}
# Regular-expression matching
if [user_agent] =~ /bot|crawler|spider/i {
mutate {
add_tag => [ "bot_traffic" ]
}
}
}
Data Parsing and Transformation
1. JSON Data Processing
filter {
# Parse JSON
if [log_type] == "application" {
json {
source => "message"
target => "parsed"
skip_on_invalid_json => true
}
# Extract JSON fields
if [parsed] {
mutate {
add_field => {
"app_name" => "%{[parsed][application]}"
"log_level" => "%{[parsed][level]}"
"thread_name" => "%{[parsed][thread]}"
}
}
}
}
}
2. XML Data Processing
filter {
xml {
source => "message"
target => "parsed_xml"
xpath => {
"/log/level/text()" => "log_level"
"/log/message/text()" => "log_message"
"/log/@timestamp" => "log_timestamp"
}
}
}
3. CSV Data Processing
filter {
csv {
source => "message"
separator => ","
columns => [ "timestamp", "level", "component", "message", "thread" ]
skip_empty_columns => true
}
}
4. Geolocation Parsing
filter {
# GeoIP lookup
geoip {
source => "client_ip"
target => "geoip"
database => "/etc/logstash/GeoLite2-City.mmdb"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
# Convert coordinates to float
mutate {
convert => [ "[geoip][coordinates]", "float"]
}
}
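For map visualizations in Kibana, the [geoip][coordinates] field has to be mapped as geo_point in the target indices. A minimal legacy-style template (the template name and index_patterns are assumptions to adapt to your index naming) could be applied like this:
curl -X PUT "localhost:9200/_template/geoip-coordinates" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["logs-*"],
  "mappings": {
    "properties": {
      "geoip": {
        "properties": {
          "coordinates": { "type": "geo_point" }
        }
      }
    }
  }
}'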
5. User Agent Parsing
filter {
useragent {
source => "user_agent"
target => "ua"
prefix => "ua_"
}
}
Data Routing and Filtering
1. Multi-Pipeline Configuration
pipelines.yml configuration:
# pipelines.yml
- pipeline.id: web-logs
path.config: "/etc/logstash/conf.d/web-*.conf"
pipeline.workers: 4
pipeline.batch.size: 1000
- pipeline.id: app-logs
path.config: "/etc/logstash/conf.d/app-*.conf"
pipeline.workers: 2
pipeline.batch.size: 500
- pipeline.id: security-logs
path.config: "/etc/logstash/conf.d/security-*.conf"
pipeline.workers: 1
pipeline.batch.size: 100
queue.type: persisted
queue.max_bytes: 1gb
2. Conditional Routing
# Route events based on log type
output {
if [log_type] == "nginx" {
elasticsearch {
hosts => ["es-web:9200"]
index => "nginx-logs-%{+YYYY.MM.dd}"
}
}
if [log_type] == "application" {
elasticsearch {
hosts => ["es-app:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
}
if [log_type] == "security" {
elasticsearch {
hosts => ["es-security:9200"]
index => "security-logs-%{+YYYY.MM.dd}"
}
}
# Additional handling for error logs
if [level] == "ERROR" or [status] >= 500 {
email {
to => "admin@example.com"
subject => "Critical Error Detected"
body => "Error: %{message}"
}
}
}
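Note that the email output may not be bundled with the default Logstash distribution and additionally needs SMTP settings (such as address and port); if it is missing, it can be installed once with:
bin/logstash-plugin install logstash-output-email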
3. Data Filtering
filter {
# Drop debug logs
if [level] == "DEBUG" {
drop { }
}
# Drop health-check requests
if [request] =~ /\/health|\/_status/ {
drop { }
}
# Drop static asset requests
if [request] =~ /\.(css|js|png|jpg|gif|ico)$/ {
drop { }
}
# Sampling
if [log_type] == "debug" {
if [sample_rate] {
ruby {
code => "
if rand() > event.get('sample_rate').to_f
event.cancel
end
"
}
}
}
}
Performance Optimization
1. Pipeline Tuning
# logstash.yml performance settings
pipeline.workers: 8
pipeline.batch.size: 2000
pipeline.batch.delay: 50
# Queue settings
queue.type: persisted
queue.max_bytes: 2gb
queue.checkpoint.writes: 1024
# Configuration reload
config.reload.automatic: true
config.reload.interval: 3s
# Monitoring settings
monitoring.enabled: true
monitoring.elasticsearch.hosts: ["http://localhost:9200"]
2. Memory Tuning
# jvm.options tuning
-Xms4g
-Xmx4g
-XX:+UseG1GC
-XX:+UseStringDeduplication
-XX:+AlwaysPreTouch
3. Output Tuning
output {
elasticsearch {
hosts => ["es1:9200", "es2:9200", "es3:9200"]
index => "logs-%{+YYYY.MM.dd}"
# Batch settings (flush_size and idle_flush_time were removed from recent versions
# of the Elasticsearch output plugin; batching is now governed by pipeline.batch.size)
flush_size => 1000
idle_flush_time => 1
# Connection pool settings
pool_max => 1000
pool_max_per_route => 100
# Retry settings (retry_max_items is likewise only recognized by older plugin versions)
retry_max_interval => 5
retry_max_items => 5000
# Template settings
template_overwrite => true
template => "/etc/logstash/templates/logs-template.json"
template_name => "logs"
}
}
Monitoring and Debugging
1. Pipeline Monitoring
# View pipeline statistics
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"
# View hot threads
curl -X GET "localhost:9600/_node/hot_threads?pretty"
# View plugin information
curl -X GET "localhost:9600/_node/plugins?pretty"
2. Performance Monitoring Script
#!/bin/bash
# logstash-monitor.sh
LOGSTASH_API="http://localhost:9600"
# Fetch pipeline statistics
get_pipeline_stats() {
echo "Pipeline Statistics:"
curl -s "$LOGSTASH_API/_node/stats/pipelines" | jq -r '
.pipelines | to_entries[] |
"Pipeline: \(.key)" +
"\n Events In: \(.value.events.in)" +
"\n Events Out: \(.value.events.out)" +
"\n Events Filtered: \(.value.events.filtered)" +
"\n Duration (ms): \(.value.events.duration_in_millis)" +
"\n Queue Push Duration (ms): \(.value.events.queue_push_duration_in_millis)\n"
'
}
# Fetch JVM statistics
get_jvm_stats() {
echo "JVM Statistics:"
curl -s "$LOGSTASH_API/_node/stats/jvm" | jq -r '
"Heap Used: \(.jvm.mem.heap_used_percent)%" +
"\nHeap Max: \(.jvm.mem.heap_max_in_bytes / 1024 / 1024 | floor)MB" +
"\nGC Count: \(.jvm.gc.collectors.old.collection_count)" +
"\nGC Time: \(.jvm.gc.collectors.old.collection_time_in_millis)ms"
'
}
# Main entry point
main() {
echo "Logstash Monitoring - $(date)"
echo "================================"
get_pipeline_stats
echo
get_jvm_stats
echo "================================"
}
main
3. Debug Configuration
# Debug output to stdout
output {
stdout {
codec => rubydebug {
metadata => true
}
}
# File output for debugging
file {
path => "/var/log/logstash/debug-%{+YYYY.MM.dd}.log"
codec => json_lines
}
}
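Before deploying a pipeline, its configuration can be validated and its compiled form inspected from the command line (the file path is illustrative):
# Syntax-check a pipeline definition, then print the compiled configuration
bin/logstash -f /etc/logstash/conf.d/web-logs.conf --config.test_and_exit
bin/logstash -f /etc/logstash/conf.d/web-logs.conf --config.test_and_exit --config.debug --log.level=debug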
Practical Examples
1. Web Server Log Analysis
Complete configuration example:
# web-logs.conf
input {
beats {
port => 5044
type => "web"
}
}
filter {
if [fields][log_type] == "nginx" {
grok {
match => {
"message" => "%{NGINXACCESS}"
}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
geoip {
source => "clientip"
target => "geoip"
}
useragent {
source => "agent"
target => "useragent"
}
mutate {
convert => {
"response" => "integer"
"bytes" => "integer"
"responsetime" => "float"
}
}
# Categorize response time
if [responsetime] {
if [responsetime] < 0.1 {
mutate { add_field => { "response_category" => "fast" } }
} else if [responsetime] < 1.0 {
mutate { add_field => { "response_category" => "normal" } }
} else {
mutate { add_field => { "response_category" => "slow" } }
}
}
# Categorize status code
if [response] {
if [response] >= 200 and [response] < 300 {
mutate { add_field => { "status_category" => "success" } }
} else if [response] >= 300 and [response] < 400 {
mutate { add_field => { "status_category" => "redirect" } }
} else if [response] >= 400 and [response] < 500 {
mutate { add_field => { "status_category" => "client_error" } }
} else if [response] >= 500 {
mutate { add_field => { "status_category" => "server_error" } }
}
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
template_overwrite => true
template => "/etc/logstash/templates/nginx-template.json"
template_name => "nginx"
}
# Alert on error logs
if [response] >= 500 {
email {
to => "ops@example.com"
subject => "Server Error Alert"
body => "Server error detected: %{message}"
}
}
}
2. Application Log Processing
# app-logs.conf
input {
beats {
port => 5045
type => "application"
}
}
filter {
if [fields][log_type] == "application" {
# Multiline handling (note: the multiline filter plugin is deprecated; joining
# multiline events is better done in Filebeat or with a multiline codec on the input)
multiline {
pattern => "^\d{4}-\d{2}-\d{2}"
negate => true
what => "previous"
}
# Parse JSON logs
if [message] =~ /^\{/ {
json {
source => "message"
skip_on_invalid_json => true
}
} else {
# Parse structured plain-text logs
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] \[%{DATA:thread}\] %{DATA:logger} - %{GREEDYDATA:log_message}"
}
}
}
# Exception / stack trace handling
if [log_message] =~ /Exception|Error/ {
mutate {
add_tag => [ "exception" ]
}
# Extract the exception class
grok {
match => {
"log_message" => "%{JAVACLASS:exception_class}"
}
tag_on_failure => []
}
}
# Performance log handling
if [logger] =~ /performance/ {
grok {
match => {
"log_message" => "Method: %{DATA:method_name}, Duration: %{NUMBER:duration:float}ms"
}
}
if [duration] {
if [duration] > 1000 {
mutate { add_tag => [ "slow_method" ] }
}
}
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "application-logs-%{+YYYY.MM.dd}"
}
# Exception alerting (the slack output is a community plugin installed separately)
if "exception" in [tags] {
slack {
url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
channel => "#alerts"
username => "logstash"
text => "Exception detected: %{exception_class} - %{log_message}"
}
}
}
3. Security Log Analysis
# security-logs.conf
input {
beats {
port => 5046
type => "security"
}
}
filter {
if [fields][log_type] == "auth" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:log_message}"
}
}
# SSH login analysis
if [program] == "sshd" {
if [log_message] =~ /Failed password/ {
grok {
match => {
"log_message" => "Failed password for %{DATA:username} from %{IP:source_ip} port %{INT:source_port}"
}
}
mutate { add_tag => [ "ssh_failed_login" ] }
}
if [log_message] =~ /Accepted password/ {
grok {
match => {
"log_message" => "Accepted password for %{DATA:username} from %{IP:source_ip} port %{INT:source_port}"
}
}
mutate { add_tag => [ "ssh_successful_login" ] }
}
}
# Geolocation enrichment
if [source_ip] {
geoip {
source => "source_ip"
target => "geoip"
}
}
# Threat detection
if "ssh_failed_login" in [tags] {
# Check against known malicious IPs
translate {
source => "source_ip"
target => "threat_level"
dictionary_path => "/etc/logstash/threat_ips.yml"
fallback => "unknown"
}
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "security-logs-%{+YYYY.MM.dd}"
}
# Security alerting
if "ssh_failed_login" in [tags] {
# Forward to the SIEM system
http {
url => "https://siem.example.com/api/alerts"
http_method => "post"
format => "json"
headers => {
"Authorization" => "Bearer YOUR_TOKEN"
}
}
}
}
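The translate filter above expects a dictionary file mapping source values to target values; a hypothetical /etc/logstash/threat_ips.yml (the addresses are documentation-range placeholders) could look like:
# /etc/logstash/threat_ips.yml
"203.0.113.10": "high"
"203.0.113.45": "high"
"198.51.100.7": "medium"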
Summary
This chapter covered data collection and processing in the ELK Stack in detail, including:
Key Points
- Data collection: configuring and using the Beats family of shippers
- Data processing: Logstash input, filter, and output configuration
- Data parsing: parsing and transforming a variety of data formats
- Data routing: condition-based routing and filtering
- Performance optimization: pipeline tuning, memory tuning, and batching
- Monitoring and debugging: performance monitoring, troubleshooting, and debugging techniques
Best Practices
- Design the data collection architecture deliberately
- Optimize Grok patterns to improve parsing performance
- Use conditionals to avoid unnecessary processing
- Configure appropriate batch sizes and worker counts
- Establish thorough monitoring and alerting
- Review and clean up configuration files regularly
In the next chapter we will look at Elasticsearch index management and query optimization.