본문 바로가기
DevOps

[Kafka] 카프카 명령어 모음(웬만한 건 다 있음)

by Jayson Jeong 2024. 10. 7.

 

목차

  1. 카프카 설정 및 실행
  2. 토픽
  3. 컨슈머
  4. 메세지 송수신 테스트
  5. 카프카 부하 테스트

 

카프카 디렉토리 구조와 쉘 스크립트 목록

카프카를 구성하는 디렉토리 목록

디렉토리 설명
bin 카프카를 실행 및 관리 할 수 있는 쉘스크립트들이 있음
config 카프카의 설정 파일 디렉토리 server.properties 파일을 이용하여 카프카 설정을 적용할 수 있음
logs 카프카 로그 디렉토리

 

 

카프카 쉘스크립트 목록

 

커맨드를 실행하는 기본 경로는 카프카가 설치된 디렉토리(ex. kafka_2.13-3.2.0)에서 커맨드를 실행하는 것을 권장함.

bin 디렉토리까지 들어와서 커맨드를 실행할 경우 ./[스크립트명] 으로 커맨드를 실행해 줘야 함.

 

 

 


1. 카프카 설정 및 실행

1.1. 카프카(브로커) 서버 설정 정보 조회

#카프카 설정 파일 조회
vi config/server.properties


#주키퍼 및 브로커 포트 조회
netstat -ntpl
netstat -ntpl | grep 2181
netstat -ntpl | grep 9092

 

1.2. 카프카 시작 및 종료

1.2.1. 주키퍼 시작 및 종료

#주키퍼 실행
bin/zookeeper-server-start.sh config/zookeeper.properties

#주키퍼 종료
bin/zookeeper-server-stop.sh

#독립 주키퍼 실행
bin/zkServer.sh start

#독립 주키퍼 종료
bin/zkServer.sh stop

 

1.2.2. 브로커 시작 및 종료

#브로커 시작
bin/kafka-server-start.sh config/server.properties


#브로커 백그라운드로 실행
./bin/kafka-server-start.sh -daemon config/server.properties


#브로커 중지
bin/kafka-server-stop.sh


#브로커 재시작
bin/kafka-server-stop.sh && bin/kafka-server-start.sh config/server.properties

 

 

1.3. Dynamic Config 설정

1.3.1. Brocker의 Dynamic Config 확인

#Dynamic config 조회(broker)
bin/kafka-configs.sh --describe --bootstrap-server [카프카주소] --entity-type brokers --entity-name [브로커ID] 

ex)
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0

 

1.3.2. Topic의 Dynamic Config 확인

#Dynamic config 조회(topic)
bin/kafka-configs.sh --describe --bootstrap-server [카프카주소] --entity-type topics --entity-name [토픽명]
ex)
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics --entity-name TEST-TOPIC

 

1.3.3. Dynamic Config 추가 및 삭제

#추가
bin/kafka-configs.sh --alter --add-config --bootstrap-server [브로커주소] --entity-type brokers --entity-default
[config option 입력]
ex)
bin/kafka-configs.sh --alter --add-config --bootstrap-server localhost:9092 --entity-type brokers --entity-default
delete.topic.enable=true

#삭제
bin/kafka-configs.sh --alter --delete-config --bootstrap-server [브로커주소] --entity-type brokers --entity-default
[config option key 입력]
ex)
bin/kafka-configs.sh --alter --delete-config --bootstrap-server localhost:9092 --entity-type brokers --entity-default
delete.topic.enable

2. Broker 명령어

2.1. Broker 조회

#카프카 내장 주키퍼 사용 시
#브로커 목록 조회
bin/zookeeper-shell.sh [주키퍼주소] ls /brokers/ids
ex)
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

#브로커 상태 조회
bin/zookeeper-shell.sh [주키퍼주소] get /brokers/ids/[브로커ID]
ex)
bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0


#별도로 설치한 주키퍼 사용 시
#브로커 목록 조회
bin/zkCli.sh -server [주키퍼주소] ls /brokers/ids
ex)
bin/zkCli.sh -server localhost:2181 ls /brokers/ids

#브로커 상태 조회
bin/zkCli.sh -server [주키퍼주소] get /brokers/ids/[브로커ID]
ex)
bin/zkCli.sh -server localhost:2181 get /brokers/ids/[브로커ID]

