MARS – 大規模科学技術計算ライブラリ

CJNANです。

最近、アリババクラウドで、大規模科学技術計算処理するためのライブラリMARSをGithubで公開しました。ここで、MARSについて皆さんにご紹介します。

本記事では、データ処理に関連する内容であり、クラウド製品のご紹介はございません。

Tensor

Pythonを使ってデータ分析や機械学習をやる時に、Tensor(テンソル)と言うデータ構造をよく扱います。Tensorは多次元(N-dimension)の配列として表現できて、例えば、RGB画像の場合は、X軸とY軸の座標値とRed、Green、Blue 3つの色情報を表す色軸(チャンネルとも呼ばれる)もあります。つまり一枚の画像の情報を表現するためには3次元のデータ構造が必要であり、一般的なデータベースのテーブル(2次元)よりも多次元配列であるTensorの方がデータ処理に向いていますね。

Numpy

PythonはRと共にデータ処理で一番使われている言語であり、Python独自の強大なエコシステムができています。その中でもNumpyが一番メジャーなライブラリであり、Pandas、Scipy、scikit-learnと共に、機械学習業界でも活用されています。NumpyはTensorに対して、ndarrayと言うオブジェクトが中心の役割を果たしています。Ndarrayをうまく活用するとTensorデータを効率よく扱い、データ処理をより簡単に実現できます。しかし、Numpyはデフォルトに一台のマシンで動きますので、マシンリソースに依存され、データ量が莫大になると、処理が難しくなるケースがあります。

ギャップ

もちろん、大規模の多次元Tensorデータに対しては、MapReduceやHiveSQLで簡単なデータ処理をすることも可能ですが、コード実現が比較的複雑で、データ処理よりも、分散処理の知識がもっと問われます。そして、Tensor構造に対して多少複雑な科学演算処理を行う場合、MapReduceやHiveSQLの開発負荷が膨大になり、ハードルが高くなります。例えば、Aと言うMatrixデータに対して、Transposeを実現する場合に、MapReduceのPythonコード(データ読み込みやジョブコントロールを除いた部分だけ)が下記のようになります。

しかしこの処理をNumpyで実現する場合は、一行で済めますので、開発難易度が結構下がります。

import numpy as np
a = np.matrix(....)
at = a.transpose()
#or
a = a.T

分散システムではリソースの制限を解決し、大規模データの処理はできますが、Numpyのような効率的な科学演算には向いてないので、このギャップを埋めるために、Alibaba CloudではNumpyの分散処理ライブラリMARSを作りました。

MARS

まず、こちらはMARSのGithubです。もし、MARSの開発にご興味ある方はぜひ使って見てください。もちろん、MARSに対して、貴方のコードを貢献することも大歓迎です。

リンク:https://github.com/mars-project/mars

最初、MARSを開発するきっかけは、とあるお客様から、毎日MaxComputeを使って、定期的に何百億行のテーブルのSVD(特異値分解)演算処理を行いましたが、計算時間が長いし、コストも高くなる課題がありました。そこでAlibaba CloudはMaxComputeのリソース使って、効率よく科学演算できるような仕組みを作れないかを考え、Numpyの分散構成機能をMARSで実現しました。MARSをMaxComputeに対応することで、よく発生するメモリアウト問題や、CPUリソースの無駄PB/EBレベルのデータの科学演算も実現可能になります。

下記に、現在MARSが対応する機能をまとめてみました。

  • シングルノードと分散構成に全部対応可能
  • CPU、GPU、マルチスレットモードの切り替えるができる
  • 現在、MARSはNumpyのAPI Interfaceと100%互換性を実現し、将来にはPandasにも対応する予定です。
  • 今後は、MARSをMaxComputeに実装し、MaxComputeでも科学演算ができるようになるでしょう。

Tail

MARSには、実行するタスクに対して、グラフ化に変換し、グラフアルゴリズムを対応しする考え方です。具体的言うと、下記のようになります。

例えば、 b = (a+1).sum(axis =1)の関数を実行する場合、下記のような粗粒度グラフに変換します。

