Installing Kafka
Following the architecture, the next step is to install the Kafka servers. Because of resource constraints we have already virtualized three ZooKeeper servers, so we will now run three Kafka broker instances on a single virtual machine, forming a pseudo-cluster.
Overview
Kafka today positions itself as a distributed stream-processing platform and messaging engine. Its core architecture:
Producers send messages to the Kafka servers.
Consumers read messages from the Kafka servers.
The Kafka servers rely on a ZooKeeper ensemble for service coordination and management.
Installation
First we need three configuration files, as follows:
# config/server1.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.64:9092
log.dirs=/data/logs/kafka1
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

---

# config/server2.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.64:9093
log.dirs=/data/logs/kafka2
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

---

# config/server3.properties
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.64:9094
log.dirs=/data/logs/kafka3
zookeeper.connect=192.168.31.144:2181,192.168.31.20:2181,192.168.31.223:2181
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000
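Note that the three brokers share the same zookeeper.connect string but must each have a unique broker.id, listeners port, and log.dirs path. Kafka can usually create the log directories itself on startup, but creating them up front avoids permission surprises; a quick sketch, assuming the paths from the configs above:

mkdir -p /data/logs/kafka1 /data/logs/kafka2 /data/logs/kafka3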
Install jps
yum install java-1.8.0-openjdk-devel.x86_64 -y
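jps ships with the OpenJDK devel package installed above; without it you get the `-bash: jps: command not found` error mentioned in the references. To confirm it is now on the PATH, run it directly; with no Kafka brokers started yet it should list little more than its own Jps process:

which jps
jps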
Start the Kafka servers
./bin/kafka-server-start.sh -daemon config/server1.properties
./bin/kafka-server-start.sh -daemon config/server2.properties
./bin/kafka-server-start.sh -daemon config/server3.properties
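If a broker does not come up, the first place to look is its server log. With -daemon the output goes to a log file rather than the console; a quick sketch, assuming the default logs/ directory under the Kafka installation:

tail -f logs/server.log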
Verify the services started
>>> jps|grep Kafka
Output:
39969 Kafka
39364 Kafka
39644 Kafka
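jps only proves that three JVMs are running; to check that each broker is actually listening on its configured port, something like the following works (on older systems, replace ss with netstat):

ss -tlnp | grep -E '9092|9093|9094'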
Verifying the deployment
We should verify that the deployment is actually usable.
Test topic creation and deletion
# Create
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --create --topic test-topic --partitions 3 --replication-factor 3
# List topics
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --list
# Describe partition information
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --describe --topic test-topic
# Delete (works because delete.topic.enable=true is set in the configs)
./bin/kafka-topics.sh --zookeeper 192.168.31.144:2181,192.168.31.223:2181,192.168.31.20:2181 --delete --topic test-topic
Test sending and consuming messages
# Open two terminals
# Producer
./bin/kafka-console-producer.sh --broker-list 192.168.31.64:9092,192.168.31.64:9093,192.168.31.64:9094 --topic test-topic
# Consumer
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.64:9092,192.168.31.64:9093,192.168.31.64:9094 --topic test-topic --from-beginning
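For a non-interactive smoke test, you can also pipe a message straight into the console producer instead of typing it; a minimal sketch:

echo 'hello kafka' | ./bin/kafka-console-producer.sh --broker-list 192.168.31.64:9092,192.168.31.64:9093,192.168.31.64:9094 --topic test-topic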
Developing with Kafka
Overview
However Kafka evolves, its core remains the same: external producers send it messages, and external consumers read them. Kafka defines a binary wire protocol, so any language can be used to program against it as long as it follows that protocol. Here we will practice with Python.
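The examples below use the kafka-python client library (the `kafka` package imported in the code). Assuming pip is available, it can be installed with:

pip install kafka-python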
producer
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(
    bootstrap_servers=['192.168.31.64:9092', '192.168.31.64:9093', '192.168.31.64:9094'],
    retries=5)

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if the produce request failed...
    log.exception('failed to send message')
else:
    # A successful result carries the assigned topic, partition and offset
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

# Messages can also carry a key
producer.send('my-topic', key=b'test', value=b'this is a testing message')

# A second producer that serializes values as JSON
producer2 = KafkaProducer(
    bootstrap_servers=['192.168.31.64:9092', '192.168.31.64:9093', '192.168.31.64:9094'],
    value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# Pass a dict, not a pre-encoded string, so the serializer does the encoding
producer2.send('json-files', {'keyxxxx': 'valuexxx'})

# Make sure buffered messages are actually sent before the script exits
producer.flush()
producer2.flush()
consumer
from kafka import KafkaConsumer

# To consume the latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['192.168.31.64:9092', '192.168.31.64:9093', '192.168.31.64:9094'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic,
                                         message.partition,
                                         message.offset,
                                         message.key,
                                         message.value))
    # messages arrive in real time as they are produced
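To read back the JSON values written by producer2 above, the consumer side can mirror the serializer with a value_deserializer; a minimal sketch using the same kafka-python API (the group id json-group is just an illustrative name, not from the original):

from kafka import KafkaConsumer
import json

# Deserialize each raw value back into a Python object
consumer = KafkaConsumer(
    'json-files',
    group_id='json-group',  # illustrative group id
    bootstrap_servers=['192.168.31.64:9092', '192.168.31.64:9093', '192.168.31.64:9094'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for message in consumer:
    print(message.value)  # already a dict, thanks to the deserializer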
References
-bash: jps: command not found
server.properties configuration in practice
consumer
producer