Kafka 한번 살펴보자... Quickstart

2019. 6. 21. 17:11해보자/Kafka

발번역하면서, 살펴본다.

필요하다면 반드시, https://kafka.apache.org/quickstart 에 가서 직접 원문을 직접 읽어보는 것을 추천합니다.

 


해당 튜토리얼은 처음 해보며, Kafka 나 ZooKeeper 데이터가 전혀없다는 전제로 설명이 됩니다

Kafka console script 는 Unix 계열과 Windows 플랫폼별로 다르게 존재합니다.

Windows 플랫폼에서는 bin/ 대신에 bin\windows\ 을 사용하고,
스크립트 확장자를 sh에서 .bat로 바꿔서 실행하면 다 잘됩니다.

 

Step 1 : 다운로드

 

kafka 설치파일을 다운받아 아래와 같이 압축을 풀어주시면 되겠습니다.

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.12-2.2.0.tgz)

 

> tar -xzf kafka_2.12-2.2.0.tgz

> cd kafka_2.12-2.2.0

 

Step 2 : 서버 시작

 

Kafka는 Zookeeper를 사용하니, 먼저 Zookeeper를 시작하고 나서, Kafka server를 시작 해 주시길 바랍니다. 

 

PS D:\kafka> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

PS D:\kafka> .\bin\windows\kafka-server-start.bat .\config\server.properties

 

Step 3 : Topic 생성과 삭제

 

단일 파티션과 하나의 복제본에서 "test"라는 이름을 갖는 Topic을 생성해보도록 합시다.

그 다음 Topic 리스트 명령을 실행하면 등록된 Topic 이름을 확인 할 수 있습니다.

 

PS D:\kafka> .\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

PS D:\kafka> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

 

이렇게 직접 수동으로 Topic을 만드는 방법이 있지만, 만약 존재하지 않은 Topic이 게시가 될 경우에는

Topic이 자동으로 생성되도록 브로커를 설정할수 있습니다. (기본 설정은 자동 설정으로 되어 있는 것 같아요.)

 

(추가)

토픽을 생성하는 것만 했지, 삭제하는 것을 하지 않았다. 

이부분에 대해서 검색을 해봤는데, 이래저래 문제가 많았는지, 제대로 가이드를 해주는 곳이 없어서 애먹었다.

 

우선 토픽 삭제 설정을 덮어쓰기해야 하기에, Zookeeper가 실행된 상태에서 kafka server를 시작한다.

PS D:\kafka> .\bin\windows\kafka-server-start.bat .\config\server.properties --override delete.topic.enable=true

 

그 삭제를 처리하면 되겠습니다.

 

PS D:\kafka> .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic test

Topic test is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

https://jaceklaskowski.gitbooks.io/apache-kafka/kafka-topic-deletion.html

 

 

Step 4 : 메시지 보내기

 

Kafka은 파일이나 표준 입력을 통해 입력을 받고, Kafka Cluster에 메시지를 보내는 Command Line Client를 포함하고 있습니다.

기본으로, 각 라인은 분리된 메시지로 보내집니다.

 

PS D:\kafka> .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

 

 

Step 5:  consumer 시작

 

kafka 표준 출력에 메세지를 dump out 할 Command Line Consumer를 가지고 있습니다.

 

PS D:\kafka>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

 

 

위 명령을 또 다른 터미널에서 실행하면, producer 터미널에 입력한 메세지가  consumer 터미널에 표시됩니다.

커맨드 라인 툴에 대한 다양한 추가 옵션은 위 명령에 인자값을 주지 말고 입력하면 두둥 하고 나옵니다.

 

PS D:\kafka> .\bin\windows\kafka-console-consumer.bat

 

Step 6 : 멀티-브로커 Cluster 셋팅

 

싱글 브로커만 실행했는데, Kafka의 경우, 단독 브로커는 Cluster가 한개일 뿐입니다.

