Introduction
InfluxDB is a time-series database commonly used in IoT projects such as home automation apps, AI security cameras, health applications, and server monitoring systems. Although InfluxDB is powerful on its own, you can extend its functionality with Python modules. Python provides a client library for InfluxDB, and the language is popular for its user-friendly data structures and versatile code.
This guide explains how to use the InfluxDB library with Python on an Ubuntu server. The library is a flexible, self-contained driver that lets you communicate with an InfluxDB database from Python.
Prerequisites
Before you begin:
Using SSH, access the server.
Create a non-root sudo user and switch to the user account.
Install InfluxDB and create an administrator account.
Install the influxdb Library and Create a Sample Database
Update the server.
    $ sudo apt update
Install the pip Python package manager.
    $ sudo apt install -y python3-pip
Using pip, install the influxdb library.
    $ pip install influxdb
If the above command returns an error, use:
    $ sudo apt install -y python3-influxdb
Log in to the InfluxDB database console. Replace EXAMPLE_PASSWORD with the administrator password you created earlier.
    $ influx -username 'admin' -password 'EXAMPLE_PASSWORD'
Output:
    Connected to http://localhost:8086 version 1.6.7~rc0
    InfluxDB shell version: 1.6.7~rc0
Create a sample fitness_app_db database.
    > CREATE DATABASE fitness_app_db
Switch to the new fitness_app_db database.
    > USE fitness_app_db
Insert sample data into the fitness_app_db database using the line protocol syntax.
    > INSERT steps_counter,user=john_doe daily_steps=1879i
    > INSERT steps_counter,user=mary_doe daily_steps=2785i
    > INSERT steps_counter,user=jane_smith daily_steps=4563i
The above commands declare steps_counter as the name of the measurement. The user tag uniquely identifies each user in the database. The daily_steps field logs a user's total daily steps. The trailing i at the end of the daily_steps field value instructs InfluxDB to store the value as an integer.
Query the steps_counter measurement to verify the data.
    > SELECT "user", "daily_steps" FROM "steps_counter"
Output:
    name: steps_counter
    time                user       daily_steps
    ----                ----       -----------
    1687426847395972699 john_doe   1879
    1687426847414045751 mary_doe   2785
    1687426847498567130 jane_smith 4563
Exit the InfluxDB database console.
    > quit
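To make the line protocol syntax used in the INSERT statements more concrete, here is a minimal Python sketch that builds one record from a measurement name, a set of tags, and a set of fields. The to_line_protocol helper is hypothetical and not part of the influxdb library. It assumes simple names and values, and it does not escape spaces, commas, or quotes.

```python
def to_line_protocol(measurement, tags, fields):
    """Build one record shaped like: measurement,tag=value field=value.

    Hypothetical helper for illustration only; no escaping is performed.
    """
    tag_part = ",".join(f"{key}={value}" for key, value in sorted(tags.items()))
    field_parts = []
    for key, value in fields.items():
        if isinstance(value, bool):
            # bool is a subclass of int, so it must be checked first.
            field_parts.append(f"{key}={'true' if value else 'false'}")
        elif isinstance(value, int):
            # The trailing "i" marks the value as an integer.
            field_parts.append(f"{key}={value}i")
        elif isinstance(value, float):
            field_parts.append(f"{key}={value}")
        else:
            # Anything else is written as a quoted string field.
            field_parts.append(f'{key}="{value}"')
    head = f"{measurement},{tag_part}" if tag_part else measurement
    return f"{head} {','.join(field_parts)}"


line = to_line_protocol("steps_counter", {"user": "john_doe"}, {"daily_steps": 1879})
print(line)  # steps_counter,user=john_doe daily_steps=1879i
```

The printed record matches the text that follows the first INSERT keyword in the console session above.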
Create an InfluxDB Database Gateway Class
In this section, create a separate project directory to store your Python source code. Then, within the directory, create a custom InfluxDB database gateway module to communicate with the database you created earlier as described below.
Create a new project directory for your application.
    $ mkdir project
Switch to the new project directory.
    $ cd project
Using a text editor such as Nano, open the influx_db_gateway.py file.
    $ nano influx_db_gateway.py
Add the following code to the file.
    from influxdb import InfluxDBClient

    class InfluxDbGateway:

        def db_conn(self):
            # Connect to the sample database on the local InfluxDB server.
            client = InfluxDBClient(
                'localhost',
                8086,
                'admin',
                'EXAMPLE_PASSWORD',
                'fitness_app_db'
            )
            return client

        def query_data(self):
            # Retrieve all points from the steps_counter measurement.
            inf_db_client = self.db_conn()
            result = inf_db_client.query('SELECT "user", "daily_steps" FROM "steps_counter"')
            return list(result)

        def insert_data(self, data_points):
            # Write a single point built from the submitted user and step count.
            user = data_points['user']
            daily_steps = data_points['daily_steps']
            inf_db_client = self.db_conn()
            inf_db_client.write_points([
                {
                    "measurement": "steps_counter",
                    "tags": {"user": user},
                    "fields": {"daily_steps": daily_steps}
                }
            ])
            return ['success']
Replace EXAMPLE_PASSWORD with the actual InfluxDB password you created earlier.
Below is what the code does:
The from influxdb import InfluxDBClient declaration imports the InfluxDB library into the file.
The class InfluxDbGateway: statement defines a new class with the following three methods.
def db_conn(self): connects to the sample fitness_app_db database.
def query_data(self): queries the steps_counter measurement with the InfluxQL statement SELECT "user", "daily_steps" FROM "steps_counter" and returns the data as a list (return list(result)).
def insert_data(self, data_points): accepts InfluxDB data points in JSON format and runs the inf_db_client.write_points() function to write the points to the database. After writing the points, the method returns a success message (return ['success']).
Save and close the file.
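The insert_data method passes the submitted values straight to write_points(). As a hedged illustration of the point structure it builds, the sketch below isolates that step in a hypothetical build_point helper, which is not part of the original module, and adds two assumptions: both keys are required, and daily_steps is coerced to an integer so the field type stays consistent with the integer values inserted earlier.

```python
def build_point(data_points):
    """Turn {"user": ..., "daily_steps": ...} into one write_points() entry.

    Hypothetical helper; the key check and int() coercion are assumptions
    added for illustration, not behavior of the original insert_data method.
    """
    missing = [key for key in ("user", "daily_steps") if key not in data_points]
    if missing:
        raise ValueError(f"missing keys: {', '.join(missing)}")
    return {
        "measurement": "steps_counter",
        "tags": {"user": str(data_points["user"])},
        "fields": {"daily_steps": int(data_points["daily_steps"])},
    }


print(build_point({"user": "james_gates", "daily_steps": 2948}))
```

A list containing the returned dictionary has the same shape as the argument that insert_data passes to write_points().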
Create the Application Startup File
In this section, you combine the sample database and the gateway module in a startup file for the application, as described below.
Create a new index.py file.
    $ nano index.py
Add the following code to the file.
    import http.server
    from http import HTTPStatus
    import socketserver
    import json
    import influx_db_gateway

    class HttpHandler(http.server.SimpleHTTPRequestHandler):

        def do_GET(self):
            # Return all points from the database as JSON.
            self.send_response(HTTPStatus.OK)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            db_gateway = influx_db_gateway.InfluxDbGateway()
            result = db_gateway.query_data()
            self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))

        def do_POST(self):
            # Read the JSON request body and write it to the database.
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            data_points = json.loads(post_data)
            self.send_response(HTTPStatus.OK)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            db_gateway = influx_db_gateway.InfluxDbGateway()
            result = db_gateway.insert_data(data_points)
            self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))

    httpd = socketserver.TCPServer(('', 8082), HttpHandler)
    print("Web server is now running on port 8082...")

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        httpd.server_close()
        print("The HTTP server has stopped running.")
Below is what the code does:
import...: imports the Python HTTP modules (http.server, HTTPStatus, and socketserver), the json module, and the custom influx_db_gateway module you created earlier.
HttpHandler(http.server.SimpleHTTPRequestHandler): handles the following HTTP methods.
GET: retrieves points from the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.query_data() statements.
POST: inserts new data points in the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.insert_data(data_points) statements.
The following code returns the status code and headers to HTTP clients.
    ...
    self.send_response(HTTPStatus.OK)
    self.send_header('Content-type', 'application/json')
    self.end_headers()
The code block below starts a web server that accepts incoming HTTP requests on port 8082.
    ...
    httpd = socketserver.TCPServer(('', 8082), HttpHandler)
    print("Web server is now running on port 8082...")

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        httpd.server_close()
        print("The HTTP server has stopped running.")
Save and close the file.
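The POST handler above sends a 200 status before it inspects the request body, so a malformed body raises an exception instead of producing an error response. One possible way to handle this, shown here only as a sketch, is to validate the body first and choose the status code from the result. The parse_points function and its error messages are hypothetical and not part of the original application.

```python
import json
from http import HTTPStatus


def parse_points(raw_body):
    """Return (status, payload) for a raw POST body.

    Hypothetical helper: payload is the decoded dictionary on success,
    or an error dictionary when the body is unusable.
    """
    try:
        data = json.loads(raw_body)
    except (ValueError, UnicodeDecodeError):
        return HTTPStatus.BAD_REQUEST, {"error": "body is not valid JSON"}
    if not isinstance(data, dict) or not {"user", "daily_steps"} <= set(data):
        return HTTPStatus.BAD_REQUEST, {"error": "expected user and daily_steps"}
    return HTTPStatus.OK, data


status, payload = parse_points(b'{"user": "eva_dukes", "daily_steps": 2759}')
print(status.value, payload)
```

Under this approach, do_POST would call the helper before self.send_response() and only pass the payload to insert_data() when the status is HTTPStatus.OK.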
Test the Application
Run the application in the background.
    $ python3 index.py &
Output:
    Web server is now running on port 8082...
Using curl, retrieve all points from the InfluxDB database.
    $ curl -X GET http://localhost:8082/
Output:
    [
      [
        {
          "time": "2023-06-23T09:19:20.963877Z",
          "user": "john_doe",
          "daily_steps": 1879
        },
        {
          "time": "2023-06-23T09:19:20.978350Z",
          "user": "mary_doe",
          "daily_steps": 2785
        },
        {
          "time": "2023-06-23T09:19:21.020994Z",
          "user": "jane_smith",
          "daily_steps": 4563
        }
      ]
    ]
Run the following commands to write new data points to the database.
    $ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "james_gates", "daily_steps": 2948}'
    $ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "eva_dukes", "daily_steps": 2759}'
Output:
    ...
    [
      "success"
    ]
Query the application again to verify that the new data was added.
    $ curl -X GET http://localhost:8082/
Output:
    [
      [
        {
          "time": "2023-06-23T09:19:20.963877Z",
          "user": "john_doe",
          "daily_steps": 1879
        },
        {
          "time": "2023-06-23T09:19:20.978350Z",
          "user": "mary_doe",
          "daily_steps": 2785
        },
        {
          "time": "2023-06-23T09:19:21.020994Z",
          "user": "jane_smith",
          "daily_steps": 4563
        },
        {
          "time": "2023-06-23T09:24:13.425451Z",
          "user": "james_gates",
          "daily_steps": 2948
        },
        {
          "time": "2023-06-23T09:24:24.832866Z",
          "user": "eva_dukes",
          "daily_steps": 2759
        }
      ]
    ]
To stop the application, view its job ID.
    $ jobs
Output:
    [1]+  Running                 python3 index.py &
Kill the job ID to stop the application.
    $ kill %1
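The GET responses above are a list that contains another list of points. Judging from the output, each inner list appears to hold the points of one series, although that reading is an assumption based on the sample responses. A client that only needs the points could flatten the structure, as in this minimal sketch. The flatten_series name and the shortened sample data are illustrative.

```python
def flatten_series(response):
    """Flatten a list of per-series point lists into one list of points."""
    return [point for series in response for point in series]


# Shortened sample shaped like the GET output above (time values omitted).
sample = [
    [
        {"user": "john_doe", "daily_steps": 1879},
        {"user": "mary_doe", "daily_steps": 2785},
    ]
]

points = flatten_series(sample)
total = sum(point["daily_steps"] for point in points)
print(len(points), total)  # 2 4664
```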
Conclusion
In this guide, you have used the InfluxDB database with Python on an Ubuntu 20.04 server. You created a sample InfluxDB database, then queried and submitted data through a Python application that you tested with the curl utility.
For more information about InfluxDB, visit the following resources.