EFK + Kafka Log Collection

[TOC]

Solution

  1. Deploy FluentBit to collect logs from every node in the cluster
  2. Push the logs to Kafka to absorb traffic peaks and buffer the data
  3. Deploy Logstash to consume the logs from Kafka
  4. Logstash sends the logs to ElasticSearch for storage
  5. Deploy Kibana to visualize the data in ElasticSearch
graph LR;
a(Node)--collect-->b(FluentBit)
b--push-->c(Kafka)
d(Node)--collect-->e(FluentBit)
e--push-->c(Kafka)
f(Node)--collect-->g(FluentBit)
g--push-->c(Kafka)
c--consume-->Logstash(Logstash)
Logstash--send-->Elastic(Elasticsearch)
Elastic--display-->Kibana(Kibana)

Deploy FluentBit

Official documentation: https://github.com/fluent/fluent-bit-kubernetes-logging

  1. Deploy the service-account, role, and role-binding

Kubernetes v1.21 and earlier

kubectl create namespace logging
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-service-account.yaml
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-role.yaml
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-role-binding.yaml

Kubernetes v1.22 and later (the v1beta1 RBAC API used by the older manifests was removed in v1.22, hence the *-1.22.yaml variants)

kubectl create namespace logging
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-service-account.yaml
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-role-1.22.yaml
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-role-binding-1.22.yaml

Create fluent-bit-service-account.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: logging

Create fluent-bit-role-1.22.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-read
rules:
- apiGroups: [""]
  resources:
  - namespaces
  - pods
  verbs: ["get", "list", "watch"]

Create fluent-bit-role-binding-1.22.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-read
subjects:
- kind: ServiceAccount
  name: fluent-bit
  namespace: logging
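
A quick optional sanity check that the RBAC objects exist and grant what the kubernetes filter needs (assumes the manifests above have already been applied):

```bash
kubectl -n logging get serviceaccount fluent-bit
kubectl get clusterrole/fluent-bit-read clusterrolebinding/fluent-bit-read
# both of the following should print "yes"
kubectl auth can-i list pods --as=system:serviceaccount:logging:fluent-bit
kubectl auth can-i watch namespaces --as=system:serviceaccount:logging:fluent-bit
```
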
  2. Deploy the ConfigMap

Deployment command

kubectl apply -f fluent-bit-configmap.yaml

Write fluent-bit-configmap.yaml

Docs: https://docs.fluentbit.io/manual/pipeline/outputs/kafka
Regex testing: http://rubular.com/ (useful for validating the parser regular expressions below)

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
  labels:
    k8s-app: fluent-bit
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    #@INCLUDE filter-kubernetes.conf
    @INCLUDE output-kafka.conf

  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               kube.njc.dev.permission
        Path              /var/log/containers/*tech-dept*.log
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     100MB
        Rotate_Wait       60
        Skip_Long_Lines   On
        Refresh_Interval  10
        Multiline         On
        Parser_Firstline  log_date
        Parser_1          log_attributes

  filter-kubernetes.conf: |
    [FILTER]
        Name                kubernetes
        Match               kube.njc.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           On
        Merge_Log_Key       log_processed
        K8S-Logging.Parser  On
        K8S-Logging.Exclude Off

    [FILTER]
        Name          parser
        Match         kube.njc.dev.permission
        Key_Name      log
        Parser        njc_service_log
        Reserve_Data  On
        Preserve_Key  On

  output-kafka.conf: |
    [OUTPUT]
        Name                            kafka
        Match                           *
        Brokers                         192.168.5.31:9092,192.168.5.32:9092,192.168.5.33:9092
        Topics                          java-prod-log
        format                          json
        timestamp_key                   kafkatimestamp
        timestamp_format                iso8601
        rdkafka.message.max.bytes       200000000
        rdkafka.fetch.message.max.bytes 204857600

  parsers.conf: |
    [PARSER]
        Name    log_date
        Format  regex
        Regex   /^\s*(?<timeDate>[^ ]* [^ ]*)\s\[\s*(?<serverName>[^\]]*)\]\[\s*(?<level>[^\]]*)\](?<message>[\s\S]*)/

    [PARSER]
        Name    log_attributes
        Format  regex
        Regex   /^\s*(?<timeDate>[^ ]* [^ ]*)\s\[\s*(?<serverName>[^\]]*)\]\[\s*(?<level>[^\]]*)\](?<message>[\s\S]*)/

Change the INPUT Path to filter container logs as needed.
Make sure the Match of output-kafka and the Match of the FILTER agree with the Tag set in the INPUT.
Change the Kafka Brokers and Topics to match your environment.
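
As a sanity check of the two parser regexes above, here is a hypothetical application log line in the layout they expect, together with the fields they would capture (the real log format of your services may differ):

```text
2023-09-22 13:35:31.123 [permission-service][INFO] user login succeeded

timeDate   = "2023-09-22 13:35:31.123"
serverName = "permission-service"
level      = "INFO"
message    = " user login succeeded"
```

With Multiline On and Parser_Firstline log_date, lines that do not match this pattern (for example the body of a Java stack trace) are appended to the message of the previous matching line.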

Log files on the cluster nodes


We only need the logs under the tech-dept namespace: almost everything there is business logs, with a negligible amount of middleware logs that can be ignored.
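
On any cluster node you can preview which files that Path glob will actually pick up, for example:

```bash
ls /var/log/containers/*tech-dept*.log
```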

  3. Deploy the DaemonSet

    Deployment command

    kubectl apply -f fluent-bit-ds.yaml

    Write fluent-bit-ds.yaml

    apiVersion: apps/v1
    kind: DaemonSet
    metadata:
      name: fluent-bit
      namespace: logging
      labels:
        k8s-app: fluent-bit-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      updateStrategy:
        type: RollingUpdate
      selector:
        matchLabels:
          k8s-app: fluent-bit-logging
      template:
        metadata:
          labels:
            k8s-app: fluent-bit-logging
            version: v1
            kubernetes.io/cluster-service: "true"
          annotations:
            prometheus.io/scrape: "true"
            prometheus.io/port: "2020"
            prometheus.io/path: /api/v1/metrics/prometheus
        spec:
          containers:
          - name: fluent-bit
            image: fluent/fluent-bit
            imagePullPolicy: Always
            ports:
            - containerPort: 2020
            readinessProbe:
              httpGet:
                path: /api/v1/metrics/prometheus
                port: 2020
            livenessProbe:
              httpGet:
                path: /
                port: 2020
            resources:
              requests:
                cpu: 5m
                memory: 10Mi
              limits:
                cpu: 200m
                memory: 400Mi
            volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluent-bit-config
              mountPath: /fluent-bit/etc/
          terminationGracePeriodSeconds: 10
          volumes:
          - name: varlog
            hostPath:
              path: /var/log
          - name: varlibdockercontainers
            hostPath:
              path: /var/lib/docker/containers
          - name: fluent-bit-config
            configMap:
              name: fluent-bit-config
          serviceAccountName: fluent-bit
          tolerations:
          - key: node-role.kubernetes.io/master
            operator: Exists
            effect: NoSchedule
          - operator: "Exists"
            effect: "NoExecute"
          - operator: "Exists"
            effect: "NoSchedule"

    Deployment result: running as a DaemonSet guarantees that every node runs one pod, so logs from all nodes are collected.

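    To confirm that a Fluent Bit pod is indeed running on every node, something like the following can be used (the label matches the DaemonSet above):

    ```bash
    kubectl -n logging get pods -l k8s-app=fluent-bit-logging -o wide
    kubectl get nodes
    ```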

    Deployment logs: the pod logs show a successful connection to the Kafka cluster.
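
    The Kafka side can be verified as well. A minimal check, assuming the standard Kafka CLI tools are available on one of the broker hosts (adjust the topic to whatever output-kafka.conf writes to):

    ```bash
    kafka-console-consumer.sh --bootstrap-server 192.168.5.31:9092 \
      --topic java-prod-log --from-beginning --max-messages 5
    ```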


Deploy Logstash

Deploy the Logstash service in Rainbond; it can be scaled out horizontally.

Build source:

Creation method: image
Image name: docker.elastic.co/logstash/logstash:7.17.10
Version: 7.17.10
Startup command: logstash -f ./config/kafka-logstash-es.conf
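
To sanity-check the image and startup command outside Rainbond first, a roughly equivalent local run might look like this (a sketch; it assumes the kafka-logstash-es.conf defined in the next section sits in the current directory):

```bash
docker run --rm \
  -v $(pwd)/kafka-logstash-es.conf:/usr/share/logstash/config/kafka-logstash-es.conf \
  docker.elastic.co/logstash/logstash:7.17.10 \
  logstash -f ./config/kafka-logstash-es.conf
```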

Configuration files:

  1. kafka-logstash-es

    Name: kafka-logstash-es

    Mount path: /usr/share/logstash/config/kafka-logstash-es.conf

    Content:

    input {
      kafka {
        bootstrap_servers => "192.168.5.31:9092,192.168.5.32:9092,192.168.5.33:9092"
        topics => ["java-test-log"]
        group_id => "fluentd-test-log"
        client_id => "logstash-test"
        consumer_threads => 1
        decorate_events => true
        codec => "json"
      }
    }

    filter {
      json {
        source => "message"            # parse the JSON carried in the message field
        skip_on_invalid_json => true   # skip events whose message is not valid JSON
      }

      date {
        match => ["timeDate", "ISO8601"]
        target => "@timestamp"
      }

      mutate {
        remove_field => ["_id"]
        remove_field => ["@version"]
        remove_field => ["tags"]
        remove_field => ["_type"]
        remove_field => ["_score"]
        remove_field => ["_index"]
        remove_field => ["kafkatimestamp"]
      }
    }

    output {
      #elasticsearch {
      #  action => "index"
      #  hosts => ["192.168.5.35:9200","192.168.5.36:9200","192.168.5.37:9200"]
      #  index => "k8s-test-log-%{[serverName]}-%{+yyyy.MM.dd}"
      #  user => "elastic"
      #  password => "sdbj123987"
      #}
      stdout {
        codec => rubydebug
      }
    }
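
    With the elasticsearch output commented out, events are printed to stdout with the rubydebug codec, which makes it easy to inspect the pipeline before writing to Elasticsearch. The configuration syntax can also be checked without starting the pipeline, for example:

    ```bash
    logstash -f /usr/share/logstash/config/kafka-logstash-es.conf --config.test_and_exit
    ```

    Once the stdout output looks correct, uncomment the elasticsearch block and fill in your own hosts, index pattern, and credentials. Also make sure topics and group_id in the kafka input point at the topic Fluent Bit actually publishes to (java-prod-log in output-kafka.conf above, java-test-log here for the test environment).
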
  2. logstash

    Name: logstash

    Mount path: /usr/share/logstash/config/logstash.yml

    Content:

    config.support_escapes: true
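
    config.support_escapes: true makes Logstash interpret escape sequences such as \n, \t and \" inside quoted strings of the pipeline configuration. A small illustrative filter that relies on it (not part of the pipeline above):

    ```
    filter {
      mutate {
        # with config.support_escapes: true, "\n" below is a real newline, not a backslash followed by n
        gsub => ["message", "\n", " "]
      }
    }
    ```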