盆暗の学習記録

データサイエンス ,エンジニアリング,ビジネスについて日々学んだことの備忘録としていく予定です。初心者であり独学なので内容には誤りが含まれる可能性が大いにあります。

pystanの環境構築で詰まったときのメモ

変なつまり方をしたのでメモ

結論

  • dockerのpython:3.9をベースイメージに環境を作ろうとしていたが、動かなくて困っていた
  • pystan 3.0 は gcc ≥ 9.0 が必要であるにも関わらず、python:3.9のイメージに入っているgccはver.8系だったのが原因と思われる
  • ubuntu:20.04をベースにしたら治った

症状

ドキュメントのQuick Startにあるコードを実行しようとしていたが、次のようなエラーが起こっていた

Building... This may take some time.
Messages from stanc:
Warning:
  The parameter mu has no priors.
Warning:
  The parameter tau has no priors.
Error handling request
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/aiohttp/web_protocol.py", line 422, in _handle_request
    resp = await self._request_handler(request)
  File "/usr/local/lib/python3.9/site-packages/aiohttp/web_app.py", line 499, in _handle
    resp = await handler(request)
  File "/usr/local/lib/python3.9/site-packages/httpstan/views.py", line 253, in handle_show_params
    services_module = httpstan.models.import_services_extension_module(model_name)
  File "/usr/local/lib/python3.9/site-packages/httpstan/models.py", line 90, in import_services_extension_module
    module: ModuleType = importlib.util.module_from_spec(spec)  # type: ignore
  File "<frozen importlib._bootstrap>", line 565, in module_from_spec
  File "<frozen importlib._bootstrap_external>", line 1108, in create_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
ImportError: /root/.cache/httpstan/4.4.2/models/dnt4r2yw/stan_services_model_dnt4r2yw.cpython-39-x86_64-linux-gnu.so: undefined symbol: _ZNSt19basic_ostringstreamIcSt11char_traitsIcESaIcEEC1Ev
Traceback (most recent call last):
  File "/src/getting-started.py", line 27, in <module>
    posterior = stan.build(schools_code, data=schools_data, random_seed=1)
  File "/usr/local/lib/python3.9/site-packages/stan/model.py", line 450, in build
    return asyncio.run(go())
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/stan/model.py", line 439, in go
    raise RuntimeError(resp.json()["message"])
  File "/usr/local/lib/python3.9/site-packages/stan/common.py", line 24, in json
    return simdjson.loads(self.content)
  File "/usr/local/lib/python3.9/site-packages/simdjson/__init__.py", line 61, in loads
    return parser.parse(s, True)
ValueError: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc.

原因

おそらく、python:3.9のイメージに入っているgccが古すぎた

# gcc --version
gcc (Debian 8.3.0-6) 8.3.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

apt-get updateして新しいgccを入れようとしても「これが最新」と言われてしまい、このイメージでなんとかしようとするのは手間がかかりそうだった

対処

もっと新しいgccが入ったものを使うことにした

例えばubuntu 20.04はgcc 9.3.0がデフォルトで入っているのでこれを使うことにした

# gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
Copyright (C) 2019 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

これにより問題なくpystanが動くようになった

[python]loggerの出力が重複するのを防ぐ

pythonのloggingを使うときのメモ

背景

loggerインスタンスにStreamHandlerやFileHandlerを設定する処理は使いまわしたいので、関数にしたい。

その際、単純にその処理を関数にまとめると、その関数が複数回呼ばれて同じ名前のloggerが複数回参照された場合はそのたびに毎回addHandlerStreamHandlerを設定してしまうため、ログ出力が重複する。

# 悪い例
import logging


