盆暗の学習記録

データサイエンス ,エンジニアリング,ビジネスについて日々学んだことの備忘録としていく予定です。初心者であり独学なので内容には誤りが含まれる可能性が大いにあります。

機械学習モデルを動かすWeb APIを作ってみる(1):APIの作成

e-stat APIとかRESAS APIとか,Web APIは便利ですよね。
今回はPythonAPIを作る方法をメモしておくことにします。
APIで提供する機能は「機械学習による予測」としておきます。

今回は

  1. APIの仕様を決めて
  2. データを取得し
  3. 予測モデルを作り
  4. APIをローカルで動作させる

ところまでを書きます。デプロイについては今後勉強して書きます。

要件を決める

作るものをざっくり決めておきます。

土地総合情報システム | 国土交通省のデータを使って,不動産の価格予測を行うシステムを作ることにします。

条件

  • 東京都の中古マンションのみを対象とする
  • リノベーションされた物件は対象外とする(築年数の持つ意味が変わるため)

APIに送信する特徴量

以下の特徴量を送ることにします。

特徴量 説明 データ型
address 市区町村レベルまでの住所 string
area 専有面積 int or float
building_year 竣工年 int or float

入力(request)/出力(response)

例えば予測のリクエストを

{
    "address": "東京都千代田区",
    "area": 30,
    "building_year": 2013
}

のようにして送ると,

{
    "status": "OK",
    "preidcted": 40000000
}

のような値を返すAPIとします。

予測モデルを作る

データの取得

土地総合情報システム | 国土交通省APIを使います。

import requests
import json
import pandas as pd
import os

url = "https://www.land.mlit.go.jp/webland/api/TradeListSearch"
# 東京都,2005Q3 ~ 2019Q3のデータ(DLに10分ほどかかるので注意)
payload = {"area": 13, "from": 20053, "to": 20193}
response = requests.get(url, params=payload)

data = json.loads(response.text)
df = pd.DataFrame(data["data"])

# 保存
os.mkdir("input")
df.to_csv("input/raw.csv", index=False)

基礎的な前処理

まず基礎的な前処理を行い,APIで受け取るデータと同様の状況にしていきます。

import pandas as pd

df = pd.read_csv("input/raw.csv")

# 使用するデータの選択 ----------------------------
# マンションのみを対象にする
is_mansion = df["Type"] == "中古マンション等"
df = df.loc[is_mansion, :]

# リノベーションされた物件は対象外とする
is_not_renovated = df["Renovation"] != "改装済"
df = df.loc[is_not_renovated, :]


# 列名変更 ----------------------------------------
df = df.rename(columns={"TradePrice": "price", "Area": "area"})


# 特徴量の生成 ------------------------------------

# 住所
df["address"] = df["Prefecture"] + df["Municipality"]


# 竣工年の和暦を西暦にする
years = df["BuildingYear"].str.extract(r"(?P<period>昭和|平成|令和)(?P<year>\d+)")
years["year"] = years["year"].astype(float)
years["period"] = years["period"].map({"昭和": 1925, "平成": 1988, "令和": 2019})
df["building_year"] = years["period"] + years["year"]


# apiが利用される場面を考えて四半期を月に変更
years = df["Period"].str.extract(r"(\d+)")[0]
zen2han = {"1": "1", "2": "2", "3": "3", "4": "4"}
quarters = df["Period"].str.extract(r"(\d四半期)")[0]\
    .str.replace("四半期", "").map(zen2han).astype(int)
months = (quarters * 3 - 2).astype(str)
df["trade_date"] = pd.to_datetime(years + "-" + months)


# 使用する変数の取り出し
cols = ["price", "address", "area", "building_year", "trade_date"]
df = df[cols].dropna()

# 保存 --------------------------------------------
df.to_csv("input/basic_data.csv", index=False)

こんな感じになります。

f:id:nigimitama:20200209144919p:plain

  • priceは目的変数
  • address, area, building_yearは予測のときはAPIの利用者が入力する
  • trade_dateは予測のときは「APIが利用された日」を使う(システムが作る特徴量)

という想定です。

前処理関数の定義

addressは文字列,trade_dateはdatetimeか文字列なので,このまま機械学習モデルに入れるわけにはいきません。

今回はLightGBMを使うので完全にカテゴリカル変数であるaddressはLightGBM内でcategoricalにすればいいとしても,trade_dateは順序があるカテゴリカル変数なので数値にしたいところです。

そんなちょっとした特徴量の加工をするクラスを定義します1

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np


class skPlumberBase(BaseEstimator, TransformerMixin):
    """Pipelineに入れられるTransformerのベース"""

    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return self


class Date2Int(skPlumberBase):

    def __init__(self, target_col):
        self.target_col = target_col

    def transform(self, X):
        """unix時間に変換する"""
        dates = pd.to_datetime(X[self.target_col]).astype(np.int64) / 10**9
        X[self.target_col] = dates.astype(int)
        return X


class ToCategorical(skPlumberBase):
    """LightGBMにcategoryだと認識させるため,
    カテゴリカル変数をpandas category型にする
    """

    def __init__(self, target_col):
        self.target_col = target_col

    def transform(self, X):
        X[self.target_col] = X[self.target_col].astype("category")
        return X

Date2Intは

0         2019-07-01
1         2018-10-01
2         2018-07-01
3         2018-04-01
4         2018-04-01
             ...    
137548    2008-01-01
137549    2007-10-01
137550    2007-10-01
137551    2007-07-01
137552    2007-04-01
Name: trade_date, Length: 137553, dtype: object

0         1561939200
1         1538352000
2         1530403200
3         1522540800
4         1522540800
             ...    
137548    1199145600
137549    1191196800
137550    1191196800
137551    1183248000
137552    1175385600
Name: trade_date, Length: 137553, dtype: int32

のようにするものです。

unix時間にすると桁数が増えて使用メモリが増えて効率的ではない気がしますが,あくまで例ということで…

学習

前処理して,学習して,前処理パイプラインと予測モデルをpickleで保存します。

from sklearn.pipeline import Pipeline
from pipeline import Date2Int, ToCategorical
import pandas as pd
import pickle
import lightgbm as lgb
from sklearn.model_selection import train_test_split

