ELK Log Collection


Introduction to ELK

E: Elasticsearch - stores the data (Java)

L: Logstash - collects, filters, forwards, and matches; heavyweight and slow to start; acts as the middle layer (Java)

K: Kibana - filtering, analysis, and graphical display (Java)

F: Filebeat - collects and filters logs (Go)

Basic architecture diagram

Traditional log analysis requirements (interview questions)

1. Find the top 10 IPs by access frequency

2. Find the top 10 most requested URLs

3. Find the IPs that hit the www site most often between 10 AM and 2 PM

4. Compare how the access frequency in this time window differs between yesterday and today

5. Compare the same time window last week with today

6. Find how many times a specific page was accessed

7. Find problematic IP addresses, report which pages each one accessed, check whether it showed up in the previous days, and when it started and stopped visiting

8. Find the 10 slowest pages and their average response time, and compare whether the same pages were this slow yesterday

9. Find how many times each search engine crawled the site today, which pages it crawled, and what the response times were

10. Find IP addresses that pretend to be search engine crawlers

yum provides host        # find which package provides the host command, then install it

host <IP> shows whether the reverse DNS matches, i.e. whether the IP is spoofed

11. Report the results within 5 minutes
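For reference, a minimal shell sketch of how a few of these were answered the traditional way (assuming the default combined access log at /var/log/nginx/access.log):

# 1. Top 10 client IPs by number of requests
awk '{print $1}' /var/log/nginx/access.log | sort | uniq -c | sort -rn | head -10

# 2. Top 10 requested URLs
awk '{print $7}' /var/log/nginx/access.log | sort | uniq -c | sort -rn | head -10

# 10. A genuine crawler IP resolves back to the search engine's domain; a spoofed one does not
host <suspect IP>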

Log collection categories

Proxy layer: nginx, haproxy

Web layer: nginx, tomcat, java, php

DB layer: mysql, mongo, redis, es

System layer: messages, secure

Storage layer: nfs, gfs

Deployment

ELK installation and deployment

Prepare the single-node environment

db01: nginx, filebeat, es, kibana, es-head

db02: nginx, filebeat

1. Prepare the single-node ES environment

cat >/etc/elasticsearch/elasticsearch.yml <<EOF
node.name: node-1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 10.0.0.51,127.0.0.1
http.port: 9200
EOF

systemctl stop elasticsearch
rm -rf /var/lib/elasticsearch/*
rm -rf /var/lib/kibana/*
systemctl start elasticsearch
systemctl start kibana

tail -f /var/log/elasticsearch/elasticsearch.log
netstat -lntup|egrep "5601|9200"

Filebeat collects nginx logs in the default format

Prerequisites: ES, Kibana, and es-head are already running.

0. Sync the system time

ntpdate time1.aliyun.com

1. Install Nginx

cat >/etc/yum.repos.d/nginx.repo <<'EOF'
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=1
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true

[nginx-mainline]
name=nginx mainline repo
baseurl=http://nginx.org/packages/mainline/centos/$releasever/$basearch/
gpgcheck=1
enabled=0
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
EOF

(The quoted 'EOF' keeps the shell from expanding $releasever and $basearch inside the heredoc.)

yum install nginx -y
systemctl start nginx
curl 127.0.0.1

2. Configure Nginx and create a test page

rm -rf /etc/nginx/conf.d/default.conf

cat >/etc/nginx/conf.d/www.conf<<EOF
server {
    listen 80;
    server_name localhost;
    location / {
        root /code/www;
        index index.html index.htm;
    }
}
EOF

mkdir /code/www/ -p
echo "db01-www" > /code/www/index.html

nginx -t
systemctl restart nginx

curl 127.0.0.1
tail -f /var/log/nginx/access.log

3. Install filebeat

rpm -ivh filebeat-6.6.0-x86_64.rpm
rpm -qc filebeat

4. Configure filebeat

[root@db-01 /data/soft]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]

5. Start and check

systemctl start filebeat
tail -f /var/log/filebeat/filebeat

6. View the results

Check in es-head, or list the indices:

curl -s 127.0.0.1:9200/_cat/indices|awk '{print $3}'
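A quick way to confirm documents actually arrived (assuming the default filebeat-* index name used here):

curl -s '127.0.0.1:9200/filebeat-*/_count?pretty'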

7. Add the index pattern in Kibana

Management --> Kibana --> Index Patterns --> filebeat-6.6.0-2020.02.13

The Kibana interface

Overview of the Kibana screen areas

Filebeat collects nginx logs in JSON format

1. Shortcomings of the plain Nginx log:

- The whole log line is stored in a single value, so individual fields cannot be displayed or searched separately
- The index name carries no meaning

2. The ideal situation

{
    $remote_addr : 192.168.12.254
    - : -
    $remote_user : -
    [$time_local]: [10/Sep/2019:10:52:08 +0800]
    $request: GET /jhdgsjfgjhshj HTTP/1.0
    $status : 404
    $body_bytes_sent : 153
    $http_referer : -
    $http_user_agent : ApacheBench/2.3
    $http_x_forwarded_for: -
}

3. Goal

Convert the Nginx log into JSON format

4. Modify the nginx configuration to write the log as JSON

vim /etc/nginx/nginx.conf

log_format json '{ "time_local": "$time_local", '
                '"remote_addr": "$remote_addr", '
                '"referer": "$http_referer", '
                '"request": "$request", '
                '"status": $status, '
                '"bytes": $body_bytes_sent, '
                '"agent": "$http_user_agent", '
                '"x_forwarded": "$http_x_forwarded_for", '
                '"up_addr": "$upstream_addr",'
                '"up_host": "$upstream_http_host",'
                '"upstream_time": "$upstream_response_time",'
                '"request_time": "$request_time"'
                ' }';
access_log /var/log/nginx/access.log json;

Truncate the old log:

> /var/log/nginx/access.log

Check and restart nginx:

nginx -t
systemctl restart nginx

5. Modify the filebeat configuration

cat >/etc/filebeat/filebeat.yml<<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
EOF

6. Delete the old ES index

es-head >> filebeat-6.6.0-2019.11.15 >> Actions >> Delete

7. Restart filebeat

systemctl restart filebeat
curl 127.0.0.1    # generate a new log entry

8. Delete the old index pattern in Kibana and add it again

Custom index names

Customizing the ES index name in filebeat

1. The desired index name

Instead of the default

filebeat-6.6.0-2020.02.13

something like

nginx-6.6.0-2019.11.15

2. Filebeat configuration

cat >/etc/filebeat/filebeat.yml<<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  index: "nginx-%{[beat.version]}-%{+yyyy.MM}"

setup.template.name: "nginx"
setup.template.pattern: "nginx-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

3. Restart filebeat

systemctl restart filebeat

4. Generate a new log entry and check

curl 127.0.0.1

The index name shown in es-head should now be:

nginx-6.6.0-2020.02

5. Add the index pattern in Kibana

Storing logs by category

Filebeat splits indices by service type

1. The desired result:

nginx-access-6.6.0-2020.02
nginx-error-6.6.0-2020.02

2. Filebeat configuration

Method 1:

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        source: "/var/log/nginx/access.log"
    - index: "nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        source: "/var/log/nginx/error.log"

setup.template.name: "nginx"
setup.template.pattern: "nginx-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

Method 2:

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "access"
    - index: "nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "error"

setup.template.name: "nginx"
setup.template.pattern: "nginx-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

3. Restart filebeat

systemctl restart filebeat

4. Generate test data

curl 127.0.0.1/zhangya

5. Check that the corresponding indices were created:

nginx-access-6.6.0-2020.02
nginx-error-6.6.0-2020.02

Collecting nginx logs from multiple hosts

1. Install nginx

yum install nginx

2. Copy the config files from db01

scp 10.0.0.51:/etc/nginx/nginx.conf /etc/nginx/nginx.conf
scp 10.0.0.51:/etc/nginx/conf.d/www.conf /etc/nginx/conf.d/

3. Create a test page

mkdir /code/www/ -p
echo "db02-www" > /code/www/index.html

4. Restart nginx

>/var/log/nginx/access.log
>/var/log/nginx/error.log
nginx -t
systemctl restart nginx

5. Generate a test request

curl 127.0.0.1/22222222222222

6. Install filebeat

rpm -ivh filebeat...

7. Copy the filebeat config file

scp 10.0.0.51:/etc/filebeat/filebeat.yml /etc/filebeat/

8. Start filebeat

systemctl restart filebeat

How filebeat works

It behaves like tail -f: it reads the file in real time and records the offset of the data it has already shipped, so after a restart it resumes from that position.
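A minimal shell sketch of that idea (filebeat itself keeps this state in its registry file; the paths below are only for illustration):

# remember how many bytes have already been shipped, and only forward the new part
LOG=/var/log/nginx/access.log
OFFSET_FILE=/tmp/shipper.offset

offset=$(cat "$OFFSET_FILE" 2>/dev/null || echo 0)
size=$(stat -c %s "$LOG")
if [ "$size" -gt "$offset" ]; then
    tail -c +$((offset + 1)) "$LOG"     # print only the bytes added since the last run
    echo "$size" > "$OFFSET_FILE"       # save the new position
fi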

Custom Kibana visualizations

Combined dashboards

Filtered views

Collecting tomcat JSON logs with filebeat

1. Install tomcat

yum install tomcat tomcat-webapps tomcat-admin-webapps tomcat-docs-webapp tomcat-javadoc -y
systemctl start tomcat
tail -f /var/log/tomcat/localhost_access_log.2020-02-14.txt

2. Modify the tomcat configuration to write the access log as JSON

cp /etc/tomcat/server.xml /opt/
vim /etc/tomcat/server.xml

Change the pattern of the access log Valve to:

pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>

3. Truncate the log and restart

> /var/log/tomcat/localhost_access_log.2020-02-14.txt
systemctl restart tomcat

4. Make a request and check that the log is now JSON

curl 127.0.0.1:8080
tail -f /var/log/tomcat/localhost_access_log.2020-02-14.txt

5. Create the filebeat configuration file

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

- type: log
  enabled: true
  paths:
    - /var/log/tomcat/localhost_access_log.*.txt
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["tomcat"]

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "access"
    - index: "nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "error"
    - index: "tomcat-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "tomcat"

setup.template.name: "nginx"
setup.template.pattern: "nginx-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

6. Restart filebeat and check

systemctl restart filebeat

Collecting Java logs

https://www.elastic.co/guide/en/beats/filebeat/6.6/multiline-examples.html

Characteristics of Java logs:

1. Error messages are very long
2. However long the error message is, it is still a single event and must not be split up

A sample Java error log:

[2019-09-10T16:15:41,630][ERROR][o.e.b.Bootstrap ] [CcJTI28] Exception
java.lang.IllegalArgumentException: unknown setting [nnode.name] did you mean [node.name]?
    at org.elasticsearch.common.settings.AbstractScopedSettings.validate(AbstractScopedSettings.java:482) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.common.settings.AbstractScopedSettings.validate(AbstractScopedSettings.java:427) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.common.settings.AbstractScopedSettings.validate(AbstractScopedSettings.java:398) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.common.settings.AbstractScopedSettings.validate(AbstractScopedSettings.java:369) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.common.settings.SettingsModule.<init>(SettingsModule.java:148) ~[elasticsearch-6.6.0.jar:6.6.0]
[2019-09-10T16:18:16,742][INFO ][o.e.c.m.MetaDataIndexTemplateService] [node-1] adding template [kibana_index_template:.kibana] for index patterns [.kibana]
[2019-09-10T16:18:17,981][INFO ][o.e.c.m.MetaDataIndexTemplateService] [node-1] adding template [kibana_index_template:.kibana] for index patterns [.kibana]
[2019-09-10T16:18:33,417][INFO ][o.e.c.m.MetaDataIndexTemplateService] [node-1] adding template [kibana_index_template:.kibana] for index patterns [.kibana]

Matching approach:

1. Characteristics of the Java error log

Normal lines start with a [date]
An error spans many lines, and those extra lines do not start with [

2. Match a line starting with [, treat everything up to the next line starting with [ as one event, and send it to ES as a single document (see the awk sketch below)
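To see that grouping logic on its own, here is a small awk sketch (independent of filebeat): every line that does not start with [ is appended to the current event, and each merged event is printed as one line joined by ' | ':

awk '/^\[/ {if (e) print e; e=$0; next} {e=e " | " $0} END {if (e) print e}' /var/log/elasticsearch/elasticsearch.log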

Filebeat multiline matching for Java logs

1. Filebeat configuration

cat >/etc/filebeat/filebeat.yml<<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/elasticsearch/elasticsearch.log
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  index: "es-%{[beat.version]}-%{+yyyy.MM}"

setup.template.name: "es"
setup.template.pattern: "es-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

2. Restart filebeat

systemctl restart filebeat

3. Check that the Java error lines were merged into single events

Add the index in Kibana, then search for the keyword: at org

Filebeat modules

Purpose:

A module can parse a specific service's plain-text logs into structured JSON.

Collecting nginx logs with the filebeat nginx module

1. Truncate the log and switch nginx back to the plain log format

# Truncate the log
>/var/log/nginx/access.log

# Edit the config file
vi /etc/nginx/nginx.conf
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                '$status $body_bytes_sent "$http_referer" '
                '"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;

# Check and restart
nginx -t
systemctl restart nginx

2. Make a request and confirm the log is back to the plain format

curl 127.0.0.1
tail -f /var/log/nginx/access.log

3. Configure filebeat to load modules

vim /etc/filebeat/filebeat.yml
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        event.dataset: "nginx.access"
    - index: "nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        event.dataset: "nginx.error"

setup.template.name: "nginx"
setup.template.pattern: "nginx-*"
setup.template.enabled: false
setup.template.overwrite: true

4. Enable the filebeat nginx module

filebeat modules enable nginx
filebeat modules list

5. Configure the filebeat nginx module

cat >/etc/filebeat/modules.d/nginx.yml <<EOF
- module: nginx
  access:
    enabled: true
    var.paths: ["/var/log/nginx/access.log"]
  error:
    enabled: true
    var.paths: ["/var/log/nginx/error.log"]
EOF

6. Install the ES plugins the nginx module needs, then restart ES

cd /usr/share/elasticsearch/
./bin/elasticsearch-plugin install file:///root/ingest-geoip-6.6.0.zip
./bin/elasticsearch-plugin install file:///root/ingest-user-agent-6.6.0.zip
systemctl restart elasticsearch

7. Restart filebeat

systemctl restart filebeat

Collecting MySQL logs with the filebeat mysql module

1. Configure the MySQL error log and slow log paths

Edit my.cnf:

log-error=<error log path>
slow_query_log=ON
slow_query_log_file=<slow log path>
long_query_time=3
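For reference, a concrete sketch of those my.cnf lines (the file paths are only examples; use your own):

[mysqld]
log-error=/var/log/mysql/error.log            # example error log path
slow_query_log=ON
slow_query_log_file=/var/log/mysql/slow.log   # example slow log path
long_query_time=3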

2. Restart MySQL and generate a slow query

systemctl restart mysql

Statement that produces a slow query:

select sleep(2) user,host from mysql.user ;

3. Confirm that the slow log and the error log are actually being written

4. Enable the filebeat mysql module

filebeat modules enable mysql

5. Configure the mysql module

- module: mysql
  error:
    enabled: true
    var.paths: ["<error log path>"]
  slowlog:
    enabled: true
    var.paths: ["<slow log path>"]

6. Configure filebeat to route by log type

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "nginx_access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        fileset.module: "nginx"
        fileset.name: "access"
    - index: "nginx_error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        fileset.module: "nginx"
        fileset.name: "error"
    - index: "mysql_slowlog-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        fileset.module: "mysql"
        fileset.name: "slowlog"
    - index: "mysql_error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        fileset.module: "mysql"
        fileset.name: "error"

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true

7. Restart filebeat

systemctl restart filebeat

Collecting docker logs with the docker input type

1. Install docker

yum install -y yum-utils device-mapper-persistent-data lvm2
wget -O /etc/yum.repos.d/docker-ce.repo https://download.docker.com/linux/centos/docker-ce.repo
sed -i 's+download.docker.com+mirrors.tuna.tsinghua.edu.cn/docker-ce+' /etc/yum.repos.d/docker-ce.repo
yum makecache fast
yum install docker-ce -y

mkdir -p /etc/docker
tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://ig2l319y.mirror.aliyuncs.com"]
}
EOF

systemctl daemon-reload
systemctl restart docker

2. Start two Nginx containers and test access

docker run -d -p 80:80 nginx
docker run -d -p 8080:80 nginx

curl 10.0.0.52
curl 10.0.0.52:8080

3. Configure filebeat

[root@db02 ~]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: docker
  containers.ids:
    - '*'

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  index: "docker-%{[beat.version]}-%{+yyyy.MM}"

setup.template.name: "docker"
setup.template.pattern: "docker-*"
setup.template.enabled: false
setup.template.overwrite: true

4. Restart filebeat

systemctl restart filebeat

5. Generate test data

curl 10.0.0.52/1111111111
curl 10.0.0.52:8080/2222222222

Starting the docker containers with docker-compose

Scenario:

Container 1: nginx
Container 2: mysql

Collecting docker logs with filebeat, the "leave work early" version

1. Assumed scenario

nginx container on port 80
mysql container on port 8080

2. Desired index names

docker-nginx-6.6.0-2020.02
docker-mysql-6.6.0-2020.02

3. Desired log record format

nginx container logs:

{
    "log": "xxxxxx",
    "stream": "stdout",
    "time": "xxxx",
    "service": "nginx"
}

mysql container logs:

{
    "log": "xxxxxx",
    "stream": "stdout",
    "time": "xxxx",
    "service": "mysql"
}

4. docker-compose configuration

yum install docker-compose -y

cat >docker-compose.yml<<EOF
version: '3'
services:
  nginx:
    image: nginx:latest
    labels:
      service: nginx
    logging:
      options:
        labels: "service"
    ports:
      - "80:80"
  db:
    image: nginx:latest
    labels:
      service: db
    logging:
      options:
        labels: "service"
    ports:
      - "8080:80"
EOF

5. Remove the old containers

docker stop $(docker ps -q)
docker rm $(docker ps -qa)

6. Start the containers

docker-compose up -d

7. Configure filebeat

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/lib/docker/containers/*/*-json.log
  json.keys_under_root: true
  json.overwrite_keys: true

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "docker-nginx-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        attrs.service: "nginx"
    - index: "docker-db-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        attrs.service: "db"

setup.template.name: "docker"
setup.template.pattern: "docker-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

8. Restart filebeat

systemctl restart filebeat

9. Generate access logs

curl 127.0.0.1/nginxxxxxxxxxxx
curl 127.0.0.1:8080/dbbbbbbbbb

Splitting by service and by stream

What is still imperfect:

Normal logs and error logs end up in the same index.

2. Desired index names

docker-nginx-access-6.6.0-2020.02
docker-nginx-error-6.6.0-2020.02
docker-db-access-6.6.0-2020.02
docker-db-error-6.6.0-2020.02

3. Filebeat configuration

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/lib/docker/containers/*/*-json.log
  json.keys_under_root: true
  json.overwrite_keys: true

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "docker-nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        attrs.service: "nginx"
        stream: "stdout"
    - index: "docker-nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        attrs.service: "nginx"
        stream: "stderr"
    - index: "docker-db-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        attrs.service: "db"
        stream: "stdout"
    - index: "docker-db-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        attrs.service: "db"
        stream: "stderr"

setup.template.name: "docker"
setup.template.pattern: "docker-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

4. Restart filebeat

systemctl restart filebeat

5. Generate test data

curl 127.0.0.1/nginxxxxxxxxxxx
curl 127.0.0.1:8080/dbbbbbbbbb

Collecting docker logs, the "pay raise" version

1. Requirements

JSON format, with the following indices:

docker-nginx-access-6.6.0-2020.02
docker-db-access-6.6.0-2020.02
docker-db-error-6.6.0-2020.02
docker-nginx-error-6.6.0-2020.02

2. Stop and remove the previous containers

docker stop $(docker ps -qa)
docker rm $(docker ps -qa)

3. Create the new containers

docker run -d -p 80:80 -v /opt/nginx:/var/log/nginx/ nginx
docker run -d -p 8080:80 -v /opt/mysql:/var/log/nginx/ nginx

4. Prepare an nginx config file that uses the JSON log format

scp 10.0.0.51:/etc/nginx/nginx.conf /root/
[root@db02 ~]# grep "access_log" nginx.conf
access_log /var/log/nginx/access.log json;

5. Copy it into the containers and restart them

docker cp nginx.conf <nginx container ID>:/etc/nginx/
docker cp nginx.conf <mysql container ID>:/etc/nginx/
docker stop $(docker ps -qa)
docker start <nginx container ID>
docker start <mysql container ID>

6. Delete the existing ES indices

7. Configure filebeat

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["nginx_access"]

- type: log
  enabled: true
  paths:
    - /opt/nginx/error.log
  tags: ["nginx_err"]

- type: log
  enabled: true
  paths:
    - /opt/mysql/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["db_access"]

- type: log
  enabled: true
  paths:
    - /opt/mysql/error.log
  tags: ["db_err"]

output.elasticsearch:
  hosts: ["10.0.0.51:9200"]
  indices:
    - index: "docker-nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "nginx_access"
    - index: "docker-nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "nginx_err"
    - index: "docker-db-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "db_access"
    - index: "docker-db-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        tags: "db_err"

setup.template.name: "docker"
setup.template.pattern: "docker-*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

8. Restart filebeat

systemctl restart filebeat

9. Access and test

curl 127.0.0.1/oldboy
curl 127.0.0.1:8080/oldboy
cat /opt/nginx/access.log
cat /opt/mysql/access.log

Check in es-head

Second approach

Using a cache service to relieve pressure on ES

Architecture diagram

Introducing a redis cache

1. Install redis

yum install redis
sed -i 's#^bind 127.0.0.1#bind 127.0.0.1 10.0.0.51#' /etc/redis.conf
systemctl start redis
netstat -lntup|grep redis
redis-cli -h 10.0.0.51

2. Stop the docker containers

docker stop $(docker ps -q)

3. Stop filebeat

systemctl stop filebeat

4. Delete the old ES indices

5. Confirm the nginx log format is JSON

grep "access_log" nginx.conf

6. Modify the filebeat configuration

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.redis:
  hosts: ["10.0.0.51"]
  keys:
    - key: "nginx_access"
      when.contains:
        tags: "access"
    - key: "nginx_error"
      when.contains:
        tags: "error"

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

7. Restart filebeat and nginx

systemctl restart nginx
systemctl restart filebeat

8. Generate test data

curl 127.0.0.1/haha

9. Check

redis-cli -h 10.0.0.51
keys *
TYPE nginx_access
LLEN nginx_access
LRANGE nginx_access 0 -1

Confirm the stored entries are JSON.

10. Install logstash

rpm -ivh jdk-8u102-linux-x64.rpm
rpm -ivh logstash-6.6.0.rpm

11. Configure logstash

cat >/etc/logstash/conf.d/redis.conf<<EOF
input {
  redis {
    host => "10.0.0.51"
    port => "6379"
    db => "0"
    key => "nginx_access"
    data_type => "list"
  }
  redis {
    host => "10.0.0.51"
    port => "6379"
    db => "0"
    key => "nginx_error"
    data_type => "list"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
  stdout {}
  if "access" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_access-%{+yyyy.MM}"
    }
  }
  if "error" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_error-%{+yyyy.MM}"
    }
  }
}
EOF

12. Start in the foreground to test

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf

13. Check

- Is the logstash output parsed as JSON?
- Are the indices created in es-head?
- Is the list in redis shrinking?

14. Run logstash in the background

ctrl+c
systemctl start logstash

(When you hear the fans spin up, logstash has started.)
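A quieter way to confirm it is running (assuming the default Logstash HTTP API on port 9600):

systemctl status logstash
curl -s 127.0.0.1:9600/?pretty    # node info API; it answers once logstash is up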

Completing the filebeat-with-redis design

1. Constraints

- filebeat cannot ship to a redis sentinel or a redis cluster
- logstash cannot read from a redis sentinel or a redis cluster either

2. Install and configure redis

yum install redis -y
sed -i 's#^bind 127.0.0.1#bind 127.0.0.1 10.0.0.51#' /etc/redis.conf
systemctl start redis

3. Install and configure nginx

Configure the official nginx repo, then:

yum install nginx -y

Put this after the closing } on the last line of nginx.conf; do not put it inside conf.d:

stream {
    upstream redis {
        server 10.0.0.51:6379 max_fails=2 fail_timeout=10s;
        server 10.0.0.52:6379 max_fails=2 fail_timeout=10s backup;
    }

    server {
        listen 6380;
        proxy_connect_timeout 1s;
        proxy_timeout 3s;
        proxy_pass redis;
    }
}

nginx -t
systemctl start nginx

4. Install and configure keepalived

yum install keepalived -y

db01 configuration:

global_defs {
    router_id db01
}

vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 50
    priority 150
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    virtual_ipaddress {
        10.0.0.100
    }
}

db02 configuration:

global_defs {
    router_id db02
}

vrrp_instance VI_1 {
    state BACKUP
    interface eth0
    virtual_router_id 50
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    virtual_ipaddress {
        10.0.0.100
    }
}

systemctl start keepalived

ip a

5. Test that the proxy reaches redis

redis-cli -h 10.0.0.100 -p 6380

Stop redis on db01 and test whether the connection still works.

6. Configure filebeat

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.redis:
  hosts: ["10.0.0.100:6380"]
  keys:
    - key: "nginx_access"
      when.contains:
        tags: "access"
    - key: "nginx_error"
      when.contains:
        tags: "error"

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

7. Test that filebeat delivers to redis

curl 10.0.0.51/haha
redis-cli -h 10.0.0.51              # should contain data
redis-cli -h 10.0.0.52              # should contain no data
redis-cli -h 10.0.0.100 -p 6380     # should contain data

8. Configure logstash

cat >/etc/logstash/conf.d/redis.conf<<EOF
input {
  redis {
    host => "10.0.0.100"
    port => "6380"
    db => "0"
    key => "nginx_access"
    data_type => "list"
  }
  redis {
    host => "10.0.0.100"
    port => "6380"
    db => "0"
    key => "nginx_error"
    data_type => "list"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
  stdout {}
  if "access" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_access-%{+yyyy.MM}"
    }
  }
  if "error" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_error-%{+yyyy.MM}"
    }
  }
}
EOF

9. Start and test

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf

10. Final test

ab -n 10000 -c 100 10.0.0.100/

Check in es-head that the index holds 10000 documents.
Stop redis on db01, send more requests, and verify that logstash still works.
Bring redis on db01 back and test again.

Streamlining the filebeat-with-redis setup

1. Before: adding a new log path means changes in 4 places:

- filebeat: 2 places
- logstash: 2 places

2. After the optimization, only 2 places need changes:

- filebeat: 1 place
- logstash: 1 place

3. Filebeat configuration

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.redis:
  hosts: ["10.0.0.100:6380"]
  key: "nginx_log"

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true

4. Logstash configuration after the optimization

input {
  redis {
    host => "10.0.0.100"
    port => "6380"
    db => "0"
    key => "nginx_log"
    data_type => "list"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
  stdout {}
  if "access" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_access-%{+yyyy.MM}"
    }
  }
  if "error" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_error-%{+yyyy.MM}"
    }
  }
}

Using kafka as the cache

1. Configure /etc/hosts

10.0.0.51 kafka51
10.0.0.52 kafka52
10.0.0.53 kafka53

2. Install and configure zookeeper

cd /data/soft/
tar zxf zookeeper-3.4.11.tar.gz -C /opt/
ln -s /opt/zookeeper-3.4.11/ /opt/zookeeper
mkdir -p /data/zookeeper
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
cat >/opt/zookeeper/conf/zoo.cfg<<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.51:2888:3888
server.2=10.0.0.52:2888:3888
server.3=10.0.0.53:2888:3888
EOF

Note: the myid value is different on every machine (see below)

echo "1" > /data/zookeeper/myid
cat /data/zookeeper/myid
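For example, matching the server.1/2/3 entries in zoo.cfg above, each command run on its own node:

echo "1" > /data/zookeeper/myid    # on db01 (10.0.0.51)
echo "2" > /data/zookeeper/myid    # on db02 (10.0.0.52)
echo "3" > /data/zookeeper/myid    # on db03 (10.0.0.53)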

3. Start zookeeper

Start it on all nodes:

/opt/zookeeper/bin/zkServer.sh start

4. Check on every node

/opt/zookeeper/bin/zkServer.sh status

5. Test zookeeper

On one node, create a test znode:

/opt/zookeeper/bin/zkCli.sh -server 10.0.0.51:2181
create /test "hello"

On another node, check that it can be read:

/opt/zookeeper/bin/zkCli.sh -server 10.0.0.52:2181
get /test

Check the process:

ps -aux | grep 'zookeeper'

6. Install and deploy kafka

On db01:

cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=1
listeners=PLAINTEXT://10.0.0.51:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF

On db02:

cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=2
listeners=PLAINTEXT://10.0.0.52:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF

On db03:

cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=3
listeners=PLAINTEXT://10.0.0.53:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF

7. Start in the foreground to test

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

Check that the last lines of output say "started".

8. Verify the processes

jps

Three Java processes should show up.

9. Test creating a topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic kafkatest

10. Test describing the topic

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest

11. Test deleting the topic

/opt/kafka/bin/kafka-topics.sh --delete --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest

12. Test sending messages with the kafka console tools

Create a topic:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic messagetest

Send test messages:

/opt/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 --topic messagetest

A > prompt appears; type some messages.

Consume them on another node:

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic messagetest --from-beginning

List all topics:

/opt/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181

13. Once the tests pass, start kafka in the background

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

14. Modify the filebeat configuration

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.kafka:
  hosts: ["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
  topic: 'filebeat'

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

Restart filebeat:

systemctl restart filebeat

Access the site and check whether kafka received the logs:

curl 10.0.0.51
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic filebeat --from-beginning

15. Modify the logstash configuration

cat >/etc/logstash/conf.d/kafka.conf <<EOF
input {
  kafka {
    bootstrap_servers => ["10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092"]
    topics => ["filebeat"]
    #group_id => "logstash"
    codec => "json"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
  stdout {}
  if "access" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_access-%{+yyyy.MM}"
    }
  }
  if "error" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.51:9200"
      manage_template => false
      index => "nginx_error-%{+yyyy.MM}"
    }
  }
}
EOF

16. Start logstash and test

Foreground:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf

Background:

systemctl start logstash

17. Remove unneeded fields in logstash

Add remove_field inside the filter block:

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
    remove_field => [ "beat" ]
  }
}

The relationship between kafka and zookeeper

Zookeeper stores important znodes such as broker and consumer information. Each kafka node registers its configuration in zookeeper, and the topics of the registered kafka nodes are then stored under the topics directory.

So zookeeper holds all of the kafka cluster's metadata. What do sending and receiving messages look like?

Sending and receiving in kafka

Sending: the producer is given broker addresses and sends messages directly to a broker. If several broker addresses are given, an available one is chosen at random. The broker that receives the message asks zookeeper which machine is the leader for the topic's partition; that leader decides where the data is stored and finally persists it.

Receiving: the consumer is given zookeeper addresses; zookeeper reports the consume task to the leader of the topic's partition, and the leader has a follower fetch the data and return it.

Details on the zookeeper side:

1. After each broker starts, it registers a temporary broker registry in zookeeper containing its IP address and port plus the topics and partitions it stores.

2. After each consumer starts, it registers a temporary consumer registry in zookeeper containing its consumer group and the topics it subscribes to.

3. Each consumer group is associated with a temporary owner registry and a persistent offset registry. For every subscribed partition there is an owner registry whose content is the id of the consumer that owns the partition, and an offset registry whose content is the offset of the last consumption.
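You can see this registration directly with the zookeeper client (paths as created by this kafka version; broker id 1 comes from the server.properties above):

/opt/zookeeper/bin/zkCli.sh -server 10.0.0.51:2181
ls /brokers/ids          # registered broker ids, e.g. [1, 2, 3]
get /brokers/ids/1       # host, port and endpoints of broker 1
ls /brokers/topics       # topics known to the cluster
ls /consumers            # old-style consumer groups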

How to promote ELK inside the company

  • Lead with what the other person gains: it lets them leave work earlier
  • Keep the demo environment ready so you can show it at any moment, with plenty of data and rich charts
  • Build dedicated dashboards for the dev, backend, frontend, ops, and DBA teams
  • Talk to each team lead separately and offer to solve their team's problems first
  • "Look, right now you have to walk over here whenever something breaks; once I set this up you just click around, and if anything is still wrong, say the word and I'll come over."
