공부

kafka

빈v 2022. 7. 28. 16:20
728x90

 

zookeeper를 통해 kafka 관리

zookeeper를 실행 후 kafka 실행

 

분산환경에서 서비스 제공

 

 

kafka 폴더로 이동

 

주키퍼 실행

 

 

카프카 실행

 

 

프로듀서 실행

토픽 주제 - abcd

 

 

 

컨슈머 실행

topic 받고 싶은 주제

 

 

프로듀서가 보낸 topic을 구독하고 있는 consumer가 받게 됨

 

 

인텔리제이 사용시

project structure -> module -> dependency -> 자바 아카이브 파일 추가

 

이클립스 사용시

프로젝트 우클릭 -> build path -> add External archives -> 파일 추가

 

VSCode 사용시

javaproject -> referenced libraries -> 파일 추가

 

 

Producer설정

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

 

 

토픽 설정

        while(true){
            Scanner sc = new Scanner(System.in);
            String message=sc.nextLine();
           
            producer.send(new ProducerRecord<String,String>("topic", message));
        }

 

 

프로듀서 전체 코드

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

public class Producer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(properties);
        while(true) {
            Scanner sc = new Scanner(System.in);
            String message = sc.nextLine();

            producer.send(new ProducerRecord<String, String>("abcd", message));
        }
    }
}
 
 
 
Consumer 설정
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
 

 

토픽 지정

consumer.subscribe(Collections.singletonList("abcd"));
 
 
 
컨슈머 전체 코드
 
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"abcd");

        KafkaConsumer<String,String>consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("abcd"));

        String message=null;

       
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            for(ConsumerRecord<String, String> record: records){
                System.out.println(record.value());
            }

        }
    }
}

 

 

728x90
반응형