`

kafka小试

 
阅读更多

我使用的kafka版本 kafka_2.8.0-0.8.1.1.tgz

参考了官网手册http://kafka.apache.org/documentation.html#quickstart

和http://blog.csdn.net/hxpjava1/article/details/19160665  版本低一下,里面有些代码不兼容

  1. 下载kafka 地址http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz
tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1

    2.启动服务

       首先要启动zookeeper      

bin/zookeeper-server-start.sh config/zookeeper.properties &

      启动kafaka

bin/kafka-server-start.sh config/server.properties &

   3.创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

   查看是否创建成功

bin/kafka-topics.sh --list --zookeeper localhost:2181

  4.发送消息

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "test.kafka.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "key", "测试");
        producer.send(data);
        producer.close();
        System.out.println("结束");
    }
}

 5.接收消息

 

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerSample {
	
	public static void main(String[] args) {  
        // specify some consumer properties  
		Properties props = new Properties();  
       props.put("group.id", "test-consumer-group");
	   props.put("zookeeper.connect", "test.kafka.com:2181");
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");
		
		// Create the connection to the cluster  
		ConsumerConfig consumerConfig = new ConsumerConfig(props);  
		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
		
		        // create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume  
		HashMap<String, Integer> map = new HashMap<String, Integer>();  
		map.put("test", 4); 
		 Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =  
		        consumerConnector.createMessageStreams(map);  
		List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");  
		
		        // create list of 4 threads to consume from each of the partitions   
		ExecutorService executor = Executors.newFixedThreadPool(4);  
		
		        // consume the messages in the threads  
		for (final KafkaStream<byte[], byte[]> stream : streams) {  
		    executor.submit(new Runnable() {  
		        public void run() {  
		            for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) { 
		            	System.out.println("topic:"+msgAndMetadata.topic());
		                String tmp = new String(msgAndMetadata.message());
		                System.out.println("message key: " + new String(msgAndMetadata.key())); 
		                System.out.println("message content: " + tmp);  
		            }  
		        }  
		    });  
		}  

	}
}

 

  6.注意的地方

     test.kafka.com  为域名映射,可以自己映射到自己的kafka的ip地址

     如果发送消息失败 看下防火墙是否关闭

    对于group.id可以查看config/consumer.properties的配置

  7.如果出现FailedToSendMessageException: Failed to send messages after 3 tries错误

     修改config/server.properties  链接zookeeper为

    zookeeper.connect=127.0.0.1:2181

    配置的时候最好通过域名映射添加topic

  8.maven配置文件

<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.0.0.Final</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.14</version>
		</dependency>
		<dependency>
		  <groupId>org.apache.kafka</groupId>
		  <artifactId>kafka_2.9.2</artifactId>
		  <version>0.8.1.1</version>
		</dependency> 
		<dependency>
		  <groupId>org.scala-lang</groupId>
		  <artifactId>scala-library</artifactId>
		  <version>2.9.3</version>
		</dependency>
		<dependency>
			<groupId>com.yammer.metrics</groupId>
			<artifactId>metrics-core</artifactId>
			<version>2.2.0</version>
		</dependency>
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.3</version>
		</dependency>
	</dependencies>

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics