ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka
    공부 2022. 7. 28. 16:20
    728x90

     

    zookeeper를 통해 kafka 관리

    zookeeper를 실행 후 kafka 실행

     

    분산환경에서 서비스 제공

     

     

    kafka 폴더로 이동

     

    주키퍼 실행

     

     

    카프카 실행

     

     

    프로듀서 실행

    토픽 주제 - abcd

     

     

     

    컨슈머 실행

    topic 받고 싶은 주제

     

     

    프로듀서가 보낸 topic을 구독하고 있는 consumer가 받게 됨

     

     

    인텔리제이 사용시

    project structure -> module -> dependency -> 자바 아카이브 파일 추가

     

    이클립스 사용시

    프로젝트 우클릭 -> build path -> add External archives -> 파일 추가

     

    VSCode 사용시

    javaproject -> referenced libraries -> 파일 추가

     

     

    Producer설정

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

     

     

    토픽 설정

            while(true){
                Scanner sc = new Scanner(System.in);
                String message=sc.nextLine();
               
                producer.send(new ProducerRecord<String,String>("topic", message));
            }

     

     

    프로듀서 전체 코드

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;
    import java.util.Scanner;

    public class Producer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            KafkaProducer<String, String> producer =
                    new KafkaProducer<String, String>(properties);
            while(true) {
                Scanner sc = new Scanner(System.in);
                String message = sc.nextLine();

                producer.send(new ProducerRecord<String, String>("abcd", message));
            }
        }
    }
     
     
     
    Consumer 설정
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
     

     

    토픽 지정

    consumer.subscribe(Collections.singletonList("abcd"));
     
     
     
    컨슈머 전체 코드
     
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class Consumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"abcd");

            KafkaConsumer<String,String>consumer = new KafkaConsumer<>(properties);

            consumer.subscribe(Collections.singletonList("abcd"));

            String message=null;

           
            while(true){
                ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
                for(ConsumerRecord<String, String> record: records){
                    System.out.println(record.value());
                }

            }
        }
    }

     

     

    728x90
    반응형

    '공부' 카테고리의 다른 글

    클라이언트 서버 채팅  (0) 2022.07.29
    DB  (0) 2022.07.28
    HashMap  (0) 2022.07.28
    서버 통신  (0) 2022.07.28
    서버 클라이언트 통신  (0) 2022.07.27

    댓글

Designed by Tistory.