Kafak环境搭建
Docker创建kafka数据dev环境
1 | docker run -d --name data-dev --restart always --net=host -e ADV_HOST=172.16.26.193 landoop/fast-data-dev |
下载kafka
http://kafka.apache.org/quickstart
启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.out 2>&1 &
启动kafka单机
bin/kafka-server-start.sh config/server.properties >>kafka.out 2>&1 &
集群启动
bin/kafka-server-start.sh config/server.properties >>kafka-0.out 2>&1 &
bin/kafka-server-start.sh config/server-1.properties >>kafka-1.out 2>&1 &
bin/kafka-server-start.sh config/server-2.properties >>kafka-2.out 2>&1 &
监控
Kafka监控工具KafkaOffsetMonitor配置及使用
1 | java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ |
TODO 监控原理
PS: kafka 日志默认保存7天.
topic创建
1 | bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic my-replicated-topic |
命令行消费者
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic nginx_log --from-beginning |
增加partition
1 | bin/kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --partitions 10 --topic nginx_log |
查看某个topic的 logSize
指的是topic各个分区的logSize
1 | bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 --topic nginx_log --time -1 |
time 为-2 表示查看offset的最小值, -1 表示最大值1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list api.quartz.ren:9092,api.quartz.ren:9093,api.quartz.ren:9094 -topic nginx_log --time -2
查看消费者组内的offset位置(消费情况)
1 | # To view offsets, as mentioned earlier, we "describe" the consumer group like this: |
Managing Consumer Groups
1 | # |
更改offset
1 | # 先查看一下customer的offset状态 |
以上reset 只能在 consumer inactive状态时,才可以. 问题: 这个操作的目的和结果是什么???
1 | earliest |