#!/usr/bin/python3
"""
xmltv-proc-nz by Hadley Rich
Licensed under the BSD License.

Processes an XMLTV file in various ways.

To use pipe an XML file like so:
    cat freeview.xml | xmltv-proc-nz > better-file.xml
or:
    xmltv-proc-nz freeview.xml > better-file.xml

Changes:
JSW = Stephen Worthington

0.5.9b JSW
  - Change BASE_URL from nzepg.org to epg.org.nz
  - Fetch JSON data from mypvr.jsw.gen.nz instead of BASE_URL
0.5.9c JSW
  - Comment out TV1 BBCWorld processing as it is now unused and also broken.
0.5.9d JSW
  - Add SearchReplaceTitleLocal() to use local web server JSON data.
0.5.9e Wade MaxField
  - Change EpDesc to work with NZ series/episode data in subtitles and
    descriptions.
0.5.9e JSW
  - Process Sky Movies channels to put the subtitle data into the
    description, the title into the subtitle and change the title to
    "Movie".  This is a JSW customisation and will not be wanted by
    everyone, so it is controlled by the JSW flag.
0.5.9f JSW
  - Fix the PlusOnes processing for the new Freeview lineup from March 2022.
0.5.9g JSW
  - Fix exceptions in Sky Movies processing when there is no subtitle.
0.6.0 JSW
  - Convert to Python 3.
  - Fix post processing.
  - Delete BBCWorld processing.
  - Generalise JSON base URL processing to use a JSON base URL list.
  - Reverse the default for BaseProcessor.valid.  Set valid=True when valid
    data is obtained from one URL, even if other URLs fail.
  - Remove JSW flag - now works by whether it finds the matching json data.
  - Make PlusOnes use json configuration.
0.6.1 JSW
  - Make failure to access version update data a warning instead of a
    failure.  This was causing xmltv-proc-nz to fail as epg.org.nz is no
    longer working (although the site still exists).
  - Remove epg.org.nz as it is no longer working.
0.6.2 JSW
  - Add a file:/// URL to JSON_BASE_URLS to support the use of
    /etc/mythtv-epg-nz for configuring xmltv-proc-nz, as required for
    Andrew Ruthven's .deb install.
0.6.3 JSW
  - Fix Sky Movies processing, so that it works.
  - Sky Movies: Move PREMIERE to description.
  - Sky Movies: Join up split titles where part of the title is in the
    subtitle.
"""

#TODO: Find repeats
#TODO: Regex replacements for categories

import csv
import functools
import json
import logging
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, tzinfo
from optparse import OptionParser
# xml.etree.cElementTree was removed in Python 3.9; the plain ElementTree
# module uses the C accelerator automatically.
from xml.etree import ElementTree

try:
    import tmdb
except ImportError:
    tmdb = False

try:
    import tvdb_api
except ImportError:
    tvdb = False
else:
    tvdb = tvdb_api.Tvdb(language='en')

NAME = 'xmltv-proc-nz'
#URL = 'http://nice.net.nz/xmltv-proc-nz'
URL = 'http://www.jsw.gen.nz/mythtv/xmltv-proc-nz'
# NOTE(review): the changelog above lists 0.6.3 - confirm whether this
# should have been bumped.
VERSION = '0.6.2 JSW'
#BASE_URL = 'http://epg.org.nz'
BASE_URL = ''
#JSON_BASE_URLS = ['http://epg.org.nz', 'http://localhost/json', 'file:///etc/mythtv-epg-nz/xmltv-proc-nz/json']
JSON_BASE_URLS = ['http://localhost/json', 'file:///etc/mythtv-epg-nz/xmltv-proc-nz/json']
TIME_FORMAT = '%Y%m%d%H%M%S'
LOG_LEVEL = logging.INFO
#LOG_LEVEL = logging.WARNING
#LOG_LEVEL = logging.DEBUG

log = logging.getLogger(NAME)
logging.basicConfig(level=LOG_LEVEL, format='%(message)s')


class UTC(tzinfo):
    """
    Represents the UTC timezone
    """

    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)


