Limitasi concurrent work menggunakan Thread Pool

Thread Pool adalah suatu teknik perancangan software yang di-desain untuk mengontrol jumlah aksi asynchronous.

Kita telah mengetahui bahwa suatu sistem komputasi modern yang memiliki operating system, memiliki berbagai jenis thread:

  • Green thread
  • System thread

Kedua threading mechanism tersebut memiliki karakteristik yang sama: mereka dapat dieksekusi, paling tidak, dalam waktu yang hampir bersamaan (jika tidak bersamaan).

Karakteristik eksekusi thread yang bersamaan tersebut, terkadang menimbulkan konsekuensi yang patut dipertimbangkan. Yang antara lain adalah apabila terlalu banyak thread yang diciptakan melebihi dari kemampuan sistem itu sendiri, maka bisa-bisa proses tidak berjalan dengan baik, atau bahkan membuat sistem crash.

Let us assume ketika kita memiliki suatu model (MVC) yang bernama Hotel. Setiap kali kita menemukan data hotel yang belum terdapat dalam sistem, maka sistem perlu melakukan berbagai hal yang salah satunya adalah mengupload gambar-gambar ke AWS.

Jika dalam satu query terdapat 50 hotel yang belum tersimpan dalam database, maka akan terdapat paling tidak 50 request untuk melakukan image uploading ke AWS, diantara hal-hal lainnya.

Tentu saja, kita membutuhkan Thread untuk melakukan hal tersebut, karena sistem akan sangat tidak optimal ketika harus menunggu satu-per-satu untuk I/O operation yang sebetulnya dapat dilakukan secara hampir bersamaan.

Namun, ketika kita memproses semuanya menggunakan Thread-ing, maka bisa jadi sistem kita akan mengeksekusi semua 50 data dalam satu waktu, yang dapat memberatkan sistem, network dan juga database.

Solusi untuk masalah tersebut adalah dengan menggunakan teknik yang namanya Thread Pool. Pada dasarnya, setiap Thread yang ingin dieksekusi harus mengantri (queued) di dalam pool tersebut. Satu per satu, Thread akan dieksekusi oleh si pool. Karena keperluan untuk antrian ini diperlukan, maka kitapun perlu menggunakan data struktur Queue yang di Ruby bisa di representasikan oleh SizedQueue pun Queue.

Dalam struktur Queue tersebut, umum kita jumpai fungsi bernama push dan pop:

  • push: menambahkan data ke dalam Queue.
  • pop: mengambil data paling pertama dimasukkan, dan menghapusnya dari Queue.

Bedanya Queue dengan SizedQueue hanya terletak pada operasi push, dimana push pada SizedQueue mengharuskan jumlah data yang terkandung tidak melebihi dari konfigurasi saat SizedQueue di-konstruksi. Dalam arti lain, operasi push dalam SizedQueue bersifat blocking begitu sudah penuh.

Dengan mempertimbangkan kedua kemungkinan tersebut, maka kita tentu akan memilih SizedQueue untuk implementasi Thread Pool kita.

Pada awalnya, konstruksi ThreadPool kita sangat sederhana:

class ThreadPool  
  def initialize(num_workers)
    @num_workers = num_workers
  end

  def process_each(enumerable, &block)
    q = SizedQueue.new(@num_workers)

    enumerable.each { |thread| q << thread }
  end
end  

ThreadPool yang kita ciptakan akan memiliki fungsi process_each yang menerima enumerable (seperti Array). Setiap item dalam enumerable tersebut kemudian akan di-push (dengan operator <<).

Setelah itu, kita akan membuat thread sejumlah worker yang diinginkan.

class ThreadPool  
  def initialize(num_workers)
    @num_workers = num_workers
  end

  def process_each(enumerable, &block)
    q = SizedQueue.new(@num_workers)

    threads = @num_workers.times.map do
      Thread.new do
        until (item = q.pop) == :the_end
          block.call(item)
        end
      end
    end

    enumerable.each { |thread| q << thread }
    @num_workers.times { q << :the_end }

    threads.each(&:join)
  end
end  

Berikut contoh mulai menggunakan ThreadPool:

def insert_and_upload!  
  workers = ThreadsPool.new(5)
  workers.process_each(new_hotel_data) do |hotel_data|
    insert_hotel!(hotel_data)
    upload_thumbnails!(hotel_data)
  end
end  

Dengan demikian, kita akan memiliki 5 workers yang akan mengupload hotel baru, satu-per-satu.

Terdapat improvement yang dapat kita lakukan pada ThreadPool salah satunya, adalah dengan mengganti :the_end dengan hal lain. Karena, hal tersebut terlihat sangat absurd, dan, bagaimana jika sang programmer memang memiliki item yang kebetulan adalah :the_end?

  def process_each(enumerable, &block)
    q = SizedQueue.new(@num_workers)
    end_object = Object.new

    threads = @num_workers.times.map do
      Thread.new do
        until (item = q.pop) == end_object
          block.call(item)
        end
      end
    end

    enumerable.each { |thread| q << thread }
    @num_workers.times { q << end_object }

    threads.each(&:join)
  end

Kita juga membuat ThreadPool kita lebih toleran terhadap error yang terjadi di dalam thread itu sendiri, dengan membungkus proses operasi call ke dalam block begin-rescue.

    Thread.new do
      until (item = q.pop) == end_object
        begin
          block.call(item)
        rescue => e
          Informer.report_error(e)
        end
      end
    end

Tada! Kita telah menciptakan ThreadPool kita sendiri. Secara lengkap, kode kita terlihat seperti berikut.

class ThreadPool  
  def initialize(num_workers)
    @num_workers = num_workers
  end

  def process_each(enumerable, &block)
    q = SizedQueue.new(@num_workers)
    end_object = Object.new

    threads = @num_workers.times.map do
      Thread.new do
        until (item = q.pop) == end_object
          begin
            block.call(item)
          rescue => e
            Informer.report_error(e)
          end
        end
      end
    end

    enumerable.each { |thread| q << thread }
    @num_workers.times { q << end_object }

    threads.each(&:join)
  end
end  

Artikel - Artikel Terkait