(정성덕 차장님 참고자료) Kafka - IS Kafka Adapter



IS 에 Kafka Adapter 연동하기


1.. IS 서버 정보

1.1 IS 설치
192.168.1.178 webm/webm
hostname 변경 : hostnamectl set-hostname sdwebm_test01
home : /home/webm/IS105
image : ./SoftwareAGInstaller20191216-LinuxX86.bin -readImage /home/webm/product/SAG105_LINUX2.zip
라이선스 : /home/webm/product/0000580354_Integration_Server100.xml
DB정보 : jdbc:wm:oracle://192.168.1.135:1521 sdwm105/sdwm105

DBC 수행
/home/webm/IS105/common/db/bin
./dbConfigurator.sh -a create -d oracle -l "jdbc:wm:oracle://192.168.1.135:1521;databaseName=orcl" -u sdwm105 -p sdwm105 -v Latest -pr IS

-c ARS, -pr MWS, -c XRF, -c TNS

1.2 Kafka Adapter를 추가로 설치한다.
./SoftwareAGInstaller20191216-LinuxX86.bin -readImage /home/webm/product/Kafka-Adapter-Linux.zip
-- 최상혁차장통해 받음

1.3 Kafka 라이브러리 설치



IS에 kafka 라이브러리 삽입

/home/webm/kafka/kafka_2.13-2.7.0/libs 로 사용
라이브러리가 있지만 혹시모른 버전 호환성이슈로 최상혁차장이 준 jar파일 사용


2.. Connection 생성


2.1 Connection 생성 (송신용)

Adapters 메뉴에 Kafka가 추가됨 확인



JDBC Connection 만들듯이 만들 수 있다..
오른쪽 Connections Types은Fix를 해야 7개가 나오며, 안 할경우는 2개만 나온다








설정 정보

Acknowledgments
0 : 보장 안함 [producer will not wait, No guarantee ]
1 : 보장성 약함 [leader will write the message to its local log]
all : 보장성 강함 [strongest available guarantee]

Value Serializer Class : com.wm.adapter.wmkafka.idata.IDataSerializer
Key Serializer Class : com.wm.adapter.wmkafka.idata.IDataSerializer



2.2 Connection 확인


서비스명 : wm.adapter.wmkafka.topic:describe
Topics : quickstart-events
Connection 명 : SDTest.Conn:KafkaProducerConn





3.. 송신 테스트

3.1 Producer Adapter 생성


일반 JDBC Adapter 만들듯이 만들다.


생성된 모습

Topic 을 입력하는 부분이 있으나, Adapter 연동시 넣어도 된다.


Input 값

3.2 송신 서비스

- 이름 : SDTest.Svc:KafkaProduceSvc01

Connection에서 Serializer를 IDataSerializer 로 설정했으니, IData를 바로 연결한다.


입력값


수신값 확인



4.. 수신 테스트

4.1 Consumer Connection 생성

Consumer Connection 생성시 Group ID를 넣지 않으면 에러가 발생...


Group ID 는 config/consumer.properties 에서 확인 가능



groupId가 consumer.properties에 기재되어 있으나, server.properties로 옮겼다
이 과정이 꼭 필요한지 확인필요




-> Auto Offset Reset eairlist


4.2 Adapter 생성
- Consume용 Connection을 이용해서 Adapter 생성
- 이름 : SDTest.Adap:KafkaConAdap01

4.3 수신 테스트
- 위에서 생성한 Adapter를 호출한다.
- topic : quickstart-events
- connectionName : SDTest.Conn:KafkaConsumerConn



4.4 결과물
- Topic 처음 생성만들었을 때부터 데이터가 다 들어온다.

** 재실행 시, 새로 데이터를 다시 받는다,
> 기존에 받은거 다시 안받는 설정이 필요할 듯하다.
재실행하다 보면 가끔 데이터가 없다고 나온다,, 이건 무슨 의미지???
offSet 설정으로 다르게 설정할 수 있을것 같은데,, 실제로 잘 되지 않음...
offSet 설정을 잘못하면 Noti가 되지 않는다..