def get_logger(name, level=logging.DEBUG):
    """loggerを設定する (悪い例)"""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # 毎回Handlerを設定してしまう場合
    ch = logging.StreamHandler()
    ch.setLevel(level)
    formatter = logging.Formatter(
        fmt="[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


class Horse:

    def __init__(self):
        self.logger = get_logger(self.__class__.__name__)

    def scream(self):
        self.logger.debug("ヒヒーン!")


if __name__ == "__main__":
    h1 = Horse()
    h2 = Horse()
    h2.scream()

この場合、2回__init__が呼び出されたために同じloggerに2つのStreamHandlerが設定されているため、次のように2つ重複してログが出力される

[2021-01-27 08:32:59] [Horse] [DEBUG] ヒヒーン!
[2021-01-27 08:32:59] [Horse] [DEBUG] ヒヒーン!

提案

そこでhasHandlersを使ってHandlerの存在を判定して、Handlerがなければ追加するようにすればよさそう。

import logging


def get_logger(name, level=logging.DEBUG):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.hasHandlers():
        ch = logging.StreamHandler()
        ch.setLevel(level)
        formatter = logging.Formatter(
            fmt="[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S"
        )
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger


class Horse:

    def __init__(self):
        self.logger = get_logger(self.__class__.__name__)

    def scream(self):
        self.logger.debug("ヒヒーン!")


if __name__ == "__main__":
    h1 = Horse()
    h2 = Horse()
    h2.scream()
[2021-01-27 08:35:54] [Horse] [DEBUG] ヒヒーン!

このようにすれば重複したログ出力は避けられる。

LightGBMの理論のまとめ

今更ながらLightGBMの論文を読んだのでその時のメモを残しておきます。

GPUでの計算への適応など、計算機での活用に関する技術については省略しています。

要約

LightGBM

  • pre-pruning(決定木の枝をそれ以上分岐させても予測が改善しなくなったら分割を停止する剪定方法)
  • best-first(情報利得が最大の枝から順に伸ばす。pre-pruningが使える。)
  • histogram-based(連続値の特徴量をbinに離散化することで最適な分割点の探索の計算量を激減させる)

という決定木の高速化に関する既存の技術と、

  • Gradient-based One-Side Sampling(勾配の情報を活用して近似精度の良いサブサンプリングをする)
  • Exclusive Feature Bundling(one-hot encodingをしたように相互に排他的になっているスパースな特徴量をlabel encodingをした特徴量のような密な特徴量にまとめることで効率良く学習する)

というLightGBM自身が新たに提案した2つの技術を使った、

高速で精度の良い勾配ブースティング決定木。

LightGBMが使う既存の技術

pre-pruning

(この節とその次の節はほぼShi, 2007の序盤の説明をまとめたもの)

決定木のすべてのノードの不純度がゼロになるような完全に成長させきった状態にしてしまうと過学習の問題が発生する。そのため、剪定(pruning)を行ってほどよい木の大きさにする必要がある。pruningの方法は次の2種類に分けられる(Shi, 2007)。

  1. pre-pruning:木を拡張させることによる予測の改善がなくなったら(それ以上分割すると誤差が増えるなら)木の拡張を停止する。
  2. post-pruning:一度木を完全に成長させて、それから一番予測精度が良かった木のサイズを選択する。

pre-pruningのほうが無駄な計算を避けられる可能性があり効率的だが、本当に最適な木の大きさになる前に学習を停止させてしまう、「early stopping」という問題がある。

early stopping問題

決定木が各ノードで分割をする際は、一つの特徴量だけを使って情報利得を評価して分割する。そのため、複数の特徴量を組み合わせると予測精度が上がるような効果(交互作用効果)を評価することはできない。

以下の表はあるデータセットを表しており、a,bが特徴量でclassが目的変数である。a,bの両方を使うと正確に分類できるが、単体では予測に寄与しない。

f:id:nigimitama:20210116210941p:plain

このデータのように交互作用効果しかもたず単体では予測に寄与しない特徴量だけを含むデータセットでは、pre-pruningの決定木は分割を行わずに学習を終了してしまう。この問題があるためにpre-pruningはあまり評価されず、post-pruningの手法が開発された。

しかし、実世界のデータセットではこの問題はあまり起きないという報告もある。Frank(2000)やShi(2007)は数十個のオープンなデータセットを使ってpre-pruningとpost-pruningを比較し、pruningのパラメータ調整によっては両者のパフォーマンスに大きな差が生まれないことを示した。

(考察:これは私の仮説だが、parity problemのように「単体ではまったく予測に寄与しないが交互作用はある」というデータセットは実データでは稀ということかもしれない。ちなみにLightGBMはparity problemを解くことはできないが、aとbを掛け合わせた変数を追加してやれば解くことができるので特徴量の工夫で対応できないことはない)

best-first (leaf-wise) tree

標準的な決定木はdepth-firstという種類のアルゴリズムを使っているが、LightGBMが使っているのはbest-firstというタイプのものである。それぞれの特徴は次の通り。

  1. best-first:木を分岐していく各ステップで、すべての分割可能なノードの中でもっとも不純度を低下させるノードを分割する。
  2. depth-first:ノードの分割の順番は固定(通常は左の枝が先に分割される)

両者は木が最大に成長した時に得られる木は一緒だが、best-firstは最良のノードから分割していくので、pre-pruningを使うことができる。

histogram-based

GBDTの計算量の大部分は最適な分岐点の探索にある。これまでに研究されてきた分岐点の探索の高速化にはpre-sortedアルゴリズムとhistogram-basedアルゴリズムの2つがある。

  1. pre-sorted:事前に特徴量をソートしておき、すべての分割可能な点を評価して最適な分割点を探す。正確だが計算量が非常に多い。
  2. histogram-based:連続値の特徴量を離散化して少ない数(例えば256個とか)のbinにして、その中で最適な分割点を探す。binの数だけ評価すればいいので分割点の探索自体の計算量は非常に少ない。

histogram-basedアルゴリズムを使うほうが大幅に高速化ができるので、LightGBMはこちらをベースに研究している。

考察:予測精度は激減しないのか?

例えば100万レコードあって100万のユニークな値がある特徴量を256個のbinに離散化するようなことを考えると、値を減らしすぎではないか?予測精度も大きく悪化してしまうのではないか?と思ってしまう。

しかし、実証的には問題ないという報告がある。LightGBMが参照している先行研究のひとつであるLi et al.(2008)ではbinの数を65536個にしたときと256個にした時でtestデータに対するNDCGスコアの差はほとんど無かったと報告している。また、LightGBMの論文もXGBoostのexact greedy algorithm(pre-sorted algorithmのこと)を使ったモデルよりもよい精度を出している。

f:id:nigimitama:20210116211005p:plain

理論的に問題ないのかについての回答は、いまのところ私は見つけられていない。しかし、現在のところ私は2つの仮説を考えている。

  1. LightGBMのAlgorithm 1を見た感じだと、分割の探索のたびにそのノードに含まれるデータを使ってヒストグラムを作っていると思われる。そのため、1種類のbinsだけで学習するのではなく色々な分割の仕方をした多様なbinsを使って評価している。また、根ノードから伸びていった子孫のノードは含まれるサンプルサイズもある程度減っているため、最初の分割は大雑把でも続く分割は徐々に近似誤差が小さい分割ができており、仮に根ノードでの分割でbinsにまとめたことによる誤差が多かったとしても、子孫のノードでその誤差を補うことができているのかもしれない。
  2. そもそもブースティング自体が弱学習器をたくさん使って強い学習器を作る技術なので、個別の弱学習器が多少さらに弱くなったとしても、少し反復数を増やせばその損失を補えるのかもしれない。

f:id:nigimitama:20210116211117p:plain

LightGBMが新たに提案した技術

背景

histogram-basedのGBDTは、ヒストグラムの作成にO(#data × #feature)かかり、分岐点の探索にO(#bin × #feature)かかる。つまり、計算量の大部分はヒストグラムの作成が占めている。

そこでLightGBMでは、#dataを減らすためにGOSSという技術を導入し、#featureを減らすためにEFBという技術を使う。

Gradient-based One-Side Sampling(GOSS)

サンプルサイズを減らす際、もっともシンプルなのがランダムサンプリングすることである。Stochastic Gradient Boosting(Friedman, 2002)は決定木を作る際にランダムサンプリングした部分サンプルを使って学習していたが、ランダムサンプリングは近似精度がそんなに良いわけではないので、この方法は一般には予測精度を下げる。

AdaBoostをベースにしているならデータインスタンスに対する重みを持っているので、それを使って適応的にサンプリングすることができてより効果的だが、GBDTには明示的な重みはないので、この方法を直接適用することはできない。

GBDTの勾配はAdaBoostの重みに相当するもので、インスタンスの重要度の指標に使うことができる(勾配が小さい=訓練誤差が小さい=十分に予測できている=学習済み)。そこでGOSSは勾配の情報を使ってサンプリングを行う。

勾配が小さいサンプルをすべて削除するような処理にしてしまうとデータの分布が変化して予測精度に悪影響が出るため、GOSSは大きい勾配を持つインスタンスは残して、小さい勾配を持つインスタンスに対してはランダムサンプリングを行う

具体的には、GOSSはまずデータインスタンスを勾配の絶対値に従って並び替え、上位$a\times 100\%$のインスタンスを選ぶ。そして残りのデータから$b \times 100\%$のインスタンスをランダムにサンプリングする。その後、情報利得の計算の際にサンプリングされた小さい勾配のデータを定数$\frac{1-a}{b}$で増幅する(勾配が少ないインスタンスの勾配の総和がサンプリング前と同じになるように増やしている)。

f:id:nigimitama:20210116211203p:plain

これは多くの場合で近似精度がランダムサンプリングより優れている(詳しくは論文の3.2節の理論解析を参照)。

Exclusive Feature Bundling(EFB)

特徴量の次元数が大きいデータセットは、スパースであることが多い。スパースなデータは情報を損失することなく次元削減できる可能性がある。

具体的には、相互に排他的な(同時にゼロでない値をとることが無い)特徴量を1つの特徴量にまとめることで、安全に次元数を減らすことができる。

これは2つのステップからなり、まずGreedy Bundlingというアルゴリズムでは、値の衝突(conflict:同時に非ゼロの値をとること)の回数が一定の閾値以下であれば同じbundle(特徴量の束)に追加していく。つぎに、Merge Exclusive Featuresというアルゴリズムで、同じbundleに含まれる特徴量を、それぞれの値が取る範囲がかぶらないように範囲をシフトさせつつ一つの特徴量にまとめる。例えば、[0, 10)の値をとる特徴量と[0, 20)の値をとる特徴量がある場合、後者に10を足して[10, 30)にして両者をマージする。

f:id:nigimitama:20210116211308p:plain

実験結果

5つのオープンなデータセットで比較を行い、GOSSとEFBを使うことで6~21倍高速化していること、そんな高速化をしてもXGBoostでexact greedy algorithmに匹敵する(というか少し上回る)精度を出していることを示した。

また、SGBとGOSSの精度を同じサンプリング比率の下で比べた時は常にGOSSのほうが少し精度が高くなっており、「ランダムサンプリングよりも勾配の情報を使ってサンプリングしたほうがサンプリングの近似精度が良い」という理論と整合的な結果も出ている。

f:id:nigimitama:20210116211341p:plain

参考

Ke, G., Meng, Q., Finley, T., Wang, T., Chen, W., Ma, W., ... & Liu, T. Y. (2017). Lightgbm: A highly efficient gradient boosting decision tree. In Advances in neural information processing systems (pp. 3146-3154).

Friedman, J. H. (2002). Stochastic gradient boosting. Computational statistics & data analysis, 38(4), 367-378.

Frank, E. (2000). Pruning decision trees and lists (Doctoral dissertation, University of Waikato).

Shi, H. (2007). Best-first decision tree learning (Doctoral dissertation, The University of Waikato).

Li, P., Wu, Q., & Burges, C. J. (2008). Mcrank: Learning to rank using multiple classification and gradient boosting. In Advances in neural information processing systems (pp. 897-904).

LightGBMの「No further splits with positive gain」というwarningの意味

LightGBM関連の論文やソースコードを読んでいてわかったことをメモ

概要

LightGBMを使っているとたまに

No further splits with positive gain, best gain: -inf

のようなwarningが表示されることがある。

これはLightGBMが内部で決定木を成長させている際にpre-pruning(特徴空間をそれ以上分割しても情報利得が得られないので分割を停止する仕組み)が働いたことを示している1

ソースコード的には https://github.com/microsoft/LightGBM/blob/master/src/treelearner/serial_tree_learner.cpp#L197-L201 などにある。

// cannot split, quit
if (best_leaf_SplitInfo.gain <= 0.0) {
  Log::Warning("No further splits with positive gain, best gain: %f", best_leaf_SplitInfo.gain);
  break;
}

もう少し詳しく

決定木の成長アルゴリズム

決定木を学習させる際に「どの枝から伸ばすか」についてのアルゴリズムは主に2種類ある(Shi, 2007)。

  1. depth-first (level-wise) :順番は固定で、通常左から右の順で伸ばす。
  2. best-first (leaf-wise) :もっとも利得がある(分岐することで不純度・予測誤差を下げる)枝から伸ばす

一般的に使われるものはdepth-firstだが、LightGBMはbest-firstを使っている。

これらは枝を伸ばす順番が違うだけで、木を完全に成長させた場合(終端ノードの不純度がゼロになるまで分岐させた場合)は同じ木になる。

ただし、best-firstだと決定木の剪定(pruning)のアルゴリズムの選択肢がdepth-firstに比べて増える

決定木の剪定アルゴリズム

剪定アルゴリズムは2種類ある

  1. pre-pruning:さらなる分岐を行うことで予測誤差を下げるなら分岐し、下げないなら分岐しない
  2. post-pruning:まず決定木を完全に成長させ、次に最も予測誤差を下げる木のサイズを選択する

pre-pruningは本当に最適な木のサイズを発見できない可能性があるため精度が劣る可能性があるものの、計算がpost-pruningより少なくて済む。

LightGBMはpre-pruningを使っている。

本題

前述のソースコードは計算機と学習器に関するハイパーパラメータがデフォルト(device_type = cpu, tree_learner = serial, linear_learner = false)の場合に使われる決定木のコードで、コードとコメントから察するに最良の分岐を探索している箇所で、分岐による情報利得がゼロ以下なら打ち切るようになっている。

// Get a leaf with max split gain
int best_leaf = static_cast<int>(ArrayArgs<SplitInfo>::ArgMax(best_split_per_leaf_));
// Get split information for best leaf
const SplitInfo& best_leaf_SplitInfo = best_split_per_leaf_[best_leaf];
// cannot split, quit
if (best_leaf_SplitInfo.gain <= 0.0) {
  Log::Warning("No further splits with positive gain, best gain: %f", best_leaf_SplitInfo.gain);
  break;
}

今後は

No further splits with positive gain

のwarningが出たら「pre-pruningが行われたんだな」と考えて、min_samples_in_leaf などの別のハイパーパラメータを調整する手がかりにすればよいかもしれない(pruningさせるのとハイパーパラメータで分岐を止めるのとどちらが良いのかはわからないが)

参考文献

Ke, G., Meng, Q., Finley, T., Wang, T., Chen, W., Ma, W., ... & Liu, T. Y. (2017). Lightgbm: A highly efficient gradient boosting decision tree. In Advances in neural information processing systems (pp. 3146-3154).

Shi, H. (2007). Best-first decision tree learning (Doctoral dissertation, The University of Waikato).


  1. なぜ-infという極端な値になっているのかはわからなかった

scikit-learn Pipelineの基本の使い方

個人的に業務ではよく使うのでもっと多くの人に認知されてほしいという想いを込めてメモ

Pipelineとは

scikit-learnにはPipelineというclassがある。これは複数の前処理用クラスと予測モデルをまとめて一つのオブジェクトにすることができるもの。

例えば、StandardScalerで特徴量の標準化を行って線形回帰で学習/予測を行う場合、Pipelineを使わない場合は以下のようにStandardScalerとLinearRegressionで別々にfitやtransformを行わないといけない。

# データの用意
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
X, y = make_regression(random_state=0)
X_train, X_test, y_train, y_test = \
    train_test_split(X, y, random_state=0)
    
# Pipelineを使わない場合
scaler = StandardScaler()
estimator = LinearRegression()
## fit
X_train_ = scaler.fit_transform(X_train)
estimator.fit(X_train_, y_train)
## predict
X_test_ = scaler.transform(X_test)
y_pred = estimator.predict(X_test_)

この処理を実際のアプリにデプロイすることを考えると、fit済みのscalerとestimatorをそれぞれ保存・管理してアプリ上に展開しないといけない。

今回はscalerが一つなのでまだいいが、これが数十個になると非常に面倒であり、そういう複雑なコードはバグのもとになる。

一方、Pipelineを使えば同様の処理は以下のように書くことができる。

from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
X, y = make_regression(random_state=0)
X_train, X_test, y_train, y_test = \
    train_test_split(X, y, random_state=0)

# pipelineを使う場合
pipe = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('estimator', LinearRegression())
])
pipe.fit(X_train, y_train)
y_pred_ = pipe.predict(X_test)
# もちろん同じ結果が得られる
all(y_pred == y_pred_)  # True

