logstash가 모든 일을 다합니다.
couchDB의 변경내역이 발생할 경우 kafka로 해당 데이터를 전송해야 하는 연계성 요청이 들어왔습니다.
여러 가지 방안을 고민하였으나, 확장성과 유연성을 위해서 logstash를 선택하였습니다.
실제로 가능한지 테스트 하기 위해서 개발 환경을 docker 기반으로 구성하여 테스트 했습니다.
시작하기 전에
docker 간의 통신을 위해서 docker network를 추가 합니다.
myHome 이름으로 네트워크를 생성 합니다.
$ docker network create myHome
자세한 docker network 설명은 Docker 네트워크 사용법을 참조하셔요.
couchdb 설정
couchdb는 docker에서 bitnami 에서 제공하는 이미지로 생성하였습니다.
다음 명령어로 image를 다운받고 실행합니다.
$ docker run --network myHome -p 5984:5984 --name couchdb bitnami/couchdb:latest
추후 테스트를 위해서 5984 port를 연결합니다.
docker가 온전하게 기동되었다면, http://localhost:5984 에 접근하면 아래와 같은 내용을 확인 할 수 있습니다.
{
"couchdb": "Welcome",
"version": "3.1.1",
"git_sha": "ce596c65d",
"uuid": "b7449ad8eda14515e4e0782a84c41f5d",
"features": [
"access-ready",
"partitioned",
"pluggable-storage-engines",
"reshard",
"scheduler"
],
"vendor": {
"name": "The Apache Software Foundation"
}
}
kafka 설정
kafka의 경우 기본적으로 zookeeper가 필요 하게 됩니다. 이를 쉽게 처리 하기 위해서 docker-compose를 이용합니다.
docker-compose.yml 파일을 생성합니다.
version: "2"
networks:
default:
external: true
name: myHome
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:2
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
해당 파일의 생성이 완료 되었다면, 다음 명령어로 기동 합니다.
$ docker-compose up
기동이 문제 없이 되었다면 아래와 같은 로그가 확인 됩니다.
kafka_1 | [2021-05-17 03:39:13,937] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-48 in 103 milliseconds, of which 102 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
logstash 설정
먼저 pipline 디렉토리를 생성한 후 logstash.conf 파일을 생성합니다.
$ mkdir pipline
$ vi logstash.conf
# logstash.conf 파일 내용
input {
couchdb_changes {
id => "my_plugin_id"
host => "couchdb"
username => "admin"
password => "couchdb"
port => "5984"
db => "test"
}
}
output {
stdout { codec => "rubydebug" }
kafka {
codec => "json"
topic_id => "test_topic"
bootstrap_servers => "logstash_kafka_1:9092"
}
}
주요 정보는 input과 output에 대한 정의 이며, 이후 db 종류가 많이 생기면, input에 추가 하면 됩니다. kafka 나, couchdb의 host는 docker ps 정보에서 이름을 활용하면 됩니다.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
03bb311bf431 docker.elastic.co/logstash/logstash-oss:7.12.1 "/usr/local/bin/dock…" 50 seconds ago Up 48 seconds 5044/tcp, 9600/tcp compassionate_yonath
28a0315d2422 bitnami/kafka:2 "/opt/bitnami/script…" 4 days ago Up 4 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp logstash_kafka_1
fa1b5e62784b bitnami/zookeeper:3 "/opt/bitnami/script…" 4 days ago Up 4 minutes 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 8080/tcp logstash_zookeeper_1
786c603b9ae2 bitnami/couchdb:latest "/opt/bitnami/script…" 4 days ago Up 3 hours 4369/tcp, 9100/tcp, 0.0.0.0:5984->5984/tcp, :::5984->5984/tcp couchdb
준비가 다 되었다면, pipeline 디렉토리 주소를 알맞게 설정후 다음 명령어로 docker 를 기동 합니다.
$ docker run --rm -it -v /Users/admin/DEV/test/logstash/pipeline/:/usr/share/logstash/pipeline/ --network myHome docker.elastic.co/logstash/logstash-oss:7.12.1
테스트
couchdb의 utils를 이용해서 데이터를 적제 할 수 있습니다.
접근 주소 : http://localhost:5984/_utils/
해당 주소에 접근하여 test database를 생성하고 documents를 생성합니다.
데이터를 생성 하였으면, kafka에 제대로 적재 되었는지 확인해봅니다.
docker exec 명령어를 이용해서 kafka 내부로 접근합니다.
$ docker exec -it logstash_kafka_1 /bin/bash
첫번째로 topic의 목록을 조회 해봅니다.
$ kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
mytopic
test_topic
logstash를 이용하여 생성한 test_topic 이 확인되었다면, 데이터도 console로 확인해봅니다.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning
{"@timestamp":"2021-05-12T09:22:28.111Z","doc_as_upsert":true,"doc":{"test":"test"},"@version":"1"}
{"@timestamp":"2021-05-17T00:35:33.135Z","doc_as_upsert":true,"doc":{"helo":"kafka"},"@version":"1"}
이로서 연동 테스트가 완료 되었습니다.