Getting Started with Kafka

Installing Kafka

Per the architecture, we now install the Kafka servers. Due to resource constraints, we have already virtualized three ZooKeeper servers; next we will run three Kafka services on a single VM, forming a pseudo-cluster.

Overview

Kafka is currently positioned as a distributed stream-processing platform and message engine. Its core architecture:

  1. Producers send messages to the Kafka servers.
  2. Consumers read messages from the Kafka servers.
  3. The Kafka servers rely on the ZooKeeper cluster for service coordination and management.

Installation

First we need three configuration files, as follows:
# config/server1.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.64:9092
log.dirs=/data/logs/kafka1
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

---

# config/server2.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.64:9093
log.dirs=/data/logs/kafka2
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

---

# config/server3.properties
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.64:9094
log.dirs=/data/logs/kafka3
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000
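The three files differ only in broker.id, the listener port, and log.dirs, so it is easy to let them drift out of sync when copying. Below is a minimal sanity-check sketch (stdlib only; the configs are inlined as strings purely for illustration — in practice you would read config/server{1,2,3}.properties from disk) that parses each file's key=value pairs and verifies the per-broker fields are unique while zookeeper.connect is shared:

```python
# Inlined copies of the three broker configs (illustration only).
CONFIGS = {
    'server1.properties': """broker.id=0
listeners=PLAINTEXT://192.168.31.64:9092
log.dirs=/data/logs/kafka1
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181""",
    'server2.properties': """broker.id=1
listeners=PLAINTEXT://192.168.31.64:9093
log.dirs=/data/logs/kafka2
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181""",
    'server3.properties': """broker.id=2
listeners=PLAINTEXT://192.168.31.64:9094
log.dirs=/data/logs/kafka3
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181""",
}

def parse_properties(text):
    """Parse simple key=value lines into a dict, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith('#'):
            key, _, value = line.partition('=')
            props[key.strip()] = value.strip()
    return props

parsed = {name: parse_properties(text) for name, text in CONFIGS.items()}

# Per-broker fields must be unique across the pseudo-cluster...
for field in ('broker.id', 'listeners', 'log.dirs'):
    values = {p[field] for p in parsed.values()}
    assert len(values) == len(parsed), '%s is not unique' % field

# ...while every broker must point at the same ZooKeeper ensemble.
assert len({p['zookeeper.connect'] for p in parsed.values()}) == 1
print('configs look consistent')
```

A duplicated broker.id or log.dirs is a common copy-paste mistake in pseudo-clusters, and a check like this catches it before the brokers start failing in confusing ways.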
Install jps (it ships with the OpenJDK development package)
yum install java-1.8.0-openjdk-devel.x86_64 -y
Start the Kafka servers
./bin/kafka-server-start.sh -daemon config/server1.properties 
./bin/kafka-server-start.sh -daemon config/server2.properties
./bin/kafka-server-start.sh -daemon config/server3.properties
Verify the services started
>>> jps|grep Kafka

Result:
39969 Kafka
39364 Kafka
39644 Kafka
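Parsing the jps output programmatically is handy in a startup script. A small sketch using the sample output above (the PIDs will differ on your machine, and the QuorumPeerMain line is added here only to show that non-Kafka JVMs are filtered out):

```python
# Sample `jps` output; in a real script this would come from
# subprocess.run(['jps'], capture_output=True).
jps_output = """\
39969 Kafka
39364 Kafka
39644 Kafka
40123 QuorumPeerMain"""

# Keep only the lines whose process name is exactly "Kafka"
brokers = [line for line in jps_output.splitlines()
           if line.split()[-1] == 'Kafka']
print(len(brokers))  # expect 3 for our pseudo-cluster
```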
Verify the Deployment

We should verify that the deployment is actually usable.

Test topic creation and deletion
# Create the topic
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --create --topic test-topic --partitions 3 --replication-factor 3

# List topics
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --list

# Describe the partitions
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --describe --topic test-topic

# Delete the topic
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --delete --topic test-topic
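test-topic above was created with 3 partitions. When a producer sends a keyed message, it chooses the partition by hashing the key modulo the partition count (the Java client's default partitioner uses murmur2). The sketch below uses CRC32 as a stand-in hash, not Kafka's actual algorithm; it only illustrates the property that matters, namely that the same key always lands on the same partition, which is what preserves per-key ordering:

```python
import zlib

def pick_partition(key, num_partitions=3):
    """Illustrative partitioner: hash the key bytes, modulo partition count.

    Kafka's real default partitioner uses murmur2, not CRC32; only the
    same-key-same-partition behaviour is being demonstrated here.
    """
    return zlib.crc32(key) % num_partitions

p1 = pick_partition(b'test')
p2 = pick_partition(b'test')
assert p1 == p2  # messages keyed b'test' all go to one partition
```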
Test message production and consumption
# Open two terminals
# Producer
./bin/kafka-console-producer.sh --broker-list 192.168.31.64:9092,192.168.31.64:9093,192.168.31.64:9094 --topic test-topic

# Consumer
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.64:9092,192.168.31.64:9093,192.168.31.64:9094 --topic test-topic --from-beginning

Developing with Kafka

Overview

However Kafka evolves, its core remains the same: external producers send it messages, and external consumers read them. Kafka defines a binary wire protocol, so clients can be written in any language that implements this protocol. Here we will practice with Python (the kafka-python client).
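To make the "binary protocol" idea concrete, here is a rough sketch of how a Kafka request is framed on the wire: a size-prefixed frame carrying a common header (api_key, api_version, correlation_id, client_id) followed by the request body. This is a simplified illustration built from the stdlib struct module, not a working client; real clients like kafka-python implement the full protocol for you.

```python
import struct

def frame_request(api_key, api_version, correlation_id, client_id):
    """Build a size-prefixed, Kafka-style request frame.

    Layout (big-endian): INT16 api_key, INT16 api_version,
    INT32 correlation_id, then client_id as an INT16-length-prefixed
    string; the whole header is prefixed with an INT32 total size.
    """
    cid = client_id.encode('utf-8')
    header = struct.pack('>hhih', api_key, api_version,
                         correlation_id, len(cid)) + cid
    return struct.pack('>i', len(header)) + header

frame = frame_request(api_key=18, api_version=0,
                      correlation_id=1, client_id='demo')
print(len(frame))  # 4-byte size prefix + 14-byte header = 18 bytes
```

Any language that can write these bytes to a TCP socket can speak to a broker, which is why Kafka clients exist for so many languages.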

producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(
    bootstrap_servers=['192.168.31.64:9092', '192.168.31.64:9093', '192.168.31.64:9094'],
    retries=5)

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if the produce request failed...
    log.exception('produce request failed')
else:
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

producer.send('my-topic', key=b'test', value=b'this is testing message')

# A second producer that serializes every value to JSON bytes
producer2 = KafkaProducer(
    bootstrap_servers=['192.168.31.64:9092', '192.168.31.64:9093', '192.168.31.64:9094'],
    value_serializer=lambda m: json.dumps(m).encode('utf-8'))
producer2.send('json-files', {'keyxxxx': 'valuexxx'})

# Flush so buffered messages are actually delivered before the script exits
producer.flush()
producer2.flush()
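The value_serializer passed to producer2 runs on every send, turning a Python object into the bytes that actually go over the wire. A quick stdlib check of what it produces:

```python
import json

# Same serializer as passed to producer2 above
value_serializer = lambda m: json.dumps(m).encode('utf-8')

wire = value_serializer({'keyxxxx': 'valuexxx'})
print(wire)  # b'{"keyxxxx": "valuexxx"}'
```

Whatever bytes the serializer returns are what the consumer will receive, so the consumer side needs a matching deserialization step (e.g. json.loads(message.value.decode('utf-8'))).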

consumer

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers=['192.168.31.64:9092', '192.168.31.64:9093', '192.168.31.64:9094'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

The consumer loop above blocks and yields messages in real time as they arrive.
