WIP: Add support for custom format, also rename main.py -> glucose-monitor.py #1

Draft
ronald1985 wants to merge 4 commits from devel into master

3
.gitignore vendored

@ -0,0 +1,3 @@
screens/__pycache__
libreview/__pycache__
screens/04B_08__.TTF

Binary file not shown.

@ -0,0 +1,410 @@
#!/usr/bin/env python3
"""
glucose_monitor.py - A script to display glucose levels on a Raspberry Pi
Configurable with a config file
"""
import argparse
import configparser
import logging
import os
import signal
import inspect
import sys
import time
from datetime import datetime
from libreview import LibreViewSession
from screens import DebugScreen
from screens import Screen
import requests
import yaml
# CONSTANTS
DEFAULT_DATA_REFRESH_INTERVAL = 5
DEFAULT_SCREEN_REFRESH_INTERVAL = 1
DEFAULT_FONT_SIZE = 9
DEFAULT_DEBUG_SCREEN_WIDTH = 128
DEFAULT_DEBUG_SCREEN_HEIGHT = 32
DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS = "1"
SUPPORTED_SCREENS = [
"debug",
"terminal",
"waveshare_2.23_ssd1305"
]
# GLOBAL VARIABLES
SHOULD_EXIT = False
def file_exists(file_path) -> bool:
"""
file_exists:
Checks if a file exists
Arguments:
file_path:
The file that we are looking for
Returns:
boolean:
Returns true if the file exists, and false if it doesn"t
"""
return os.path.isfile(file_path)
def directory_exists(directory_path):
"""
directory_exists:
Checks if a directory exists
Arguments:
directory_path:
The directory that we are looking for
Returns:
boolean:
Returns true if the file exists and is a directory, and false if it"s not
"""
return os.path.isdir(directory_path)
def ini_to_yaml(ini_file_path: str, yaml_file_path: str, lower_case: bool = False):
"""
ini_to_yaml:
Takes an ini file and produces a yaml file from it.
Arguments:
ini_file_path:
The path to the ini file that you want to convert to a yaml file
yaml_file_path:
The path to the yaml file that you wish to create
lower_case = False
Change all the sections and keys to lower case, if false leaves the case alone
Returns:
None
"""
config = configparser.ConfigParser()
config.read(ini_file_path)
d = {}
for section in config.sections():
if lower_case:
d[section.lower()] = {}
else:
d[section] = {}
for key, value in config[section].items():
if lower_case:
d[section.lower()][key.lower()] = value
else:
d[section][key] = value
with open(yaml_file_path, "w+") as file:
yaml.dump(d, file, default_flow_style=False)
def get_current_time(fmt: str = "%H:%M %d/%m/%Y") -> str:
"""
get_current_time:
Returns a string containing the date and/or time in the format specified in fmt
Arguments:
fmt:
The format of the date and/or time to be returned in the string
Returns:
Returns a string with the date and/or time in the format specified
"""
now = datetime.now()
return now.strftime(fmt)
def time_since(dt: datetime) -> str:
"""
time_since:
Returns a string with the difference between the datetime passed and today
Arguments:
dt:
A datetime class to get the time since
Returns:
A string in the with minutes if difference can be calculated in minutes otherwise will just return seconds
Will be in the format of "2m 30s" or "40s"
"""
now = datetime.now()
difference = now - dt
total_seconds = difference.total_seconds()
minutes = int(total_seconds // 60)
seconds = int(total_seconds % 60)
if minutes > 0:
return f"{minutes}m {seconds}s"
return f"{seconds}s"
def signal_handler(signum, frame):
global SHOULD_EXIT
if SHOULD_EXIT:
logging.info("already received exit signal exiting NOW")
sys.exit(1)
SHOULD_EXIT = True
logging.info(f"received {signal.Signals(signum).name} signal shutting down")
logging.debug("stack trace:")
for frame, filename, lineno, function, _, _ in inspect.getouterframes(frame):
logging.debug(f"file: {filename}, line_number: {lineno}, function: {function}")
def main():
global SHOULD_EXIT
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
parser = argparse.ArgumentParser(
prog="glucose-monitor",
description="A program to display glucose levels and if you blood sugar is high"
)
parser.add_argument("-m", "--migrate", action="store_true")
parser.add_argument("-t", "--token")
parser.add_argument("-c", "--config")
args = parser.parse_args()
if args.config is not None:
if not file_exists(args.config):
logging.fatal("config file location {args.config} does not exist")
sys.exit(1)
else:
config_file_location = args.config
else:
possible_config_file_locations = [
str(os.getenv("HOME")) + "/.config/glucose-monitor/config.yaml",
"/etc/glucose-monitor/config.yaml",
"config.yaml"
]
config_file_location = ""
found_config_file = False
for possible_config_file_location in possible_config_file_locations:
if file_exists(possible_config_file_location):
config_file_location = possible_config_file_location
found_config_file = True
logging.info("config file found at " + config_file_location)
break
if not found_config_file:
old_config_file_locations = [
str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini",
"/etc/glucose-monitor/config.ini",
"config.ini"
]
for old_config_file_location in old_config_file_locations:
if file_exists(old_config_file_location):
if not args.migrate:
logging.fatal("Sorry, ini files are no longer support, run glucose-monitor --migrate to move to yaml")
sys.exit(1)
else:
ini_to_yaml(old_config_file_location, old_config_file_location[:-4]+".yaml", lower_case=True)
config_file_location = old_config_file_location[:-4]+".yaml"
if config_file_location == "":
logging.fatal("could not find a config file")
sys.exit(1)
with open(config_file_location, "r") as config_file:
config = yaml.safe_load(config_file)
if "log" in config:
if "level" in config["log"]:
level = config["log"]["level"].lower()
if level == "debug":
logging.getLogger().setLevel(logging.DEBUG)
elif level == "info":
logging.getLogger().setLevel(logging.INFO)
elif level == "warning":
logging.getLogger().setLevel(logging.WARNING)
elif level == "error":
logging.getLogger().setLevel(logging.ERROR)
elif level == "critical":
logging.getLogger().setLevel(logging.CRITICAL)
else:
logging.error("log level specified in config file is not valid")
logging.debug("set log level to debug")
if args.token is None:
if "credentials" not in config:
# TODO: support credentials in environmental variables
logging.fatal("no credentials specified in config file")
sys.exit(1)
if "email" not in config["credentials"]:
# TODO: support credentials in environmental variables
logging.fatal("no email specified in config file")
sys.exit(1)
if "password" not in config["credentials"]:
# TODO: support credentials in environmental variables
logging.fatal("no password specified in config file")
sys.exit(1)
data_refresh_interval = DEFAULT_DATA_REFRESH_INTERVAL
screen_refresh_interval = DEFAULT_SCREEN_REFRESH_INTERVAL
font = "DEFAULT"
font_size = DEFAULT_FONT_SIZE
lines = [
"%T",
"GLUCOSE UNITS: %U"
]
if "general" in config:
general_config = config["general"]
if "data_refresh_interval" in general_config:
data_refresh_interval = int(config["general"]["data_refresh_interval"])
if "screen_refresh_interval" in general_config:
screen_refresh_interval = int(config["general"]["screen_refresh_interval"])
# TODO: Add format specifier to check if high and low
# if "upper_bound" in general_config:
# upper_bound = config["general"]["upper_bound"]
# if "lower_bound" in general_config:
# lower_bound = config["general"]["lower_bound"]
if "font" in general_config:
font = general_config["font"]
logging.debug("found font in config, checking that it exists")
if not file_exists(font):
logging.fatal(f"font does not exist at path {font}")
sys.exit(1)
else:
logging.debug(f"found font at {font}")
if "font_size" in general_config:
font_size = general_config["font_size"]
if "lines" in general_config:
lines = general_config["lines"]
output_lines = []
for line in lines:
temp = ""
if "%T" in line:
temp += line.replace("%T", '{get_current_time()}')
if "%U" in line:
temp += line.replace("%U", '{glucose_measurement["Value"]}')
if "%S" in line:
temp += line.replace("%S", '{time_since(glucose_measurement_timestamp_datetime)}')
output_lines.append(temp)
display = Screen()
if "screen" in config:
screen_config = config["screen"]
if "type" not in screen_config:
logging.fatal("no screen type in config file")
sys.exit(1)
screen_type = screen_config["type"].lower()
if screen_type not in SUPPORTED_SCREENS:
logging.fatal("sorry screen type not supported")
sys.exit(1)
if screen_type == "waveshare_2.23_ssd1305":
from screens.waveshare_233_ssd1305 import Waveshare_233_SSD1305
display = Waveshare_233_SSD1305()
elif screen_type == "debug":
width = DEFAULT_DEBUG_SCREEN_WIDTH
height = DEFAULT_DEBUG_SCREEN_HEIGHT
number_of_colour_bits = DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS
if "width" in screen_config:
width = int(screen_config["width"])
if "height" in screen_config:
height = int(screen_config["height"])
if "number_of_colour_bits" in screen_config:
number_of_colour_bits = screen_config["number_of_colour_bits"]
display = DebugScreen(width, height, number_of_colour_bits)
libreview_session = LibreViewSession()
if args.token is not None:
libreview_session.useToken(args.token)
else:
libreview_session.authenticate(config["credentials"]["email"], config["credentials"]["password"])
if libreview_session.authenticated and not SHOULD_EXIT:
logging.info("successfully authenticated with LibreView")
else:
logging.fatal("failed to authenticate with LibreView")
sys.exit(1)
try:
libreview_session.getConnections()
except Exception as e:
logging.debug("got exception,", e)
logging.fatal("failed to get connections")
sys.exit(1)
if len(libreview_session.connections) > 1:
logging.fatal("currently glucose-monitor only supports accounts with one connection")
sys.exit(1)
if len(libreview_session.connections) < 1:
logging.fatal("to use glucose-monitor your libreview account must have at least one connection")
sys.exit(1)
patient_id = libreview_session.connections[0]["patientId"]
if display.supports_custom_fonts:
if font != "DEFAULT":
display.setFont(font, int(font_size))
else:
display.setDefaultFont()
while not SHOULD_EXIT:
try:
graph = libreview_session.getGraph(patient_id)
graph["connection"]["glucoseMeasurement"]
glucose_measurement = graph["connection"]["glucoseMeasurement"]
glucose_measurement_timestamp = glucose_measurement["Timestamp"]
glucose_measurement_timestamp_datetime = datetime.strptime(
glucose_measurement_timestamp,
"%m/%d/%Y %I:%M:%S %p"
)
except requests.exceptions.ConnectionError:
logging.error("failed to get details about patient, connection error")
except requests.exceptions.HTTPError:
logging.error("failed to get details about patient, API returned invalid response")
except requests.exceptions.RequestException:
logging.error("failed to get details about patient an unknown error occurred")
except KeyError:
logging.error("the data returned from the API was not in an expected format")
for _ in range(data_refresh_interval//screen_refresh_interval):
display.clearScreen()
computed_output_lines = []
for output_line in output_lines:
computed_output_lines.append(eval(f"f'{output_line}'"))
display.displayLines(computed_output_lines)
time.sleep(screen_refresh_interval)
logging.debug("exiting now...")
if __name__ == "__main__":
main()

@ -0,0 +1,2 @@
from .libreview import LibreViewSession

@ -0,0 +1,115 @@
import logging
from typing import Dict
import requests
class LibreViewSession:
def __init__(self):
self.authenticated = False
self.token = ""
self.connections = []
def useToken(self, token):
url = "https://api.libreview.io/user"
self.token = token
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
if response.status_code != 200:
logging.debug(f"got status code {response.status_code} when attempting to test token")
self.authenticated = False
else:
self.authenticated = True
def authenticate(self, email, password):
url = "https://api.libreview.io/llu/auth/login"
if self.authenticated:
return
headers = {
"version": "4.7",
"product": "llu.ios",
"Content-Type": "application/json",
"Accept": "application/json"
}
payload = {
"email": email,
"password": password,
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
json = response.json()
if 'status' in json:
if json['status'] == 0 and 'data' in json:
data = json['data']
auth_ticket = data['authTicket']
self.token = auth_ticket['token']
self.authenticated = True
else:
self.authenticated = False
logging.debug(f"failed to authenticate with libreview, got status {json['status']}")
return
def getConnections(self):
url = "https://api.libreview.io/llu/connections"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
if response.status_code != 200:
logging.debug(f"got http response code {response.status_code} from libreview when attempting to get connections")
return []
json = response.json()
if 'data' in json:
self.connections = json['data']
else:
print(response.content)
self.connections = []
def getGraph(self, patientId) -> Dict[str, Dict]:
"""
getGraph gets the LibreView graph for the {patientID}
"""
url = f"https://api.libreview.io/llu/connections/{patientId}/graph"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
json = response.json()
if 'data' in json:
return json['data']
else:
return {}

@ -1,354 +0,0 @@
#!/usr/bin/env python3
import configparser
import logging
import os
import signal
import inspect
import sys
import time
import requests
# CONSTANTS
DEFAULT_INTERVAL = 5
DEFAULT_FONT = "04B_08__.TTF"
SUPPORTED_SCREENS = [
"waveshare_2.23_ssd1305"
]
# GLOBAL VARIABLES
SHOULD_EXIT = False
class LibreViewSession:
def __init__(self):
self.authenticated = False
self.token = ""
self.connections = []
def authenticate(self, email, password):
url = "https://api.libreview.io/llu/auth/login"
if self.authenticated:
return
headers = {
"version": "4.7",
"product": "llu.ios",
"Content-Type": "application/json",
"Accept": "application/json"
}
payload = {
"email": email,
"password": password,
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
json = response.json()
if 'status' in json:
if json['status'] == 0 and 'data' in json:
data = json['data']
auth_ticket = data['authTicket']
self.token = auth_ticket['token']
self.authenticated = True
else:
self.authenticated = False
logging.debug("failed to authenticate with LibreView")
return
def acceptTerms(self):
# TODO: finish this
# url = "https://api.libreview.io/auth/continue/tou"
print("not implemented yet, sorry")
def getUser(self):
# TODO: finish this
# url = "https://api.libreview.io/user"
print("not implemented yet, sorry")
def getConnections(self):
url = "https://api.libreview.io/llu/connections"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
json = response.json()
if 'data' in json:
self.connections = json['data']
def getGraph(self, patientId):
url = f"https://api.libreview.io/llu/connections/{patientId}/graph"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
json = response.json()
if 'data' in json:
return json['data']
# TODO: determine what to do on error
return json['message'] # FIXME: this probably isn't correct
def file_exists(file_path):
return os.path.isfile(file_path)
def directory_exists(directory_path):
return os.path.isdir(directory_path)
def clear_terminal():
print("\033c", end='')
def signal_handler(signum, frame):
global SHOULD_EXIT
if SHOULD_EXIT:
logging.info("already received exit signal exiting NOW")
sys.exit(1)
SHOULD_EXIT = True
logging.info(f"received {signal.Signals(signum).name} signal shutting down")
logging.debug("stack trace:")
for frame, filename, lineno, function, _, _ in inspect.getouterframes(frame):
logging.debug(f"file: {filename}, line_number: {lineno}, function: {function}")
def main():
global SHOULD_EXIT
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
possible_config_file_locations = [
str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini",
"/etc/glucose-monitor/config.ini",
"config.ini"
]
config_file_location = ""
for possible_config_file_location in possible_config_file_locations:
if file_exists(possible_config_file_location):
config_file_location = possible_config_file_location
logging.info("config file found at " + config_file_location)
break
if config_file_location == "":
logging.fatal("could not find a config file")
sys.exit(1)
config = configparser.ConfigParser()
config.read(config_file_location)
if 'LOG' in config:
if 'LEVEL' in config['LOG']:
level = config['LOG']['LEVEL'].lower()
if level == "debug":
logging.getLogger().setLevel(logging.DEBUG)
elif level == "info":
logging.getLogger().setLevel(logging.INFO)
elif level == "warning":
logging.getLogger().setLevel(logging.WARNING)
elif level == "error":
logging.getLogger().setLevel(logging.ERROR)
elif level == "critical":
logging.getLogger().setLevel(logging.CRITICAL)
else:
logging.error("log level specified in config file is not valid")
logging.debug("set log level to debug")
if 'CREDENTIALS' not in config:
# TODO: support credentials in environmental variables
logging.fatal("no credentials specified in config file")
sys.exit(1)
if 'EMAIL' not in config['CREDENTIALS']:
# TODO: support credentials in environmental variables
logging.fatal("no email specified in config file")
sys.exit(1)
if 'PASSWORD' not in config['CREDENTIALS']:
# TODO: support credentials in environmental variables
logging.fatal("no password specified in config file")
sys.exit(1)
if 'GENERAL' in config and 'INTERVAL' in config['GENERAL']:
delay = int(config['GENERAL']['INTERVAL'])
else:
delay = DEFAULT_INTERVAL
show_on_screen = False
show_in_terminal = True
if 'SCREEN' in config:
if 'TYPE' not in config['SCREEN']:
logging.fatal("no screen type in config file")
sys.exit(1)
if config['SCREEN']['TYPE'].lower() not in SUPPORTED_SCREENS:
logging.fatal("sorry screen type not supported")
sys.exit(1)
screen_type = config['SCREEN']['TYPE'].lower()
if screen_type == "waveshare_2.23_ssd1305":
show_on_screen = True
SSD1305_library_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'drive')
if directory_exists(SSD1305_library_directory):
sys.path.append(SSD1305_library_directory)
else:
logging.fatal("failed to import SSD1305 library")
sys.exit(1)
from drive import SSD1305
from PIL import Image, ImageDraw, ImageFont
display = SSD1305.SSD1305()
display.Init()
width = display.width
height = display.height
image = Image.new('1', (width, height))
padding = 0
top = padding
x = 0
draw = ImageDraw.Draw(image)
draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer
if 'FONT' in config['SCREEN']:
font = ImageFont.truetype(config['SCREEN']['FONT'], 8)
else:
font = ImageFont.truetype(DEFAULT_FONT, 8)
def display_on_screen(buffer):
display.getbuffer(buffer)
display.ShowImage()
def clear_display():
buffer = Image.new('1', (width, height))
draw = ImageDraw.Draw(image)
draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer
display.getbuffer(buffer)
display.ShowImage()
logging.info("clearing display")
clear_display()
if "DISPLAY_IN_TERMINAL" in config['SCREEN']:
if config['SCREEN']['DISPLAY_IN_TERMINAL'].lower() == "false":
show_in_terminal = False
libreview_session = LibreViewSession()
libreview_session.authenticate(config['CREDENTIALS']['EMAIL'], config['CREDENTIALS']['PASSWORD'])
if libreview_session.authenticated and not SHOULD_EXIT:
logging.info("successfully authenticated with LibreView")
else:
logging.fatal("failed to authenticate with LibreView")
sys.exit(1)
try:
libreview_session.getConnections()
except Exception:
logging.fatal("failed to get connections")
sys.exit(1)
if len(libreview_session.connections) > 1:
logging.fatal("currently glucose-monitor only supports accounts with one connection")
if len(libreview_session.connections) < 1:
logging.fatal("to use glucose-monitor your libreview account must have one connection")
patient_id = libreview_session.connections[0]['patientId']
first_run = True
while not SHOULD_EXIT:
if not SHOULD_EXIT and not first_run:
time.sleep(delay)
elif first_run:
if show_in_terminal:
clear_terminal()
if show_on_screen:
clear_display()
first_run = False
try:
graph = libreview_session.getGraph(patient_id)
except requests.exceptions.ConnectionError:
logging.error("failed to get details about patient, connection error")
continue
except requests.exceptions.HTTPError:
logging.error("failed to get details about patient, API returned invalid response")
continue
except requests.exceptions.RequestException:
logging.error("failed to get details about patient an unknown error occurred")
continue
try:
graph['connection']['glucoseMeasurement']
except KeyError:
logging.error("the data returned from the API was not in an expected format")
continue
glucoseMeasurement = graph['connection']['glucoseMeasurement']
if show_in_terminal:
clear_terminal()
print(f"Timestamp: {glucoseMeasurement['Timestamp']}\n")
print(f"Glucose units: {glucoseMeasurement['Value']}")
print(f"Is high: {glucoseMeasurement['isHigh']}")
print(f"Is low: {glucoseMeasurement['isLow']}")
print(f"Refresh rate: {delay} seconds")
if show_on_screen:
clear_display()
draw.text((x, top), glucoseMeasurement['Timestamp'], font=font, fill=255)
draw.text((x, top+8), "Glucose units: " + str(glucoseMeasurement['Value']), font=font, fill=255)
# Don't know why these two need to have more spaces to line up with the line ebovee
draw.text((x, top+16), "Is high: " + str(glucoseMeasurement['isHigh']), font=font, fill=255)
draw.text((x, top+24), "Is low: " + str(glucoseMeasurement['isLow']), font=font, fill=255)
display_on_screen(image)
logging.debug("exiting now...")
if __name__ == "__main__":
main()

@ -0,0 +1,4 @@
from .screen import Screen
from .terminal_screen import TerminalScreen
from .debug_screens import DebugScreen
from .waveshare_233_ssd1305 import Waveshare_233_SSD1305

@ -0,0 +1,84 @@
from typing import List
import logging
import os
import psutil
from PIL import Image, ImageDraw, ImageFont
from .screen import Screen
class DebugScreen(Screen):
def __init__(self, width: int, height: int, number_of_colour_bits: str):
self.type = "Debug Screen"
self.supports_custom_fonts = True
self.width = width
self.height = height
self.image = Image.new(number_of_colour_bits, (self.width, self.height)) # Create a new image with 1 bit colour
self.draw = ImageDraw.Draw(self.image)
def setDefaultFont(self):
logging.debug("setting font to default font")
self.font = ImageFont.load_default()
def setFont(self, font: str, size: int):
logging.debug(f"setting font to {font} with size, {size}")
self.font_size = size
self.font = ImageFont.truetype(font, size)
def clearBuffer(self):
self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)
def displayOnScreen(self):
self.image.show()
def displayLines(self, lines: List[str], x_margin: int = 2):
# displays text on the screen, the line that is first in the list is displayed on the top
self.clearBuffer()
self.clearScreen()
x_start = x_margin
y_start = 0
x_padding = 0
y_padding = 0
for i, line in enumerate(lines):
self.draw.text(
(
x_start+(i*x_padding),
y_start+y_padding
),
line,
font=self.font,
fill=255
)
bounding_box = self.draw.textbbox(
(
(x_start+x_padding),
y_start+y_padding,
),
line,
font=self.font,
)
bottom = bounding_box[3]
y_padding = bottom
self.displayOnScreen()
def clearScreen(self):
xdg_open_pid = -1
for p in psutil.process_iter():
if p.ppid() == os.getpid() and p.name() == "xdg-open":
logging.debug(f"found xdg-open process with a process id {p.pid}")
xdg_open_pid = p.pid
elif xdg_open_pid != -1 and p.ppid() == xdg_open_pid:
logging.debug(f"killing process, {p.name()} {p.cmdline()}")
p.terminate()
p.kill()
self.clearBuffer()
draw = ImageDraw.Draw(self.image)
# black rectangle to clear screen, without clearing buffer
draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)

@ -0,0 +1,25 @@
from typing import List
import logging
class Screen:
def __init__(self):
self.type = None
self.supports_custom_fonts = False
def getScreenType(self):
return self.type
def clearScreen(self):
raise NotImplementedError("Subclass must implement this method")
def setFont(self, font: str, size: int):
raise NotImplementedError("Subclass must implement this method")
def setDefaultFont(self):
raise NotImplementedError("Subclass must implement this method")
def displayLines(self, lines: List[str], x_margin: int = 2):
logging.debug(f"Not implemented, not using {lines}, {x_margin}")
raise NotImplementedError("Subclass must implement this method")

@ -0,0 +1,37 @@
from typing import List
import logging
import os
from .screen import Screen
class TerminalScreen(Screen):
def __init__(self):
super().__init__()
self.type = "Terminal Screen"
self.width, self.height = os.get_terminal_size()
def setDefaultFont(self):
return 0
def setFont(self, font: str, size: int):
logging.debug(f"TerminalScreen doesn't support custom fonts, can't set font to {font}, or size to {size}")
raise NotImplementedError("TerminalScreen doesn't support custom fonts")
def displayLines(self, lines: List[str], x_padding: int = 0, y_padding: int = 0):
# displays text on the screen
self.clearScreen()
y = 0
x = 0
for i, line in enumerate(lines):
# TODO: Calculate x and y before printing to check if we are going to go off the terminal
print('\n' * int(y+(y_padding*i)), end='')
print(' ' * int(x+(x_padding*i)), end='')
print(line, end='')
def clearScreen(self):
print("\033c", end='')

@ -0,0 +1,80 @@
from typing import List
import logging
from PIL import Image, ImageDraw, ImageFont
from .screen import Screen
class Waveshare_233_SSD1305(Screen):
def __init__(self):
from drive import SSD1305
self.type = "Waveshare 2.33 SSD1305 OLED"
self.supports_custom_fonts = True
self.ssd1305 = SSD1305.SSD1305()
self.ssd1305.Init()
self.width = self.ssd1305.width
self.height = self.ssd1305.height
self.image = Image.new('1', (self.width, self.height)) # Create new image with 1 bit colour
self.draw = ImageDraw.Draw(self.image)
def setDefaultFont(self):
logging.debug("setting font to default font")
self.font = ImageFont.load_default()
def setFont(self, font, size):
self.font_size = size
self.font = ImageFont.truetype(font, size)
def clearBuffer(self):
self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)
def displayOnScreen(self):
self.ssd1305.getbuffer(self.image)
self.ssd1305.ShowImage()
def displayLines(self, lines: List[str], x_margin: int = 2):
# displays text on the screen, the line that is first in the list is displayed on the top
self.clearBuffer()
self.clearScreen()
x_start = x_margin
y_start = 0
x_padding = 0
y_padding = 0
for i, line in enumerate(lines):
self.draw.text(
(
x_start+(i*x_padding),
y_start+y_padding
),
line,
font=self.font,
fill=255
)
bounding_box = self.draw.textbbox(
(
(x_start+x_padding),
y_start+y_padding,
),
line,
font=self.font,
)
bottom = bounding_box[3]
y_padding = bottom
self.displayOnScreen()
def clearScreen(self):
# Used by parent class Screen
buffer = Image.new('1', (self.width, self.height))
draw = ImageDraw.Draw(self.image)
# black rectangle to clear screen, without clearing buffer
draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)
self.ssd1305.getbuffer(buffer)
self.ssd1305.ShowImage()

@ -0,0 +1 @@
0.0.0.1
Loading…
Cancel
Save