我使用的kafka版本 kafka_2.8.0-0.8.1.1.tgz
参考了官网手册http://kafka.apache.org/documentation.html#quickstart
和http://blog.csdn.net/hxpjava1/article/details/19160665 版本低一下,里面有些代码不兼容
- 下载kafka 地址http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz
tar -xzf kafka_2.9.2-0.8.1.1.tgz cd kafka_2.9.2-0.8.1.1
2.启动服务
首先要启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafaka
bin/kafka-server-start.sh config/server.properties &
3.创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看是否创建成功
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.发送消息
import java.util.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("metadata.broker.list", "test.kafka.com:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "key", "测试"); producer.send(data); producer.close(); System.out.println("结束"); } }
5.接收消息
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerSample { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("group.id", "test-consumer-group"); props.put("zookeeper.connect", "test.kafka.com:2181"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume HashMap<String, Integer> map = new HashMap<String, Integer>(); map.put("test", 4); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map); List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test"); // create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(4); // consume the messages in the threads for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new Runnable() { public void run() { for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) { System.out.println("topic:"+msgAndMetadata.topic()); String tmp = new String(msgAndMetadata.message()); System.out.println("message key: " + new String(msgAndMetadata.key())); System.out.println("message content: " + tmp); } } }); } } }
6.注意的地方
test.kafka.com 为域名映射,可以自己映射到自己的kafka的ip地址
如果发送消息失败 看下防火墙是否关闭
对于group.id可以查看config/consumer.properties的配置
7.如果出现FailedToSendMessageException: Failed to send messages after 3 tries错误
修改config/server.properties 链接zookeeper为
zookeeper.connect=127.0.0.1:2181
配置的时候最好通过域名映射添加topic
8.maven配置文件
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.0.Final</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.14</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.9.3</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.3</version> </dependency> </dependencies>
相关推荐
kafka kafka kafka kafka kafka
kafka
kafka连接工具
kafka kafka kafka
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafka 插件
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafka_2.11-2.0.0.tgz, kafka_2.11-2.0.1.tgz, kafka_2.11-2.1.0.tgz, kafka_2.11-2.1.1.tgz, kafka_2.11-2.2.0.tgz, kafka_2.11-2.2.1.tgz, kafka_2.11-2.2.2.tgz, kafka_2.11-2.3.0.tgz, kafka_2.11-2.3.1.tgz, ...
kafka的docker镜像包含了kafka,zookeeper 和kafkamanager,可以通过docker 来load 安装
基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...
1、图形化界面可以直观地查看 Kafka 的 Topic 里的内容 2、自由设置 Kafka 数据展示格式 3、使用 Kafka Tool 创建/删除 Topic 4、使用 Kafka Tool 模拟发送 Messages
【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题...
5、kafka监控工具Kafka-Eagle介绍及使用 网址:https://blog.csdn.net/chenwewi520feng/article/details/130581571 本文主要介绍了kafka监控工具Kafka-Eagle的使用。 本文依赖:kafka、zookeeper部署完成。 本分分为...
本人在北美刚刚毕业,目前面试的几家大厂包括小公司在面试中都频繁的问道kafka这个技术,作为大数据开发或者java全栈的开发者来说,2020年很有必要系统的学习一下kafka. 1.[全面][Kafka2.11][jdk1.8][ZooKeeper3.4.6...
kafka-map是一个连接kafka的页面工具
kafkapython教程_Kafka快速⼊门(⼗⼆)——Python客户端 Kafka快速⼊门(⼗⼆)——Python客户端 ⼀、confluent-kafka 1、confluent-kafka简介 confluent-kafka是Python模块,是对librdkafka的轻量级封装,⽀持Kafka ...
kafka安装包及安装步骤(原始安装及docker安装)
赠送jar包:kafka_2.11-0.10.0.1.jar; 赠送原API文档:kafka_2.11-0.10.0.1-javadoc.jar; 赠送源代码:kafka_2.11-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka_2.11-0.10.0.1.pom; 包含翻译后的API文档...
大数据,kafka
Kafka技术内幕:图文详解Kafka源码设计与实现 有书签 有源码