Kafka는 왜 탄생했나?
단방향 통신
App and Services → Relational Data Ware House
Source Application, Target Application이 많아지면서 데이터를 전송해지는 라인이 매우 복잡해졌다.
Source Application, Target Application이 많아질수록 key value Store
즉, 데이터를 전송해지는 라인도 그만큼 많아졌다. 데이터를 전송해지는 라인이 많아지면
배포와 장애에 대응하기가 어려워진다.
데이터를 전송할 때 프로토콜포멧 파편화가 심해지고
추후 데이터 포멧 내부에 변경사항이 생기면 유지보수하기어려워진다.
apache kafka는 위와 같은 어려움을 해결해준다.
Source Application과 Target Application의 coupling을 약하게 하기위해 탄생하였다.
Source Application는 apache kafka에 데이터를 전송해주고
Target Application은 kafka에서 데이터를 가져오면 된다.
Source Application은 클릭로그, 결제로그
Source Application에서 보낼 수 있는 데이터전송 방식은 거의 제한이 없다.
json, tsv, avro, etc…. 여러 포멧을 지원한다.
Target Application은 로그적재, 로그처리
kafka는 topic이라는 개념이 있다.
Topic#1
Topic#2
Topic#3
쉽게 말해서 Q라고 생각하면된다.
Q에 데이터를 넣는 역할은 Source Application(Kafka Producer)
Q에 데이터를 가져오는 역할은 Target Application(Kafka Consumer)
Producer와 Consumer는 라이브러리로 되어있어 애플리케이션에서 구현이 가능하다.
Q의 역할을 한다.
Kafka에는 다양한 데이터가 들어갈 수 있는데 그 데이터가 들어갈 수 있는 공간을 Topic이라고 한다.
AMQP와는 다르게 동작한다
Kafka에서는 Topic을 여러개 생성할 수 있다.
Topic은 Database의 Table이나 파일시스템의 폴더와 유사한 성질을 가진다.
Topic에 Producer가 Data를 넣게되고 Consumer는 Data를 가져가게된다.
목적에 따라 click_log, send_sms, location_log 등과 같이
무슨 데이터를 담는지 명확히 명시하면 추후에도 유지보수를 편리하게 관리가 가능하다.
Kafka Topic 내부
- 하나의 Topic은 여러개의 파티션으로 구성이 가능하다.
01234 로 데이터를 가져온다.
예제 코드는 Java를 사용하며, Kafka의 Producer API와 Consumer API를 사용하여
데이터를 전송하고 수신하는 방법을 소개하겠다.
Producer API를 사용하여 데이터를 생성하고 Kafka 클러스터에 전송하며,
Consumer API를 사용하여 데이터를 수신하고 처리한다. 예제 코드는 다음과 같은 기능을 구현한다.
- Producer에서 데이터를 생성하고 Kafka 클러스터에 전송
- Consumer에서 데이터를 수신하고 출력
아래는 예제 코드의 간략한 흐름
- KafkaProducerExample.java
- KafkaProducer를 생성하고, 주어진 Topic에 데이터를 전송
- ProducerRecord 클래스를 사용하여 데이터를 생성하고, send() 메서드를 호출하여 데이터를 전송
- KafkaConsumerExample.java
- KafkaConsumer를 생성하고, 주어진 Topic에서 데이터를 수신
- poll() 메서드를 호출하여 데이터를 가져오고, 출력
- KafkaUtils.java
- Kafka 클러스터의 설정을 관리
- Properties 클래스를 사용하여 Kafka 클러스터의 주소, Serializer 및 Deserializer 등을 설정
위의 흐름을 기반으로 Kafka를 사용하는 방법을 이해하고,
실제로 데이터를 전송하고 수신하는 애플리케이션을 구현한 예제코드를 확인해보자.
- Producer 예제 코드
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
public class ProducerExample {
public static void main(String[] args) throws Exception{
// Topic 이름, Key, Value 설정
String topicName = "test";
String key = "Key1";
String value = "Value-1";
// Producer 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 메시지 전송
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,key,value);
producer.send(record);
producer.close();
System.out.println("SimpleProducer Completed.");
}
}
위의 예제 코드는 KafkaProducer를 생성하고, 주어진 Topic에 데이터를 전송하는 방법
- Properties 객체를 이용하여 Producer 설정을 구성합니다.
- bootstrap.servers : Kafka 클러스터의 서버와 포트 정보를 설정합니다. (콤마로 구분하여 여러개 설정 가능)
- key.serializer : Producer에서 사용할 key의 직렬화 방법을 설정합니다.
- value.serializer : Producer에서 사용할 value의 직렬화 방법을 설정합니다.
- KafkaProducer 객체를 생성합니다. 생성자로는 설정된 Properties 객체를 전달합니다.
- ProducerRecord 객체를 생성하여 전송할 메시지를 설정합니다. 생성자로는 Topic, Key, Value를 전달합니다.
- producer.send(record) 메소드를 호출하여 메시지를 전송합니다.
- producer.close() 메소드를 호출하여 Producer를 종료합니다.
- Consumer 예제 코드
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
public class ConsumerExample {
public static void main(String[] args) throws Exception{
// Topic 이름과 Consumer Group 설정
String topicName = "test";
String groupId = "test-group";
// Consumer 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName)); // Topic 구독
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); // 메시지 수신
for (ConsumerRecord<String, String> record : records) // 수신된 메시지에 대해 반복 처리
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
위의 예제 코드는 KafkaConsumer를 생성하고, 주어진 Topic에서 데이터를 수신하는 방법
- Properties 객체를 이용하여 Consumer 설정을 구성합니다.
- bootstrap.servers : Kafka 클러스터의 서버와 포트 정보를 설정합니다. (콤마로 구분하여 여러개 설정 가능)
- group.id : Consumer Group의 이름을 설정합니다. 같은 Consumer Group에 속한 Consumer는 Topic의 Partition을 공유하여 메시지를 처리합니다.
- key.deserializer : Consumer에서 사용할 key의 역직렬화 방법을 설정합니다.
- value.deserializer : Consumer에서 사용할 value의 역직렬화 방법을 설정합니다.
- KafkaConsumer 객체를 생성합니다. 생성자로는 설정된 Properties 객체를 전달합니다.
- consumer.subscribe(Arrays.asList(topicName)) 메소드를 호출하여 Topic을 구독합니다.
- consumer.poll(100) 메소드를 호출하여 메시지를 수신합니다. 매개변수는 poll timeout 값으로, milliseconds 단위입니다.
- ConsumerRecords 객체에 수신된 메시지가 저장됩니다.
- for 루프를 통해 ConsumerRecords에 저장된 메시지에 대해 반복 처리합니다.
- 각 메시지의 offset, key, value 정보를 출력합니다.
위 예제 코드들은 Kafka를 활용하여 메시지 전송 및 수신하는 기본적인 예제 코드이다.
- 이 코드는 Kafka 클러스터에 데이터를 생성하고 수신하는 방법이다.
- Kafka는 Topic, Partition 및 Offset이라는 세 가지 개념을 사용하여 데이터를 관리한다.
Topic은 데이터 스트림의 주제를 나타내며, Partition은 Topic을 나누어 관리하는 단위이다. - 각 Partition은 순차적으로 처리되며, 데이터의 위치를 나타내는 Offset을 가지고 있다.
Kafka는 대규모 실시간 데이터 스트림 처리를 위한 분산 메시징 시스템으로,
이를 사용하여 빠르고 안정적인 데이터 전송을 구현할 수 있다.
대기업은 카프카 전문가가 따로 있어서 그 것만 전문으로 한다고한다.
부럽다
'DevOps > kafka' 카테고리의 다른 글
[Kafka] Python에 kafka 활용한 간단예제 ft.Event-driven microservices (0) | 2023.05.08 |
---|---|
[Kafka] Kafka Manager API를 사용하여 Kafka 클러스터 모니터링하는 법: 카프카 클러스터의 고가용성확보 (0) | 2023.05.08 |