카프카 서버는 별도의 PC에 설치한 상태에서 파이썬으로 producer와 consumer를 구현하는 예제입니다. 카프카 서버를 구축하는 방법은 아래 링크 두개를 참고하세요.
- 카프카 서버 설치 : https://depotceffio.tistory.com/entry/우분투에-아파치-카프카-설치하기
- 카프카 서버 설정 : https://depotceffio.tistory.com/entry/아파치-카프카-서버-외부-접속-설정
시작에 앞서 라이브러리 하나만 설치하고 시작하겠습니다.
pip install kafka-python
파이썬에서 카프카 라이브러리는 몇개가 있는데, 다른 것들은 파이썬은 그냥 API처럼 쓰고 실제로 별도 설치를 해야 하는데 이건 그냥 이거만 설치하면 된다고 해서 간단히 구현해볼 때 많이 쓴다고 합니다. 이걸로 진행할게요.
producer 샘플 코드
먼저 카프카 프로듀서 정의를 합니다.
from kafka import KafkaProducer
from json import dumps
producer = KafkaProducer(acks=0,
client_id='test_producer',
compression_type='gzip',
bootstrap_servers=['192.168.0.xxx:9092'],
key_serializer=None,
value_serializer=lambda x: dumps(x).encode('utf-8'))
부트스트랩 서버에는 본인의 카프카 서버 IP를 집어넣구요, client_id는 아무거나 써도 상관없습니다. 이제 데이터를 json 형태로 밀어넣어 줍니다.
data = {'str' : 'hello world'},
producer.send('test', value=data)
producer.flush()
토픽을 미리 만들 필요는 없고, 그냥 send 안에 토픽명을 적어 주면 알아서 만들어집니다. test라는 토픽을 만든 거죠. 기존에 카프카 서버에서 만들어둔 토픽을 써도 됩니다.
값을 몇 번 바꿔서 몇번 밀어넣어 줬다 치고, 컨슈머 쪽으로 넘어가 보죠.
consumer 샘플 코드
컨슈머를 정의합니다.
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'test',
bootstrap_servers=['192.168.0.xxx:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='test-group',
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=1000
)
이렇게 해 놓고, 한번 카프카에 쌓인 메세지를 받아와 보겠습니다.
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
message.topic, message.partition, message.offset, message.key, message.value
))
이렇게 하면 메세지가 들어간 순서대로 뱉어낼 겁니다. 그리고 이 코드를 다시 한 번 실행하면 아무것도 안 나올 거예요. 이미 다 불러왔기 때문입니다.이때 프로듀서 쪽에서 다시 메세지를 밀어넣은 다음 다시 한번 받아오면, 새로운 메세지만 받아오게 됩니다.
만약에 뭔가 컨슈머에서 프로듀서가 메세지를 내는 족족 받아오는 걸 테스트하고 싶다면 아래와 같이 한번 해보시죠.
while 1:
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
message.topic, message.partition, message.offset, message.key, message.value
))
time.sleep(1)
댓글