HiberCell是一家开发预防癌症复发新疗法的生物技术公司,获得6075万美元A轮融资,
06-18
使用FileBeat收集Kafka日志到Elasticsearch 1.需求分析 数据中有一个kafka_server.log.tar.gz压缩包,里面包含了很多Kafka Server 日志,现在我们想在 Elasticsearch 中快速查询这些日志来定位问题。我们需要使用FileBeats将日志数据上传到Elasticsearch。
问题:首先,我们需要指定FileBeat收集哪些Kafka日志,因为FileBeats必须知道日志存储在哪里才能收集。其次,收集完这些数据后,需要指定FileBeats将收集到的日志输出到Elasticsearch,所以Elasticsearch的地址也必须指定。
2.配置FileBeats FileBeats配置文件主要分为两部分。从名称inputsoutput就可以看出,一个用于输入数据,另一个用于输出数据。
1、输入配置代码语言:javascript copy filebeat.inputs:- type: log启用: true paths: - /var/log/*.log #- FileBeats中的c:\programdata\elasticsearch\logs\*,可读取1或更多数据源。 2、输出配置 默认情况下,FileBeat 会将日志数据放入名为:filebeat-%filebeat 版本号%-yyyy.MM.dd 的索引中。
PS:FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。三、配置文件 1.创建配置文件代码语言:javascript copy cd /export/server/es/filebeat-7.6.1-linux-x86_64vim filebeat_kafka_log.yml 2.复制配置文件代码语言:javascript copy filebeat。
输入:- 类型:启用日志:true 路径:- /export/server/es/data/kafka/server.log.*output.elasticsearch:主机:[“node1:”,“node2:”,“node3:”] 4、运行FileBeat1,运行FileBeat代码语言:javascript copy./filebeat -c filebeat_kafka_log.yml -e2,将日志数据上传到/var/kafka/log,并解压代码语言:javascript copy mkdir -p /export/server /es/data/kafka/tar -xvzf kafka_server.log.tar.gz 注意:文件权限错误 如果启动fileBeat时,报配置文件权限错误,请将其权限修改为-rw-r--r-- 5.查询数据 1、查看索引信息 GET /_cat/indices?v 代码语言:javascript Copy { "health": "green", "status": "open", "index": "filebeat-7.6.1-.12.05- 01”,“uuid”:“dplqB_hTQq2XeSk6S4tccQ”,“pri”:“1”,“rep”:“1”,“做cs.count":"80",????????"docs.deleted":"0",????????"store.size":"71.9mb",????????"pri.store.size":"35.8mb"????}GET?/filebeat-7.6.1 -.12.05-01/_search代码语言:javascript 复制????????????{????????????????"_index":?"filebeat-7.6.1-.12.05-01",????????????????"_type":?"_doc",????????????????"_id":?“-72pX3IBjTeClvZff0CB”,????????????????“_score” :?1,????????????????"_source":?{????????????????????"@timestamp":?"12-05T09:00:40.Z",?????????????????????"log":?{?????????????????????????“偏移量”:?3,????????????????????????“文件”:{?????????????????????????????“路径”:?“/var/kafka /log/server.log.12-05-16"????????????????????????}????????????????????},????????????????????"message":?"[12-05?09:01:30,]?INFO?套接字连接建立d、发起会话,客户端:/..88.:2,服务器:node1.cn/..88.:(org.apache.zookeeper.ClientCnxn)",???????????????????"input":?{?????????????????????????"type":?"log"?????????????????????} ,????????????????????“ecs”:?{????????????????????????“版本”:?“1.4.0”????????????????????},??????????????????????“主机”:?{?????????????????????????“名称”:“节点1”????????????????????},???????????????????“代理”:?{?????????????????????????“id”:?“b4c5c4dc-03c3-4ba4-dc 6afcb36d64", “版本”:“7.6.1”,????????????????????????“类型”:“filebeat”,???????????????????????“ephemeral_id”:“b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63”,????????????????????????"hostname":?"node1"????????????????????}????????????????}????????????}FileBeat自动给我们添加了关于一些日志、采集类型、主机各个字段 6. 解决一条日志多行问题 在我们日常的日志处理中,经常会遇到日志出现异常的情况。类似下面的情况:代码语言:javascript复制[12-05 14:00:05,] WARN [ReplicaFetcherreplicaId=0,leaderId=1,fetcherId=0]发送Map的leader纪元请求时出错(test_10m-2 -> (currentLeaderEpoch=Optional[],leaderEpoch=))(kafka.server.ReplicaFetcherThread)java.io.IOException:连接到node2:(id:1机架:null)失败。
在 org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) 在 kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:) 在 kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:) 在 kafka。 server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:) 在 kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:) 在 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:) 在 kafka.utils.ShutdownableThread.run(ShutdownableThread.标量:96)[12-05 14:00:05,] INFO [ReplicaFetcher replicaId = 0,leaderId = 1,fetcherId = 0]重试分区test_10m-2的leaderEpoch请求,因为领导者报告错误:UNKNOWN_SERVER_ERROR(kafka.server.ReplicaFetcherThread)[12-05 14:00:08 ,]警告[ReplicaFetcherreplicaId=0,leaderId=1,fetcherId=0]无法建立与节点1(node2/..88.:)的连接。
经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) 在FileBeat中,Harvest逐行读取日志文件,但是上面的日志会有一个日志跨越多行。
当有异常信息时,肯定会出现多行。我们先看看这种情况如果不默认处理的话可能会出现什么问题。
1. 导入错误日志 1) 在 /export/server/es/data/kafka/ 中创建名为 server.log.12-05 的日志文件 2) 更改数据中的 err.txt 日志 将文本粘贴到文件中,然后观察 FileBeat。发现FileBeat已经针对日志文件启动Harvester并读取数据。
代码语言:javascript 复制 12-05T19:11:01。 INFO log/harvester.go: Harvester returned for file: /var/kafka/log/server.log.12-) 在Elasticsearch中检索该文件我们发现原来是一个日志中的异常信息被作为单独的消息进行处理~"message":"java.io.IOException:Connection to node2:(id; 这显然不符合我们的预期。
我们想要的是将所有异常消息合并到一个日志中。那么遇到这种情况该如何处理呢? 2.???????问题分析每条日志开头都有统一的格式。
以Kafka的日志消息为例,[12-05 14:00:05,]这是统一的格式。如果不是以这种形式开始的话,说明这一行一定是属于某个日志,而不是一个独立的日志。
因此,我们可以通过日志的开头来判断某一行是否是新日志。 3.??????FileBeat多行配置选项在FileBeat的配置中,有一个特殊的配置是为了解决一条日志跨越多行的问题。
主要针对以下三种配置: 代码语言:javascript copy multiline.pattern: ^\[multiline.negate: falsemultiline.match: aftermultiline.pattern 表示可以匹配日志的模式。默认配置是以 [ 开头的模式被视为日志。
新日志。 multiline.negate:配置该模式是否生效。
默认为 false。 multiline.match:指示是将不匹配的行追加到上一个日志还是下一个日志。
4.重新配置FileBeat1)修改filebeat.yml,添加以下内容代码语言:javascript复制filebeat.inputs:-type:logenabled:truepaths:-/var/kafka/log/server。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-06
06-06
06-17
06-18
06-17
06-18
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用