E-Mapreduceでデータの前処理を実践

こんにちは、Kouです。

データセットの前処理と言えば、オープンソースのPandasでDataframeとSeriesを操作するのが一般的ですが、データのサイズが大きいほど、大量のデータを読み込もうとすると、メモリ不足でエラーになる可能性、もしくは長時間待たされる可能性も高くなります。

実際のデータ分析業務の中で、かならずしもビッグデータの規模とは言えないですが、本記事では、ビッグデータの前処理に焦点をあてながら、PySpark(Apache SparkのPythonインタフェース)で2次元テーブル、そしてテーブルの列からデータの取り扱い方について、展開させて頂きたいと思います。

事前準備

まず、E-Mapreduce(Hadoop)のクラスタを構築します。構築の詳細手順はAlibaba Cloud公式ドキュメントに記載されておりますので、ご参照頂ければと思います。

次は、検証に使うデータセットをダウンロードします。こちらはAnalytics Vidhya社の一般公開データセットをダウンロードしました。とあるリテール会社の商品販売データです。最後ダウンロードしたデータセット(csvファイル)を一旦Alibaba Cloud OSSにアップロードしておきます。

f:id:sbc_kou:20190913161520p:plain

データフレームの作成

E-Mapreduceのマスターノードにsshログインし、下記のコマンドでPySpark Shellを起動します。

pyspark --master yarn

f:id:sbc_kou:20190913162357p:plain

ossutilosscmdなどのECSからOSSへのアクセスツール要らず、下記コマンドのように、sparkよりossバケットにtrain.csvファイルを参照することによって、直接データフレームを作成することができるのでかなり楽です。

train = spark.read.option("inferSchema","true").option("header","true").csv("oss://xxxxxxxxxxxxxxxx/train.csv")

データフレームの操作

データ型の確認

コラムのデータ型を確認するために、printSchema()を利用して、木構造のスキーマをプリントすることができます。

>>>train.printSchema()
root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
先頭要素の確認

headを使って、先頭のN行を表示させることができ、Pandasのheadオペレーションと似ています。

>>> train.head(5)
[Row(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370), Row(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200), Row(User_ID=1000001, Product_ID=u'P00087842', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422), Row(User_ID=1000001, Product_ID=u'P00085442', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057), Row(User_ID=1000002, Product_ID=u'P00285442', Gender=u'M', Age=u'55+', Occupation=16, City_Category=u'C', Stay_In_Current_City_Years=u'4+', Marital_Status=0, Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]

さらに、2次元テーブルの形式で返したい場合は、showオペレーションを使うことも可能です。

>>> train.show(2,truncate=True)
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
データ集計

特定の列に対して、要約統計量(平均値、標準偏差、最大値、最小値、カウント)を取得したい場合、descrie()を利用します。

>>> train.describe('Purchase').show()
+-------+-----------------+
|summary|         Purchase|
+-------+-----------------+
|  count|           550068|
|   mean|9263.968712959126|
| stddev|5023.065393820575|
|    min|               12|
|    max|            23961|
+-------+-----------------+
コラムのサブセット

列をサブセットにするには、選択操作を使用する必要があり、またコンマで区切られた列名を渡す必要があります。例えば、「User_ID」と「年齢」の先頭の5行を選択してみましょう。

>>> train.select('User_ID','Age').show(5)
+-------+----+
|User_ID| Age|
+-------+----+
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000002| 55+|
+-------+----+
only showing top 5 rows
フィルタ操作

DataFrame のPurchase列にフィルタ操作を適用し、15000を超える値を持つ行を選出ことができます。例えば、条件を通す必要があるデータフレームのPurchase列にフィルタをかけて、15000を超える購入数をプリントしてみましょう。

>>> train.filter(train.Purchase > 15000).show(5)
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000003| P00193542|     M|26-35|        15|            A|                         3|             0|                 1|                 2|              null|   15227|
|1000004| P00184942|     M|46-50|         7|            B|                         2|             1|                 1|                 8|                17|   19215|
|1000004| P00346142|     M|46-50|         7|            B|                         2|             1|                 1|                15|              null|   15854|
|1000004|  P0097242|     M|46-50|         7|            B|                         2|             1|                 1|                16|              null|   15686|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
マップ操作

マップ操作を使用して、DataFrame の各行に関数を適用できます。この関数を適用した後、RDDの形式で結果を取得します。TrainのUser_ID列にマップ操作を適用し、関数を適用した後にマップされたRDD(x,1)の最初の5 つの要素をプリントしてみましょう。

>>> train.select('User_ID').rdd.map(lambda x:(x,1)).take(5)
[(Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000002), 1)]
SQL操作

RDDとは異なり、SparkはDataFrameでSQLクエリを実行することができます。DataFrameにSQLクエリを適用するには、DataFrameをテーブルとして登録する必要があります。まず、データフレームをテーブルとして登録してみましょう。

train.registerTempTable('train_table')

上記のコードでは、registerTempTable操作の助けを借りて、テーブル('train_table')として'train'を登録しました。'train_table' にSQLクエリを適用してProduct_ID を選択してみましょう。結果を得るには、show()アクションを適用する必要があります。

>>> spark.sql('select Product_ID from train_table').show(5)
+----------+
|Product_ID|
+----------+
| P00069042|
| P00248942|
| P00087842|
| P00085442|
| P00285442|
+----------+
only showing top 5 rows

上記のコードでは、SQLクエリを指定するためにspark.sql を使いしました。次はもっと複雑なクエリで、train_tableで各年齢グループの最大購入を取得してみましょう。

>>> spark.sql('select Age, max(Purchase) from train_table group by Age').show()
+-----+-------------+
|  Age|max(Purchase)|
+-----+-------------+
|18-25|        23958|
|26-35|        23961|
| 0-17|        23955|
|46-50|        23960|
|51-55|        23960|
|36-45|        23960|
|  55+|        23960|
+-----+-------------+

最後

この記事では、E-Mapreduceを利用して、Apache SparkのDataFrameの最も一般的な操作のいくつかについて紹介しました、もし皆さん普段の分析業務にお役に立てば幸いです。実はDataFrameで定義されている操作は数多くあり、1つの記事ですべてをカバーするのは難しいですが、DataFrame操作の詳細については、PythonのPyspark.sqlモジュールドキュメントをご参照いただければと思います。