Logstash vs. Flume

  A note from the big data learning path: Logstash vs. Flume. Neither tool has a built-in cluster concept; both Logstash and Flume are organized as a chain of components.

  Logstash is developed in JRuby.

  Component comparison:

  Logstash: input → filter → output

  Flume: source → channel → sink
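
  The correspondence is easiest to see in a skeleton Logstash pipeline. A minimal sketch (the stdin/stdout plugins are placeholders; the filter block is optional):

  input {        # like a Flume source: where events enter
    stdin {}
  }
  filter {       # no direct Flume counterpart: per-event transformation
  }
  output {       # like a Flume sink: where events leave
    stdout {}
  }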

  Strengths and weaknesses:

  Logstash:

  Simple to install, with a small footprint.

  Has a filter component, giving it data filtering and data splitting capabilities.

  Integrates seamlessly with ES.

  Fault tolerant during collection: if the process crashes or the connection drops, it resumes from where it stopped, because it records the read offset (see the sincedb sketch below).

  In short, its main use is collecting log data.
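
  The offset bookkeeping is done by the file input's "sincedb". A minimal way to inspect it, assuming an older Logstash release that writes sincedb files to the home directory of the user running it (the default path varies by version):

  # list the sincedb files recording per-file read offsets
  ls -la ~/.sincedb_*
  # each line maps a file's inode (plus device numbers) to the byte offset already shipped
  cat ~/.sincedb_*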

  Flume:

  Stronger than Logstash in high availability.

  Flume emphasizes data safety: data transfer is controlled by transactions.

  Flume can be applied to many kinds of data transfer.

  Data integration

  Upload the Logstash .gz archive and extract it.

  You can create a conf directory under the Logstash directory to hold the configuration files.
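
  For example (the archive name is hypothetical; substitute the file you uploaded):

  tar -zxvf logstash.tar.gz
  cd logstash
  mkdir conf    # will hold the pipeline files created below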

  I. Starting from the command line

  1. bin/logstash -e 'input { stdin {} } output { stdout{} }'

  stdin/stdout are the standard input and output streams.

  hello xixi

  2018-09-12T21:58:58.649Z hadoop01 hello xixi

  hello haha

  2018-09-12T21:59:19.487Z hadoop01 hello haha

  2. bin/logstash -e 'input { stdin {} } output { stdout{ codec => rubydebug } }'

  hello xixi
  {
      "message" => "hello xixi",
      "@version" => "1",
      "@timestamp" => "2018-09-12T22:00:49.612Z",
      "host" => "hadoop01"
  }

  3. Writing to an ES cluster (the ES cluster must be running):

  bin/logstash -e 'input { stdin {} } output { elasticsearch { hosts => ["192.168.88.81:9200"] } stdout{} }'

  After the command runs, ES generates the index and the mapping automatically.

  hello haha

  2018-09-12T22:13:05.361Z hadoop01 hello haha

  bin/logstash -e 'input { stdin {} } output { elasticsearch {hosts => ["192.168.88.81:9200", "192.168.88.82:9200"]} stdout{} }'
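
  To confirm the automatically created index and mapping, query ES directly. A quick check, assuming the host above (logstash-YYYY.MM.dd is Logstash's default index pattern):

  # list all indices; a logstash-YYYY.MM.dd index should appear
  curl 'http://192.168.88.81:9200/_cat/indices?v'
  # inspect the auto-generated mapping
  curl 'http://192.168.88.81:9200/logstash-*/_mapping?pretty'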

  4. Writing to a Kafka cluster (the Kafka cluster must be running). The one-liner uses the same kafka output options as the configuration files below:

  bin/logstash -e 'input { stdin {} } output { kafka { topic_id => "test1" bootstrap_servers => "node01:9092,node02:9092,node03:9092" } stdout{} }'

  II. Starting with a configuration file

  Start the ZooKeeper cluster, the Kafka cluster, and the ES cluster first.

  1. Integrating with Kafka

  Create the configuration file: vi logstash-kafka.conf

  Start it:

  bin/logstash -f logstash-kafka.conf (-f specifies the configuration file)

  Then start a Kafka console consumer on another node to watch the events arrive.
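
  A minimal consumer command, assuming the ZooKeeper-based consumer of older Kafka releases (newer Kafka uses --bootstrap-server instead of --zookeeper):

  bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic test1 --from-beginning

  The contents of logstash-kafka.conf: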

  input {
    file {
      path => "/root/data/test.log"
      discover_interval => 5
      start_position => "beginning"
    }
  }

  output {
    kafka {
      topic_id => "test1"
      codec => plain {
        format => "%{message}"
        charset => "UTF-8"
      }
      bootstrap_servers => "node01:9092,node02:9092,node03:9092"
    }
  }
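
  To push data through the pipeline, append a line to the monitored file (the path comes from the config above) and watch it show up at the consumer:

  echo "hello kafka" >> /root/data/test.log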

  2. Integrating with ES

  Create the configuration file: vi logstash-es.conf

  # start logstash

  bin/logstash -f logstash-es.conf

  The indexed documents can then be checked in ES.

  input {
    file {
      type => "gamelog"
      path => "/log/*/*.log"
      discover_interval => 10
      start_position => "beginning"
    }
  }

  output {
    elasticsearch {
      index => "gamelog-%{+YYYY.MM.dd}"
      hosts => ["node01:9200", "node02:9200", "node03:9200"]
    }
  }
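
  Once log lines are picked up, a date-suffixed index such as gamelog-2018.09.12 appears; verify with (hosts from the config above):

  curl 'http://node01:9200/_cat/indices/gamelog-*?v'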

  The integration pipeline

  Logstash node placement is flexible: run it on whichever node has the most free resources.

  1. Start one Logstash instance that monitors the log-server directory and ships the data to Kafka.

  2. Start another Logstash instance that consumes a Kafka topic and ships the data to Elasticsearch.

  Integration case study

  Two Logstash instances must be started, each driven by its own configuration file, to complete the chain.

  1. Collect data into Kafka

  cd conf

  Create the configuration file: vi gs-kafka.conf

  input {
    file {
      codec => plain {
        charset => "GB2312"
      }
      path => "/root/basedir/*/*.txt"
      discover_interval => 5
      start_position => "beginning"
    }
  }

  output {
    kafka {
      topic_id => "gamelogs"
      codec => plain {
        format => "%{message}"
        charset => "GB2312"
      }
      bootstrap_servers => "node01:9092,node02:9092,node03:9092"
    }
  }

  Create the corresponding Kafka topic:

  bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic gamelogs
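
  To confirm the topic exists:

  bin/kafka-topics.sh --list --zookeeper hadoop01:2181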

  2. Start Logstash on hadoop01:

  bin/logstash -f conf/gs-kafka.conf

  3. Start another Logstash instance on hadoop02:

  cd logstash/conf

  vi kafka-es.conf

  input {
    kafka {
      type => "accesslogs"
      codec => "plain"
      auto_offset_reset => "smallest"
      group_id => "elas1"
      topic_id => "accesslogs"
      zk_connect => "node01:2181,node02:2181,node03:2181"
    }
    kafka {
      type => "gamelogs"
      auto_offset_reset => "smallest"
      codec => "plain"
      group_id => "elas2"
      topic_id => "gamelogs"
      zk_connect => "node01:2181,node02:2181,node03:2181"
    }
  }

  filter {
    if [type] == "accesslogs" {
      json {
        source => "message"
        remove_field => [ "message" ]
        target => "access"
      }
    }
    if [type] == "gamelogs" {
      mutate {
        split => { "message" => " " }
        add_field => {
          "event_type" => "%{message[3]}"
          "current_map" => "%{message[4]}"
          "current_X" => "%{message[5]}"
          "current_y" => "%{message[6]}"
          "user" => "%{message[7]}"
          "item" => "%{message[8]}"
          "item_id" => "%{message[9]}"
          "current_time" => "%{message[12]}"
        }
        remove_field => [ "message" ]
      }
    }
  }

  output {
    if [type] == "accesslogs" {
      elasticsearch {
        index => "accesslogs"
        codec => "json"
        hosts => ["node01:9200", "node02:9200", "node03:9200"]
      }
    }
    if [type] == "gamelogs" {
      elasticsearch {
        index => "gamelogs1"
        codec => plain {
          charset => "UTF-16BE"
        }
        hosts => ["node01:9200", "node02:9200", "node03:9200"]
      }
    }
  }

  bin/logstash -f conf/kafka-es.conf

  4. Modify any data file under basedir and the corresponding ES index is generated.
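
  For example (directory and file names are hypothetical; any .txt under the watched /root/basedir/*/*.txt glob works):

  echo "sample game log line" >> /root/basedir/dir1/sample.txt
  # then confirm the index exists
  curl 'http://node01:9200/_cat/indices/gamelogs1?v'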

  5. The web UI's data is stored under the configured /data/esdata directory.

  6. Searching a given field in the web UI

  With ES's default analyzer, Chinese text is tokenized into single characters, so a term query can only match one character at a time; a query_string query analyzes the query text and can match whole Chinese phrases.
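
  A minimal sketch of the difference (the index and field names come from the config above; the query text is a placeholder, and ES 6+ also needs -H 'Content-Type: application/json'):

  # term: matches a single token, i.e. one character of analyzed Chinese text
  curl -XPOST 'http://node01:9200/gamelogs1/_search?pretty' -d '
  { "query": { "term": { "user": "张" } } }'

  # query_string: the query text is analyzed, so whole phrases can match
  curl -XPOST 'http://node01:9200/gamelogs1/_search?pretty' -d '
  { "query": { "query_string": { "default_field": "user", "query": "张三" } } }'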
