DynamoDB Streams 기반 사용량 비례 과금 모델 구현 과정
Dugi 🎈
이번 글에서는 저번 문제편에 이어서, 작년 12월에 릴리즈된 사용량 비례 과금 모델을 Amazon DynamoDB Streams에 기반한 스트림 처리 시스템을 이용하여 구현한 과정에 대해 살펴보겠습니다.
사용량 비례 과금 모델의 요구사항을 다시 짚고 넘어갑니다.
사용자가 어떤 기능을 사용할 때마다 측정이 이루어져야 합니다.
따라서, 우리는 기능 사용을 담당하는 비즈니스 로직에서 “기능 사용 이벤트”를 발행하고, 이벤트 스트림을 구독하는 consumer에서 이벤트를 집계하여 고객이 사용한 총량을 계산합니다.
사용량 한도 기능:
기능 사용이 일어나는 로직에서 현재 사용량과 사용 한도를 비교할 수 있어야 합니다.
사용량 집계가 일어나는 윈도우는 고객사마다 서로 다르며, 변경될 수 있습니다.
고객마다 결제를 원하는 날짜가 다르기 때문에, 한 달간의 사용량을 집계하더라도 그 기준일이 각자 다릅니다.
실시간으로 변하는 사용량을 조회할 수 있어야 합니다.
사용량이 특정 수치에 도달하는 것을 트리거로 고객에게 이메일 알림을 보내는 등 원하는 로직을 실행할 수 있어야 합니다.
매 기능 사용이 일어날 때마다, 예를 들어, 동기적으로 데이터베이스의 특정 row를 업데이트하는 전략은 적절하지 않습니다. 동시성 문제나 집계 결과 오차, 낮은 처리 속도와 같은 문제 중 하나는 피할 수 없게 됩니다. 반면 스트림 처리 시스템을 기반으로 하는 경우 높은 처리량을 감당할 수 있습니다. 또한 실시간 처리의 딜레이를 초 단위 이하로 줄일 수 있어 집계 오차도 줄일 수 있습니다.
스트림 처리를 사용량 기반 과금 시스템의 기반 전략으로 가져가며 선택할 수 있는 기술의 폭이 넓어졌습니다. 2010년대 초 이 분야에서 많은 연구가 진행되면서 Kafka, Spark, Flink, 그 외에도 수많은 위대한 스트림 처리 프레임워크가 만들어졌기 때문입니다. 그러나, 오픈소스 진영의 스트림 처리 프레임워크는 선택에서 제외하기로 결정했습니다. 당시 사용량 기반 과금 시스템의 릴리즈 일자가 고정되어 바꿀 수 없었는데, 팀에서 사용하고 있지 않은 기술을 도입하여 시행착오를 겪을 수 있는 시간 여유가 없었기 때문입니다. 또한, 스트림 처리를 수행하는 분산 처리 시스템의 운영 경험이 없는 상태에서 과금 및 결제와 같은 중요 도메인에 바로 도입하는 위험을 부담하지 않기로 했습니다. 혹시 설정값을 잘못 넣었다가 데이터에 만들어지지 않거나... 오차가 생기거나... 날아가 버리면 안 되니까요..
반면 저희 팀에서 선택한 기술은 다시 돌고 돌아 채널톡과 스타트업의 영원한 친구, “fully managed” 서비스를 제공하는 AWS입니다. AWS에서 제공하는 스트림 처리 시스템 중 Amazon Kinesis Data Streams(KDS)과 Amazon DynamoDB Streams가 적절합니다. 저희 팀은 둘 모두 이미 활용하고 있는데, KDS는 내부 메시징 큐 시스템에, DynamoDB Streams는 primary database 역할을 하는 DynamoDB에서 secondary database로 데이터를 전파하는 데 사용합니다. 두 가지 기술 모두 managed service를 제공하여 운영적인 어려움이 없습니다.
KDS와 DynamoDB Streams는 높은 처리량을 지원하기 위해 둘 모두 데이터 스트림을 여러 파티션(샤드)로 나눈 모델을 가지고 있습니다. 파티션 내의 데이터 레코드는 순서에 따라 배열되어 있기 때문에, 데이터를 읽어나가는 consumer는 내가 어디까지 읽었는지 기록해 두면서 앞으로 데이터를 읽어나가게 됩니다. 파티션의 개수는 트래픽의 증감에 따라 함께 늘어나거나 줄어들기 때문에, consumer를 각 파티션에 배정하고 이것을 적절하게 coordinate하는 것이 중요합니다. 분산 어플리케이션에서 coordination을 직접 구현하는 것은 까다롭기 때문에 저희 팀은 AWS에서 제공하는 Kinesis Client Library(KCL)의 Java SDK를 사용하고 있습니다. 애플리케이션에서 각 데이터 레코드를 어떻게 처리할지 인터페이스를 구현해서 KCL에 제공하면, KCL은 스레드 풀을 운영하여 각 파티션의 데이터를 읽어나가며, 각 프로세스에게 할당된 파티션을 적절하게 조정하기 때문에 애플리케이션 개발자에게 매우 간편합니다. 채널톡 내부 메시징 큐 시스템에서 KCL을 2년 넘게 활용하고 있었기 때문에 KCL에 대한 기술적 검증은 이미 완료된 상태입니다.
KCL은 그 이름에서 추측할 수 있듯 원래 KDS를 consume하기 위해 만들어진 라이브러리이지만, DynamoDB Streams는 KDS와 비슷한 인터페이스를 가지고 있기 때문에 KCL을 약간의 조정과 함께 활용할 수 있습니다. DynamoDB Streams Kinesis Adapter를 붙이면 KCL로 DynamoDB Streams를 읽을 수 있습니다. (단 KCL v2는 지원되지 않고 v1만 사용할 수 있는 점은 유의해야 합니다!)
반면 KDS와 DynamoDB Streams는 세부적인 운영에서의 차이도 있습니다. DynamoDB Streams는 최대 24시간의 데이터 보관 기간(리텐션)만 가지는 데 반해 KDS는 최대 365일까지 데이터를 보관할 수 있습니다(물론 보관에 따른 비용을 지불해야 하지만요). DynamoDB의 변경사항을 바로 KDS로 전파하는 것도 AWS에서 built-in으로 바로 제공합니다. 하지만 이 경우 각 change data가 스트림에 중복으로 나타나거나 순서가 서로 바뀔 수 있습니다 [1]. 반면 DynamoDB Streams는 정확히 한 번 처리와 순서에 대한 보장을 제공합니다 [2].
이러한 점을 고려하여 사용량 이벤트 퍼블리시는 DynamoDB에 레코드 생성, 스트림 처리 집계는 DynamoDB Streams를 KCL로 읽어나가는 방식으로 가져가기로 결정했습니다. 데이터는 이미 DynamoDB에 보관되기 때문에 KDS의 긴 리텐션 기간을 이용하지 않아도 좋고 오히려 데이터를 쉽게 확인할 수 있어 디버깅에 용이합니다. DynamoDB에 보관되는 이벤트는 TTL을 활용하여 일정 기간이 지나면 삭제합니다.
다른 구조로는 DynamoDB나 KDS에 바로 publish한 사용량 이벤트 레코드를 S3에 모아 저장하고, 집계하는 것은 Athena로 하는 것을 생각해볼 수 있습니다. DynamoDB나 KDS에 데이터가 오래 저장될 필요가 없고, S3이 훨씬 저렴하기 때문에 저장 비용을 절약할 수 있다는 장점이 있습니다. 하지만 실시간으로 사용량이 업데이트 될 때마다 알림을 보내는 요구사항을 해결하기가 어렵기 때문에 이 구조로는 모든 것을 해결할 수 없습니다. 제품 기능으로 제공하는 실시간 사용량 업데이트로는 사용하기 어렵겠지만, 다만 집계하는 로직을 Athena에서 바꿔가면서 시험해 볼 수 있기 때문에 내부 분석용으로는 적합한 구조라고 생각합니다.
팀에서 KDS와 DynamoDB Streams를 활용하고 있던 사례로 내부 메시징 큐 시스템과 secondary database로의 변경사항 전파가 있습니다. 두 사례의 공통점은 스트림의 각 레코드를 독립적이고 stateless하게 처리할 수 있다는 것입니다. 하지만 스트림 집계의 경우 레코드를 읽어나가면서 집계 결과를 유지해야 하기 때문에 stateful한 처리가 필요하다는 것이 차이점입니다.
또, 저희 팀에서는 DynamoDB를 다양한 도메인에 활용하고 있기 때문에, DynamoDB의 전공분야가 아닌 집계를 이번 기회에 잘 정립할 수 있다면 다른 곳에도 적용할 수 있는 기회가 열려 있습니다. 저희 팀은 기능 사용량 집계를 DynamoDB Streams 처리 파이프라인으로 구현한 후, 실제로 집계 오차가 있었다고 문제편에서 언급했던 “일회성메시지, 캠페인, 서포트봇, 자동화규칙 등 부가기능의 집계값” 문제도 이 파이프라인을 통해 일부 개선할 수 있었습니다.
KCL이 애플리케이션 프로그래머에게 제공하는 인터페이스는 IRecordProcessor
로, 우리는 스트림을 읽어나가는 worker의 초기화 (initialize
), 각 레코드를 순차적으로 접근 (processRecords
), worker가 내려가야 할 상황에서의 cleanup (shutdown
) handler를 작성할 수 있습니다. Stateless 처리의 경우 initialize
와 shutdown
시점에는 중요한 동작이 필요하지 않고, processRecords
도 단순히 루프를 돌며 각 레코드에 필요한 처리를 하면 됩니다.
여기에서 체크포인트라는 개념에 유의해야 합니다. 체크포인트는 “스트림에서 어디까지 처리했다”는 정보를 외부에 기록해두는 것입니다. 아까 DynamoDB Streams와 KDS는 데이터가 여러 파티션(샤드)으로 나누어져 있다는 것을 기억하시죠? 파티션의 각 레코드는 순서대로 부여되는 sequence number를 가지고 있습니다. 스트림의 한 파티션을 따라 읽어가면서, 처리한 sequence number를 외부에 기록해두면 (체크포인트), 처리하던 프로세스가 종료되었을 때 다른 프로세스가 기록해둔 sequence number를 보고 그 다음부터 계속 작업을 이어갈 수 있게 됩니다.
그런데 체크포인트를 어디에 넣어야 할까요?
(1)과 (2) 모두 가능합니다. doSomeWork(record)
에서 작업이 실패하거나 도중에 프로세스가 비정상적으로 종료된 경우를 생각해봅시다. (1)의 경우 doSomeWork
이전에 이미 그 레코드에 대한 체크포인트를 기록했기 때문에, 다른 worker가 이 파티션을 잡게 되면 이 레코드는 건너뜁니다. (2)의 경우 체크포인트가 기록되지 않았기 때문에 그 레코드부터 다시 시작합니다. 즉, (1)은 레코드가 처리되지 않을 가능성이, (2)는 레코드가 한 번 이상 처리될 가능성이 있습니다. 바로 스트림 처리 시스템에서 자주 등장하는 개념인 “최대 한 번 처리” (at most once) 와 “최소 한 번 처리” (at least once) 입니다. 둘 중 어떤 것이 필요한지는 doSomeWork
에서 요구하는 것이 무엇이냐에 따라 다르기 때문에 알아서 잘 결정해야 합니다. 하지만 doSomeWork
가 멱등성 있는 (idempotent) 오퍼레이션인 경우, “최소 한 번 처리” 로 코드를 작성하면 우리는 “정확히 한 번 처리”하는 보장을 획득하게 됩니다.
여기에서 조금 더 생각해보면 stateful 처리에서 “정확히 한 번 처리”할 수 있는 아이디어로 이어지는데, doSomeWork
를 멱등성 있게 만드는 방법이 무엇인지 생각해보면 됩니다. (함수형 프로그래밍에 조예가 있으시다면 여기에서 State
monad를 떠올리실지도 모르겠네요.)
먼저 (6)에서 체크포인트를 레코드 처리 이후에 하고 있으므로, 우리는 기본적으로 “최소 한 번 처리” 전략을 따르게 됩니다. 외부 저장소 (repository
) 에 집계 결과(state
)를 기록하는데, 여기에 어디까지 처리했는지(sequenceNumber
)를 함께 저장하는 것이 멱등성 있는 처리의 아이디어입니다.
(4), (5)에서 중간 집계 결과를 저장하면서 그 집계 결과가 어디까지 처리한 것인지 sequence number를 함께 저장하는데, (6)에서 체크포인트 하기 전에 worker가 종료되더라도 다른 worker가 이 파티션을 잡게 되면 (2)에서 이미 처리한 레코드를 건너뛸 수 있기 때문에 레코드가 중복 처리되지 않습니다.
(5)에서 중간 집계 결과를 외부로 저장하기 전에 worker가 종료되면 그 worker가 수행한 작업은 손실되지만, 다른 worker가 (2)에서 건너뛰지 않고 레코드를 다시 집계하여 결과적으로 올바른 결과를 얻어낼 수 있습니다.
바로 위 bullet의 시나리오처럼 레코드가 다시 집계될 수 있기 때문에 doSomeWork
가 호출할 때마다 결과가 다르다면 (다시 말해, “순수”하지 않다면) 처리에 오류가 있을 수 있습니다. 외부 상태에 의존하지 않는 집계 함수를 사용하는 것이 필요합니다.
doSomeWork
에 버그가 있어 처리가 되지 않거나, 로직을 수정하고 싶은 경우에도 유연하게 대응할 수 있습니다. 어디까지 처리했는지 sequence number를 알고 있기 때문에, (DynamoDB Streams의 리텐션인 24시간 안에) 재배포하면 놓친 곳부터 다시 처리할 수 있고, 최신 레코드를 따라잡으면 싱크가 맞게 됩니다. 개발 중 이렇게 처리 상태를 리셋하는 것이 상당히 유용했습니다.
Stateful한 집계에서 batch(records
)에 대한 처리 결과를 한 번의 데이터베이스 write로 업데이트할 수 있다는 점이 성능을 개선할 수 있는 지점입니다. Batch에 대한 결과를 프로세스 종료로 인해 잃어버려도, 다른 worker가 저장 완료한 체크포인트부터 다시 안전하게 시작할 수 있습니다. Batch의 크기가 커질수록 데이터베이스 write를 줄여 더 높은 처리량(throughput)에 도달할 수 있지만, 그만큼 각 레코드에 대한 결과가 데이터베이스에 반영되기까지 딜레이가 있어 실시간성을 희생합니다.
Batch size나 딜레이는 KCL의 configuration을 통해 조정할 수 있습니다.
KinesisClientLibConfiguration#maxRecords
(default 10,000)
Batch의 최대 크기를 결정합니다.
KinesisClientLibConfiguration#idleTimeBetweenReadsInMillis
(default 1,000 / 1초)
Batch를 가져오는 polling 사이의 간격을 결정합니다. 이 간격이 클수록 polling을 가끔 하기 때문에 스트림에 새로운 레코드가 등장했을 때 이것이 반영되기까지의 딜레이가 늘어나 실시간성이 떨어집니다. 하지만 polling을 가끔 하면 더 많은 레코드를 batching할 수 있는 기회가 늘어납니다.
따라서, 만약 처리량이 너무 높아서 데이터베이스에 대한 읽기/쓰기를 버티기 어려운 상황이라면? maxRecords
와 idleTimeBetweenReadsInMillis
를 늘려 더 많은 레코드를 한 번에 batch로 처리하도록 조정해볼 생각이었습니다. 반대로 실시간 업데이트의 딜레이를 줄여야 하는 상황이라면? idleTimeBetweenReadsInMillis
를 줄이는 것으로 대응합니다.
저희는 먼저 설정을 기본값으로 유지한 채 부하 테스트를 통해 문제가 생기지 않는지 점검했습니다. k6.js 를 이용하여 DynamoDB에 레코드를 발행하고, DynamoDB Streams를 KCL을 통해 읽어나가는 애플리케이션을 구성했습니다 (0.1 vCPU, 2GiB RAM x 4 replicas). 사용량 기반 과금 개편에서 예상되는 최대 처리량은 1k ~ 2k TPS 이기 때문에 2k TPS까지 트래픽을 올려가며 테스트를 진행했습니다.
테스트 결과, 애플리케이션과 데이터베이스 모두 병목이 일어나는 지점 없이 집계 작업을 수행할 수 있었습니다. 문제가 없다는 것은 다음과 같은 지표로 확인이 가능합니다.
애플리케이션의 Profile (CPU/Memory Utilization)
데이터베이스의 Profile (CPU Utilization)
KCL에서 제공하는 metrics: [3]
RecordsProcessed
: 처리한 레코드의 수
MillisBehindLatest
: 현재 체크포인트가 스트림의 가장 최신 레코드에서 얼마나 뒤쳐져 있는지. 0 이상이 되면 처리가 밀리고 있다는 의미입니다.
RecordProcessor.processRecords.Time
: processRecords(records)
실행에 걸리는 시간. 평균적인 수치를 유지하는지, 혹은 한 번 씩 툭툭 튀는 구간이 있는지 보면서 특별히 처리가 오래 걸리는 레코드가 있는지 체크할 수 있습니다.
이 지표는 파티션(샤드)별로 따로 볼 수도 있기 때문에 전체적인 처리가 막히는지, 혹은 특정 샤드나 특정 worker에만 문제가 있는지도 확인할 수 있습니다.
예시로 문제가 있었던 상황의 Cloudwatch 대시보드입니다. 다른 샤드의 MillisBehindLatest
는 2초 내로 유지되고 있습니다. (즉, 기능 사용 이벤트가 발행되면 현재 사용량에 2초 이내에 반영된다!) 특정 샤드 하나만 뒤쳐진 정도가 증가하고 있는데, 이것은 09:30 때 발행된 레코드에서 뭔가 문제가 있어 그 샤드의 처리가 더 이상 전진하지 못하는 상황이라고 추측할 수 있습니다. 이렇게 특정 파티션에만 예외적인 상황이 발생한 경우, 그 파티션을 처리하던 인스턴스에 문제가 있는 경우가 대부분입니다. 그 프로세스에 SIGTERM을 주어 다른 프로세스로 그 파티션의 처리를 넘기는 방식으로 해결이 가능합니다.
어떤 프로세스가 어떤 샤드의 처리를 담당하는지는 lease table [4] 을 통해 확인할 수 있습니다. Lease table은 KCL이 스스로 운용하는 DynamoDB 테이블로, 어떤 worker가 어떤 샤드를 어디까지 읽었는지 기록하고 worker failure가 발생했을 때 다른 worker가 이어받을 수 있도록 coordination을 도와주는 자료구조입니다. 디버깅할 때 이 테이블의 도움을 받아 문제 상황을 파악할 수 있습니다.
Lease table 이 원인이 되었던 (개발 환경이라 재미있었던) 장애를 하나 공유드리겠습니다. Lease table도 DynamoDB 테이블이라서 과금되기 때문에, 저희는 이 테이블에 provision된 리소스를 최소한으로 배정해둔 후 그것을 까먹었습니다 😅. 본격적으로 기능 테스트를 하던 중 사용량 집계 처리가 아주 느려지는 장애가 제보되었는데, 애플리케이션 로직이나 KCL 설정값에는 문제가 없는데 왜 스트림이 앞으로 못 가지.. 라고 하면서 문제를 찾아다녔습니다. 당연히 문제는 lease table에 대한 요청이 throttle되어 업데이트가 안 되는 것이었고, 간단히 해결되었습니다 🥲
잠시 다른 이야기로 넘어갔지만, 부하 테스트 과정에서 별도로 한 번에 몇 개씩 레코드를 묶어 처리하는지 batch의 크기도 함께 측정했습니다. 이 때 진행한 실험에서는 batch의 크기가 20~100 범위에서 유지되었습니다. 따라서 매 레코드마다 데이터베이스 쓰기를 수행할 때와 비교하여 데이터베이스에 주어지는 부하가 1/20 ~ 1/100 수준으로 감소할 것을 예상할 수 있습니다.
운영 환경에서 스트림 처리에 문제가 없는지 확인하기 위해 앞서 언급한 KCL 지표를 활용하여 Cloudwatch 대시보드를 구성하고 Cloudwatch 알람을 달아두었습니다. 대시보드는 아래와 같이 구성했습니다.
SUM(RecordProcessed)
: 총 처리량을 알 수 있습니다.
트래픽이 평소보다 증가했는지, 그렇지 않았지만 문제가 생겼는지 장애 상황에서 문맥을 파악할 수 있습니다.
MAX(RecordProcessor.processRecords.Time)
: 레코드를 처리하는 루프가 지연되고 있는지 확인할 수 있습니다.
루프 내에서 데이터베이스 쓰기를 수행하기 때문에, 여기에서 문제가 있는 경우 (락이 잡혀서 특정 레코드에서 전진이 불가하다던지) 원인 파악에 도움을 줄 수 있습니다.
MAX(MillisBehindLatest)
: 스트림 처리가 지연되고 있는지 확인할 수 있습니다. 저희는 일반적인 상황에서 2초 이내의 지연 시간을 기대하고 있습니다. 처리 지연이 분 단위로 증가할 경우 실시간 사용량 기능의 정확도에 문제가 발생할 수 있으므로 알람을 설정해 두었습니다.
파이프라인을 모두 구성한 후 정확하게 작동하는지 확인하기 위해 두 가지 방법으로 점검했습니다. 첫 번째는 DynamoDB 테이블에 레코드를 넣는 publisher와 KCL consumer에서 모두 레코드 수를 기록하며 두 수치를 비교하는 실험을 진행했습니다. 두 번째는 실제 애플리케이션 로직과 함께 파이프라인을 배포하고, DynamoDB 테이블의 스캔을 통해 수동 집계한 결과와 스트림 집계 파이프라인에서 계산한 결과를 서로 비교했습니다.
먼저 첫 번째 실험의 결과부터 보겠습니다. 최종적인 정확도와 함께 실시간 정확도를 함께 점검하기 위해 아래와 같은 차트를 그릴 수 있도록 데이터를 측정했습니다.
위 그림에서 source (하얀색) plot이 publisher 입장에서의 count, stream (초록색) plot이 consumer 입장에서의 count입니다. 실시간 정확도는 두 가지 수치를 통해 볼 수 있습니다. 시간 측면에서의 delay는 publisher가 발행한 레코드가 consumer에서 처리되기까지의 시간입니다 (파란색 화살표). 처리량 측면에서의 delay는 특정 시점에서 publisher가 인지한 레코드 수와 consumer가 인지한 레코드 수의 차이입니다 (빨간색 화살표). 둘 모두 0에 가까울수록 실시간성이 높습니다. 여기에서 측정한 결과를 우리가 기획적으로 수용할 수 있는지 점검하기 위해 실험을 진행했습니다.
먼저 일반적인 상황의 plot입니다.
Subscriber(KCL consumer)의 그래프가 계단 모양인 이유는 여러 레코드를 모아 batch로 처리하기 때문입니다. 이 상황에서는 시간 딜레이는 최대 2초, 처리량 오차는 최대 200임을 확인할 수 있었습니다.
다음으로 샤드 교체 (이 링크에서 설명하는 shard lineage에 관한 내용입니다.) 상황에서의 plot입니다. 샤드 교체가 일어나는 경우, 새로운 샤드가 등장하면서 worker가 이 샤드를 구독하여 처리하기까지 coordination이 되어야 하기 때문에 약간 더 딜레이가 생깁니다. 이 상황에서는 시간 딜레이가 최대 20초, 처리량 오차는 최대 2000 정도였습니다.
이제 이 결과를 기획 회의로 들고 가서 이야기하면 됩니다. (사용자는 기능을 사용한 뒤 a) 2초 이내에 사용량에 찍히는 것을 확인할 수 있고, b) 특정 시점에서 사용량을 조회했을 때 200 정도 적게 나올 수 있다. 하루에 한 번 정도 간헐적으로 이 오차는 20초/2000 으로 늘어날 수 있다. 괜찮나요?) 이 정도 실시간 오차는 수용할 수 있고, 실제로 결제가 일어날 때는 딜레이를 모두 기다렸다가 정확한 결과를 사용하기 때문에 과금에서의 오류는 없습니다. 그래서 이 파이프라인을 이번 사용량 기반 과금 개편에서 이용할 수 있게 되었습니다.
두 번째 실험은 사용량 이벤트 레코드가 생성되는 DynamoDB 테이블을 스캔한 결과와 스트림 처리를 통해 계산한 결과가 서로 일치하는지 확인하는 것입니다. 먼저 개발 환경에서 2주 간 집계한 데이터를 점검해 보고, 그 이후 파이프라인을 운영 환경에 먼저 배포한 뒤 실제 데이터로 2주 간 측정한 결과도 점검했습니다. 측정 결과 오차가 0이었기 때문에 릴리즈 일정을 미루지 않고 그대로 진행하게 되었습니다(휴!).
DynamoDB와 DynamoDB Streams, Kinesis Client Library를 이용하여 기능 사용량을 측정하고 집계하는 파이프라인을 구성한 과정을 살펴보았습니다. 지금 이 시스템은 운영 환경에서 잘 작동하고 있으며, 월간 약 1억 5천만 건 이상의 이밴트를 오차 없이 처리하고 있습니다. 문제 정의를 다시 한 번 보며 사용량 집계 파이프라인의 역할을 다시 한 번 짚어보겠습니다.
사용자가 어떤 기능을 사용할 때마다 측정이 이루어져야 합니다.
따라서, 우리는 기능 사용을 담당하는 비즈니스 로직에서 “기능 사용 이벤트”를 발행하고, 이벤트 스트림을 구독하는 consumer에서 이벤트를 집계하여 고객이 사용한 총량을 계산합니다. → 기능 사용 이벤트는 DynamoDB에 레코드로 생성됩니다. DynamoDB Streams를 애플리케이션에서 Kinesis Client Library를 통해 읽어나갑니다. 사용량 총합을 구하는 알고리즘은 멱등성 있는 처리를 할 수 있어 애플리케이션 배포나 비정상 종료 상황에서도 손실이나 중복 집계가 일어나지 않습니다.
사용량 한도 기능:
기능 사용이 일어나는 로직에서 현재 사용량과 사용 한도를 비교할 수 있어야 합니다. → 실시간 사용량 업데이트의 오차는 시간 측면에서는 2초 이내, 처리량 측면에서는 200 이내입니다. 사용량 한도 기능은 이 실시간 사용량을 참조하여, 기능 사용 이벤트를 발행하기 전 실시간 사용량이 한도를 넘었다면 기능 사용을 막습니다.
사용량 집계가 일어나는 윈도우는 고객사마다 서로 다르며, 변경될 수 있습니다.
고객마다 결제를 원하는 날짜가 다르기 때문에, 한 달간의 사용량을 집계하더라도 그 기준일이 각자 다릅니다. → 일반적인 내용이 아니라서 이번 글에서는 다루지 않았지만, 집계 과정에서 고객사의 결제일을 고려할 수 있도록 구현했습니다.
실시간으로 변하는 사용량을 조회할 수 있어야 합니다.
→ 2번에서 언급했듯 사용자 또한 스트림 처리 과정에서 업데이트되는 실시간 사용량을 조회할 수 있습니다.
사용량이 특정 수치에 도달하는 것을 트리거로 고객에게 이메일 알림을 보내는 등 원하는 로직을 실행할 수 있어야 합니다.
→ 메인 애플리케이션에서 집계 작업이 일어나기 때문에, 특정 수치에 도달했을 때 애플리케이션에서 원하는 로직을 실행할 수 있고 기존 코드 또한 활용할 수 있습니다.
6개월 동안 새로운 과금 체계를 잘 지탱해준 파이프라인이지만, 프로덕션 환경에서 이 시스템을 운영하면서 몇 가지 문제점이 발견되었습니다. 첫 번째는 여전히 높은 관계형 데이터베이스의 부하입니다. 관계형 데이터베이스에서 사용량을 기록하는 테이블은 (KCL의 레코드 배치 처리에 의해 쓰기 효율이 높아졌지만!) 읽기와 쓰기 모두 많이 실행됩니다.
지금 RDS의 부하 상당수가 이 테이블에 대한 읽기 및 쓰기에서 발생하고 있으므로, 잘 해결할 수 있다면 RDS 스케일 다운으로 비용을 절약할 수 있다고 생각합니다. 두 번째는 역시 관계형 데이터베이스 문제로 (역시 문제를 일으키는 상습범 😓) 트랜잭션 업데이트가 사용량 업데이트에 의해 자주 수행되면서 vacuum이 원활하지 않을 때 RDS의 성능이 급격하게 저하되는 문제가 발생합니다.
이 문제점에 대해서는 다른 블로그 글 @Transactional의 해로움 에서 다뤘으니 읽어보셔도 좋을 것 같아요. 관계형 데이터베이스로 PostgreSQL를 사용하고 있는데, PostgreSQL에서 update는 기존 tuple에 삭제 표시만 하고 새로운 값을 가지는 tuple을 추가하는 방법으로 구현되기 때문에 집계처럼 update가 자주 일어나는 workload에는 적절하지 않을지도 모르겠네요.
그렇다면 다음 단계는 무엇일까요? Apache Kafka와 같은 스트림 처리 시스템은 관계형 데이터베이스에 고통받지 않아도 되도록 높은 처리량과 오차 없는 집계(정확히 한 번 처리) 보장을 제공한다고 알려져 있습니다. 사용량 기반 과금 업데이트 당시에는 기존에 사용하지 않았던 아키텍처를 검증하고 실험할 시간이 부족했기 때문에 이번 글에서 다룬 구조를 선택했습니다. 하지만 지금 팀에서는 새로운 기술이 기존 시스템을 개선할 수 있는 대안이 되지 않을까 고민하고 있습니다.
We Make a Future Classic Product
채널팀과 함께 성장하고 싶은 분을 기다립니다