Hi, welcome to Word of Mike, my little corner of the internet. I am a Software/Web Developer working in North Yorkshire. I mainly write about programming but my other passion is politics so beware.

2014-12-14 18:51:44 UTC

Parallel Query in Rails (Querying Multiple Databases)


I needed a kind of Shard-Query way to read data from multiple databases with the same schema in our Rails app. We have a multitenant app with a separate database per client and I wanted a way to query across a group of databases, quickly.

I reasoned that a MapReduce-style approach was the way to go: fire the query at each database concurrently using threads, then reduce the results into one result set and return it. It became obvious quite quickly that Rails wasn't going to make this simple for me.

I have released this as the gem Querrel.

I could do this naively without concurrency and just loop over the different databases querying each one and collating the results, but this will take longer the more databases I'm querying across:

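results = {}
dbs.each do |db|
  # Point ActiveRecord at this database, then force the query to run against it.
  ActiveRecord::Base.establish_connection(db.to_s)
  results[db] = scope.reload
end
# Switch back to the app's normal database.
ActiveRecord::Base.establish_connection(Rails.env)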

With threads, the theory is they'll all run concurrently and the total time will be however long the longest query takes.
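
To make that theory concrete, here's a rough sketch in plain Ruby with no Rails involved. `fake_query` and the latencies are made up for illustration: `sleep` stands in for the time spent waiting on a database. Run sequentially, the waits add up; run in threads, the total should be close to the slowest single wait.

```ruby
# Hypothetical stand-in for a database query: sleeps to mimic waiting on IO.
def fake_query(db, seconds)
  sleep(seconds)
  "rows from #{db}"
end

# Made-up per-database latencies, in seconds.
LATENCIES = { db1: 0.10, db2: 0.20, db3: 0.15 }

# Time a block using the monotonic clock.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

# One query after another: total time is roughly the sum of the latencies.
def run_sequential
  LATENCIES.map { |db, secs| fake_query(db, secs) }
end

# One thread per query: total time is roughly the longest latency.
def run_threaded
  LATENCIES.map { |db, secs| Thread.new { fake_query(db, secs) } }.map(&:value)
end

puts format("sequential: %.2fs", elapsed { run_sequential })
puts format("threaded:   %.2fs", elapsed { run_threaded })
```

This only works out so neatly because the threads spend their time waiting rather than computing, which is the situation we're in when the database is doing the heavy lifting.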

The crux is that the same model needs to be queried concurrently in different threads, each using a different connection pool. AR looks up the connection pool hierarchically, starting from the AR class being queried, so the pool belongs to the class rather than to the thread. We can't switch it per thread because every thread shares the same AR class.

# This will cause all kinds of problems because our modification of the
# connection pool for Post is not threadsafe.
threads = []
results = {}
[:db1, :db2].each do |db|
  threads << Thread.new do
    Post.establish_connection(db)
    results[db] = Post.where(title: "Foo")
  end
end
threads.each(&:join)
Post.establish_connection(Rails.env)
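
The failure is easier to see without Rails in the way. In this sketch `FakeModel` is a made-up class, not anything from AR, but it shares the property that matters: the "current connection" is class-level state seen by every thread. The two queues force the unlucky interleaving so that it happens every time rather than occasionally.

```ruby
# FakeModel is a hypothetical stand-in for an AR class. Its "connection"
# is stored on the class itself, so all threads share it.
class FakeModel
  class << self
    attr_accessor :current_db

    def establish_connection(db)
      self.current_db = db
    end

    def query
      "rows from #{current_db}"
    end
  end
end

def racy_queries
  first_connected  = Queue.new
  second_connected = Queue.new
  results = {}

  t1 = Thread.new do
    FakeModel.establish_connection(:db1)
    first_connected << true
    second_connected.pop # wait until the other thread has switched to db2
    results[:db1] = FakeModel.query
  end

  t2 = Thread.new do
    first_connected.pop # wait until the first thread has connected to db1
    FakeModel.establish_connection(:db2)
    second_connected << true
    results[:db2] = FakeModel.query
  end

  [t1, t2].each(&:join)
  results
end

p racy_queries
```

Both entries come back as "rows from db2": the thread that asked for db1 ended up querying whatever the class was last pointed at.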

I couldn't find a way around this, so I had to initialise my own connection pool and use it vanilla without connecting it up to AR. I wrote this method to create the connection:

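def while_connected_to(db, &b)
  # Build a standalone pool from the database.yml entry named by db.
  # "mysql2_connection" is the adapter method; change it for other adapters.
  config = Rails.configuration.database_configuration[db.to_s]
  spec = ActiveRecord::ConnectionAdapters::ConnectionSpecification.new(config, "mysql2_connection")
  pool = ActiveRecord::ConnectionAdapters::ConnectionPool.new(spec)
  pool.with_connection(&b)
ensure
  # pool is nil if building the spec or the pool raised.
  pool.disconnect! if pool
end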

This will create a connection pool connected using the database settings for the environment passed to it and yield a connection object to the block. In order to still use AR relations, I'm using `to_sql` combined with AR::Connection#select_all (inspired by the implementation of AR::Base#find_by_sql):

def map(scope, dbs)
  sql = scope.to_sql
  query_model = scope.model
  results = {}

  threads = []
  dbs.each do |db|
    threads << Thread.new do
      while_connected_to(db) do |conn|
        result_set = conn.select_all(sql, "MapReduce Load")
        column_types = result_set.column_types

        results[db] = result_set.map { |record| query_model.instantiate(record, column_types) }
      end
    end
  end
  threads.each(&:join)

  results
end
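
One variation worth considering: rather than having every thread write into a shared `results` hash, each thread can return its own pair and the caller can collect them with `Thread#value`. This is a sketch in plain Ruby, not what the code above does; `fetch_rows` is a made-up placeholder for running the SQL against one database.

```ruby
# fetch_rows is a hypothetical stand-in for running the SQL against one database.
def fetch_rows(db)
  raise ArgumentError, "unknown database #{db}" unless %i[db1 db2 db3].include?(db)
  ["#{db}-row-1", "#{db}-row-2"]
end

def map_with_values(dbs)
  threads = dbs.map do |db|
    # Each thread returns a [db, rows] pair instead of touching shared state.
    Thread.new { [db, fetch_rows(db)] }
  end
  # Thread#value joins the thread and returns the block's result,
  # re-raising any exception the thread died with.
  threads.map(&:value).to_h
end

p map_with_values(%i[db1 db2])
```

A side benefit is that a failed query surfaces as an exception in the calling thread instead of silently leaving a gap in the results.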

We have to be wary here of how Rails and database connections behave with Threads. If we use a model method in our thread that causes the model to check out a connection for that thread, it does not check it back in automatically, so you can easily exhaust your connection pool. We're not touching the model in the threads so we're okay.
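
Here's a toy illustration of that exhaustion problem. `TinyPool` is invented for this sketch and is far simpler than AR's real pool, but it shows the difference between a thread that checks a connection out and walks away, and one that borrows it through a `with_connection` block (which is what `while_connected_to` relies on).

```ruby
# TinyPool is a hypothetical, much-simplified stand-in for a connection pool.
class TinyPool
  def initialize(size)
    @available = Queue.new
    size.times { |i| @available << "conn-#{i}" }
  end

  def available
    @available.size
  end

  # Non-blocking checkout; raises ThreadError when the pool is empty.
  def checkout
    @available.pop(true)
  end

  def checkin(conn)
    @available << conn
  end

  # Borrow a connection for the duration of the block and always return it.
  def with_connection
    conn = checkout
    yield conn
  ensure
    checkin(conn) if conn
  end
end

leaky = TinyPool.new(2)
2.times { Thread.new { leaky.checkout }.join }
puts leaky.available # → 0, the connections were never returned

tidy = TinyPool.new(2)
5.times { Thread.new { tidy.with_connection { |conn| conn } }.join }
puts tidy.available # → 2, every connection came back
```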

With our results from each database, we simply merge them for the time being, making the records read-only in the process. So to hook this whole thing up:

class ShardQuery
  class << self
    def action(scope, options = {})
      buckets = map(scope, options[:on])
      reduce(buckets)
    end

    def map(scope, dbs)
      sql = scope.to_sql
      query_model = scope.model
      results = {}

      threads = []
      dbs.each do |db|
        threads << Thread.new do
          while_connected_to(db) do |conn|
            result_set = conn.select_all(sql, "MapReduce Load")
            column_types = result_set.column_types

            results[db] = result_set.map { |record| query_model.instantiate(record, column_types) }
          end
        end
      end
      threads.each(&:join)

      results
    end

    def reduce(buckets)
      buckets.map do |db, results|
        results.map do |result|
          result.tap { |r| r.readonly! }
        end
      end.flatten
    end

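    def while_connected_to(db, &b)
      # Build a standalone pool from the database.yml entry named by db.
      # "mysql2_connection" is the adapter method; change it for other adapters.
      config = Rails.configuration.database_configuration[db.to_s]
      spec = ActiveRecord::ConnectionAdapters::ConnectionSpecification.new(config, "mysql2_connection")
      pool = ActiveRecord::ConnectionAdapters::ConnectionPool.new(spec)
      pool.with_connection(&b)
    ensure
      # pool is nil if building the spec or the pool raised.
      pool.disconnect! if pool
    end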
  end
end

Let's benchmark this!

require "benchmark/ips" # from the benchmark-ips gem

Benchmark.ips do |x|
  x.report('sequential_2_dbs') { ShardQuery.sequential_action(Student.where(firstname: "Asif"), on: [:db1, :db2]) }
  x.report('threaded_2_dbs') { ShardQuery.threaded_action(Student.where(firstname: "Asif"), on: [:db1, :db2]) }
  x.compare!
end

Benchmark.ips do |x|
  x.report('sequential_3_dbs') { ShardQuery.sequential_action(Student.where(firstname: "Asif"), on: [:db1, :db2, :db3]) }
  x.report('threaded_3_dbs') { ShardQuery.threaded_action(Student.where(firstname: "Asif"), on: [:db1, :db2, :db3]) }
  x.compare!
end

Benchmark.ips do |x|
  x.report('sequential_4_dbs') { ShardQuery.sequential_action(Student.where(firstname: "Asif"), on: [:db1, :db2, :db3, :db4]) }
  x.report('threaded_4_dbs') { ShardQuery.threaded_action(Student.where(firstname: "Asif"), on: [:db1, :db2, :db3, :db4]) }
  x.compare!
end

# 2 databases
Calculating -------------------------------------
    sequential_2_dbs     1.000  i/100ms
      threaded_2_dbs    14.000  i/100ms
-------------------------------------------------
    sequential_2_dbs     40.178  (±37.3%) i/s -    147.000 
      threaded_2_dbs    149.854  (±22.0%) i/s -    700.000 

Comparison:
      threaded_2_dbs:      149.9 i/s
    sequential_2_dbs:       40.2 i/s - 3.73x slower


# 3 databases
Calculating -------------------------------------
    sequential_3_dbs     2.000  i/100ms
      threaded_3_dbs     9.000  i/100ms
-------------------------------------------------
    sequential_3_dbs     22.601  (±31.0%) i/s -    102.000 
      threaded_3_dbs    100.917  (±24.8%) i/s -    477.000 

Comparison:
      threaded_3_dbs:      100.9 i/s
    sequential_3_dbs:       22.6 i/s - 4.47x slower


# 4 databases
Calculating -------------------------------------
    sequential_4_dbs     1.000  i/100ms
      threaded_4_dbs     7.000  i/100ms
-------------------------------------------------
    sequential_4_dbs     18.097  (±33.2%) i/s -     76.000 
      threaded_4_dbs     83.095  (±26.5%) i/s -    364.000 

Comparison:
      threaded_4_dbs:       83.1 i/s
    sequential_4_dbs:       18.1 i/s - 4.59x slower

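As you can see, the advantage over the sequential implementation grows with the number of databases queried: 3.73x with two, 4.47x with three and 4.59x with four. The error margins are wide (±22% to ±37%), so treat these as rough figures. More improvement would probably be had in JRuby, for instance, because MRI's global interpreter lock only lets one thread run Ruby code at a time (it is released while a thread waits on IO, which is why threads help here at all), but I don't have the facility to test that at the moment.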

I'd like to support more features properly, such as ordering, and I hope to release a small gem soon which encapsulates it all.
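
On ordering: each database hands back its own rows in order, but gluing the buckets together loses any overall order. One possible approach, sketched below in plain Ruby, is to re-sort in the reduce step. This is only an idea, not something the code above does; `Row`, `reduce_sorted` and its `by:` and `direction:` parameters are all made up for illustration, and it assumes the merged results fit comfortably in memory.

```ruby
# Row is a hypothetical stand-in for an instantiated model.
Row = Struct.new(:db, :surname)

# Merge the per-database buckets and re-apply the ordering in Ruby.
def reduce_sorted(buckets, by:, direction: :asc)
  merged = buckets.values.flatten(1).sort_by { |row| row.public_send(by) }
  direction == :desc ? merged.reverse : merged
end

BUCKETS = {
  db1: [Row.new(:db1, "Adams"), Row.new(:db1, "Young")],
  db2: [Row.new(:db2, "Baker"), Row.new(:db2, "Smith")]
}

p reduce_sorted(BUCKETS, by: :surname).map(&:surname)
# → ["Adams", "Baker", "Smith", "Young"]
```

A limit would need the same treatment: apply it in each database's query, then again after the merge.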