そして、粗粒度のグラフは自動的にデータを複数のChunkに分割し、同じ処理に対して、並列に実行できるような細粒度のグラフに変換します。

各Chunkのジョブは、Workerノードに適当に分散され、並列処理を行い、最終的に結果をまとめます。

こうする理由とメリットは、並列処理によるマルチタスクの実行時間を削減できる一方、データをChunkに割ることで、データが小さくなり、メモリが足りない場合でもハンドリングができるメリットがあります。

Tailに分けたデータはActorにより、ジョブのプロセスを制御しますが、ここでは、Actorに関する内容を割愛します。

デモ

MARSの実際の動きを見せるために、ここで2つの応用シーンを作りました。

1.モンテカルロ法で円周率を計算する

モンテカルロ法はシミュレーションや数値計算を乱数を用いて行う手法の総称であり、もっと簡単に言うと、ポイントを正方形の中にランダムにプロットして、円の中に入ったポイント数から面積をシミュレーションして、円周率を逆算する方法です。

ここでは、Jupyter Notebookで実行下結果になります。

  1. まずはPythonで普通に100MBをシミュレーションした結果が40秒

  1. データを10倍増やして1GBをNumpyで処理した場合は7秒ぐらい

3. Numexprを使ったバージョンは4,7秒、少しは早くなりましたね

4.マルチコアCPUに対して、パラレル処理をやってみました結果

5.ここでNvidia P100のGPUを使って、並列処理をやってみました。すごい、さすがに並列はGPUですね

6.最後はMARS、ここではシングルマシンで回してみました。3.8秒、GPUまではないですが、一般のNumpyよりは倍ぐらい早くなりましたね

 

ここで更にデータ量を100Gに増やしたら、NumpyとGPUの場合はメモリアウトしましたが、33秒、MARSは問題なく動きました。データは100倍に増えましたが、時間は10倍だけ、つまりデータが膨大であるほど、MARSの優位性が出てきますね。

2. PCAを使った顔認証

ここでは、MARSを使ってちょっとおもしろいものをつくりました。PCA(主成分分析)を使った顔認証ですね。SVDやPCAについてのアルゴリズム紹介は割愛します。ご興味ある方は、ググって見るといろいろ資料がありますので、ご参照ください。

ここでは、同じくJupyter Notebookで実現したSVDとPCA顔認証の結果です。

  1. Numpyで1GBのMatrixをSVDした場合は8分

  1. MARSはなんと405ms、SVD処理でもモンテカルロ法の実証結果と同じでした

ここで、PCAを使った顔認証のコードを皆さんに共有しました。MARSやPCAに興味あるかたは、ご参照ください。データ・セットはORLの400枚の顔写真で、40枚をテストした結果、97.5%の正解率でした。コードはJupyter Nontebookで実現しましたので、ウィジェットなどのインストールが多少発生しますので、ご注意ください。

import os
import cv2
import glob
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt 
import matplotlib.image as mpimg
from ipywidgets import interact, IntSlider
import mars.tensor as mt
from mars.session import new_session

def readdata(path):
    for root, dirs, files in os.walk(rootdir):
        dirs.sort()
        for dir in dirs:
            dirpath = os.path.join(root, dir)
            filelist = os.listdir(dirpath)
            filelist.sort()
            for file in filelist:
                filepath = os.path.join(root, dir, file)
                img = cv2.imread(filepath,  cv2.IMREAD_GRAYSCALE)
                img = img.reshape(1, img.size)
                label = int(dir[1:])

                if (file == filelist[0]):
                    try:
                        testdata = np.vstack((testdata, img))
                        test_label = np.vstack((test_label, label))
                        test_path = np.vstack((test_path, filepath))
                    except:
                        testdata = img
                        test_label = label
                        test_path = filepath
                else:
                    try:
                        traindata = np.vstack((traindata, img))
                        train_label = np.vstack((train_label, label))
                        train_path = np.vstack((train_path, filepath))
                    except:
                        traindata = img      
                        train_label = label
                        train_path = filepath
                                               
    return traindata, testdata, train_label, test_label, train_path, test_path

