(정성덕 차장님 참고자료) Kafka - IS Kafka Adapter
IS 에 Kafka Adapter 연동하기
1.. IS 서버 정보
1.1 IS 설치
192.168.1.178 webm/webm
hostname 변경 : hostnamectl set-hostname sdwebm_test01
home : /home/webm/IS105
image : ./SoftwareAGInstaller20191216-LinuxX86.bin -readImage /home/webm/product/SAG105_LINUX2.zip
라이선스 : /home/webm/product/0000580354_Integration_Server100.xml
DB정보 : jdbc:wm:oracle://192.168.1.135:1521 sdwm105/sdwm105
DBC 수행
/home/webm/IS105/common/db/bin
./dbConfigurator.sh -a create -d oracle -l "jdbc:wm:oracle://192.168.1.135:1521;databaseName=orcl" -u sdwm105 -p sdwm105 -v Latest -pr IS
-c ARS, -pr MWS, -c XRF, -c TNS
1.2 Kafka Adapter를 추가로 설치한다.
./SoftwareAGInstaller20191216-LinuxX86.bin -readImage /home/webm/product/Kafka-Adapter-Linux.zip
-- 최상혁차장통해 받음
1.3 Kafka 라이브러리 설치

IS에 kafka 라이브러리 삽입
/home/webm/kafka/kafka_2.13-2.7.0/libs 로 사용
에 라이브러리가 있지만 혹시모른 버전 호환성이슈로 최상혁차장이 준 jar파일 사용
2.. Connection 생성
2.1 Connection 생성 (송신용)
Adapters 메뉴에 Kafka가 추가됨 확인

JDBC Connection 만들듯이 만들 수 있다..
오른쪽 Connections Types은Fix를 해야 7개가 나오며, 안 할경우는 2개만 나온다



설정 정보
Acknowledgments
0 : 보장 안함 [producer will not wait, No guarantee ]
1 : 보장성 약함 [leader will write the message to its local log]
all : 보장성 강함 [strongest available guarantee]
Value Serializer Class : com.wm.adapter.wmkafka.idata.IDataSerializer
Key Serializer Class : com.wm.adapter.wmkafka.idata.IDataSerializer

2.2 Connection 확인
서비스명 : wm.adapter.wmkafka.topic:describe
Topics : quickstart-events
Connection 명 : SDTest.Conn:KafkaProducerConn


3.. 송신 테스트
3.1 Producer Adapter 생성
일반 JDBC Adapter 만들듯이 만들다.

생성된 모습

Topic 을 입력하는 부분이 있으나, Adapter 연동시 넣어도 된다.

Input 값

3.2 송신 서비스
- 이름 : SDTest.Svc:KafkaProduceSvc01
Connection에서 Serializer를 IDataSerializer 로 설정했으니, IData를 바로 연결한다.

입력값

수신값 확인

4.. 수신 테스트
4.1 Consumer Connection 생성
Consumer Connection 생성시 Group ID를 넣지 않으면 에러가 발생...

Group ID 는 config/consumer.properties 에서 확인 가능

groupId가 consumer.properties에 기재되어 있으나, server.properties로 옮겼다
이 과정이 꼭 필요한지 확인필요

-> Auto Offset Reset eairlist
4.2 Adapter 생성
- Consume용 Connection을 이용해서 Adapter 생성
- 이름 : SDTest.Adap:KafkaConAdap01
4.3 수신 테스트
- 위에서 생성한 Adapter를 호출한다.
- topic : quickstart-events
- connectionName : SDTest.Conn:KafkaConsumerConn

4.4 결과물
- Topic 처음 생성만들었을 때부터 데이터가 다 들어온다.

** 재실행 시, 새로 데이터를 다시 받는다,
> 기존에 받은거 다시 안받는 설정이 필요할 듯하다.
재실행하다 보면 가끔 데이터가 없다고 나온다,, 이건 무슨 의미지???
offSet 설정으로 다르게 설정할 수 있을것 같은데,, 실제로 잘 되지 않음...
offSet 설정을 잘못하면 Noti가 되지 않는다..
5.. Noti 테스트
5.1 Connection
- 수신용 Connection 활용
5.2 Listener 생성
- 이름 : SDTest.Noti:KafkaNoti01
- 입력값
SDTest.Conn:KafkaConsumerConn
Topic : quickstart-events > 필수
** Partition과 Offset 값을 여러가지 값을 넣어봤지만 안넣었을때,, 성공, 관련정보 확인 필요

( 수정 )

->파티션 설정

-> Listener Notifications Enable 설정


5.3 Listener Notifications 생성
- 이름 : SDTest.Noti:KafkaNoti01
- Type : Synchronous Lisntener Notification
- 생성시 SDTest.Svc:KafkaConNotiSvc01 Flow Service선택


5.4 Noti 받을 Flow Service
Noti생성시 생성된 Document를 Noti시 실행하는 Flow 서비스 Input에 연결한다.연결한다...

로그찍고, 입력값 저장하도록 한다..

5.5 결과물
- SDTest.Svc:KafkaProduceSvc01을 실행하여 데이터 전송
- Noti를 통해 SDTest.Svc:KafkaConNotiSvc01 가 실행됨을 로그에서 확인할 수 있다

savePipeline 확인