class LocalTimezone(tzinfo):
    """
    Represents the computers local timezone
    """

    def __init__(self):
        self.STDOFFSET = timedelta(seconds=-time.timezone)
        if time.daylight:
            self.DSTOFFSET = timedelta(seconds=-time.altzone)
        else:
            self.DSTOFFSET = self.STDOFFSET
        self.DSTDIFF = self.DSTOFFSET - self.STDOFFSET
        tzinfo.__init__(self)

    def utcoffset(self, dt):
        if self._isdst(dt):
            return self.DSTOFFSET
        return self.STDOFFSET

    def dst(self, dt):
        if self._isdst(dt):
            return self.DSTDIFF
        return timedelta(0)

    def tzname(self, dt):
        return time.tzname[self._isdst(dt)]

    def _isdst(self, dt):
        """Ask the C library whether DST applies at the wall-clock time dt."""
        tt = (dt.year, dt.month, dt.day,
              dt.hour, dt.minute, dt.second,
              dt.weekday(), 0, -1)
        stamp = time.mktime(tt)
        tt = time.localtime(stamp)
        return tt.tm_isdst > 0


localtz = LocalTimezone()
utc = UTC()


def urlopen(url):
    """Open url, identifying this script in the User-Agent header."""
    request = urllib.request.Request(
        url, headers={'User-Agent': '%s/%s' % (NAME, VERSION)})
    return urllib.request.urlopen(request)


def cmp_to_key(mycmp):
    """
    Convert a Python 2 sort() cmp= function into a key= function.

    Kept for callers of this module; delegates to functools.cmp_to_key.
    """
    return functools.cmp_to_key(mycmp)


def _json_from_base_urls(owner, resource, fetch_warning, parse_warning):
    """
    Yield (base_url, parsed_json) for every entry of JSON_BASE_URLS that
    serves valid JSON at <base_url>/<resource>/+json.

    Failures are logged as warnings and skipped, so one dead base URL does
    not stop data being loaded from the others.  fetch_warning and
    parse_warning are '%'-templates taking the base URL.
    """
    for json_base_url in JSON_BASE_URLS:
        url = '%s/%s/+json' % (json_base_url, resource)
        log.debug('%s: urlopen(%s)', owner, url)
        try:
            # Close the response as soon as it has been read.
            with urlopen(url) as response:
                data = response.read()
        except IOError:
            log.warning(fetch_warning % json_base_url)
            continue
        try:
            parsed = json.loads(data)
        except ValueError:
            log.warning(parse_warning % json_base_url)
            continue
        yield json_base_url, parsed


def _merge(existing, new):
    """Return new if nothing was loaded yet, else existing extended by new."""
    if existing is None:
        return new
    existing += new
    return existing


def _title_of(programme):
    """Return the text of the programme's title element, or None."""
    title = programme.find('title')
    if title is None:
        return None
    return title.text


def _remove_child(parent, tag):
    """Remove the first direct child called tag from parent, if any."""
    child = parent.find(tag)
    if child is not None:
        parent.remove(child)


def _add_category_once(programme, text):
    """Append a category element unless one with the same text exists."""
    for existing in programme.findall('category'):
        if existing.text == text:
            return
    ElementTree.SubElement(programme, 'category').text = text


class BaseProcessor(object):
    # Processors that depend on external data set this to True once they
    # have loaded something usable.
    valid = False

    def __call__(self, programme):
        raise NotImplementedError

    def post_process(self, programmes):
        raise NotImplementedError


