如何更可控、稳定地写Rails脚本操作线上数据库的记录

2016/11/08 Rails

描述:某个功能需要为数据库增加相关数据。大概逻辑是:根据表Company,根据某些条件往Department和DepartmentsUser表中插入数据。 然后将User表中满足筛选条件的status字段进行更改。

结果:需要超过一个小时才将整个脚本跑完,也就是这一小时都在影响线上数据库

反思:可以更可控、稳定地写Ruby脚本操作线上数据库的表吗?

一、思考脚本是否可以在项目升级前运行

如果脚本可以在项目升级前运行,则可以在一定程度上更早的发现问题,以做出对正式升级时的预防。比如,操作新的表,或新的字段。这种情况是完全可以提早运行脚本的。 有的脚本是必须要升级之前运行的,由于线上代码逻辑依赖于这个脚本的数据。所以,需要在升级之前跑一下,升级之后再跑一下。这样的脚本需要写成可重复执行。 但是有的脚本是只能升级后跑的,则千万别在升级前跑

二、是否有长时间commit操作

警惕长时间commit的操作。举个例子:比如用update_all或批量插入在一定程度上可以更快的运行sql,但是,这些操作的commit的时间可能会很长(如果你用的是MySQL)。这时候用户可以进行select 或 insert操作,但是对update操作就会阻塞了。 如果时间过长则很有可能超时。这样会让用户感知到系统的延迟或崩溃。所以,不使用这种批量操作,排除commit长时间的操作,使用粒度小的create,update,destroy操作或者每次小批量的commit操作。如果一个commit很大,中间异常断了,则前面跑的 sql语句就废弃了。而粒度小的操作则可以避免这样。虽然,总的执行时间会更长,但是,这样就不会长时间占用数据库的某个表而影响用户的一些使用。 如果真的无法避免长时间的操作,就需要在后半夜大家睡觉的时候跑脚本或者进行数据库操作了(比如新增字段,新增索引等)。

三、捕获异常错误

使用异常,捕获异常错误,将异常打印出来。或者可以在全局新建一个数组,然后将某次异常的操作中相关记录的id存入数组中,最后打印出来,就可以知道在执行过程中哪个记录是有问题的,可以很方便的追踪错误,后续就可以 很方便的进行验证和纠错了。

四、让脚本可以重复运行

尽量让脚本可以重复运行。即使加入额外的find或者find_or_create_by查询。或者捕获异常,而不是让异常中断操作。不要使用清空数据库的方式,因为这样需要从0条记录开始跑,所需要的时间更长。 脚本重复运行要注意的是,避免每次的重复运行增加脏数据。只有update操作的脚本可以重复运行,如果是create的操作,可以在操作前先判断一下是否存在,不存在则创建。 find_or_create_by方法可以使用block方法,更方便的使用。

五、使用find_each 或 find_in_batches

使用find_each 或 find_in_batches 分批次的进行查询循环操作。注意,find_in_batches得到的是对象数组集合。

六、使用gem ‘ruby-progressbar’ 愉快的展示执行进度

在Gemfile中假如 gem ‘ruby-progressbar’, require: false。 然后在脚本文件中 require ‘ruby-progressbar’ 进行使用。可以很方便的展示脚本执行的百分比和进度。

七、统计脚本结果信息,对比正确的信息以确定脚本运行无误。

八、踩过的坑。。。。。

在rails c(用的是pry环境)不要跑脚本,因为脚本代码中如果有next方法,会使用到的是pry的next方法而不是ruby的next方法,然后你就掉坑里了这对rails 4.0 的版本有这个问题, 4.2 以上的版本修复了这个问题
代码中不要定义和数据库字段相同名称的方法,如果有重名的方法,请先确认是需要该方法还是需要的是数据库的字段。是数据库的字段请使用read_attributes
让每个脚本只执行一个循环模块,进行一种数据的更改。尽量保持独立。如果有互相依赖的循环逻辑,再写在一个脚本中。

九、对执行脚本后,线上数据的分析和校验。我觉得这也是最难的一步了,这一步根据不同项目而不同。我觉得应该在执行脚本之前,将验证数据的方案也准备好,