2. Topic 명령어

2.1. Topic 조회

#kafka로 조회
bin/kafka-topics.sh --list --bootstrap-server=[브로커주소]
ex)
bin/kafka-topics.sh --list --bootstrap-server=localhost:9092 

#zookeeper로 조회
bin/kafka-topics.sh --list --zookeeper [주키퍼주소]
ex)
bin/kafka-topics.sh --list --zookeeper localhost:2181

※bin 안에서 쉘스크립트 실행시 ./kafka-topics.sh 로 실행

 

2.2. Topic 생성

#Topic 생성
bin/kafka-topics.sh --create --topic [토픽명] --bootstrap-server [카프카주소] --replication-factor [복제값] --partitions [파티션값]
ex)
bin/kafka-topics.sh --create --topic TEST-TOPIC --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

 

replication-factor : 

partitions : 

2.3. Topic 삭제

#kafka를 이용
bin/kafka-topics.sh --delete --bootstrap-server [카프카주소] --topic [토픽명] 
ex)
bin/kafka-topics.sh --delete TEST-TOPIC --bootstrap-server localhost:9092 --topic

#zookeeper를 이용
bin/kafka-topics.sh --delete --zookeeper [주키퍼주소] --topic TEST-TOPIC
ex)
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic TEST-TOPIC

3. Consumer 명령어

3.1. 그룹

#그룹 리스트 조회
bin/kafka-consumer-groups.sh --list --bootstrap-server [브로커주소] 
ex) 
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 

#그룹 상세 조회
bin/kafka-consumer-groups.sh --describe --bootstrap-server [브로커주소] --group [그룹명] 
ex) 
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group TEST-GROUP 

#그룹 생성
bin/kafka-console-consumer.sh --bootstrap-server [브로커주소] --topic [토픽명] --group [그룹명]
ex)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST-TOPIC --group TEST-GROUP

#그룹삭제
bin/kafka-consumer-groups.sh --delete --bootstrap-server [브로커주소] --group [그룹명]
ex)
bin/kafka-consumer-groups.sh --delete --bootstrap-server localhost:9092 --group TEST-GROUP

 


4. 메세지 송수신 테스트

4.1. Producer를 이용한 Topic에 메시지 보내기

#producer를 이용한 메시지 발송
bin/kafka-console-producer.sh --broker-list [브로커주소] --topic [토픽명]
[내용 입력]
ex)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST-TOPIC 엔터
test message sending


#property 추가
bin/kafka-console-producer.sh --broker-list [브로커주소] --topic [토픽명] --property [프로퍼티]
... 
--property [프로퍼티]

ex)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST-TOPIC --property "parse.key=true"
--property "print.key=true" 엔터
test message sending

 

4.2. Topic의 메시지 받기

#topic에 보관 중인 records 조회
bin/kafka-console-consumer.sh --bootstrap-server [브로커주소] --topic [토픽명] --from-beginning
ex)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST-TOPIC --from-beginning


#consumer를 이용한 메시지 받기
bin/kafka-console-consumer.sh --bootstrap-server [브로커주소] --topic [토픽명]
ex)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST-TOPIC


#Topic 내의 보관 중인 records 삭제
#Topic 데이터 삭제 주기를 줄였다가 늘리는 방식 1000ms -> 10000ms
bin/kafka-topics.sh --alter --bootstrap-server [브로커주소] --topic [토픽명] --config retention.ms=[ms값]
ex)
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic TEST-TOPIC --config retention.ms=1000
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic TEST-TOPIC --config retention.ms=10000


#Topic 내의 보관 중인 records 삭제
#삭제할 records의 topic, partitions, offset 범위를 설정한 json 파일을 생성 후
json 파일을 이용하여 records 삭제
vi test.json
{
    "partitions": [
        {
            "topic": "my-topic",
            "partition": 0,
            "offset": 3
        }
    ],
    "version": 1
}
bin/kafka-delete.records.sh --bootstrap-server localhost:9092 --offset-json-file test.json

 

 

 

 

 

 

Reference.

https://firststep-de.tistory.com/69#2-1.%20%EC%B9%B4%ED%94%84%EC%B9%B4%20%EB%B8%8C%EB%A1%9C%EC%BB%A4%20%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0%3A-1