class Overrides(BaseProcessor):
    """
    Use a web service to override shows in specific timeslots.
    """

    def __init__(self):
        if not tvdb:
            log.warning('Overrides: tvdb_api module not found.')
        self.overrides = None
        for _base, overrides in _json_from_base_urls(
                'Overrides', 'overrides',
                'Overrides: Fetching data from %s failed.',
                'Overrides: JSON parse from %s failed.'):
            # Convert only the entries just loaded.  Converting the merged
            # list would re-parse entries from an earlier URL whose 'start'
            # is already a datetime.
            for o in overrides:
                start = datetime.strptime(o['start'], '%Y-%m-%d %H:%M:%S')
                start = start.replace(tzinfo=utc).astimezone(localtz)
                o['start'] = start.replace(tzinfo=None)
            self.overrides = _merge(self.overrides, overrides)
            self.valid = True

    def __call__(self, programme):
        if not self.valid:
            return

        try:
            # strptime cannot parse the numeric timezone suffix, so drop it.
            start = datetime.strptime(
                programme.get('start').split(' ')[0], TIME_FORMAT)
            # stop is parsed only to reject programmes without a valid one.
            datetime.strptime(programme.get('stop').split(' ')[0], TIME_FORMAT)
            channel = programme.get('channel')
        except (AttributeError, TypeError, ValueError):
            log.debug('Overrides: Ignoring invalid programme')
            return

        for o in self.overrides:
            if start != o['start'] or channel != o['xmltvid']:
                continue
            log.info('Overrides: Found program on %s at %s', channel, start)

            _remove_child(programme, 'previously-shown')
            if o.get('previously_shown'):
                ElementTree.SubElement(programme, 'previously-shown')

            if not (o.get('season') and o.get('episode')):
                continue

            _remove_child(programme, 'episode-num')
            episode_num = ElementTree.SubElement(programme, 'episode-num')
            episode_num.set('system', 'xmltv_ns')
            # xmltv_ns numbering is zero based.
            episode_num.text = '%s.%s.0' % (o['season'] - 1, o['episode'] - 1)

            if not (tvdb and o.get('tvdb_id')):
                continue

            try:
                show = tvdb[o['tvdb_id']]
                episode = show[o['season']][o['episode']]
            except Exception:
                log.error('Error getting episode %02dx%02d of %s',
                          o['season'], o['episode'], o['tvdb_id'])
                continue

            log.info(
                'Overrides: Using %s - %02dx%02d - %s',
                show['seriesname'],
                int(episode['seasonnumber']),
                int(episode['episodenumber']),
                episode['episodename']
            )

            if 'firstaired' in episode and episode['firstaired']:
                _remove_child(programme, 'date')
                date = ElementTree.SubElement(programme, 'date')
                date.text = episode['firstaired'].replace('-', '')

            _remove_child(programme, 'sub-title')
            sub_title = ElementTree.SubElement(programme, 'sub-title')
            sub_title.text = episode['episodename']

            desc = programme.find('desc')
            if desc is not None:
                # Keep the existing description when TVDB has none.
                if episode['overview']:
                    desc.text = episode['overview']
            else:
                desc = ElementTree.SubElement(programme, 'desc')
                desc.text = episode['overview']

            if 'rating' in episode and episode['rating']:
                _remove_child(programme, 'star-rating')
                rating = ElementTree.SubElement(programme, 'star-rating')
                value = ElementTree.SubElement(rating, 'value')
                value.text = '%s/10' % episode['rating']


class PlusOnes(BaseProcessor):
    """
    Mark every programme on a configured "plus one" channel as
    previously shown.
    """

    def __init__(self):
        self.xmltvids = None
        # A JSON parse failure is only a warning here, as in every other
        # processor, so one bad base URL does not abort the whole run.
        for json_base_url, xmltvids in _json_from_base_urls(
                'PlusOnes', 'plus-ones',
                'PlusOnes: Fetching data from %s failed.',
                'PlusOnes: JSON parse from %s failed.'):
            self.xmltvids = _merge(self.xmltvids, xmltvids)
            self.valid = True
            if log.isEnabledFor(logging.DEBUG):
                log.debug('PlusOnes from %s: ', json_base_url)
                for xmltvid in xmltvids:
                    log.debug(' %s', xmltvid)

    def __call__(self, programme):
        if not self.valid:
            return
        if programme.get('channel') not in self.xmltvids:
            return
        # Do not add a second previously-shown element.
        if programme.find('previously-shown') is None:
            ElementTree.SubElement(programme, 'previously-shown')


