Kafka is only reachable from the internal network by default. To access it from an external network, edit config/server.properties and replace the values of listeners and advertised.listeners with the broker's host name. When connecting a Java producer or consumer from outside, do not fill in a concrete IP address; fill in the host name of the machine where Kafka is installed, and map that host name to its real IP address in the client's hosts file.
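A minimal sketch of that configuration, assuming the broker machine's host name is node125 (the host name the Java examples below use) and the default port 9092:

# config/server.properties on the broker
listeners=PLAINTEXT://node125:9092
advertised.listeners=PLAINTEXT://node125:9092

# hosts file on the client machine (/etc/hosts on Linux,
# C:\Windows\System32\drivers\etc\hosts on Windows); the IP here is a placeholder
192.168.1.125   node125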
The following commands are all taken from the official quickstart: http://kafka.apache.org/quickstart

First start ZooKeeper (the instance bundled with Kafka):

bin/zookeeper-server-start.sh config/zookeeper.properties

Then start the Kafka server:

bin/kafka-server-start.sh config/server.properties

List the existing topics:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Open a console producer on the server, which sends each line you type to Kafka:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# type a line at the prompt and press Enter to send it to Kafka

Open a console consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Whenever messages are sent to the test topic in Kafka, this consumer prints them.
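The quickstart also creates the topic explicitly before producing to it. A hedged sketch, assuming a broker of version 2.2 or later, where kafka-topics.sh accepts --bootstrap-server (earlier versions take --zookeeper localhost:2181 instead):

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test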
Java producer and consumer code. The dependency the project needs (pom.xml):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.theorydance</groupId>
  <artifactId>kafkademo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>kafkademo</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.1</version>
    </dependency>
  </dependencies>
</project>

Producer code, ProducerDemo.java:

package com.theorydance.kafkademo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node125:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            // send 100 messages to the topic HelloWorld
            for (int i = 0; i < 100; i++) {
                String msg = "This is Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent: " + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
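send() is asynchronous, and the demo above never checks whether a record was actually written. A minimal sketch of delivery checking, using the Callback overload of send() from the same kafka-clients API (Java 8+ lambda; a drop-in replacement for the send(...) line in the loop above):

producer.send(new ProducerRecord<String, String>("HelloWorld", msg), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // delivery failed after all retries
    } else {
        // metadata tells us where the record ended up
        System.out.printf("Stored in %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});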
Consumer code, ConsumerDemo.java:

package com.theorydance.kafkademo;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class ConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node125:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        while (true) {
            Map<String, List<PartitionInfo>> maps = kafkaConsumer.listTopics();
            System.out.println("Topics being watched: " + maps.keySet());
            Set<String> sets = new HashSet<String>();
            for (String topic : maps.keySet()) {
                if (topic.startsWith("Hello")) { // the rule deciding which topics to consume
                    sets.add(topic);
                }
            }
            kafkaConsumer.subscribe(sets);
            long startTime = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s, topic = %s", record.offset(), record.value(), record.topic());
                    System.out.println();
                }
                long endTime = System.currentTimeMillis();
                if (endTime - startTime > 30000) { // every 30 seconds, go back and refresh the topic list
                    System.out.println("------------------------------------------------------------------");
                    break;
                }
            }
        }
    }
}
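The outer while(true) above exists only to re-list all topics every 30 seconds and notice newly created Hello* topics. kafka-clients can do this discovery itself via pattern subscription; a sketch under the same configuration, using the subscribe(Pattern) overload available in this client version (new matching topics are picked up at the next metadata refresh, controlled by metadata.max.age.ms, 5 minutes by default):

kafkaConsumer.subscribe(Pattern.compile("Hello.*")); // java.util.regex.Pattern
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s, topic = %s%n",
                record.offset(), record.value(), record.topic());
    }
}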
Note: in my actual use case, I need to collect logs that live on different servers (the same module and different modules of a set of microservices, plus the logs of other programs), and I use Flume for the collection. I want to classify the collected logs by which program produced them. I searched online for a way to make Flume prepend an application identifier to each log entry during collection, but I did not find one; if a fellow engineer reading this blog knows how, please share. So I switched to another approach, the one shown in the consumer example above: each program sends its log messages to a topic of its own, and the consumer subscribes to the topics matching a certain rule and consumes them, which makes the logs of the different applications distinguishable.

Reposted from: https://www.cnblogs.com/TheoryDance/p/11183181.html