# データ読み込み
df = pd.read_csv("input/basic_data.csv")
y = df["price"]
X = df.drop("price", axis=1)

# 前処理パイプラインの定義
preprocess = Pipeline(steps=[
    ("date_to_int", Date2Int(target_col="trade_date")),
    ("to_category", ToCategorical(target_col="address"))
], verbose=True)

# 前処理
X = preprocess.transform(X)

# データを分割
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

# 学習
params = {
    "n_estimators": 100_000,
    "min_child_samples": 15,
    "max_depth": 4,
    "colsample_bytree": 0.7,
    "random_state": 42
}
model = lgb.LGBMRegressor(**params)
model.fit(X_train, y_train,
          eval_metric="rmse",
          eval_set=[(X_test, y_test)],
          early_stopping_rounds=100)
print("best scores:", dict(model.best_score_["valid_0"]))

# 保存
pickle.dump(preprocess, open("preprocess.pkl", "wb"))
pickle.dump(model, open("model.pkl", "wb"))

testデータに対するRMSEが1950万円くらいあります。これほど少ない特徴量だとさすがにひどい精度になりますね。

best scores: {'rmse': 19500026.074094355, 'l2': 380251016890359.75}

APIを作る

テストを書く

作るものを決める

これから作るAPIは,予測したい物件の特徴量を

{
    "address": "東京都千代田区",
    "area": 30,
    "building_year": 2013
}

のようにJSONにして送ると,

{
    "status": "OK",
    "preidcted": 40000000
}

のようなJSONの値を返すAPIとします。

テストを書く

import unittest
import requests
import json


class APITest(unittest.TestCase):
    URL = "http://localhost:5000/api/predict"
    DATA = {
        "address": "東京都千代田区",
        "area": 30,
        "building_year": 2013
    }

    def test_normal_input(self):
        # リクエストを投げる
        response = requests.post(self.URL, json=self.DATA)
        # 結果
        print(response.text)  # 本来は不要だが,確認用
        result = json.loads(response.text)  # JSONをdictに変換
        # ステータスコードが200かどうか
        self.assertEqual(response.status_code, 200)
        # statusはOKかどうか
        self.assertEqual(result["status"], "OK")
        # 非負の予測値があるかどうか
        self.assertTrue(0 <= result["predicted"])


if __name__ == "__main__":
    unittest.main()

※あくまで例です。実際にちゃんと作るときはもっと沢山(正常系だけでなく異常系も)テストケースを作ります。

Flaskでアプリを作成

FlaskPythonの軽量なWebフレームワーク(Webアプリを簡単に作ることができるライブラリ)で,極めて少ないコード量でアプリを作ることができます。

機械学習モデルを動かすだけ」みたいな単純な動作をするAPIには最適なフレームワークです。

以下のように書いていきます。

from flask import Flask, request, jsonify, abort
import pandas as pd
import pickle
from datetime import datetime
import sys
sys.path.append("./model")  # 前処理で使った自作モジュール「pipeline」を読み込むためPYTHONPATHに追加
app = Flask(__name__)

# アプリ起動時に前処理パイプラインと予測モデルを読み込んでおく
preprocess = pickle.load(open("model/preprocess.pkl", "rb"))
model = pickle.load(open("model/model.pkl", "rb"))


@app.route('/api/predict', methods=["POST"])
def predict():
    """/api/predict にPOSTリクエストされたら予測値を返す関数"""
    try:
        # APIにJSON形式で送信された特徴量
        X = pd.DataFrame(request.json, index=[0])
        # 特徴量を追加
        X["trade_date"] = datetime.now()
        # 前処理
        X = preprocess.transform(X)
        # 予測
        y_pred = model.predict(X, num_iteration=model.best_iteration_)
        response = {"status": "OK", "predicted": y_pred[0]}
        return jsonify(response), 200
    except Exception as e:
        print(e)  # デバッグ用
        abort(400)


@app.errorhandler(400)
def error_handler(error):
    """abort(400) した時のレスポンス"""
    response = {"status": "Error", "message": "Invalid Parameters"}
    return jsonify(response), error.code


if __name__ == "__main__":
    app.run(debug=True)  # 開発用サーバーの起動

前節で書いたテストを走らせるとこうなります

$ python3 api_test.py 
{
  "predicted": 45833222.1903707, 
  "status": "OK"
}

.
----------------------------------------------------------------------
Ran 1 test in 0.015s

OK

期待通り,predictedとstatusが返っているようです。

GitHubリポジトリ

今回のコードをまとめたGitHubリポジトリはこちらになります:

github.com

[Python]リストやデータフレームを任意の要素数で分割する

仕事でちょくちょく必要になって、そのたびに考えたり調べたりしていたのでメモ

問題

長さ(行数) Nのリスト(データフレーム)を、要素数(行数) K個ずつの塊に分割する。

最後の塊に分割する際に K個未満になった場合は、その要素数で分割する。 f:id:nigimitama:20200125110520p:plain

リストの場合

# データの用意
lis = [1, 2, 3, 4, 5]
k = 2
n = len(lis)

インデックスだけ欲しい場合

# インデックスだけ欲しい場合
[(i, i+k) for i in range(0, n, k)]
[(0, 2), (2, 4), (4, 6)]

スライスしたい場合

# リストをスライスしたい場合
[lis[i:i+k] for i in range(0, n, k)]
[[1, 2], [3, 4], [5]]

データフレームの場合

リストとはスライスのときに必要となるインデックスが変わってくる

# データの用意
import pandas as pd
df = pd.DataFrame({"a": [1, 2, 3, 4, 5], "b": ["a","b","c","d","e"]})
k = 2
n = df.shape[0]

インデックスだけ欲しい場合

# インデックスだけ欲しい場合
[(i, i+k-1) for i in range(0, n, k)]
[(0, 1), (2, 3), (4, 5)]

データフレームをスライスしたい場合

# データフレームをスライスしたい場合
dfs = [df.loc[i:i+k-1, :] for i in range(0, n, k)]
for df_i in dfs:
    display(df_i)  # NOTE: display()はjupyter内でのみ有効

f:id:nigimitama:20200125105554p:plain

もうちょっとしっかり書く場合

