You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
13 KiB
Python

#!/usr/bin/env python3
"""
glucose_monitor.py - A script to display glucose levels on a Raspberry Pi
Configurable with a config file
"""
import argparse
import configparser
import logging
import os
import signal
import inspect
import sys
import time
from datetime import datetime
from libreview import LibreViewSession
from screens import DebugScreen
from screens import Screen
import requests
import yaml
# CONSTANTS
# Default for general.data_refresh_interval: roughly how many seconds main()
# spends redrawing the screen between two fetches of glucose data.
DEFAULT_DATA_REFRESH_INTERVAL = 5
# Default for general.screen_refresh_interval: seconds main() sleeps between
# two screen redraws.
DEFAULT_SCREEN_REFRESH_INTERVAL = 1
# Default font size handed to the display's setFont() together with a custom font.
DEFAULT_FONT_SIZE = 9
# Defaults for the "debug" screen type; main() passes them to
# DebugScreen(width, height, number_of_colour_bits).
# NOTE(review): width/height are presumably pixels, and the colour-bits value is
# a string rather than an int - confirm against screens.DebugScreen.
DEFAULT_DEBUG_SCREEN_WIDTH = 128
DEFAULT_DEBUG_SCREEN_HEIGHT = 32
DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS = "1"
# Values accepted (after lower-casing) for screen.type in the config file.
SUPPORTED_SCREENS = [
"debug",
"terminal",
"waveshare_2.23_ssd1305"
]
# GLOBAL VARIABLES
# Set to True by signal_handler on the first SIGINT/SIGTERM; main() checks it
# to decide when to leave its refresh loop.
SHOULD_EXIT = False
def file_exists(file_path) -> bool:
    """
    file_exists:
        Reports whether a path points at an existing regular file
    Arguments:
        file_path:
            The path to test
    Returns:
        boolean:
            True when the path exists and is a regular file, False otherwise
            (including when the path is a directory)
    """
    is_regular_file = os.path.isfile(file_path)
    return is_regular_file
def directory_exists(directory_path):
    """
    directory_exists:
        Reports whether a path points at an existing directory
    Arguments:
        directory_path:
            The path to test
    Returns:
        boolean:
            True when the path exists and is a directory, False otherwise
            (including when the path is a regular file)
    """
    is_directory = os.path.isdir(directory_path)
    return is_directory
def ini_to_yaml(ini_file_path: str, yaml_file_path: str, lower_case: bool = False) -> None:
    """
    ini_to_yaml:
        Takes an ini file and produces a yaml file from it.
        Every ini section becomes a top level yaml mapping whose entries are the
        section's keys and (string) values. If the ini file cannot be read the
        yaml file is still written, containing an empty mapping.
    Arguments:
        ini_file_path:
            The path to the ini file that you want to convert to a yaml file
        yaml_file_path:
            The path to the yaml file that you wish to create (overwritten if it exists)
        lower_case = False
            Change all the sections and keys to lower case, if false leaves the case alone
    Returns:
        None
    """
    config = configparser.ConfigParser()
    if not lower_case:
        # ConfigParser lower-cases option names by default; switch that off so
        # the keys really are left alone when lower_case is False.
        config.optionxform = str
    config.read(ini_file_path)

    def normalise(name: str) -> str:
        # Sections are never lower-cased by ConfigParser, so do it here when asked.
        return name.lower() if lower_case else name

    converted = {
        normalise(section): {
            normalise(key): value for key, value in config[section].items()
        }
        for section in config.sections()
    }
    with open(yaml_file_path, "w") as file:
        yaml.dump(converted, file, default_flow_style=False)
def get_current_time(fmt: str = "%H:%M %d/%m/%Y") -> str:
    """
    get_current_time:
        Formats the current local date and/or time as a string
    Arguments:
        fmt:
            A strftime format string describing how the date and/or time should look
    Returns:
        The current date and/or time rendered with fmt
    """
    return datetime.now().strftime(fmt)