좀 더 많은 브로커 인스턴스들을 시작하는 것 외에는 큰 변화가 없습니다.

그렇다면 이걸 좀 더 느껴보기 위해서, 세개의 노드로 Cluster를 확장해봅시다.

 (현재까지는 모두 로컬 머신에 전부 배치합니다.)

 

브로커 별 config 파일을 만들고, 아래와 같이 설정 값을 맞춰 보세요.

 

PS D:\kafka> cp .\config\server.properties .\config\server-1.properties

PS D:\kafka> cp .\config\server.properties .\config\server-2.properties

 

 

config/server-1.properties:

    broker.id=1

    listeners=PLAINTEXT://:9093

    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:

    broker.id=2

    listeners=PLAINTEXT://:9094

    log.dirs=/tmp/kafka-logs-2

 

broker.id 속성값은 Cluster 내에서 유일한 값으로 설정해야 하는 이름입니다.

그리고 Port 와 로그 경로도 별도로 설정해야 충돌의 문제에서 벗어날수 있겠습니다.

앞서 이미 Zookeeper를 가지고 있고, 현재 동작중인 단일 노드를 가지고 있기 때문에,

두개의 노드만 추가로 시작하면 됩니다

 

PS D:\kafka> .\bin\windows\kafka-server-start.bat .\config\server-1.properties

PS D:\kafka> .\bin\windows\kafka-server-start.bat .\config\server-2.properties

 

3개의 replication-factor를 갖는 새로운 Topic을 생성하고, 목록을 확인해 보도록 하죠

 

PS D:\kafka> .\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

PS D:\kafka> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

 

 

브로커가 무슨일을 하는지,  "describe topics" 명령을 실행해서 알아봅니다.

 

PS D:\kafka> .\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:segment.bytes=1073741824

        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

 

원본 Topic은 복제본이 없고,  생성할 때 Cluster에서 유일한 서버 0에 있다.

 

다시 consumer 랑 producer를 띄어서 새로운 Topic에 새로운 메시지를 게시하고 확인해보도록 합니다.

그 다음 테스트로, 결함 허용 테스트를 해보는 것입니다. 

Broker 1은 리더로 동작하는데, 이걸 kill 해보도록 합시다.

 

PS D:\kafka> .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic my-replicated-topic

>my test message 1

>my test message 2

>일괄 작업을 끝내시겠습니까 (Y/N)? Y

PS D:\kafka> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid

ProcessId

11880

PS D:\kafka> taskkill /pid 11880 /f

성공: 프로세스(PID 11880)가 종료되었습니다.

 

[ Unix/Linux ]

> ps aux | grep server-1.properties

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

> kill -9 7564

 

 

그리고 나서 consumer를 띄워보면, 리더가 다운되더라도, 메시지는 아직까지 소비에 대해 유효한 것을 확인 할수 있습니ㅣ다.

 

PS D:\kafka> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

my test message 1

my test message 2

 

 

Step7 : Kafka Connect 사용 

 

처음 시작할 땐, 콘솔에서 데이터를 쓰고, 확인하는 것이 편리하겠지만, 

아마 다른 리소스에 있는 데이터를 사용하거나 Kafka에서 다른 시스템으로 데이터를 추출하는 것이 더 필요할 것입니다.

많은 시스템에 대해서, 커스터마이징 해서 통합 코드를 작성하는 번거로움 대신에,

Kafka Connect를 사용하여 간단히 데이터를 임포트 하거나 익스포트가 가능합니다.

 

Kafka Connect은 Kafka에서 데이터를 임포트하거나 익스포트 하는 툴이며, Kafka에 포함되어 있습니다.

Connector를 실행하고, 외부 시스템과 인터렉팅하여 커스텀 로직을 실행하는 확장  툴입니다.

quickstart에서는 파일에서 Kafka Topic으로 데이터를 임포트 하고, 데이터를 Kafka Topic에서 파일로 임포트 하는 

