ari23の研究ノート

メーカ勤務エンジニアの技術ブログです

dask.dataframe.read_csv()のblocksize

業務でDaskを使ってデータ分析をしているのですが、つまらないことで数日溶かしてしまったので、備忘録としてまとめます🐜

Daskとは

Daskとは、Pythonライブラリの1つであり、NumpyやPandasの並列処理や巨大なデータを扱うのが得意なライブラリです。

私は、NumpyやPandasよりも速度が非常に早いので、サイズの大きい.csvファイルの読み込みによく使っています。

詳細は他のブログなどを参考にしてください。

dask.dataframe.read_csv()

このDaskのread_csv()を使って.csvファイルを読み込んで、軽く分析するとどうしてもエラーで止まってしまう現象に遭遇しました。

ググってみると「列名がおかしい」とかありましたが、そんなことはなく、試しにタイムスタンプだけプロットしたらこんなグラフが出力されました。

timestampAbnormal
timestampAbnormal

「ん???」
タイムスタンプをただプロットしただけなので、右肩上がりの真っ直ぐな線が出ること期待していましたが、途中で折り返したグラフが出てきました。

そこで、このタイムスタンプをIPythonでprintとするとこんな結果が返ってきました。

In [27]: timestamp
Out[27]:
0    3.016625e+03
1    3.026625e+03
2    3.059125e+03
3    3.061625e+03
4    3.064125e+03
...
32213  2.880547e+07
32214  2.880549e+07
32215  2.880551e+07
32216   2.880553e+07
32217  2.880567e+07
Name: timestamp, Length: 1436500, dtype: float64

Lengthが1436500に対し、自動で振られた連番の最後が本来なら1436499になるはすなのに、32217であることがわかりました。

つまり、最初に読み込んだ時点のDaskDataFrameが複数に分割されていることを意味します。

ここで、DaskAPIのドキュメントを見直します。

Parameters:
blocksize:str, int or None, optional
Number of bytes by which to cut up larger files. Default value is computed based on available physical memory and the number of cores. If None, use a single block for each file. Can be a number like 64000000 or a string like “64MB”

Google翻訳
大きなファイルを切り取るバイト数。 デフォルト値は、使用可能な物理メモリとコアの数に基づいて計算されます。 なしの場合、各ファイルに単一のブロックを使用します。 64000000のような数値または「64MB」のような文字列を指定できます

○| ̄|_

ちゃんと書いてありました。トホホ

blocksizeに引数を渡していないので、PCスペックに基づいた値で分割されたようです。

下の方を見ると、分割せずに読み込むにはこのblocksizeにNoneを与えればよいとわかりました。

To get around this you can specify blocksize=None to not split files into multiple partitions, at the cost of reduced parallelism.

Google翻訳
これを回避するには、blocksize = Noneを指定して、ファイルを複数のパーティションに分割しないようにしますが、並列性は低下します

したがって、以下のように書き換えます。

import dask.dataframe as dd 
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

#df = dd.read_csv(fpath).compute()  # 複数に分割される可能性あり
df = dd.read_csv(fpath, blocksize=None).compute()  # blocksizeにNoneを指定して一括で読み込む

ser_timestamp = df['timestamp']
ser_timestamp.plot()
plt.show()

すると、下記のような想定通りのグラフを得ることが出来ました。

timestampNormal
timestampNormal

おわりに

Dask自体がPandasなど比べると有名でないせいか、関連する記事が見つからなかったため、今回ブログとしてまとめました。

というか、みんなちゃんとドキュメント読んでるんですよね。すみませんorz

参考になれば幸いです(^^)