AWS MSK에 접속하는 방법은 몇가지가 있습니다.
그중에서 가장 많이 사용하는 SASL/SCRAM 인증을 알아보겠습니다.
참고로 SASL/SCRAM은 username, password를 활용하는 전통적인 방법입니다.
거기에 암호화 조건이 추가된 방식입니다.
AWS에서는 AWSSecrets Manager를 통해 암호 리소스를 관리합니다.
그리고 MSK 암호 리소스는 정해진 규약에 따라 AmazonMSK_접두사 형식을 갖춰야 합니다.
또한 Secrets Manager는 AWS KMS로 키를 생성합니다.
그러면 암호 리소스를 생성할 수 있습니다.
물론 실제 비밀번호는 암호화 되어 있고, 여기서는 편의를 위해 평문으로 쓰겠습니다.
자세한 내용은 아래 참고 사이트를 확인해 주세요.
보안 암호 - AmazonMSK_foo
{
"username": "foo",
"password": "foo-secret"
}
JKS는 Java Key Store의 약자로 키를 저장하는 저장소입니다.
여기에서는 JAAS 에 지정된 인증 정보를 저장하게 됩니다.
참고로 JAAS는 Java Authentication And Authorization Service의 약자로,
Java에서 인증과 권한 부여 서비스를 제공하는 API입니다.
설치된 JDK 안에 TrustStore인 cacerts 파일을 지정된 경로로 복사합니다.
보통 JKS 파일은 프로젝트의 /resources 디렉토리에 위치합니다.
cp /JDKFolder/jre/lib/security/cacerts /kafka.client.truststore.jks
이제 생성한 암호 리소스로 클라이언트에서 MSK로 연결해 보겠습니다.
여기서는 Consumer를 예로 들겠습니다.
참고로 ’ ’ 안에 문자열은 실제 Kafka 정보로 교체해야 합니다.
아래 예제는 기본 설정으로, 현업에서는 훨씬 다양한 옵션 설정이 필요합니다.
fun consumerFactory(): ConsumerFactory<String, Any> {
var consumerProps = mutableMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to 'Broker EndPoint',
ConsumerConfig.GROUP_ID_CONFIG to 'Consumer Group',
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeseriallizer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
)
val saslJassConfig = String.format(
org.apache.kafka.common.security.scram.ScramLoginModule required username=%s password=%s;",
"foo"
"foo-secret"
)
consumerProps["security.protocol"] = "SASL_SSL"
consumerProps["sasl.mechanism"] = "SCRAM-SHA-512"
consumerProps["ssl.truststore.location"] = "/kafka.client.truststore.jks"
consumerProps["sasl.jaas.config"] = saslJassConfig
}
아래는 AWS에서 암호 리소스를 생성하고 클라이언트에서 접속하는 가이드입니다.
그리고 다른 인증 방식 가이드도 추가되어 있습니다.