def pca_compress(train, test, k):
    data_mat = np.vstack((train, test))
    data_mean = np.mean(data_mat, axis=0, keepdims=True)
    data_new = data_mat - data_mean
    cov_data = data_new.T.dot(data_new) / (data_new.shape[1] - 1)

    U, s, V = np.linalg.svd(cov_data)
    V = V.T
    vecs = V[:, :k]

    data_transformed = data_new.dot(vecs) 
    return data_transformed, data_mean, vecs

def pca_compress_mars(train, test, k):
    data_mat = mt.vstack((train, test))
    data_mean = mt.mean(data_mat, axis=0, keepdims=True)
    data_new = data_mat - data_mean
    cov_data = data_new.T.dot(data_new) / (data_new.shape[1] - 1)
    
    U, s, V = mt.linalg.svd(cov_data)
    V = V.T
    vecs = V[:, :k]

    data_transformed = data_new.dot(vecs) 
    return session.run(data_transformed, data_mean, vecs)

def inference(train, test, train_label, test_label):
    for i, test_data in enumerate(test):
        for j,  train_data in enumerate(train):
            try:
                distance = np.vstack((distance, np.dot(train_data, test_data) / (np.linalg.norm(test_data) * np.linalg.norm(train_data))))
            except:
                distance = np.dot(train_data, test_data) / (np.linalg.norm(test_data) * np.linalg.norm(train_data))
        try:
            result = np.vstack((result, train_label[np.where(distance==np.max(distance))[0]]))
        except:
            result =train_label[np.where(distance==np.max(distance))[0]]
        del(distance)

    acc = np.divide(np.intersect1d(result, test_label).shape, test.shape)    
    return result, acc

#read data from ORL Face Dataset
print('step1: Loading Dataset')
rootdir = 'orl_faces/'
train, test, train_label, test_label, train_path, test_path = readdata(rootdir)
print('trainset:',train.shape[0], 'images')
print('testset:',test.shape[0], 'images')

for i, image in enumerate(test):
    plt.subplot(4, 10, i+1)
    img =cv2.imread(list(train_path[1])[0])
    image = image.reshape(112,92)
    plt.imshow(image, cmap='gray')
    plt.axis("off")

%%time 
#Train face recog model using PCA
print('step2: Reduction using PCA')
dimension = 1000
#numpy
print('numpy mode')
#%time data_t, face_mean, face_feutuers = pca_compress(train, test, dimension)
#mars
print('mars-single mode')
session = new_session()
data_t, face_mean, face_feutuers = pca_compress_mars(train, test, dimension)
#data post processing
train_t = data_t[:train.shape[0]]
test_t = data_t[train.shape[0]:]
print('Done')

#show face feutures
plt.subplot(2,10,1)
plt.imshow(face_mean.reshape(112,92), cmap='gray')
plt.axis("off")
for i, feutuer in enumerate(face_feutuers.T[:10,:]):
    plt.subplot(2, 10, 11+i)
    plt.imshow(feutuer.reshape(112,92), cmap='gray')
    plt.axis("off")

#Demonstration
print('step4: Demonstration')
results, acc = inference(train_t, test_t, train_label, test_label)
print('Total Accuracy is ', acc[0])

@interact(num = IntSlider(min = 0, max = 39, step = 1))
def show(num):
    plt.subplot(121)
    print(str(int(results[num])))
    img = cv2.imread('orl_faces/s' + str(int(results[num])) + '/1.pgm')
    plt.imshow(img, cmap='gray')
    plt.axis("off")
    
    plt.subplot(122)
    print(test_path[num][0])
    img = cv2.imread(test_path[num][0])
    plt.imshow(img, cmap='gray')
    plt.axis("off")

最後に

以上はMARSについてのしょうかいでした。まだまだ公開したばかりのプロジェクトですが、今後の進化が楽しみですね。もし、MaxComputeにも実装されたら、大規模の科学演算ができるようになりますので、MaxComputeの価値もさらなる飛躍ができるでしょう。では、また面白いネタがありましたら、皆さんに共有します。