def time_since(dt: datetime) -> str:
    """
    time_since:
        Returns a string with the time elapsed between the datetime passed and now
    Arguments:
        dt:
            The datetime to measure from. May be naive (compared against the
            local clock) or timezone aware (compared in its own timezone)
    Returns:
        A string with minutes if the difference is at least a minute, otherwise just seconds
        Will be in the format of "2m 30s" or "40s"
        A datetime that lies in the future (e.g. because of clock skew) gives "0s"
    """
    # Using dt's own tzinfo keeps the subtraction valid for aware datetimes and
    # is identical to datetime.now() when dt is naive (tzinfo is None).
    now = datetime.now(dt.tzinfo)
    # Clamp at zero: a negative difference would otherwise wrap around in the
    # modulo and report e.g. "55s" for a timestamp 5 seconds in the future.
    elapsed_seconds = max(0, int((now - dt).total_seconds()))
    minutes, seconds = divmod(elapsed_seconds, 60)
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"
def signal_handler(signum, frame):
    """
    signal_handler:
        Handles a termination signal. The first signal only sets SHOULD_EXIT so
        the program can shut down by itself; a second signal exits immediately.
    Arguments:
        signum:
            The number of the signal that was received
        frame:
            The stack frame that was executing when the signal arrived
    Returns:
        None
    """
    global SHOULD_EXIT
    if SHOULD_EXIT:
        # A shutdown was already requested, so stop waiting for it to finish
        logging.info("already received exit signal exiting NOW")
        sys.exit(1)
    SHOULD_EXIT = True
    signal_name = signal.Signals(signum).name
    logging.info(f"received {signal_name} signal shutting down")
    logging.debug("stack trace:")
    for caller in inspect.getouterframes(frame):
        logging.debug(
            f"file: {caller.filename}, line_number: {caller.lineno}, function: {caller.function}"
        )
# Maps the log level names accepted in the config file to logging levels.
_LOG_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}
# Shown in place of %U and %S until the first measurement has been fetched.
_NO_READING_PLACEHOLDER = "--"


def _locate_config_file(args) -> str:
    """Return the path of the yaml config file to load, migrating an old ini file when --migrate is set."""
    if args.config is not None:
        if not file_exists(args.config):
            logging.fatal(f"config file location {args.config} does not exist")
            sys.exit(1)
        return args.config

    # Candidate locations without extension, highest priority first.
    # expanduser also works when the HOME environment variable is not set.
    config_bases = [
        os.path.expanduser("~") + "/.config/glucose-monitor/config",
        "/etc/glucose-monitor/config",
        "config",
    ]
    for config_base in config_bases:
        candidate = config_base + ".yaml"
        if file_exists(candidate):
            logging.info("config file found at " + candidate)
            return candidate

    for config_base in config_bases:
        old_config_file_location = config_base + ".ini"
        if not file_exists(old_config_file_location):
            continue
        if not args.migrate:
            logging.fatal("Sorry, ini files are no longer support, run glucose-monitor --migrate to move to yaml")
            sys.exit(1)
        migrated_location = config_base + ".yaml"
        ini_to_yaml(old_config_file_location, migrated_location, lower_case=True)
        # Stop at the first (highest priority) ini file instead of migrating them all.
        return migrated_location

    logging.fatal("could not find a config file")
    sys.exit(1)


def _apply_log_level(config) -> None:
    """Set the root logger level from the optional log.level config entry."""
    log_config = config.get("log") or {}
    if "level" not in log_config:
        return
    level = str(log_config["level"]).lower()
    if level not in _LOG_LEVELS:
        logging.error("log level specified in config file is not valid")
        return
    logging.getLogger().setLevel(_LOG_LEVELS[level])
    if level == "debug":
        logging.debug("set log level to debug")


