编辑: 戴静菡 | 2019-08-11 |
【商标声明】 及其它腾讯云服务相关的商标均为腾讯云计算(北京)有限责任公司及其关联公司所有.本文档涉及的第三方主体的商标,依法由权利人所有. 【服务声明】 本文档意在向客户介绍腾讯云全部或部分产品、服务的当时的整体概况,部分产品、服务的内容可能有所调整.您所购买的腾讯云产品、服务的种 类、服务标准等应由您与腾讯云之间的商业合同约定,除非双方另有约定,否则,腾讯云对本文档内容不做任何明示或模式的承诺或保证. 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第3 共14页 文档目录 快速入门 CKafka 使用入门 CKafka与开源Kafka兼容性说明 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第4 共14页 查看 CKafka 实例,创建 topic 申请通过后,您的 CKafka 控制台中会展示 CKafka 实例,单击实例信息可以查看实例详情.包括地域、内网 VIP、端口等信息. 快速入门 CKafka 使用入门 最近更新时间:2018-09-12 10:56:24 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第5 共14页在topic 管理页面,您可以创建 topic、指定 topic 的分区个数和副本个数. 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第6 共14页 注意: 当前 topic 名称输入后无法更改.此外,分区个数指定后,只能进行新增分区的操作.副本数指定后无法更改. 创建好 topic 和分区后,可以通过云服务器的 kafka 客户端对该实例进行生产和消费的操作. 分区数:一个物理上分区的概念,一个Topic可以包含一个或者多个partition,CKafka以partition作为分配单位 副本数:partition的副本个数,用于保障partition的高可用,为保障数据可靠性,当前不支持创建单副本topic,默认开启2副本. 注意:这里的副本数也算分区个数的,比如说客户创建了topic 1个, 分区6 , 副本2,那么分区额度一共用了162=12个. 如果超过了购买的最大分区个数,那么就会有提示,类似如下: 本地下载 Kafka 工具包 1. 安装 JDK 环境 本教程在腾讯云服务器上搭建 CKafka 环境,首先可以在购买页 选购云服务器,并登入.本次测试机器配置如下: 机器配置 操作系统 CentOS 6.8
64 位CPU 1核 内存 2GB 公网带宽 1Mbps 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第7 共14页 之后需要给云服务器安装 JDK. 1.1 下载 JDK,可以通过 wget 命令获取,如果需要其他不同版本也可以在官网进行下载. 建议使用 1.7 以上版本的 JDK,本教程的版本为 jdk1.7.0_79. 1.2 移动到固定文件夹并解压缩 mkdir /usr/local/jdk mv jdk-7u79-linux-x64.tar.gz /usr/local/jdk/ cd /usr/local/jdk/ tar -xzvf jdk-7u79-linux-x64.tar.gz 1.3 配置环境变量 vim /etc/profile 在文件末尾加入如下环境变量的配置: export JAVA_HOME=/usr/local/jdk/jdk1.7.0_79(JDK的解压目录) export JRE_HOME=/usr/local/jdk/jdk1.7.0_79/jre export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JRE_HOME/lib wq 保存退出后,使用 source /etc/profile 命令使文件立即生效. 1.4 验证 通过以下命令验证环境是否安装完成(javac 命令也可以),查看版本号是否一致: cd $JAVA_HOME/bin ./java -version $JAVA_HOME 为安装的 JDK 的主目录 出现下图则证明 jdk 安装完成. 2. 下载 Kafka 工具包 下载并解压 kafka 安装包.(Kafka 安装包官网下载地址>
>
) 当前 CKafka 100% 兼容 Kafka 0.9 0.10 版本,建议您下载相应版本的安装包. tar -xzvf kafka_2.10-0.10.2.0.tgz mv kafka_2.10-0.10.2.0 /opt/ 下载解压完成后,无需配置其他环境,直接可用. 可以通过 telnet 指令测试本机是否连通到 CKafka 实例: telnet IP
9092 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第8 共14页3. Kafka API 简单测试 发送消息: ./kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxx:9092 --topic topicName This is a message This is another message 其中 broker-list 中的 IP 即为 CKafka 实例中的 VIP,topicName 为CKafka 实例中的 topic 名称. 接收消息(CKafka 默认隐藏 Zookeeper 集群): ./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --from-beginning --new-consumer --topic topicName This is a message This is another message 上述命令中,由于没有指定 consumer group 进行消费,系统会随机生成一个 group 进行消费.这样做容易达到 group 上限.因此推荐 指定 Group 的方式接收消息,首先需要在 consumer.properties 中配置下指定的 group name,如下图所示: 配置完成后,指定 consumer group 的命令如下所示: ./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --from-beginning --new-consumer --topic topicName --cons umer.config ../config/consumer.properties 注意: ConsumerConfig参数配置中,建议将auto.offset.reset配置为earliest,防止新的消费者分组不存在时,漏消费消息的情况发生. 原因:当创建一个新分组的消费者时,auto.offset.reset 值为 latest 时,表示消费最新的数据,即从 consumer 创建后生产的数据.这样会 导致之前产生的数据不消费. 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第9 共14页 查看对应的 CKafka 监控: 其他功能 开启白名单 CKafka 支持在 topic 维度开启 IP 白名单的功能,有效保证数据安全. 在新建 topic 和编辑 topic 页面均可以开启 IP 白名单. 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第10 共14页 设置消息保留时间 CKafka 支持设置消息保留时间,以分钟为单位,最短
1 分钟,最长保留
30 天. 注意: 这里设置的消息保留时间,过期的消息就会被删除,而删除的机制是按照 ckafka 的分片批量删除的,不是立刻删除的,目前分片的大小是 1G,如果分片不到 1G 就不会删除.因此,假如您设置的是
1 分钟,而分片的数据大小在
1 分钟内无法增到 1G,那么这个时间是无效的,建 议延长保留时间,这具体得看您数据的堆积速度. 快速入门 产品文档 版权所有:腾讯云计算(北京)有限责任公司 第11 共14页Ckafka起初以兼容0.9.x版本为目标,后续开发了0.10.x兼容版本.Ckafka兼容0.9系列以及0.10系列的生产/消费接口.但是现在暂不开放 Zookeeper地址,所以对于需要Zookeeper地址的High Level Consumer API暂不提供支持. Kafka Producer Type Producer变化 Kafka 0.8.1版本中,Producer API被重写.该客户端被官方推荐,其拥有更好的性能以及更多的功能,后续社区将维护新版本的Producer API. Producer改造 1) 新API写法DEMO Properties props = new Properties();
props.put( bootstrap.servers , localhost:4242 );
props.put( acks , all );
props.put( retries ,0);
props.put( batch.size , 16384);
props.put( linger.ms , 1);
props.put( buffer.memory , 33554432);
props.put( key.serializer , org.apache.kafka.common.serialization.StringSerializer );
props.put( value.serializer , org.apache.kafka.common.serialization.StringSerializer );
Producer producer = new KafkaProducer(props);
producer.send(new ProducerRecord( my-topic , Integer.toString(0), Integer.toString(0)));
producer.close();
2) 旧API写法DEMO Properties props = new Properties();
props.put( metadata.broker.list , broker1:9092 );
props.put( serializer.class , kafka.serializer.StringEncoder );
props.put( partitioner.class , example.producer.SimplePart........