こんにちは! ROBOT PAYMENT サブスクペイのシステム基盤チームの 高尾 です。
システム基盤チームではアプリケーションやシステムインフラの基礎部分の構築や管理、メンテナンス、性能改善などを行っています。
今回はAWS LambdaでAthenaを使い、SQSの未処理メッセージ(Lambdaトリガーに渡されていないメッセージ)がわかるようにした経緯と考え方について話をしようと思います。
はじめに
サブスクペイでは決済処理の都度、決済処理をした旨をユーザーや加盟店にメール通知するようにしています。
決済処理~メール通知の流れをざっくり書くと下記の通りとなります。
- サーバでメッセージの種類ごとにMessageGroupIdをつけた決済通知メッセージ(宛先、差出人、件名、本文)をSQSキューに渡す
- SQSキューから決済通知メッセージをLambdaに渡す
- Lambdaから外部メールサービスに決済通知メッセージを渡す
決済通知メッセージは当然ながら、サブスクサービスの決済処理時(毎月5日など)に作成・送信されます。
したがって会員数が多かったり増加しているサブスクサービスの決済日は、大量のメール送信処理が必要となります。
そして案の定、SQSキューと連携しているLambdaトリガーの処理が一時的に間に合わなくなることがありました。 メール送信の遅延が発生したのです。
キュー内のメッセージ確認方法を探る
すぐにメール送信の遅延対策を進めることになったのですが、同時にそれとは別にキューで待機しているメッセージが どのMessageGroupIdに多く見られるのか把握する必要がある、との話が出てきました。
キュー内のメッセージはコンソール上やAWS CLIから確認することは可能ですが、AWSの仕様上10件が限界です。
参考:https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-receive-delete-message.html
何百件、何千件と待機メッセージが発生した場合には全容把握はとても間に合いません。
万事休すと思われたのですが、しばらくしてあるアイディアが浮かびました。
Athenaを使い、サーバのSQSへのメッセージ送信ログとLambdaトリガーのログをトレースIDで比較する!
実は、サーバのメッセージ送信ログはログ保管用のS3バケットに送信されていました。
そこでLambdaトリガーのログについてもS3バケットに送信することで、Athenaを使って比較ができるのではないかと考えたのです。
サーバのメッセージ送信ログとLambdaトリガーの処理ログにはトレースIDが共通して出力するようにしてありました。
つまりあるトレースIDがサーバログにだけ存在する場合、そのトレースIDのメッセージはまだキュー内で待機中ということになります。
確認方法を実装する
キュー内の待機メッセージ確認については、新たに別のLambda関数(以下、確認用Lambda)を作成して進めていきました。
確認用Lambdaのコードは大きく分けて以下の処理で構成しています。
- # Athena実行(サーバログ取得 → Lambdaトリガーログ取得 → 各ログのトレースID照合 → ログ絞り込み)
- # MessageGroupIdごとの待機メッセージ数を算出
- # CloudWatchでカスタムメトリクスを作成(MessageGroupIdごとの待機メッセージ数をグラフ化)
これをEventBridgeで定期実行するよう設定することで、どのMessageGroupIdの待機メッセージが多いのか把握しやすくなるというわけです。
※注意点※
ここで算出したキューでの待機メッセージはあくまでも確認用LambdaでAthenaを実行した瞬間の数値です。
キュー内のメッセージは、Lambdaが同時実行数の限界に達さない限りLambdaトリガーへ流れ続けるので、Athenaの実行タイミングによっては結果が変わってくる場合もあります。
ここまで待機メッセージ数を求める考え方を記載しましたが、肝心の待機メッセージそのものはどうやって確認するのかと思われた方もいると思います。
実はその答えとしては、確認用Lambdaコードの「# Athena実行」部分をほぼそのままAthenaで手動実行すれば良いということになります。
確認用LambdaはAthenaを実行し、そこで割り出したキュー内の各待機メッセージ情報(トレースID、MessageGroupId等)のうち、MessageGroupIdの個数を算出するものです。
当初は待機メッセージの中身についてはDynamoDBに書き込むことも考えたのですが、メッセージがキュー内で待機し続けること自体が頻度の低い事象であるため、手動でAthenaを操作する方向にしました。
以上、SQSの未処理メッセージ把握についての考え方でした。
ご参考になれば幸いです。
最後までお読みいただきありがとうございました。
We are hiring!!
ROBOT PAYMENTでは一緒に働く仲間を募集しています!!!
speakerdeck.com
www.robotpayment.co.jp