# 表側が順番通りの整数でないデータフレームにも対応した場合
def slice_df(df: pd.DataFrame, size: int) -> list:
    """pandas.DataFrameを行数sizeずつにスライスしてリストに入れて返す"""
    previous_index = list(df.index)
    df = df.reset_index(drop=True)
    n = df.shape[0]
    list_indices = [(i, i+size) for i in range(0, n, size)]
    df_indices = [(i, i+size-1) for i in range(0, n, size)]
    sliced_dfs = []
    for i in range(len(df_indices)):
        begin_i, end_i = df_indices[i][0], df_indices[i][1]
        begin_l, end_l = list_indices[i][0], list_indices[i][1]
        df_i = df.loc[begin_i:end_i, :]
        df_i.index = previous_index[begin_l:end_l]
        sliced_dfs += [df_i]
    return sliced_dfs
df.index = ["A","B","C","D","E"]
dfs = slice_df(df, size=2)
for df_i in dfs:
    display(df_i)  # NOTE: display()はjupyter内でのみ有効

f:id:nigimitama:20200125105753p:plain

どの程度sparseだと疎行列ライブラリで計算が速くなるのか

気になったので試してみました。

TL; DR

Objective

「疎行列クラスの計算速度向上効果は、データがどの程度疎(sparse)になったら発揮されるのか」について調べた。

Methods & Data

0か1の要素からなるデータを作り、その計算時間を

  1. 和の演算
  2. 積の演算
  3. XGBoostの学習
  4. LightGBMの学習

の4つのタスクについて計測した。

Findings

  1. 行列の和・積の演算は、非ゼロ要素の割合がどの程度であっても(1%でも100%でも)np.arrayよりcsr_matrixやcsc_matrixのほうが高速であった。
  2. XGBoostは非ゼロ要素の割合が約90%以下であれば疎行列クラスの使用による学習時間向上がみられた。
  3. LightGBMは非ゼロ要素の割合がどの程度であっても、疎行列クラスによる学習時間の向上はみられなかった。
  4. データがsparseでない状況で疎行列クラスを使用することで計算速度が低下するようなことはなかった。

なお、今回使用したコードです。

github.com

はじめに

疎行列とは

疎行列(sparse matrix)とは、成分のほとんどがゼロである行列のこと。

この疎行列のようなゼロの多いデータを効率よく格納するためのクラスがscipyなどのライブラリに実装されています。

今回試すこと

疎行列クラスでデータを保持すると、非ゼロ要素のデータしか持たないため、メモリ使用量の観点では極めて効率がよくなります。

また、計算速度も高くなると言われています。

今回は「どの程度非ゼロ要素が少なくなると疎行列ライブラリによる計算速度向上の効果が出るのか」について試してみます。

方法

環境

データ

0.0か1.0が確率pで現れる二項分布を使って任意の密度の行列を作ります。

def gen_dataset(nrow, ncol, p):
    x = np.random.binomial(n=1, p=p, size=nrow*ncol)
    X = x.reshape(nrow, ncol).astype(np.float16)
    return X

行列クラス

以下の4つを比較します。

1. numpy.array()

疎行列ではない行列クラス

2. scipy.sparse.lil_matrix

疎行列。リストの中に非ゼロ要素の列インデックスと値を保持したリストを入れて保持する(List of lists: LIL)方式。

3. scipy.sparse.csr_matrix

圧縮行格納方式(Compressed Sparse Row: CSR)といって、非ゼロ要素の行インデックス情報を圧縮して保持する。

