본문 바로가기
카테고리 없음

아파치 카프카 파이썬으로 구현하기

by 오피스포디 2022. 6. 12.
반응형

카프카 서버는 별도의 PC에 설치한 상태에서 파이썬으로 producer와 consumer를 구현하는 예제입니다. 카프카 서버를 구축하는 방법은 아래 링크 두개를 참고하세요.

 

 

시작에 앞서 라이브러리 하나만 설치하고 시작하겠습니다.

 

pip install kafka-python

 

파이썬에서 카프카 라이브러리는 몇개가 있는데, 다른 것들은 파이썬은 그냥 API처럼 쓰고 실제로 별도 설치를 해야 하는데 이건 그냥 이거만 설치하면 된다고 해서 간단히 구현해볼 때 많이 쓴다고 합니다. 이걸로 진행할게요.

 

producer 샘플 코드

먼저 카프카 프로듀서 정의를 합니다.

 

from kafka import KafkaProducer
from json import dumps

producer = KafkaProducer(acks=0, 
                         client_id='test_producer',
                         compression_type='gzip', 
                         bootstrap_servers=['192.168.0.xxx:9092'],
                         key_serializer=None,
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

 

부트스트랩 서버에는 본인의 카프카 서버 IP를 집어넣구요, client_id는 아무거나 써도 상관없습니다. 이제 데이터를 json 형태로 밀어넣어 줍니다.

 

data = {'str' : 'hello world'},
producer.send('test', value=data)
producer.flush()

 

토픽을 미리 만들 필요는 없고, 그냥 send 안에 토픽명을 적어 주면 알아서 만들어집니다. test라는 토픽을 만든 거죠. 기존에 카프카 서버에서 만들어둔 토픽을 써도 됩니다.

 

값을 몇 번 바꿔서 몇번 밀어넣어 줬다 치고, 컨슈머 쪽으로 넘어가 보죠.

 

consumer 샘플 코드

컨슈머를 정의합니다.

 

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'test',
     bootstrap_servers=['192.168.0.xxx:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='test-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')),
     consumer_timeout_ms=1000
)

 

이렇게 해 놓고, 한번 카프카에 쌓인 메세지를 받아와 보겠습니다.

 

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
        message.topic, message.partition, message.offset, message.key, message.value
    ))

 

이렇게 하면 메세지가 들어간 순서대로 뱉어낼 겁니다. 그리고 이 코드를 다시 한 번 실행하면 아무것도 안 나올 거예요. 이미 다 불러왔기 때문입니다.이때 프로듀서 쪽에서 다시 메세지를 밀어넣은 다음 다시 한번 받아오면, 새로운 메세지만 받아오게 됩니다.

 

만약에 뭔가 컨슈머에서 프로듀서가 메세지를 내는 족족 받아오는 걸 테스트하고 싶다면 아래와 같이 한번 해보시죠.

 

while 1:
    for message in consumer:
        print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
            message.topic, message.partition, message.offset, message.key, message.value
        ))
    
    time.sleep(1)

 

반응형

댓글