自作のTransformerを作る

自作の前処理用クラスを作るにはどうしたらよいのだろうか。

基底クラスを定義する

StandardScalerなどのソースコードを見ると

class StandardScaler(TransformerMixin, BaseEstimator):
    ...

というようにTransformerMixin, BaseEstimatorを継承している。

TransformerMixinfit_transformしかメソッドを持っていない一方、Transformerはfittransformも持たないといけないので、そちらは自分で定義してやる必要がある。

なのでこの記事に書いてあるように

from sklearn.base import BaseEstimator, TransformerMixin

class BaseTransformer(BaseEstimator, TransformerMixin):

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return self

のようなクラスを定義してやれば使いやすい基底クラスになるはず。

自作のTransformerを定義する

上で定義した基底クラスを継承して、やりたい処理を書けば自作のTransformerの完成。

class Double(BaseTransformer):
    
    def transform(self, X):
        return X*2

データの生成からPipelineでの利用まで全体をコードに書くなら、以下のような感じ。

from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin

class BaseTransformer(BaseEstimator, TransformerMixin):

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return self


class Double(BaseTransformer):
    
    def transform(self, X):
        return X*2

    
X, y = make_regression(random_state=0)
X_train, X_test, y_train, y_test = \
    train_test_split(X, y, random_state=0)

