Helve’s Python memo

Pythonを使った機械学習や最適化の備忘録

PythonとPandasでInfluxDBを操作する

PythonとPandasを使って、時系列データベースInfluxDBを操作する方法についてまとめた。

目次

はじめに

InfluxDBは時系列データの扱いに特化したデータベースである。 概要は以下の記事を参照。

時系列データベースInfluxDB入門 - Helve’s Python memo

PythonでInfluxDBを操作するには、influx-pythonというライブラリを使う。 このライブラリは、InfluxDBを開発したinfluxdata社が公開しているものである。 influx-pythonを使うと、PandasのDataFrameでInfluxDBとデータを授受できる。

環境は以下の通り。

また、Pythonのバージョンは以下の通り。

バージョン
Python 3.7.6
NumPy 1.18.1
Pandas 1.0.1
Influxdb-python 5.2.2

ライブラリのインストール

PythonからInfluxDBを操作するライブラリinflux-pythonをインストールする。

condaの場合:

conda install -c pdrops influxdb

pipの場合:

pip install influxdb

以降では、ライブラリを以下のようにインポートすることを前提とする。

import numpy as np
import pandas as pd
import influxdb

influx-pythonの基本

influx-pythonでInfluxDBを操作するために、次の2つのクラスが用意されている。

  • InfluxDBClient: JSONを使う
  • DataFrameClient: PandasのDataFrameを使う

本記事では、DataFrameClientを扱う。

DataFrameClientクラス

DataFrameClientクラスの主な引数は以下の通り。

influxdb.DataFrameClient(host=u'localhost', port=8086, username=u'root',
                         password=u'root', database=None)

hostはアクセス先のInfluxDBがあるPCのIPアドレスである。 同じPCの場合はデフォルト値のlocalhostでよい。

portはInfluxDBのポート番号である。 8086はInfluxDBのデフォルトのポート番号である。

username, passwordはInfluxDBにアクセスするためのユーザ名、パスワードである。 'root'はInfluxDBのデフォルト値と同じである。

databaseは接続先のデータベース名である。 後ほどメソッド実行時に指定できるので、DataFrameClientオブジェクト作成時に指定する必要はない。

DataFrameClientクラスの主なメソッドは次の通り。

メソッド名 説明
create_database(<データベース名>) データベースを作成する
get_list_database() データベース一覧を取得する
get_list_measurements() measurement一覧を取得する
drop_database(<データベース名>) データベースを削除する
drop_measurement(<measurement名>) measurementを削除する
write_points() DataFrameを書き込む
query() データを取得する

write_points()query()については、以降で詳しく述べる。

InfluxDBへの接続

DataFrameClientクラスを使って、InfluxDBへ接続する。 まず、DataFrameClientオブジェクトを作成し、 pd_testという名前のデータベースを作成する。

client = influxdb.DataFrameClient()
client.create_database("pd_test")

データベースを既に作成している場合は、以下のようにしても良い。

client = influxdb.DataFrameClient(database="pd_test")

DataFrameをInfluxDBに書き込む

PandasのDataFrameをInfluxDBに書き込むには、 write_pointsメソッドを用いる。

DataFrameClient.write_points(dataframe, measurement,database=None)

引数の説明は次の通り。

  • dataframe: PandasのDataFrame
  • measurement: データを追加するmeasurement(表の名前)
  • database: データベース名。

ここで、DataFrameClientオブジェクトにデータベースが設定されていない場合、databaseは必須である。 databaseを指定しないと、write_pointsメソッド実行時に以下のエラーが出る。

nfluxDBClientError: 400: {"error":"database is required"}

一方、DataFrameClientオブジェクトにデータベースが設定されており、 かつwrite_pointsメソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。

例として、PandasのDataFrameを、先ほど作成したpd_testデータベースに追加する。 DataFrameは、10行×2列の大きさで、 2020年4月1日から1日周期でインデックスを振っている。 ここで、measurement名はmeas1とした。

array = np.arange(20).reshape(-1, 2)
index = pd.date_range(pd.Timestamp("20200401"), freq="1D", periods=10)
df = pd.DataFrame(array, index=index, columns=["A", "B"])  

client.write_points(df, "meas1", database="pd_test")

DataFrameをInfluxDBから取得する

InfluxDBからデータを取得するには、queryメソッドを用いる。

DataFrameClient.query(query, database=None)

引数の説明は以下の通り。

  • query: InfluxDBのクエリ文。
  • database: データベース名。

クエリ文の構文の詳細については、別記事で説明予定。

ここで、DataFrameClientオブジェクトにデータベースが設定されていない場合、databaseは必須である。 一方、DataFrameClientオブジェクトにデータベースが設定されており、 かつwrite_pointsメソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。

また、queryメソッドの戻り値は辞書型(正確にはdefaultdict)であり、 キーがmeasurement, 値がデータフレームになる。

例として、先程write_pointsメソッドで書き込んだデータを取得する。

q1 = "SELECT * FROM meas1"
res = client.query(q1, database="pd_test")
print(res)

ここで、クエリ文

SELECT * FROM meas1

は、measurement meas1の全てのfieldを取得することを表す。

実行結果 queryメソッドの戻り値resは辞書型であり、 キーがmeasurement meas1, 値がDataFrameとなる。 また、DataFrameのindexDateTimeIndex, columnsはInfluxDBのfieldとなる。

defaultdict(<class 'list'>, {'meas1':                             A   B
2020-04-01 00:00:00+00:00   0   1
2020-04-02 00:00:00+00:00   2   3
2020-04-03 00:00:00+00:00   4   5
2020-04-04 00:00:00+00:00   6   7
2020-04-05 00:00:00+00:00   8   9
2020-04-06 00:00:00+00:00  10  11
2020-04-07 00:00:00+00:00  12  13
2020-04-08 00:00:00+00:00  14  15
2020-04-09 00:00:00+00:00  16  17
2020-04-10 00:00:00+00:00  18  19})

なお、DataFrameを直接取得する場合には、

df = list(res.values())[0]

などのようにする。

参考

Welcome to InfluxDB’s documentation! — InfluxDB 5.3.0 documentation

時系列データベースInfluxDB入門 - Helve’s Python memo