Major rewrite and refactor under way

Nearly there, needs some more testing around the WaveShare and Terminal
screen classes.
devel
Ronald 1 year ago
parent 54296e7bbe
commit 0b2c692392

3
.gitignore vendored

@ -0,0 +1,3 @@
screens/__pycache__
libreview/__pycache__
screens/04B_08__.TTF

Binary file not shown.

@ -0,0 +1,2 @@
from .libreview import LibreViewSession

@ -0,0 +1,114 @@
import logging
import requests
class LibreViewSession:
def __init__(self):
self.authenticated = False
self.token = ""
self.connections = []
def useToken(self, token):
url = "https://api.libreview.io/user"
self.token = token
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
if response.status_code != 200:
logging.debug(f"got status code {response.status_code} when attempting to test token")
self.authenticated = False
else:
self.authenticated = True
def authenticate(self, email, password):
url = "https://api.libreview.io/llu/auth/login"
if self.authenticated:
return
headers = {
"version": "4.7",
"product": "llu.ios",
"Content-Type": "application/json",
"Accept": "application/json"
}
payload = {
"email": email,
"password": password,
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
json = response.json()
if 'status' in json:
if json['status'] == 0 and 'data' in json:
data = json['data']
auth_ticket = data['authTicket']
self.token = auth_ticket['token']
self.authenticated = True
else:
self.authenticated = False
logging.debug(f"failed to authenticate with libreview, got status {json['status']}")
return
def getConnections(self):
url = "https://api.libreview.io/llu/connections"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
if response.status_code != 200:
logging.debug(f"got http response code {response.status_code} from libreview when attempting to get connections")
return []
json = response.json()
if 'data' in json:
self.connections = json['data']
else:
print(response.content)
self.connections = []
def getGraph(self, patientId):
"""
getGraph gets the LibreView graph for the {patientID}
"""
url = f"https://api.libreview.io/llu/connections/{patientId}/graph"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
json = response.json()
if 'data' in json:
return json['data']
else:
return None

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import argparse
import configparser
import logging
import os
@ -8,6 +9,12 @@ import signal
import inspect
import sys
import time
from datetime import datetime
from libreview import LibreViewSession
from screens import DebugScreen
from screens import TerminalScreen
import requests
@ -15,8 +22,13 @@ import requests
# CONSTANTS
DEFAULT_INTERVAL = 5
DEFAULT_FONT = "04B_08__.TTF"
DEFAULT_FONT_SIZE = 9
DEFAULT_DEBUG_SCREEN_WIDTH = 128
DEFAULT_DEBUG_SCREEN_HEIGHT = 32
DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS = '1'
SUPPORTED_SCREENS = [
"debug",
"terminal",
"waveshare_2.23_ssd1305"
]
@ -25,95 +37,6 @@ SUPPORTED_SCREENS = [
SHOULD_EXIT = False
class LibreViewSession:
def __init__(self):
self.authenticated = False
self.token = ""
self.connections = []
def authenticate(self, email, password):
url = "https://api.libreview.io/llu/auth/login"
if self.authenticated:
return
headers = {
"version": "4.7",
"product": "llu.ios",
"Content-Type": "application/json",
"Accept": "application/json"
}
payload = {
"email": email,
"password": password,
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
json = response.json()
if 'status' in json:
if json['status'] == 0 and 'data' in json:
data = json['data']
auth_ticket = data['authTicket']
self.token = auth_ticket['token']
self.authenticated = True
else:
self.authenticated = False
logging.debug("failed to authenticate with LibreView")
return
def acceptTerms(self):
# TODO: finish this
# url = "https://api.libreview.io/auth/continue/tou"
print("not implemented yet, sorry")
def getUser(self):
# TODO: finish this
# url = "https://api.libreview.io/user"
print("not implemented yet, sorry")
def getConnections(self):
url = "https://api.libreview.io/llu/connections"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
json = response.json()
if 'data' in json:
self.connections = json['data']
def getGraph(self, patientId):
url = f"https://api.libreview.io/llu/connections/{patientId}/graph"
headers = {
"version": "4.7",
"product": "llu.ios",
"Accept": "application/json",
"Authorization": "Bearer " + self.token
}
response = requests.get(url, headers=headers)
json = response.json()
if 'data' in json:
return json['data']
# TODO: determine what to do on error
return json['message'] # FIXME: this probably isn't correct
def file_exists(file_path):
return os.path.isfile(file_path)
@ -122,10 +45,6 @@ def directory_exists(directory_path):
return os.path.isdir(directory_path)
def clear_terminal():
print("\033c", end='')
def signal_handler(signum, frame):
global SHOULD_EXIT
if SHOULD_EXIT:
@ -150,25 +69,41 @@ def main():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
possible_config_file_locations = [
str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini",
"/etc/glucose-monitor/config.ini",
"config.ini"
]
config_file_location = ""
for possible_config_file_location in possible_config_file_locations:
if file_exists(possible_config_file_location):
config_file_location = possible_config_file_location
logging.info("config file found at " + config_file_location)
break
if config_file_location == "":
logging.fatal("could not find a config file")
sys.exit(1)
parser = argparse.ArgumentParser(
prog='glucose-monitor',
description='A program to display glucose levels and if you blood sugar is high'
)
config = configparser.ConfigParser()
parser.add_argument('-t', '--token')
parser.add_argument('-c', '--config')
args = parser.parse_args()
if args.config is not None:
if not file_exists(args.config):
logging.fatal("config file location {args.config} does not exist")
sys.exit(1)
else:
config_file_location = args.config
else:
possible_config_file_locations = [
str(os.getenv("HOME")) + "/.config/glucose-monitor/config.ini",
"/etc/glucose-monitor/config.ini",
"config.ini"
]
config_file_location = ""
for possible_config_file_location in possible_config_file_locations:
if file_exists(possible_config_file_location):
config_file_location = possible_config_file_location
logging.info("config file found at " + config_file_location)
break
if config_file_location == "":
logging.fatal("could not find a config file")
sys.exit(1)
config = configparser.ConfigParser()
config.read(config_file_location)
if 'LOG' in config:
@ -189,91 +124,77 @@ def main():
logging.debug("set log level to debug")
if 'CREDENTIALS' not in config:
# TODO: support credentials in environmental variables
logging.fatal("no credentials specified in config file")
sys.exit(1)
if 'EMAIL' not in config['CREDENTIALS']:
# TODO: support credentials in environmental variables
logging.fatal("no email specified in config file")
sys.exit(1)
if 'PASSWORD' not in config['CREDENTIALS']:
# TODO: support credentials in environmental variables
logging.fatal("no password specified in config file")
sys.exit(1)
if args.token is None:
if 'CREDENTIALS' not in config:
# TODO: support credentials in environmental variables
logging.fatal("no credentials specified in config file")
sys.exit(1)
if 'EMAIL' not in config['CREDENTIALS']:
# TODO: support credentials in environmental variables
logging.fatal("no email specified in config file")
sys.exit(1)
if 'PASSWORD' not in config['CREDENTIALS']:
# TODO: support credentials in environmental variables
logging.fatal("no password specified in config file")
sys.exit(1)
if 'GENERAL' in config and 'INTERVAL' in config['GENERAL']:
delay = int(config['GENERAL']['INTERVAL'])
else:
delay = DEFAULT_INTERVAL
delay = DEFAULT_INTERVAL
font = "DEFAULT"
font_size = DEFAULT_FONT_SIZE
if 'GENERAL' in config:
general_config = config['GENERAL']
if 'INTERVAL' in general_config:
delay = int(config['GENERAL']['INTERVAL'])
if 'UPPER_BOUND' in general_config:
upper_bound = config['GENERAL']['UPPER_BOUND']
if 'LOWER_BOUND' in general_config:
lower_bound = config['GENERAL']['LOWER_BOUND']
if 'FONT' in general_config:
font = general_config['FONT']
logging.debug("found font in config, checking that it exists")
if not file_exists(font):
logging.fatal(f"font does not exist at path {font}")
sys.exit(1)
else:
logging.debug(f"found font at {font}")
if 'FONT_SIZE' in general_config:
font_size = general_config['FONT_SIZE']
show_on_screen = False
show_in_terminal = True
display = TerminalScreen()
if 'SCREEN' in config:
if 'TYPE' not in config['SCREEN']:
screen_config = config['SCREEN']
if 'TYPE' not in screen_config:
logging.fatal("no screen type in config file")
sys.exit(1)
if config['SCREEN']['TYPE'].lower() not in SUPPORTED_SCREENS:
screen_type = screen_config['TYPE'].lower()
if screen_type not in SUPPORTED_SCREENS:
logging.fatal("sorry screen type not supported")
sys.exit(1)
screen_type = config['SCREEN']['TYPE'].lower()
if screen_type == "waveshare_2.23_ssd1305":
show_on_screen = True
SSD1305_library_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'drive')
if directory_exists(SSD1305_library_directory):
sys.path.append(SSD1305_library_directory)
else:
logging.fatal("failed to import SSD1305 library")
sys.exit(1)
from drive import SSD1305
from PIL import Image, ImageDraw, ImageFont
display = SSD1305.SSD1305()
display.Init()
width = display.width
height = display.height
image = Image.new('1', (width, height))
padding = 0
top = padding
x = 0
draw = ImageDraw.Draw(image)
draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer
if 'FONT' in config['SCREEN']:
font = ImageFont.truetype(config['SCREEN']['FONT'], 8)
else:
font = ImageFont.truetype(DEFAULT_FONT, 8)
def display_on_screen(buffer):
display.getbuffer(buffer)
display.ShowImage()
def clear_display():
buffer = Image.new('1', (width, height))
draw = ImageDraw.Draw(image)
draw.rectangle((0, 0, width, height), outline=0, fill=0) # black rectangle to clear screen, without clearing buffer
display.getbuffer(buffer)
display.ShowImage()
logging.info("clearing display")
clear_display()
if "DISPLAY_IN_TERMINAL" in config['SCREEN']:
if config['SCREEN']['DISPLAY_IN_TERMINAL'].lower() == "false":
show_in_terminal = False
from screens.waveshare_233_ssd1305 import Waveshare_233_SSD1305
display = Waveshare_233_SSD1305()
elif screen_type == "debug":
width = DEFAULT_DEBUG_SCREEN_WIDTH
height = DEFAULT_DEBUG_SCREEN_HEIGHT
number_of_colour_bits = DEFAULT_DEBUG_SCREEN_NUMBER_OF_COLOUR_BITS
if 'WIDTH' in screen_config:
width = int(screen_config['WIDTH'])
if 'HEIGHT' in screen_config:
height = int(screen_config['HEIGHT'])
if 'NUMBER_OF_COLOUR_BITS' in screen_config:
number_of_colour_bits = screen_config['NUMBER_OF_COLOUR_BITS']
display = DebugScreen(width, height, number_of_colour_bits)
libreview_session = LibreViewSession()
libreview_session.authenticate(config['CREDENTIALS']['EMAIL'], config['CREDENTIALS']['PASSWORD'])
if args.token is not None:
libreview_session.useToken(args.token)
else:
libreview_session.authenticate(config['CREDENTIALS']['EMAIL'], config['CREDENTIALS']['PASSWORD'])
if libreview_session.authenticated and not SHOULD_EXIT:
logging.info("successfully authenticated with LibreView")
@ -283,32 +204,32 @@ def main():
try:
libreview_session.getConnections()
except Exception:
except Exception as e:
logging.debug("got exception,", e)
logging.fatal("failed to get connections")
sys.exit(1)
if len(libreview_session.connections) > 1:
logging.fatal("currently glucose-monitor only supports accounts with one connection")
sys.exit(1)
if len(libreview_session.connections) < 1:
logging.fatal("to use glucose-monitor your libreview account must have one connection")
logging.fatal("to use glucose-monitor your libreview account must have at least one connection")
sys.exit(1)
patient_id = libreview_session.connections[0]['patientId']
first_run = True
while not SHOULD_EXIT:
if not SHOULD_EXIT and not first_run:
time.sleep(delay)
elif first_run:
if show_in_terminal:
clear_terminal()
if show_on_screen:
clear_display()
if display.supports_custom_fonts:
if font != "DEFAULT":
display.setFont(font, int(font_size))
else:
display.setDefaultFont()
first_run = False
while not SHOULD_EXIT:
display.clearScreen()
try:
graph = libreview_session.getGraph(patient_id)
graph['connection']['glucoseMeasurement']
except requests.exceptions.ConnectionError:
logging.error("failed to get details about patient, connection error")
continue
@ -318,34 +239,21 @@ def main():
except requests.exceptions.RequestException:
logging.error("failed to get details about patient an unknown error occurred")
continue
try:
graph['connection']['glucoseMeasurement']
except KeyError:
logging.error("the data returned from the API was not in an expected format")
continue
glucoseMeasurement = graph['connection']['glucoseMeasurement']
if show_in_terminal:
clear_terminal()
print(f"Timestamp: {glucoseMeasurement['Timestamp']}\n")
print(f"Glucose units: {glucoseMeasurement['Value']}")
print(f"Is high: {glucoseMeasurement['isHigh']}")
print(f"Is low: {glucoseMeasurement['isLow']}")
print(f"Refresh rate: {delay} seconds")
if show_on_screen:
clear_display()
now = datetime.now()
lines = [
f"{now.strftime("%H:%M:%S")}",
str(glucoseMeasurement['Value'])
]
display.displayLines(lines)
draw.text((x, top), glucoseMeasurement['Timestamp'], font=font, fill=255)
draw.text((x, top+8), "Glucose units: " + str(glucoseMeasurement['Value']), font=font, fill=255)
# Don't know why these two need to have more spaces to line up with the line ebovee
draw.text((x, top+16), "Is high: " + str(glucoseMeasurement['isHigh']), font=font, fill=255)
draw.text((x, top+24), "Is low: " + str(glucoseMeasurement['isLow']), font=font, fill=255)
time.sleep(delay)
display_on_screen(image)
logging.debug("exiting now...")

@ -0,0 +1,3 @@
from .terminal_screen import TerminalScreen
from .debug_screens import DebugScreen
from .waveshare_233_ssd1305 import Waveshare_233_SSD1305

@ -0,0 +1,65 @@
import os
import logging
from typing import List
import psutil
from PIL import Image, ImageDraw, ImageFont
from .screen import Screen
class DebugScreen(Screen):
def __init__(self, width: int, height: int, number_of_colour_bits: str):
self.type = "Debug Screen"
self.supports_custom_fonts = True
self.width = width
self.height = height
self.image = Image.new(number_of_colour_bits, (self.width, self.height)) # Create a new image with 1 bit colour
self.draw = ImageDraw.Draw(self.image)
def setDefaultFont(self):
logging.debug("setting font to default font")
ImageFont.load_default()
def setFont(self, font: str, size: int):
logging.debug(f"setting font to {font} with size, {size}")
self.font = font
self.font_size = size
ImageFont.truetype(font, size)
def clearBuffer(self):
self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)
def displayOnScreen(self):
self.image.show()
def displayLines(self, lines: List[str], x_padding: int = 0, y_padding: int = 4):
# displays text on the screen, the line that is first in the list is displayed on the top
self.clearBuffer()
self.clearScreen()
x_start = 0
y_start = 0
for i, line in enumerate(lines):
self.draw.text((x_start+(i*x_padding), y_start+int(i*self.font_size)+(i*y_padding)), line, fill=255)
self.displayOnScreen()
def clearScreen(self):
xdg_open_pid = -1
for p in psutil.process_iter():
if p.ppid() == os.getpid() and p.name() == "xdg-open":
logging.debug(f"found xdg-open process with a process id {p.pid}")
xdg_open_pid = p.pid
elif xdg_open_pid != -1 and p.ppid() == xdg_open_pid:
logging.debug(f"killing process, {p.name()} {p.cmdline()}")
p.terminate()
p.kill()
self.clearBuffer()
draw = ImageDraw.Draw(self.image)
# black rectangle to clear screen, without clearing buffer
draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)

