#!/usr/bin/env python3
"""Poll the LibreView API for the latest glucose measurement and display it.

The script reads its settings from a ``config.ini`` file, logs in to
LibreView, and then repeatedly fetches the measurement of the account's first
connection. The measurement is printed to the terminal and/or drawn on a
Waveshare 2.23" SSD1305 OLED display, depending on the configuration.
"""
import configparser
import logging
import os
import signal
import inspect
import sys
import time

import requests

# CONSTANTS
DEFAULT_INTERVAL = 5
DEFAULT_FONT = "04B_08__.TTF"
SUPPORTED_SCREENS = [
    "waveshare_2.23_ssd1305"
]
# Seconds to wait for LibreView before giving up on a request; without a
# timeout a stalled connection would block the polling loop forever.
REQUEST_TIMEOUT = 10
# Headers sent with every LibreView request.
API_HEADERS = {
    "version": "4.7",
    "product": "llu.ios",
    "Accept": "application/json",
}
# Accepted values of the LOG/LEVEL config option.
LOG_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}

# GLOBAL VARIABLES
SHOULD_EXIT = False


class LibreViewSession:
    """Minimal client for the LibreView HTTP API.

    Attributes:
        authenticated: True once ``authenticate`` has obtained a token.
        token: Bearer token sent with authorized requests.
        connections: Value of the ``data`` field returned by the connections
            endpoint (filled in by ``getConnections``).
    """

    def __init__(self):
        self.authenticated = False
        self.token = ""
        self.connections = []

    def _authorized_headers(self):
        """Return the common headers plus the bearer token."""
        return {**API_HEADERS, "Authorization": "Bearer " + self.token}

    def authenticate(self, email, password):
        """Log in and store the auth token; does nothing if already logged in.

        On success ``self.token`` is set and ``self.authenticated`` becomes
        True. On any failure ``self.authenticated`` is left False.

        Raises:
            requests.exceptions.RequestException: the request itself failed
                (connection error, timeout, ...).
        """
        if self.authenticated:
            return

        url = "https://api.libreview.io/llu/auth/login"
        headers = {**API_HEADERS, "Content-Type": "application/json"}
        payload = {
            "email": email,
            "password": password,
        }

        response = requests.post(
            url, json=payload, headers=headers, timeout=REQUEST_TIMEOUT
        )

        token = None
        if response.status_code == 200:
            body = response.json()
            try:
                if body['status'] == 0:
                    token = body['data']['authTicket']['token']
            except (KeyError, TypeError):
                # The reply did not have the expected shape; treat it as a
                # failed login instead of crashing.
                token = None

        if token is None:
            self.authenticated = False
            logging.debug("failed to authenticate with LibreView")
            return

        self.token = token
        self.authenticated = True

    def acceptTerms(self):
        """Placeholder; only prints a notice."""
        # TODO: finish this
        # url = "https://api.libreview.io/auth/continue/tou"
        print("not implemented yet, sorry")

    def getUser(self):
        """Placeholder; only prints a notice."""
        # TODO: finish this
        # url = "https://api.libreview.io/user"
        print("not implemented yet, sorry")

    def getConnections(self):
        """Fetch the account's connections into ``self.connections``.

        ``self.connections`` is only updated when the reply has a ``data``
        field.
        """
        url = "https://api.libreview.io/llu/connections"
        response = requests.get(
            url, headers=self._authorized_headers(), timeout=REQUEST_TIMEOUT
        )
        body = response.json()
        if 'data' in body:
            self.connections = body['data']

    def getGraph(self, patientId):
        """Fetch the graph data for one connection.

        Returns:
            The ``data`` field of the reply when present. Otherwise the
            reply's ``message`` field, or None when there is none; callers
            must check the shape of the result before using it.
        """
        url = f"https://api.libreview.io/llu/connections/{patientId}/graph"
        response = requests.get(
            url, headers=self._authorized_headers(), timeout=REQUEST_TIMEOUT
        )
        body = response.json()
        if not isinstance(body, dict):
            return None
        if 'data' in body:
            return body['data']
        # TODO: determine what to do on error; for now hand back the error
        # message (if any) without raising when it is missing.
        return body.get('message')


def file_exists(file_path):
    """Return True if ``file_path`` is an existing regular file."""
    return os.path.isfile(file_path)


def directory_exists(directory_path):
    """Return True if ``directory_path`` is an existing directory."""
    return os.path.isdir(directory_path)


