-
728x90
zookeeper를 통해 kafka 관리
zookeeper를 실행 후 kafka 실행
분산환경에서 서비스 제공
kafka 폴더로 이동
주키퍼 실행
카프카 실행
프로듀서 실행
토픽 주제 - abcd
컨슈머 실행
topic 받고 싶은 주제
프로듀서가 보낸 topic을 구독하고 있는 consumer가 받게 됨
인텔리제이 사용시
project structure -> module -> dependency -> 자바 아카이브 파일 추가
이클립스 사용시
프로젝트 우클릭 -> build path -> add External archives -> 파일 추가
VSCode 사용시
javaproject -> referenced libraries -> 파일 추가
Producer설정
Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());토픽 설정
while(true){Scanner sc = new Scanner(System.in);String message=sc.nextLine();producer.send(new ProducerRecord<String,String>("topic", message));}프로듀서 전체 코드
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;import java.util.Scanner;
public class Producer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> producer =new KafkaProducer<String, String>(properties);while(true) {Scanner sc = new Scanner(System.in);String message = sc.nextLine();
producer.send(new ProducerRecord<String, String>("abcd", message));}}}Consumer 설정Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());토픽 지정
consumer.subscribe(Collections.singletonList("abcd"));컨슈머 전체 코드import java.time.Duration;import java.util.Collections;import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;
public class Consumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"abcd");
KafkaConsumer<String,String>consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("abcd"));
String message=null;
while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));for(ConsumerRecord<String, String> record: records){System.out.println(record.value());}
}}}728x90반응형'공부' 카테고리의 다른 글
클라이언트 서버 채팅 (0) 2022.07.29 DB (0) 2022.07.28 HashMap (0) 2022.07.28 서버 통신 (0) 2022.07.28 서버 클라이언트 통신 (0) 2022.07.27