Pythonで並列処理・並行処理を提供する標準モジュールは数多くあり、初めてだと違いを理解するのは困難です。この記事では、それぞれの違いについて調べました。
thread
モジュール(Python 2), _thread
モジュール(Python 3)
かつてPython 2にはthread
モジュールという複数のスレッドを扱うためのモジュールが存在していましたが、Python 3でdeprecated扱いになりました。一応_thread
モジュールという名前で残っています。公式でも述べられているように、一般には、thread/_thread
モジュールではなく、より高レベルなthreading
モジュールの使用が推奨されるようです。
threading
モジュール
threading
モジュールは、先述の通り、複数のスレッドを扱うためのモジュールです。thread/_thread
モジュールより高レベルとはいうものの、この後に紹介するモジュールに比べるとまだまだ低レベルで、C++11のthread
ライブラリと同程度の印象を受けます。
コード例を以下に示します。threading.Thread
クラスを継承したクラスを作るのが常套手段のようです。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name, sleep_time):
threading.Thread.__init__(self)
self.name = name
self.sleep_time = sleep_time
def run(self):
print ("Starting " + self.name)
time.sleep(self.sleep_time)
print ("Exiting " + self.name)
thread_num = 3
threads = []
for i in range(thread_num):
threads.append(MyThread("Thread-{}".format(i), 5 - i))
for th in threads:
th.start()
for th in threads:
th.join()
print("end")
この例では、3つのスレッドをstart()でほぼ同時に立ち上げます。3つのスレッドは、それぞれ5秒、4秒、3秒待機したのちに終了します。すべてのスレッドが終了するのをjoin()で待ってからプログラムを終了します。このプログラムを実行するとちょうど約5秒で終わることが確かめられます。
ここで、Pythonでのマルチスレッド処理は、C++とのマルチスレッド処理とは大きく異なることを知っておくのは重要です。Pythonの主要な実装系の一つであるCPythonにはGIL(Global Interpreter Lock)という機構があり、複数のスレッドが同時にPythonのバイトコードを実行することを許しません(参考:GlobalInterpreterLock - Python Wiki)。なので、例えばCPU速度がボトルネックになるような重い計算処理(いわゆるCPU boundな処理)を、このthreading
を使って複数のスレッドに割り振って動かしたとしても、実際にはGILの制約のために複数のスレッドが同時に実行されることはなく、処理時間は期待したように短くならないはずです。
一方、ディスクの読み出し・書き込みなどのI/O待ち時間が大量に発生するような処理(いわゆるI/O boundな処理)であれば、GILは問題にならないので、このthreading
を使ってマルチスレッド化することで処理時間が早くなる可能性があります。
threading
モジュールは後述する他のモジュールに比べて自由度が高い分、デッドロックやデータ競合が起こらないように十分考慮してプログラムを組む必要があります。以下は、リソースをロックする順番をめぐってデッドロックしてしまう例です。
import threading
a_lock = threading.Lock()
b_lock = threading.Lock()
def foo():
with a_lock:
with b_lock:
print ("a -> b")
def bar():
with b_lock:
with a_lock:
print ("b -> a")
class MyThread(threading.Thread):
def __init__(self, func):
threading.Thread.__init__(self)
self.func = func
def run(self):
while True:
self.func()
th1 = MyThread(foo)
th2 = MyThread(bar)
th1.start()
th2.start()
th1.join()
th2.join()
multiprocessing
モジュール
multiprocessing
モジュールは、複数のプロセスを扱うためのモジュールです。スレッドの代わりにサブプロセスを立ち上げてそちらで処理させることで、GILの問題を回避することができます。ただし、サブプロセスの立ち上げは、スレッドの立ち上げに比べると重い処理なので、本当にプロセス単位での並列化が必要なのか、言い換えると、スレッド単位の並列化で十分ということはないか、一考が必要です。
multiprocessing.Process
multiprocessing.Process
を使った例を以下に示します。先に示した、threading.Thread
クラスを用いた例と使い方は同じです。こちらもプログラマが使い方を誤るとデッドロックやデータ競合を引き起こすので、要注意です。
import multiprocessing
import time
class MyProcess(multiprocessing.Process):
def __init__(self, name, sleep_time):
multiprocessing.Process.__init__(self)
self.name = name
self.sleep_time = sleep_time
def run(self):
print ("Starting " + self.name)
time.sleep(self.sleep_time)
print ("Exiting " + self.name)
process_num = 3
processs = []
for i in range(process_num):
processs.append(MyProcess("Process-{}".format(i), 5 - i))
for th in processs:
th.start()
for th in processs:
th.join()
print("end")
multiprocessing.Pool
さらに、multiprocessing
モジュールは、「データを複数プロセスにばらまいて、複数プロセスで計算させ、結果を集める」(fork-join)という、並列処理でよくあるユースケースを実現する専用のAPIを追加で提供しています。それがmultiprocessing.Pool
です。
例えば、「実数からなるあるリストが与えられたとき、リストの各要素を2乗したリストを出力」する処理を、multiprocessing.Pool
を用いて4プロセス並列で実行する例を以下に示します。
import multiprocessing
def pow2(n):
return n * n
before = list(range(100000000))
with multiprocessing.Pool(4) as p:
after = p.map(pow2, before)
print(before[:5])
print(after[:5])
このプログラムを実行中にpsコマンドなどでプロセスを見ると、メインプロセスに加えてサブプロセスが4つ立ち上がっているのを観察できると思います。
より複雑な機能に関しては公式ドキュメントをご覧ください。
multiprocessing.dummy.Pool
multiprocessing
には、あまり知られていないmultiprocessing.dummy.Pool
モジュールが存在しています。前の節で紹介したmultiprocessing.Pool
はプロセス単位で処理を並列化したのに対し、このmultiprocessing.dummy.Pool
はスレッド単位で処理を並列化します。ともにAPIは同じです。
前節のコードを、スレッド単位で並列化するように変更した例を以下に示します。
import multiprocessing.dummy
def pow2(n):
return n * n
before = list(range(100000000))
with multiprocessing.dummy.Pool(4) as p:
after = p.map(pow2, before)
print(before[:5])
print(after[:5])
プロセス単位での並列化と、スレッド単位での並列化を、1行の書き換えのみで簡単に切り替えられるので、並列化の対象となる処理がCPU boundかI/O boundかを実際に確かめたいときに使えるテクニックだと思います。参考:multithreading - How to use threading in Python? - Stack Overflow
concurrent.futures
モジュール
Python3.2からconcurrent.futures
モジュールが提供されるようになりました。Python 2.x系でもPyPIから同名のパッケージを取得可能です。
これまで見てきたようなマルチスレッドやマルチプロセスの処理を隠蔽して、複数の処理を同時に行うための抽象度の高い機能を提供します。具体的には、Future
と呼ばれるクラスを提供することで、「非同期処理が完了した状態、または、未完了の状態」を表すことができるようになります。このモジュールの導入の経緯については、PEP 3148 -- futures - execute computations asynchronously | Python.org に詳しいです。
concurrent.futures
が提供するメインの機能は、futures.ThreadPoolExecutor
とfutures.ProcessPoolExecutor
です。それぞれ、マルチスレッド処理、マルチプロセス処理を扱いたい時に用います。まず、futures.ProcessPoolExecutor
を使って、1000個のタスクをプロセス並列で同時に実行する例を以下に示します。(最初はmultiprocessing.Pool()の例と同じくサイズ1億で試したのですが、メモリ使用量が際限なく増えてしまいました)
from concurrent import futures
def pow2(n):
return n * n
before = list(range(1000))
with futures.ProcessPoolExecutor(max_workers=4) as executor:
after = executor.map(pow2, before)
print(before[:5])
print(after[:5])
これは今まで見てきたmultiprocessing.Pool()
とかなり似た書き方です。
上の例ではFuture
という概念は隠蔽されていてますが、陽に扱うこともできます。以下に、map()
を使わずFuture
オブジェクトを用いて並列処理する例を示します。
from concurrent import futures
def pow2(n):
return n * n
before = list(range(1000))
with futures.ThreadPoolExecutor(max_workers=4) as executor:
print("submission starts")
to_do = []
for num in before:
future = executor.submit(pow2, num)
to_do.append(future)
print("submission ends")
after = []
for future in futures.as_completed(to_do):
res = future.result()
after.append(res)
print(before[:5])
print(after[:5])
実行例は以下です。処理順は不定です。
submission starts
submission ends
[0, 1, 2, 3, 4]
[262144, 244036, 122500, 163216, 280900]
concurrent.futures
モジュール
concurrent.futures
モジュールはPython 3.4から導入された、イベントループに基づく非同期処理を行うためのモジュールです。
このモジュールは公式ドキュメントの量が半端ではなく、自分もあまり理解できていないため、紹介のみにとどめます。
まとめ
以下の方針でモジュールを選ぶのがよいと思います。
- 処理はI/O boundである
- 処理はfork-joinモデルで並列化できる
- →
multiprocessing.dummy.Pool
を使う
- → または、
futures.ThreadPoolExecutor
のmap
関数を使う
- 処理はもっと複雑
- →
futures.ThreadPoolExecutor
のsubmit関数
を使って、タスク単位に処理を行う
- → または、
threading
を使って、さらに柔軟にモデルを組み立てる
- 処理はCPU boundである
- 処理はfork-joinモデルで並列化できる
- →
multiprocessing.Pool
を使う
- → または、
futures.ProcessPoolExecutor
のmap
関数を使う
- 処理はもっと複雑
- →
futures.ProcessPoolExecutor
のsubmit関数
を使って、タスク単位に処理を行う
- → または、
multiprocessing
を使って、さらに柔軟にモデルを組み立てる
参考URL