간단한 connector를 이용해서, Kafka Connect가 어떻게 동작하는지 확인해 보도록 합시다.

 

첫번째로, 테스트 할 시드 데이터를 만드는 것부터 시작합니다.

 

D:\kafka> echo foo> test.txt

D:\kafka> echo bar>> test.txt

 

Unix에선

echo -e "foo\nbar" > test.txt

 

싱글 모드로 connector 두개를 실행할 것입니다. 그리고 두 connector들은 단독, 로컬, 전용(dedicated) 프로세스상에 동작하는 것을 의미합니ㅣ다.

파라미터로 세개의 설정 파일을 제공합니다.

 

첫번째 파라미터인 "connect-standalone.properties"는 Kafka Connect 프로세스에 대한 설정인데,

Kafka broker와 같은 연결하기 위한 일반적인 구성과 데이터의 직렬화 포멧 설정등을 포함하고 있습니다.

나머지 구성파일들은 각각 생성하기 위한 connect를 명시합니다.

이들 파일은 유니크한 connector이름. 초기화 할  connector클래스 그리고 connector에 필요로한 다른 구성들을 포함합니다

 

 

PS D:\kafka> .\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-file-source.properties .\config\connect-file-sink.properties

 

Kafka와 함께 포함되어있는 이 샘플 설정 파일들은 기 실행된 기본 로컬 클러스터 설정을 사용하고 두개의 커넥터를 생성합니다.

첫번째는 input 파일에서 라인을 읽고 Kafka Topic에 produce 하는 source connector이고, 

두번째는 Kafka Topic에서 메세지를 읽고, output 파일에 라인별로 생산하는 sink connector 입니다.

 

startup 동안, Connector가 인스턴스 초기진행 중임을 나타내는 것을 포함하는 여러 로그 메시지를 보게됩니다.

Kafka Connect프로세스가 한번 시작되면, source connector는 test.txt 에서 라인을 읽기 시작하고 connect-test Topic에  파이프라인에 메세지를 생산하고, sink connector는 connect-test topic에서 메세지를 읽고, test.sink.txt 파일에  작성합니다.

출력 파일의 내용을 검증해서 완전한 파이프 라인을 통해 전달되는 그 데이터를 검증할 수 있습니다.

 

PS D:\kafka> more test.sink.txt

foo

boo

 

데이터는 connect-test Kafka Topic에 저장되고 있습니다.

따라서 Topic에 데이터를 보기 위해 console consumer를 실행할수 있습니다.

 

PS D:\kafka> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning

{"schema":{"type":"string","optional":false},"payload":"foo"}

{"schema":{"type":"string","optional":false},"payload":"boo"}

 

커넥터는 데이터를 처리를 계속하기에 파일에 데이터를 추가하고, 파이프 라인을 통해서 옮겨가는 것을 볼수 있다.

 

D:\kafka>echo another line>> test.txt

 

console consumer ouput과 sink file에 나타나는 라인을 볼수 있습니다.

 

 

Step 8 : 데이터 처리를 위해 Kafka Stream을 사용하세요

 

Kafka Streams은 mission-critical 실시간 어플리케이션 과 마이크로서비스를 위한  클라이언트 라이브러리입니다.

그리고 input 또는 output 데이터를 Kafka Cluster에 저장합니다.

Kafka Streams 은 작성의 단순함 과 표준 Java 와 Scala 어플리케이션을 배포하는 것을 결합합니다.

 

클라이언트 측에 어플리케이션에 높은 확장가능하고, 탄력적으로, 결함허용, 분산 그리고 더 많은 기능들을 만들기 위해 

Kafka의 서버측 Cluster 기술의 이득을 포함합니다.

 

quicksart 예제는 이 라이브러리가 코딩된 스트리밍하는 어플리케이션이 어떻게 동작하는지 설명합니다.

https://kafka.apache.org/22/documentation/streams/quickstart

 

다음주에는 이걸 한번 더 보도록 하겠습니다.