diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6ebd098 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +screens/__pycache__ +libreview/__pycache__ +screens/04B_08__.TTF diff --git a/04B_08__.TTF b/04B_08__.TTF deleted file mode 100644 index aadf20e..0000000 Binary files a/04B_08__.TTF and /dev/null differ diff --git a/glucose-monitor.py b/glucose-monitor.py new file mode 100644 index 0000000..0aca2fe --- /dev/null +++ b/glucose-monitor.py @@ -0,0 +1,410 @@ +#!/usr/bin/env python3 + +""" +glucose_monitor.py - A script to display glucose levels on a Raspberry Pi + +Configurable with a config file +""" + + +import argparse +import configparser +import logging +import os +import signal +import inspect +import sys +import time +from datetime import datetime + + +from libreview import LibreViewSession +from screens import DebugScreen +from screens import Screen + + +import requests +import yaml + + +# CONSTANTS +DEFAULT_DATA_REFRESH_INTERVAL = 5 +DEFAULT_SCREEN_REFRESH_INTERVAL = 1 +DEFAULT_FONT_SIZE = 9 +DEFAULT_DEBUG_SCREEN_WIDTH = 128 +DEFAULT_DEBUG_SCREEN_HEIGHT = 32 +DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS = "1" +SUPPORTED_SCREENS = [ + "debug", + "terminal", + "waveshare_2.23_ssd1305" +] + + +# GLOBAL VARIABLES +SHOULD_EXIT = False + + +def file_exists(file_path) -> bool: + """ + file_exists: + Checks if a file exists + + Arguments: + file_path: + The file that we are looking for + + Returns: + boolean: + Returns true if the file exists, and false if it doesn"t + """ + return os.path.isfile(file_path) + + +def directory_exists(directory_path): + """ + directory_exists: + Checks if a directory exists + + Arguments: + directory_path: + The directory that we are looking for + + Returns: + boolean: + Returns true if the file exists and is a directory, and false if it"s not + """ + return os.path.isdir(directory_path) + + +def ini_to_yaml(ini_file_path: str, yaml_file_path: str, lower_case: bool = False): + """ + ini_to_yaml: + Takes an ini file and produces a yaml file from it. + + Arguments: + ini_file_path: + The path to the ini file that you want to convert to a yaml file + + yaml_file_path: + The path to the yaml file that you wish to create + + lower_case = False + Change all the sections and keys to lower case, if false leaves the case alone + + Returns: + None + """ + config = configparser.ConfigParser() + config.read(ini_file_path) + + d = {} + for section in config.sections(): + if lower_case: + d[section.lower()] = {} + else: + d[section] = {} + for key, value in config[section].items(): + if lower_case: + d[section.lower()][key.lower()] = value + else: + d[section][key] = value + + with open(yaml_file_path, "w+") as file: + yaml.dump(d, file, default_flow_style=False) + + +def get_current_time(fmt: str = "%H:%M %d/%m/%Y") -> str: + """ + get_current_time: + Returns a string containing the date and/or time in the format specified in fmt + + Arguments: + fmt: + The format of the date and/or time to be returned in the string + + Returns: + Returns a string with the date and/or time in the format specified + """ + now = datetime.now() + + return now.strftime(fmt) + + +def time_since(dt: datetime) -> str: + """ + time_since: + Returns a string with the difference between the datetime passed and today + + Arguments: + dt: + A datetime class to get the time since + + Returns: + A string in the with minutes if difference can be calculated in minutes otherwise will just return seconds + Will be in the format of "2m 30s" or "40s" + """ + now = datetime.now() + + difference = now - dt + + total_seconds = difference.total_seconds() + + minutes = int(total_seconds // 60) + seconds = int(total_seconds % 60) + + if minutes > 0: + return f"{minutes}m {seconds}s" + return f"{seconds}s" + + +def signal_handler(signum, frame): + global SHOULD_EXIT + if SHOULD_EXIT: + logging.info("already received exit signal exiting NOW") + sys.exit(1) + SHOULD_EXIT = True + logging.info(f"received {signal.Signals(signum).name} signal shutting down") + logging.debug("stack trace:") + for frame, filename, lineno, function, _, _ in inspect.getouterframes(frame): + logging.debug(f"file: {filename}, line_number: {lineno}, function: {function}") + + +def main(): + global SHOULD_EXIT + + logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" + ) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + parser = argparse.ArgumentParser( + prog="glucose-monitor", + description="A program to display glucose levels and if you blood sugar is high" + ) + + parser.add_argument("-m", "--migrate", action="store_true") + parser.add_argument("-t", "--token") + parser.add_argument("-c", "--config") + + args = parser.parse_args() + + if args.config is not None: + if not file_exists(args.config): + logging.fatal("config file location {args.config} does not exist") + sys.exit(1) + else: + config_file_location = args.config + else: + possible_config_file_locations = [ + str(os.getenv("HOME")) + "/.config/glucose-monitor/config.yaml", + "/etc/glucose-monitor/config.yaml", + "config.yaml" + ] + config_file_location = "" + found_config_file = False + for possible_config_file_location in possible_config_file_locations: + if file_exists(possible_config_file_location): + config_file_location = possible_config_file_location + found_config_file = True + logging.info("config file found at " + config_file_location) + break + + if not found_config_file: + old_config_file_locations = [ + str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini", + "/etc/glucose-monitor/config.ini", + "config.ini" + ] + for old_config_file_location in old_config_file_locations: + if file_exists(old_config_file_location): + if not args.migrate: + logging.fatal("Sorry, ini files are no longer support, run glucose-monitor --migrate to move to yaml") + sys.exit(1) + else: + ini_to_yaml(old_config_file_location, old_config_file_location[:-4]+".yaml", lower_case=True) + config_file_location = old_config_file_location[:-4]+".yaml" + if config_file_location == "": + logging.fatal("could not find a config file") + sys.exit(1) + + with open(config_file_location, "r") as config_file: + config = yaml.safe_load(config_file) + + if "log" in config: + if "level" in config["log"]: + level = config["log"]["level"].lower() + if level == "debug": + logging.getLogger().setLevel(logging.DEBUG) + elif level == "info": + logging.getLogger().setLevel(logging.INFO) + elif level == "warning": + logging.getLogger().setLevel(logging.WARNING) + elif level == "error": + logging.getLogger().setLevel(logging.ERROR) + elif level == "critical": + logging.getLogger().setLevel(logging.CRITICAL) + else: + logging.error("log level specified in config file is not valid") + + logging.debug("set log level to debug") + + if args.token is None: + if "credentials" not in config: + # TODO: support credentials in environmental variables + logging.fatal("no credentials specified in config file") + sys.exit(1) + if "email" not in config["credentials"]: + # TODO: support credentials in environmental variables + logging.fatal("no email specified in config file") + sys.exit(1) + if "password" not in config["credentials"]: + # TODO: support credentials in environmental variables + logging.fatal("no password specified in config file") + sys.exit(1) + + data_refresh_interval = DEFAULT_DATA_REFRESH_INTERVAL + screen_refresh_interval = DEFAULT_SCREEN_REFRESH_INTERVAL + font = "DEFAULT" + font_size = DEFAULT_FONT_SIZE + lines = [ + "%T", + "GLUCOSE UNITS: %U" + ] + if "general" in config: + general_config = config["general"] + if "data_refresh_interval" in general_config: + data_refresh_interval = int(config["general"]["data_refresh_interval"]) + if "screen_refresh_interval" in general_config: + screen_refresh_interval = int(config["general"]["screen_refresh_interval"]) + + # TODO: Add format specifier to check if high and low + # if "upper_bound" in general_config: + # upper_bound = config["general"]["upper_bound"] + # if "lower_bound" in general_config: + # lower_bound = config["general"]["lower_bound"] + + if "font" in general_config: + font = general_config["font"] + logging.debug("found font in config, checking that it exists") + if not file_exists(font): + logging.fatal(f"font does not exist at path {font}") + sys.exit(1) + else: + logging.debug(f"found font at {font}") + if "font_size" in general_config: + font_size = general_config["font_size"] + + if "lines" in general_config: + lines = general_config["lines"] + + output_lines = [] + for line in lines: + temp = "" + if "%T" in line: + temp += line.replace("%T", '{get_current_time()}') + if "%U" in line: + temp += line.replace("%U", '{glucose_measurement["Value"]}') + if "%S" in line: + temp += line.replace("%S", '{time_since(glucose_measurement_timestamp_datetime)}') + output_lines.append(temp) + + display = Screen() + if "screen" in config: + screen_config = config["screen"] + if "type" not in screen_config: + logging.fatal("no screen type in config file") + sys.exit(1) + + screen_type = screen_config["type"].lower() + if screen_type not in SUPPORTED_SCREENS: + logging.fatal("sorry screen type not supported") + sys.exit(1) + + if screen_type == "waveshare_2.23_ssd1305": + from screens.waveshare_233_ssd1305 import Waveshare_233_SSD1305 + display = Waveshare_233_SSD1305() + elif screen_type == "debug": + width = DEFAULT_DEBUG_SCREEN_WIDTH + height = DEFAULT_DEBUG_SCREEN_HEIGHT + number_of_colour_bits = DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS + if "width" in screen_config: + width = int(screen_config["width"]) + if "height" in screen_config: + height = int(screen_config["height"]) + if "number_of_colour_bits" in screen_config: + number_of_colour_bits = screen_config["number_of_colour_bits"] + display = DebugScreen(width, height, number_of_colour_bits) + + + libreview_session = LibreViewSession() + if args.token is not None: + libreview_session.useToken(args.token) + else: + libreview_session.authenticate(config["credentials"]["email"], config["credentials"]["password"]) + + if libreview_session.authenticated and not SHOULD_EXIT: + logging.info("successfully authenticated with LibreView") + else: + logging.fatal("failed to authenticate with LibreView") + sys.exit(1) + + try: + libreview_session.getConnections() + except Exception as e: + logging.debug("got exception,", e) + logging.fatal("failed to get connections") + sys.exit(1) + + if len(libreview_session.connections) > 1: + logging.fatal("currently glucose-monitor only supports accounts with one connection") + sys.exit(1) + if len(libreview_session.connections) < 1: + logging.fatal("to use glucose-monitor your libreview account must have at least one connection") + sys.exit(1) + + patient_id = libreview_session.connections[0]["patientId"] + + if display.supports_custom_fonts: + if font != "DEFAULT": + display.setFont(font, int(font_size)) + else: + display.setDefaultFont() + + while not SHOULD_EXIT: + try: + graph = libreview_session.getGraph(patient_id) + graph["connection"]["glucoseMeasurement"] + glucose_measurement = graph["connection"]["glucoseMeasurement"] + glucose_measurement_timestamp = glucose_measurement["Timestamp"] + glucose_measurement_timestamp_datetime = datetime.strptime( + glucose_measurement_timestamp, + "%m/%d/%Y %I:%M:%S %p" + ) + except requests.exceptions.ConnectionError: + logging.error("failed to get details about patient, connection error") + except requests.exceptions.HTTPError: + logging.error("failed to get details about patient, API returned invalid response") + except requests.exceptions.RequestException: + logging.error("failed to get details about patient an unknown error occurred") + except KeyError: + logging.error("the data returned from the API was not in an expected format") + + for _ in range(data_refresh_interval//screen_refresh_interval): + display.clearScreen() + computed_output_lines = [] + for output_line in output_lines: + computed_output_lines.append(eval(f"f'{output_line}'")) + display.displayLines(computed_output_lines) + + time.sleep(screen_refresh_interval) + + logging.debug("exiting now...") + + +if __name__ == "__main__": + main() diff --git a/libreview/__init__.py b/libreview/__init__.py new file mode 100644 index 0000000..aa78543 --- /dev/null +++ b/libreview/__init__.py @@ -0,0 +1,2 @@ +from .libreview import LibreViewSession + diff --git a/libreview/libreview.py b/libreview/libreview.py new file mode 100644 index 0000000..29db203 --- /dev/null +++ b/libreview/libreview.py @@ -0,0 +1,115 @@ +import logging +from typing import Dict + + +import requests + + +class LibreViewSession: + def __init__(self): + self.authenticated = False + self.token = "" + + self.connections = [] + + def useToken(self, token): + url = "https://api.libreview.io/user" + + self.token = token + + headers = { + "version": "4.7", + "product": "llu.ios", + "Accept": "application/json", + "Authorization": "Bearer " + self.token + } + + response = requests.get(url, headers=headers) + + if response.status_code != 200: + logging.debug(f"got status code {response.status_code} when attempting to test token") + self.authenticated = False + else: + self.authenticated = True + + + def authenticate(self, email, password): + url = "https://api.libreview.io/llu/auth/login" + + if self.authenticated: + return + + headers = { + "version": "4.7", + "product": "llu.ios", + "Content-Type": "application/json", + "Accept": "application/json" + } + + payload = { + "email": email, + "password": password, + } + + response = requests.post(url, json=payload, headers=headers) + + if response.status_code == 200: + json = response.json() + if 'status' in json: + if json['status'] == 0 and 'data' in json: + data = json['data'] + auth_ticket = data['authTicket'] + self.token = auth_ticket['token'] + + self.authenticated = True + else: + self.authenticated = False + logging.debug(f"failed to authenticate with libreview, got status {json['status']}") + return + + def getConnections(self): + url = "https://api.libreview.io/llu/connections" + + headers = { + "version": "4.7", + "product": "llu.ios", + "Accept": "application/json", + "Authorization": "Bearer " + self.token + } + + response = requests.get(url, headers=headers) + + if response.status_code != 200: + logging.debug(f"got http response code {response.status_code} from libreview when attempting to get connections") + return [] + + json = response.json() + if 'data' in json: + self.connections = json['data'] + else: + print(response.content) + self.connections = [] + + def getGraph(self, patientId) -> Dict[str, Dict]: + """ + getGraph gets the LibreView graph for the {patientID} + """ + url = f"https://api.libreview.io/llu/connections/{patientId}/graph" + + headers = { + "version": "4.7", + "product": "llu.ios", + "Accept": "application/json", + "Authorization": "Bearer " + self.token + } + + response = requests.get(url, headers=headers) + + json = response.json() + if 'data' in json: + return json['data'] + else: + return {} + + + diff --git a/main.py b/main.py deleted file mode 100644 index b91712e..0000000 --- a/main.py +++ /dev/null @@ -1,354 +0,0 @@ -#!/usr/bin/env python3 - - -import configparser -import logging -import os -import signal -import inspect -import sys -import time - - -import requests - - -# CONSTANTS -DEFAULT_INTERVAL = 5 -DEFAULT_FONT = "04B_08__.TTF" -SUPPORTED_SCREENS = [ - "waveshare_2.23_ssd1305" -] - - -# GLOBAL VARIABLES -SHOULD_EXIT = False - - -class LibreViewSession: - def __init__(self): - self.authenticated = False - self.token = "" - - self.connections = [] - - def authenticate(self, email, password): - url = "https://api.libreview.io/llu/auth/login" - - if self.authenticated: - return - - headers = { - "version": "4.7", - "product": "llu.ios", - "Content-Type": "application/json", - "Accept": "application/json" - } - - payload = { - "email": email, - "password": password, - } - - response = requests.post(url, json=payload, headers=headers) - - if response.status_code == 200: - json = response.json() - if 'status' in json: - if json['status'] == 0 and 'data' in json: - data = json['data'] - auth_ticket = data['authTicket'] - self.token = auth_ticket['token'] - - self.authenticated = True - else: - self.authenticated = False - logging.debug("failed to authenticate with LibreView") - return - - def acceptTerms(self): - # TODO: finish this - # url = "https://api.libreview.io/auth/continue/tou" - - print("not implemented yet, sorry") - - def getUser(self): - # TODO: finish this - # url = "https://api.libreview.io/user" - - print("not implemented yet, sorry") - - def getConnections(self): - url = "https://api.libreview.io/llu/connections" - - headers = { - "version": "4.7", - "product": "llu.ios", - "Accept": "application/json", - "Authorization": "Bearer " + self.token - } - - response = requests.get(url, headers=headers) - - json = response.json() - if 'data' in json: - self.connections = json['data'] - - def getGraph(self, patientId): - url = f"https://api.libreview.io/llu/connections/{patientId}/graph" - - headers = { - "version": "4.7", - "product": "llu.ios", - "Accept": "application/json", - "Authorization": "Bearer " + self.token - } - - response = requests.get(url, headers=headers) - - json = response.json() - if 'data' in json: - return json['data'] - - # TODO: determine what to do on error - return json['message'] # FIXME: this probably isn't correct - - -def file_exists(file_path): - return os.path.isfile(file_path) - - -def directory_exists(directory_path): - return os.path.isdir(directory_path) - - -def clear_terminal(): - print("\033c", end='') - - -def signal_handler(signum, frame): - global SHOULD_EXIT - if SHOULD_EXIT: - logging.info("already received exit signal exiting NOW") - sys.exit(1) - SHOULD_EXIT = True - logging.info(f"received {signal.Signals(signum).name} signal shutting down") - logging.debug("stack trace:") - for frame, filename, lineno, function, _, _ in inspect.getouterframes(frame): - logging.debug(f"file: {filename}, line_number: {lineno}, function: {function}") - - -def main(): - global SHOULD_EXIT - - logging.basicConfig( - stream=sys.stdout, - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s' - ) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - possible_config_file_locations = [ - str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini", - "/etc/glucose-monitor/config.ini", - "config.ini" - ] - - config_file_location = "" - for possible_config_file_location in possible_config_file_locations: - if file_exists(possible_config_file_location): - config_file_location = possible_config_file_location - logging.info("config file found at " + config_file_location) - break - - if config_file_location == "": - logging.fatal("could not find a config file") - sys.exit(1) - - config = configparser.ConfigParser() - - config.read(config_file_location) - - if 'LOG' in config: - if 'LEVEL' in config['LOG']: - level = config['LOG']['LEVEL'].lower() - if level == "debug": - logging.getLogger().setLevel(logging.DEBUG) - elif level == "info": - logging.getLogger().setLevel(logging.INFO) - elif level == "warning": - logging.getLogger().setLevel(logging.WARNING) - elif level == "error": - logging.getLogger().setLevel(logging.ERROR) - elif level == "critical": - logging.getLogger().setLevel(logging.CRITICAL) - else: - logging.error("log level specified in config file is not valid") - - logging.debug("set log level to debug") - - if 'CREDENTIALS' not in config: - # TODO: support credentials in environmental variables - logging.fatal("no credentials specified in config file") - sys.exit(1) - if 'EMAIL' not in config['CREDENTIALS']: - # TODO: support credentials in environmental variables - logging.fatal("no email specified in config file") - sys.exit(1) - if 'PASSWORD' not in config['CREDENTIALS']: - # TODO: support credentials in environmental variables - logging.fatal("no password specified in config file") - sys.exit(1) - - if 'GENERAL' in config and 'INTERVAL' in config['GENERAL']: - delay = int(config['GENERAL']['INTERVAL']) - else: - delay = DEFAULT_INTERVAL - - show_on_screen = False - show_in_terminal = True - if 'SCREEN' in config: - if 'TYPE' not in config['SCREEN']: - logging.fatal("no screen type in config file") - sys.exit(1) - - if config['SCREEN']['TYPE'].lower() not in SUPPORTED_SCREENS: - logging.fatal("sorry screen type not supported") - sys.exit(1) - - screen_type = config['SCREEN']['TYPE'].lower() - - if screen_type == "waveshare_2.23_ssd1305": - show_on_screen = True - SSD1305_library_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'drive') - if directory_exists(SSD1305_library_directory): - sys.path.append(SSD1305_library_directory) - else: - logging.fatal("failed to import SSD1305 library") - sys.exit(1) - - from drive import SSD1305 - from PIL import Image, ImageDraw, ImageFont - - display = SSD1305.SSD1305() - - display.Init() - - width = display.width - height = display.height - image = Image.new('1', (width, height)) - - padding = 0 - top = padding - x = 0 - - draw = ImageDraw.Draw(image) - - draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer - - if 'FONT' in config['SCREEN']: - font = ImageFont.truetype(config['SCREEN']['FONT'], 8) - else: - font = ImageFont.truetype(DEFAULT_FONT, 8) - - def display_on_screen(buffer): - display.getbuffer(buffer) - display.ShowImage() - - def clear_display(): - buffer = Image.new('1', (width, height)) - draw = ImageDraw.Draw(image) - draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer - display.getbuffer(buffer) - display.ShowImage() - - logging.info("clearing display") - clear_display() - - if "DISPLAY_IN_TERMINAL" in config['SCREEN']: - if config['SCREEN']['DISPLAY_IN_TERMINAL'].lower() == "false": - show_in_terminal = False - - - libreview_session = LibreViewSession() - libreview_session.authenticate(config['CREDENTIALS']['EMAIL'], config['CREDENTIALS']['PASSWORD']) - - if libreview_session.authenticated and not SHOULD_EXIT: - logging.info("successfully authenticated with LibreView") - else: - logging.fatal("failed to authenticate with LibreView") - sys.exit(1) - - try: - libreview_session.getConnections() - except Exception: - logging.fatal("failed to get connections") - sys.exit(1) - - if len(libreview_session.connections) > 1: - logging.fatal("currently glucose-monitor only supports accounts with one connection") - if len(libreview_session.connections) < 1: - logging.fatal("to use glucose-monitor your libreview account must have one connection") - - patient_id = libreview_session.connections[0]['patientId'] - - first_run = True - while not SHOULD_EXIT: - if not SHOULD_EXIT and not first_run: - time.sleep(delay) - elif first_run: - if show_in_terminal: - clear_terminal() - - if show_on_screen: - clear_display() - - first_run = False - - try: - graph = libreview_session.getGraph(patient_id) - except requests.exceptions.ConnectionError: - logging.error("failed to get details about patient, connection error") - continue - except requests.exceptions.HTTPError: - logging.error("failed to get details about patient, API returned invalid response") - continue - except requests.exceptions.RequestException: - logging.error("failed to get details about patient an unknown error occurred") - continue - - try: - graph['connection']['glucoseMeasurement'] - except KeyError: - logging.error("the data returned from the API was not in an expected format") - continue - - glucoseMeasurement = graph['connection']['glucoseMeasurement'] - - if show_in_terminal: - clear_terminal() - - print(f"Timestamp: {glucoseMeasurement['Timestamp']}\n") - print(f"Glucose units: {glucoseMeasurement['Value']}") - print(f"Is high: {glucoseMeasurement['isHigh']}") - print(f"Is low: {glucoseMeasurement['isLow']}") - print(f"Refresh rate: {delay} seconds") - - if show_on_screen: - clear_display() - - draw.text((x, top), glucoseMeasurement['Timestamp'], font=font, fill=255) - draw.text((x, top+8), "Glucose units: " + str(glucoseMeasurement['Value']), font=font, fill=255) - # Don't know why these two need to have more spaces to line up with the line ebovee - draw.text((x, top+16), "Is high: " + str(glucoseMeasurement['isHigh']), font=font, fill=255) - draw.text((x, top+24), "Is low: " + str(glucoseMeasurement['isLow']), font=font, fill=255) - - display_on_screen(image) - - logging.debug("exiting now...") - - -if __name__ == "__main__": - main() diff --git a/screens/__init__.py b/screens/__init__.py new file mode 100644 index 0000000..4c455b6 --- /dev/null +++ b/screens/__init__.py @@ -0,0 +1,4 @@ +from .screen import Screen +from .terminal_screen import TerminalScreen +from .debug_screens import DebugScreen +from .waveshare_233_ssd1305 import Waveshare_233_SSD1305 diff --git a/screens/debug_screens.py b/screens/debug_screens.py new file mode 100644 index 0000000..ce10481 --- /dev/null +++ b/screens/debug_screens.py @@ -0,0 +1,84 @@ +from typing import List +import logging +import os + + +import psutil +from PIL import Image, ImageDraw, ImageFont + + +from .screen import Screen + + +class DebugScreen(Screen): + def __init__(self, width: int, height: int, number_of_colour_bits: str): + self.type = "Debug Screen" + + self.supports_custom_fonts = True + + self.width = width + self.height = height + self.image = Image.new(number_of_colour_bits, (self.width, self.height)) # Create a new image with 1 bit colour + self.draw = ImageDraw.Draw(self.image) + + def setDefaultFont(self): + logging.debug("setting font to default font") + self.font = ImageFont.load_default() + + def setFont(self, font: str, size: int): + logging.debug(f"setting font to {font} with size, {size}") + self.font_size = size + self.font = ImageFont.truetype(font, size) + + def clearBuffer(self): + self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) + + def displayOnScreen(self): + self.image.show() + + def displayLines(self, lines: List[str], x_margin: int = 2): + # displays text on the screen, the line that is first in the list is displayed on the top + self.clearBuffer() + self.clearScreen() + + x_start = x_margin + y_start = 0 + x_padding = 0 + y_padding = 0 + for i, line in enumerate(lines): + self.draw.text( + ( + x_start+(i*x_padding), + y_start+y_padding + ), + line, + font=self.font, + fill=255 + ) + bounding_box = self.draw.textbbox( + ( + (x_start+x_padding), + y_start+y_padding, + ), + line, + font=self.font, + ) + bottom = bounding_box[3] + y_padding = bottom + self.displayOnScreen() + + def clearScreen(self): + xdg_open_pid = -1 + for p in psutil.process_iter(): + if p.ppid() == os.getpid() and p.name() == "xdg-open": + logging.debug(f"found xdg-open process with a process id {p.pid}") + xdg_open_pid = p.pid + elif xdg_open_pid != -1 and p.ppid() == xdg_open_pid: + logging.debug(f"killing process, {p.name()} {p.cmdline()}") + p.terminate() + p.kill() + + self.clearBuffer() + draw = ImageDraw.Draw(self.image) + # black rectangle to clear screen, without clearing buffer + draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) diff --git a/drive/SSD1305.py b/screens/drive/SSD1305.py similarity index 100% rename from drive/SSD1305.py rename to screens/drive/SSD1305.py diff --git a/drive/config.py b/screens/drive/config.py similarity index 100% rename from drive/config.py rename to screens/drive/config.py diff --git a/screens/screen.py b/screens/screen.py new file mode 100644 index 0000000..a176e67 --- /dev/null +++ b/screens/screen.py @@ -0,0 +1,25 @@ +from typing import List +import logging + + +class Screen: + def __init__(self): + self.type = None + + self.supports_custom_fonts = False + + def getScreenType(self): + return self.type + + def clearScreen(self): + raise NotImplementedError("Subclass must implement this method") + + def setFont(self, font: str, size: int): + raise NotImplementedError("Subclass must implement this method") + + def setDefaultFont(self): + raise NotImplementedError("Subclass must implement this method") + + def displayLines(self, lines: List[str], x_margin: int = 2): + logging.debug(f"Not implemented, not using {lines}, {x_margin}") + raise NotImplementedError("Subclass must implement this method") diff --git a/screens/terminal_screen.py b/screens/terminal_screen.py new file mode 100644 index 0000000..77a13db --- /dev/null +++ b/screens/terminal_screen.py @@ -0,0 +1,37 @@ +from typing import List +import logging +import os + + +from .screen import Screen + + +class TerminalScreen(Screen): + def __init__(self): + super().__init__() + self.type = "Terminal Screen" + + self.width, self.height = os.get_terminal_size() + + def setDefaultFont(self): + return 0 + + def setFont(self, font: str, size: int): + logging.debug(f"TerminalScreen doesn't support custom fonts, can't set font to {font}, or size to {size}") + raise NotImplementedError("TerminalScreen doesn't support custom fonts") + + def displayLines(self, lines: List[str], x_padding: int = 0, y_padding: int = 0): + # displays text on the screen + self.clearScreen() + + y = 0 + x = 0 + for i, line in enumerate(lines): + # TODO: Calculate x and y before printing to check if we are going to go off the terminal + print('\n' * int(y+(y_padding*i)), end='') + print(' ' * int(x+(x_padding*i)), end='') + print(line, end='') + + + def clearScreen(self): + print("\033c", end='') diff --git a/screens/waveshare_233_ssd1305.py b/screens/waveshare_233_ssd1305.py new file mode 100644 index 0000000..08bdadd --- /dev/null +++ b/screens/waveshare_233_ssd1305.py @@ -0,0 +1,80 @@ +from typing import List +import logging + + +from PIL import Image, ImageDraw, ImageFont + + +from .screen import Screen + + +class Waveshare_233_SSD1305(Screen): + def __init__(self): + from drive import SSD1305 + + self.type = "Waveshare 2.33 SSD1305 OLED" + + self.supports_custom_fonts = True + + self.ssd1305 = SSD1305.SSD1305() + self.ssd1305.Init() + + self.width = self.ssd1305.width + self.height = self.ssd1305.height + self.image = Image.new('1', (self.width, self.height)) # Create new image with 1 bit colour + self.draw = ImageDraw.Draw(self.image) + + def setDefaultFont(self): + logging.debug("setting font to default font") + self.font = ImageFont.load_default() + + def setFont(self, font, size): + self.font_size = size + self.font = ImageFont.truetype(font, size) + + def clearBuffer(self): + self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) + + def displayOnScreen(self): + self.ssd1305.getbuffer(self.image) + self.ssd1305.ShowImage() + + def displayLines(self, lines: List[str], x_margin: int = 2): + # displays text on the screen, the line that is first in the list is displayed on the top + self.clearBuffer() + self.clearScreen() + + x_start = x_margin + y_start = 0 + x_padding = 0 + y_padding = 0 + for i, line in enumerate(lines): + self.draw.text( + ( + x_start+(i*x_padding), + y_start+y_padding + ), + line, + font=self.font, + fill=255 + ) + bounding_box = self.draw.textbbox( + ( + (x_start+x_padding), + y_start+y_padding, + ), + line, + font=self.font, + ) + bottom = bounding_box[3] + y_padding = bottom + self.displayOnScreen() + + def clearScreen(self): + # Used by parent class Screen + buffer = Image.new('1', (self.width, self.height)) + draw = ImageDraw.Draw(self.image) + # black rectangle to clear screen, without clearing buffer + draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0) + self.ssd1305.getbuffer(buffer) + self.ssd1305.ShowImage() diff --git a/version b/version new file mode 100644 index 0000000..0e81df0 --- /dev/null +++ b/version @@ -0,0 +1 @@ +0.0.0.1