python – What’s the best practices I ain’t using in this code?

I have a fundamental but interesting question.

I have written this python code which checks the payload sent from a specific IoT topic.

First, I would love to get honest feedback and learn how to improve my code to be most efficient, readable, and straightforward.

Second I would like to know when you are using classes in python and when just functions and methods.
Before You jump, let me clarify. I know when to use classes which is obvious, and you are using many “users” who share common attributes, But !!!!!! As it seems, and maybe I am wrong here, the best practice (in most of the production code I am familiar with). It is always to force the code for use in classes, even in cases where you can easily write the code without classes.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

import glob
import logging
import platform
import sys
import time

import requests
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

# This module purpose is to test valid publish to AWS IoT topics.
# It uses the Message Broker for AWS IoT to send and receive messages
# through an MQTT connection. On Run, the device connects to the server,
# subscribes to a topic.
# The device should receive messages from the message broker, since it is subscribed to a specific topic.

# Define constants
PAYLOAD_ACCEPTANCE_PERCENTAGE_TOLERANCE = 0.8
TRANSMISSION_RATE_PER_SEC = 10
NO_OF_SECONDS_TO_TEST_SUB_ON_TOPIC = 3
THINGS_URL_PROD = 'https://some_production_url'
THINGS_URL_TEST = 'https://some_production_url'


# main function
def main():
    # Instantiate an object of type SubscribeOnTopics
    device_1 = SubscribeOnTopics()

    # Fetching the device host name.
    host_name = device_1.get_hostname_from_device()

    # Fetching the device drone id from dronethings table.
    drone_id = str(device_1.get_drone_id(host_name))

    # Subscribing to a specific topic.
    device_services_topic = f'device/services/{drone_id}'
    device_1.call_iot(device_services_topic)


# Defining logger
def make_logger():
    log = logging.getLogger(__name__)
    log.setLevel(logging.INFO)
    formatter = logging.Formatter('%(levelname)s - %(message)s')
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    log.addHandler(handler)
    return log


class SubscribeOnTopics:

    def __init__(self):
        self.endpoint = "some.eu-central-1.amazonaws.com"
        self.cert_filepath = r'/home/pi/drone/aws_cert/' 
                             + glob.glob1("/home/pi/drone/aws_cert/", "*.crt")(0),
        self.pri_key_filepath = r'/home/pi/drone/aws_cert/' 
                                + glob.glob1("/home/pi/drone/aws_cert/", "*.key")(0),
        self.ca_filepath = r'/home/pi/drone/aws_cert/rootCA.pem'
        self.count = 0
        self.count_payload = 1

    def get_hostname_from_device(self):
        return platform.node()

    # .env file contain 2 lines:
    # env=test/prod
    # API_KEY=some_api_key
    def convert_env_file_to_dict(self):
        with open('/home/pi/Desktop/device/.env') as env_file:
            env_to_dict = {}
            lines = env_file.readlines()
            for line in lines:
                strip_line = line.replace('n', '')
                splitted_line = strip_line.split('=')
                env_to_dict.update({splitted_line(0): splitted_line(1)})
            return env_to_dict

    def get_device_env_value(self):
        return self.convert_env_file_to_dict()('env')

    def get_dronething_compatiable_url(self):
        if 'test' == self.get_device_env_value():
            return THINGS_URL_TEST
        elif 'prod' == self.get_device_env_value():
            return THINGS_URL_PROD
        else:
            raise Exception('The device env key did not found. Please check .env file and make sure he is valid')

    def get_drone_id(self, hostname):
        """
        This method extracts the desired device drone id from dronethings table.
        @param hostname: str - the desired device hostname.
        @return: int - device drone id.
        """
        dronethings_table_response = requests.get(url=self.get_dronething_compatiable_url(),
                                                headers={'x-api-key': self.convert_env_file_to_dict()('API_KEY')},
                                                params={'thingName': hostname})
        if dronethings_table_response.status_code != 200:
            logger.error(f'n  status code : {dronethings_table_response.status_code}n'
                         f'API call failed, JSON response:n {dronethings_table_response.text} n')
            return
        device_drone_id = dronethings_table_response.json().get('droneId')
        if not dronethings_table_response.json():
            logger.error(f'n{hostname} does not exists in dronethings table !!!!n'
                         f'verify that you specified a valid device numbern'
                         f' if so check dronethings table and check if it is listed there')
            return
        elif not device_drone_id:
            logger.error(f'n{hostname} droneId field in dronethings table is NULL ......n'
                         f'make sure to fill droneId id field, for the desired device, in dronethings table ')
            return
        return device_drone_id

    # Callback when the subscribed topic receives a message
    def on_message_received(self, topic, payload):
        logger.info(
            "Received message from topic '{}': {}n COUNT PAYLOADS {}".format(topic, payload, self.count_payload))
        self.count_payload += 1

    def call_iot(self, topic):
        """
        This method connects to AWS IoT core and subscribes to a given topic. Then validates the transmission quality
        according to the number of received data payloads.
        @param topic: str - AWS IoT topic the method subscribe.
        @return: bool - True if the received payloads are in the tolerance range, else exit with code 1..
        """
        try:
            # Spin up resources
            event_loop_group = io.EventLoopGroup(1)
            host_resolver = io.DefaultHostResolver(event_loop_group)
            client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

            mqtt_connection = mqtt_connection_builder.mtls_from_path(
                endpoint=self.endpoint,
                cert_filepath=self.cert_filepath(0),
                pri_key_filepath=self.pri_key_filepath(0),
                client_bootstrap=client_bootstrap,
                ca_filepath=self.ca_filepath,
                client_id='client_id',
                clean_session=False,
                keep_alive_secs=6)

            logger.info("Connecting to {}".format(
                self.endpoint))

            connect_future = mqtt_connection.connect()

            # Future.result() waits until a result is available
            connect_future.result()
            logger.info("Connected!")
        except:
            raise Exception('Failed to connect')

        # Setting timer for subscribe duration.
        try:
            start_time = time.time()
            seconds = NO_OF_SECONDS_TO_TEST_SUB_ON_TOPIC

            while True:
                current_time = time.time()
                elapsed_time = current_time - start_time

                if elapsed_time > seconds:
                    logger.info("Finished iterating in: " + str(int(elapsed_time)) + " seconds")
                    break

                # Subscribe to topic.
                logger.info(f"Subscribing to topic {topic}...")
                subscribe_future, packet_id = mqtt_connection.subscribe(
                    topic=topic,
                    qos=mqtt.QoS.AT_LEAST_ONCE,
                    callback=self.on_message_received)

                subscribe_result = subscribe_future.result()
                logger.info(f"Subscribed with {str(subscribe_result('qos'))} packet no: {packet_id}")
            if self.count_payload >= (seconds * TRANSMISSION_RATE_PER_SEC) * PAYLOAD_ACCEPTANCE_PERCENTAGE_TOLERANCE:
                logger.info('transmission is valid')
                return True
            else:
                logger.info('transmission rate is NOT valid')
                exit(1)

        except:
            raise Exception('failed to Subscribe')

        finally:
            # Disconnect
            logger.info("Disconnecting...")
            disconnect_future = mqtt_connection.disconnect()
            disconnect_future.result()
            logger.info("Disconnected!")


if __name__ == '__main__':
    logger = make_logger()
    main()