From: Dirk M. <di...@us...> - 2004-07-26 12:46:07
|
Update of /cvsroot/freevo/freevo/lib/pyepg In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv30104 Added Files: .cvsignore __init__.py compat.py config.py epg_types.py epg_xmltv.py xmltv.py Log Message: first draft of pyepg (not integrated in Freevo) --- NEW FILE: config.py --- # configuration file for pyepg __all__ = ['TV_DATEFORMAT', 'TV_TIMEFORMAT', 'TV_DATETIMEFORMAT' ] # The file format version number. It must be updated when incompatible # changes are made to the file format. EPG_VERSION = 5 TV_DATEFORMAT = '%e-%b' # Day-Month: 11-Jun TV_TIMEFORMAT = '%H:%M' # Hour-Minute 14:05 TV_DATETIMEFORMAT = '%A %b %d %I:%M %p' # Thursday September 24 8:54 am # variables needed for compat.py DEBUG = 1 # debug level encoding = 'latin-1' # encoding LOCALE = 'latin-1' # locale setting --- NEW FILE: .cvsignore --- *.pyc *.pyo --- NEW FILE: epg_types.py --- # -*- coding: iso-8859-1 -*- # ----------------------------------------------------------------------- # epg_types.py - This file contains the types for the Freevo Electronic # Program Guide module. # ----------------------------------------------------------------------- # $Id: epg_types.py,v 1.1 2004/07/26 12:45:56 dischi Exp $ # # Notes: # Todo: # # ----------------------------------------------------------------------- # $Log: epg_types.py,v $ # Revision 1.1 2004/07/26 12:45:56 dischi # first draft of pyepg (not integrated in Freevo) # # Revision 1.20 2004/07/10 12:33:41 dischi # header cleanup # # Revision 1.19 2004/07/01 22:49:49 rshortt # Unicode fix. # # Revision 1.18 2004/06/22 01:07:49 rshortt # Move stuff into __init__() and fix a bug for twisted's serialization. # # Revision 1.17 2004/03/05 20:49:11 rshortt # Add support for searching by movies only. This uses the date field in xmltv # which is what tv_imdb uses and is really acurate. I added a date property # to TvProgram for this and updated findMatches in the record_client and # recordserver. # # ----------------------------------------------------------------------- # Freevo - A Home Theater PC framework # Copyright (C) 2002 Krister Lagerstrom, et al. # Please see the file freevo/Docs/CREDITS for a complete list of authors. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MER- # CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # ----------------------------------------------------------------------- */ import sys import copy import time, os, string import config # Cache variables for last GetPrograms() cache_last_start = None cache_last_stop = None cache_last_chanids = None cache_last_result = None cache_last_time = 0 class TvProgram: def __init__(self): self.channel_id = '' self.title = '' self.desc = '' self.sub_title = '' self.start = 0.0 self.stop = 0.0 self.ratings = {} self.advisories = [] self.categories = [] self.date = None # Due to problems with Twisted's marmalade this should not be changed # to a boolean type. self.scheduled = 0 def __str__(self): bt = time.localtime(self.start) # Beginning time tuple et = time.localtime(self.stop) # End time tuple begins = '%s-%02d-%02d %02d:%02d' % (bt[0], bt[1], bt[2], bt[3], bt[4]) ends = '%s-%02d-%02d %02d:%02d' % (et[0], et[1], et[2], et[3], et[4]) s = '%s to %s %3s %s' % (begins, ends, self.channel_id, self.title) return s def __cmp__(self, other): """ compare function, return 0 if the objects are identical, 1 otherwise """ if not other: return 1 return self.title != other.title or \ self.start != other.start or \ self.stop != other.stop or \ self.channel_id != other.channel_id def getattr(self, attr): """ return the specific attribute as string or an empty string """ if attr == 'start': return Unicode(time.strftime(config.TV_TIMEFORMAT, time.localtime(self.start))) if attr == 'stop': return Unicode(time.strftime(config.TV_TIMEFORMAT, time.localtime(self.stop))) if attr == 'date': return Unicode(time.strftime(config.TV_DATEFORMAT, time.localtime(self.start))) if attr == 'time': return self.getattr('start') + u' - ' + self.getattr('stop') if hasattr(self, attr): return getattr(self,attr) return '' class TvChannel: def __init__(self): self.id = '' self.displayname = '' self.tunerid = '' self.logo = '' self.programs = [] self.times = None def Sort(self): # Sort the programs so that the earliest is first in the list f = lambda a, b: cmp(a.start, b.start) self.programs.sort(f) def __str__(self): s = 'CHANNEL ID %-20s' % self.id if self.programs: s += '\n' for program in self.programs: s += ' ' + String(program) + '\n' else: s += ' NO DATA\n' return s class TvGuide: def __init__(self): # These two types map to the same channel objects self.chan_dict = {} # Channels mapped using the id self.chan_list = [] # Channels, ordered self.EPG_VERSION = config.EPG_VERSION self.timestamp = 0.0 def AddChannel(self, channel): if not self.chan_dict.has_key(channel.id): # Add the channel to both the dictionary and the list. This works # well in Python since they will both point to the same object! self.chan_dict[channel.id] = channel self.chan_list.append(channel) def AddProgram(self, program): # The channel must be present, or the program is # silently dropped if self.chan_dict.has_key(program.channel_id): p = self.chan_dict[program.channel_id].programs if len(p) and p[-1].start < program.stop and p[-1].stop > program.start: # the tv guide is corrupt, the last entry has a stop time higher than # the next start time. Correct that by reducing the stop time of # the last entry if config.DEBUG > 1: print 'wrong stop time: %s' % \ String(self.chan_dict[program.channel_id].programs[-1]) self.chan_dict[program.channel_id].programs[-1].stop = program.start if len(p) and p[-1].start == p[-1].stop: # Oops, something is broken here self.chan_dict[program.channel_id].programs = p[:-1] self.chan_dict[program.channel_id].programs += [program] # Get all programs that occur at least partially between # the start and stop timeframe. # If start is None, get all programs from the start. # If stop is None, get all programs until the end. # The chanids can be used to select only certain channel id's, # all channels are returned otherwise # # The return value is a list of channels (TvChannel) def GetPrograms(self, start = None, stop = None, chanids = None): if start == None: start = 0 if stop == None: stop = 2147483647 # Year 2038 # Return a cached version? global cache_last_start, cache_last_stop, cache_last_chanids global cache_last_time, cache_last_result if (cache_last_start == start and cache_last_stop == stop and cache_last_chanids == chanids and time.time() < cache_last_time): if config.DEBUG > 1: a = cache_last_time - time.time() print 'epg: Returning cached results, valid for %1.1f secs.' % a return cache_last_result[:] # Return a copy channels = [] for chan in self.chan_list: if chanids and (not chan.id in chanids): continue # Copy the channel info c = TvChannel() c.id = chan.id c.displayname = chan.displayname c.tunerid = chan.tunerid c.logo = chan.logo c.times = chan.times # Copy the programs that are inside the indicated time bracket f = lambda p, a=start, b=stop: not (p.start > b or p.stop < a) c.programs = filter(f, chan.programs) channels.append(c) # Update cache variables cache_last_start = start cache_last_stop = stop if chanids: cache_last_chanids = chanids[:] else: cache_last_chanids = None cache_last_timeout = time.time() + 20 cache_last_result = channels[:] # Make a copy in case the caller modifies it if config.DEBUG > 1: print 'epg: Returning new results' return channels def Sort(self): # Sort all channel programs in time order for chan in self.chan_list: chan.Sort() def __str__(self): s = 'XML TV Guide\n' for chan in self.chan_list: s += String(chan) return s --- NEW FILE: xmltv.py --- # # xmltv.py - Python interface to XMLTV format, based on XMLTV.pm # # Copyright (C) 2001 James Oakley # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Notes: # - Uses qp_xml instead of DOM. It's way faster # - Read and write functions use file objects instead of filenames # - Unicode is removed on dictionary keys because the xmlrpclib marshaller # chokes on it. It'll always be Latin-1 anyway... (famous last words) # # Yes, A lot of this is quite different than the Perl module, mainly to keep # it Pythonic # # If you have any trouble: jf...@fu... # # Changes for Freevo: # o change data_format to '%Y%m%d%H%M%S %Z' # o delete all encode, return Unicode # o add except AttributeError: for unhandled elements (line 250ff) from xml.utils import qp_xml from xml.sax import saxutils import string, types, re # The Python-XMLTV version VERSION = "0.5.15" # The date format used in XMLTV date_format = '%Y%m%d%H%M%S %Z' # Note: Upstream xmltv.py uses %z so remember to change that when syncing date_format_notz = '%Y%m%d%H%M%S' # These characters are illegal in XML XML_BADCHARS = re.compile(u'[^\x09\x0A\x0D\x20-\uD7FF\uE000-\uFFFD\u10000-\u10FFFF]') # # Options. They may be overridden after this module is imported # # The extraction process could be simpler, building a tree recursively # without caring about the element names, but it's done this way to allow # special handling for certain elements. If 'desc' changed one day then # ProgrammeHandler.desc() can be modified to reflect it class _ProgrammeHandler: """ Handles XMLTV programme elements """ # # <tv> sub-tags # def title(self, node): return _readwithlang(node) def sub_title(self, node): return _readwithlang(node) def desc(self, node): return _readwithlang(node) def credits(self, node): return _extractNodes(node, self) def date(self, node): return node.textof() def category(self, node): return _readwithlang(node) def language(self, node): return _readwithlang(node) def orig_language(self, node): return _readwithlang(node) def length(self, node): data = {} data['units'] = _getxmlattr(node, u'units') try: length = int(node.textof()) except ValueError: pass data['length'] = length return data def icon(self, node): data = {} for attr in (u'src', u'width', u'height'): if node.attrs.has_key(('', attr)): data[attr] = _getxmlattr(node, attr) return data def url(self, node): return node.textof() def country(self, node): return _readwithlang(node) def episode_num(self, node): system = _getxmlattr(node, u'system') if system == '': system = 'onscreen' return (node.textof(), system) def video(self, node): result = {} for child in node.children: result[child.name] = self._call(child) return result def audio(self, node): result = {} for child in node.children: result[child.name] = self._call(child) return result def previously_shown(self, node): data = {} for attr in (u'start', u'channel'): if node.attrs.has_key(('', attr)): data[attr] = node.attrs[('', attr)] return data def premiere(self, node): return _readwithlang(node) def last_chance(self, node): return _readwithlang(node) def new(self, node): return 1 def subtitles(self, node): data = {} if node.attrs.has_key(('', u'type')): data['type'] = _getxmlattr(node, u'type') for child in node.children: if child.name == u'language': data['language'] = _readwithlang(child) return data def rating(self, node): data = {} data['icon'] = [] if node.attrs.has_key(('', u'system')): data['system'] = node.attrs[('', u'system')] for child in node.children: if child.name == u'value': data['value'] = child.textof() elif child.name == u'icon': data['icon'].append(self.icon(child)) if len(data['icon']) == 0: del data['icon'] return data def star_rating(self, node): data = {} data['icon'] = [] for child in node.children: if child.name == u'value': data['value'] = child.textof() elif child.name == u'icon': data['icon'].append(self.icon(child)) if len(data['icon']) == 0: del data['icon'] return data # # <credits> sub-tags # def actor(self, node): return node.textof() def director(self, node): return node.textof() def writer(self, node): return node.textof() def adapter(self, node): return node.textof() def producer(self, node): return node.textof() def presenter(self, node): return node.textof() def commentator(self, node): return node.textof() def guest(self, node): return node.textof() # # <video> and <audio> sub-tags # def present(self, node): return _decodeboolean(node) def colour(self, node): return _decodeboolean(node) def aspect(self, node): return node.textof() def stereo(self, node): return node.textof() # # Magic # def _call(self, node): try: return getattr(self, string.replace(node.name.encode(), '-', '_'))(node) except NameError: return '**Unhandled Element**' except AttributeError: return '**Unhandled Element**' class _ChannelHandler: """ Handles XMLTV channel elements """ def display_name(self, node): return _readwithlang(node) def icon(self, node): data = {} for attr in (u'src', u'width', u'height'): if node.attrs.has_key(('', attr)): data[attr] = _getxmlattr(node, attr) return data def url(self, node): return node.textof() # # More Magic # def _call(self, node): try: return getattr(self, string.replace(node.name.encode(), '-', '_'))(node) except NameError: return '**Unhandled Element**' # # Some convenience functions, treat them as private # def _extractNodes(node, handler): """ Builds a dictionary from the sub-elements of node. 'handler' should be an instance of a handler class """ result = {} for child in node.children: if not result.has_key(child.name): result[child.name] = [] result[child.name].append(handler._call(child)) return result def _getxmlattr(node, attr): """ If 'attr' is present in 'node', return the value, else return an empty string Yeah, yeah, namespaces are ignored and all that stuff """ if node.attrs.has_key((u'', attr)): return node.attrs[(u'', attr)] else: return u'' def _readwithlang(node): """ Returns a tuple containing the text of a 'node' and the text of the 'lang' attribute """ return (node.textof(), _getxmlattr(node, u'lang')) def _decodeboolean(node): text = node.textof().lower() if text == 'yes': return 1 elif text == 'no': return 0 else: return None def _node_to_programme(node): """ Create a programme dictionary from a qp_xml 'node' """ handler = _ProgrammeHandler() programme = _extractNodes(node, handler) for attr in (u'start', u'channel'): programme[attr] = node.attrs[(u'', attr)] if (u'', u'stop') in node.attrs: programme[u'stop'] = node.attrs[(u'', u'stop')] #else: # Sigh. Make show zero-length. This will allow the show to appear in # searches, but it won't be seen in a grid, if the grid is drawn to # scale #programme[u'stop'] = node.attrs[(u'', u'start')] return programme def _node_to_channel(node): """ Create a channel dictionary from a qp_xml 'node' """ handler = _ChannelHandler() channel = _extractNodes(node, handler) channel['id'] = node.attrs[('', 'id')] return channel def read_programmes(fp): """ Read an XMLTV file and get out the relevant information for each programme. Parameter: file object to read from Returns: list of hashes with start, titles, etc. """ parser = qp_xml.Parser() doc = parser.parse(fp.read()) programmes = [] for node in doc.children: if node.name == u'programme': programmes.append(_node_to_programme(node)) return programmes def read_data(fp): """ Get the source and other info from an XMLTV file. Parameter: filename to read from Returns: dictionary of <tv> attributes """ parser = qp_xml.Parser() doc = parser.parse(fp.read()) attrs = {} for key in doc.attrs.keys(): attrs[key[1]] = doc.attrs[key] return attrs def read_channels(fp): """ Read the channels.xml file and return a list of channel information. """ parser = qp_xml.Parser() doc = parser.parse(fp.read()) channels = [] for node in doc.children: if node.name == u'channel': channels.append(_node_to_channel(node)) return channels class Writer: """ A class for generating XMLTV data **All strings passed to this class must be Unicode, except for dictionary keys** """ def __init__(self, fp, encoding="iso-8859-1", date=None, source_info_url=None, source_info_name=None, generator_info_url=None, generator_info_name=None): """ Arguments: 'fp' -- A File object to write XMLTV data to 'encoding' -- The text encoding that will be used. *Defaults to 'iso-8859-1'* 'date' -- The date this data was generated. *Optional* 'source_info_url' -- A URL for information about the source of the data. *Optional* 'source_info_name' -- A human readable description of 'source_info_url'. *Optional* 'generator_info_url' -- A URL for information about the program that is generating the XMLTV document. *Optional* 'generator_info_name' -- A human readable description of 'generator_info_url'. *Optional* """ self.fp = fp self.encoding = encoding self.date = date self.source_info_url = source_info_url self.source_info_name = source_info_name self.generator_info_url = generator_info_url self.generator_info_name = generator_info_name s = """<?xml version="1.0" encoding="%s"?> <!DOCTYPE tv SYSTEM "xmltv.dtd"> """ % self.encoding # tv start tag s += "<tv" for attr in ('date', 'source_info_url', 'source_info_name', 'generator_info_url', 'generator_info_name'): if attr: s += ' %s="%s"' % (attr, self.__dict__[attr]) s += ">\n" self.fp.write(s) def _validateStructure(self, d): """ Raises 'TypeError' if any strings are not Unicode Argumets: 's' -- A dictionary """ if type(d) == types.StringType: raise TypeError ('All strings, except keys, must be in Unicode. Bad string: %s' % d) elif type(d) == types.DictType: for key in d.keys(): self._validateStructure(d[key]) elif type(d) == types.TupleType or type(d) == types.ListType: for i in d: self._validateStructure(i) def _formatCDATA(self, cdata): """ Returns fixed and encoded CDATA Arguments: 'cdata' -- CDATA you wish to encode """ # Let's do what 4Suite does, and replace bad characters with '?' cdata = XML_BADCHARS.sub(u'?', cdata) return saxutils.escape(cdata).encode(self.encoding) def _formatTag(self, tagname, attrs=None, pcdata=None, indent=4): """ Return a simple tag Arguments: 'tagname' -- Name of tag 'attrs' -- dictionary of attributes 'pcdata' -- Content 'indent' -- Number of spaces to indent """ s = indent*' ' s += '<%s' % tagname if attrs: for key in attrs.keys(): s += ' %s="%s"' % (key, self._formatCDATA(attrs[key])) if pcdata: s += '>%s</%s>\n' % (self._formatCDATA(pcdata), tagname) else: s += '/>\n' return s def end(self): """ Write the end of an XMLTV document """ self.fp.write("</tv>\n") def write_programme(self, programme): """ Write a single XMLTV 'programme' Arguments: 'programme' -- A dict representing XMLTV data """ self._validateStructure(programme) s = ' <programme' # programme attributes for attr in ('start', 'channel'): if programme.has_key(attr): s += ' %s="%s"' % (attr, self._formatCDATA(programme[attr])) else: raise ValueError("'programme' must contain '%s' attribute" % attr) for attr in ('stop', 'pdc-start', 'vps-start', 'showview', 'videoplus', 'clumpidx'): if programme.has_key(attr): s += ' %s="%s"' % (attr, self._formatCDATA(programme[attr])) s += '>\n' # Required children err = 0 if programme.has_key('title'): if len(programme['title']) > 0: for title in programme['title']: if title[1] != u'': attrs = {'lang': title[1]} else: attrs=None s += self._formatTag('title', attrs, title[0]) else: err = 1 else: err = 1 if err: raise ValueError("'programme' must contain at least one 'title' element") # Zero or more children with PCDATA and 'lang' attribute for element in ('sub-title', 'desc', 'category', 'country'): if programme.has_key(element): for item in programme[element]: if item[1] != u'': attrs = {'lang': item[1]} else: attrs=None s += self._formatTag(element, attrs, item[0]) # Zero or one children with PCDATA and 'lang' attribute for element in ('language', 'orig-language', 'premiere', 'last-chance'): if programme.has_key(element): if len(programme[element]) != 1: raise ValueError("Only one '%s' element allowed" % element) if programme[element][0][1] != u'': attrs = {'lang': programme[element][0][1]} else: attrs=None s += self._formatTag(element, attrs, programme[element][0][0]) # Credits if programme.has_key('credits'): s += ' <credits>\n' for credit in ('director', 'actor', 'writer', 'adapter', 'producer', 'presenter', 'commentator', 'guest'): if programme['credits'][0].has_key(credit): for name in programme['credits'][0][credit]: s += self._formatTag(credit, pcdata=name, indent=6) s += ' </credits>\n' # Date if programme.has_key('date'): if len(programme['date']) != 1: raise ValueError("Only one 'date' element allowed") s += self._formatTag('date', pcdata=programme['date'][0]) # Length if programme.has_key('length'): if len(programme['length']) != 1: raise ValueError("Only one 'length' element allowed") s += self._formatTag('length', {'units': programme['length'][0]['units']}, str(programme['length'][0]['length']).decode(self.encoding)) # Icon if programme.has_key('icon'): for icon in programme['icon']: if icon.has_key('src'): s += self._formatTag('icon', icon) else: raise ValueError("'icon' element requires 'src' attribute") # URL if programme.has_key('url'): for url in programme['url']: s += self._formatTag('url', pcdata=url) # Episode-num if programme.has_key('episode-num'): if len(programme['episode-num']) != 1: raise ValueError("Only one 'episode-num' element allowed") s += self._formatTag('episode-num', {'system': programme['episode-num'][0][1]}, programme['episode-num'][0][0]) # Video and audio details for element in ('video', 'audio'): if programme.has_key(element): s += ' <%s>\n' % element for key in programme[element][0]: s += self._formatTag(key, pcdata=str(programme[element][0][key]).decode(self.encoding), indent=6) s += ' </%s>\n' % element # Previously shown if programme.has_key('previously-shown'): s += self._formatTag('previously-shown', programme['previously-shown'][0]) # New if programme.has_key('new'): s += self._formatTag('new') # Subtitles if programme.has_key('subtitles'): s += ' <subtitles' if programme['subtitles'][0].has_key('type'): s += ' type="%s"' % self._formatCDATA(programme['subtitles'][0]['type']) s += '>\n' if programme['subtitles'][0].has_key('language'): if programme['subtitles'][0]['language'][1] != u'': attrs = {'lang': programme['subtitles'][0]['language'][1]} else: attrs = None s += self._formatTag('language', None, programme['subtitles'][0]['language'][0], indent=6) s += ' </subtitles>\n' # Rating and star rating for element in ('rating', 'star-rating'): if programme.has_key(element): s += ' <%s' % element if element == 'rating': if programme[element][0].has_key('system'): s += ' system="%s"' % self._formatCDATA(programme[element][0]['system']) s += '>\n' if programme[element][0].has_key('value'): s += self._formatTag('value', pcdata=programme[element][0]['value'], indent=6) if programme[element][0].has_key('icon'): for icon in programme[element][0]['icon']: s += self._formatTag('icon', icon, indent=6) s += ' </%s>\n' % element # End tag s += ' </programme>\n' self.fp.write(s) def write_channel(self, channel): """ Write a single XMLTV 'channel' Arguments: 'channel' -- A dict representing XMLTV data """ self._validateStructure(channel) s = ' <channel id="%s">\n' % channel['id'] # Write display-name(s) err = 0 if channel.has_key('display-name'): if len(channel['display-name']) > 0: for name in channel['display-name']: if name[1] != u'': attrs = {'lang': name[1]} else: attrs = None s += self._formatTag('display-name', attrs, name[0]) else: err = 1 else: err = 1 if err: raise ValueError("'channel' must contain at least one 'display-name' element") # Icon if channel.has_key('icon'): for icon in channel['icon']: if icon.has_key('src'): s += self._formatTag('icon', icon) else: raise ValueError("'icon' element requires 'src' attribute") # URL if channel.has_key('url'): for url in channel['url']: s += self._formatTag('url', pcdata=url) s += ' </channel>\n' self.fp.write(s) if __name__ == '__main__': # Tests from pprint import pprint from StringIO import StringIO import sys # An example file xmldata = StringIO("""<?xml version="1.0" encoding="iso-8859-1"?> <!DOCTYPE tv SYSTEM "xmltv.dtd"> <tv date="20030811003608 -0300" source-info-url="http://www.funktronics.ca/python-xmltv" source-info-name="Funktronics" generator-info-name="python-xmltv" generator-info-url="http://www.funktronics.ca/python-xmltv"> <channel id="C10eltv.zap2it.com"> <display-name>Channel 10 ELTV</display-name> <url>http://www.eastlink.ca/</url> </channel> <channel id="C11cbht.zap2it.com"> <display-name lang="en">Channel 11 CBHT</display-name> <icon src="http://tvlistings2.zap2it.com/tms_network_logos/cbc.gif"/> </channel> <programme start="20030702000000 ADT" channel="C23robtv.zap2it.com" stop="20030702003000 ADT"> <title>This Week in Business</title> <category>Biz</category> <category>Fin</category> <date>2003</date> <audio> <stereo>stereo</stereo> </audio> </programme> <programme start="20030702000000 ADT" channel="C36wuhf.zap2it.com" stop="20030702003000 ADT"> <title>Seinfeld</title> <sub-title>The Engagement</sub-title> <desc>In an effort to grow up, George proposes marriage to former girlfriend Susan.</desc> <category>Comedy</category> <country>USA</country> <language>English</language> <orig-language>English</orig-language> <premiere lang="en">Not really. Just testing</premiere> <last-chance>Hah!</last-chance> <credits> <actor>Jerry Seinfeld</actor> <producer>Larry David</producer> </credits> <date>1995</date> <length units="minutes">22</length> <episode-num system="xmltv_ns">7 . 1 . 1/1</episode-num> <video> <colour>1</colour> <present>1</present> <aspect>4:3</aspect> </video> <audio> <stereo>stereo</stereo> </audio> <previously-shown start="19950921103000 ADT" channel="C12whdh.zap2it.com"/> <new/> <subtitles type="teletext"> <language>English</language> </subtitles> <rating system="VCHIP"> <value>PG</value> <icon src="http://some.ratings/PGicon.png" width="64" height="64"/> </rating> <star-rating> <value>4/5</value> <icon src="http://some.star/icon.png" width="32" height="32"/> </star-rating> </programme> </tv> """) pprint(read_data(xmldata)) xmldata.seek(0) pprint(read_channels(xmldata)) xmldata.seek(0) pprint(read_programmes(xmldata)) # Test the writer programmes = [{'audio': [{'stereo': u'stereo'}], 'category': [(u'Biz', u''), (u'Fin', u'')], 'channel': u'C23robtv.zap2it.com', 'date': [u'2003'], 'start': u'20030702000000 ADT', 'stop': u'20030702003000 ADT', 'title': [(u'This Week in Business', u'')]}, {'audio': [{'stereo': u'stereo'}], 'category': [(u'Comedy', u'')], 'channel': u'C36wuhf.zap2it.com', 'country': [(u'USA', u'')], 'credits': [{'producer': [u'Larry David'], 'actor': [u'Jerry Seinfeld']}], 'date': [u'1995'], 'desc': [(u'In an effort to grow up, George proposes marriage to former girlfriend Susan.', u'')], 'episode-num': [(u'7 . 1 . 1/1', u'xmltv_ns')], 'language': [(u'English', u'')], 'last-chance': [(u'Hah!', u'')], 'length': [{'units': u'minutes', 'length': 22}], 'new': [1], 'orig-language': [(u'English', u'')], 'premiere': [(u'Not really. Just testing', u'en')], 'previously-shown': [{'channel': u'C12whdh.zap2it.com', 'start': u'19950921103000 ADT'}], 'rating': [{'icon': [{'height': u'64', 'src': u'http://some.ratings/PGicon.png', 'width': u'64'}], 'system': u'VCHIP', 'value': u'PG'}], 'star-rating': [{'icon': [{'height': u'32', 'src': u'http://some.star/icon.png', 'width': u'32'}], 'value': u'4/5'}], 'start': u'20030702000000 ADT', 'stop': u'20030702003000 ADT', 'sub-title': [(u'The Engagement', u'')], 'subtitles': [{'type': u'teletext', 'language': (u'English', u'')}], 'title': [(u'Seinfeld', u'')], 'video': [{'colour': 1, 'aspect': u'4:3', 'present': 1}]}] channels = [{'display-name': [(u'Channel 10 ELTV', u'')], 'id': u'C10eltv.zap2it.com', 'url': [u'http://www.eastlink.ca/']}, {'display-name': [(u'Channel 11 CBHT', u'en')], 'icon': [{'src': u'http://tvlistings2.zap2it.com/tms_network_logos/cbc.gif'}], 'id': u'C11cbht.zap2it.com'}] w = Writer(sys.stdout, encoding="iso-8859-1", date="20030811003608 -0300", source_info_url="http://www.funktronics.ca/python-xmltv", source_info_name="Funktronics", generator_info_name="python-xmltv", generator_info_url="http://www.funktronics.ca/python-xmltv") for c in channels: w.write_channel(c) for p in programmes: w.write_programme(p) w.end() --- NEW FILE: compat.py --- import traceback import __builtin__ import config try: # this should crash when not running with Freevo # XXX check if util is freevo.util import util # load util for String() and Unicode() if config.DEBUG: print 'compat.py: freevo detected, auto configure' for i in config.__all__ + [ 'encoding', 'LOCALE' ]: setattr(config, i, getattr(util.config, i)) if util.config.DEBUG > config.DEBUG: config.DEBUG = util.config.DEBUG except: if config.DEBUG: print "compat.py: starting compat lib" def _debug_(s, level=1): if config.DEBUG < level: return # add the current trace to the string where = traceback.extract_stack(limit = 2)[0] if isinstance(s, unicode): s = s.encode(config.encoding, 'replace') s = '%s (%s): %s' % (where[0][where[0].rfind('/')+1:], where[1], s) # print debug message print s def Unicode(string, encoding=config.encoding): if string.__class__ == str: try: return unicode(string, encoding) except Exception, e: try: return unicode(string, config.LOCALE) except Exception, e: print 'Error: Could not convert %s to unicode' % repr(string) print 'tried encoding %s and %s' % (encoding, config.LOCALE) print e elif string.__class__ != unicode: return unicode(str(string), config.LOCALE) return string def String(string, encoding=config.encoding): if string.__class__ == unicode: return string.encode(encoding, 'replace') if string.__class__ != str: try: return str(string) except: return unicode(string).encode(encoding, 'replace') return string __builtin__.__dict__['Unicode'] = Unicode __builtin__.__dict__['String'] = String __builtin__.__dict__['_debug_'] = _debug_ try: foo = _('test string') except: __builtin__.__dict__['_']= lambda m: m --- NEW FILE: epg_xmltv.py --- # -*- coding: iso-8859-1 -*- # ----------------------------------------------------------------------- # epg_xmltv.py - Freevo Electronic Program Guide module for XMLTV # ----------------------------------------------------------------------- # $Id: epg_xmltv.py,v 1.1 2004/07/26 12:45:56 dischi Exp $ # # Notes: # Todo: # # ----------------------------------------------------------------------- # $Log: epg_xmltv.py,v $ # Revision 1.1 2004/07/26 12:45:56 dischi # first draft of pyepg (not integrated in Freevo) # # Revision 1.53 2004/07/11 12:25:44 dischi # fix bad German titles # # Revision 1.52 2004/07/10 12:33:41 dischi # header cleanup # # Revision 1.51 2004/06/23 20:22:19 dischi # fix popup crash # # Revision 1.50 2004/06/22 01:10:21 rshortt # Add ratings and advisories. # # ----------------------------------------------------------------------- # Freevo - A Home Theater PC framework # Copyright (C) 2002 Krister Lagerstrom, et al. # Please see the file freevo/Docs/CREDITS for a complete list of authors. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MER- # CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # ----------------------------------------------------------------------- */ import sys import time import os import traceback import calendar import shutil import _strptime as strptime # The XMLTV handler from openpvr.sourceforge.net import xmltv as xmltv # The EPG data types. They need to be in an external module in order for # pickling to work properly when run from inside this module and from the # tv.py module. import epg_types as epg_types EPG_TIME_EXC = _('Time conversion error') cached_guide = None def list_channels(XMLTV_FILE, TV_CHANNELS=None, verbose=True): if TV_CHANNELS: return TV_CHANNELS if not os.path.isfile(XMLTV_FILE): return [] if verbose: print 'epg_xmltv.py: Adding all channels' xmltv_channels = None f = open(XMLTV_FILE) xmltv_channels = xmltv.read_channels(f) f.close() # Was the guide read successfully? if not xmltv_channels: return [] TV_CHANNELS = [] for chan in xmltv_channels: id = chan['id'].encode('latin-1', 'ignore') c = epg_types.TvChannel() c.id = id if ' ' in id: # Assume the format is "TUNERID CHANNELNAME" c.displayname = id.split()[1] # XXX Educated guess c.tunerid = id.split()[0] # XXX Educated guess else: displayname = chan['display-name'][0][0] if ' ' in displayname: c.displayname = displayname.split()[1] c.tunerid = displayname.split()[0] else: c.displayname = displayname c.tunerid = _('REPLACE WITH TUNERID FOR %s') % displayname TV_CHANNELS.append((c.displayname, c.id, c.tunerid)) return TV_CHANNELS def load_guide(XMLTV_FILE, TV_CHANNELS=None, verbose=True): """ Load a guide from the raw XMLTV file using the xmltv.py support lib. Returns a TvGuide or None if an error occurred """ # Create a new guide guide = epg_types.TvGuide() # Is there a file to read from? if os.path.isfile(XMLTV_FILE): gotfile = 1 guide.timestamp = os.path.getmtime(XMLTV_FILE) else: print 'XMLTV file (%s) missing!' % XMLTV_FILE gotfile = 0 if not TV_CHANNELS: # get channel listing TV_CHANNELS = list_channels(XMLTV_FILE, verbose=True) if not TV_CHANNELS: # still no listing? return None # Add the channels that are in the config list, or all if the # list is empty if verbose: print 'epg_xmltv.py: Only adding channels in list' for data in TV_CHANNELS: (id, disp, tunerid) = data[:3] c = epg_types.TvChannel() c.id = id c.displayname = disp c.tunerid = tunerid # Handle the optional time-dependent station info c.times = [] if len(data) > 3 and len(data[3:4]) == 3: for (days, start_time, stop_time) in data[3:4]: c.times.append((days, int(start_time), int(stop_time))) guide.AddChannel(c) xmltv_programs = None if gotfile: if verbose: print 'reading xmltv data' f = open(XMLTV_FILE) xmltv_programs = xmltv.read_programmes(f) f.close() # Was the guide read successfully? if not xmltv_programs: return guide # Return the guide, it has the channels at least... needed_ids = [] for chan in guide.chan_dict: needed_ids.append(chan) if verbose: print 'creating guide for %s' % needed_ids for p in xmltv_programs: if not p['channel'] in needed_ids: continue try: prog = epg_types.TvProgram() prog.channel_id = p['channel'] prog.title = Unicode(p['title'][0][0]) if p.has_key('date'): prog.date = Unicode(p['date'][0][0]) if p.has_key('category'): prog.categories = [ cat[0] for cat in p['category'] ] if p.has_key('rating'): for r in p['rating']: if r.get('system') == 'advisory': prog.advisories.append(String(r.get('value'))) continue prog.ratings[String(r.get('system'))] = String(r.get('value')) if p.has_key('desc'): # prog.desc = Unicode(util.format_text(p['desc'][0][0])) pass if p.has_key('sub-title'): prog.sub_title = p['sub-title'][0][0] try: prog.start = timestr2secs_utc(p['start']) try: prog.stop = timestr2secs_utc(p['stop']) except: # Fudging end time prog.stop = timestr2secs_utc(p['start'][0:8] + '235900' + \ p['start'][14:18]) except EPG_TIME_EXC: continue # fix bad German titles to make favorites working if prog.title.endswith('. Teil'): prog.title = prog.title[:-6] if prog.title.rfind(' ') > 0: try: part = int(prog.title[prog.title.rfind(' ')+1:]) prog.title = prog.title[:prog.title.rfind(' ')].rstrip() if prog.sub_title: prog.sub_title = u'Teil %s: %s' % (part, prog.sub_title) else: prog.sub_title = u'Teil %s' % part except Exception, e: print e guide.AddProgram(prog) except: traceback.print_exc() print 'Error in tv guide, skipping' guide.Sort() return guide def timestr2secs_utc(timestr): """ Convert a timestring to UTC (=GMT) seconds. The format is either one of these two: '20020702100000 CDT' '200209080000 +0100' """ # This is either something like 'EDT', or '+1' try: tval, tz = timestr.split() except ValueError: tval = timestr tz = str(-time.timezone/3600) if tz == 'CET': tz='+1' # Is it the '+1' format? if tz[0] == '+' or tz[0] == '-': tmTuple = ( int(tval[0:4]), int(tval[4:6]), int(tval[6:8]), int(tval[8:10]), int(tval[10:12]), 0, -1, -1, -1 ) secs = calendar.timegm( tmTuple ) adj_neg = int(tz) >= 0 try: min = int(tz[3:5]) except ValueError: # sometimes the mins are missing :-( min = 0 adj_secs = int(tz[1:3])*3600+ min*60 if adj_neg: secs -= adj_secs else: secs += adj_secs else: # No, use the regular conversion ## WARNING! BUG HERE! # The line below is incorrect; the strptime.strptime function doesn't # handle time zones. There is no obvious function that does. Therefore # this bug is left in for someone else to solve. try: secs = time.mktime(strptime.strptime(timestr, xmltv.date_format)) except ValueError: timestr = timestr.replace('EST', '') secs = time.mktime(strptime.strptime(timestr, xmltv.date_format)) return secs --- NEW FILE: __init__.py --- import compat import epg_xmltv def list_channels(data = 'tmp/TV.xml'): return epg_xmltv.list_channels(data, verbose=True) def load(data = 'tmp/TV.xml'): return epg_xmltv.load_guide(data, verbose=True) |