Knowledgebase

How to Use InfluxDB with Python Print

  • 0

Introduction

InfluxDB is a time-series database that powers most IoT projects like home automation apps, AI security cameras, health applications, server monitoring systems, and more. Although InfluxDB is a powerful database, you can enhance its functionalities using Python modules. Python offers a great library and its popularity is rapidly growing due to its user-friendly data structures and code versatility.

This guide explains how to use the InfluxDB library with Python on a Ubuntu server. The library is a flexible self-contained driver that allows you to establish communication to an InfluxDB database using Python.

Prerequisites

Before you begin:

Install the influxdb Library and Create a Sample Database

  1. Update the server.

    $ sudo apt update
    
  2. Install the pip Python package manager.

    $ sudo apt install -y python3-pip
    
  3. Using pip install the influxdb library.

    $ pip install influxdb
    

    If the above command returns an error, use:

    $ apt install -y python3-influxdb
    
  4. Log in to the InfluxDB database console. Replace EXAMPLE_PASSWORD with a strong password.

    $ influx -username 'admin' -password 'EXAMPLE_PASSWORD'
    

    Output:

    Connected to http://localhost:8086 version 1.6.7~rc0
    
    InfluxDB shell version: 1.6.7~rc0
    
  5. Create a sample fitness_app_db database.

    > CREATE DATABASE fitness_app_db
    
  6. Switch to the new fitness_app_db database.

    > USE fitness_app_db
    
  7. Insert sample data into the fitness_app_db database using the line protocol syntax.

    > INSERT steps_counter,user=john_doe daily_steps=1879i
    
       INSERT steps_counter,user=mary_doe daily_steps=2785i
    
       INSERT steps_counter,user=jane_smith daily_steps=4563i
    

    The above commands declare steps_counter as the name of the measurement. The user tag uniquely identifies each user in the database. Then the daily_steps field logs a user's total daily steps in the database. The trailing i at the end of the daily_steps field value instructs InfluxDB to insert values as integers.

  8. Query the steps_counter measurements to verify the data.

    > SELECT "user", "daily_steps" FROM "steps_counter"
    

    Output:

      name: steps_counter
    
      time                user       daily_steps
    
      ----                ----       -----------
    
      1687426847395972699 john_doe   1879
    
      1687426847414045751 mary_doe   2785
    
      1687426847498567130 jane_smith 4563
    
  9. Exit the InfluxDB database console.

    > quit
    

Create an InfluxDB Database Gateway Class

In this section, create a separate project directory to store your Python source code. Then, within the directory, create a custom InfluxDB database gateway module to communicate with the database you created earlier as described below.

  1. Create a new project directory for your application.

    $ mkdir project
    
  2. Switch to the new project directory.

    $ cd project
    
  3. Using a text editor such as Nano, open the influx_db_gateway.py file.

    $ nano influx_db_gateway.py
    
  4. Add the following configurations to the file.

    from influxdb import InfluxDBClient
    
    
    
    class InfluxDbGateway:
    
        def db_conn(self):
    
            client = InfluxDBClient(
    
                'localhost',
    
                8086,
    
                'admin',
    
                'EXAMPLE_PASSWORD',
    
                'fitness_app_db'
    
            )
    
            return client
    
    
    
        def query_data(self):
    
            inf_db_client = self.db_conn()
    
            result = inf_db_client.query('SELECT "user", "daily_steps" FROM "steps_counter"')
    
            return list(result)
    
    
    
        def insert_data(self, data_points):
    
            user = data_points['user']
    
            daily_steps = data_points['daily_steps']
    
            inf_db_client = self.db_conn()
    
            inf_db_client.write_points([
    
                {
    
                    "measurement": "steps_counter",
    
                    "tags": {"user": user},
    
                    "fields": {"daily_steps": daily_steps}
    
                }
    
            ])
    
            return ['success']
    

    Replace EXAMPLE_PASSWORD with the actual InfluxDB password you created earlier.

    Below is what the configuration file does:

    • from influxdb import InfluxDBClient declaration imports the InfluxDB library to the file.

    • class InfluxDbGateway: initializes a new class with three methods as below:

      • def db_conn(self): connects to the sample fitness_app_db database.

      • def query_data(self): queries the steps_counter measurement to retrieve points using the line protocol syntax SELECT "user", "daily_steps" FROM "steps_counter" and returns the data as a list (return list(result)).

      • def insert_data(self, data_points): accepts InfluxDB data points in JSON format and runs the inf_db_client.write_points() function to write the points to the database. After writing the points, the method returns a success message return ['success'].

    Save and close the file.

Create the Application Startup File

