공부
kafka
빈v
2022. 7. 28. 16:20
728x90
zookeeper를 통해 kafka 관리
zookeeper를 실행 후 kafka 실행
분산환경에서 서비스 제공
kafka 폴더로 이동
주키퍼 실행
카프카 실행
프로듀서 실행
토픽 주제 - abcd
컨슈머 실행
topic 받고 싶은 주제
프로듀서가 보낸 topic을 구독하고 있는 consumer가 받게 됨
인텔리제이 사용시
project structure -> module -> dependency -> 자바 아카이브 파일 추가
이클립스 사용시
프로젝트 우클릭 -> build path -> add External archives -> 파일 추가
VSCode 사용시
javaproject -> referenced libraries -> 파일 추가
Producer설정
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
토픽 설정
while(true){
Scanner sc = new Scanner(System.in);
String message=sc.nextLine();
producer.send(new ProducerRecord<String,String>("topic", message));
}
프로듀서 전체 코드
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
KafkaProducer<String, String> producer =
new KafkaProducer<String, String>(properties);
while(true) {
Scanner sc = new Scanner(System.in);
String message = sc.nextLine();
producer.send(new ProducerRecord<String, String>("abcd", message));
}
}
}
Consumer 설정
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
토픽 지정
consumer.subscribe(Collections.singletonList("abcd"));
컨슈머 전체 코드
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"abcd");
KafkaConsumer<String,String>consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("abcd"));
String message=null;
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
for(ConsumerRecord<String, String> record: records){
System.out.println(record.value());
}
}
}
}
728x90
반응형