You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
411 lines
13 KiB
Python
411 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""
|
|
glucose_monitor.py - A script to display glucose levels on a Raspberry Pi
|
|
|
|
Configurable with a config file
|
|
"""
|
|
|
|
|
|
import argparse
|
|
import configparser
|
|
import logging
|
|
import os
|
|
import signal
|
|
import inspect
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
|
|
from libreview import LibreViewSession
|
|
from screens import DebugScreen
|
|
from screens import Screen
|
|
|
|
|
|
import requests
|
|
import yaml
|
|
|
|
|
|
# CONSTANTS
|
|
DEFAULT_DATA_REFRESH_INTERVAL = 5
|
|
DEFAULT_SCREEN_REFRESH_INTERVAL = 1
|
|
DEFAULT_FONT_SIZE = 9
|
|
DEFAULT_DEBUG_SCREEN_WIDTH = 128
|
|
DEFAULT_DEBUG_SCREEN_HEIGHT = 32
|
|
DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS = "1"
|
|
SUPPORTED_SCREENS = [
|
|
"debug",
|
|
"terminal",
|
|
"waveshare_2.23_ssd1305"
|
|
]
|
|
|
|
|
|
# GLOBAL VARIABLES
|
|
SHOULD_EXIT = False
|
|
|
|
|
|
def file_exists(file_path) -> bool:
|
|
"""
|
|
file_exists:
|
|
Checks if a file exists
|
|
|
|
Arguments:
|
|
file_path:
|
|
The file that we are looking for
|
|
|
|
Returns:
|
|
boolean:
|
|
Returns true if the file exists, and false if it doesn"t
|
|
"""
|
|
return os.path.isfile(file_path)
|
|
|
|
|
|
def directory_exists(directory_path):
|
|
"""
|
|
directory_exists:
|
|
Checks if a directory exists
|
|
|
|
Arguments:
|
|
directory_path:
|
|
The directory that we are looking for
|
|
|
|
Returns:
|
|
boolean:
|
|
Returns true if the file exists and is a directory, and false if it"s not
|
|
"""
|
|
return os.path.isdir(directory_path)
|
|
|
|
|
|
def ini_to_yaml(ini_file_path: str, yaml_file_path: str, lower_case: bool = False):
|
|
"""
|
|
ini_to_yaml:
|
|
Takes an ini file and produces a yaml file from it.
|
|
|
|
Arguments:
|
|
ini_file_path:
|
|
The path to the ini file that you want to convert to a yaml file
|
|
|
|
yaml_file_path:
|
|
The path to the yaml file that you wish to create
|
|
|
|
lower_case = False
|
|
Change all the sections and keys to lower case, if false leaves the case alone
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
config = configparser.ConfigParser()
|
|
config.read(ini_file_path)
|
|
|
|
d = {}
|
|
for section in config.sections():
|
|
if lower_case:
|
|
d[section.lower()] = {}
|
|
else:
|
|
d[section] = {}
|
|
for key, value in config[section].items():
|
|
if lower_case:
|
|
d[section.lower()][key.lower()] = value
|
|
else:
|
|
d[section][key] = value
|
|
|
|
with open(yaml_file_path, "w+") as file:
|
|
yaml.dump(d, file, default_flow_style=False)
|
|
|
|
|
|
def get_current_time(fmt: str = "%H:%M %d/%m/%Y") -> str:
|
|
"""
|
|
get_current_time:
|
|
Returns a string containing the date and/or time in the format specified in fmt
|
|
|
|
Arguments:
|
|
fmt:
|
|
The format of the date and/or time to be returned in the string
|
|
|
|
Returns:
|
|
Returns a string with the date and/or time in the format specified
|
|
"""
|
|
now = datetime.now()
|
|
|
|
return now.strftime(fmt)
|
|
|
|
|
|
def time_since(dt: datetime) -> str:
|
|
"""
|
|
time_since:
|
|
Returns a string with the difference between the datetime passed and today
|
|
|
|
Arguments:
|
|
dt:
|
|
A datetime class to get the time since
|
|
|
|
Returns:
|
|
A string in the with minutes if difference can be calculated in minutes otherwise will just return seconds
|
|
Will be in the format of "2m 30s" or "40s"
|
|
"""
|
|
now = datetime.now()
|
|
|
|
difference = now - dt
|
|
|
|
total_seconds = difference.total_seconds()
|
|
|
|
minutes = int(total_seconds // 60)
|
|
seconds = int(total_seconds % 60)
|
|
|
|
if minutes > 0:
|
|
return f"{minutes}m {seconds}s"
|
|
return f"{seconds}s"
|
|
|
|
|
|
def signal_handler(signum, frame):
|
|
global SHOULD_EXIT
|
|
if SHOULD_EXIT:
|
|
logging.info("already received exit signal exiting NOW")
|
|
sys.exit(1)
|
|
SHOULD_EXIT = True
|
|
logging.info(f"received {signal.Signals(signum).name} signal shutting down")
|
|
logging.debug("stack trace:")
|
|
for frame, filename, lineno, function, _, _ in inspect.getouterframes(frame):
|
|
logging.debug(f"file: {filename}, line_number: {lineno}, function: {function}")
|
|
|
|
|
|
def main():
|
|
global SHOULD_EXIT
|
|
|
|
logging.basicConfig(
|
|
stream=sys.stdout,
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(levelname)s - %(message)s"
|
|
)
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
parser = argparse.ArgumentParser(
|
|
prog="glucose-monitor",
|
|
description="A program to display glucose levels and if you blood sugar is high"
|
|
)
|
|
|
|
parser.add_argument("-m", "--migrate", action="store_true")
|
|
parser.add_argument("-t", "--token")
|
|
parser.add_argument("-c", "--config")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.config is not None:
|
|
if not file_exists(args.config):
|
|
logging.fatal("config file location {args.config} does not exist")
|
|
sys.exit(1)
|
|
else:
|
|
config_file_location = args.config
|
|
else:
|
|
possible_config_file_locations = [
|
|
str(os.getenv("HOME")) + "/.config/glucose-monitor/config.yaml",
|
|
"/etc/glucose-monitor/config.yaml",
|
|
"config.yaml"
|
|
]
|
|
config_file_location = ""
|
|
found_config_file = False
|
|
for possible_config_file_location in possible_config_file_locations:
|
|
if file_exists(possible_config_file_location):
|
|
config_file_location = possible_config_file_location
|
|
found_config_file = True
|
|
logging.info("config file found at " + config_file_location)
|
|
break
|
|
|
|
if not found_config_file:
|
|
old_config_file_locations = [
|
|
str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini",
|
|
"/etc/glucose-monitor/config.ini",
|
|
"config.ini"
|
|
]
|
|
for old_config_file_location in old_config_file_locations:
|
|
if file_exists(old_config_file_location):
|
|
if not args.migrate:
|
|
logging.fatal("Sorry, ini files are no longer support, run glucose-monitor --migrate to move to yaml")
|
|
sys.exit(1)
|
|
else:
|
|
ini_to_yaml(old_config_file_location, old_config_file_location[:-4]+".yaml", lower_case=True)
|
|
config_file_location = old_config_file_location[:-4]+".yaml"
|
|
if config_file_location == "":
|
|
logging.fatal("could not find a config file")
|
|
sys.exit(1)
|
|
|
|
with open(config_file_location, "r") as config_file:
|
|
config = yaml.safe_load(config_file)
|
|
|
|
if "log" in config:
|
|
if "level" in config["log"]:
|
|
level = config["log"]["level"].lower()
|
|
if level == "debug":
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
elif level == "info":
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
elif level == "warning":
|
|
logging.getLogger().setLevel(logging.WARNING)
|
|
elif level == "error":
|
|
logging.getLogger().setLevel(logging.ERROR)
|
|
elif level == "critical":
|
|
logging.getLogger().setLevel(logging.CRITICAL)
|
|
else:
|
|
logging.error("log level specified in config file is not valid")
|
|
|
|
logging.debug("set log level to debug")
|
|
|
|
if args.token is None:
|
|
if "credentials" not in config:
|
|
# TODO: support credentials in environmental variables
|
|
logging.fatal("no credentials specified in config file")
|
|
sys.exit(1)
|
|
if "email" not in config["credentials"]:
|
|
# TODO: support credentials in environmental variables
|
|
logging.fatal("no email specified in config file")
|
|
sys.exit(1)
|
|
if "password" not in config["credentials"]:
|
|
# TODO: support credentials in environmental variables
|
|
logging.fatal("no password specified in config file")
|
|
sys.exit(1)
|
|
|
|
data_refresh_interval = DEFAULT_DATA_REFRESH_INTERVAL
|
|
screen_refresh_interval = DEFAULT_SCREEN_REFRESH_INTERVAL
|
|
font = "DEFAULT"
|
|
font_size = DEFAULT_FONT_SIZE
|
|
lines = [
|
|
"%T",
|
|
"GLUCOSE UNITS: %U"
|
|
]
|
|
if "general" in config:
|
|
general_config = config["general"]
|
|
if "data_refresh_interval" in general_config:
|
|
data_refresh_interval = int(config["general"]["data_refresh_interval"])
|
|
if "screen_refresh_interval" in general_config:
|
|
screen_refresh_interval = int(config["general"]["screen_refresh_interval"])
|
|
|
|
# TODO: Add format specifier to check if high and low
|
|
# if "upper_bound" in general_config:
|
|
# upper_bound = config["general"]["upper_bound"]
|
|
# if "lower_bound" in general_config:
|
|
# lower_bound = config["general"]["lower_bound"]
|
|
|
|
if "font" in general_config:
|
|
font = general_config["font"]
|
|
logging.debug("found font in config, checking that it exists")
|
|
if not file_exists(font):
|
|
logging.fatal(f"font does not exist at path {font}")
|
|
sys.exit(1)
|
|
else:
|
|
logging.debug(f"found font at {font}")
|
|
if "font_size" in general_config:
|
|
font_size = general_config["font_size"]
|
|
|
|
if "lines" in general_config:
|
|
lines = general_config["lines"]
|
|
|
|
output_lines = []
|
|
for line in lines:
|
|
temp = ""
|
|
if "%T" in line:
|
|
temp += line.replace("%T", '{get_current_time()}')
|
|
if "%U" in line:
|
|
temp += line.replace("%U", '{glucose_measurement["Value"]}')
|
|
if "%S" in line:
|
|
temp += line.replace("%S", '{time_since(glucose_measurement_timestamp_datetime)}')
|
|
output_lines.append(temp)
|
|
|
|
display = Screen()
|
|
if "screen" in config:
|
|
screen_config = config["screen"]
|
|
if "type" not in screen_config:
|
|
logging.fatal("no screen type in config file")
|
|
sys.exit(1)
|
|
|
|
screen_type = screen_config["type"].lower()
|
|
if screen_type not in SUPPORTED_SCREENS:
|
|
logging.fatal("sorry screen type not supported")
|
|
sys.exit(1)
|
|
|
|
if screen_type == "waveshare_2.23_ssd1305":
|
|
from screens.waveshare_233_ssd1305 import Waveshare_233_SSD1305
|
|
display = Waveshare_233_SSD1305()
|
|
elif screen_type == "debug":
|
|
width = DEFAULT_DEBUG_SCREEN_WIDTH
|
|
height = DEFAULT_DEBUG_SCREEN_HEIGHT
|
|
number_of_colour_bits = DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS
|
|
if "width" in screen_config:
|
|
width = int(screen_config["width"])
|
|
if "height" in screen_config:
|
|
height = int(screen_config["height"])
|
|
if "number_of_colour_bits" in screen_config:
|
|
number_of_colour_bits = screen_config["number_of_colour_bits"]
|
|
display = DebugScreen(width, height, number_of_colour_bits)
|
|
|
|
|
|
libreview_session = LibreViewSession()
|
|
if args.token is not None:
|
|
libreview_session.useToken(args.token)
|
|
else:
|
|
libreview_session.authenticate(config["credentials"]["email"], config["credentials"]["password"])
|
|
|
|
if libreview_session.authenticated and not SHOULD_EXIT:
|
|
logging.info("successfully authenticated with LibreView")
|
|
else:
|
|
logging.fatal("failed to authenticate with LibreView")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
libreview_session.getConnections()
|
|
except Exception as e:
|
|
logging.debug("got exception,", e)
|
|
logging.fatal("failed to get connections")
|
|
sys.exit(1)
|
|
|
|
if len(libreview_session.connections) > 1:
|
|
logging.fatal("currently glucose-monitor only supports accounts with one connection")
|
|
sys.exit(1)
|
|
if len(libreview_session.connections) < 1:
|
|
logging.fatal("to use glucose-monitor your libreview account must have at least one connection")
|
|
sys.exit(1)
|
|
|
|
patient_id = libreview_session.connections[0]["patientId"]
|
|
|
|
if display.supports_custom_fonts:
|
|
if font != "DEFAULT":
|
|
display.setFont(font, int(font_size))
|
|
else:
|
|
display.setDefaultFont()
|
|
|
|
while not SHOULD_EXIT:
|
|
try:
|
|
graph = libreview_session.getGraph(patient_id)
|
|
graph["connection"]["glucoseMeasurement"]
|
|
glucose_measurement = graph["connection"]["glucoseMeasurement"]
|
|
glucose_measurement_timestamp = glucose_measurement["Timestamp"]
|
|
glucose_measurement_timestamp_datetime = datetime.strptime(
|
|
glucose_measurement_timestamp,
|
|
"%m/%d/%Y %I:%M:%S %p"
|
|
)
|
|
except requests.exceptions.ConnectionError:
|
|
logging.error("failed to get details about patient, connection error")
|
|
except requests.exceptions.HTTPError:
|
|
logging.error("failed to get details about patient, API returned invalid response")
|
|
except requests.exceptions.RequestException:
|
|
logging.error("failed to get details about patient an unknown error occurred")
|
|
except KeyError:
|
|
logging.error("the data returned from the API was not in an expected format")
|
|
|
|
for _ in range(data_refresh_interval//screen_refresh_interval):
|
|
display.clearScreen()
|
|
computed_output_lines = []
|
|
for output_line in output_lines:
|
|
computed_output_lines.append(eval(f"f'{output_line}'"))
|
|
display.displayLines(computed_output_lines)
|
|
|
|
time.sleep(screen_refresh_interval)
|
|
|
|
logging.debug("exiting now...")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|