class Movies(BaseProcessor):
    """
    Augment movies with data from themoviedb.com
    """

    def __init__(self):
        self.cache = {}
        self.channels = []
        self.excludes = []
        if not tmdb:
            # Without the module every lookup would fail, so stay invalid.
            log.warning('Movies: TMDB module not found.')
            return

        for _base, channels in _json_from_base_urls(
                'Movies', 'movie-channels',
                'Movies: Fetching channel data from %s failed.',
                'Movies: Parsing channel data from %s failed.'):
            # Merge the channels from every base URL rather than keeping
            # only the last list loaded.
            self.channels += channels

        for json_base_url, exclude_strings in _json_from_base_urls(
                'Movies', 'movie-excludes',
                'Movies: Fetching exclude data from %s failed.',
                'Movies: Parsing exclude data from %s failed.'):
            for e in exclude_strings:
                try:
                    self.excludes.append(re.compile(e))
                except re.error:
                    log.warning('Movies: Invalid exclude pattern "%s" from %s.',
                                e, json_base_url)
            self.valid = True

    def __call__(self, programme):
        if not self.valid:
            return

        try:
            start = programme.get('start')
            stop = programme.get('stop')
            title = programme.find('title').text
            channel = programme.get('channel')
        except AttributeError:
            log.debug('Movies: Ignoring invalid programme')
            return
        if start is None or stop is None or not title:
            return

        # Unfortunately strptime can't handle numeric timezones so we strip it.
        # It's only for getting possible movies so won't matter too much.
        start = start.split(' ')[0]
        stop = stop.split(' ')[0]
        start_time = time.mktime(time.strptime(start, TIME_FORMAT))
        stop_time = time.mktime(time.strptime(stop, TIME_FORMAT))
        duration = stop_time - start_time

        if duration <= 5400 or duration > 14400:
            # Between 90 mins and 4 hours
            return
        if channel not in self.channels:
            return
        for regex in self.excludes:
            if regex.match(title):
                return

        log.debug('Movies: Possible movie "%s" (duration %dm)', title, duration / 60)
        movie = self._lookup(title)
        if movie is None:
            return

        log.info('Movies: Adding info from TMDB for %s', title)
        self._apply(programme, movie)

    def _lookup(self, title):
        """Return the TMDB record for title, or None; results are cached."""
        if title in self.cache:
            movie = self.cache[title]
            if movie is None:
                log.debug('Movies: Cached ignore for "%s"', title)
            else:
                log.debug('Movies: Cache hit for "%s"', title)
            return movie

        try:
            results = tmdb.search(title.replace('?', ''))
        except Exception:
            log.exception('Movies: TMDB problem searching')
            return None

        wanted = normalise_movie_title(title)
        matches = [
            result for result in results
            if wanted == normalise_movie_title(result['name'])
            and result['language'] == 'en'
        ]
        log.debug('Movies: Exact title matches: %d', len(matches))
        for match in matches:
            log.debug('Movies: Found match "%s" (%s)', match['name'], match['released'])

        # Only an unambiguous match is trusted.
        if len(matches) != 1:
            self.cache[title] = None
            return None

        try:
            log.debug('Movies: Cache miss for "%s"', title)
            movie = tmdb.getMovieInfo(matches[0]['id'])
        except Exception:
            log.exception('Movies: TMDB problem fetching info')
            return None
        self.cache[title] = movie
        return movie

    def _apply(self, programme, movie):
        """Copy the fields of the TMDB record movie onto programme."""
        _add_category_once(programme, 'movie')
        if 'categories' in movie and 'genre' in movie['categories']:
            for c in movie['categories']['genre']:
                _add_category_once(programme, c)

        if 'overview' in movie and movie['overview']:
            desc = programme.find('desc')
            if desc is None:
                desc = ElementTree.SubElement(programme, 'desc')
            desc.text = movie['overview']

        if 'url' in movie and movie['url']:
            url = programme.find('url')
            if url is None:
                url = ElementTree.SubElement(programme, 'url')
            url.text = movie['url']

        if 'runtime' in movie and movie['runtime']:
            _remove_child(programme, 'length')
            length = ElementTree.SubElement(programme, 'length')
            length.set('units', 'minutes')
            # Element text must be a string to be serialisable.
            length.text = str(movie['runtime'])

        if 'released' in movie and movie['released']:
            date = programme.find('date')
            if date is None:
                date = ElementTree.SubElement(programme, 'date')
            date.text = movie['released'].replace('-', '')

        if 'rating' in movie and movie['rating']:
            _remove_child(programme, 'star-rating')
            rating = ElementTree.SubElement(programme, 'star-rating')
            value = ElementTree.SubElement(rating, 'value')
            value.text = '%s/10' % movie['rating']

        if 'cast' in movie:
            _remove_child(programme, 'credits')
            credits = ElementTree.SubElement(programme, 'credits')
            cast = movie['cast']
            if 'director' in cast:
                for d in cast['director']:
                    director = ElementTree.SubElement(credits, 'director')
                    director.text = d['name']
            if 'actor' in cast:
                for a in cast['actor']:
                    actor = ElementTree.SubElement(credits, 'actor')
                    actor.text = a['name']
                    # A None attribute value cannot be serialised.
                    if a.get('character'):
                        actor.set('role', a['character'])