5.. Noti 테스트

5.1 Connection
- 수신용 Connection 활용

5.2 Listener 생성

- 이름 : SDTest.Noti:KafkaNoti01
- 입력값
SDTest.Conn:KafkaConsumerConn
Topic : quickstart-events > 필수

** Partition과 Offset 값을 여러가지 값을 넣어봤지만 안넣었을때,, 성공, 관련정보 확인 필요



( 수정 )


->파티션 설정

-> Listener Notifications Enable 설정





5.3 Listener Notifications 생성

- 이름 : SDTest.Noti:KafkaNoti01
- Type : Synchronous Lisntener Notification
- 생성시 SDTest.Svc:KafkaConNotiSvc01 Flow Service선택



5.4 Noti 받을 Flow Service
Noti생성시 생성된 Document를 Noti시 실행하는 Flow 서비스 Input에 연결한다.연결한다...


로그찍고, 입력값 저장하도록 한다..


5.5 결과물

- SDTest.Svc:KafkaProduceSvc01을 실행하여 데이터 전송
- Noti를 통해 SDTest.Svc:KafkaConNotiSvc01 가 실행됨을 로그에서 확인할 수 있다


savePipeline 확인



5.6 재실행

- 첫실행시 전체 데이터가 다 들어오고
- 재실행 시 추가된 데이터만 들어온다
- 어딘가에 마지막으로 수집한 데이터를 저장하고 있을 것이다.



** 기타 설정..
group.id 설정이 꼭 필요할지는 아직 모름..

consumer.properties에 설정되어 있는 group.id를 server.properties에 넣음
직접적인 연관은 없어 보인다.
** Consumer Connection 생성시 group.id 없을경우 에러가 발생 consumer.properties 에서 group.id를 찾아서 넣어보니, 에러가 발생하지 않았다
그러나, group.id는 오타를 내서 입력해도 에러가 발생하지 않는것보니.. 실제 validation 체크는 나중에 하는 것으로 보인다.

./config/server.properties 에 추가
# consumer group id
group.id=test-consumer-group





비고 사항


topic 이름과 partitions 확인
wm.adapter.wmkafka.topic:list


=========================== 다양한 로그 대응 =================================


** 비고 : ASync로 생성시 UM 없어서 관련 Warning 뜬다.. 현재 작동안 하는 것과 는 별개인듯



============================================================

Thread 늘리라고 로그 뜸
Cron Daemon Pool thread pool increased by 10 threads, current maximum is 35.


Connection Thread 수정해봄. 이것과 상관 없음..


Extend 수정...

watt.server.cronMaxThreads=1000
일단 더이상 발생하지 않음

=============================================================

Publish Events 클릭시 Error발생,,
Publish Events역할 확인 필요


EventTypeStore 는 어떻게 추가 해야 하나?



기타 작업

문서상에 있어서 작업했으나, 필요성을 확인 필요..
WmKafkaAdapter 패키지에도 라이브러리를 넣는다...
/home/webm/IS105/IntegrationServer/instances/default/packages/WmKafkaAdapter/code/jars




Serializer 종류




-- confluent --
io.confluent.kafka.serializers.KafkaAvroSerializer



SASL Kafka


Confluent Kafaka





Kafka 구성



이해하기 앞서 메세지 시스템과 kafka 기본 이해 필요

참고자료 )

Message 시스템

Kafka 개념
(★)
예제)


Kafka Clustering

kafka local multi broker 설정




  • 클러스터링 테스트 환경 수정 ( 2 Broker / 2 Zookeeper )

  • Server#1
    • 192.168.1.63:9092 ( Broker 1)
    • 192.168.1.63:2181 ( Zookeeper 1 )

  • Server#2
    • 192.168.1.122:9092 ( Broker 2 )
    • 192.168.1.122:2181 ( Zookeeper 2 )