これについてはWikipediaの説明がわかりやすい(疎行列 - Wikipedia

4. scipy.sparse.csc_matrix

圧縮列格納方式(Compressed Sparse Column)。CSRの列バージョン。

検証1:和の演算

方法

  • 同じ大きさの正方行列 A,Bを作ってA + Bします。
  • 演算を3回繰り返して平均処理時間で評価します。
  • 非ゼロ要素の割合を[0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]に変化させつつ演算の処理時間を計測していきます。
  • 行列のサイズに依存して結果が変わるかもしれないので、行数・列数を[100, 1000, 10000]に変えて試します。
import pandas as pd
import timeit
import numpy as np
from scipy.sparse import lil_matrix, csr_matrix, csc_matrix

def gen_dataset(nrow=100000, ncol=100, p=0.5):
    x = np.random.binomial(n=1, p=p, size=nrow*ncol)
    X = x.reshape(nrow, ncol).astype(np.float16)
    return X

results = []
for nrow in [100, 1000, 10000]:
    ncol = nrow
    for p in [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]:
        print(f"# nrow={nrow}, ncol={ncol}, p={p} "+"-"*50)
        A_np = gen_dataset(nrow=nrow, ncol=ncol, p=p)
        B_np = gen_dataset(nrow=nrow, ncol=ncol, p=p)
        for matrix in ["np", "lil", "csr", "csc"]:
            if matrix == "lil":
                A = lil_matrix(A_np)
                B = lil_matrix(B_np)
            elif matrix == "csr":
                A = csr_matrix(A_np)
                B = csr_matrix(B_np)
            elif matrix == "csc":
                A = csc_matrix(A_np)
                B = csc_matrix(B_np)
            else:
                A = A_np
                B = B_np
            t = timeit.Timer('A + B', globals=globals())
            fit_times = t.repeat(repeat=3, number=1)
            result = {"nrow": nrow, "ncol": ncol, "p": p, "matrix": matrix, 
                            "mean": np.mean(fit_times), "std": np.std(fit_times)}
            results.append(result)
results = pd.DataFrame(results)

結果

f:id:nigimitama:20191208214336p:plain

lil_matrixは公式ドキュメントでも「lil + lilの演算は遅い」と書かれているだけあって、遅いです。

lilを除いてグラフを再描画するとこうなります

f:id:nigimitama:20191208214345p:plain

そこそこの大きさの行列になってくると、CSCCSRがnp.arrayと大差をつけて非常に速いです。

そしてこの関係は密な(非ゼロ要素が多い)行列でも変わりませんでした。

疎行列でなくてもCSRCSCのほうがnp.arrayよりも高速に演算できるみたいです。(なんで?)

なお、CSRCSCで比べても大差はありませんでした。

f:id:nigimitama:20191208214256p:plain

検証2:積の演算

方法

  • 同じ大きさの正方行列 A,Bを作ってA.dot(B)します。
  • 演算を3回繰り返して平均処理時間で評価します。
  • 非ゼロ要素の割合を[0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]に変化させつつ演算の処理時間を計測していきます。
  • 行列のサイズに依存して結果が変わるかもしれないので、行数・列数を[100, 1000]に変えて試します。

結果

f:id:nigimitama:20191208214552p:plain

CSC = CSR < LIL << np.arrayという感じの順番になりました。

こちらもCSRCSCの結果は同程度でした。

f:id:nigimitama:20191208214633p:plain

検証3:XGBoostの学習

方法

  • fit()を10回繰り返して平均処理時間で評価します。
  • 非ゼロ要素の割合を[0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]に変化させつつ演算の処理時間を計測していきます。
  • 行列のサイズに依存して結果が変わるかもしれないので、行数を[100, 1000, 10000, 100000]と変化させ、列数は[10, 100]と変化させてすべての組み合わせについて試してみます。
  • lil_matrixには対応していないので、np.array, csr_matrix, csc_matrixの3種で比較します。
import pandas as pd
import timeit
import xgboost as xgb
import lightgbm as lgb
import numpy as np
from scipy.sparse import lil_matrix, csr_matrix, csc_matrix


def gen_dataset(nrow=100000, ncol=100, p=0.5):
    x = np.random.binomial(n=1, p=p, size=nrow*ncol)
    X = x.reshape(nrow, ncol).astype(np.float16)
    y = np.apply_along_axis(_func, 1, X)
    return X, y


def _func(row: np.array):
    return np.sum(row) + np.random.rand()

results = []
for nrow in [100, 1000, 10000, 100000]:
    for ncol in [10, 100]:
        for p in [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]:
            print(f"# nrow={nrow}, ncol={ncol}, p={p} "+"-"*50)
            X_np, y = gen_dataset(nrow=nrow, ncol=ncol, p=p)
            for algorithm in ["xgb", "lgb"]:
                model = xgb.XGBRegressor(objective="reg:squarederror") if algorithm == "xgb" else lgb.LGBMRegressor()
                for matrix in ["np", "csr", "csc"]:
                    if matrix == "csr":
                        X = csr_matrix(X_np)
                    elif matrix == "csc":
                        X = csc_matrix(X_np)
                    else:
                        X = X_np
                    t = timeit.Timer('model.fit(X, y)', globals=globals())
                    fit_times = t.repeat(repeat=3, number=1)
                    result = {"nrow": nrow, "ncol": ncol, "p": p, "matrix": matrix, "algorithm": algorithm,
                              "mean": np.mean(fit_times), "std": np.std(fit_times)}
                    results.append(result)
results = pd.DataFrame(results)

結果

非ゼロ要素の割合が少なくなる(よりsparseになる)ほど、CSRCSCのほうがnp.arrayよりもずっと高速に計算できるようです。

そして、非ゼロ要素の割合が90%くらいであっても、CSCCSRを使ったほうが高速であるようです。

f:id:nigimitama:20191208215327p:plain

検証4:LightGBMの学習

方法

XGBoostと同じです。

結果

行数・列数、非ゼロ要素の割合を変えてみても、np.array、csrcscは同程度の計算時間であり、疎行列クラスの計算速度向上はみられませんでした

なぜなのかはわかりません・・・

f:id:nigimitama:20191208214850p:plain

おわりに

まとめると

  1. CSRCSCはnp.arrayより和・積の演算が高速
  2. XGBoostは非ゼロ要素が約90%以下であればCSCCSRを使ったほうがいい
  3. LightGBMは疎行列クラスを使っても計算速度の向上は期待できなさそう1
  4. ゼロ要素の少ないデータ(密なデータ)であっても、疎行列クラスを使って計算が遅くなることはない

という感じでしょうか。

CSRCSCのデメリットは、scipyのドキュメントを見ると、圧縮している行/列方向へのスライス操作が遅くなることや構造を変更することであるようなので、計算上はメリットが多くてデメリットはほぼ無いんじゃないか、という印象です。

前処理では使わないほうがいいでしょうが、機械学習アルゴリズムに投入する段階になったらCSRCSCに変換してから突っ込めば疎行列クラスの恩恵をうまく受けられるかもしれません。

  1. データに10%以上ゼロ要素がある
  2. 使おうとしている機械学習アルゴリズムが疎行列クラスを受け入れている(疎行列用の高効率な演算を実装している)

という状況であれば疎行列クラスを試してみるのはアリだと思います。


  1. とはいえ、sparseなデータであれば疎行列クラスのほうがメモリ効率は良いはずなので「どんなに疎なデータであってもnp.arrayを使えばいい」というわけではない

cronでプログラムを定期的に自動実行させる

かなり詰まったのでメモ。

環境

cron

cronの設定方法は2つある

  • crontabコマンドを使う方法
  • /env/cron.dにcrontabファイルを置いておく方法

crontabコマンドを使う方法

概要

この方法のメリット

  • 簡単で、管理者でなくても設定できる

この方法のデメリット

  • ユーザーごとに単一のcrontabファイルを使うため、もし誤ってcrontab -rで削除してしまうと全部消える(人によってはこちらは非推奨とするらしい)

コマンド

# crontabを閲覧
crontab -l
# crontabを編集
crontab -e

設定方法

# m h  dom mon dow   command
分 時 日 月 曜日  コマンド

例えば次のような感じに書く。

# 全部* → 1分ごと
* *    * * *  [command]

# 毎時0分に実行
0 *    * * *  [command]

# 毎日0:00に実行
0 0    * * *  [command]

また、5つの*の部分の代わりに@monthlyなどの書き方をしても良い (Ubuntu Manpage: crontab - cron を駆動するための一覧表

cronの起動

sudo service cron start

/etc/cron.dに置く方法

概要

この方法のメリット

  • タスクのジャンルごとにcrontabファイルを分けられるので、本格的な使用に向く
  • あやまって削除してしまうなんてことは起こりにくい

この方法のデメリット

  • 管理者権限で操作する必要がある

crontabファイルの準備

/etc/crontabのフォーマットを使うのが楽なので、/etc/cron.dというディレクトリにコピーして編集する。

# test_cronという名前でコピーする場合
cp /etc/crontab /etc/cron.d/test_cron

この際、ファイル名に拡張子をつけない(.が含まれるファイルは実行されないらしい)

設定方法

crontabコマンドを使う場合と少し異なり、userのフィールドがある。

# m h dom mon dow user  command
分 時 日 月 曜日 ユーザー コマンド

例えば次のように書く。

# m h dom mon dow user  command
* *     * * *   user   echo "test" >> ~/test.log

私が詰まったポイント

私の場合、userをrootにしていたときはcronが動作しなかった。

自分がログイン中のユーザー名にしたところ、ちゃんと動作するようになった。

cronの起動

sudo service cron start

cronの再読み込み

cronデーモンは/etc/cron.d内のファイルや/etc/crontabの変更を監視しており、変更はすぐ再読み込みしてくれるはずだが、手動でやる場合は以下のようなコマンドが使える。

# cronの再起動
sudo service cron restart
# cronの状態を確認する
sudo service cron status

crontabの設定を楽にするサイト

f:id:nigimitama:20191110182341p:plain

f:id:nigimitama:20191110195920p:plain

参考になるサイト

cron力をつけよう!全てのcrontab入門者に贈る9個のテクニック · DQNEO起業日記

↑実践的な話でとてもよかった

追記:cronに環境変数を渡す

cronは通常のbashとは異なる環境変数の下で動いている。そのため、Dockerのコンテナに与えた環境変数などがcronのジョブ内で使えない問題が発生する。

そこで、cronを設定する時に

declare -p | grep '\-x' > /cron.env

みたいな感じに必要な環境変数をファイルに書き出しておいて、cronの設定ファイル内で

BASH_ENV=/cron.env
DIR=/path/to/workdir
# m h dom mon dow user command
* * * * *   root    /bin/bash "${DIR}/main.sh" > "${DIR}/cron.log" 2>&1

のようにして読み込むことで環境変数を渡すことができる。

参考

cron — CronジョブからDockerセットの環境変数にアクセスするにはどうすればよいですか

スパースなデータをXGBoost.DMatrixに入れるときはpd.DataFrame/np.arrayを使ってはいけない

XGBoostにはDMatrixという独自のデータ保持用クラスがあります。Documentの説明では

optimized for both memory efficiency and training speed

と書いてあり、私は「自動で疎行列クラスとかにしてくれるんだろうなぁ」と思っていたのですが、そうでもない様子なのでメモしておきます。

以下では、「core.pyを読む」の節でxgboostのpythonで書かれた部分について調べた結果を述べ、それ以降の節ではC++で書かれた部分についてコードを動かすことで中身を推測していきます(私はC++が読めないため)。

(ここでは色々事情があってスパースなデータを使う場合のことを考えますが、決定木系のアルゴリズムならカテゴリカル変数はOne Hot Encoding(≒ダミー変数)ではなく、まずLabel Encodingを検討すべきです)

core.pyを読む

DMatrixクラスはpython-package/xgboost/core.pyの321行目あたりで定義されています。

f:id:nigimitama:20191106190007p:plain

DMatrixの__init__メソッドではデータのオブジェクトをdataで受け取るのですが、わりとすぐに_maybe_pandas_dataというメソッドに通しています。

# xgboost/core.py line 378
data, feature_names, feature_types = _maybe_pandas_data(data,
                                                        feature_names,
                                                        feature_types)

_maybe_pandas_data226行目あたりで定義されていて、pandasがインストール済みで、かつdataがDataFrameである場合にpd.DataFrameをnp.arrayに変換しています。

# xgboost/core.py line 226
def _maybe_pandas_data(data, feature_names, feature_types):
    """ Extract internal data from pd.DataFrame for DMatrix data """
    if not (PANDAS_INSTALLED and isinstance(data, DataFrame)):
        return data, feature_names, feature_types

    # (中略)
    
    data = data.values.astype('float')
    return data, feature_names, feature_types

DMatrix.__init__ではその下を見ていくと、dataのクラスに応じて別のinitメソッドを呼ぶ条件分岐があります。

scipy.sparse系の疎行列クラスであれば_init_from_csr_init_from_cscが呼ばれ、pd.DataFrameやnp.arrayであれば_init_from_npy2dが呼ばれることになります。

# xgboost/core.py line 393
if isinstance(data, (STRING_TYPES, os_PathLike)):
    handle = ctypes.c_void_p()
    _check_call(_LIB.XGDMatrixCreateFromFile(c_str(os_fspath(data)),
                                             ctypes.c_int(silent),
                                             ctypes.byref(handle)))
    self.handle = handle
elif isinstance(data, scipy.sparse.csr_matrix):
    self._init_from_csr(data)
elif isinstance(data, scipy.sparse.csc_matrix):
    self._init_from_csc(data)
elif isinstance(data, np.ndarray):
    self._init_from_npy2d(data, missing, nthread)
elif isinstance(data, DataTable):
    self._init_from_dt(data, nthread)
else:
    try:
        csr = scipy.sparse.csr_matrix(data)
        self._init_from_csr(csr)
    except:
        raise TypeError('can not initialize DMatrix from'
                        ' {}'.format(type(data).__name__))

小まとめ

_init_from_npy2dなどの中身はC++のメソッドを呼んでいて、そこから先はC++が読めない私ではわからない領域なのですが、ここまでのコードを読む限り、

(少なくともpython側では)疎行列クラスでDMatrixに投入しない限り、pd.DataFrameやnp.arrayが自動で疎行列クラスに変換されることはない

という事がわかります。

このことが分析上どう影響するかについては次節で調べてみます。

メモリ使用量の違い

私はc++で書かれている処理を読むことができないため、ここからはコードを動かしてみて 「pd.DataFrameからDMatrixオブジェクトを作った場合のメモリ使用量」と 「疎行列クラスからDMatrixオブジェクトを作った場合のメモリ使用量」 を比較して、間接的に両者の違いを見てみます。

実験方法

以下の方法で比較します。

  • 500,000行×1000列のスパースな特徴量を作る
  • DMatrixに投入して、元のデータオブジェクトを削除した時点のメモリ使用量で比較する

コード

import xgboost as xgb
import pandas as pd
import numpy as np
import psutil
import gc
import sys
from scipy.sparse import csr_matrix

# setting
nrows = 500_000
ncols = 1000
use_sparse = True if sys.argv[1] == "sparse" else False

# generate data
np.random.seed(0)
categories = pd.Series(np.random.randint(low=0, high=ncols, size=nrows))
X = pd.get_dummies(categories)

if use_sparse:
    X = csr_matrix(X)
else:
    # どうせ_init_from_npy2dでnp.float32のarrayに変換されるので、モニタリングしやすくするため予め変換しておく
    X = np.array(X, dtype=np.float32)

print(f"data size: {sys.getsizeof(X) / 1024**2:.2f} MB, shape: {X.shape}")
print(f"used memory before make DMatrix: {psutil.virtual_memory().used / 1024**3:.2f} GB")
dtrain = xgb.DMatrix(X)
del X
gc.collect()
print(f"used memory after delete X: {psutil.virtual_memory().used / 1024**3:.2f} GB")

結果

$ python3 DMatrix.py DataFrame
data size: 1907.35 MB, shape: (500000, 1000)
used memory before make DMatrix: 7.04 GB
used memory after delete X: 8.89 GB

$ python3 DMatrix.py sparse
data size: 0.00 MB, shape: (500000, 1000)
used memory before make DMatrix: 5.16 GB
used memory after delete X: 5.17 GB

考察

pd.DataFrame/np.arrayを使ったほうは、元のデータが約1.9GBあり、DMatrix変換後にメモリの使用量が約1.9GB増えています。DMatrixに投入したデータがそのままメモリに乗っかったような数値で推移しています。

つまり、python側からC++で書かれたメソッドにデータが渡った後も、特にnp.arrayを疎行列クラスに変換するような処理は行われていないと考えられます。

一方で疎行列クラスを使用した方は、DMatrixにデータを投じる前後でメモリ使用量がほとんど変化していません。

疎行列クラスのデータをDMatrixに投入すると、その先でも疎行列クラスとして受け取ってくれるのだと考えられます。

すなわち、スパースデータを使う場合は疎行列クラスに変換した後にDMatrixに投入すべきであると考えられます。

計算時間の違い

再びコードを動かしてみて 「pd.DataFrameから作ったDMatrixオブジェクトでの学習時間」と 「疎行列クラスから作ったDMatrixオブジェクトでの学習時間」 を比較して、間接的に両者の違いを見てみます。

実験方法

以下の条件で行います。

  • 回帰問題
  • 100,000行×100列の二値変数のみの特徴量
  • 予測を100回繰り返して、平均的な計算時間を測る
    • その際の時間を比較する
  • 疎行列クラスはscipy.sparse.csr_matrixを使用

なお、計測環境は以下のとおりです

コード

import xgboost as xgb
import pandas as pd
import numpy as np
import sys
from scipy.sparse import csr_matrix
import timeit

# setting
nrows = 100_000
ncols = 100
use_sparse = True if sys.argv[1] == "sparse" else False

# generate data
np.random.seed(0)
categories = pd.Series(np.random.randint(low=0, high=ncols, size=nrows))
X = pd.get_dummies(categories)
w = np.random.uniform(size=ncols)
y = np.dot(X, w) + np.random.normal(size=nrows)

if use_sparse:
    X = csr_matrix(X)
print(f"type of X: {type(X)}")

print(f"data size: {sys.getsizeof(X) / 1024**2:.2f} MB, shape: {X.shape}")
dtrain = xgb.DMatrix(X, label=y)

param = {"tree_method": "exact"}
t = timeit.Timer("xgb.train(param, dtrain)", globals=globals())
results = t.repeat(repeat=100, number=1)
print(f"train time: {np.mean(results):.1f} ± {np.std(results):.3f} sec. (n={len(results)})")

結果

$ python3 performance.py DataFrame
type of X: <class 'pandas.core.frame.DataFrame'>
data size: 9.54 MB, shape: (100000, 100)
train time: 1.5 ± 0.237 sec. (n=100)

$ python3 performance.py sparse
type of X: <class 'scipy.sparse.csr.csr_matrix'>
data size: 0.00 MB, shape: (100000, 100)
train time: 0.1 ± 0.015 sec. (n=100)
  • DataFrameからDMatrixを作って学習した場合は平均1.5秒程度
  • 疎行列クラスからDMatrixを作って学習した場合は平均0.1秒程度

の学習時間になりました。

考察

疎行列クラスはスパースデータの計算が高速化されることが知られています。

DataFrameを使ってDMatrixを作った場合は、xgboostの内部で疎行列クラスに変換されるようなことが無いため、疎行列クラスからDMatrixを作った場合に比べて圧倒的に計算が遅いという結果になったのだと思われます。

計算時間の点からも、スパースデータを使う場合は疎行列クラスに変換した後にDMatrixに投入すべきであると考えられます。

まとめ

  • pd.DataFrameやnp.arrayのデータをxgboost.DMatrixに入れると、DMatrix.__init__()ですべての要素がnp.float32のnp.arrayに変換される。

    • これの何が問題か:pd.get_dummies()の直後はuint8だがfloat32になるとメモリ使用量が約4倍増加することになる →DataFrame時点ではわからないがひっそりと大量のメモリを食う
    • np.arrayに変換されたデータはその後C++で書かれたAPIにデータが渡った後も疎行列クラスなどに変換されている様子はなく、float32のarrayのままであると考えられる。
      • (スパースなデータの場合、無駄にメモリ使用量と計算時間が多くなる)
  • scipy.sparseの疎行列クラスのデータをDMatrixに入れた場合、妙な変換はされず、疎行列クラスの特性(省メモリ・高速な計算)を持ったままXGBoostの学習に入ることができる

→スパースなデータを使う場合は、疎行列クラスのインスタンスにした後にDMatrixに入れるべき

XGBoostで自作の目的関数を使う

XGBoostの素敵なポイントの一つは、自分で定義した関数を目的関数に使うことができる点です。

でもどういう関数にしたらよいのかがわからなくて過去に戸惑ったことがあるのでメモしておきます。(詳しいやり方はXGBoostのdocumentationに書いてあります)

定義すべき関数

XGBoostなどの勾配ブースティング決定木アルゴリズムでは誤差関数・目的関数を2種類使います。

  1. early stopping1用の誤差関数(metric)
  2. 個々の木の学習で使う目的関数(objective)

誤差関数(metric function)

metricは誤差二乗和(Sum of Squared Error: SSE)

や平均二乗誤差(Mean of Squared Error: MSE)のことです。他の機械学習アルゴリズムでもよく見かけるやつです。

XGBoostでは、予測値のarrayと実測値が入ったDMatrixを受け取り、文字列と誤差を評価した実数を返すような関数を定義します。誤差二乗和なら以下のような感じです。

import numpy as np
import xgboost as xgb

def my_sse(y_hat: np.array, dtrain: xgb.DMatrix) -> [str, float]:
    """custom metric: sum of squared error"""
    y = dtrain.get_label()
    N = y_hat.shape[0]
    error = np.sum((y_hat - y)**2)
    return "sse", float(error)

目的関数(objective function)

XGBoostでは、objective functionは予測値のarrayと実測値が入ったDMatrixを受け取ってmetricの1次の勾配と2次の勾配を返すような関数として実装します。

metricがSSEなら、目的関数の出発点は総和を取る前の二乗誤差(squared error)であり、微分後の式を簡単にするために1/2を掛けて

とすれば、1次の勾配と2次の勾配は

となります。

def my_squared_error(y_hat: np.array, dtrain: xgb.DMatrix) -> [np.array, np.array]:
    """custom objective: squared error"""
    y = dtrain.get_label()
    N = y_hat.shape[0]
    gradient = y_hat - y
    hessian = np.ones(N)
    return gradient, hessian

使用例

上記の自作squared errorが正しく動いているか検証してみます。

from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
import pandas as pd
import xgboost as xgb
import numpy as np


def my_sse(y_hat: np.array, dtrain: xgb.DMatrix) -> [str, float]:
    """custom metric: sum of squared error"""
    y = dtrain.get_label()
    N = y_hat.shape[0]
    error = np.sum((y_hat - y)**2)
    return "sse", float(error)

    
def my_squared_error(y_hat: np.array, dtrain: xgb.DMatrix) -> [np.array, np.array]:
    """custom objective: squared error"""
    y = dtrain.get_label()
    N = y_hat.shape[0]
    gradient = y_hat - y
    hessian = np.ones(N)
    return gradient, hessian


# load data
boston = load_boston()
X = boston["data"]
y = boston["target"]

# split data
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.2, random_state=0)
# for early stopping
X_train, X_eval, y_train, y_eval = train_test_split(X_train, y_train,
                                                    test_size=0.2, random_state=0)

# make DMatrix data
dtrain = xgb.DMatrix(X_train, label=y_train)
deval = xgb.DMatrix(X_eval, label=y_eval)
dtest = xgb.DMatrix(X_test, label=y_test)

# 標準のmetricではmseは存在せずrmseしかないのでrmseにする
params = {'objective': 'reg:squarederror', 'metric': 'rmse'}
watchlist = [(deval, 'eval')]
n_trees = 100
# 標準のsquared errorで学習
model1 = xgb.train(params, dtrain, num_boost_round=n_trees,
                   evals=watchlist, early_stopping_rounds=2)
# カスタム目的関数で学習
model2 = xgb.train(params, dtrain, num_boost_round=n_trees,
                   evals=watchlist, early_stopping_rounds=2,
                   obj=my_squared_error, feval=my_sse)

# 予測結果が一致するかどうか確認
pred1 = model1.predict(dtest)
print(f"head of pred1: {pred1[0:5]}")
pred2 = model2.predict(dtest)
print(f"head of pred2: {pred2[0:5]}")
print(f"予測結果が一致するもの:{sum(pred1 == pred2)}件({sum(pred1 == pred2) / X_test.shape[0]:.0%})")
head of pred1: [23.791985 26.905416 23.237139 10.941677 21.959444]
head of pred2: [23.791985 26.905416 23.237139 10.941677 21.959444]
予測結果が一致するもの:102件(100%)

結果は一致しました。問題なくsquared error / mse(rmse)を実装できていると思われます。

自作の目的関数の使い所

例えば「(予測精度が多少下がってもいいから)予測値が実測値を上回らないようにしてほしい」とか、あるいはその逆とかいった非対称な評価関数を設定したいときには使えると思います。

また、Kaggleにおいてコンペの評価指標に向けて最適化するためにカスタム目的関数を利用する場合もあるみたいですね。


  1. early stoppingとは、決定木を作るごとに予測誤差を評価して、決定木を追加しても予測誤差が改善しなくなってきたときに決定木の追加を止める手法のことです。

データ分析時のメモリ使用量を減らす方法

最近少し覚えたことをまとめます。

(基本的にpythonのコードと共に述べていきますが、Rの場合についても少し触れていきます。)

不要なオブジェクトの削除

「使わなくなったオブジェクトを削除してメモリを開放する」という考えです。

del

例えば以下のコードを試してみます。

import gc
import psutil
import numpy as np
import sys
nrows = 500000
ncols = 100
print('データを生成します')
X = np.random.rand(nrows, ncols)
print(f'データのサイズ: {sys.getsizeof(X) / 1024**2: .1f} MB')
mem_before = psutil.virtual_memory().used
print(f'del前のメモリ使用量: {mem_before / 1024**3 :.1f} GB')
del X
mem_after = psutil.virtual_memory().used
mem_diff = mem_before - mem_after
print(f'del後のメモリ使用量: {mem_after / 1024**3 :.1f} GB ({mem_diff / 1024**2 :.1f} MB reduced)')

上のコードを実行すると、以下のような結果になります。

データを生成します
データのサイズ:  381.5 MB
del前のメモリ使用量: 11.5 GB
del後のメモリ使用量: 11.2 GB (362.4 MB reduced)

手動でのガベージコレクションは不要

delのあとに明示的にガベージコレクションgc.collect())を使う方法も考えられますが、基本的に自動でのガベージコレクションが働くのでやらなくて問題ないみたいです。