class HD(BaseProcessor):
    """
    Look for a HD note in a description.
    """
    regexes = (
        re.compile(r'HD\.?$'),
        re.compile(r'\(HD\)$'),
    )

    def __call__(self, programme):
        desc = programme.find('desc')
        if desc is None or not desc.text:
            return
        for regex in self.regexes:
            if not regex.search(desc.text):
                continue
            log.debug('HD: Found "%s"', _title_of(programme))
            video = programme.find('video')
            if video is None:
                video = ElementTree.SubElement(programme, 'video')
                present = ElementTree.SubElement(video, 'present')
                present.text = 'yes'
                aspect = ElementTree.SubElement(video, 'aspect')
                aspect.text = '16:9'
            # quality is a child of video, not of programme; looking it up
            # on programme never finds it and appends a duplicate.
            quality = video.find('quality')
            if quality is None:
                quality = ElementTree.SubElement(video, 'quality')
            quality.text = 'HDTV'
            desc.text = regex.sub('', desc.text)


class Subtitle(BaseProcessor):
    """
    Look for a subtitle in a description.
    """
    regexes = (
        re.compile(r"(Today|Tonight)?:? ?'(?P<subtitle>.*?)'\.\s?"),
        re.compile(r"'(?P<subtitle>.{2,60}?)\.'\s"),
        re.compile(r"(?P<subtitle>.{2,60}?):\s"),
    )

    def __call__(self, programme):
        desc = programme.find('desc')
        if desc is None or not desc.text:
            return
        # Never add a second sub-title.
        if programme.find('sub-title') is not None:
            return
        for regex in self.regexes:
            matched = regex.match(desc.text)
            if not matched:
                continue
            subtitle = ElementTree.SubElement(programme, 'sub-title')
            subtitle.text = matched.group('subtitle')
            log.debug('Subtitle: "%s" for "%s"', subtitle.text, _title_of(programme))
            # Strip only the leading match, not similar text further on.
            desc.text = desc.text[matched.end():]
            break


class _EpisodeNumFinder(BaseProcessor):
    """
    Shared implementation for the processors that derive an xmltv_ns
    episode-num element from text in a child element of the programme.

    Subclasses set source_tag (the child element to search), log_name (the
    prefix for log messages), regexes, and only_if_missing (skip programmes
    that already have an episode-num), and implement describe().
    """
    source_tag = None
    log_name = None
    regexes = ()
    only_if_missing = False

    def describe(self, matched):
        """Return (text for the log message, xmltv_ns episode-num text)."""
        raise NotImplementedError

    def __call__(self, programme):
        if self.only_if_missing and programme.find('episode-num') is not None:
            return
        source = programme.find(self.source_tag)
        if source is None or not source.text:
            return
        for regex in self.regexes:
            matched = regex.search(source.text)
            if not matched:
                continue
            found, numbering = self.describe(matched)
            log.info('%s: Found %s for "%s"', self.log_name, found,
                     _title_of(programme))
            episode_num = ElementTree.SubElement(programme, 'episode-num')
            episode_num.set('system', 'xmltv_ns')
            episode_num.text = numbering
            # One element per programme, even if several patterns match.
            break


class _SeasonEpisodeFinder(_EpisodeNumFinder):
    """Find combined season and episode numbers, e.g. "S2 Ep5"."""
    regexes = (
        re.compile(r'(?i)\s?S\s?(\d+),?\s?Ep?\s?(\d+)'),
        re.compile(r'(?i)\s?S\s?(\d+),?\s?Episode\s?(\d+)'),
    )

    def describe(self, matched):
        season, episode = [int(x) for x in matched.groups()]
        # xmltv_ns numbering is zero based.
        return ('season %s episode %s' % (season, episode),
                '%s.%s.0' % (season - 1, episode - 1))


