アプリケーションエンジニアのid:tkzwtksです。今回はバッチ処理の冪等性(べきとうせい、idempotence)について、どう考えるか/考えてきたかをご紹介します。
このエントリを書くきっかけとなったのは、はてなエンジニア有志で定期的に開催しているCloudNative推進会です。ここでは、社内のシステムをクラウドネイティブにしていくため「クラウドネイティブなシステムとはどういうものか?」を考えており、この会での「クラウドネイティブなバッチ処理」の議論も踏まえつつ説明していきます。
バッチ処理における冪等性とは
冪等性とは「ある同じ操作を何度行っても、いつも同じ結果が得られる性質」です。これは処理が成功した場合だけではなく、エラーや不整合の状態も含めて同じ結果になるということです。
バッチ処理における「ある同じ操作」とは、バッチ処理を起動させるメッセージ送信のことを指します。つまり、同じ起動メッセージが何度送信されても、得られる結果が同じになる処理が、冪等なバッチ処理ということになります。
メッセージ送信の信頼性を考慮する
なぜバッチ処理を冪等にしたいのでしょう。例えば、同じ起動メッセージが何度も送信されてしまったときのことを考えてみます。
メッセージごとにバッチが起動されてその処理が冪等でなかった場合、本来ならバッチが一度だけ起動された結果が期待されたにもかかわらず、(処理内容によりますが)複数回の処理が実行されて正しくない結果になってしまいます。
これは、起動メッセージ送信の信頼性がExactly Once(厳密に1回)、つまりcrontabのように一度だけ起動されることが保証される環境ではそれほど問題になりません。確実に1回だけ起動されるので、冪等性を検討する必要性は低くなります。
一方で、信頼性がAt Least Once(少なくとも1回)、つまりバッチ処理が1回以上起動されうる環境においては、冪等性について優先度高く検討する必要があります。
クラウドネイティブで可用性を高めるために
ここで、クラウドネイティブなシステムにおける冪等性について考えてみます。
例えば、Amazon CloudWatch Eventsのようなマネージドサービスを組み込むことで、冗長性やスケーラビリティの高いシステムを作ることができます。また、CloudWatch Eventsはドキュメントにも複数回トリガーされる場合があると明記されています。
▶ 1つのイベントに応じてルールが複数回トリガーされました。…… – CloudWatch Eventsのトラブルシューティング
ほかにもAWS LambdaやAWS SQSなど、At Least Onceなクラウドサービスはいくつも存在します。こうしたサービスで実装コストを抑えた可用性の高いシステムを実現するには、バッチ処理を冪等に実装することになります。
クラウドネイティブな環境を推進していく上で、バッチ処理の冪等性を考慮することは避けられないでしょう。
どのような場合に冪等性を考慮すべきか
冪等性は全ての処理で必須の性質というわけではなく、求められる要件によっては考慮する必要がありません。
例えば「当日午前0時以降の全てのアクセス数を集計する」という要件ではどうでしょうか? 継続的にアクセスされるのであれば、実行ごとに結果は増加するはずです。そのため冪等な処理にする必要はなく、実装も必要ありません。
一方で「当日の午前0時から午前6時までのアクセス数を集計する」という処理ではどうでしょう。「当日午前0時から午前6時までのアクセス」は不変なものであり、集計処理ごとに変化するのはおかしいはずです。この場合は、冪等性を考慮しながら実装する必要があるでしょう。
冪等な実装における3つのケーススタディ
ここで、よくありがちな実装パターンについて説明します。
なお、コード片をいくつか提示しますが、あくまで概念を説明する簡易なコードです。実際の運用では、データベース(以下、DB)のトランザクションや起動時の排他処理について考慮する必要があるでしょう。
ケース1: n分前までに更新されたレコードを集計する
入力が変化すると、当然ですが前提条件が変化するため、冪等な処理になりません。
よく見かけるバッチ処理に「実行時刻からn分前までに更新されたレコードを対象に集計する」ものがあります。簡単なGoのコードで表現すると以下のようになります。
func main() { // 実行時刻を集計範囲の終了時刻とする end := time.Now() // 実行時刻から1時間前を集計範囲の開始時刻とする start := now.Add(time.Minute * -60) // 集計 result, err := aggregate(start, end) if err != nil { return } fmt.Println(result) }
このように、実行開始時にバッチ処理内で対象の範囲を決めるコードはしばしば見かけますが、これでは実行ごとに抽出範囲が変わってしまうため、結果として入力が変化することと同様な状態になり、冪等な処理にはなりません。
この場合、バッチ処理では開始時刻と終了時刻を指定できるように実装し、起動メッセージで開始時刻と終了時刻を送信することで、冪等にできます。これをGoのコードで簡単に表現すると以下のようになるでしょう。
func main() { // 集計範囲の開始/終了を環境変数で渡す start := flag.String() end := flag.String() result, err := aggregate(start, end) if err != nil { fmt.Errorf("%w", err) return } fmt.Println(result) }
この例では開始と終了を環境変数で受け取って、集計処理に渡しています。
ちなみに「実行時刻からn分前までに更新されたレコードを対象」にする場合、集計の終了時刻さえ渡せば自動的に開始時刻も決まりますが、運用上はバッチ処理内で範囲を決めるより、起動時に渡すほうが扱いやすくなります。定期実行以外のイレギュラーな作業で集計範囲を少し変えたいときなどに、実装の修正が必要なくなるためです。
このように相対的な時刻を入力に使う場合は、範囲を決めるのをバッチ処理の仕事にしないなど、注意が必要です。
ケース2: DB上の対象レコードを更新する
ケース1では、対象を参照して集計する処理について述べました。参照するだけでなく対象を更新する場合は、当然ですがさらに注意が必要です。何も対策しなければ実行するたびにDBが更新され、結果が変わってしまいます。
このケースでは、更新した対象が更新済みであると分かるようにしておいて、再実行の際には更新済みの対象を避ける方法が考えられます。コード例は次の通りです。
func main() { // targetsは更新対象 for _, v := range targets { if shouldSkip(v) { continue } update(v) setCompleted(v) } }
また別の考え方として、再実行時にも同じ値で更新するという方法があります。
例えば「公開開始時刻」と「公開終了時刻」を持つレコードがDB上にあるとして、あるタイミングで終了時刻を一括で更新する処理を考えます。このバッチ処理の引数として終了時刻を渡すようにすると、毎回同じ値で更新されるため、結果として処理は冪等になります。コード例は次の通りです。
func main() { // 環境変数で終了時刻を渡す closed_at, _ := time.Parse(time.RFC3339, os.Getenv("CLOSED_AT")) // targetsは更新対象 // update内では "UPDATE table_a SET closed_at = :closed_at WHERE id IN(1, 2, 3...)" のようなクエリが実行され、 // 一括で更新される update(targets, closed_at) }
なお、ケース1と同様に、相対的な値を使うと冪等にならない可能性があることには注意が必要です。先程の例で、終了時刻を引数で渡さず「実行時刻から1週間後に終了する」という処理にすると、実行するごとに終了時刻が1週間後に伸びてしまいます。
また見逃しやすい点として、レコードが更新時刻を持っている場合にバッチ処理を再実行すると、更新時刻だけ変わってしまって結果として冪等にならないことがあります。簡単な例を挙げると以下のようなクエリです。
UPDATE table_a SET closed_at = :closed_at, updated_at = NOW() WHERE id IN(1, 2, 3);
上記のクエリでは、終了時刻(closed_at
)は外から渡せるようになっているものの、同時にupdated_at
をNOW()
で更新してしまうため、冪等になりません。
「更新時刻が変わったほうが、いつバッチ処理が実行されたか分かりやすい」という考え方もあるとは思いますが、その場合はバッチ処理の実行ログで分かるようにしておくほうがよいでしょう。
ケース3: 対象ユーザーにメールを送信する
「対象となるユーザーを何らかの条件で抽出して、メールを送信する」というバッチ処理を考えます。仮に対象者の抽出処理は冪等だったとしても、その後のメール送信で特に何も対策しなければ、実行ごとにメールを送信してしまうことになります。
これはケース2と同様に、送信済みのユーザーを記録しておき、再実行の際には送信処理をスキップするという実装によって、同じメールを何度も送信してしまうことを回避できます。以下のコードが実装例です。
func main() { // targetsは更新対象 for _, mail := range targets { if hasSent(mail) { continue } // メール送信に成功したらerrはnil err := sendMail(mail) if err != nil { setError(mail, err) continue } setHasSent(mail) } }
送信済みユーザーの記録にはDBだけでなく、テキストファイルなども利用できます。
なお、今回はメール送信を例に挙げていますが、処理中に外部のシステムと連携する場合、連携先のシステムが冪等だとは限らないため、注意が必要です。
はてなダイアリーからはてなブログに自動移行
ここで、はてなにおける冪等なバッチ処理の実例として、はてなダイアリーからはてなブログへの自動移行を取り上げて説明しようと思います。これは、はてなダイアリーのサービス終了にともなって2019年3月から7月にかけて実施されました。
- 2019年3月7日をもって、はてなダイアリーの閲覧を除いた全ての機能を停止しました。はてなダイアリーからはてなブログへの自動移行を開始します
- はてなダイアリーからはてなブログへの自動移行が完了し、はてなダイアリーでの記事の公開が終了しました
自動移行に必要な3つの工程
はてなダイアリーをはてなブログに移行するには、以下の工程を上から順に実行していく必要がありました。
- 移行先のブログを作る
- 日記データを移行させる
- 移行完了メールを送信する
この一連の工程は、それぞれが冪等である必要があります。冪等性を考慮せず、全ての工程が1つのバッチ処理として実装されている場合はどうなるでしょうか。
例えば、移行先のブログは作成できたものの、何らかの理由でデータの移行に失敗したときを考えてみましょう。理由を特定して修正した上で再実行することになりますが、冪等性が考慮されていなければ、再実行時に移行先を再作成しようとしたり、移行済みの記事を重複移行させたりしてしまいます。
全工程を1つのバッチ処理として、再実行前にまっさらにするという方法も考えられましたが、移行作業全体の時間コストにも影響してしまうため選択していません。後述するように進捗を保存しておいて、完了済みの工程をスキップするようにしました。
それでは、それぞれの工程で必要な作業を簡単に解説していましょう。
工程1. 移行先のブログを作る
まず、はてなダイアリーの移行先となるはてなブログを作成する必要があります。
はてなブログを作るには、ブログのURLを設定する必要があります。これは実行時に動的に決定する必要はなく、事前に決めておけばよいため、ブログを作る際に必要な入力を固定でき、実行ごとに変化することはありません。
ブログを作るのに成功すれば、DBに「ブログ作成成功」として保存しました。同じく失敗したら「失敗」として保存し、原因が究明できたら必要な修正を施して再開する、ということをやっていきました。
工程2. 日記データを移行させる
移行先のブログを作ることに成功すれば、日記データを移行できます。日記データの移行に必要なものは、移行元のはてなダイアリー、移行先のはてなブログ、ブログの公開状態の3つです。これが実行ごとに変化することはありません。
移行開始時にはステータスを「移行作業中」とし、移行できた日記データのエントリそれぞれを「移行済み」としてDBに保存していきました。仮に途中の移行が失敗して再実行することになっても、「移行済み」のエントリに関して処理をスキップすれば重複移行を避けられます。
日記データ全体の移行が完了すれば「完全移行済」としてDBに保存し、これ以降のどこかで処理に失敗しても、データ移行が重ねて行われないようにしました。
工程3. 完了メールを送信する
日記データの移行が完了すると、完了メールを送信する工程に移ります。メール送信で必要なものは、移行先のはてなブログの情報とメールアドレスです。これらが実行ごとに変化することはありません。
メールの送信に成功すれば「メール送信済み」としてDBに記録していき、再実行時には「送信済み」ならばスキップします。これで、同じメールが複数回送信されることを避けられます。
なぜ工程を3つに分割したか?
そもそも一連の処理を分割したのには、以下の理由がありました。
- 移行処理の進捗を把握しやすくする
- 失敗した場合の再実行をできるだけ容易にする。
この施策では移行する対象が多く、どれくらい時間がかかるのか開始前に想定できなかったため、進捗をできるだけ把握しやすくしたいと考えていました。工程を分割することで、あるダイアリーの移行がどの工程まで進んだか、移行処理が完全に終了しているかなど、細かい進捗を把握できるようになりました。
また、どのような理由で失敗するかも分からないので、再実行が発生する前提で実装する必要がありました。どこかの工程で失敗した場合、処理を修正した上で再実行する必要があります。すでに終わった工程は再実行する必要がないので、全体を上記のように分割し、それぞれを冪等に実装することで、再実行時の手間をできるだけ減らすことを狙いました。
全ての工程をジョブキューで結合する
上記の3つの工程を結合する必要があります。各工程の処理は、ジョブキューによって管理していました。
分割した工程の処理が一方向に進むように、前工程が完了したら次工程のジョブを入れるようにします。実行時には、前の工程が終わっていることと、現工程が完了済みではなく実行中でもないことを確認し、問題なければ実行します。
各工程がそれぞれ冪等なので、同じジョブがエンキューされても問題ありません。実行中であれば何もせず、完了済みであれば次の工程のジョブを入れるだけで終了します。結果として、再実行時には最初の工程のジョブをエンキューするだけでよく、再実行時の手順や考慮することを大幅に減らすことができました。実際、何度か再実行することになりましたが、非常にスムーズでした。
これを全ての移行対象のはてなダイアリーに対して実行していき、はてなダイアリーからはてなブログへの自動移行を完遂しました。
まとめ
今回は、バッチ処理の冪等性に関する考え方を説明しました。その上で、はてなダイアリーからはてなブログへの自動移行を実例として紹介しました。
今後、特にクラウドネイティブな環境でバッチ処理を動かしていく上で、冪等性について検討することは避けられないでしょう。そのような場面で、このエントリの内容が少しでも役立つことがあれば幸いです。
はてなでは、バッチ処理について一緒に議論したり、既存システムのクラウドネイティブ化に興味のあるエンジニアを募集しています。