また、ガベージコレクションを行ったとしても、メモリーがOSに返されるとは限らないようです1

Rの場合

一方でRはオブジェクトを削除しただけではメモリ使用量が変わらないため、手動でのガベージコレクションが有効なようです2

rm(X)
gc(reset = TRUE)

データ型の最適化

numpyにはuint8float32int64などの細かいデータ型が用意されています。

例えば以下のようにnp.iinfo()np.finfo()のメソッドを使うと、そのデータ型でもてる最小値・最大値などの情報を見ることができます。

import numpy as np
print(np.iinfo(np.int8))
print(np.iinfo(np.int16))
print(np.iinfo(np.int32))
print(np.iinfo(np.int64))
print(np.finfo(np.float16))
print(np.finfo(np.float32))
print(np.finfo(np.float64))
Machine parameters for int8
---------------------------------------------------------------
min = -128
max = 127
---------------------------------------------------------------

Machine parameters for int16
---------------------------------------------------------------
min = -32768
max = 32767
---------------------------------------------------------------
(以下略)

8bitのint8は-128~127の範囲の値しか表現できませんが、例えば二値変数(0, 1)や月(1~12)や日(1~31)のデータなどであればこのくらいの表現力で十分ですよね。なので最適化しましょう、というのがこのアプローチの考え方です。