def clear_terminal():
    """Write the ``ESC c`` sequence to stdout to clear the terminal."""
    print("\033c", end='')


def signal_handler(signum, frame):
    """Request a graceful shutdown; exit immediately on a second signal."""
    global SHOULD_EXIT
    if SHOULD_EXIT:
        logging.info("already received exit signal exiting NOW")
        sys.exit(1)

    SHOULD_EXIT = True
    logging.info(f"received {signal.Signals(signum).name} signal shutting down")
    logging.debug("stack trace:")
    for _, filename, lineno, function, _, _ in inspect.getouterframes(frame):
        logging.debug(f"file: {filename}, line_number: {lineno}, function: {function}")


def _find_config_file():
    """Return the first existing config file path, or "" if there is none."""
    candidates = [
        # expanduser still resolves the home directory when HOME is unset,
        # where str(os.getenv("HOME")) would yield "None".
        os.path.join(os.path.expanduser("~"), ".config", "glucose-monitor", "config.ini"),
        "/etc/glucose-monitor/config.ini",
        "config.ini",
    ]
    for candidate in candidates:
        if file_exists(candidate):
            return candidate
    return ""


def _apply_log_level(config):
    """Set the root logger level from the optional LOG/LEVEL option."""
    if 'LOG' not in config or 'LEVEL' not in config['LOG']:
        return
    level = LOG_LEVELS.get(config['LOG']['LEVEL'].lower())
    if level is None:
        logging.error("log level specified in config file is not valid")
    else:
        logging.getLogger().setLevel(level)
    # Only visible when the level above was set to debug.
    logging.debug("set log level to debug")


def _read_interval(config):
    """Return the refresh interval in seconds from GENERAL/INTERVAL.

    Falls back to DEFAULT_INTERVAL when the option is absent and exits with
    status 1 when it is not a non-negative integer.
    """
    if 'GENERAL' not in config or 'INTERVAL' not in config['GENERAL']:
        return DEFAULT_INTERVAL
    try:
        delay = int(config['GENERAL']['INTERVAL'])
    except ValueError:
        logging.fatal("interval specified in config file is not a whole number of seconds")
        sys.exit(1)
    if delay < 0:
        # time.sleep() rejects negative values.
        logging.fatal("interval specified in config file must not be negative")
        sys.exit(1)
    return delay


