この記事は、みらい翻訳 Advent Calendar 2022 の21日目です。
こんにちは。プラットフォーム開発部でアプリエンジニアをしているeinです。
本記事では、MSK (Managed Streaming for Apache Kafka) のConsumerとしてのLambdaを実装してみます。
趣旨
本記事で言いたいこと
re:Invent2022でEventBridge Pipesという新しいイベント伝搬の仕組みが発表され、話題になっています。本記事ではPipesを利用してMSKをイベントソースとするLambdaを実装し、従来のトリガーを利用したLambdaとの比較をします。
本記事で言わないこと
MSKやPipesの実装方法の詳細は紹介しません。
トリガーを利用したLambda
Pipesがリリースされる以前から、MSKをトリガーとするLambdaを実装することができていました。まずは、このMSKトリガーを利用したLambdaについて述べます。
Lambdaの実装方法
チュートリアルに沿っていけば実装できます。端的には、主に以下の条件を満たしたLambdaにトリガーを追加することでMSKのConsumerとして動作させられます。
- LambdaをMSKと同一VPCに設置する
- LambdaにMSKへのアクセスを許可するIAMポリシーを与える
- LambdaにMSKへのインバウンドを許可したSecurity Groupを与える
他にも細かな条件が公式ドキュメントに書いてあります。
Lambdaのサンプルコード
標準出力にKafkaメッセージ内容を出力するLambdaを実装してみます。ここではKafkaメッセージのスキーマはJSONであるとしましょう。
ランタイムはNode.jsとします。ごく最低限の実装は以下になります。ビジネスロジックの前に、メッセージのNULLチェック、Base64デコード、JSONパースといった前処理が必要です。
Pipesを利用したLambda
続いて、Pipesを利用してMSKのConsumerとしてのLambdaを実装します。
Pipesの実装方法
EventBridgeマネジメントコンソールにPipesのページが追加されています。「パイプを作成」を実行すると作成できます。
Pipesには、ソース・フィルタリング・エンリッチメント・ターゲット、の4つのコンポーネントが存在します。エンリッチメントは特徴的な機能で、ターゲットにイベントを渡す前にLambdaやStepFunctionsを経由してイベントを加工することができます。
本記事では、エンリッチメントにBase64デコード等の前処理を任せるLambdaを仕込んでみます。
Lambdaの実装方法
エンリッチメントやターゲットに指定するLambdaに、特に目立つ制約はありません。VPCに設置する必要はなく、IAMも自身が必要とするだけ(本記事ではCloudWatchLogsの許可)のポリシーを与えればよいです。
IAM認証はPipesが引き受けてくれています。Pipesを作成する際に実行IAMロールをデフォルトにすると、MSKへのアクセス許可やLambda呼び出し許可を備えたIAMロールとポリシーが自動作成され、Pipesに付与されます。
VPCやSecurityGroupを指定していない点は不思議です。PipesにもVPC等は設定していません。内部でよしなにしてくれる様です。本番採用の際にはアーキテクチャを調べる必要があるでしょう。
エンリッチメントLambdaのサンプルコード
エンリッチメントにBase64デコードやJSONパースを任せてみます。ここでもKafkaメッセージのスキーマはJSONであるとしましょう。
ターゲットLambdaのサンプルコード
ターゲットの実装は、エンリッチメントから渡されたJSONを表示するのみになります。
トリガーの場合と比較して、ボイラープレートをエンリッチメントに押し付けることで、ビジネスロジックに集中できています。また、エンリッチメントは流用できるので、複数のConsumerを作りたい際にDRYなコードを書きやすくなっています。
まとめ
- Pipesを利用してMSKのConsumerとしてのLambdaを実装する場合、トリガーよりも簡単に実装できる
- PipesのエンリッチメントLambdaに前処理を任せることで、ターゲットLambdaの責務を極小にできる
余談
今年2022年には、LambdaとMSKの連携に関するリリースがいくつか発表されています。2022年3月にLambdaにおけるMSKのIAM認証がサポートされました。さらに、2022年8月にはLambdaに任意のコンシューマグループIDを設定できるようになりました。
後者のコンシューマグループID対応は個人的に特に嬉しいリリースです。
これまでは、Lambdaもしくはトリガーが変わるとコンシューマグループIDも変わってしまっていました。つまり、特定のトピックを読み取るLambdaやトリガーを作り変えたい場合に、オフセットを維持してくれませんでした。
コンシューマグループIDを設定できればオフセットを維持できます。Pipesという実装の選択肢も増え、LambdaでKafka Consumerを実装する魅力がとても増していると思います。
We are hiring!
みらい翻訳では、エンジニアを募集しています。
ご興味のある方は、ぜひ下記リンクよりご応募・お問い合わせをお待ちしております。