Mirai Translate TECH BLOG

株式会社みらい翻訳のテックブログ

Kafka Consumer の TopicAuthorizationException に立ち向かう

この記事は、みらい翻訳のカレンダー | Advent Calendar 2022 - Qiita の24日目です。

こんにちは。プラットフォーム開発部でリードエンジニアをしているchanceです。

アドベントカレンダーがまだ空いていそうなので滑り込みでもう一記事書いてみました。

私たちのチームでは、AWS の MSK と Debezium を用いて CDC を利用したデータマイグレーションを構築しています。

今回は MSK とそれを利用する Kafka Consumer に関するトラブルシューティングのお話です。

環境構成

まず前提となる構成についてお話しします。

CDC アーキテクチャ

CDC を構築することで、単なるレプリケーションのみでなく Consumer でさまざまな加工ができますし、別の Consumer を増やすことでそれぞれが独立したデータソースを構築することが可能です。

CDC の仕組みについては、詳しくは過去の弊社ブログも併せてご覧頂けたら幸いです。

miraitranslate-tech.hatenablog.jp

この後のお話に関わるため、構成情報をまとめておきます。

  • MSKの構成
    • インスタンスタイプ:t3.small
    • ブローカー数:3
    • 接続認証方式:IAM認証
  • Producerの構成
    • Debezium
    • ECS on Fargate
  • Consumerの構成
    • Spring Boot + Spring Kafka
    • ECS on Fargate

Kafka Consumer でエラー発生

商用環境にリリースして以降、Consumer で数日に一回程度の頻度で以下のエラーが発生していました。

{
    "@timestamp": "2022-11-28T19:01:04.657Z",
    "message": "Authentication/Authorization Exception and no authExceptionRetryInterval set",
    "logger_name": "org.springframework.kafka.listener.KafkaMessageListenerContainer",
    "level": "ERROR",
    "stack_trace": "org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [{トピック名}]\n"
}

StackTrace から TopicAuthorizationException という例外が確認できます。こちらのエラーが発生すると、Consumer の処理は停止するのですが、 Java プロセスは起動したままとなります。Java が生きているため ECS のコンテナも再起動されず、手動での再起動が必要な状態となってしまっていました。

Spring-Kafka のデフォルトの仕様として、認証の例外については致命的とみなしてコンテナ(Kafka のリスナーコンテナ)を停止させるようです。

docs.spring.io

authExceptionRetryInterval

When not null, a Duration to sleep between polls when an AuthenticationException or AuthorizationException is thrown by the Kafka client. When null, such exceptions are considered fatal and the container will stop.

−−−

nullでない場合、KafkaクライアントによってAuthenticationExceptionまたはAuthorizationExceptionがスローされたときのポーリング間のスリープ時間です。nullの場合、このような例外は致命的と見なされ、コンテナは停止します。

暫定対処

通信自体のリトライ設定は入れていましたが、AuthException という例外発生による処理停止に関しては考慮が不足していました。

CDC のしくみはそもそも非同期処理であり、今回のユースケースにおいては迅速に復旧すればサービス影響は全くないのですが、手動での再起動が必要とあってはレジリエンス(回復力)の観点で好ましくありません。

エラーログに従い authExceptionRetryInterval を設定することで暫定対処を行うことにしました。

しかし、Spring-Kafka の実装上この設定値はプロパティファイルやアノテーションでは設定できないようです。以下のように ContainerCustomizer で直接指定しました。 (こちらのリトライ設定は試行回数上限がないため、運用面など十分考慮の上での設定が必要です。)

ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> containerCustomizer(
    ConcurrentKafkaListenerContainerFactory<String, Message> factory) {

    ContainerCustomizer<String, Message, ConcurrentMessageListenerContainer<String, Message>> customizer =
        container -> {
            // 実際には数値はプロパティや環境変数などで注入。
            container.getContainerProperties()
                .setAuthExceptionRetryInterval(Duration.ofMillis(60000));
        };
    factory.setContainerCustomizer(customizer);

    return customizer;
}

この対応によってエラーが起きても復旧されるようになりましたが、発生原因の特定など根本解決には至っていないため、引き続き調査を行いました。

MSK での警告発生

Consumerのエラーとは別途、MSK 側でも Broker のログに数日おきに不審なログが散見されました。

