multiprocessing.Poolがエラーになる人へ

Pythonで並列プログラミングしようとしたときに真っ先に思い浮かぶのが
threadingやmultitprocessingモジュールですよね。

multiprocessingにはPoolという便利なクラスがあって、同時に起動するプロセス数を
制限しながら処理を並列実行できたりします。

例:

import multiprocessing as mulp

def func(self, n):
    return n ** 3

pool = mulp.Pool()
r = pool.map(func, range(10))
print r  # [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

しかし、この時渡す関数funcがクラスのインスタンスメソッドだったり、匿名関数だったり、関数中で定義した関数だったり、した場合には
エラーが出ます。

import multiprocessing as mulp

class MultiTest(object):
    def func(self, n):
        return n ** 3
        
    def execute(self):
        pool = mulp.Pool()
        r = pool.map(self.func, range(10))
        print r

tester = MultiTest()
tester.execute()  # error!! (pickle error)

ヘルパー関数やヘルパークラスを作ることでこの制限を回避できます。

def tomap(args):
    return getattr(args[0], args[1])(*args[2:])

def toapply(cls, mtd_name, *args, **kwargs):
    return getattr(cls, mtd_name)(*args, **kwargs)

class MulHelper(object):
    def __init__(self, cls, mtd_name):
        self.cls = cls
        self.mtd_name = mtd_name

    def __call__(self, *args, **kwargs):
        return getattr(self.cls, self.mtd_name)(*args, **kwargs)

こんな感じの。これを使って、次のようなサンプルコードを書いたので試してみては。

#!python
# -*- coding: utf-8 -*-

import multiprocessing as mulp
import itertools as itrt

def main():
    tester = MultiTest()
    tester.execute()
    
class MultiTest(object):
    def func(self, n):
        return n ** 3
        
    def execute(self):
        pool = mulp.Pool(4)  # 引数に数字を入れると同時起動数。引数なしなら勝手にcpu情報見に行って最大プロセス数を割り当てる
        # どれかひとつをコメントアウト

        #  失敗する
        # r = pool.map(self.func, range(10))

        #  これも失敗する
        # f = lambda x: toapply(self, 'func', x)
        # r = pool.apply(f,  2)

        # 成功する関数版
        # args = itrt.izip(itrt.repeat(self), itrt.repeat('func'), range(10))
        # r = pool.map(tomap, args)

        # 成功する関数版2
        # r = pool.apply(toapply, (self, 'func', 2))

        # 成功するクラス版
        # r = pool.map(MulHelper(self, 'func'), range(10))

        print 'result:', r

def tomap(args):
    return getattr(args[0], args[1])(*args[2:])

def toapply(cls, mtd_name, *args, **kwargs):
    return getattr(cls, mtd_name)(*args, **kwargs)

class MulHelper(object):
    def __init__(self, cls, mtd_name):
        self.cls = cls
        self.mtd_name = mtd_name

    def __call__(self, *args, **kwargs):
        return getattr(self.cls, self.mtd_name)(*args, **kwargs)

if __name__ == '__main__':
    main()