自動で型変換するコード

こんな感じのやつを使っています。

import pandas as pd
import numpy as np

def reduce_mem_usage(df: pd.DataFrame) -> pd.DataFrame:
    """データフレームのすべてのカラムのデータ型を最適化してメモリ使用量を抑えるやつ"""
    start_mem = df.memory_usage().sum() / 1024**2
    print(f'Memory usage of dataframe is {start_mem:.2f} MB')
    df = df.apply(_optimize_dtype, axis=0)
    end_mem = df.memory_usage().sum() / 1024**2
    print(f'Memory usage after optimization is: {end_mem:.2f} MB')
    print(f'Decreased by {100 * (start_mem - end_mem) / start_mem:.1f}%')
    return df


def _optimize_dtype(col):
    """あるカラム/変数のデータ型を最適化するやつ"""
    col_type = col.dtype
    if col_type != 'object' and col_type != 'datetime64[ns]':
        c_min = col.min()
        c_max = col.max()
        # intにできるなら変換するが、NAがあればfloatのままにする(intはNAを保持できない)
        is_int = False
        if _is_int(col):
            if col.isnull().sum() == 0:
                is_int = True
            else:
                print(f'{col.name} was not convert to int because it has NA')
        # bit数の最適化
        if is_int:
            if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                col = col.astype(np.int8)
            elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                col = col.astype(np.int16)
            elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                col = col.astype(np.int32)
            elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                col = col.astype(np.int64)
        else:
            if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                # feather-formatがfloat16を受け入れないためfloat32にする
                col = col.astype(np.float32)
            elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                col = col.astype(np.float32)
            else:
                col = col.astype(np.float64)
    return col