[2022-11-15 03:11:45,563] WARN Unable to reconnect to ZooKeeper service, session 0x20000003e13000a has expired (org.apache.zookeeper.ClientCnxn)
[2022-11-15 03:11:45,563] WARN Exception caught (org.apache.zookeeper.ClientCnxnSocketNetty)
org.apache.zookeeper.ClientCnxn$SessionExpiredException: Unable to reconnect to ZooKeeper service, session 0x20000003e13000a has expired
at ...(StackTrace省略)

WARN レベルではあるものの、Unable to reconnect to ZooKeeper service というエラーで、通信がうまくいっていないように見えます。こちらはしばらくすると自然復旧しており、3-Broker 構成としていることで Kafka としての可用性には大きな影響は出ていないようでした。

しかし、原因不明なままではいつ大問題に発展するかわかりません。また、毎回同時ではないのですが、先述の Consumer 側での認証エラーと同じタイミングで発生することもあり、両者の関連も気になりました。

原因調査

調査1. MSK

MSK にトラフィックが大量に流れたり、特定の時間帯だったりといった明確な発生契機はありませんでしたが、数日に一度は発生しており、何かしらが不安定であることは間違いありません。

ここで、事象の発生時間帯におけるリソースグラフを注意深くみることで、怪しい挙動が見えてきました。こちらのグラフは、上がMSKのCPU使用率、下がメモリの使用量になります。

MSK Broker(1〜3号機) CPU使用率

MSK Broker(1号機) メモリ使用量

それぞれ1分単位での平均のグラフになりますが、これを5分平均にしているとここまで顕著な挙動が見えません。また、メモリの方は MSK のマネジメントコンソール画面ではグラフ化されておらず、CloudWatch Metrics の方から Memory の Used / Buffered / Cache / Free をスタック表示する必要があります。監視のメトリクス選定や集計単位については、いろいろと熟慮が必要になりそうです。

Kafka は JavaScala で実装されており、JVM で動いています。上記の挙動からすると GC による Stop the World が起きていたのではないかと推測されます。t3.small ではメモリが 2GB しかないのですが、Kafka を商用運用する上ではこれは少なすぎるようです。

調査2. Consumer

TopicAuthorizationException という認証系の例外であり、Consumer から Kafka への接続方式として IAM 認証を用いていたことから、IAM 認証情報の更新タイミングも疑いました。しかし、IAM 認証情報の保持期限は 1 時間に設定されており、仮にこれが原因とするともう少し高頻度で発生してもよさそうです。

状況証拠のみでは原因特定に至らず、例外の発生箇所から Kafka のソースコードを追ってみることにしました。

まずはERRORログの出力箇所を手がかりにします。 Authentication/Authorization Exception and no authExceptionRetryInterval set のログはSpring-Kafkaのコード内で出力されています。

spring-kafka/KafkaMessageListenerContainer.java at main · spring-projects/spring-kafka · GitHub

一部省略しつつ、抜粋します。

