python で InfluxDB v1 に対してデータの読み書きを行う際に、ayncio を使って非同期化したいなと思ったのですが、今使っているライブラリの InfluxDBClient は asyncio に対応していないようでした。

代わりに influxdata 社の influx-client を使うと asyncio を使えそうだったのですが、こちらは基本的に InfluxDB v2 以降にのみ対応しているようで、Flux という言語にしか対応していません。
InfluxDB v1 以前は InfluxQL という SQL に近い言語が使われていました。
こちら、以下のページが参考になりました。
ただ、InfluxDB 1.7以降でも設定を変更することで Flux が使えるようになるそうです。
従って、InfluxDB v1 でも Flux を有効にしたうえで、Flux でクエリを記述してデータを取得するようにすれば、asyncio を使って非同期化することができそうです。
ちなみに、データの書き込みに関しては Flux は関係ありませんでした。
以下、InfluxDB v1 の前提で書いていますが、v2 でも基本的には同じです。bucket
token
の指定方法が異なるだけです。
環境
- InfluxDB 1.8.10
Flux 言語を有効化する
InfluxDB を Docker で動かしていたのですが、環境変数に以下を追加するだけで OK でした。
INFLUXDB_HTTP_FLUX_ENABLED=true
サンプルコード
データの書き込み (write_api)
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write.point import Point
from influxdb_client.domain.write_precision import WritePrecision
if __name__ == '__main__':
async def main():
url = 'http://<url>:8086'
username = 'username'
password = 'password'
dbname = 'dbname'
rpname = 'rpname'
# https://zeppelin.apache.org/docs/0.10.1/interpreter/influxdb.html
bucket = f'{dbname}/{rpname}' # InfluxDB 1.8 では dbname/rpname がバケット名として使われる
token = f'{username}:{password}' # InfluxDB 1.8 では username:password がトークンとして使われる
org = "-" # InfluxDB 1.8 ではよくわからん
data = []
data.append({
'fields': {
'hoge': 1
},
'measurement': 'measurement',
'tags': {
'hostname': 'hostname',
}
})
points = [ Point.from_dict(i, WritePrecision.MS) for i in data ]
async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
write_api = client.write_api()
successfully = await write_api.write(bucket=bucket, record=points)
print(successfully)
データの読み込み (query_api)
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
if __name__ == '__main__':
async def main():
url = 'http://<url>:8086'
username = 'username'
password = 'password'
dbname = 'dbname'
rpname = 'rpname'
# https://zeppelin.apache.org/docs/0.10.1/interpreter/influxdb.html
bucket = f'{dbname}/{rpname}' # InfluxDB 1.8 では dbname/rpname がバケット名として使われる
token = f'{username}:{password}' # InfluxDB 1.8 では username:password がトークンとして使われる
org = "-" # InfluxDB 1.8 ではよくわからん
query = f"""
from(bucket: "{bucket}")
|> range(start: -10m)
|> filter(fn:(r) => r._measurement == "measurement")
|> filter(fn: (r) => r["_field"] == "field")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
"""
async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
tables = await client.query_api().query(query=query)
for table in tables:
print(table)
for record in table.records:
print (record.values)
asyncio.run(main())
Flux については以下のページが参考になりました。