def main():
    """Load the configuration, log in and run the polling loop until signalled."""
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    config_file_location = _find_config_file()
    if config_file_location == "":
        logging.fatal("could not find a config file")
        sys.exit(1)
    logging.info("config file found at " + config_file_location)

    config = configparser.ConfigParser()
    config.read(config_file_location)

    _apply_log_level(config)

    if 'CREDENTIALS' not in config:
        # TODO: support credentials in environmental variables
        logging.fatal("no credentials specified in config file")
        sys.exit(1)

    if 'EMAIL' not in config['CREDENTIALS']:
        # TODO: support credentials in environmental variables
        logging.fatal("no email specified in config file")
        sys.exit(1)

    if 'PASSWORD' not in config['CREDENTIALS']:
        # TODO: support credentials in environmental variables
        logging.fatal("no password specified in config file")
        sys.exit(1)

    delay = _read_interval(config)

    show_on_screen = False
    show_in_terminal = True

    if 'SCREEN' in config:
        if 'TYPE' not in config['SCREEN']:
            logging.fatal("no screen type in config file")
            sys.exit(1)

        screen_type = config['SCREEN']['TYPE'].lower()
        if screen_type not in SUPPORTED_SCREENS:
            logging.fatal("sorry screen type not supported")
            sys.exit(1)

        if screen_type == "waveshare_2.23_ssd1305":
            show_on_screen = True

            SSD1305_library_directory = os.path.join(
                os.path.dirname(os.path.realpath(__file__)), 'drive'
            )
            if directory_exists(SSD1305_library_directory):
                sys.path.append(SSD1305_library_directory)
            else:
                logging.fatal("failed to import SSD1305 library")
                sys.exit(1)

            # Imported here so the display driver and PIL are only required
            # when a screen is actually configured.
            from drive import SSD1305
            from PIL import Image, ImageDraw, ImageFont

            display = SSD1305.SSD1305()
            display.Init()

            width = display.width
            height = display.height
            image = Image.new('1', (width, height))

            padding = 0
            top = padding
            x = 0

            draw = ImageDraw.Draw(image)
            # black rectangle to clear screen, without clearing buffer
            draw.rectangle((0, 0, width, height), outline=0, fill=0)

            if 'FONT' in config['SCREEN']:
                font = ImageFont.truetype(config['SCREEN']['FONT'], 8)
            else:
                font = ImageFont.truetype(DEFAULT_FONT, 8)

            def display_on_screen(buffer):
                """Push ``buffer`` to the display."""
                display.getbuffer(buffer)
                display.ShowImage()

            def clear_display():
                """Blank the shared image and push it to the display.

                The polling loop draws every measurement onto ``image``, so
                it has to be wiped here or successive readings would be
                drawn on top of each other.
                """
                draw.rectangle((0, 0, width, height), outline=0, fill=0)
                display.getbuffer(image)
                display.ShowImage()

            logging.info("clearing display")
            clear_display()

        if "DISPLAY_IN_TERMINAL" in config['SCREEN']:
            if config['SCREEN']['DISPLAY_IN_TERMINAL'].lower() == "false":
                show_in_terminal = False

    libreview_session = LibreViewSession()
    try:
        libreview_session.authenticate(
            config['CREDENTIALS']['EMAIL'], config['CREDENTIALS']['PASSWORD']
        )
    except (requests.exceptions.RequestException, ValueError):
        # Network failure or a non-JSON reply: fall through to the normal
        # "not authenticated" handling below instead of dying on a traceback.
        logging.debug("authentication request failed", exc_info=True)

    if libreview_session.authenticated and not SHOULD_EXIT:
        logging.info("successfully authenticated with LibreView")
    else:
        logging.fatal("failed to authenticate with LibreView")
        sys.exit(1)

    try:
        libreview_session.getConnections()
    except Exception:
        logging.fatal("failed to get connections")
        sys.exit(1)

    connections = libreview_session.connections
    if not connections:
        logging.fatal("to use glucose-monitor your libreview account must have one connection")
        # Nothing to poll; indexing connections[0] below would raise.
        sys.exit(1)
    if len(connections) > 1:
        # NOTE(review): this is logged as fatal but the script has always
        # carried on with the first connection; confirm whether it should
        # exit here instead.
        logging.fatal("currently glucose-monitor only supports accounts with one connection")

    patient_id = connections[0]['patientId']

    first_run = True
    while not SHOULD_EXIT:
        if first_run:
            if show_in_terminal:
                clear_terminal()
            if show_on_screen:
                clear_display()
            first_run = False
        elif not SHOULD_EXIT:
            time.sleep(delay)

        try:
            graph = libreview_session.getGraph(patient_id)
        except requests.exceptions.ConnectionError:
            logging.error("failed to get details about patient, connection error")
            continue
        except (requests.exceptions.HTTPError, ValueError):
            # ValueError covers a reply body that is not valid JSON.
            logging.error("failed to get details about patient, API returned invalid response")
            continue
        except requests.exceptions.RequestException:
            logging.error("failed to get details about patient an unknown error occurred")
            continue

        try:
            glucoseMeasurement = graph['connection']['glucoseMeasurement']
            timestamp = glucoseMeasurement['Timestamp']
            value = glucoseMeasurement['Value']
            is_high = glucoseMeasurement['isHigh']
            is_low = glucoseMeasurement['isLow']
        except (KeyError, TypeError):
            # TypeError: getGraph returned an error message string or None
            # rather than the graph dict.
            logging.error("the data returned from the API was not in an expected format")
            continue

        if show_in_terminal:
            clear_terminal()
            print(f"Timestamp: {timestamp}\n")
            print(f"Glucose units: {value}")
            print(f"Is high: {is_high}")
            print(f"Is low: {is_low}")
            print(f"Refresh rate: {delay} seconds")

        if show_on_screen:
            clear_display()
            draw.text((x, top), timestamp, font=font, fill=255)
            draw.text((x, top+8), "Glucose units: " + str(value), font=font, fill=255)
            # Don't know why these two need to have more spaces to line up with the line above
            draw.text((x, top+16), "Is high: " + str(is_high), font=font, fill=255)
            draw.text((x, top+24), "Is low: " + str(is_low), font=font, fill=255)
            display_on_screen(image)

    logging.debug("exiting now...")


if __name__ == "__main__":
    main()