# pipelineを使う場合
pipe = Pipeline(steps=[
    ('double', Double()),
    ('estimator', LinearRegression())
])
pipe.fit(X_train, y_train)
pipe.predict(X_test)

memory:fitの再計算を回避する

transformerのfitに多くの計算量を要する場合、fitしたtransformerをキャッシュすることができる。引数memoryディレクトリのパスの文字列かjoblib.Memoryオブジェクトを入れることで、そちらにキャッシュされる。

6.1.1.3. Caching transformers: avoid repeated computation

from tempfile import mkdtemp
from shutil import rmtree
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
estimators = [('reduce_dim', PCA()), ('clf', SVC())]
cachedir = mkdtemp()
pipe = Pipeline(estimators, memory=cachedir)

# Clear the cache directory when you don't need it anymore
rmtree(cachedir)

参考

sklearn.pipeline.Pipeline — scikit-learn 0.23.2 documentation

6.1. Pipelines and composite estimators — scikit-learn 0.23.2 documentation

sklearnのpipelineに自分で定義した関数を流し込む - Qiita

データの前処理で並列処理を使う

pythonのmultiprocessingパッケージについてのメモ。

docs.python.org

コード例

こう書けば良いんじゃないかな、と思った実装例を載せていきます。

以下では説明の簡単のために'x'というカラムを2倍にするだけのdoubleという関数を並列処理にしていますが、実際にはWeb APIを呼び出すときに使ったりします。