@ -0,0 +1,14 @@
class Screen:
def __init__(self):
self.type = None
self.supports_custom_fonts = False
def getScreenType(self):
return self.type
def clearScreen(self):
self.__clearScreen()
def __clearScreen(self):
raise NotImplementedError("Subclass must implement this method")

@ -0,0 +1,28 @@
from typing import List
import os
from .screen import Screen
class TerminalScreen(Screen):
def __init__(self):
self.type = "Terminal Screen"
self.width, self.height = os.get_terminal_size()
def displayLines(self, lines: List[str], x_padding: int = 0, y_padding: int = 4):
# displays text on the screen
self.clearScreen()
y = 0
x = 0
for i, line in enumerate(lines):
# TODO: Calculate x and y before printing to check if we are going to go off the terminal
print('\n' * int(y+(y_padding*i)), end='')
print(' ' * int(x+(x_padding*i)), end='')
print(line, end='')
def clearScreen(self):
print("\033c", end='')

@ -0,0 +1,56 @@
from typing import List
from PIL import Image, ImageDraw, ImageFont
from .screen import Screen
class Waveshare_233_SSD1305(Screen):
def __init__(self):
from drive import SSD1305
self.type = "Waveshare 2.33 SSD1305 OLED"
self.supports_custom_fonts = True
self.ssd1305 = SSD1305.SSD1305()
self.ssd1305.Init()
self.width = self.ssd1305.width
self.height = self.ssd1305.height
self.image = Image.new('1', (self.width, self.height)) # Create new image with 1 bit colour
self.draw = ImageDraw.Draw(self.image)
def setDefaultFont(self):
ImageFont.load_default()
def setFont(self, font, size):
self.font = font
self.font_size = size
ImageFont.truetype(font, size)
def clearBuffer(self):
self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)
def displayOnScreen(self):
self.ssd1305.getbuffer(self.image)
self.ssd1305.ShowImage()
def displayLines(self, lines: List[str]):
# displays text on the screen, the line that is first in the list is displayed on the top
y = 0
x = 0
for i, line in lines:
self.draw.text((x, y+(i*self.font_size)), line, font=self.font, fill=255)
self.displayOnScreen()
def clearScreen(self):
# Used by parent class Screen
buffer = Image.new('1', (self.width, self.height))
draw = ImageDraw.Draw(self.image)
# black rectangle to clear screen, without clearing buffer
draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)
self.ssd1305.getbuffer(buffer)
self.ssd1305.ShowImage()
Loading…
Cancel
Save