而不是脚本执行完就过了,而不知道线上数据到底正确不正确。所以,执行脚本其实应该分两部分。一部分是正常的更改逻辑,另一部分是验证线上数据正确与否的方案。

也许会因此会有成倍的代码量,但是,如果给线上数据带来隐患,给用户造成损失或不便,这才是更大的问题。

代码示例分析:

require 'ruby-progressbar'
progressbar = ProgressBar.create :total => Company.count, format: "%p%% [%B] %c/%C %E"
progressbar.log "= 开始初始化 ="
Company.find_each do |company|
  begin
    department = Department.create!(company_id: company.id, name: company.subdomain, parent_id: 0, is_default: true)
    company.agents.each do |agent|
      begin
        DepartmentsUser.create!(user_id: agent.id, department_id: department.id)
      rescue => e
        puts e
        next
      end
    end
  rescue => e
    puts e
    next
  end
  progressbar.increment
end

progressbar.log "=== 初始化完毕 ==="
User.where(p: ['A','UG']).update_all(status: true)

代码示例小结:

1.脚本是为新的数据库补充对应数据,所以可以在项目升级前运行

2.脚本使用了find_each

3.脚本进行了异常捕获

4.脚本使用了ruby-progressbar

可以改进的地方:

1.脚本不能重复跑,如果重复跑的话,对于create操作会产生脏数据;将create!改为find_or_create_by!

2.没有收集异常错误的记录

3.没有统计脚本结果信息,对比正确的信息

4.建议将progressbar.increment 放到循环的最开始

5.使用了update_all 这种可能长时间锁表的操作

代码示例改进:

require 'ruby-progressbar'
progressbar = ProgressBar.create :total => Company.count, format: "%p%% [%B] %c/%C %E"
progressbar.log "= 开始初始化 ="
error_user = []
error_company = []
update_user_error = []
Company.find_each do |company|
  progressbar.increment
  begin
    department = Department.find_or_create_by!(company_id: company.id, name: company.subdomain, parent_id: 0, is_default: true)
    company.users.each do |user|
      begin
        DepartmentsUser.find_or_create_by!(user_id: user.id, department_id: department.id)
      rescue => e
        puts e
        error_user << user.id
        next
      end
    end
  rescue => e
    puts e
    error_company << company.id
    next
  end
end

User.where(p: ['A','UG']).find_each do |user|
  begin
    user.update!(status: true)
  rescue => e
    puts e
    update_user_error << user.id
  end
end

progressbar.log "Company count: #{Company.count} -- Department count: #{Department.count} -- DepartmentsUser count: #{DepartmentsUser.count} User count: #{User.count}"
progressbar.log "error: error_user: #{error_user}; error_company: #{error_company}"
progressbar.log "=== 初始化完毕 ==="
执行: bundle exec rails runner -e development init_department.rb

= 开始初始化 =                                                                                                                                                                        
100% [======================================================================================================================================================================] xxx/xxx Time: 00:00:00
Company count: xxx -- Department count: xxx -- DepartmentsUser count: xxx User count: xxx                                                                                                          
error: error_user: []; error_company: []                                                                                                                                                         
=== 初始化完毕 ===

本文提供的几个思路其实都不难,只是往往我们都忽略了。

无论是写rails脚本,rake脚本, 复杂的migration或者其他语言的脚本操作线上的数据库,都应该思考如何对线上的数据库影响最小,对用户的使用影响最小,以及最终的结果是否正确 也许在本地或测试环境你的脚本运行的没有问题,但是,线上的数据数量往往是本地和测试环境的n倍,并且已经有很多隐藏的脏数据,各种奇妙的事情都有可能发生。即使是在半夜运行脚本, 能够预先做更多的准备对应出现的异常情况,也能更快的找到问题的所在。

准备下一次的项目升级

补充

使用rake脚本,将MySQL数据同步到ElasticSearch

STDOUT.sync = true

# 该方法实现find_in_batches 功能,但是order by 是以 updated_at 排序,在使用ES增量同步时,提高了效率
  def find_in_batches(relation, order_cloumn = "updated_at", batch_size = 1000)
    relation = relation.order("#{order_cloumn} ASC").limit(batch_size)
    records = relation.to_a

    while records.any?
      records_size = records.size
      offset_column = records.last.send("#{order_cloumn}")

      yield records

      break if records_size < batch_size
      records = relation.where("#{order_cloumn} >= ?", offset_column).to_a
    end
  end

