목차
- 카프카 설정 및 실행
- 토픽
- 컨슈머
- 메세지 송수신 테스트
- 카프카 부하 테스트
카프카 디렉토리 구조와 쉘 스크립트 목록
디렉토리 | 설명 |
bin | 카프카를 실행 및 관리 할 수 있는 쉘스크립트들이 있음 |
config | 카프카의 설정 파일 디렉토리 server.properties 파일을 이용하여 카프카 설정을 적용할 수 있음 |
logs | 카프카 로그 디렉토리 |
커맨드를 실행하는 기본 경로는 카프카가 설치된 디렉토리(ex. kafka_2.13-3.2.0)에서 커맨드를 실행하는 것을 권장함.
bin 디렉토리까지 들어와서 커맨드를 실행할 경우 ./[스크립트명] 으로 커맨드를 실행해 줘야 함.
1. 카프카 설정 및 실행
1.1. 카프카(브로커) 서버 설정 정보 조회
#카프카 설정 파일 조회
vi config/server.properties
#주키퍼 및 브로커 포트 조회
netstat -ntpl
netstat -ntpl | grep 2181
netstat -ntpl | grep 9092
1.2. 카프카 시작 및 종료
1.2.1. 주키퍼 시작 및 종료
#주키퍼 실행
bin/zookeeper-server-start.sh config/zookeeper.properties
#주키퍼 종료
bin/zookeeper-server-stop.sh
#독립 주키퍼 실행
bin/zkServer.sh start
#독립 주키퍼 종료
bin/zkServer.sh stop
1.2.2. 브로커 시작 및 종료
#브로커 시작
bin/kafka-server-start.sh config/server.properties
#브로커 백그라운드로 실행
./bin/kafka-server-start.sh -daemon config/server.properties
#브로커 중지
bin/kafka-server-stop.sh
#브로커 재시작
bin/kafka-server-stop.sh && bin/kafka-server-start.sh config/server.properties
1.3. Dynamic Config 설정
1.3.1. Brocker의 Dynamic Config 확인
#Dynamic config 조회(broker)
bin/kafka-configs.sh --describe --bootstrap-server [카프카주소] --entity-type brokers --entity-name [브로커ID]
ex)
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0
1.3.2. Topic의 Dynamic Config 확인
#Dynamic config 조회(topic)
bin/kafka-configs.sh --describe --bootstrap-server [카프카주소] --entity-type topics --entity-name [토픽명]
ex)
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics --entity-name TEST-TOPIC
1.3.3. Dynamic Config 추가 및 삭제
#추가
bin/kafka-configs.sh --alter --add-config --bootstrap-server [브로커주소] --entity-type brokers --entity-default
[config option 입력]
ex)
bin/kafka-configs.sh --alter --add-config --bootstrap-server localhost:9092 --entity-type brokers --entity-default
delete.topic.enable=true
#삭제
bin/kafka-configs.sh --alter --delete-config --bootstrap-server [브로커주소] --entity-type brokers --entity-default
[config option key 입력]
ex)
bin/kafka-configs.sh --alter --delete-config --bootstrap-server localhost:9092 --entity-type brokers --entity-default
delete.topic.enable
2. Broker 명령어
2.1. Broker 조회
#카프카 내장 주키퍼 사용 시
#브로커 목록 조회
bin/zookeeper-shell.sh [주키퍼주소] ls /brokers/ids
ex)
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
#브로커 상태 조회
bin/zookeeper-shell.sh [주키퍼주소] get /brokers/ids/[브로커ID]
ex)
bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0
#별도로 설치한 주키퍼 사용 시
#브로커 목록 조회
bin/zkCli.sh -server [주키퍼주소] ls /brokers/ids
ex)
bin/zkCli.sh -server localhost:2181 ls /brokers/ids
#브로커 상태 조회
bin/zkCli.sh -server [주키퍼주소] get /brokers/ids/[브로커ID]
ex)
bin/zkCli.sh -server localhost:2181 get /brokers/ids/[브로커ID]
2. Topic 명령어
2.1. Topic 조회
#kafka로 조회
bin/kafka-topics.sh --list --bootstrap-server=[브로커주소]
ex)
bin/kafka-topics.sh --list --bootstrap-server=localhost:9092
#zookeeper로 조회
bin/kafka-topics.sh --list --zookeeper [주키퍼주소]
ex)
bin/kafka-topics.sh --list --zookeeper localhost:2181
※bin 안에서 쉘스크립트 실행시 ./kafka-topics.sh 로 실행
2.2. Topic 생성
#Topic 생성
bin/kafka-topics.sh --create --topic [토픽명] --bootstrap-server [카프카주소] --replication-factor [복제값] --partitions [파티션값]
ex)
bin/kafka-topics.sh --create --topic TEST-TOPIC --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
replication-factor :
partitions :
2.3. Topic 삭제
#kafka를 이용
bin/kafka-topics.sh --delete --bootstrap-server [카프카주소] --topic [토픽명]
ex)
bin/kafka-topics.sh --delete TEST-TOPIC --bootstrap-server localhost:9092 --topic
#zookeeper를 이용
bin/kafka-topics.sh --delete --zookeeper [주키퍼주소] --topic TEST-TOPIC
ex)
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic TEST-TOPIC
3. Consumer 명령어
3.1. 그룹
#그룹 리스트 조회
bin/kafka-consumer-groups.sh --list --bootstrap-server [브로커주소]
ex)
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
#그룹 상세 조회
bin/kafka-consumer-groups.sh --describe --bootstrap-server [브로커주소] --group [그룹명]
ex)
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group TEST-GROUP
#그룹 생성
bin/kafka-console-consumer.sh --bootstrap-server [브로커주소] --topic [토픽명] --group [그룹명]
ex)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST-TOPIC --group TEST-GROUP
#그룹삭제
bin/kafka-consumer-groups.sh --delete --bootstrap-server [브로커주소] --group [그룹명]
ex)
bin/kafka-consumer-groups.sh --delete --bootstrap-server localhost:9092 --group TEST-GROUP
4. 메세지 송수신 테스트
4.1. Producer를 이용한 Topic에 메시지 보내기
#producer를 이용한 메시지 발송
bin/kafka-console-producer.sh --broker-list [브로커주소] --topic [토픽명]
[내용 입력]
ex)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST-TOPIC 엔터
test message sending
#property 추가
bin/kafka-console-producer.sh --broker-list [브로커주소] --topic [토픽명] --property [프로퍼티]
...
--property [프로퍼티]
ex)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST-TOPIC --property "parse.key=true"
--property "print.key=true" 엔터
test message sending
4.2. Topic의 메시지 받기
#topic에 보관 중인 records 조회
bin/kafka-console-consumer.sh --bootstrap-server [브로커주소] --topic [토픽명] --from-beginning
ex)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST-TOPIC --from-beginning
#consumer를 이용한 메시지 받기
bin/kafka-console-consumer.sh --bootstrap-server [브로커주소] --topic [토픽명]
ex)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST-TOPIC
#Topic 내의 보관 중인 records 삭제
#Topic 데이터 삭제 주기를 줄였다가 늘리는 방식 1000ms -> 10000ms
bin/kafka-topics.sh --alter --bootstrap-server [브로커주소] --topic [토픽명] --config retention.ms=[ms값]
ex)
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic TEST-TOPIC --config retention.ms=1000
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic TEST-TOPIC --config retention.ms=10000
#Topic 내의 보관 중인 records 삭제
#삭제할 records의 topic, partitions, offset 범위를 설정한 json 파일을 생성 후
json 파일을 이용하여 records 삭제
vi test.json
{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"offset": 3
}
],
"version": 1
}
bin/kafka-delete.records.sh --bootstrap-server localhost:9092 --offset-json-file test.json
Reference.
'DevOps' 카테고리의 다른 글
[API Gateway] Spring Cloud Gateway(SCG), KrakenD, Nginx 를 이용한 API Gateway 구축 (0) | 2024.02.27 |
---|---|
[Kubenetes] 쿠버네티스 완전 삭제 방법 (0) | 2024.02.22 |
[Kubernetes] Ubuntu 리눅스 쿠버네티스 클러스터 구축 방법 (0) | 2024.02.14 |
[Redis] 레디스 주요 명령어 모음 (0) | 2023.07.05 |
[Linux] EC2 Ubuntu 리눅스 Docker 설치 방법 및 도커 주요 명령어 (0) | 2023.07.05 |