def _build_display(config):
    """Create the display object selected by the optional screen section of the config."""
    display = Screen()
    if "screen" not in config:
        return display
    screen_config = config["screen"] or {}
    if "type" not in screen_config:
        logging.fatal("no screen type in config file")
        sys.exit(1)
    screen_type = str(screen_config["type"]).lower()
    if screen_type not in SUPPORTED_SCREENS:
        logging.fatal("sorry screen type not supported")
        sys.exit(1)
    if screen_type == "waveshare_2.23_ssd1305":
        # Imported lazily so the driver is only needed when this screen is selected.
        from screens.waveshare_233_ssd1305 import Waveshare_233_SSD1305
        display = Waveshare_233_SSD1305()
    elif screen_type == "debug":
        width = int(screen_config.get("width", DEFAULT_DEBUG_SCREEN_WIDTH))
        height = int(screen_config.get("height", DEFAULT_DEBUG_SCREEN_HEIGHT))
        number_of_colour_bits = screen_config.get(
            "number_of_colour_bits", DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS
        )
        display = DebugScreen(width, height, number_of_colour_bits)
    # NOTE(review): "terminal" has no dedicated branch, so the plain Screen()
    # created above is used for it - confirm that this is intended.
    return display


def _render_line(template: str, glucose_value, measured_at) -> str:
    """Fill in the %T (current time), %U (glucose value) and %S (reading age) specifiers of one display line."""
    units = _NO_READING_PLACEHOLDER if glucose_value is None else str(glucose_value)
    age = _NO_READING_PLACEHOLDER if measured_at is None else time_since(measured_at)
    return (
        template
        .replace("%T", get_current_time())
        .replace("%U", units)
        .replace("%S", age)
    )


