プログラムを中心とした個人的なメモ用のブログです。 タイトルは迷走中。
内容の保証はできませんのであしからずご了承ください。

2022/12/21

[Python] InfluxDB 1.8 で influxdb_client を使う

event_note2022/12/21 7:17

python で InfluxDB v1 に対してデータの読み書きを行う際に、ayncio を使って非同期化したいなと思ったのですが、今使っているライブラリの InfluxDBClient は asyncio に対応していないようでした。

代わりに influxdata 社の influx-client を使うと asyncio を使えそうだったのですが、こちらは基本的に InfluxDB v2 以降にのみ対応しているようで、Flux という言語にしか対応していません。
InfluxDB v1 以前は InfluxQL という SQL に近い言語が使われていました。

こちら、以下のページが参考になりました。

ただ、InfluxDB 1.7以降でも設定を変更することで Flux が使えるようになるそうです。

従って、InfluxDB v1 でも Flux を有効にしたうえで、Flux でクエリを記述してデータを取得するようにすれば、asyncio を使って非同期化することができそうです。
ちなみに、データの書き込みに関しては Flux は関係ありませんでした。

以下、InfluxDB v1 の前提で書いていますが、v2 でも基本的には同じです。
bucket token の指定方法が異なるだけです。

環境

  • InfluxDB 1.8.10

Flux 言語を有効化する

InfluxDB を Docker で動かしていたのですが、環境変数に以下を追加するだけで OK でした。

INFLUXDB_HTTP_FLUX_ENABLED=true

サンプルコード

データの書き込み (write_api)

import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write.point import Point
from influxdb_client.domain.write_precision import WritePrecision

if __name__ == '__main__':
    
    async def main():
        url = 'http://<url>:8086'
        username = 'username'
        password = 'password'
        dbname = 'dbname'
        rpname = 'rpname'
        # https://zeppelin.apache.org/docs/0.10.1/interpreter/influxdb.html
        bucket = f'{dbname}/{rpname}' # InfluxDB 1.8 では dbname/rpname がバケット名として使われる
        token = f'{username}:{password}' # InfluxDB 1.8 では username:password がトークンとして使われる
        org = "-" # InfluxDB 1.8 ではよくわからん
  
        data = []
        data.append({
            'fields': {
                'hoge': 1
            },
            'measurement': 'measurement',
            'tags': {
                'hostname': 'hostname',
            }
        })
        points = [ Point.from_dict(i, WritePrecision.MS) for i in data ]
        
        async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
            write_api = client.write_api()
            successfully = await write_api.write(bucket=bucket, record=points)
            print(successfully)

データの読み込み (query_api)

import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

if __name__ == '__main__':
    
    async def main():
        url = 'http://<url>:8086'
        username = 'username'
        password = 'password'
        dbname = 'dbname'
        rpname = 'rpname'
        # https://zeppelin.apache.org/docs/0.10.1/interpreter/influxdb.html
        bucket = f'{dbname}/{rpname}' # InfluxDB 1.8 では dbname/rpname がバケット名として使われる
        token = f'{username}:{password}' # InfluxDB 1.8 では username:password がトークンとして使われる
        org = "-" # InfluxDB 1.8 ではよくわからん

        query = f"""
        from(bucket: "{bucket}")
        |> range(start: -10m)
        |> filter(fn:(r) => r._measurement == "measurement")
        |> filter(fn: (r) => r["_field"] == "field")
        |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
        """

        async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
            tables = await client.query_api().query(query=query)
            for table in tables:
                print(table)
                for record in table.records:
                    print (record.values)

    asyncio.run(main())

Flux については以下のページが参考になりました。