diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6ebd098 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +screens/__pycache__ +libreview/__pycache__ +screens/04B_08__.TTF diff --git a/04B_08__.TTF b/04B_08__.TTF deleted file mode 100644 index aadf20e..0000000 Binary files a/04B_08__.TTF and /dev/null differ diff --git a/libreview/__init__.py b/libreview/__init__.py new file mode 100644 index 0000000..aa78543 --- /dev/null +++ b/libreview/__init__.py @@ -0,0 +1,2 @@ +from .libreview import LibreViewSession + diff --git a/libreview/libreview.py b/libreview/libreview.py new file mode 100644 index 0000000..c28a495 --- /dev/null +++ b/libreview/libreview.py @@ -0,0 +1,114 @@ +import logging + + +import requests + + +class LibreViewSession: + def __init__(self): + self.authenticated = False + self.token = "" + + self.connections = [] + + def useToken(self, token): + url = "https://api.libreview.io/user" + + self.token = token + + headers = { + "version": "4.7", + "product": "llu.ios", + "Accept": "application/json", + "Authorization": "Bearer " + self.token + } + + response = requests.get(url, headers=headers) + + if response.status_code != 200: + logging.debug(f"got status code {response.status_code} when attempting to test token") + self.authenticated = False + else: + self.authenticated = True + + + def authenticate(self, email, password): + url = "https://api.libreview.io/llu/auth/login" + + if self.authenticated: + return + + headers = { + "version": "4.7", + "product": "llu.ios", + "Content-Type": "application/json", + "Accept": "application/json" + } + + payload = { + "email": email, + "password": password, + } + + response = requests.post(url, json=payload, headers=headers) + + if response.status_code == 200: + json = response.json() + if 'status' in json: + if json['status'] == 0 and 'data' in json: + data = json['data'] + auth_ticket = data['authTicket'] + self.token = auth_ticket['token'] + + self.authenticated = True + else: + self.authenticated = False + logging.debug(f"failed to authenticate with libreview, got status {json['status']}") + return + + def getConnections(self): + url = "https://api.libreview.io/llu/connections" + + headers = { + "version": "4.7", + "product": "llu.ios", + "Accept": "application/json", + "Authorization": "Bearer " + self.token + } + + response = requests.get(url, headers=headers) + + if response.status_code != 200: + logging.debug(f"got http response code {response.status_code} from libreview when attempting to get connections") + return [] + + json = response.json() + if 'data' in json: + self.connections = json['data'] + else: + print(response.content) + self.connections = [] + + def getGraph(self, patientId): + """ + getGraph gets the LibreView graph for the {patientID} + """ + url = f"https://api.libreview.io/llu/connections/{patientId}/graph" + + headers = { + "version": "4.7", + "product": "llu.ios", + "Accept": "application/json", + "Authorization": "Bearer " + self.token + } + + response = requests.get(url, headers=headers) + + json = response.json() + if 'data' in json: + return json['data'] + else: + return None + + + diff --git a/main.py b/main.py index b91712e..63f7b83 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 +import argparse import configparser import logging import os @@ -8,6 +9,12 @@ import signal import inspect import sys import time +from datetime import datetime + + +from libreview import LibreViewSession +from screens import DebugScreen +from screens import TerminalScreen import requests @@ -15,8 +22,13 @@ import requests # CONSTANTS DEFAULT_INTERVAL = 5 -DEFAULT_FONT = "04B_08__.TTF" +DEFAULT_FONT_SIZE = 9 +DEFAULT_DEBUG_SCREEN_WIDTH = 128 +DEFAULT_DEBUG_SCREEN_HEIGHT = 32 +DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS = '1' SUPPORTED_SCREENS = [ + "debug", + "terminal", "waveshare_2.23_ssd1305" ] @@ -25,95 +37,6 @@ SUPPORTED_SCREENS = [ SHOULD_EXIT = False -class LibreViewSession: - def __init__(self): - self.authenticated = False - self.token = "" - - self.connections = [] - - def authenticate(self, email, password): - url = "https://api.libreview.io/llu/auth/login" - - if self.authenticated: - return - - headers = { - "version": "4.7", - "product": "llu.ios", - "Content-Type": "application/json", - "Accept": "application/json" - } - - payload = { - "email": email, - "password": password, - } - - response = requests.post(url, json=payload, headers=headers) - - if response.status_code == 200: - json = response.json() - if 'status' in json: - if json['status'] == 0 and 'data' in json: - data = json['data'] - auth_ticket = data['authTicket'] - self.token = auth_ticket['token'] - - self.authenticated = True - else: - self.authenticated = False - logging.debug("failed to authenticate with LibreView") - return - - def acceptTerms(self): - # TODO: finish this - # url = "https://api.libreview.io/auth/continue/tou" - - print("not implemented yet, sorry") - - def getUser(self): - # TODO: finish this - # url = "https://api.libreview.io/user" - - print("not implemented yet, sorry") - - def getConnections(self): - url = "https://api.libreview.io/llu/connections" - - headers = { - "version": "4.7", - "product": "llu.ios", - "Accept": "application/json", - "Authorization": "Bearer " + self.token - } - - response = requests.get(url, headers=headers) - - json = response.json() - if 'data' in json: - self.connections = json['data'] - - def getGraph(self, patientId): - url = f"https://api.libreview.io/llu/connections/{patientId}/graph" - - headers = { - "version": "4.7", - "product": "llu.ios", - "Accept": "application/json", - "Authorization": "Bearer " + self.token - } - - response = requests.get(url, headers=headers) - - json = response.json() - if 'data' in json: - return json['data'] - - # TODO: determine what to do on error - return json['message'] # FIXME: this probably isn't correct - - def file_exists(file_path): return os.path.isfile(file_path) @@ -122,10 +45,6 @@ def directory_exists(directory_path): return os.path.isdir(directory_path) -def clear_terminal(): - print("\033c", end='') - - def signal_handler(signum, frame): global SHOULD_EXIT if SHOULD_EXIT: @@ -150,25 +69,41 @@ def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - possible_config_file_locations = [ - str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini", - "/etc/glucose-monitor/config.ini", - "config.ini" - ] - - config_file_location = "" - for possible_config_file_location in possible_config_file_locations: - if file_exists(possible_config_file_location): - config_file_location = possible_config_file_location - logging.info("config file found at " + config_file_location) - break - - if config_file_location == "": - logging.fatal("could not find a config file") - sys.exit(1) + parser = argparse.ArgumentParser( + prog='glucose-monitor', + description='A program to display glucose levels and if you blood sugar is high' + ) - config = configparser.ConfigParser() + parser.add_argument('-t', '--token') + parser.add_argument('-c', '--config') + + args = parser.parse_args() + if args.config is not None: + if not file_exists(args.config): + logging.fatal("config file location {args.config} does not exist") + sys.exit(1) + else: + config_file_location = args.config + else: + possible_config_file_locations = [ + str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini", + "/etc/glucose-monitor/config.ini", + "config.ini" + ] + + config_file_location = "" + for possible_config_file_location in possible_config_file_locations: + if file_exists(possible_config_file_location): + config_file_location = possible_config_file_location + logging.info("config file found at " + config_file_location) + break + + if config_file_location == "": + logging.fatal("could not find a config file") + sys.exit(1) + + config = configparser.ConfigParser() config.read(config_file_location) if 'LOG' in config: @@ -189,91 +124,77 @@ def main(): logging.debug("set log level to debug") - if 'CREDENTIALS' not in config: - # TODO: support credentials in environmental variables - logging.fatal("no credentials specified in config file") - sys.exit(1) - if 'EMAIL' not in config['CREDENTIALS']: - # TODO: support credentials in environmental variables - logging.fatal("no email specified in config file") - sys.exit(1) - if 'PASSWORD' not in config['CREDENTIALS']: - # TODO: support credentials in environmental variables - logging.fatal("no password specified in config file") - sys.exit(1) + if args.token is None: + if 'CREDENTIALS' not in config: + # TODO: support credentials in environmental variables + logging.fatal("no credentials specified in config file") + sys.exit(1) + if 'EMAIL' not in config['CREDENTIALS']: + # TODO: support credentials in environmental variables + logging.fatal("no email specified in config file") + sys.exit(1) + if 'PASSWORD' not in config['CREDENTIALS']: + # TODO: support credentials in environmental variables + logging.fatal("no password specified in config file") + sys.exit(1) - if 'GENERAL' in config and 'INTERVAL' in config['GENERAL']: - delay = int(config['GENERAL']['INTERVAL']) - else: - delay = DEFAULT_INTERVAL + delay = DEFAULT_INTERVAL + font = "DEFAULT" + font_size = DEFAULT_FONT_SIZE + if 'GENERAL' in config: + general_config = config['GENERAL'] + if 'INTERVAL' in general_config: + delay = int(config['GENERAL']['INTERVAL']) + + if 'UPPER_BOUND' in general_config: + upper_bound = config['GENERAL']['UPPER_BOUND'] + if 'LOWER_BOUND' in general_config: + lower_bound = config['GENERAL']['LOWER_BOUND'] + + if 'FONT' in general_config: + font = general_config['FONT'] + logging.debug("found font in config, checking that it exists") + if not file_exists(font): + logging.fatal(f"font does not exist at path {font}") + sys.exit(1) + else: + logging.debug(f"found font at {font}") + if 'FONT_SIZE' in general_config: + font_size = general_config['FONT_SIZE'] - show_on_screen = False - show_in_terminal = True + display = TerminalScreen() if 'SCREEN' in config: - if 'TYPE' not in config['SCREEN']: + screen_config = config['SCREEN'] + if 'TYPE' not in screen_config: logging.fatal("no screen type in config file") sys.exit(1) - if config['SCREEN']['TYPE'].lower() not in SUPPORTED_SCREENS: + screen_type = screen_config['TYPE'].lower() + if screen_type not in SUPPORTED_SCREENS: logging.fatal("sorry screen type not supported") sys.exit(1) - screen_type = config['SCREEN']['TYPE'].lower() - if screen_type == "waveshare_2.23_ssd1305": - show_on_screen = True - SSD1305_library_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'drive') - if directory_exists(SSD1305_library_directory): - sys.path.append(SSD1305_library_directory) - else: - logging.fatal("failed to import SSD1305 library") - sys.exit(1) - - from drive import SSD1305 - from PIL import Image, ImageDraw, ImageFont - - display = SSD1305.SSD1305() - - display.Init() - - width = display.width - height = display.height - image = Image.new('1', (width, height)) - - padding = 0 - top = padding - x = 0 - - draw = ImageDraw.Draw(image) - - draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer - - if 'FONT' in config['SCREEN']: - font = ImageFont.truetype(config['SCREEN']['FONT'], 8) - else: - font = ImageFont.truetype(DEFAULT_FONT, 8) - - def display_on_screen(buffer): - display.getbuffer(buffer) - display.ShowImage() - - def clear_display(): - buffer = Image.new('1', (width, height)) - draw = ImageDraw.Draw(image) - draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer - display.getbuffer(buffer) - display.ShowImage() - - logging.info("clearing display") - clear_display() - - if "DISPLAY_IN_TERMINAL" in config['SCREEN']: - if config['SCREEN']['DISPLAY_IN_TERMINAL'].lower() == "false": - show_in_terminal = False + from screens.waveshare_233_ssd1305 import Waveshare_233_SSD1305 + display = Waveshare_233_SSD1305() + elif screen_type == "debug": + width = DEFAULT_DEBUG_SCREEN_WIDTH + height = DEFAULT_DEBUG_SCREEN_HEIGHT + number_of_colour_bits = DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS + if 'WIDTH' in screen_config: + width = int(screen_config['WIDTH']) + if 'HEIGHT' in screen_config: + height = int(screen_config['HEIGHT']) + if 'NUMBER_OF_COLOUR_BITS' in screen_config: + number_of_colour_bits = screen_config['NUMBER_OF_COLOUR_BITS'] + display = DebugScreen(width, height, number_of_colour_bits) libreview_session = LibreViewSession() - libreview_session.authenticate(config['CREDENTIALS']['EMAIL'], config['CREDENTIALS']['PASSWORD']) + if args.token is not None: + libreview_session.useToken(args.token) + else: + libreview_session.authenticate(config['CREDENTIALS']['EMAIL'], config['CREDENTIALS']['PASSWORD']) if libreview_session.authenticated and not SHOULD_EXIT: logging.info("successfully authenticated with LibreView") @@ -283,32 +204,32 @@ def main(): try: libreview_session.getConnections() - except Exception: + except Exception as e: + logging.debug("got exception,", e) logging.fatal("failed to get connections") sys.exit(1) if len(libreview_session.connections) > 1: logging.fatal("currently glucose-monitor only supports accounts with one connection") + sys.exit(1) if len(libreview_session.connections) < 1: - logging.fatal("to use glucose-monitor your libreview account must have one connection") + logging.fatal("to use glucose-monitor your libreview account must have at least one connection") + sys.exit(1) patient_id = libreview_session.connections[0]['patientId'] - first_run = True - while not SHOULD_EXIT: - if not SHOULD_EXIT and not first_run: - time.sleep(delay) - elif first_run: - if show_in_terminal: - clear_terminal() - - if show_on_screen: - clear_display() + if display.supports_custom_fonts: + if font != "DEFAULT": + display.setFont(font, int(font_size)) + else: + display.setDefaultFont() - first_run = False + while not SHOULD_EXIT: + display.clearScreen() try: graph = libreview_session.getGraph(patient_id) + graph['connection']['glucoseMeasurement'] except requests.exceptions.ConnectionError: logging.error("failed to get details about patient, connection error") continue @@ -318,34 +239,21 @@ def main(): except requests.exceptions.RequestException: logging.error("failed to get details about patient an unknown error occurred") continue - - try: - graph['connection']['glucoseMeasurement'] except KeyError: logging.error("the data returned from the API was not in an expected format") continue glucoseMeasurement = graph['connection']['glucoseMeasurement'] - if show_in_terminal: - clear_terminal() - - print(f"Timestamp: {glucoseMeasurement['Timestamp']}\n") - print(f"Glucose units: {glucoseMeasurement['Value']}") - print(f"Is high: {glucoseMeasurement['isHigh']}") - print(f"Is low: {glucoseMeasurement['isLow']}") - print(f"Refresh rate: {delay} seconds") - - if show_on_screen: - clear_display() + now = datetime.now() + lines = [ + f"{now.strftime("%H:%M:%S")}", + str(glucoseMeasurement['Value']) + ] + display.displayLines(lines) - draw.text((x, top), glucoseMeasurement['Timestamp'], font=font, fill=255) - draw.text((x, top+8), "Glucose units: " + str(glucoseMeasurement['Value']), font=font, fill=255) - # Don't know why these two need to have more spaces to line up with the line ebovee - draw.text((x, top+16), "Is high: " + str(glucoseMeasurement['isHigh']), font=font, fill=255) - draw.text((x, top+24), "Is low: " + str(glucoseMeasurement['isLow']), font=font, fill=255) + time.sleep(delay) - display_on_screen(image) logging.debug("exiting now...") diff --git a/screens/__init__.py b/screens/__init__.py new file mode 100644 index 0000000..093751d --- /dev/null +++ b/screens/__init__.py @@ -0,0 +1,3 @@ +from .terminal_screen import TerminalScreen +from .debug_screens import DebugScreen +from .waveshare_233_ssd1305 import Waveshare_233_SSD1305 diff --git a/screens/debug_screens.py b/screens/debug_screens.py new file mode 100644 index 0000000..6352eb8 --- /dev/null +++ b/screens/debug_screens.py @@ -0,0 +1,65 @@ +import os +import logging +from typing import List + + +import psutil +from PIL import Image, ImageDraw, ImageFont + + +from .screen import Screen + + +class DebugScreen(Screen): + def __init__(self, width: int, height: int, number_of_colour_bits: str): + self.type = "Debug Screen" + + self.supports_custom_fonts = True + + self.width = width + self.height = height + self.image = Image.new(number_of_colour_bits, (self.width, self.height)) # Create a new image with 1 bit colour + self.draw = ImageDraw.Draw(self.image) + + def setDefaultFont(self): + logging.debug("setting font to default font") + ImageFont.load_default() + + def setFont(self, font: str, size: int): + logging.debug(f"setting font to {font} with size, {size}") + self.font = font + self.font_size = size + ImageFont.truetype(font, size) + + def clearBuffer(self): + self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) + + def displayOnScreen(self): + self.image.show() + + def displayLines(self, lines: List[str], x_padding: int = 0, y_padding: int = 4): + # displays text on the screen, the line that is first in the list is displayed on the top + self.clearBuffer() + self.clearScreen() + + x_start = 0 + y_start = 0 + for i, line in enumerate(lines): + self.draw.text((x_start+(i*x_padding), y_start+int(i*self.font_size)+(i*y_padding)), line, fill=255) + self.displayOnScreen() + + def clearScreen(self): + xdg_open_pid = -1 + for p in psutil.process_iter(): + if p.ppid() == os.getpid() and p.name() == "xdg-open": + logging.debug(f"found xdg-open process with a process id {p.pid}") + xdg_open_pid = p.pid + elif xdg_open_pid != -1 and p.ppid() == xdg_open_pid: + logging.debug(f"killing process, {p.name()} {p.cmdline()}") + p.terminate() + p.kill() + + self.clearBuffer() + draw = ImageDraw.Draw(self.image) + # black rectangle to clear screen, without clearing buffer + draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) diff --git a/drive/SSD1305.py b/screens/drive/SSD1305.py similarity index 100% rename from drive/SSD1305.py rename to screens/drive/SSD1305.py diff --git a/drive/config.py b/screens/drive/config.py similarity index 100% rename from drive/config.py rename to screens/drive/config.py diff --git a/screens/screen.py b/screens/screen.py new file mode 100644 index 0000000..5e2ae6b --- /dev/null +++ b/screens/screen.py @@ -0,0 +1,14 @@ +class Screen: + def __init__(self): + self.type = None + + self.supports_custom_fonts = False + + def getScreenType(self): + return self.type + + def clearScreen(self): + self.__clearScreen() + + def __clearScreen(self): + raise NotImplementedError("Subclass must implement this method") diff --git a/screens/terminal_screen.py b/screens/terminal_screen.py new file mode 100644 index 0000000..1794d07 --- /dev/null +++ b/screens/terminal_screen.py @@ -0,0 +1,28 @@ +from typing import List +import os + + +from .screen import Screen + + +class TerminalScreen(Screen): + def __init__(self): + self.type = "Terminal Screen" + + self.width, self.height = os.get_terminal_size() + + def displayLines(self, lines: List[str], x_padding: int = 0, y_padding: int = 4): + # displays text on the screen + self.clearScreen() + + y = 0 + x = 0 + for i, line in enumerate(lines): + # TODO: Calculate x and y before printing to check if we are going to go off the terminal + print('\n' * int(y+(y_padding*i)), end='') + print(' ' * int(x+(x_padding*i)), end='') + print(line, end='') + + + def clearScreen(self): + print("\033c", end='') diff --git a/screens/waveshare_233_ssd1305.py b/screens/waveshare_233_ssd1305.py new file mode 100644 index 0000000..55450f5 --- /dev/null +++ b/screens/waveshare_233_ssd1305.py @@ -0,0 +1,56 @@ +from typing import List + + +from PIL import Image, ImageDraw, ImageFont + + +from .screen import Screen + + +class Waveshare_233_SSD1305(Screen): + def __init__(self): + from drive import SSD1305 + + self.type = "Waveshare 2.33 SSD1305 OLED" + + self.supports_custom_fonts = True + + self.ssd1305 = SSD1305.SSD1305() + self.ssd1305.Init() + + self.width = self.ssd1305.width + self.height = self.ssd1305.height + self.image = Image.new('1', (self.width, self.height)) # Create new image with 1 bit colour + self.draw = ImageDraw.Draw(self.image) + + def setDefaultFont(self): + ImageFont.load_default() + + def setFont(self, font, size): + self.font = font + self.font_size = size + ImageFont.truetype(font, size) + + def clearBuffer(self): + self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) + + def displayOnScreen(self): + self.ssd1305.getbuffer(self.image) + self.ssd1305.ShowImage() + + def displayLines(self, lines: List[str]): + # displays text on the screen, the line that is first in the list is displayed on the top + y = 0 + x = 0 + for i, line in lines: + self.draw.text((x, y+(i*self.font_size)), line, font=self.font, fill=255) + self.displayOnScreen() + + def clearScreen(self): + # Used by parent class Screen + buffer = Image.new('1', (self.width, self.height)) + draw = ImageDraw.Draw(self.image) + # black rectangle to clear screen, without clearing buffer + draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) + self.ssd1305.getbuffer(buffer) + self.ssd1305.ShowImage()