step 1. kafka_Install/conf -> vi zookeeper.properties


step2. kafka_Install/conf -> vi server.properties

...


step3. myid 설정

  • zookeeper 설치 후 myid 파일을 생성하여 각 서버의 고유 ID를 설정한다.
  • myid 명시 파일은 dataDir = /tmp/zookeeper 폴더에 생성한다.
    • mkdir /tmp/zookeeper
    • echo {Server 고유id 값 | 1, 2, 3, ... } > /tmp/zookeeper/myid
    • Server#1 -> echo 1 / Server#2 -> echo 2



step4. zookeeper , kafka server 실행

zookeeper 실행 명령어
./zookeeper-server-start.sh /webM/kafka_2.13-2.7.0/config/zookeeper.properties &

kafka 실행 명령어
./kafka-server-start.sh /webM/kafka_2.13-2.7.0/config/server.properties &

-> Kafka 서버 실행 시 에러 발생 주의

Kafka 실행 할때 설정파일 전부 삭제하고 다시 명령어대로 실행 시키면 정상작동

주키퍼 시작 : zooStartup.sh
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> /dev/null &

nohup ./zookeeper-server-start.sh /webM/kafka_2.13-2.7.0/config/zookeeper.properties >> /dev/null &

카프카 시작 : kafkaStartup.sh
nohup ./bin/kafka-server-start.sh ./config/server.properties >> /dev/null &


step 5. Topics 관리


  1. 토픽 생성

예시 명령어 )

./kafka-topics.sh --create --zookeeper 192.168.1.63:2181, 192.168.1.122:2181 --replication-factor 2 --partitions1 --topic TopicName

./bin/kafka-topics.sh --create --topic TopicName --bootstrap-server 192.168.1.178:9092


zookeeper 에 토픽 생성 // Broker(kafka server) 에 토픽 생성 차이 ??? ???? ???

  • -create : topic 생성
  • -topic : 생성할 topic 명
  • -zookeeper : zookeeper가 실행 중인 Hostname
  • -partitions : 생성할 topic의 파티션 수
  • -replication-factor : 생성할 topic의 복사본 수 (cluster 수 만큼 MAX)

  1. 토픽 삭제

예시 명령어 )

./kafka-topics.sh --delete --zookeeper 192.168.1.63:2181, 192.168.1.122:2181 -- topic TopicName


  1. 토픽 정보 확인

./kafka-topics.sh --describe --zookeeper 192.168.1.63:2181, 192.168.1.122:2181 (--topic TopicName)


-> 생성한 Topic1과 Topic2의 구성을 확인 가능하다

step 6. Message 생성 / 소비 테스트

  • Pruducer (Cluster)
-> stand alone 일 때는 --broker-list 에 한 서버만 (./kafka-console-producer.sh --broker-list 192.168.1.63:9092 --topic Topic1)

  • Consumer

./kafka-console-consumer.sh --bootstrap-server 192.168.1.63:9092,192.168.1.122:9092 --topic Topic1 --from-beginning




Error 발생 시 ) Configured broker.id 1 doesn't match stored broker.id 0 in meta.properties


cf )


serializer 테스트 설정



kafka-console-producer --broker-list IPADDRESS:9092 --topic TOPIC --property value.serializer=org.apache.kafka.common.serialization.IntegerSerializer



Kafka Offset 테스트




192.168.1.67 ( IS server ) <-> 192.168.1.40 ( kafka server )

  1. Producer / Consumer Connection 세팅

  • Producer Connection

  • Consumer Connection



  1. IS Service

  • Producer

  • Notification Service

    • messages 를 JsonString으로 바꾸어 debugLog에 넣어줌

  1. DB & Insert Adapter세팅

    • TABLE 생성시 NUMERIC(22,0) 으로 Create -> Java.lang.long에 매핑 되는 데이터 타입

    • offset의 input field type을 java.math.BigDecimal -> java.lang.Object


  1. 테스트 결과

  • Producer Service

  • DB

  • Server Log