class _EpisodeFinder(_EpisodeNumFinder):
    """Find an episode number alone, e.g. "Ep 5"."""
    only_if_missing = True
    regexes = (
        re.compile(r'(?i)\s?Ep\.?\s?(\d+)'),
        re.compile(r'(?i)\s?Episode\.?\s?(\d+)'),
    )

    def describe(self, matched):
        episode = int(matched.group(1))
        return 'episode %s' % episode, '.%s.0' % (episode - 1)


class _SeasonFinder(_EpisodeNumFinder):
    """Find a season number alone, e.g. "Season 3"."""
    only_if_missing = True
    regexes = (
        re.compile(r'(?i)^S\s?(\d+)'),
        re.compile(r'(?i)\sS\s?(\d+)'),
        re.compile(r'(?i)\s?Season\s?(\d+)'),
    )

    def describe(self, matched):
        season = int(matched.group(1))
        return 'season %s' % season, '%s..0' % (season - 1)


class SeasonEpisodeFromDesc(_SeasonEpisodeFinder):
    """
    Look for a Season/Episode info in a description.
    """
    source_tag = 'desc'
    log_name = 'SeasonEpisodeDesc'


class SeasonEpisodeFromSubtitle(_SeasonEpisodeFinder):
    """
    Look for a Season/Episode info in a subtitle.
    """
    source_tag = 'sub-title'
    log_name = 'SeasonEpisodeSubtitle'


class EpisodeFromDesc(_EpisodeFinder):
    """
    Look for a Episode info in a description.
    """
    source_tag = 'desc'
    log_name = 'EpisodeDesc'


class EpisodeFromSubtitle(_EpisodeFinder):
    """
    Look for a Episode info in a subtitle.
    """
    source_tag = 'sub-title'
    log_name = 'EpisodeSubtitle'


class SeasonFromDesc(_SeasonFinder):
    """
    Look for a Season info in a description.
    """
    source_tag = 'desc'
    log_name = 'SeasonDesc'


class SeasonFromSubtitle(_SeasonFinder):
    """
    Look for a Season info in a subtitle.
    """
    source_tag = 'sub-title'
    log_name = 'SeasonSubtitle'


class SearchReplaceTitle(BaseProcessor):
    """
    Use a web service to normalise titles.
    """

    def __init__(self):
        self.replacements = None
        for json_base_url, replacements in _json_from_base_urls(
                'SearchReplaceTitle', 'title-replacements',
                'SearchReplaceTitle: Fetching replacements from %s failed.',
                'SearchReplaceTitle: JSON parse from %s failed.'):
            self.replacements = _merge(self.replacements, replacements)
            self.valid = True
            if log.isEnabledFor(logging.DEBUG):
                log.debug('SearchReplaceTitle from %s: ', json_base_url)
                for replacement in replacements:
                    log.debug(' %s', replacement)

    def __call__(self, programme):
        if not self.valid:
            return
        title = programme.find('title')
        if title is None or title.text is None:
            return
        for r in self.replacements:
            old_title = title.text
            if not re.match(r['search'], old_title):
                continue
            description_match = r.get('description_match')
            if description_match:
                # If there's a description_match then make sure the programme
                # has a desc and it matches
                desc = programme.find('desc')
                if desc is None or desc.text is None:
                    continue
                if not re.match(description_match, desc.text):
                    continue
                desc.text = re.sub(description_match, '', desc.text)
            title.text = re.sub(r['search'], r['replace'], old_title)
            if old_title != title.text:
                log.info(
                    'SearchReplaceTitle: Changed from "%s" to "%s"',
                    old_title,
                    title.text
                )


class Categories(BaseProcessor):
    """
    Use a web service to add categories by title.
    """

    def __init__(self):
        self.categories = None
        for _base, categories in _json_from_base_urls(
                'Categories', 'categories',
                'Categories: Fetching data from %s failed.',
                'Categories: JSON parse from %s failed.'):
            self.categories = _merge(self.categories, categories)
            self.valid = True

    def __call__(self, programme):
        if not self.valid:
            return
        title = _title_of(programme)
        for c in self.categories:
            # NOTE(review): entries are filtered on a 'category' key but the
            # values used below are 'show_type' and 'categories' - confirm
            # against the JSON schema.
            if 'category' not in c:
                continue
            if title != c['title']:
                continue
            # Remove existing categories
            for category in programme.findall('category'):
                programme.remove(category)
            show_type = ElementTree.SubElement(programme, 'category')
            show_type.text = c['show_type']
            if 'categories' in c:
                for newcat in c['categories']:
                    category = ElementTree.SubElement(programme, 'category')
                    category.text = newcat
            log.info('Categories: Added categories for "%s"', title)


