AWS MSK에 접속하는 방법은 몇가지가 있습니다.
그중에서 가장 많이 사용하는 SASL/SCRAM 인증을 알아보겠습니다.
참고로 SASL/SCRAM은 username, password
를 활용하는 전통적인 방법입니다.
거기에 암호화 조건이 추가된 방식입니다.
AWS에서는 AWSSecrets Manager를 통해 암호 리소스를 관리합니다.
그리고 MSK 암호 리소스는 정해진 규약에 따라 AmazonMSK_접두사 형식을 갖춰야 합니다.
또한 Secrets Manager는 AWS KMS로 키를 생성합니다.
그러면 암호 리소스를 생성할 수 있습니다.
물론 실제 비밀번호는 암호화 되어 있고, 여기서는 편의를 위해 평문으로 쓰겠습니다.
자세한 내용은 아래 참고 사이트를 확인해 주세요.
보안 암호 - AmazonMSK_foo { "username": "foo", "password": "foo-secret" }
JKS는 Java Key Store
의 약자로 키를 저장하는 저장소입니다.
여기에서는 JAAS 에 지정된 인증 정보를 저장하게 됩니다.
참고로 JAAS는 Java Authentication And Authorization Service
의 약자로,
Java에서 인증과 권한 부여 서비스를 제공하는 API입니다.
설치된 JDK 안에 TrustStore인 cacerts 파일을 지정된 경로로 복사합니다.
보통 JKS 파일은 프로젝트의 /resources
디렉토리에 위치합니다.
cp /JDKFolder/jre/lib/security/cacerts /kafka.client.truststore.jks
이제 생성한 암호 리소스로 클라이언트에서 MSK로 연결해 보겠습니다.
여기서는 Consumer를 예로 들겠습니다.
참고로 ’ ’ 안에 문자열은 실제 Kafka 정보로 교체해야 합니다.
아래 예제는 기본 설정으로, 현업에서는 훨씬 다양한 옵션 설정이 필요합니다.
fun consumerFactory(): ConsumerFactory<String, Any> { var consumerProps = mutableMapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to 'Broker EndPoint', ConsumerConfig.GROUP_ID_CONFIG to 'Consumer Group', ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeseriallizer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java ) val saslJassConfig = String.format( org.apache.kafka.common.security.scram.ScramLoginModule required username=%s password=%s;", "foo" "foo-secret" ) consumerProps["security.protocol"] = "SASL_SSL" consumerProps["sasl.mechanism"] = "SCRAM-SHA-512" consumerProps["ssl.truststore.location"] = "/kafka.client.truststore.jks" consumerProps["sasl.jaas.config"] = saslJassConfig }
아래는 AWS에서 암호 리소스를 생성하고 클라이언트에서 접속하는 가이드입니다.
그리고 다른 인증 방식 가이드도 추가되어 있습니다.