공부/메세지

아파치 카프카(apache kafka) 스터디 - 3

JangGiraffe 2022. 10. 24. 22:04

오늘 공부한 내용

1. 토픽에 데이터를 넣는 방법
2. 토픽에 있는 데이터를 가져가는 방법 및 컨슈머 그룹짓는법
3. 컨슈머그룹의 상세조회를 통한 모니터링(?)방법
4. 토픽에 있는 데이터를 지우는 방법

 


0. 왜인지 카프카가 꺼져있어서 EC2서버에서 재실행 해준 뒤 로컬에 설치된 ubuntu에서 토픽 리스트를 조회해보기

카프카 실행. 주키퍼는 안꺼져있었다.
스터디 2에서 실습했던 부분은 다행히 잘 남아 있다.


1. kafka-console-producer.sh를 통해 토픽에 데이터 넣

kafka-console-producer.sh를 통해 전송되는 레코드 값은 String 타입으로만 전송 가능(메세지가 byte로 변환되고 byteArraySerializer로만 직렬화된다고함.)

메시지 키를 가지는 레코드를 추가하기 위해서는 --property "parse.key=true"를 , 키의 구분자를 변경하기 위해서는 --property "key.separator=:"를 사용한다. : 위치에 구분자를 넣으면 된다. (기본구분자는 탭이다)

 


2. kafka-console-consumer.sh를 통해 토픽에 전송한 데이터 확인하기

필수옵션은 다음과 같다.

kafka-console-consumer.sh --bootstrap-server 카프카 클러스터 정보 --topic 토픽이름

bin/kafka-console-consumer.sh
--bootstrap-server my-kafka:9092
--topic hello.kafka
--property print.key=true	//해당 옵션을 true로 하면 키를 보여준다.
--property key.separator="-" // 키구분자를 - 로 설정
--group hello-group //--group 옵션
--from-beginning

응답값이 엉망징창으로(입력한 순서와 다르게) 온다. 파티션의 개념때문에 생기는 현상으로, 모든 파티션으로부터 동일한 중요도의 데이터를 가져오면서 섞여서 가져오는것. 순서를 보장하고 싶다면 가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는것이라고 한다. (현재 hello.kafka의 파티션은 4개이다.)

위에서 --group 옵션을 통해 컨슈머 그룹을 생성할 수 있다.

--group 옵션을 통해 컨슈머그룹을 생성할 수 있고, 가져온 메시지를 커밋한다.
커밋이란 컨슈머가 특정 레코드까지 처리를 완료했다가 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것이다.

__consumer_offsets :(내부토픽) 커밋정보 저장 토픽

 


3. kafka-consumer-groups.sh를 통해 컨슈머그룹 확인하기

1)kafka-consumer-groups.sh --bootstrap-server 서버정보 --list 명령어를 통해 확인이 가능하다.

bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --list
hello-group

2) --group 그룹명 --describe를 통해 상세 조회도 가능하다.

ubuntu@DESKTOP-0VPC2Q6:~/kafka_2.12-2.5.0$ bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --group hello-group --describe

Consumer group 'hello-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
hello-group     hello.kafka     3          4               4               0               -               -               -
hello-group     hello.kafka     2          3               3               0               -               -               -
hello-group     hello.kafka     1          7               7               0               -               -               -
hello-group     hello.kafka     0          5               5               0               -               -               -

- GROUP,TOPIC,PATITION : 조회한 컨슈머 그룹이 마지막으로 커밋한 그룹,토픽,파티션
- CURRENT-OFFSET : (가장최신 오프셋) 데이터가 파티션에 들어올 때마다 1씩 증가함.(데이터가 몇개인지 알 수 있다)
- LOG-END-OFFSET : (커밋한 오프셋)해당 그룹의 컨슈머가 어느 오프셋까지 커밋했나 알 수 있따.
- LAG : 토픽의 파티션에 있는 데이터를 가져가는 데에 얼마나 지연이 발생했나 나타내는 지표다. LOG-END-OFFSET과 CURRENT-OFFSET의 차인듯.
- CONSUMER-ID : 컨슈머의 토픽 할당을 카프카 내부적으로 구분하기 위해 사용하는 ID다.
- HOST : 컨슈머가 동작하는 host명
- CLIENT-ID : 컨슈머에 할당된 ID. 사용자가 지정가능, 지정하지 않으면 자동생성.

 


4. kafka-delete-records.sh를 통해 토픽의 데이터 삭제하기

json파일에 삭제 옵션을 넣어준 뒤 해당 파일을 옵션에 넣어 토픽의 데이터를 삭제할 수 있다.

명령어 : kafka-delete-records.sh --bootstrap-server 서버정보 --offset-jsonfile 파일위치/파일명

jsonFile에 들어갈 값

{"partitions":[{"topic":"hello.kafka","partition":0,"offset":1}],"version":1}

- topic : 토픽명
- partition : 파티션 번호
- offset : 삭제할 오프셋(0부터(가장오래된 데이터부터) 지정된 offset(여기선 1)까지)

 

반응형