#coding: utf-8
# 第一次的时候,要确保有对应的索引存在
# rake es_sync:sync_data_es from=2016-03-04 to=2016-09-08 table=Ticket company_id=x batch_size=1000
# 支持 并发 rake parallel=1/4 es_sync:sync_data_es form=2016-03-04 table=Ticket
# batch_size=1000 批量取的数量
# 公司参数可传可不传
# table 参数要是对应model的名称 如: Ticket, to 和conpany_id 可以不传
# 对于有自定义字段的同步,可以在最后加 es_sync=true 优化自定义字段的同步速度,暂时对ticket有效
namespace :es_sync do

  desc 'MySQL 数据记录同步到ES'
  task sync_data_es: :environment do
    from       = ENV['from']
    to         = ENV['to']
    model      = ENV['table']
    company_id = ENV['company_id']
    lot_num    = ENV['parallel'].split('/').first if ENV['parallel']
    batch_size = (ENV['batch_size'] || 1000).to_i

    from = Time.parse(from)
    to   = to.present? ? Time.parse(to) : Time.now
    record_total = 0
    i = 0
    s = Time.now

    time = s.strftime("%Y%m%d%H%M%S")
    log_name = "es_sync_error_log_#{time}_#{model}_#{lot_num}.log"
    LOGGER = create_logger(log_name)
    if model.blank?
      LOGGER.record do |title, log|
        title << "ES Sync Mysql"
        log[:info] = "table param is blank"
      end
      return
    end
    LOGGER.record do |title, log|
      title << "ES Sync Mysql"
      log[:info] = "++++++++++ start sync work ++++++++++"
    end

    model = model.camelize.constantize
    model.index_name # 用于验证是否有对应的ES索引,没有会报错而不再继续进行
    if model == WorkLog
      where_sql = model.where(created_at: (from..to))
    else
      where_sql = model.where(updated_at: (from..to))
    end
    if company_id.present?
      where_sql = where_sql.where(company_id: company_id)
    end
    text_field_fields = $ARGV.last == "es_sync=true" ? TextField.get_text_select_fields : nil
    find_in_batches(where_sql, "updated_at", batch_size) do |records|
      body_ary = []
      i += 1
      next unless Paralleler.valid?(i)
      records.each do |record|
        begin
          record.custom_field_es_hash = text_field_fields if text_field_fields.present?
          body_ary << { index: { _id: record.id, data: record.as_indexed_json } }
        rescue => e
          options                           = {'exception'=>{}}
          options['record_id']              = record.id
          options['occur_at']               = Time.current
          options['exception']['message']   = e.try(:message)
          options['exception']['backtrace'] = e.try(:app_backtrace)
          LOGGER.record  do |title, log|
            title << "ES Sync Mysql"
            log[:info] = "++++++++++++error: #{options}"
          end
        end
      end
      next if body_ary.blank?
      begin
        model.__elasticsearch__.client.bulk({
          index: model.index_name,
          type: model.document_type,
          body: body_ary
        })
        record_total += records.size
        last_record = body_ary.last
        last_id     = last_record.try(:[], :index).try(:[], :_id)
        LOGGER.record do |title, log|
          title << "ES Sync Mysql"
          log[:info] = "NO.#{i} batch imported(#{records.size})"
          log[:record_totaltotal] = "-------- #{record_total}"
          log[:last_id] = " last record id: #{last_id}"
        end
      rescue => e
        options  = {'exception'=>{}}
        first_record = body_ary.first
        first_id     = first_record.try(:[], :index).try(:[], :_id)
        options.merge!({first_id: first_id, last_id: last_id})
        options['occur_at']               = Time.current
        options['exception']['message']   = e.try(:message)
        options['exception']['backtrace'] = e.try(:app_backtrace)
        LOGGER.record do |title, log|
          title << "ES Sync Mysql"
          log[:info] = "++++++++++++error: #{options}"
        end
      end
    end
    model.__elasticsearch__.refresh_index!

# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++sync deleted record start
  if model != WorkLog
    LOGGER.record do |title, log|
       title << "ES Sync Mysql"
       log[:info] = "sync deleted record start"
     end
    where_sql = EsSyncDeletedRecord.where(record_type: model.to_s).where("updated_at >= ?", from)
    if company_id.present?
      where_sql = where_sql.where(company_id: company_id)
    end
    i = 0
    total = 0
    find_in_batches(where_sql, "updated_at", batch_size) do |records|
      i += 1
      next unless Paralleler.valid?(i)
      body = records.map{|record| {delete: {_id: record.record_id}}}
      model.__elasticsearch__.client.bulk({
        index: model.index_name,
        type: model.document_type,
        body: body
      })
      total += records.size
    end
    LOGGER.record do |title, log|
       title << "ES Sync Mysql Delete"
       log[:info] = "-------- deleted record count #{total}"
       log[:count] = "-------- EsSyncDeletedRecord count #{where_sql.count}"
     end
   end
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    e = Time.now
    x = e - s
    LOGGER.record do |title, log|
      title << "ES Sync Mysql"
      log[:info] = "Sync work is done~~~~~~"
      log[:second] = "Need seconds: #{x}"
    end
  end

# rake es_sync:check_sync_data_es from=2016-03-04 to=2016-09-08 table=Ticket company_id=x
# 公司参数可传可不传
# table 参数要是对应model的名称 如: Ticket
# 同步数量小于5w的不适合用本脚本,直接使用import force:true 不断同步就可以
# 没有updated_at 字段的表不可以检验
  desc 'MySQL 数据记录同步到ES校验'
  task check_sync_data_es: :environment do
    from       = ENV['from']
    to         = ENV['to']
    model      = ENV['table']
    company_id = ENV['company_id']
    count_from_mysql = ENV['count'] ? ENV['count'].to_i : nil

    from = Time.parse(from)
    to   = to.present? ? Time.parse(to) : Time.now

    if model.blank?
      puts "table param is invalid or blank"
      return
    end
    puts "++++++++++ check sync work ++++++++++"

    model = model.camelize.constantize
    model_mysql = model.where(updated_at: (from..to))
    if company_id
      model_mysql = model_mysql.where(company_id: company_id)
    end

    count_from_mysql = count_from_mysql || model_mysql.count
    count_from_es    = Elasticsearch::Model.client.count(index: model.index_name)["count"]
    index_name = model.index_name
    puts "Count from Mysql #{index_name}: #{count_from_mysql}"
    puts "Count from ES #{index_name}: #{count_from_es}"
    puts "开始进行MySQL-ES #{index_name}数据的随机抽查"

    num = count_from_mysql / 1000
    shuffle_ids = []
    if num > 999
      # 每num条随机抽一个, 抽取1000个
      1000.times.each do |item|
        min = item * num
        max = min + num
        r_num = rand(min..max)
        shuffle_ids << model.where("id >= #{r_num}").take.try(:id)
      end
    else
      shuffle_ids = model_mysql.order("rand()").limit(1000).pluck(:id)
    end
    hash = {}
    result = model.__elasticsearch__.search(
      _source: {include: 'updated_at'},
      query: {filtered: {filter: {terms: {
        _id: shuffle_ids
        }}}},
      size: 1000
      )

    result.results.to_a.each do |item|
      hash[item[:_id].to_s] = Time.zone.parse(item[:_source][:updated_at])
    end
    model_mysql.where(id: shuffle_ids).pluck(:id, :updated_at).each do |item|
      id = item.first.to_s
      if hash[id] != item.last
        puts "++++++++ disaccorded #{model} id: #{id}"
      else
        puts "++++++++ #{model} id: #{id} OK"
      end
    end
    puts "++++++++++ check sync work finish ++++++++++"
  end
end
写rails 脚本, 对ES进行mapping 预定义
# bundle exec rails runner -e development 2017-05-11-add_columns_to_users_es_mapping.rb

client = Elasticsearch::Model.client
client.indices.put_mapping index: "users", type: "user", body: {
  user: {
    properties: {
      source_type: {type: :string, index: :not_analyzed},
      source_id: {type: :integer}
    }
  }
}

Search

    Table of Contents