はじめに
業務で大量データを扱うバッチを作成する機会がありました。その際、大量データ処理時のDBにかかる負荷やタイムアウトを考慮して設計・実装する必要があったので記事にまとめます。
要件
まず、今回作成したバッチの要件を簡単に説明します。
そのシステムにはデータベースが2つあり(DB1、DB2とする)、DB1のテーブルに格納されている大量データ(約3700万件)をDB2のテーブル(全部で4テーブル)にInsertし、その後DB1のテーブルから当該データをDeleteするという処理です。
ちなみに大量データというのはテーブルに格納されている全レコードではなく、ある条件で絞ったレコードが対象です。
要件としては非常に単純なものなんですが、やはり処理対象のレコード数が膨大であることがネックになると予想しました。
設計・実装
Insert処理
まずは何も考えずに大量データに対して一気にInsert処理を行うように実装してバッチ実行してみました。
# Usage: rails data_migration:insert_records
# 約3700万件のレコード取得
records = TargetFrom.where(status: 1)
# Insert処理
begin
ActiveRecord::Base.transaction do
TargetTo.insert_all!(records)
end
rescue => e
raise ActiveRecord::Rollback
puts e
end
すると案の定、MySQLのタイムアウトエラーが出て処理が停止しました。
$ rails data_migration:insert_records
rails aborted!
ActiveRecord::Rollback: ActiveRecord::Rollback
/path/to/app/lib/tasks/data_migration.rake:26:in `rescue in block (2 levels) in <main>'
/path/to/app/lib/tasks/data_migration.rake:13:in `block (2 levels) in <main>'
Caused by:
ActiveRecord::StatementInvalid: Mysql2::Error::ConnectionError: MySQL server has gone away
/path/to/app/lib/tasks/data_migration.rake:16:in `block (4 levels) in <main>'
/path/to/app/lib/tasks/data_migration.rake:15:in `each'
/path/to/app/lib/tasks/data_migration.rake:15:in `block (3 levels) in <main>'
/path/to/app/lib/tasks/data_migration.rake:14:in `block (2 levels) in <main>'
Caused by:
Mysql2::Error::ConnectionError: MySQL server has gone away
/path/to/app/lib/tasks/data_migration.rake:16:in `block (4 levels) in <main>'
/path/to/app/lib/tasks/data_migration.rake:15:in `each'
/path/to/app/lib/tasks/data_migration.rake:15:in `block (3 levels) in <main>'
/path/to/app/lib/tasks/data_migration.rake:14:in `block (2 levels) in <main>'
Tasks: TOP => data_migration:insert_records
(See full trace by running task with --trace)
エラーメッセージで検索するとMySQLのmax_allowed_packet
というパラメーターで設定されている値より多いレコード数に対してデータ操作しようとするとタイムアウトになるとのことでした。
max_allowed_packet
の現在値を確認してみます。
mysql> show variables like 'max_allowed_packet';
+--------------------+----------+
| Variable_name | Value |
+--------------------+----------+
| max_allowed_packet | 67108864 |
+--------------------+----------+
1 row in set (0.00 sec)
max_allowed_packet
の値は約6700万となっているのでパラメーター的には問題ないはずなんですが、実際にタイムアウトエラーが出ているのでどうにかしないといけません。
そこで、大量データを適当な件数毎の配列に分割し、その単位でInsert処理を行うように実装を修正しました。
# Usage: rails data_migration:insert_records
# 約3700万件のレコード取得
records = TargetFrom.where(status: 1)
# 適当な件数毎の配列に分割
records_each_slice = records.each_slice(1_000_000).to_a
# Insert処理
begin
ActiveRecord::Base.transaction do
records_each_slice.each_with_index do |target, index|
TargetTo.insert_all!(target)
puts "TargetTo(#{index + 1}/#{records_each_slice.size}) 登録完了"
# => TargetTo(n/38) 登録完了
end
end
rescue => e
raise ActiveRecord::Rollback
puts e
end
するとタイムアウトエラーは出なくなり、(時間はかかるが)処理が完了するようになりました。
今回は100万件毎の配列に分割しましたが、RDSインスタンスのインスタンスタイプやMySQLのパラメーターなどを勘案して適宜チューニングするのがいいと思います。
Delete処理
Delete処理でも同じように大量データを適当な件数毎の配列に分割し、その単位でDelete処理を行うように実装して実行してみました。
# Usage: rails data_migration:delete_records
# 約3700万件のレコード取得
records = TargetFrom.where(status: 1)
# 適当な件数毎の配列に分割
records_in_batches = records.each_slice(1_000_000).to_a
# Delete処理
begin
ActiveRecord::Base.transaction do
records_each_slice.each_with_index do |target, index|
target.map(&:destroy!)
puts "TargetFrom(#{index + 1}/#{records_each_slice.size}) 削除完了"
# => TargetFrom(n/38) 削除完了
sleep(0.1)
end
end
rescue => e
raise ActiveRecord::Rollback
puts e
end
しかし、この実装ではタイムアウトエラーこそ出ないものの一向に処理が完了しませんでした(一晩バッチを動かし続けても約700万件ほどしか削除できていなかった)。
その理由としてRailsにはdestroy_all!
メソッドが存在しないためdestroy!
メソッドを使って一件ずつレコードを削除する実装になっていることが挙げられます。
destroy!
メソッドではなくdelete!
メソッドを使うようにしても処理速度はほとんど改善しませんでした(そもそもTargetFromに依存している関連モデルはないのでコールバック処理もない)。
そこで、思い切ってレコードを物理削除するのではなく論理削除する設計に変更することにしました。
# Usage: rails data_migration:update_records
# 約3700万件のレコード取得
records = TargetFrom.where(status: 1)
# 適当な件数毎の配列に分割
records_each_slice = records.each_slice(1_000_000).to_a
# Update処理
begin
ActiveRecord::Base.transaction do
records_each_slice.each_with_index do |target, index|
target.update_all!(status: -1)
puts "TargetFrom(#{index + 1}/#{records_each_slice.size}) 更新完了"
# => TargetFrom(n/38) 更新完了
end
end
rescue => e
raise ActiveRecord::Rollback
puts e
end
今回はstatus
カラムを-1(使用していない・使用する予定のない値)に更新することで論理削除としました。
設計によってはdelete_flag
などのカラムを追加し値を設定することで論理削除としてもいいかと思います。ただしこの設計の場合、このモデルを参照している処理でdelete_flag
の値をチェックする必要が出てきたりするのでよく考えて設計する必要があります。
in_batchesメソッド
今回は適当な件数毎の配列に分割するのにeach_slice
メソッドを使いましたが、これとよく似たin_batches
というメソッドがあります。
in_batches
メソッドは以下のように使います。
TargetFrom.where(status: 1).in_batches(of: 1_000_000) do |records_in_batches|
...
end
このようにin_batches
メソッドを使用しても適当な件数毎に分けて処理を行うことができます。
しかしin_batches
メソッドはメモリの消費を抑えることができますが、発行するSQLの問題で処理速度が遅いという弱点があります。そのため、今回のように適当な件数毎に分けて処理を行いたい場合、each_slice
メソッドを使って分割することをおすすめします。
ちなみにin_batches
メソッドはRailsの組み込みメソッドなのでActiveRecordの配列でしか使うことができません。一方、each_slice
メソッドはRubyの組み込みメソッドなのでEnumerable型のデータであれば何にでも使うことができます。
まとめ
どんなシステムであっても長期間運用し続けているとどんどんデータが蓄積していきます。大量データを抱えるDBの速度問題というのはどんなシステムでも避けられない課題の一つでしょう。
大量データに対するデータ操作 (Insert/Update/Delete) という開発要件はあまりなさそうで実は結構あることだと思います。
同じ開発要件でどう設計・実装しようか悩んでいる人の参考になれば幸いです。