ProcessクラスとPipeクラスを使う場合

処理するデータを明示的に分割して別々のプロセスで処理する場合。

import pandas as pd
from multiprocessing import Process, Pipe


def double(conn) -> None:
    '''親プロセスからデータを受け取って処理して送り返す'''
    records = conn.recv()
    records = map(_double, records)
    conn.send(records)
    conn.close()
    return None


def _double(record: dict) -> dict:
    '''xというカラムを2倍にする'''
    record['x'] = record['x'] * 2
    return record


def multipocess(df: pd.DataFrame, n_workers: int) -> pd.DataFrame:
    # DataFrameを送ることはできないのでdictのListにする
    records = df.to_dict('records')

    # データを分割
    size = len(records) // n_workers + 1
    records_list = _each_slice(records, size)

    parent_conns = [None] * n_workers
    child_conns = [None] * n_workers
    processes = [None] * n_workers
    for i in range(n_workers):
        # 子プロセスにデータを送るためのPipeを作成
        parent_conns[i], child_conns[i] = Pipe()
        # 子プロセスを作成
        processes[i] = Process(
            target=double,
            args=(child_conns[i],)
        )
        # 子プロセスにデータを送る
        parent_conns[i].send(records_list[i])
        # 子プロセスの処理を開始
        processes[i].start()

    results = [None] * n_workers
    for i in range(n_workers):
        # 子プロセスから結果を受け取り
        results[i] = parent_conns[i].recv()
        # 子プロセスをjoin
        processes[i].join(timeout=10)

    df_list = [pd.DataFrame(res) for res in results]
    return pd.concat(df_list).reset_index(drop=True)