// KafkaMessageListenerContainer.java
 public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
        extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {

        public void run() {
            initialize();
            Throwable exitThrowable = null;
            boolean failedAuthRetry = false;
            while (isRunning()) {
                try {
                    pollAndInvoke();
                    if (failedAuthRetry) {
                        publishRetryAuthSuccessfulEvent();
                        failedAuthRetry = false;
                    }
                }
                catch (NoOffsetForPartitionException nofpe) {
                    // (...省略)
                }
                catch (AuthenticationException | AuthorizationException ae) {
                    if (this.authExceptionRetryInterval == null) {
                        ListenerConsumer.this.logger.error(ae,
                                "Authentication/Authorization Exception and no authExceptionRetryInterval set");
                        this.fatalError = true;
                        exitThrowable = ae;
                        break;
                    }
                    else {
                        ListenerConsumer.this.logger.error(ae,
                                "Authentication/Authorization Exception, retrying in "
                                        + this.authExceptionRetryInterval.toMillis() + " ms");
                        publishRetryAuthEvent(ae);
                        failedAuthRetry = true;
                        // We can't pause/resume here, as KafkaConsumer doesn't take pausing
                        // into account when committing, hence risk of being flooded with
                        // GroupAuthorizationExceptions.
                        // see: https://github.com/spring-projects/spring-kafka/pull/1337
                        sleepFor(this.authExceptionRetryInterval);
                    }
                }
                // (...省略)
                finally {
                    clearThreadState();
                }
            }
            wrapUp(exitThrowable);
        }

Spring-Kafkaとしては既に発生しているExceptionを拾っているだけのようです。

次に、ERRORログの直前で出力されているWARNログにも着目しました。

{
    "@timestamp": "2022-11-28T19:01:04.657Z",
    "message": "[Consumer clientId={クライアントID}, groupId={グループID}] Not authorized to read from partition {パーティション名}.",
    "logger_name": "org.apache.kafka.clients.consumer.internals.Fetcher",
    "level": "WARN",
}

Not authorized to read from partition のログが実装されているのは(Spring-Kafkaでなく)Kafkaのソースコードになります。

kafka/Fetcher.java at master · a0x8o/kafka · GitHub

Fetcher クラスなので、Consumer から Kafka への Fetch のタイミングで発生しているようです。こちらも関連箇所を一部のみ抜粋してみます。

private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetch) {
    TopicPartition tp = nextCompletedFetch.partition;
    FetchResponseData.PartitionData partition = nextCompletedFetch.partitionData;
    long fetchOffset = nextCompletedFetch.nextFetchOffset;
    CompletedFetch completedFetch = null;
    Errors error = Errors.forCode(partition.errorCode());

    try {
        if (!subscriptions.hasValidPosition(tp)) {
            // 省略...
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                //we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
                log.warn("Not authorized to read from partition {}.", tp);
                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
        }
        //省略...
}

Fetchした結果、レスポンスのpartitionDataに入っているエラーコードで判別して TopicAuthorizationException をスローしているようです。

さらにもう少しソースコードを読んでいき、TopicAuthorizationException 発生時の処理概要をまとめてみました。

TopicAuthorizationException発生時の内部動作

つまり、Consumer は(通信上は)正常に返されたレスポンスを見て例外と判断しているだけで、本当にエラーが起きていたのは Broker 側ということがわかりました。

Broker側が原因となると、調査1. の MSK の不安定な挙動が影響していそうです。発生時刻が必ず一致しないとはいえ、別々だった二つの事象は根本的には同一の原因である可能性が高まります。

恒久対応

ということで、恒久対応としては MSK のインスタンスサイズを上げることにしました。恒久対応を実施した途端にエラーの発生はパタリと止んだため、対策としては効果があったようです。

しかし、2週間ほど運用したところでまた TopicAuthorizationException が 1 件だけ発生しました。ただし、以前のように GC 起因と思われるリソース消費なども見られず、おそらくネットワークの瞬断ではないかと推測しています。このことからも、暫定対応で実施したリトライ設定はレジリエンスの観点から引き続き設定しておくべきと考えられます。

まとめ

MSK と Kafka Consumer を運用していてのエラー発生事例と、調査内容をシェアさせて頂きました。

今回は以下の学びを得られました。

  • MSK の商用運用は t3.small は避けるべき。
    • パーストインスタンスのため CPU を気にしがちだが、Kafka においてはメモリの方が足りない。
  • Consumer の AuthException 例外は Broker 側が起因。
    • Broker側を安定させれば発生頻度は落ち着く。
    • ただし、発生件数はゼロにはならないため要注意。
    • 特に Spring Kafka の場合、デフォルトでConsumerが停止してしまうためリトライ設定を入れておく。

T系インスタンスは商用環境で利用すべきでないというのは基本的なプラクティスですが、今回は CDC の導入し始めでトラフィック量も少なく、検証環境での性能試験では大きな問題は見られなかったため、コスト面を重視してミニマムに構築していました。

本格的に MSK を導入する場合はそもそも最初からT系インスタンスにすることはないでしょうし、そういった背景からか調べてみても事例などがあまり見当たりませんでした。お恥ずかしい失敗ではありますが、事例共有も少ないので勇気を持ってご紹介したいと思います。

本件がどなたかのお役に立ちましたら幸いです。

最後に

みらい翻訳では、エンジニアを募集しています。

ご興味のある方は、ぜひ下記リンクよりご応募・お問い合わせをお待ちしております。

miraitranslate.com