def _is_int(col):
    """あるカラム/変数が整数かどうかを判定する"""
    fractional, _ = np.modf(col)
    if np.sum(fractional) == 0: # 小数部がすべて0
        return True
    else:
        return False

使用例は次のような感じです。

import numpy as np
import pandas as pd
import sys

# データを生成
nrows = 500_000
ncols = 100
X = [np.random.randint(low=0, high=50, size=nrows) for x in range(ncols)]
X = pd.DataFrame(np.array(X).T)
print('データを生成 ---------------------------')
print('dtypes: ')
print(X.dtypes.value_counts().to_frame(name='rows'))
print(f'データ型変換前のオブジェクトのサイズ: {sys.getsizeof(X) / 1024**2 : .1f} MB')

print('\nデータ型を変換 -------------------------')
X = reduce_mem_usage(X)
print('dtypes: ')
print(X.dtypes.value_counts().to_frame(name='rows'))
print(f'データ型変換後のオブジェクトのサイズ: {sys.getsizeof(X) / 1024**2 : .1f} MB')
データを生成 ---------------------------
dtypes:
       rows
int64   100
データ型変換前のオブジェクトのサイズ:  381.5 MB

データ型を変換 -------------------------
Memory usage of dataframe is 381.47 MB
Memory usage after optimization is: 47.68 MB
Decreased by 87.5%
dtypes:
      rows