def _each_slice(arr: list, size: int) -> list:
    '''listを要素数sizeのリストに分割する'''
    return [arr[i:i + size] for i in range(0, len(arr), size)]


if __name__ == '__main__':
    # データを用意
    df = pd.DataFrame({'x': list(range(10))})
    # 並列処理
    df = multipocess(df, n_workers=4)
    print(df)

結果

    x
0   0
1   2
2   4
3   6
4   8
5  10
6  12
7  14
8  16
9  18

Poolクラスを使う場合

データを分けて複数のプロセスを管理して‥という作業をPoolクラスに任せる場合。

こっちのほうが楽です。

ただ、使えない環境もあります。AWS Lambdaではプロセスの共有メモリがサポートされていないのでPoolクラスはエラーを起こします。その場合は面倒ですが前節のPipeクラスを使う必要があります1

import pandas as pd
from multiprocessing import Pool


def double(record: dict) -> dict:
    record['x'] = record['x'] * 2
    return record


if __name__ == '__main__':
    # データを用意
    df = pd.DataFrame({'x': list(range(10))})
    # DataFrameを送ることはできないのでdictのListにする
    records = df.to_dict('records')

    # 並列処理
    n_workers = 4
    with Pool(n_workers) as pool:
        results = pool.map(double, records)

    df = pd.DataFrame(results)
    print(df)

