Rails Threading Gains


Consider the following (extremely simplified) workflow:

1) Create 2 new cloud servers (somewhere)
2) Wait for them to be created
3) Start an arbitrary process on both of them

Normally we would use Sidekiq for our background processing. However, in workflow scenarios like the one above, where we want to wait synchronously on operations that run in parallel, Sidekiq doesn't quite fit the bill: we end up with a proliferation of jobs and batches that becomes very complex as the number of parallel operations in the same workflow increases. So, in this case, we'll roll our own.

What about the GIL
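
On MRI (CRuby), the global interpreter lock allows only one thread to execute Ruby code at a time, but the lock is released while a thread sleeps or blocks on I/O. Waiting on a cloud API is that kind of work, so threads can still overlap usefully in this workflow. The sketch below illustrates this under stated assumptions: fake_cloud_call and timed_parallel_calls are hypothetical helpers, and sleep stands in for a slow network request.

```ruby
# Hypothetical stand-in for a blocking call such as a request to a cloud API.
def fake_cloud_call(seconds)
  sleep(seconds)
  :done
end

# Runs `count` fake calls in parallel threads and returns [results, elapsed_seconds].
def timed_parallel_calls(count, seconds)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads = count.times.map { Thread.new { fake_cloud_call(seconds) } }
  results = threads.map(&:value) # value joins the thread and returns the block's result
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  [results, elapsed]
end

results, elapsed = timed_parallel_calls(2, 0.3)
puts format('2 calls of 0.3s each finished in %.2fs: %s', elapsed, results.inspect)
```

The total should come out close to the duration of one call rather than the sum of both. CPU-bound Ruby code would not see the same gain on MRI.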


Class: LambdaContext

First we need to define a class called LambdaContext which serves simply to store our lambda definition and run-results:

class LambdaContext
  attr_accessor :lambda_definition,
                :parameters,
                :stdout,
                :stderr,
                :stderr_backtrace,
                :exit_code,
                :executed

  def initialize(lambda_definition = nil, *args)
    self.lambda_definition = lambda_definition
    self.parameters = args
    self.executed = false
  end
end
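
Because the constructor collects its extra arguments with *args, a context can carry any number of parameters, and they can later be splatted back into the lambda. A minimal sketch of that round trip, using a hypothetical store_and_call helper and a made-up describe_server lambda rather than the real classes:

```ruby
# Mirrors how the arguments are stored (*args) and later splatted into the lambda.
def store_and_call(lambda_definition, *args)
  parameters = args                     # what the constructor keeps in parameters
  lambda_definition.call(*parameters)   # how the stored lambda can be invoked later
end

# Hypothetical two-argument lambda, for illustration only.
describe_server = lambda { |svr_idx, region| "server-#{svr_idx} in #{region}" }
puts store_and_call(describe_server, 1, 'eu-west') # prints "server-1 in eu-west"

# Lambdas check their argument count strictly, so a mismatch raises.
begin
  store_and_call(describe_server, 1) # one argument short
rescue ArgumentError => exc
  puts "arity mismatch: #{exc.message}"
end
```

Since lambdas enforce arity, the parameters stored on a context need to match the lambda's signature exactly.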

Now we can simply create a collection of these lambda_context objects to run in parallel later on. From our sample workflow (assuming of course that the method create_server with parameter server_index exists):

lambda_contexts = []
2.times do |idx|
  lambda_def = lambda { |svr_idx| create_server(svr_idx) }
  lambda_contexts << LambdaContext.new(lambda_def, idx)
end

Note: we pass the svr_idx argument in ourselves instead of having the lambda refer to idx directly. In the example above it makes little difference, but in a case where the variable could be reassigned before the lambda executes, passing the value explicitly means the lambda runs with the value we originally intended.
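
To illustrate the note above, here is a small sketch of the difference between a lambda that closes over a variable and one that receives the value as a parameter. The names build_examples, captured, explicit and stored_parameter are made up for this illustration.

```ruby
# Returns [result of the closure-based lambda, result of the parameterised lambda].
def build_examples
  server_index = 0
  captured = lambda { server_index }        # reads server_index when it is called
  explicit = lambda { |svr_idx| svr_idx }   # only sees what is passed in
  stored_parameter = server_index           # snapshot, like the stored parameters
  server_index = 99                         # reassigned before either lambda runs
  [captured.call, explicit.call(stored_parameter)]
end

p build_examples # prints [99, 0]
```

The closure sees the reassigned variable, while the explicitly passed value stays at what it was when the context was created.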

Class: ThreadRunner

We can now define a class to perform the operations in parallel using Ruby's native Thread class:

class ThreadRunner
  attr_accessor :lambda_contexts

  def initialize
    @lambda_contexts = []
  end

  # executes and returns true or false if success or fail
  def execute
    # define the threads container
    threads = []
    # loop through the lambdas
    @lambda_contexts.each do |lambda_context|
      # create a thread and start
      threads << run_thread(lambda_context)
    end
    # join all the threads remaining and capture the output
    return join_thread_batch(threads)
  end

  # returns errors after execution is complete
  def after_execution_get_full_errors
    errors = @lambda_contexts.select do |lc|
      !lc.stderr_backtrace.blank?
    end
    errors.map do |lc|
      "EXCEPTION: #{lc.stderr} AT #{lc.stderr_backtrace}"
    end
  end

  # executes the lambda definition and stores the results
  # in TLS for retrieval later
  def run_thread(lambda_context)
    lc = lambda_context
    lc.executed = true
    thread = Thread.new do
      with_connection do
        begin
          params = lc.parameters
          if params.blank?
            Thread.current[:stdout] = lc.lambda_definition.call
          else
            Thread.current[:stdout] = lc.lambda_definition.call(*params)
          end
          Thread.current[:exit_code] = 0
        rescue => exc
          Thread.current[:stderr] = exc.message
          Thread.current[:exit_code] = -1
          Thread.current[:stderr_backtrace] = exc.backtrace.join("\n")
        end
      end
    end
    return thread
  end

  # ensures that connections are handled in the case of
  # Ruby Timeouts (evil)
  def with_connection
    ActiveRecord::Base.connection_pool.with_connection do
      yield
    end
  rescue Timeout::Error => exc
    ActiveRecord::Base.clear_active_connections!
    ActiveRecord::Base.connection.close
    raise exc
  end

  # joins all threads in the existing batch and stores their
  # results against the lambda_contexts collection
  def join_thread_batch(threads)
    all_ok = true
    threads.each_with_index do |thread, index|
      lambda_context = @lambda_contexts[index]
      thread.join
      lambda_context.stdout = thread[:stdout]
      lambda_context.stderr = thread[:stderr]
      lambda_context.stderr_backtrace = thread[:stderr_backtrace]
      lambda_context.exit_code = thread[:exit_code]
      all_ok = lambda_context.stderr.nil? if all_ok
    end
    return all_ok
  end
end
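
The core trick in the class above is that each thread writes its outcome into its own thread-local storage, and the caller reads it back through the Thread object after join. A stripped-down sketch of just that mechanism, without ActiveRecord and with hypothetical helper names (run_and_record, collect):

```ruby
# Runs the callable in a new thread and records the outcome in thread-local storage.
def run_and_record(callable)
  Thread.new do
    begin
      Thread.current[:stdout] = callable.call
      Thread.current[:exit_code] = 0
    rescue => exc
      # the exception is captured here instead of propagating out of the thread
      Thread.current[:stderr] = exc.message
      Thread.current[:exit_code] = -1
    end
  end
end

# Waits for the thread, then reads its thread-local values from the outside.
def collect(thread)
  thread.join
  { stdout: thread[:stdout], stderr: thread[:stderr], exit_code: thread[:exit_code] }
end

ok = collect(run_and_record(lambda { 21 * 2 }))
failed = collect(run_and_record(lambda { raise ArgumentError, 'no such server' }))

puts "ok:     stdout=#{ok[:stdout]} exit_code=#{ok[:exit_code]}"
puts "failed: stderr=#{failed[:stderr]} exit_code=#{failed[:exit_code]}"
```

Because the exception is rescued inside the thread, join itself never raises here; a failure only shows up through the recorded stderr and exit_code values.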

Example Usage

# create thread_runner object
thread_runner = ThreadRunner.new

# add lambdas to the thread_runner
2.times do |idx|
  # define the method lambda
  lambda_def = lambda { |svr_idx| create_server(svr_idx) }
  thread_runner.lambda_contexts << LambdaContext.new(lambda_def, idx)
end

# execute all lambdas in parallel
# and wait for them to complete
all_ok = thread_runner.execute

# spit out errors if any failed
unless all_ok
  # do something
  puts thread_runner.after_execution_get_full_errors
end

... profit!


Happy coding!

Originally published at blog.cloud66.com on August 22, 2017.


