elk是什么
elk是elastic公司提供的一套完整的日志收集以及前端展示的解决方案,是三个产品的首字母缩写,分别是elasticsearch、logstash和kibana。
其中logstash负责对日志进行处理,如日志的过滤、日志的格式化等;elasticsearch具有强大的文本搜索能力,因此作为日志的存储容器;而kibana负责前端的展示。
elk搭建架构如下图:
加入了filebeat用于从不同的客户端收集日志,然后传递到logstash统一处理。
elk的搭建
因为elk是三个产品,可以选择依次安装这三个产品。
这里选择使用docker安装elk。
docker安装elk也可以选择分别下载这三个产品的镜像并运行,但是本次使用直接下载elk的三合一镜像来安装。
因此首先要保证已经有了docker的运行环境,docker运行环境的搭建请查看:https://blog.csdn.net/qq13112...
拉取镜像
有了docker环境之后,在服务器运行命令:
docker pull sebp/elk
这个命令是在从docker仓库下载elk三合一的镜像,总大小为2个多g,如果发现下载速度过慢,可以将docker仓库源地址替换为国内源地址。
下载完成之后,查看镜像:
docker images
logstash配置
在/usr/config/logstash
目录下新建beats-input.conf,用于日志的输入:
1
2
3
4
5
|
input { beats { port => 5044 } } |
新建output.conf,用于日志由logstash到elasticsearch的输出:
1
2
3
4
5
6
7
|
output { elasticsearch { hosts => ["localhost"] manage_template => false index => "%{[@metadata][beat]}" } } |
其中的index
为输出到elasticsearch后的index
。
运行容器
有了镜像之后直接启动即可:
docker run -d -p 5044:5044 -p 5601:5601 -p 9203:9200 -p 9303:9300 -v /var/data/elk:/var/lib/elasticsearch -v /usr/config/logstash:/etc/logstash/conf.d --name=elk sebp/elk
-d的意思是后台运行容器;
-p的意思是宿主机端口:容器端口,即将容器中使用的端口映射到宿主机上的某个端口,elasticsearch的默认端口是9200和9300,由于我的机器上已经运行了3台elasticsearch实例,因此此处将映射端口进行了修改;
-v的意思是宿主机的文件|文件夹:容器的文件|文件夹,此处将容器中elasticsearch 的数据挂载到宿主机的/var/data/elk
上,以防容器重启后数据的丢失;并且将logstash的配置文件挂载到宿主机的/usr/config/logstash
目录。
--name的意思是给容器命名,命名是为了之后操作容器更加方便。
如果你之前搭建过elasticsearch的话,会发现搭建的过程中有各种错误,但是使用docker搭建elk的过程中并没有出现那些错误。
运行后查看容器:
docker ps
查看容器日志:
docker logs -f elk
进入容器:
docker exec -it elk /bin/bash
修改配置后重启容器:
docker restart elk
查看kinaba
浏览器输入http://my_host:5601/
即可看到kinaba界面。此时elasticsearch中还没有数据,需要安装filebeat采集数据到elk中。
filebeat搭建
filebeat用于采集数据并上报到logstash或者elasticsearch,在需要采集日志的服务器上下载filebeat并解压即可使用
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.1-linux-x86_64.tar.gz
tar -zxvf filebeat-6.2.1-linux-x86_64.tar.gz
修改配置文件
进入filebeat,修改filebeat.yml。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
filebeat.prospectors: - type: log #需要设置为true配置才能生效 enabled: true path: #配置需要采集的日志路径 - /var/log/*.log #可以打一个tag以后分类使用 tag: ["my_tag"] #对应elasticsearch的type document_type: my_type setup.kibana: #此处为kibana的ip及端口,即kibana:5601 host: "" output.logstash: #此处为logstash的ip及端口,即logstash:5044 host: [""] #需要设置为true,否则不生效 enabled: true #如果想直接从filebeat采集数据到elasticsearch,则可以配置output.elasticsearch的相关配置 |
运行filebeat
运行:
./filebeat -e -c filebeat.yml -d "publish"
此时可以看到filebeat会将配置的path下的log发送到logstash;然后在elk中,logstash处理完数据之后就会发送到elasticsearch。但我们想做的是通过elk进行数据分析,因此导入到elasticsearch的数据必须是json格式的。
这是之前我的单条日志的格式:
1
|
2019-10-22 10:44:03.441 info rmjk.interceptors.ipinterceptor line:248 - {"clienttype":"1","decode":"0fbd93a286533d071","eatype":2,"eaid":191970823383420928,"ip":"xx.xx.xx.xx","model":"honor stf-al10","ostype":"9","path":"/applicationenter","result":5,"session":"ef0a5c4bca424194b29e2ff31632ee5c","timestamp":1571712242326,"uid":"130605789659402240","v":"2.2.4"} |
导入之后不好分析,之后又想到使用logstash的filter中的grok来处理日志使之变成json格式之后再导入到elasticsearch中,但是由于我的日志中的参数是不固定的,发现难度太大了,于是转而使用logback,将日志直接格式化成json之后,再由filebeat发送。
logback配置
我的项目是spring boot,在项目中加入依赖:
1
2
3
4
5
|
< dependency > < groupid >net.logstash.logback</ groupid > < artifactid >logstash-logback-encoder</ artifactid > < version >5.2</ version > </ dependency > |
然后在项目中的resource目录下加入logback.xml:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
<? xml version = "1.0" encoding = "utf-8" ?> < configuration > <!-- 说明: 1、日志级别及文件 日志记录采用分级记录,级别与日志文件名相对应,不同级别的日志信息记录到不同的日志文件中 例如:error级别记录到log_error_xxx.log或log_error.log(该文件为当前记录的日志文件),而log_error_xxx.log为归档日志, 日志文件按日期记录,同一天内,若日志文件大小等于或大于2m,则按0、1、2...顺序分别命名 例如log-level-2013-12-21.0.log 其它级别的日志也是如此。 2、文件路径 若开发、测试用,在eclipse中运行项目,则到eclipse的安装路径查找logs文件夹,以相对路径../logs。 若部署到tomcat下,则在tomcat下的logs文件中 3、appender fileerror对应error级别,文件名以log-error-xxx.log形式命名 filewarn对应warn级别,文件名以log-warn-xxx.log形式命名 fileinfo对应info级别,文件名以log-info-xxx.log形式命名 filedebug对应debug级别,文件名以log-debug-xxx.log形式命名 stdout将日志信息输出到控制上,为方便开发测试使用 --> < contextname >service</ contextname > < property name = "log_path" value = "logs" /> <!--设置系统日志目录--> < property name = "appdir" value = "doctor" /> <!-- 日志记录器,日期滚动记录 --> < appender name = "fileerror" class = "ch.qos.logback.core.rolling.rollingfileappender" > <!-- 正在记录的日志文件的路径及文件名 --> < file >${log_path}/${appdir}/log_error.log</ file > <!-- 日志记录器的滚动策略,按日期,按大小记录 --> < rollingpolicy class = "ch.qos.logback.core.rolling.timebasedrollingpolicy" > <!-- 归档的日志文件的路径,例如今天是2013-12-21日志,当前写的日志文件路径为file节点指定,可以将此文件与file指定文件路径设置为不同路径,从而将当前日志文件或归档日志文件置不同的目录。 而2013-12-21的日志文件在由filenamepattern指定。%d{yyyy-mm-dd}指定日期格式,%i指定索引 --> < filenamepattern >${log_path}/${appdir}/error/log-error-%d{yyyy-mm-dd}.%i.log</ filenamepattern > <!-- 除按日志记录之外,还配置了日志文件不能超过2m,若超过2m,日志文件会以索引0开始, 命名日志文件,例如log-error-2013-12-21.0.log --> < timebasedfilenamingandtriggeringpolicy class = "ch.qos.logback.core.rolling.sizeandtimebasedfnatp" > < maxfilesize >2mb</ maxfilesize > </ timebasedfilenamingandtriggeringpolicy > </ rollingpolicy > <!-- 追加方式记录日志 --> < append >true</ append > <!-- 日志文件的格式 --> < encoder class = "ch.qos.logback.classic.encoder.patternlayoutencoder" > < pattern >%d{yyyy-mm-dd hh:mm:ss.sss} %-5level %logger line:%-3l - %msg%n</ pattern > < charset >utf-8</ charset > </ encoder > <!-- 此日志文件只记录info级别的 --> < filter class = "ch.qos.logback.classic.filter.levelfilter" > < level >error</ level > < onmatch >accept</ onmatch > < onmismatch >deny</ onmismatch > </ filter > </ appender > <!-- 日志记录器,日期滚动记录 --> < appender name = "filewarn" class = "ch.qos.logback.core.rolling.rollingfileappender" > <!-- 正在记录的日志文件的路径及文件名 --> < file >${log_path}/${appdir}/log_warn.log</ file > <!-- 日志记录器的滚动策略,按日期,按大小记录 --> < rollingpolicy class = "ch.qos.logback.core.rolling.timebasedrollingpolicy" > <!-- 归档的日志文件的路径,例如今天是2013-12-21日志,当前写的日志文件路径为file节点指定,可以将此文件与file指定文件路径设置为不同路径,从而将当前日志文件或归档日志文件置不同的目录。 而2013-12-21的日志文件在由filenamepattern指定。%d{yyyy-mm-dd}指定日期格式,%i指定索引 --> < filenamepattern >${log_path}/${appdir}/warn/log-warn-%d{yyyy-mm-dd}.%i.log</ filenamepattern > <!-- 除按日志记录之外,还配置了日志文件不能超过2m,若超过2m,日志文件会以索引0开始, 命名日志文件,例如log-error-2013-12-21.0.log --> < timebasedfilenamingandtriggeringpolicy class = "ch.qos.logback.core.rolling.sizeandtimebasedfnatp" > < maxfilesize >2mb</ maxfilesize > </ timebasedfilenamingandtriggeringpolicy > </ rollingpolicy > <!-- 追加方式记录日志 --> < append >true</ append > <!-- 日志文件的格式 --> < encoder class = "ch.qos.logback.classic.encoder.patternlayoutencoder" > < pattern >%d{yyyy-mm-dd hh:mm:ss.sss} %-5level %logger line:%-3l - %msg%n</ pattern > < charset >utf-8</ charset > </ encoder > <!-- 此日志文件只记录info级别的 --> < filter class = "ch.qos.logback.classic.filter.levelfilter" > < level >warn</ level > < onmatch >accept</ onmatch > < onmismatch >deny</ onmismatch > </ filter > </ appender > <!-- 日志记录器,日期滚动记录 --> < appender name = "fileinfo" class = "ch.qos.logback.core.rolling.rollingfileappender" > <!-- 正在记录的日志文件的路径及文件名 --> < file >${log_path}/${appdir}/log_info.log</ file > <!-- 日志记录器的滚动策略,按日期,按大小记录 --> < rollingpolicy class = "ch.qos.logback.core.rolling.timebasedrollingpolicy" > <!-- 归档的日志文件的路径,例如今天是2013-12-21日志,当前写的日志文件路径为file节点指定,可以将此文件与file指定文件路径设置为不同路径,从而将当前日志文件或归档日志文件置不同的目录。 而2013-12-21的日志文件在由filenamepattern指定。%d{yyyy-mm-dd}指定日期格式,%i指定索引 --> < filenamepattern >${log_path}/${appdir}/info/log-info-%d{yyyy-mm-dd}.%i.log</ filenamepattern > <!-- 除按日志记录之外,还配置了日志文件不能超过2m,若超过2m,日志文件会以索引0开始, 命名日志文件,例如log-error-2013-12-21.0.log --> < timebasedfilenamingandtriggeringpolicy class = "ch.qos.logback.core.rolling.sizeandtimebasedfnatp" > < maxfilesize >2mb</ maxfilesize > </ timebasedfilenamingandtriggeringpolicy > </ rollingpolicy > <!-- 追加方式记录日志 --> < append >true</ append > <!-- 日志文件的格式 --> < encoder class = "ch.qos.logback.classic.encoder.patternlayoutencoder" > < pattern >%d{yyyy-mm-dd hh:mm:ss.sss} %-5level %logger line:%-3l - %msg%n</ pattern > < charset >utf-8</ charset > </ encoder > <!-- 此日志文件只记录info级别的 --> < filter class = "ch.qos.logback.classic.filter.levelfilter" > < level >info</ level > < onmatch >accept</ onmatch > < onmismatch >deny</ onmismatch > </ filter > </ appender > < appender name = "jsonlog" class = "ch.qos.logback.core.rolling.rollingfileappender" > <!-- 正在记录的日志文件的路径及文件名 --> < file >${log_path}/${appdir}/log_ipinterceptor.log</ file > < rollingpolicy class = "ch.qos.logback.core.rolling.timebasedrollingpolicy" > < filenamepattern >${log_path}/${appdir}/log_ipinterceptor.%d{yyyy-mm-dd}.log</ filenamepattern > </ rollingpolicy > < encoder class = "net.logstash.logback.encoder.loggingeventcompositejsonencoder" > < jsonfactorydecorator class = "net.logstash.logback.decorate.characterescapesjsonfactorydecorator" > < escape > < targetcharactercode >10</ targetcharactercode > < escapesequence >u2028</ escapesequence > </ escape > </ jsonfactorydecorator > < providers > < pattern > < pattern > { "timestamp":"%date{iso8601}", "uid":"%mdc{uid}", "requestip":"%mdc{ip}", "id":"%mdc{id}", "clienttype":"%mdc{clienttype}", "v":"%mdc{v}", "decode":"%mdc{decode}", "dataid":"%mdc{dataid}", "datatype":"%mdc{datatype}", "vid":"%mdc{vid}", "did":"%mdc{did}", "cid":"%mdc{cid}", "tagid":"%mdc{tagid}" } </ pattern > </ pattern > </ providers > </ encoder > </ appender > <!-- 彩色日志 --> <!-- 彩色日志依赖的渲染类 --> < conversionrule conversionword = "clr" converterclass = "org.springframework.boot.logging.logback.colorconverter" /> < conversionrule conversionword = "wex" converterclass = "org.springframework.boot.logging.logback.whitespacethrowableproxyconverter" /> < conversionrule conversionword = "wex" converterclass = "org.springframework.boot.logging.logback.extendedwhitespacethrowableproxyconverter" /> <!-- 彩色日志格式 --> < property name = "console_log_pattern" value = "${console_log_pattern:-%clr(%d{yyyy-mm-dd hh:mm:ss.sss}){faint} %clr(${log_level_pattern:-%5p}) %clr(${pid:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${log_exception_conversion_word:-%wex}}" /> < appender name = "stdout" class = "ch.qos.logback.core.consoleappender" > <!--encoder 默认配置为patternlayoutencoder--> < encoder > < pattern >${console_log_pattern}</ pattern > < charset >utf-8</ charset > </ encoder > <!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息--> < filter class = "ch.qos.logback.classic.filter.thresholdfilter" > < level >debug</ level > </ filter > </ appender > <!-- 指定项目中某个包,当有日志操作行为时的日志记录级别 --> <!-- rmjk.dao.mappe为根包,也就是只要是发生在这个根包下面的所有日志操作行为的权限都是debug --> <!-- 级别依次为【从高到低】:fatal > error > warn > info > debug > trace --> < logger name = "rmjk.dao.mapper" level = "debug" /> < logger name = "rmjk.service" level = "debug" /> <!--显示日志--> < logger name = "org.springframework.jdbc.core" additivity = "false" level = "debug" > < appender-ref ref = "stdout" /> < appender-ref ref = "fileinfo" /> </ logger > <!-- 打印json日志 --> < logger name = "ipinterceptor" level = "info" additivity = "false" > < appender-ref ref = "jsonlog" /> </ logger > <!-- 生产环境下,将此级别配置为适合的级别,以免日志文件太多或影响程序性能 --> < root level = "info" > < appender-ref ref = "fileerror" /> < appender-ref ref = "filewarn" /> < appender-ref ref = "fileinfo" /> <!-- 生产环境将请stdout,testfile去掉 --> < appender-ref ref = "stdout" /> </ root > </ configuration > |
其中的关键为:
1
2
3
|
< logger name = "ipinterceptor" level = "info" additivity = "false" > < appender-ref ref = "jsonlog" /> </ logger > |
在需要打印的文件中引入slf4j:
1
|
private static final logger log = loggerfactory.getlogger( "ipinterceptor" ); |
mdc中放入需要打印的信息:
1
2
3
|
mdc.put( "ip" , ipaddress); mdc.put( "path" , servletpath); mdc.put( "uid" , parammap.get( "uid" ) == null ? "" : parammap.get( "uid" ).tostring()); |
此时如果使用了log.info("msg")
的话,打印的内容会输入到日志的message中,日志格式如下:
修改logstash配置
修改/usr/config/logstash
目录下的beats-input.conf:
1
2
3
4
5
6
|
input { beats { port => 5044 codec => "json" } } |
只加了一句codec => "json"
,但是logstash会按照json格式来解析输入的内容。
因为修改了配置,重启elk:
docker restart elk
这样,当我们的日志生成完毕之后,使用filebeat导入到elk中,就可以通过kibana来进行日志分析了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://segmentfault.com/a/1190000020787873