def main():
    """
    main:
        Entry point of glucose-monitor. Parses the command line, loads the yaml
        config file, creates the configured display, authenticates with LibreView
        and then keeps fetching the latest glucose measurement and drawing the
        configured lines until a SIGINT/SIGTERM sets SHOULD_EXIT.
        Calls sys.exit(1) on any fatal configuration, authentication or
        connection-lookup problem.
    Arguments:
        None (the command line is read through argparse)
    Returns:
        None
    """
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    parser = argparse.ArgumentParser(
        prog="glucose-monitor",
        description="A program to display glucose levels and if you blood sugar is high"
    )
    parser.add_argument("-m", "--migrate", action="store_true")
    parser.add_argument("-t", "--token")
    parser.add_argument("-c", "--config")
    args = parser.parse_args()

    config_file_location = _locate_config_file(args)
    with open(config_file_location, "r") as config_file:
        # An empty yaml file parses to None; treat it as an empty config so the
        # checks below report what is missing instead of raising a TypeError.
        config = yaml.safe_load(config_file) or {}
    if not isinstance(config, dict):
        logging.fatal("config file must contain a yaml mapping at the top level")
        sys.exit(1)

    _apply_log_level(config)

    credentials = None
    if args.token is None:
        # TODO: support credentials in environmental variables
        credentials = config.get("credentials")
        if not credentials:
            logging.fatal("no credentials specified in config file")
            sys.exit(1)
        if "email" not in credentials:
            logging.fatal("no email specified in config file")
            sys.exit(1)
        if "password" not in credentials:
            logging.fatal("no password specified in config file")
            sys.exit(1)

    data_refresh_interval = DEFAULT_DATA_REFRESH_INTERVAL
    screen_refresh_interval = DEFAULT_SCREEN_REFRESH_INTERVAL
    font = "DEFAULT"
    font_size = DEFAULT_FONT_SIZE
    lines = [
        "%T",
        "GLUCOSE UNITS: %U"
    ]
    general_config = config.get("general") or {}
    if "data_refresh_interval" in general_config:
        data_refresh_interval = int(general_config["data_refresh_interval"])
    if "screen_refresh_interval" in general_config:
        screen_refresh_interval = int(general_config["screen_refresh_interval"])
    # TODO: Add format specifier to check if high and low (upper_bound / lower_bound)
    if "font" in general_config:
        font = general_config["font"]
        logging.debug("found font in config, checking that it exists")
        if not file_exists(font):
            logging.fatal(f"font does not exist at path {font}")
            sys.exit(1)
        logging.debug(f"found font at {font}")
    if "font_size" in general_config:
        font_size = general_config["font_size"]
    if general_config.get("lines") is not None:
        lines = general_config["lines"]
    if data_refresh_interval < 1 or screen_refresh_interval < 1:
        # Zero would divide by zero below and a negative value cannot be slept on.
        logging.fatal("refresh intervals must be positive whole numbers of seconds")
        sys.exit(1)
    # Every configured line is kept, including lines without any format
    # specifier; the specifiers are filled in each time the screen is drawn.
    line_templates = [str(line) for line in lines]

    display = _build_display(config)

    libreview_session = LibreViewSession()
    if args.token is not None:
        libreview_session.useToken(args.token)
    else:
        libreview_session.authenticate(credentials["email"], credentials["password"])
    if libreview_session.authenticated and not SHOULD_EXIT:
        logging.info("successfully authenticated with LibreView")
    else:
        logging.fatal("failed to authenticate with LibreView")
        sys.exit(1)

    try:
        libreview_session.getConnections()
    except Exception as e:
        logging.debug("got exception, %s", e)
        logging.fatal("failed to get connections")
        sys.exit(1)
    if len(libreview_session.connections) > 1:
        logging.fatal("currently glucose-monitor only supports accounts with one connection")
        sys.exit(1)
    if len(libreview_session.connections) < 1:
        logging.fatal("to use glucose-monitor your libreview account must have at least one connection")
        sys.exit(1)
    patient_id = libreview_session.connections[0]["patientId"]

    # NOTE(review): the original indentation was lost; the else is taken to
    # belong to the inner if (display supports custom fonts but none is
    # configured) - confirm against the Screen implementations.
    if display.supports_custom_fonts:
        if font != "DEFAULT":
            display.setFont(font, int(font_size))
        else:
            display.setDefaultFont()

    # Last successfully fetched reading; stays None until the first fetch works
    # so that a failing first request shows a placeholder instead of crashing.
    glucose_value = None
    measured_at = None
    # Always redraw (and therefore sleep) at least once per fetch, otherwise a
    # screen interval larger than the data interval would hammer the API.
    redraws_per_fetch = max(1, data_refresh_interval // screen_refresh_interval)
    while not SHOULD_EXIT:
        try:
            graph = libreview_session.getGraph(patient_id)
            glucose_measurement = graph["connection"]["glucoseMeasurement"]
            new_measured_at = datetime.strptime(
                glucose_measurement["Timestamp"],
                "%m/%d/%Y %I:%M:%S %p"
            )
            new_glucose_value = glucose_measurement["Value"]
        except requests.exceptions.ConnectionError:
            logging.error("failed to get details about patient, connection error")
        except requests.exceptions.HTTPError:
            logging.error("failed to get details about patient, API returned invalid response")
        except requests.exceptions.RequestException:
            logging.error("failed to get details about patient an unknown error occurred")
        except (KeyError, TypeError, ValueError):
            logging.error("the data returned from the API was not in an expected format")
        else:
            # Only replace the shown reading once the whole response was usable.
            glucose_value = new_glucose_value
            measured_at = new_measured_at
        for _ in range(redraws_per_fetch):
            if SHOULD_EXIT:
                break
            display.clearScreen()
            # Plain substitution instead of eval(): text from the config file is
            # never executed and quotes in a line cannot break the rendering.
            display.displayLines([
                _render_line(template, glucose_value, measured_at)
                for template in line_templates
            ])
            time.sleep(screen_refresh_interval)
    logging.debug("exiting now...")
# Only start the monitor when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()