int8   100
データ型変換後のオブジェクトのサイズ:  47.7 MB

このコードはkaggleのkernelを元にえじさんが改造したものを自分用に改造したものです。

えじさんのものからの変更点は

  • loggerに関する部分をprintに置き換え(私がまだloggerに慣れていないため)
  • floatであっても小数点以下が全部0でNAの無いカラムはintに変換する
  • forループでなくapply()を使う(applyの方が何倍か速い3

です。

疎行列クラスを使う

ダミー変数(One Hot Encodingした列)が多いデータの場合、非ゼロ要素の情報しか保持しない疎行列用のクラスを使うことでメモリ使用量を大幅に削減できます。

pythonの場合はscipy.sparseを、Rの場合は{Matrix}パッケージを使うことで効率よく疎行列を保持できます。

import pandas as pd
import numpy as np
import sys
import scipy.sparse
nrows = 500000
ncols = 500
categories = pd.Series(np.random.randint(low=0, high=ncols, size=nrows))
X = pd.get_dummies(categories)
print(f'dtypes: {X.dtypes.value_counts().to_dict()}')
print(f'データのサイズ: {sys.getsizeof(X) / 1024**2: .1f} MB')
X_csr = scipy.sparse.csr_matrix(X, shape=X.shape)
print(f'csr_matrixのサイズ: {sys.getsizeof(X_csr): .1f} Byte')
dtypes: {dtype('uint8'): 500}
データのサイズ:  238.4 MB
csr_matrixのサイズ:  56.0 Byte

  1. アプリケーションのメモリー使用量の最適化

  2. Rでメモリを解放したい - まずは蝋の翼から。

  3. 使用例で使った50万行x100列のデータの変換を10回行って実行時間の平均をとると、applyを使う場合1.62±0.01秒、forループの場合19.668±0.292秒でした。