class SkyMoviesChannels(BaseProcessor):
    """
    Process Sky Movies channels to put the subtitle data into the description.
    """

    def __init__(self):
        self.sky_movies_xmltvid_list = None
        for json_base_url, sky_movies_xmltvid_list in _json_from_base_urls(
                'SkyMoviesChannels', 'sky-movies-xmltvids',
                'SkyMoviesChannels: Fetching data from %s failed.',
                'SkyMoviesChannels: JSON parse from %s failed.'):
            self.sky_movies_xmltvid_list = _merge(
                self.sky_movies_xmltvid_list, sky_movies_xmltvid_list)
            self.valid = True
            if log.isEnabledFor(logging.DEBUG):
                log.debug('SkyMoviesChannels from %s: ', json_base_url)
                for sky_movies_xmltvid in sky_movies_xmltvid_list:
                    log.debug(' %s', sky_movies_xmltvid)

    def __call__(self, programme):
        if not self.valid:
            return
        if programme.get('channel') not in self.sky_movies_xmltvid_list:
            return

        title_elem = programme.find('title')
        if title_elem is None or title_elem.text is None:
            return
        subtitle_elem = programme.find('sub-title')
        # An empty sub-title element is treated like a missing one.
        if subtitle_elem is None or subtitle_elem.text is None:
            log.info(
                'SkyMoviesChannels: channel=%s title=%s no subtitle',
                programme.get('channel'),
                title_elem.text
            )
            return

        title = title_elem.text
        subtitle = subtitle_elem.text
        if title in ('Stay tuned...', 'On DUKE today...'):
            return

        premiere = False
        if title.startswith('PREMIERE '):
            premiere = True
            title = title[9:]
        if title.startswith('PREMIERE: '):
            premiere = True
            title = title[10:]

        # A title ending in '...' continues in a sub-title starting with
        # '...'; the continuation runs up to the first '. '.
        # NOTE(review): the original nesting of the '. ' search could not be
        # confirmed - verify titles are joined only when both ellipses exist.
        if title.endswith('...') and subtitle.startswith('...'):
            subtitle = subtitle[3:]
            location = subtitle.find('. ')
            if location != -1:
                title_second_part = subtitle[0:location]
                subtitle = subtitle[location + 2:]
                title = title[0:-3] + ' ' + title_second_part

        title_elem.text = title
        new_desc = ('PREMIERE: ' if premiere else '') + subtitle
        desc = programme.find('desc')
        if desc is None:
            desc = ElementTree.SubElement(programme, 'desc')
            desc.text = new_desc
        elif desc.text is None:
            desc.text = new_desc
        else:
            desc.text = new_desc + ' ' + desc.text
        subtitle_elem.text = ''
        log.info(
            'SkyMoviesChannels: channel=%s title=%s fixed',
            programme.get('channel'),
            title_elem.text
        )


# Order of the children of an XMLTV programme element.
_PROGRAMME_ORDER = (
    'title', 'sub-title', 'desc', 'credits', 'date',
    'category', 'language', 'orig-language', 'length',
    'icon', 'url', 'country', 'episode-num', 'video', 'audio',
    'previously-shown', 'premiere', 'last-chance', 'new',
    'subtitles', 'rating', 'star-rating',
)
_PROGRAMME_RANK = {tag: rank for rank, tag in enumerate(_PROGRAMME_ORDER)}


def compare_programme(x, y):
    """
    Comparison helper to sort the children elements of an XMLTV programme tag.

    Returns -1, 0 or 1.  Elements whose tag is not in the known order sort
    after all known ones instead of raising ValueError.
    """
    unknown = len(_PROGRAMME_ORDER)
    rank_x = _PROGRAMME_RANK.get(x.tag, unknown)
    rank_y = _PROGRAMME_RANK.get(y.tag, unknown)
    return (rank_x > rank_y) - (rank_x < rank_y)


