#!/usr/bin/env python3 # -*- coding: utf-8 -*- ############################################################################## # epgdiff -- MythTV EPG differences utility # # Lists the programmes in the EPG data that are present in future EPG data but # not in the past EPG data. # # Author: J S Worthington # Created: 2021-10-24 ############################################################################## import argparse import asyncio import datetime import lxml import MythTV import os import socket import stat import struct import subprocess import sys import time import traceback from dataclasses import dataclass from enum import IntEnum from pprint import pprint from typing import Dict, List import MythTV.services_api.send as api import MythTV.services_api.utilities as util ############################################################################## # Program exit codes. ############################################################################## class ExitCode(IntEnum): OK = 0 MessageAndExit = 1 MissingPackageOrProgram = 2 WindowingProblem = 3 FrontendProblem = 4 LircProblem = 5 ############################################################################## # Startup - check for packages and programs. ############################################################################## program_name = os.path.basename(sys.argv[0]) if subprocess.call(['which', 'xdotool'], stdout=subprocess.DEVNULL) != 0: print('Executable "xdotool" is missing. Please install it.') print('In Ubuntu, use "apt install xdotool".') exit(ExitCode.MissingPackageOrProgram) ############################################################################## # # Version: 0.1 2021-10-26 # - Text output only. # Version: 0.2 2021-10-26 # - Playing with the frontend API to see if automation of selecting a # programme is possible. # Version: 0.3 2021-11-02 # - Fixed problem with processing the upcoming recordings. # - Added -G GUI option. # Version: 0.4 2021-11-12 # - Added lirc support. # - Fixed window scrolling to keep lines visible at the top and bottom. # Version: 0.5 2021-11-16 # - Added lirc manager mode to be used to switch off "mythtv" lirc clients # when the epgdiff window is on top, and to switch off "epgdiff" lirc # clients when they are not the top window. # Version: 0.6 2021-11-25 # - First working version with GUI and lirc. # Version: 0.7 2021-12-06 # - Make debug log file work when not running as root. # - Default debug output to stdout, so that it will be captured by the # irexec journal when epgdiff is run from irexec. # - Handle unable to connect to mythfrontend. # - Handle window closed externally. # - Handle frontend socket closed (mythfrontend not running now). # Version 0.8 2021-12-27 # - Fix config.xml handling. # Version 0.9 2022-12-14 # - Fix help text for channums. # Version 0.10 2023-04-23 # - Fix for newer, faster PC messing up timings for mythfrontend commands. # Version 0.11 2023-12-23 # - Minor window settings adjustments. # - Add SoHo channel to default list. # - Change .format() to f'' strings. # Version 0.12 2024-04-29 # - Fix a new TypeError traceback caused by MythTV upgrade to v34 - # Dvr/GetUpcomingList now returns Channel.ChanId as an int instead of an # str. # - Remove deleted channel "Jones! Too" (4208) # ############################################################################## VERSION = '0.12' ############################################################################## # Configuration ############################################################################## # Set this to True get debug output. #DEBUG_OUTPUT: bool = True DEBUG_OUTPUT: bool = False # Set this to send debug output to a separate debug log file. #DEBUG_FILE: bool = True DEBUG_FILE: bool = False # Location of default config.xml file. CONFIG_XML: str = '/etc/mythtv/config.xml' # The IP address of the mythbackend to talk to. Default: 127.0.0.1 HOST: str = '127.0.0.1' # The address of the frontend. FE_HOST: str = 'localhost' # The frontend port number. FE_PORT: int = 6546 # List of channel numbers (channel.channum) to do EPG diffs on. DEFAULT_CHANNUM_LIST: List[int] = [19, 4005, 4009, 4010, 4018, 4071, 4075, 4076, 4083, 4210, 4008] #DEFAULT_CHANNUM_LIST: List[int] = [] # List of chanids (channel.chanid) to do EPG diffs on. DEFAULT_CHANID_LIST: List[int] = [] # Default X display to connect to. DEFAULT_DISPLAY: str = ':0' # Window height constraints. # Warning: MIN_WINDOW_HEIGHT must be > 2*KEEP_ON_DISPLAY MIN_WINDOW_HEIGHT: int = 7 MAX_WINDOW_HEIGHT: int = 32 # Window width constraints. MIN_WINDOW_WIDTH: int = 30 MAX_WINDOW_WIDTH: int = 200 # Keep this number of lines of text on display above and below the current # selected line, if possible. KEEP_ON_DISPLAY: int = 3 # Number of lines to move when the PgUp or PgDn keys are used. PG_KEY_DISTANCE: int = 10 # Lirc socket to create and manage when running in lirc_manager mode and the # lirc socket to connect to in window mode. LIRC_SOCKET: str = '/dev/lircd' # The lirc source socket to connect to when running in lirc_manager mode. # This should be the same socket created by lircd, as configured in # /etc/lirc/lirc_options.conf in the "output=" setting, or an alias or link # to that socket. LIRC_SOURCE: str = '/run/lirc/lircd' # List of locations to try to write the debug log file to. These locations # are tried in order, and if they all fail, no debug log file is written - the # debug output will be sent to sysout. DEBUG_DIRS: dict[str] = ['/var/log/mythtv', '/var/log', '/tmp', '.'] # Delay time to use before checking mythfrontend has moved to the new location. # There is a problem with mythfrontend - when a key command is sent to it, it # takes some time to respond and do the screen update. Until the screen update # happens, the a query to mythfrontend for its current location will return # the old location. So a delay is needed before checking the location. The # faster the machine mythfrontend is running on, the longer the delay needed. # If epgdiff exists with a "wrong location" error, increase the DELAY time. # If it takes too long for epgdiff to navigate to the correct mythfrontend # screen, you can try reducing the DELAY time until "wrong location" errors # occur, then increase it a bit until the "wrong location" errors stop # happening. Units: Seconds DELAY: float = 0.1 ############################################################################## # Constants ############################################################################## # Programme line prefix PLP: str = ' ' # Length of the programme line prefix. PLPL: int = len(PLP) # Window/taskbar icon (.png encoded as base64) icon = b'' ############################################################################## # Debug output. ############################################################################## if DEBUG_OUTPUT: global epgdiff_debug epgdiff_debug = None global previous_end previous_end = '\n' def dprint_init() -> None: global epgdiff_debug if epgdiff_debug == None: epgdiff_debug = sys.stdout if DEBUG_FILE: for debug_dir in DEBUG_DIRS: try: epgdiff_debug = open(debug_dir + '/' + program_name + '-debug.log', 'w', 1) except: continue break dprint(program_name + ' debug output started') def dprint(*args, end='\n', **kwargs) -> None: global epgdiff_debug global previous_end if epgdiff_debug != None: if previous_end == '\n': print(datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3] + ' ', *args, **kwargs, file=epgdiff_debug) else: print(*args, end=end, file=epgdiff_debug, **kwargs) previous_end = end def dpprint(*args, **kwargs) -> None: pprint(*args, **kwargs, stream=epgdiff_debug) else: def dprint_init() -> None: pass def dprint(*args, end='\n', **kwargs) -> None: pass def dpprint(*args, **kwargs) -> None: pass dprint_init() ############################################################################## # Recording status. ############################################################################## class RecStatus(IntEnum): Pending = -15 Failing = -14 #OtherRecording = -13 (obsolete) #OtherTuning = -12 (obsolete) MissedFuture = -11 Tuning = -10 Failed = -9 TunerBusy = -8 LowDiskSpace = -7 Cancelled = -6 Missed = -5 Aborted = -4 Recorded = -3 Recording = -2 WillRecord = -1 Unknown = 0 DontRecord = 1 PreviousRecording = 2 CurrentRecording = 3 EarlierShowing = 4 TooManyRecordings = 5 NotListed = 6 Conflict = 7 LaterShowing = 8 Repeat = 9 Inactive = 10 NeverRecord = 11 Offline = 12 #OtherShowing = 13 (obsolete) ############################################################################## # Status of a title in the upcoming recordings list. ############################################################################## class RecStatus2(IntEnum): WillNotRecord = 0 WillRecord = 1 PreviouslyRecorded = 2 LaterShowing = 3 Conflict = 4 NotListed = 5 BadRecording = 6 def __str__(self): return ' RPLCNB'[int(self)] def map_RecStatus(status: RecStatus) -> RecStatus2: if status == RecStatus.Conflict: return RecStatus2.Conflict elif status == RecStatus.LaterShowing: return RecStatus2.LaterShowing elif status == RecStatus.NotListed: return RecStatus2.NotListed elif int(status) <= RecStatus.Aborted: return RecStatus2.BadRecording elif status == RecStatus.Recording or status == RecStatus.WillRecord: return RecStatus2.WillRecord elif status == RecStatus.CurrentRecording or status == RecStatus.PreviousRecording: return RecStatus2.PreviouslyRecorded else: return RecStatus2.WillNotRecord def update_RecStatus(status: RecStatus, old_status:RecStatus2) -> RecStatus2: status2 = map_RecStatus(status) if status2 > old_status: return status2 else: return old_status ############################################################################## # Hex id() ############################################################################## def hexid(obj): return hex(id(obj)) ############################################################################## # Read config.xml file to get database settings only (other settings ignored). ############################################################################## class ConfigXml: DBHostName = 'localhost' DBUsername = 'mythtv' DBPassword = 'mythtv' DBName = 'mythconverg' DBPort = 3306 _conf_trans = { 'Host':'DBHostName', 'UserName':'DBUserName', 'Password':'DBPassword', 'DatabaseName':'DBName', 'Port':'DBPort' } def readXML(self, filename): if not os.access(filename, os.R_OK): dprint('File ' + filename + ' not accessible!') return False try: config = lxml.etree.parse(filename) for child in config.xpath('/Configuration/Database')[0].getchildren(): if child.tag in self._conf_trans: #dprint('child.tag=' + str(child.tag)) setattr(self, self._conf_trans[child.tag], child.text) except Exception as e: dprint(str(e)) dprint('lxml.etree.parse failed') return False return True ############################################################################## # Backend class. Handles communication with a backend and database selected # using a config.xml file. If no config.xml file is provided, the local # database and backend are used. ############################################################################## class Backend(): def __init__(self, config_xml='') -> None: dprint('config_xml="' + config_xml + '"') self.config_xml = config_xml if self.config_xml != '': self.remote_config = ConfigXml() dprint(f'{self.remote_config=}') if not self.remote_config.readXML(self.config_xml): dprint('Failed to read or correctly parse ' + self.config_xml + ' file, using ' + HOST) self.config_xml = '' self.host = HOST # dprint(str(self.remote_config.DBHostName)) # dprint(str(self.remote_config.DBUsername)) # dprint(str(self.remote_config.DBPassword)) # dprint(str(self.remote_config.DBName)) # dprint(str(self.remote_config.DBPort)) self.host = self.remote_config.DBHostName dprint('self.host = ' + self.host) else: self.host = HOST self.backend = api.Send(self.host) # Initialise the UTCOffset so that the conversion to local time works. api.Send(self.host) util.get_utc_offset(self.backend) # Connect to the database. if self.config_xml != '': try: self.db = MythTV.MythDB( DBHostName = self.remote_config.DBHostName, DBUsername = self.remote_config.DBUsername, DBPassword = self.remote_config.DBPassword, DBName = self.remote_config.DBName, DBPort = self.remote_config.DBPort ) except MythTV.exceptions.MythDBError as e: if e.ecode == MythTV.MythError.DB_SCHEMAMISMATCH: dprint('Schema mismatch connecting to ' + self.host + ' database, trying again with schema ' + str(e.remote)) MythTV.MythDB._schema_local = e.remote self.db = MythTV.MythDB( DBHostName = self.remote_config.DBHostName, DBUsername = self.remote_config.DBUsername, DBPassword = self.remote_config.DBPassword, DBName = self.remote_config.DBName, DBPort = self.remote_config.DBPort ) dprint('After retry') else: raise MythTV.MythSchema._schema_local = MythTV.SCHEMA_VERSION else: self.db = MythTV.MythDB() self.dbc = self.db.cursor() def get_upcoming_recordings(self) -> List[str]: try: resp_dict = self.backend.send(endpoint='Dvr/GetUpcomingList?ShowAll=True') except: dprint(self.host + ' get_upcoming_recordings(): Exception') return [] if list(resp_dict.keys())[0] in ['Abort', 'Warning']: #sys.exit(f'\n{list(resp_dict.values())[0]}\n') dprint(self.host + ' get_upcoming_recordings(): Abort or Warning') dprint(str(resp_dict)) progs = [] else: progs = resp_dict['ProgramList']['Programs'] return progs ############################################################################## # If epgdiff is already running and displaying a window, activate it and # exit. ############################################################################## def activate_existing_epgdiff_window() -> None: result = subprocess.run(['xdotool', 'search', '--name', program_name], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) if result.returncode != 0: dprint('Existing epgdiff window not found') return window_id = int(result.stdout) dprint(f'{window_id=}') result = subprocess.run(['xdotool', 'windowactivate', str(window_id)]) if result.returncode != 0: print('Unable to switch to existing epgdiff window, exiting.') exit(ExitCode.WindowingProblem) print('Existing epgdiff window activated, exiting.') exit(ExitCode.OK) ############################################################################## # Activate the mythfrontend window so that it will receive typed characters. ############################################################################## def activate_mythfrontend_window() -> None: result = subprocess.run(['xdotool', 'search', '--name', 'MythTV Frontend'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) if result.returncode != 0: print('Mythfrontend does not appear to be running, exiting.') exit(ExitCode.WindowingProblem) window_id = int(result.stdout.split(b'\n')[0]) dprint(f'{window_id=}') result = subprocess.run(['xdotool', 'windowactivate', str(window_id)]) if result.returncode != 0: print('Unable to switch to mythfrontend window, exiting.') exit(ExitCode.WindowingProblem) ############################################################################## # Main ############################################################################## parser = argparse.ArgumentParser( description='MythTV EPG Diff (Version: ' + VERSION + ')', formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=15) ) parser.add_argument('-A', '--all-channels', dest='all_channels', default=False, action='store_true', help='Ignore chanid and channum lists and diff all chanids that have EPG data') parser.add_argument('-V', '--version', dest='version', default=False, action='store_true', help='Display the version number and exit') parser.add_argument('-G', '--gui', dest='gui', default=False, action='store_true', help='Use the GUI interface') parser.add_argument('-M', '--lirc-manager', type=str, dest='lirc_manager', default='', action='store', help='Run as a daemon managing the lirc connections to ' + LIRC_SOCKET + '. Requires the username for the user running the X session for MythTV') parser.add_argument('-a', '--all', dest='all', default=False, action='store_true', help='Display all new programmes (including those that match recording rules)' + ' - also displays recording status for programmes that match recording rules' ) parser.add_argument('-D', '--delay', type=float, dest='delay', default=DELAY, action='store', help='Set the delay time between key commands to mythfrontend. See the DELAY value in the Configuration section of the program code for more explanation.') parser.add_argument('-d', '--dump-upcoming', dest='dump_upcoming', default=False, action='store_true', help='Dump the upcoming recordings data (debug option)') parser.add_argument('-c', '--config', type=str, action='store', default=CONFIG_XML, help='Location of MythTV config.xml file (default: ' + CONFIG_XML + ')') parser.add_argument('-f', '--fast', dest='fast', default=False, action='store_true', help='Do not fetch the upcoming recordings list, which takes a long time. ' + 'This will cause all new programmes to be listed, regardless of their matching existing recording rules.' ) parser.add_argument('-i', '--chanids', nargs='+', type=int, dest='chanids', action='store', default=[], help='Space separated list of chanid values. Can be used with -n') parser.add_argument('-l', '--lirc', dest='lirc', default=False, action='store_true', help='Use lirc (infrared remote control') parser.add_argument('-n', '--channums', nargs='+', type=int, dest='channums', action='store', default=[], help='Space separated list of channum values. Can be used with -i') parser.add_argument('-t', '--themes', dest='themes', default=False, action='store_true', help='Try out themes (debug option)') args = parser.parse_args() if args.version: print('Version '+VERSION) exit(ExitCode.MessageAndExit) if os.environ.get('DISPLAY') == None: os.environ['DISPLAY'] = DEFAULT_DISPLAY if args.lirc_manager != '': try: import psutil except ImportError: print('Please install the Python 3 psutil module. On some systems this will be available as a system package.') print('On Ubuntu, use "apt install python3-psutil". Otherwise try "pip3 install psutil".') exit(ExitCode.MissingPackageOrProgram) # Get the window ID of the currently active window. async def active_window_id(): proc = await asyncio.create_subprocess_shell('sudo -u ' + args.lirc_manager + ' xdotool getactivewindow', shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) result, result2 = await proc.communicate() #dprint(f'{dir(proc)=}') dprint(f'active_window_id: {proc.returncode=} {result=}') if proc.returncode != 0: print('Unable to get the currently active window number, exiting') return None dprint('active_window_id: return next') return int(result) class LircManager: class LircClient: async def reader_writer(self, direction, reader, writer, manager) -> None: # The 'direction' parameter is only used for debug output. dprint(f'reader_writer {hexid(self)} {direction}: start') dprint(f'reader_writer {hexid(self)} {direction}: {hexid(manager.epgdiff_clients_connected)=}') while True: dprint(f'reader_writer {hexid(self)} {direction}: before reader.readline()') try: line = await reader.readline() except Exception as e: print(traceback.format_exc()) break dprint(f'reader_writer {hexid(self)} {direction}: {line=} {manager.epgdiff_clients_connected=}') dprint(f'reader_writer {hexid(self)} {direction}: before at_eof()') if reader.at_eof(): dprint(f'reader_writer {hexid(self)} {direction}: at_eof()') break window_id = await active_window_id() dprint(f'reader_writer {hexid(self)} {direction}: {self.is_epgdiff=} {self.window_id=} {window_id=}') if window_id == None: print('Unable to obtain the active window ID, exiting.') break if manager.epgdiff_clients_connected == 0 or \ (self.is_epgdiff and self.window_id == window_id and manager.epgdiff_window_id == window_id) or \ (not self.is_epgdiff and window_id != manager.epgdiff_window_id): writer.write(line) dprint(f'reader_writer {hexid(self)} {direction}: wrote {line=}') else: dprint(f'reader_writer {hexid(self)} {direction}: discarded {line=}') dprint(f'reader_writer {hexid(self)} {direction}: end of loop') dprint(f'reader_writer {hexid(self)} {direction}: cleanup') await writer.drain() writer.close() self.close(manager) dprint(f'reader_writer {hexid(self)} {direction}: end') def __init__(self, client_reader, client_writer, manager): dprint(f'LircClient {hexid(self)} __init__: {hexid(manager.epgdiff_clients_connected)=}') self.closed = False self.client_reader = client_reader self.client_writer = client_writer self.window_id = None pid, uid, gid = struct.unpack('3i', client_reader._transport._sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize('3i'))) cmdline = psutil.Process(pid).cmdline() if cmdline[0][:6] == 'python': client_name = os.path.basename(cmdline[1]) else: client_name = os.path.basename(cmdline[0]) self.is_epgdiff = (client_name == program_name) dprint(f'LircClient {hexid(self)}: {cmdline=} {client_name=} {self.is_epgdiff=}') if self.is_epgdiff: manager.epgdiff_clients_connected += 1 dprint(f'LircClient {hexid(self)}: {manager.epgdiff_clients_connected=}') async def init(self, manager): dprint(f'LircClient {hexid(self)} init: {manager.epgdiff_clients_connected=}') self.window_id = await active_window_id() if self.is_epgdiff: manager.epgdiff_window_id = self.window_id dprint(f'LircClient {hexid(self)} init: {manager.epgdiff_window_id=}') (self.lircd_reader, self.lircd_writer) = await asyncio.open_unix_connection(LIRC_SOURCE) self.lircd_to_client_task = asyncio.create_task( self.reader_writer('lircd_to_client', self.lircd_reader, self.client_writer, manager) ) self.client_to_lircd_task = asyncio.create_task( self.reader_writer('client_to_lircd', self.client_reader, self.lircd_writer, manager) ) def close(self, manager): dprint(f'LircClient {hexid(self)} close') if not self.closed: self.closed = True if self.is_epgdiff: dprint(f'LircClient {hexid(self)} close: {manager.epgdiff_clients_connected=}') manager.epgdiff_clients_connected -= 1 dprint(f'LircClient {hexid(self)} close: {manager.epgdiff_clients_connected=}') manager.epgdiff_window_id = None self.lircd_to_client_task.cancel() self.client_to_lircd_task.cancel() async def lirc_server(self, reader, writer) -> None: dprint(f'lirc_server {hexid(self)}: creating client') client = self.LircClient(reader, writer, self) self.client_list.append(client) await client.init(self) dprint(f'lirc_server {hexid(self)}: client={hexid(client)}') def __init__(self): print(program_name + ': Running in lirc_manager mode') self.client_list = [] self.epgdiff_clients_connected = 0 self.epgdiff_window_id = None if os.path.exists(LIRC_SOCKET): os.remove(LIRC_SOCKET) loop = asyncio.get_event_loop() self.skt = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.skt.bind(LIRC_SOCKET) os.chmod(LIRC_SOCKET, stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP|stat.S_IWGRP|stat.S_IROTH|stat.S_IWOTH) server = loop.run_until_complete(asyncio.start_unix_server(self.lirc_server, sock=self.skt)) try: loop.run_forever() except KeyboardInterrupt: pass # Cleanup for client in self.client_list: client.close(self) server.close() loop.run_until_complete(server.wait_closed()) loop.close() os.remove(LIRC_SOCKET) print('\n' + program_name + ': lirc_manager mode exited') exit(ExitCode.OK) LircManager()() try: import pyautogui except ImportError: print('Please install the Python 3 PyAutoGUI module. Try "pip3 install pyautogui".') print('The pyautogui module requires python3-xlib. This will be installed automatically') print('by pip3, but on many systems (including Ubuntu) it is better to install it as a') print('system package. On Ubuntu, use "apt install python3-xlib" and do that before') print('running pip3.') exit(ExitCode.MissingPackageOrProgram) try: import PySimpleGUI as sg except ImportError: print('Please install the Python 3 PySimpleGUI module. Try "pip3 install pysimplegui".') exit(ExitCode.MissingPackageOrProgram) if args.lirc: try: import lirc except ImportError: print('Unable to import the Python 3 lirc module. This is normally installed as part of the main lirc package - is the main lirc package installed?.') exit(ExitCode.MissingPackageOrProgram) if args.themes: # Choose a Theme for the Layout sg.theme('DarkBlue16') layout = [[sg.Text('List of InBuilt Themes')], [sg.Text('Please Choose a Theme to see Demo window')], [sg.Listbox(values = sg.theme_list(), size =(20, 12), key ='-LIST-', enable_events = True)], [sg.Button('Exit')]] window = sg.Window('Theme List', layout) # This is an Event Loop while True: event, values = window.read() if event in (None, 'Exit'): break sg.theme(values['-LIST-'][0]) sg.popup_get_text(f"This is {values['-LIST-'][0]}") # Close window.close() exit(ExitCode.OK) # If there is an exisiting epgdiff window, activate it and exit. activate_existing_epgdiff_window() be = Backend(args.config) if not args.fast: upcoming = be.get_upcoming_recordings() rec_status: Dict[str, int] = {} if upcoming==[]: print('No upcoming recordings') elif args.dump_upcoming: print('Upcoming recordings = ') dpprint(upcoming) rec_status_count = {} for recording in upcoming: status = int(recording['Recording']['Status']) title = str(recording['Channel']['ChanId']) + '|' + recording['Title'] count = rec_status_count.get(status) if count == None: rec_status_count[status] = 1 else: rec_status_count[status] = count + 1 old_status = rec_status.get(title) if old_status == None: rec_status[title] = map_RecStatus(RecStatus(status)) else: rec_status[title] = update_RecStatus(RecStatus(status), old_status) if args.dump_upcoming: for key, value in rec_status_count.items(): dprint(key, RecStatus(int(key)), value) channum_chanids = [] if args.all_channels: args.chanids = [] rows = be.dbc.execute("SELECT DISTINCT chanid FROM program ORDER BY chanid;") for chanid_row in be.dbc: args.chanids.append(chanid_row[0]) else: if args.chanids == [] and args.channums == []: args.channums = DEFAULT_CHANNUM_LIST args.chanids = DEFAULT_CHANID_LIST for channum in args.channums: rows = be.dbc.execute(f'SELECT chanid FROM channel WHERE channum={channum};') for chanid_row in be.dbc: channum_chanids.append(chanid_row[0]) @dataclass class Channel: @dataclass class Programme: title: str status: str chanid: int channum: str callsign: str name: str programmes: List[Programme] channels: List[Channel] = [] for chanid in channum_chanids + args.chanids: rows = be.dbc.execute(f'SELECT chanid,channum,callsign,name FROM channel WHERE chanid={chanid};') row = be.dbc.fetchone() if row == None: dprint('Invalid chanid: ' + str(chanid)) else: channel = Channel(int(row[0]), row[1], row[2], row[3], []) #dprint(channel.chanid, '-', channel.channum, '-', channel.callsign, '-', channel.name) sql = f""" select distinct title from program where chanid={chanid} and starttime >= utc_timestamp() and title not in (select distinct title from program where chanid={chanid} and starttime < utc_timestamp()) order by title; """ rows = be.dbc.execute(sql) for fetched in be.dbc: title = fetched[0] if args.fast: dprint(PLP + title) channel.programmes.append(Channel.Programme(title, '')) else: found_status = rec_status.get(str(chanid) + '|' + title, RecStatus2.WillNotRecord) if args.all or found_status == RecStatus2.WillNotRecord: channel.programmes.append(Channel.Programme(title, str(found_status))) channels.append(channel) output = [] for channel in channels: output.append(str(str(channel.chanid) + ' - ' + channel.channum + ' - ' + channel.callsign + ' - ' + channel.name)) for programme in channel.programmes: if args.fast: output.append(PLP + programme.title) else: if str(programme.status) == ' ': output.append(PLP + programme.title) else: output.append(PLP + programme.title + " '" + programme.status + "'") if args.gui: # Modified version of AsyncConnection from the lirc async_client.py file. # On closing, puts None in the queue to allow the calling module to know # that the connection is closed and it can shut down. from lirc.client import AbstractConnection as AbstractConnection class AsyncConnectionModified(object): ''' Asynchronous read interface on top of an AbstractConnection. Parameters: - connection: Typically a lirc.RawConnection or lirc.LircdConnection. - loop: AbstractEventLoop, typically obtained using asyncio.get_event_loop(). ''' def __init__(self, connection: AbstractConnection, loop: asyncio.AbstractEventLoop): def read_from_fd(): ''' Read data from the connection fd and put into queue. ''' line = self._conn.readline(0) if line: asyncio.ensure_future(self._queue.put(line)) self._conn = connection self._loop = loop self._queue = asyncio.Queue() self._loop.add_reader(self._conn.fileno(), read_from_fd) async def close(self): ''' Clean up loop and the base connection. ''' self._loop.remove_reader(self._conn.fileno()) await self._queue.put(None) async def readline(self) -> str: ''' Asynchronous get next line from the connection. ''' return await self._queue.get() def __aiter__(self): ''' Return async iterator. ''' return self async def __anext__(self): ''' Implement async iterator.next(). ''' return await self._queue.get() async def __aenter__(self): ''' Implement "async with". ''' return self async def __aexit__(self, exc_type, exc, traceback): ''' Implement exit from "async with". ''' self.close() class GUI: content: str line_no: int line_count: int width: int height: int first_programme: str window: sg.Window ir_q: asyncio.Queue try: fe = MythTV.Frontend(FE_HOST, FE_PORT) except MythTV.exceptions.MythFEError: print('Unable to connect to mythfrontend') exit(ExitCode.FrontendProblem) # The MythTV Python bindings have a bug - MythTV.MythError.CLOSEDSOCKET # is undefined, but is attempted to be raised when a socket closed # error happens. So instead an AttributeError occurs. The following # code has a workaround until this is fixed. The fix should instead # be raising MythTV.MythError.SOCKET, so this code will work with both # that and AttributeError. def do_jump(self, jumppoint, expected_location) -> None: dprint('jumppoint: ' + jumppoint) try: result = self.fe.jump[jumppoint] except Exception as e: if type(e) == MythTV.exceptions.MythError or type(e) == AttributeError: result = False else: raise if not result: print(' Jump failed') exit(ExitCode.FrontendProblem) actual_location = self.fe.sendQuery('location') dprint(f' location: {actual_location}') if actual_location != expected_location: print(' Wrong location') exit(ExitCode.FrontendProblem) def do_key(self, key, expected_location) -> None: dprint('key: ' + str(key)) try: result = self.fe.key[key] except Exception as e: if type(e) == MythTV.exceptions.MythError or type(e) == AttributeError: result = False else: raise if not result: print(' Key failed') exit(ExitCode.FrontendProblem) time.sleep(args.delay) actual_location = self.fe.sendQuery('location') dprint(f' location: {actual_location}') if actual_location != expected_location: print(' Wrong location') exit(ExitCode.FrontendProblem) def gotoTitleSearch(self) -> None: self.do_jump('mainmenu', 'mainmenu') self.do_key('down', 'mainmenu') self.do_key('right', 'manage_recordings.xml') self.do_key('right', 'tv_schedule.xml') self.do_key('down', 'tv_schedule.xml') self.do_key('down', 'tv_schedule.xml') self.do_key('right', 'tv_search.xml') self.do_key('right', 'ProgLister') self.do_key('right', 'ProgLister') def search_for_title(self, title) -> None: dprint(f'search_for_title: {title=}') self.gotoTitleSearch() activate_mythfrontend_window() subprocess.run(['wmctrl', '-a "MythTV Frontend"']) pyautogui.write(title) self.do_key('down', 'ProgLister') self.do_key('enter', 'ProgLister') class Direction(IntEnum): Up = -1 Down = 1 # Is line_no outside the limits of the window? def outside_limits(self, line_no: int, direction: Direction) -> bool: if direction == self.Direction.Up: if line_no < 1: return True else: return False else: if line_no >= self.line_count: return True else: return False # Is a line of text a programme line (does it have a programme line prefix at # the start of the line)? def is_programme_line(self, line_no: int) -> bool: return self.content[line_no][0:PLPL] == PLP # Move which line is selected in the window. def move_selection(self, direction: Direction, distance: int = 1) -> None: indexes = self.window['selected'].get_indexes() dprint(f'{indexes=}') index_valid = True try: self.line_no = indexes[0] except IndexError: index_valid = False if index_valid: new_line_no = self.line_no + int(direction) * distance while True: if self.outside_limits(new_line_no, direction): if direction == self.Direction.Up: new_line_no = 1 direction = self.Direction.Down else: new_line_no = self.line_count - 1 direction = self.Direction.Up if self.is_programme_line(new_line_no): break new_line_no += int(direction) if new_line_no != self.line_no: if new_line_no < self.first_displayed_line: # Scroll to keep the selected line visible. new_first_displayed_line = self.first_displayed_line - (self.line_no - new_line_no) if new_first_displayed_line < 0: new_first_displayed_line = 0 self.first_displayed_line = new_first_displayed_line elif new_line_no >= self.first_displayed_line + self.height: # Scroll to keep the selected line visible. new_first_displayed_line = self.first_displayed_line + (new_line_no - self.line_no) if new_first_displayed_line > self.line_count - self.height: new_first_displayed_line = self.line_count - self.height self.first_displayed_line = new_first_displayed_line self.line_no = new_line_no dprint(f'1: {self.line_no=}, {self.first_displayed_line=}') distance = self.line_no - self.first_displayed_line dprint(f'Top: {distance=}') if distance <= KEEP_ON_DISPLAY: self.first_displayed_line -= KEEP_ON_DISPLAY - distance if self.first_displayed_line < 0: self.first_displayed_line = 0 else: distance = self.height - (self.line_no - self.first_displayed_line) dprint(f'Bottom: {distance=} {self.height=}') if distance <= KEEP_ON_DISPLAY: self.first_displayed_line += KEEP_ON_DISPLAY - distance + 1 dprint(f'2: {self.line_no=}, {self.first_displayed_line=}') self.window['selected'].update(scroll_to_index = self.first_displayed_line, set_to_index = self.line_no) dprint(f'{self.line_no=} {self.first_displayed_line=}') def __init__(self, content): self.content = content self.line_count = len(output) self.first_displayed_line = 0 self.first_programme = '' self.width = MIN_WINDOW_WIDTH if self.line_count > MAX_WINDOW_HEIGHT: self.height = MAX_WINDOW_HEIGHT else: self.height = self.line_count if self.height < MIN_WINDOW_HEIGHT: self.height = MIN_WINDOW_HEIGHT self.line_no = 0 for line in self.content: lline = len(line) if lline > self.width: self.width = lline if self.first_programme == '': if self.is_programme_line(self.line_no): self.first_programme = line else: self.line_no += 1 if self.width > MAX_WINDOW_WIDTH: self.width = MAX_WINDOW_WIDTH #sg.theme('DarkBlue16') layout = [ [sg.Listbox( values=self.content, default_values = [self.first_programme], enable_events=True, select_mode = sg.LISTBOX_SELECT_MODE_SINGLE, bind_return_key = True, key='selected', size=(self.width, self.height), )], [sg.Button('Exit')] ] self.window = sg.Window( 'MythTV epgdiff', layout, icon=icon, font=('Helvetica', 20), return_keyboard_events=True, finalize=True ) #dpprint(dir(self.window)) dprint(f'{self.first_programme=}') #print = sg.Print self.ir_q = asyncio.Queue() if args.lirc: try: self.ir = lirc.LircdConnection(program_name, socket_path=LIRC_SOCKET) except RuntimeError: print('Cannot open lircd socket ' + LIRC_SOCKET) exit(ExitCode.LircProblem) else: self.ir = None async def sg_event_loop(self) -> None: while True: if self.ir_q.qsize() != 0: event = await self.ir_q.get() values = None dprint(f'lirc event={event}') else: event, values = self.window.read(0) if event == None: event = 'Exit' if event == sg.TIMEOUT_KEY: await asyncio.sleep(0.01) else: dprint(f'{event=} {values=}') for prefix in ['KP_', 'MouseWheel:']: if event.startswith(prefix): event = event[len(prefix):] event = event.rstrip('0123456789:') if event in (sg.WIN_CLOSED, 'Exit', 'Escape'): break elif event == 'Up': self.move_selection(self.Direction.Up) elif event == 'Down': self.move_selection(self.Direction.Down) elif event == 'Next': # PgDn keys self.move_selection(self.Direction.Down, PG_KEY_DISTANCE) elif event == 'Prior': # PgUp keys self.move_selection(self.Direction.Up, PG_KEY_DISTANCE) elif event == 'selected': self.move_selection(self.Direction.Down, 0) elif event == "Enter" or event == 'Return': dprint(event) self.search_for_title(self.content[self.line_no][PLPL:]) if self.ir != None: await self.conn.close() dprint('ir closed') self.window.close() if values == None: dprint('values: None') else: dprint('values:', values['selected']) if args.lirc: async def get_lines(self): self.conn = AsyncConnectionModified(self.ir, asyncio.get_event_loop()) while True: keypress = await self.conn.readline() await self.ir_q.put(keypress) dprint(f'{keypress=}') if keypress == None or keypress == 'Escape': break await self.conn.close() self.ir.close() self.ir = None dprint('get_lines finished') async def run(self): self.sg_event_loop_task = asyncio.create_task(self.sg_event_loop()) if self.ir != None: self.get_lines_task = asyncio.create_task(self.get_lines()) await asyncio.gather(self.get_lines_task, self.sg_event_loop_task) else: await asyncio.gather(self.sg_event_loop_task) dprint('After gather') def event_loop_exception_handler(loop, context): e = context.get('exception') self.sg_event_loop_task.cancel() if self.ir != None: self.get_lines_task.cancel() if e != None: dprint(f'event_loop_exception_handler: {e=}') raise e def main(self): loop = asyncio.get_event_loop() loop.set_exception_handler(self.event_loop_exception_handler) asyncio.run(self.run()) w = GUI(output) w.main() else: for line in output: print(line) exit(ExitCode.OK)