Mirai Translate TECH BLOG

株式会社みらい翻訳のテックブログ

EventBridge Pipesを利用してKafka Consumer Lambdaを実装する

この記事は、みらい翻訳 Advent Calendar 2022 の21日目です。

 

こんにちは。プラットフォーム開発部でアプリエンジニアをしているeinです。

本記事では、MSK (Managed Streaming for Apache Kafka) のConsumerとしてのLambdaを実装してみます。

趣旨

本記事で言いたいこと

re:Invent2022でEventBridge Pipesという新しいイベント伝搬の仕組みが発表され、話題になっています。本記事ではPipesを利用してMSKをイベントソースとするLambdaを実装し、従来のトリガーを利用したLambdaとの比較をします。

本記事で言わないこと

MSKやPipesの実装方法の詳細は紹介しません。

トリガーを利用したLambda

Pipesがリリースされる以前から、MSKをトリガーとするLambdaを実装することができていました。まずは、このMSKトリガーを利用したLambdaについて述べます。

Lambdaの実装方法

チュートリアルに沿っていけば実装できます。端的には、主に以下の条件を満たしたLambdaにトリガーを追加することでMSKのConsumerとして動作させられます。

  • LambdaをMSKと同一VPCに設置する
  • LambdaにMSKへのアクセスを許可するIAMポリシーを与える
  • LambdaにMSKへのインバウンドを許可したSecurity Groupを与える

他にも細かな条件が公式ドキュメントに書いてあります。

Lambdaのサンプルコード

標準出力にKafkaメッセージ内容を出力するLambdaを実装してみます。ここではKafkaメッセージのスキーマJSONであるとしましょう。

ランタイムはNode.jsとします。ごく最低限の実装は以下になります。ビジネスロジックの前に、メッセージのNULLチェック、Base64デコード、JSONパースといった前処理が必要です。

exports.handler = async (event) => {
    // event.records にイベントオブジェクトの配列が渡される
  for (let key in event.records) {
  event.records[key].map((record) => {
    // record.value がKafkaメッセージ本体
      if (!record.value) {
      // ガード節
        console.log("Message is null");
        return;
      }

   
     // EventBridgeはメッセージをBase64エンコードする
      const value = JSON.parse(Buffer.from(record.value, "base64").toString());

// ビジネスロジック
      console.log(value);
    })
  }
};

// 出力例
// { "hello": "world" }

Pipesを利用したLambda

続いて、Pipesを利用してMSKのConsumerとしてのLambdaを実装します。

Pipesの実装方法

EventBridgeマネジメントコンソールにPipesのページが追加されています。「パイプを作成」を実行すると作成できます。

Pipesには、ソース・フィルタリング・エンリッチメント・ターゲット、の4つのコンポーネントが存在します。エンリッチメントは特徴的な機能で、ターゲットにイベントを渡す前にLambdaやStepFunctionsを経由してイベントを加工することができます。

本記事では、エンリッチメントにBase64デコード等の前処理を任せるLambdaを仕込んでみます。

Pipes作成画面

Lambdaの実装方法

エンリッチメントやターゲットに指定するLambdaに、特に目立つ制約はありません。VPCに設置する必要はなく、IAMも自身が必要とするだけ(本記事ではCloudWatchLogsの許可)のポリシーを与えればよいです。

IAM認証はPipesが引き受けてくれています。Pipesを作成する際に実行IAMロールをデフォルトにすると、MSKへのアクセス許可やLambda呼び出し許可を備えたIAMロールとポリシーが自動作成され、Pipesに付与されます。

VPCやSecurityGroupを指定していない点は不思議です。PipesにもVPC等は設定していません。内部でよしなにしてくれる様です。本番採用の際にはアーキテクチャを調べる必要があるでしょう。

エンリッチメントLambdaのサンプルコード

エンリッチメントにBase64デコードやJSONパースを任せてみます。ここでもKafkaメッセージのスキーマはJSONであるとしましょう。

export const handler = async(events) => {
  // トリガーとはイベントの形式が異なる
// eventsにイベントオブジェクトの配列が入っている
// 配列の長さの最大はBatchSize
  return events.map((event) => ({
   // { "decoded": "デコードしたJSON" } をターゲットに渡す
   decoded: event.value
     ? JSON.parse(Buffer.from(event.value, "base64")).toString()
     : { decoded: {} }
  }));
};

ターゲットLambdaのサンプルコード

ターゲットの実装は、エンリッチメントから渡されたJSONを表示するのみになります。

トリガーの場合と比較して、ボイラープレートをエンリッチメントに押し付けることで、ビジネスロジックに集中できています。また、エンリッチメントは流用できるので、複数のConsumerを作りたい際にDRYなコードを書きやすくなっています。

export const handler = async(events) => {
// { "decoded": "デコードしたJSON" } の配列がエンリッチメントから渡ってくる
events.map((event) => {
  // ビジネスロジック
   console.log(event.decoded);
});
};

// 出力例
// { "hello": "world" }

まとめ

  • Pipesを利用してMSKのConsumerとしてのLambdaを実装する場合、トリガーよりも簡単に実装できる
  • PipesのエンリッチメントLambdaに前処理を任せることで、ターゲットLambdaの責務を極小にできる

余談

今年2022年には、LambdaとMSKの連携に関するリリースがいくつか発表されています。2022年3月にLambdaにおけるMSKのIAM認証がサポートされました。さらに、2022年8月にはLambdaに任意のコンシューマグループIDを設定できるようになりました。

後者のコンシューマグループID対応は個人的に特に嬉しいリリースです。

これまでは、Lambdaもしくはトリガーが変わるとコンシューマグループIDも変わってしまっていました。つまり、特定のトピックを読み取るLambdaやトリガーを作り変えたい場合に、オフセットを維持してくれませんでした。

コンシューマグループIDを設定できればオフセットを維持できます。Pipesという実装の選択肢も増え、LambdaでKafka Consumerを実装する魅力がとても増していると思います。

We are hiring!

みらい翻訳では、エンジニアを募集しています。

ご興味のある方は、ぜひ下記リンクよりご応募・お問い合わせをお待ちしております。

miraitranslate.com