結果

    x
0   0
1   2
2   4
3   6
4   8
5  10
6  12
7  14
8  16
9  18

なぜmultiprocessingなのか

ちなみに、なぜmultiprocessingなのかというと、pandasがmultithreadに非対応なので2、並列処理する関数の中でpandasも使えるという実装の自由度を考えるとデータ処理で使うにはmultiprocessingのほうが相性が良いのかなと思っています。pandasを使わないのであればmultithreadでいいかもしれません。


  1. Parallel Processing in Python with AWS Lambda | AWS Compute Blog

  2. Frequently Asked Questions (FAQ) — pandas 1.1.3 documentation。ここではver.0.11の話をしていますが、私は0.25.3時代に実際にmultithreadを試して上手く行かなかった経験があります。おそらく1.1.3現在も変わらないはず。

unittestでかかった時間を計測する

前にどこかで見かけたやり方をメモ。

unittestでは、各テストの前にはsetUp()が実行され、テストの後にtearDown()が実行される。

なのでその前後でtime()を呼んで、その差分を表示させる。

from time import time
import unittest

class TestStringMethods(unittest.TestCase):
    
    def setUp(self):
        self.time_begin = time()

    def tearDown(self):
        t = time() - self.time_begin
        print(f"{self.id()}: {t:.3f}s")
        
    def test_upper(self):
        self.assertEqual('foo'.upper(), 'FOO')


if __name__ == '__main__':
    unittest.main()

実行するとこんな感じになる。

 $  python unittest_time.py
__main__.TestStringMethods.test_upper: 0.000s
.
----------------------------------------------------------------------
Ran 1 test in 0.001s

OK