In this section, you will combine the sample databae, and gateway module to create a startup file for the application as described below.

  1. Create a new index.py file.

    $ nano index.py
    
  2. Add the following configurations to the file.

    import http.server
    
    from http import HTTPStatus
    
    import socketserver
    
    import json
    
    import influx_db_gateway
    
    
    
    class HttpHandler(http.server.SimpleHTTPRequestHandler):
    
        def do_GET(self):
    
            self.send_response(HTTPStatus.OK)
    
            self.send_header('Content-type', 'application/json')
    
            self.end_headers()
    
    
    
            db_gateway = influx_db_gateway.InfluxDbGateway()
    
            result = db_gateway.query_data()
    
    
    
            self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))
    
    
    
        def do_POST(self):
    
            content_length = int(self.headers['Content-Length'])
    
            post_data = self.rfile.read(content_length)
    
    
    
            data_points = json.loads(post_data)
    
    
    
            self.send_response(HTTPStatus.OK)
    
            self.send_header('Content-type', 'application/json')
    
            self.end_headers()
    
    
    
            db_gateway = influx_db_gateway.InfluxDbGateway()
    
            result = db_gateway.insert_data(data_points)
    
    
    
            self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))
    
    
    
    httpd = socketserver.TCPServer(('', 8082), HttpHandler)
    
    print("Web server is now running on port 8082...")
    
    
    
    try:
    
        httpd.serve_forever()
    
    except KeyboardInterrupt:
    
        httpd.server_close()
    
        print("The HTTP server has stopped running.")
    

    Below is what the configuration file does:

    • import...: imports the Python HTTP modules http.server, HTTPStatus, socketserver, JSON module json, and the custom influx_db_gateway module you created earlier.

    • HttpHandler(http.server.SimpleHTTPRequestHandler): handles the following HTTP methods:

      • GET: retrieves points from the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.query_data() statements.

      • POST: inserts new data points in the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.insert_data(data_points) statements.

    • The following code returns the correct status codes and headers to HTTP clients.

      ...
      
      self.send_response(HTTPStatus.OK)
      
      self.send_header('Content-type', 'application/json')
      
      self.end_headers()
      
    • The code block below starts a web server that accepts incoming HTTP requests on port 8082.

      ...
      
      httpd = socketserver.TCPServer(('', 8082), HttpHandler)
      
      print("Web server is now running on port 8082...")
      
      
      
      try:
      
          httpd.serve_forever()
      
      except KeyboardInterrupt: 
      
          httpd.server_close()
      
          print("The HTTP server has stopped running.")
      

    Save and close the file.

Test the Application

  1. Run the application in the background.

    $ python3 index.py
    

    Output:

    Web server is now running on port 8082...     
    
  2. Using curl, retrieve all points from the InfluxDB database.

    $ curl -X GET http://localhost:8082/
    

    Output:

    [
    
       [
    
         {
    
           "time": "2023-06-23T09:19:20.963877Z",
    
           "user": "john_doe",
    
           "daily_steps": 1879
    
         },
    
         {
    
           "time": "2023-06-23T09:19:20.978350Z",
    
           "user": "mary_doe",
    
           "daily_steps": 2785
    
         },
    
         {
    
           "time": "2023-06-23T09:19:21.020994Z",
    
           "user": "jane_smith",
    
           "daily_steps": 4563
    
         }
    
       ]
    
    ]
    
  3. Run the following commands to write new data points to the database.

    $ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "james_gates", "daily_steps": 2948}'
    
    
    
    $ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "eva_dukes", "daily_steps": 2759}'
    

    Output:

    ...
    
    [
    
      "success"
    
    ]
    
  4. Fetch the application again to verify if the new data is added.

    $ curl -X GET http://localhost:8082/
    

    Output:

    [
    
       [
    
         {
    
           "time": "2023-06-23T09:19:20.963877Z",
    
           "user": "john_doe",
    
           "daily_steps": 1879
    
         },
    
         {
    
           "time": "2023-06-23T09:19:20.978350Z",
    
           "user": "mary_doe",
    
           "daily_steps": 2785
    
         },
    
         {
    
           "time": "2023-06-23T09:19:21.020994Z",
    
           "user": "jane_smith",
    
           "daily_steps": 4563
    
         },
    
         {
    
           "time": "2023-06-23T09:24:13.425451Z",
    
           "user": "james_gates",
    
           "daily_steps": 2948
    
         },
    
         {
    
           "time": "2023-06-23T09:24:24.832866Z",
    
           "user": "eva_dukes",
    
           "daily_steps": 2759
    
         }
    
       ]
    
    ]
    
  5. To stop the application. View its Job ID.

    $ jobs
    

    Output:

    [1]+  Running                 python3 index.py &
    
  6. Kill the job ID to stop the application.

    $ kill %1
    

Conclusion

In this guide, you have used the InfluxDB database with Python on a Ubuntu 20.04 server. You created a sample InfluxDB database, queried, and submitted data via Python source code using the curl utility.

For more information about InfluxDB, visit the following resources.


Was this answer helpful?
Back

Powered by WHMCompleteSolution