5.6 재실행
- 첫실행시 전체 데이터가 다 들어오고
- 재실행 시 추가된 데이터만 들어온다
- 어딘가에 마지막으로 수집한 데이터를 저장하고 있을 것이다.
** 기타 설정..
group.id 설정이 꼭 필요할지는 아직 모름..
consumer.properties에 설정되어 있는 group.id를 server.properties에 넣음
직접적인 연관은 없어 보인다.
** Consumer Connection 생성시 group.id 없을경우 에러가 발생 consumer.properties 에서 group.id를 찾아서 넣어보니, 에러가 발생하지 않았다
그러나, group.id는 오타를 내서 입력해도 에러가 발생하지 않는것보니.. 실제 validation 체크는 나중에 하는 것으로 보인다.
./config/server.properties 에 추가
# consumer group id
group.id=test-consumer-group
비고 사항
topic 이름과 partitions 확인
wm.adapter.wmkafka.topic:list

=========================== 다양한 로그 대응 =================================
** 비고 : ASync로 생성시 UM 없어서 관련 Warning 뜬다.. 현재 작동안 하는 것과 는 별개인듯

============================================================
Thread 늘리라고 로그 뜸
Cron Daemon Pool thread pool increased by 10 threads, current maximum is 35.

Connection Thread 수정해봄. 이것과 상관 없음..

Extend 수정...
watt.server.cronMaxThreads=1000
일단 더이상 발생하지 않음
=============================================================
Publish Events 클릭시 Error발생,,
Publish Events역할 확인 필요

EventTypeStore 는 어떻게 추가 해야 하나?
기타 작업
문서상에 있어서 작업했으나, 필요성을 확인 필요..
WmKafkaAdapter 패키지에도 라이브러리를 넣는다...
/home/webm/IS105/IntegrationServer/instances/default/packages/WmKafkaAdapter/code/jars

Serializer 종류
-- confluent --
io.confluent.kafka.serializers.KafkaAvroSerializer
SASL Kafka
Confluent Kafaka

Kafka 구성
이해하기 앞서 메세지 시스템과 kafka 기본 이해 필요
참고자료 )
Message 시스템
Kafka 개념
(★)
예제)
Kafka Clustering
kafka local multi broker 설정

- 클러스터링 테스트 환경 수정 ( 2 Broker / 2 Zookeeper )
- Server#1
- 192.168.1.63:9092 ( Broker 1)
- 192.168.1.63:2181 ( Zookeeper 1 )
- Server#2
- 192.168.1.122:9092 ( Broker 2 )
- 192.168.1.122:2181 ( Zookeeper 2 )
step 1. kafka_Install/conf -> vi zookeeper.properties

step2. kafka_Install/conf -> vi server.properties

...

step3. myid 설정
- zookeeper 설치 후 myid 파일을 생성하여 각 서버의 고유 ID를 설정한다.
- myid 명시 파일은 dataDir = /tmp/zookeeper 폴더에 생성한다.
- mkdir /tmp/zookeeper
- echo {Server 고유id 값 | 1, 2, 3, ... } > /tmp/zookeeper/myid
- Server#1 -> echo 1 / Server#2 -> echo 2

step4. zookeeper , kafka server 실행
Kafka 실행 할때 설정파일 전부 삭제하고 다시 명령어대로 실행 시키면 정상작동
주키퍼 시작 : zooStartup.sh
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> /dev/null &
nohup ./zookeeper-server-start.sh /webM/kafka_2.13-2.7.0/config/zookeeper.properties >> /dev/null &
카프카 시작 : kafkaStartup.sh
nohup ./bin/kafka-server-start.sh ./config/server.properties >> /dev/null &
step 5. Topics 관리
- 토픽 생성
예시 명령어 )
./kafka-topics.sh --create --zookeeper 192.168.1.63:2181, 192.168.1.122:2181 --replication-factor 2 --partitions1 --topic TopicName
./bin/kafka-topics.sh --create --topic TopicName --bootstrap-server 192.168.1.178:9092
zookeeper 에 토픽 생성 // Broker(kafka server) 에 토픽 생성 차이 ??? ???? ???
- -create : topic 생성
- -topic : 생성할 topic 명
- -zookeeper : zookeeper가 실행 중인 Hostname
- -partitions : 생성할 topic의 파티션 수
- -replication-factor : 생성할 topic의 복사본 수 (cluster 수 만큼 MAX)
- 토픽 삭제
예시 명령어 )
./kafka-topics.sh --delete --zookeeper 192.168.1.63:2181, 192.168.1.122:2181 -- topic TopicName
- 토픽 정보 확인
./kafka-topics.sh --describe --zookeeper 192.168.1.63:2181, 192.168.1.122:2181 (--topic TopicName)

-> 생성한 Topic1과 Topic2의 구성을 확인 가능하다
step 6. Message 생성 / 소비 테스트
- Pruducer (Cluster)

-> stand alone 일 때는 --broker-list 에 한 서버만 (./kafka-console-producer.sh --broker-list 192.168.1.63:9092 --topic Topic1)
- Consumer
./kafka-console-consumer.sh --bootstrap-server 192.168.1.63:9092,192.168.1.122:9092 --topic Topic1 --from-beginning
Error 발생 시 ) Configured broker.id 1 doesn't match stored broker.id 0 in meta.properties

cf )


serializer 테스트 설정
kafka-console-producer --broker-list IPADDRESS:9092 --topic TOPIC --property value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
Kafka Offset 테스트
192.168.1.67 ( IS server ) <-> 192.168.1.40 ( kafka server )
- Producer / Consumer Connection 세팅
- Producer Connection

- Consumer Connection

- IS Service
- Producer

- Notification Service

- messages 를 JsonString으로 바꾸어 debugLog에 넣어줌
- DB & Insert Adapter세팅

- TABLE 생성시 NUMERIC(22,0) 으로 Create -> Java.lang.long에 매핑 되는 데이터 타입

- offset의 input field type을 java.math.BigDecimal -> java.lang.Object
- 테스트 결과
- Producer Service

- DB

- Server Log