def normalise_movie_title(title):
    """
    Normalise titles to help comparisons.
    """
    normalised = title.lower()
    if normalised.startswith('the '):
        normalised = normalised[4:]
    normalised = re.sub('[^a-z ]', '', normalised)
    normalised = re.sub(' +', ' ', normalised)
    normalised = normalised.replace(' the ', ' ')
    return normalised


def indent(elem, level=0):
    """
    Make ElementTree output pretty.

    Rewrites whitespace-only text and tail values in place so that each
    nesting level is indented by one tab.
    """
    i = "\n" + level * "\t"
    if len(elem):
        if not elem.text or not elem.text.strip():
            elem.text = i + "\t"
        if not elem.tail or not elem.tail.strip():
            elem.tail = i
        last_child = None
        for child in elem:
            indent(child, level + 1)
            last_child = child
        # The last child's tail holds the indent of the parent's closing tag.
        if not last_child.tail or not last_child.tail.strip():
            last_child.tail = i
    elif level and (not elem.tail or not elem.tail.strip()):
        elem.tail = i


def check_for_updates():
    """
    Check for script updates.

    Does nothing when BASE_URL is empty.  Exits with status 4 if the version
    data cannot be parsed and 5 if a critical update is available.
    """
    if BASE_URL == '':
        return
    url = '%s/xmltv-proc-nz/+json' % BASE_URL
    try:
        with urlopen(url) as response:
            data = response.read()
    except IOError:
        log.warning(f'Update check failed, cannot access {url}')
        return

    try:
        stats = json.loads(data)
    except ValueError as e:
        # Log the error; stdout is reserved for the XML output.
        log.critical('%s', e)
        log.critical('Version check failed')
        sys.exit(4)

    if stats['version'] > VERSION:
        log.warning(
            'A new version (%s) is available at %s (current version %s)',
            stats['version'],
            URL,
            VERSION
        )
        if stats['critical']:
            log.critical('Version update is critical, exiting')
            sys.exit(5)


def main():
    """Read XMLTV data, run every processor over it and print the result."""
    parser = OptionParser(version='%prog ' + str(VERSION))
    parser.set_defaults(debug=False)
    parser.add_option('--debug', action='store_true',
                      help='output debugging information.')
    parser.add_option('--verbose', action='store_true',
                      help='output verbose information.')
    (options, args) = parser.parse_args()

    if options.verbose:
        log.setLevel(logging.INFO)
    if options.debug:
        log.setLevel(logging.DEBUG)

    check_for_updates()

    # A file named on the command line wins, so the script also works when
    # stdin is not a terminal (for example when run from cron).
    if args:
        with open(args[0], 'rb') as source:
            data = source.read()
    elif not sys.stdin.isatty():
        data = sys.stdin.buffer.read()
    else:
        log.critical('No input file')
        sys.exit(2)

    processors = [
        PlusOnes(),
        SearchReplaceTitle(),
        Subtitle(),
        Categories(),
        Movies(),
        HD(),
        SeasonEpisodeFromDesc(),
        SeasonEpisodeFromSubtitle(),
        EpisodeFromDesc(),
        EpisodeFromSubtitle(),
        SeasonFromDesc(),
        SeasonFromSubtitle(),
        Overrides(),
        SkyMoviesChannels()
    ]

    tree = ElementTree.XML(data)
    for processor in processors:
        for programme in tree.findall('.//programme'):
            try:
                processor(programme)
            except Exception:
                # One bad programme must not stop the rest being processed.
                log.exception("Failed processing with processor: %s", processor)
        try:
            processor.post_process(tree)
        except NotImplementedError:
            pass
        except Exception:
            log.exception("Failed post processing with processor: %s", processor)

    for programme in tree.findall('.//programme'):
        programme[:] = sorted(programme, key=cmp_to_key(compare_programme))

    indent(tree)
    # NOTE(review): these two empty lines look like the remains of an XML
    # declaration and DOCTYPE line - confirm against the upstream script.
    print('')
    print('')
    print(ElementTree.tostring(tree, encoding='unicode'))


if __name__ == '__main__':
    main()