[Pypt-offline-general] SF.net SVN: pypt-offline: [184] trunk/pypt_core.py
Status: Beta
Brought to you by:
riteshsarraf
|
From: <rit...@us...> - 2007-08-12 20:31:06
|
Revision: 184
http://pypt-offline.svn.sourceforge.net/pypt-offline/?rev=184&view=rev
Author: riteshsarraf
Date: 2007-08-12 13:31:08 -0700 (Sun, 12 Aug 2007)
Log Message:
-----------
* It was very wrong to put a try: without a proper exception clause. Never do it.
* Fixed TarGzipBZ2_Uncompress() call. It was stupidly wrong written.
* Some string formatting fixes
* Pass syncer a more descriptive variable name and identify whether bug parsing is required or not. For updates, we don't do bug parsing.
* Put the imports more properly. The import doesn't need to be done separately under each if
* Unclutter the code for pypt_magic parsing. Introduced the magic_check_and_uncompress()
* In magic_check_and_uncompress, for bug report files, we don't need to do anything to them and should safely ignore them
Modified Paths:
--------------
trunk/pypt_core.py
Modified: trunk/pypt_core.py
===================================================================
--- trunk/pypt_core.py 2007-08-12 02:55:04 UTC (rev 183)
+++ trunk/pypt_core.py 2007-08-12 20:31:08 UTC (rev 184)
@@ -378,17 +378,17 @@
except ImportError:
return False
- try:
- read_from = bz2.BZ2File(archive_file, 'r')
- except:
- return False
+ #try:
+ read_from = bz2.BZ2File(archive_file, 'r')
+ #except:
+ # return False
- try:
- write_to = open (os.path.join(path, filename), 'wb')
- except:
- return False
+ #try:
+ write_to = open (os.path.join(path, target_file), 'wb')
+ #except:
+ # return False
- if TarGzipBZ2_Uncomprerssed(read_from, write_to) != True:
+ if self.TarGzipBZ2_Uncompress(read_from, write_to) != True:
raise ArchiveError
write_to.close()
read_from.close()
@@ -400,17 +400,17 @@
except ImportError:
return False
- try:
- read_from = gzip.GzipFile(file, 'r')
- except:
- return False
+ #try:
+ read_from = gzip.GzipFile(file, 'r')
+ #except:
+ # return False
- try:
- write_to = open(os.path.join(path,filename), 'wb')
- except:
- return False
+ #try:
+ write_to = open(os.path.join(path,filename), 'wb')
+ #except:
+ # return False
- if TarGzipBZ2_Uncomprerssed(read_from, write_to) != True:
+ if self.TarGzipBZ2_Uncompress(read_from, write_to) != True:
raise ArchiveError
write_to.close()
read_from.close()
@@ -418,10 +418,10 @@
elif archive_type is 3:
# FIXME: This looks odd. Where are we writing to a file ???
- try:
- zip_file = zipfile.ZipFile(file, 'rb')
- except:
- return False
+ #try:
+ zip_file = zipfile.ZipFile(file, 'rb')
+ #except:
+ # return False
for filename in zip_file.namelist():
data = zip_file.read()
@@ -730,7 +730,7 @@
sys.exit(errno)
else:
- log.err("Aieee! I don't understand this errorcode\n" % (errno))
+ log.err("Error: I don't understand this errorcode\n" % (errno))
sys.exit(errno)
def get_pager_cmd(pager_cmd = None):
@@ -819,7 +819,7 @@
os.mkdir("pypt-downloads")
download_path = os.path.abspath("pypt-downloads")
except:
- log.err("Aieeee! I couldn't create a directory")
+ log.err("Error: I couldn't create a directory")
errfunc(1, '')
else:
download_path = os.path.abspath(ArgumentOptions.download_dir)
@@ -1243,18 +1243,27 @@
for error in errlist:
log.err("%s failed.\n" % (error))
-def syncer(install_file_path, target_path, arg_type=None):
- '''Syncer does the work of syncing the downloaded files.
+def syncer(install_file_path, target_path, path_type=None, bug_parse_required=None):
+ '''
+ Syncer does the work of syncing the downloaded files.
It syncs "install_file_path" which could be a valid file path
or a zip archive to "target_path"
- arg_type defines whether install_file_path is a zip file
+ path_type defines whether install_file_path is a zip file
or a folder path
+ # path_type
1 => install_file_path is a File
- 2 => install_file_path is a Folder'''
+ 2 => install_file_path is a Folder
+ '''
archive = Archiver()
+ try:
+ import pypt_magic
+ except ImportError:
+ log.err("Error: Module pypt_magic not found.\n")
+ sys.exit(1)
+
def display_options():
log.msg("(Y) Yes. Proceed with installation\n")
@@ -1275,231 +1284,235 @@
bug_subject = bugs_number[each_bug]
log.msg("%s\t%s\n" % (bug_num, bug_subject) )
- if arg_type == 1:
+ def magic_check_and_uncompress(archive_file=None, target_path=None, filename=None, Mode=None):
+
+ if pypt_magic.file(archive_file) == "application/x-bzip2":
+ retval = archive.decompress_the_file(archive_file, target_path, filename, 1)
+ elif pypt_magic.file(archive_file) == "application/x-gzip":
+ retval = archive.decompress_the_file(archive_file, target_path, filename, 2)
+ elif pypt_magic.file(archive_file) == "application/zip":
+ retval = archive.decompress_the_file(os.path.join(install_file_path, eachfile), target_path, eachfile, 3)
+ elif pypt_magic.file(filename) == "PGP armored data" or pypt_magic.file(filename) == "application/x-dpkg":
+ if os.access(target_path, os.W_OK):
+ shutil.copy(filename, target_path)
+ retval = True
+ else:
+ log.err("ERROR: Cannot write to target path %s.\n" % (target_path) )
+ elif filename.endswith(pypt_bug_file_format):
+ retval = False # We intentionally put the bug report files as not printed.
+ else:
+ log.err("ERROR: I couldn't understand file type %s.\n" % (filename) )
+ if retval is True:
+ log.msg("%s file synced.\n" % (filename))
+ os.unlink(archive_file)
+
+ if path_type == 1:
+
try:
import zipfile
except ImportError:
- log.err("Aieeee! Module zipfile not found.\n")
+ log.err("Error: Module zipfile not found.\n")
sys.exit(1)
- try:
- import pypt_magic
- except ImportError:
- log.err("Aieeee! Module pypt_magic not found.\n")
- sys.exit(1)
-
file = zipfile.ZipFile(install_file_path, "r")
- bugs_number = {}
- for filename in file.namelist():
- if filename.endswith(pypt_bug_file_format):
- bug_subject_file = file.read(filename)
- bug_subject = bug_subject_file.split('\r')
- del bug_subject_file
- for subject in bug_subject:
- if subject.startswith('#'):
- subject = subject.lstrip(subject.split(":")[0])
- break
- bugs_number[filename] = subject
+ if bug_parse_required is True:
- if bugs_number:
- # Display the list of bugs
- list_bugs()
- display_options()
- response = get_response()
-
- while True:
- if response == "?":
- display_options()
- response = get_response()
+ bugs_number = {}
+ for filename in file.namelist():
+ if filename.endswith(pypt_bug_file_format):
+ bug_subject_file = file.read(filename)
+ bug_subject = bug_subject_file.split('\r')
+ del bug_subject_file
+ for subject in bug_subject:
+ if subject.startswith('#'):
+ subject = subject.lstrip(subject.split(":")[0])
+ break
+ bugs_number[filename] = subject
- elif response.startswith('y') or response.startswith('Y'):
+ if bugs_number:
+ # Display the list of bugs
+ list_bugs()
+ display_options()
+ response = get_response()
+
+ while True:
+ if response == "?":
+ display_options()
+ response = get_response()
+
+ elif response.startswith('y') or response.startswith('Y'):
+ for filename in file.namelist():
+
+ data = open(filename, "wb")
+ data.write(file.read(filename))
+ data.close()
+
+ #FIXME: Fix this tempfile feature
+ # Access to the temporary file is not being allowed
+ # It's throwing a Permission denied exception
+ #try:
+ # import tempfile
+ #except ImportError:
+ # sys.stderr.write("Aieeee! Module pypt_magic not found.\n")
+ # sys.exit(1)
+ #data = tempfile.NamedTemporaryFile('wb', -1, '', '', os.curdir)
+ #data.write(file.read(filename))
+ #data = file.read(filename)
+
+ magic_check_and_uncompress(os.path.abspath(filename), target_path, filename)
+ sys.exit(0)
+
+ elif response.startswith('n') or response.startswith('N'):
+ log.err("Exiting gracefully on user request.\n\n")
+ sys.exit(0)
+
+ elif response.isdigit() is True:
+ found = False
+ for full_bug_file_name in bugs_number:
+ if response in full_bug_file_name:
+ bug_file_to_display = full_bug_file_name
+ found = True
+ break
+ if found == False:
+ log.err("Incorrect bug number %s provided.\n" % (response) )
+ response = get_response()
+
+ if found:
+ display_pager = PagerCmd()
+ retval = display_pager.send_to_pager(file.read(bug_file_to_display) )
+ if retval == 1:
+ log.err("Broken pager. Can't display the bug details.\n")
+ # Redisplay the menu
+ # FIXME: See a pythonic possibility of cleaning the screen at this stage
+ response = get_response()
+
+ elif response.startswith('r') or response.startswith('R'):
+ list_bugs()
+ response = get_response()
+
+ else:
+ log.err('Incorrect choice. Exiting\n')
+ sys.exit(1)
+ else:
+ log.msg("Great!!! No bugs found for all the packages that were downloaded.\n")
+ response = raw_input("Continue with Installation. Y/N ?")
+ response = response.rstrip("\r")
+ if response.endswith('y') or response.endswith('Y'):
+ log.verbose("Continuing with syncing the files.\n")
for filename in file.namelist():
data = open(filename, "wb")
data.write(file.read(filename))
data.close()
- #FIXME: Fix this tempfile feature
- # Access to the temporary file is not being allowed
- # It's throwing a Permission denied exception
- #try:
- # import tempfile
- #except ImportError:
- # sys.stderr.write("Aieeee! Module pypt_magic not found.\n")
- # sys.exit(1)
- #data = tempfile.NamedTemporaryFile('wb', -1, '', '', os.curdir)
- #data.write(file.read(filename))
- #data = file.read(filename)
-
- if pypt_magic.file(os.path.abspath(filename)) == "application/x-bzip2":
- archive.decompress_the_file(os.path.abspath(filename), target_path, filename, 1)
- elif pypt_magic.file(os.path.abspath(filename)) == "application/x-gzip":
- archive.decompress_the_file(os.path.abspath(filename), target_path, filename, 2)
- elif pypt_magic.file(filename) == "PGP armored data" or pypt_magic.file(filename) == "application/x-dpkg":
- if os.access(target_path, os.W_OK):
- shutil.copy(filename, target_path)
- log.msg("%s file synced.\n" % (filename))
- os.unlink(filename)
-
- elif response.startswith('n') or response.startswith('N'):
- log.err("Exiting gracefully on user request.\n\n")
+ magic_check_and_uncompress(os.path.abspath(filename), target_path, filename)
+ else:
+ log.msg("Exiting gracefully on user request.\n")
sys.exit(0)
+ elif bug_parse_required is False:
+
+ for filename in file.namelist():
+
+ data = open(filename, "wb")
+ data.write(file.read(filename))
+ data.close()
+
+ #FIXME: Fix this tempfile feature
+ # Access to the temporary file is not being allowed
+ # It's throwing a Permission denied exception
+ #try:
+ # import tempfile
+ #except ImportError:
+ # sys.stderr.write("Aieeee! Module pypt_magic not found.\n")
+ # sys.exit(1)
+ #data = tempfile.NamedTemporaryFile('wb', -1, '', '', os.curdir)
+ #data.write(file.read(filename))
+ #data = file.read(filename)
+ magic_check_and_uncompress(os.path.abspath(filename), target_path, filename)
+ else:
+ log.err("ERROR: Inappropriate argument sent to syncer during data fetch. Do you need to fetch bugs or not?\n")
+ sys.exit(1)
+
+ elif path_type == 2:
+ archive_file_types = ['application/x-bzip2', 'application/gzip', 'application/zip']
+
+ if bug_parse_required is True:
+ bugs_number = []
+ for filename in os.listdir(install_file_path):
+ if filename.endswith(pypt_bug_file_format):
+ bugs_number.append(filename)
- elif response.isdigit() is True:
- found = False
- for full_bug_file_name in bugs_number:
- if response in full_bug_file_name:
- bug_file_to_display = full_bug_file_name
- found = True
- break
- if found == False:
- log.err("Incorrect bug number %s provided.\n" % (response) )
+ if bugs_number:
+ #Give the choice to the user
+ list_bugs()
+ display_options()
+ response = get_response()
+
+ while True:
+ if response == "?":
+ display_options()
response = get_response()
-
- if found:
- display_pager = PagerCmd()
- retval = display_pager.send_to_pager(file.read(bug_file_to_display) )
- if retval == 1:
- log.err("Broken pager. Can't display the bug details.\n")
- # Redisplay the menu
- # FIXME: See a pythonic possibility of cleaning the screen at this stage
+
+ elif response.startswith('y') or response.startswith('Y'):
+
+ for eachfile in os.listdir(install_file_path):
+ archive_type = None
+
+ magic_check_and_uncompress(os.path.abspath(filename), target_path, filename)
+
+ elif response.startswith('n') or response.startswith('N'):
+ log.err("Exiting gracefully on user request.\n\n")
+ sys.exit(0)
+
+ elif response.isdigit() is True:
+ found = False
+ for full_bug_file_name in bugs_number:
+ if response in full_bug_file_name:
+ bug_file_to_display = full_bug_file_name
+ found = True
+ break
+ if found == False:
+ log.err("Incorrect bug number %s provided.\n" % (response) )
+ response = get_response()
+
+ if found:
+ display_pager = PagerCmd()
+ retval = display_pager.send_to_pager(file.read(bug_file_to_display) )
+ if retval == 1:
+ log.err("Broken pager. Can't display the bug details.\n")
+ # Redisplay the menu
+ # FIXME: See a pythonic possibility of cleaning the screen at this stage
+ response = get_response()
+
+ elif response.startswith('r') or response.startswith('R'):
+ list_bugs()
response = get_response()
-
- elif response.startswith('r') or response.startswith('R'):
- list_bugs()
- response = get_response()
-
- else:
- log.err('Incorrect choice. Exiting\n')
- sys.exit(1)
- else:
- log.msg("Great!!! No bugs found for all the packages that were downloaded.\n")
- response = raw_input("Continue with Installation. Y/N ?")
- response = response.rstrip("\r")
- if response.endswith('y') or response.endswith('Y'):
- log.verbose("Continuing with syncing the files.\n")
- for filename in file.namelist():
-
- data = open(filename, "wb")
- data.write(file.read(filename))
- data.close()
-
- if pypt_magic.file(os.path.abspath(filename)) == "application/x-bzip2":
- archive.decompress_the_file(os.path.abspath(filename), target_path, filename, 1)
- elif pypt_magic.file(os.path.abspath(filename)) == "application/x-gzip":
- archive.decompress_the_file(os.path.abspath(filename), target_path, filename, 2)
- elif pypt_magic.file(filename) == "PGP armored data" or pypt_magic.file(filename) == "application/x-dpkg":
- if os.access(target_path, os.W_OK):
- shutil.copy(filename, target_path)
- log.msg("%s file synced.\n" % (filename))
- os.unlink(filename)
+
+ else:
+ log.err('Incorrect choice. Exiting\n')
+ sys.exit(1)
else:
- log.msg("Exiting gracefully on user request.\n")
- sys.exit(0)
+ log.msg("Great!!! No bugs found for all the packages that were downloaded.\n")
+ response = raw_input("Continue with Installation. Y/N?")
+ response = response.rstrip("\r")
- elif arg_type == 2:
- archive_file_types = ['application/x-bzip2', 'application/gzip', 'application/zip']
-
- bugs_number = []
- for filename in os.listdir(install_file_path):
- if filename.endswith(pypt_bug_file_format):
- bugs_number.append(filename)
-
- if bugs_number:
- #Give the choice to the user
- list_bugs()
- display_options()
- response = get_response()
-
- while True:
- if response == "?":
- display_options()
- response = get_response()
-
- elif response.startswith('y') or response.startswith('Y'):
-
+ if response.startswith('y') or response.startswith('Y'):
+
for eachfile in os.listdir(install_file_path):
archive_type = None
- try:
- import pypt_magic
- except ImportError:
- log.err("Aieeee! module not found.\n")
- sys.exit(1)
- if pypt_magic.file(os.path.join(install_file_path, eachfile)) == "application/x-bzip2":
- archive.decompress_the_file(os.path.join(install_file_path, eachfile), target_path, eachfile, 1)
- elif pypt_magic.file(os.path.join(install_file_path, eachfile)) == "application/gzip":
- archive.decompress_the_file(os.path.join(install_file_path, eachfile), target_path, eachfile, 2)
- elif pypt_magic.file(os.path.join(install_file_path, eachfile)) == "application/zip":
- archive.decompress_the_file(os.path.join(install_file_path, eachfile), target_path, eachfile, 3)
- elif pypt_magic.file(os.path.join(install_file_path, eachfile)) == "PGP armored data" or pypt_magic.file(filename) == "application/x-dpkg":
- if os.access(target_path, os.W_OK):
- shutil.copy(os.path.join(install_file_path, eachfile), target_path)
- log.msg("%s file synced.\n" % (eachfile))
- else:
- log.err("Aieeee! I don't understand filetype %s\n" % (eachfile))
-
- elif response.startswith('n') or response.startswith('N'):
- log.err("Exiting gracefully on user request.\n\n")
+ magic_check_and_uncompress(os.path.abspath(filename), target_path, filename)
+ else:
+ log.msg("Exiting gracefully on user request.\n")
sys.exit(0)
+ elif bug_parse_required is False:
+ for eachfile in os.listdir(install_file_path):
+ archive_type = None
- elif response.isdigit() is True:
- found = False
- for full_bug_file_name in bugs_number:
- if response in full_bug_file_name:
- bug_file_to_display = full_bug_file_name
- found = True
- break
- if found == False:
- log.err("Incorrect bug number %s provided.\n" % (response) )
- response = get_response()
-
- if found:
- display_pager = PagerCmd()
- retval = display_pager.send_to_pager(file.read(bug_file_to_display) )
- if retval == 1:
- log.err("Broken pager. Can't display the bug details.\n")
- # Redisplay the menu
- # FIXME: See a pythonic possibility of cleaning the screen at this stage
- response = get_response()
-
- elif response.startswith('r') or response.startswith('R'):
- list_bugs()
- response = get_response()
-
- else:
- log.err('Incorrect choice. Exiting\n')
- sys.exit(1)
+ magic_check_and_uncompress(os.path.abspath(filename), target_path, filename)
else:
- log.msg("Great!!! No bugs found for all the packages that were downloaded.\n")
- response = raw_input("Continue with Installation. Y/N?")
- response = response.rstrip("\r")
-
- if response.startswith('y') or response.startswith('Y'):
-
- for eachfile in os.listdir(install_file_path):
- archive_type = None
- try:
- import pypt_magic
- except ImportError:
- log.err("Aieeee! module not found.\n")
- sys.exit(1)
-
- if pypt_magic.file(os.path.join(install_file_path, eachfile)) == "application/x-bzip2":
- archive.decompress_the_file(os.path.join(install_file_path, eachfile), target_path, eachfile, 1)
- elif pypt_magic.file(os.path.join(install_file_path, eachfile)) == "application/gzip":
- archive.decompress_the_file(os.path.join(install_file_path, eachfile), target_path, eachfile, 2)
- elif pypt_magic.file(os.path.join(install_file_path, eachfile)) == "application/zip":
- archive.decompress_the_file(os.path.join(install_file_path, eachfile), target_path, eachfile, 3)
- elif pypt_magic.file(os.path.join(install_file_path, eachfile)) == "PGP armored data" or pypt_magic.file(filename) == "application/x-dpkg":
- if os.access(target_path, os.W_OK):
- shutil.copy(os.path.join(install_file_path, eachfile), target_path)
- log.msg("%s file synced.\n" % (eachfile))
- else:
- log.err("Aieeee! I don't understand filetype %s\n" % (eachfile))
- else:
- log.msg("Exiting gracefully on user request.\n")
- sys.exit(0)
+ log.err("ERROR: Inappropriate argument sent to syncer during data fetch. Do you need to fetch bugs or not?\n")
+ sys.exit(1)
def main():
'''Here we basically do the sanity checks, some validations
@@ -1693,33 +1706,37 @@
if options.install_update:
#INFO: Comment these lines to do testing on Windows machines too
- if os.geteuid() != 0:
- log.err("\nYou need superuser privileges to execute this option\n")
- sys.exit(1)
+ #try:
+ # if os.geteuid() != 0:
+ # log.err("\nYou need superuser privileges to execute this option\n")
+ # sys.exit(1)
+ #except AttributeError:
+ # log.err("Are you really running the install command on a Debian box?\n")
+ # sys.exit(1)
if os.path.isfile(options.install_update) is True:
# Okay! We're a file. It should be a zip file
- syncer(options, 1)
+ syncer(options.install_update, apt_update_target_path, 1, bug_parse_required = False)
elif os.path.isdir(options.install_update) is True:
# We're a directory
- syncer(options, 2)
+ syncer(options.install_update, apt_update_target_path, 1, bug_parse_required = False)
else:
log.err("%s file not found\n" % (options.install_update))
sys.exit(1)
if options.install_upgrade:
#INFO: Comment these lines to do testing on Windows machines too
- #try:
- # if os.geteuid() != 0:
- # log.err("\nYou need superuser privileges to execute this option\n")
- # sys.exit(1)
- #except AttributeError:
- # log.err("Are you really running the install command on a Debian box?\n")
- # sys.exit(1)
+ try:
+ if os.geteuid() != 0:
+ log.err("\nYou need superuser privileges to execute this option\n")
+ sys.exit(1)
+ except AttributeError:
+ log.err("Are you really running the install command on a Debian box?\n")
+ sys.exit(1)
if os.path.isfile(options.install_upgrade) is True:
- syncer(options.install_upgrade, apt_package_target_path, 1)
+ syncer(options.install_upgrade, apt_package_target_path, 1, bug_parse_required = True)
elif os.path.isdir(options.install_upgrade) is True:
- syncer(options.install_upgrade, apt_package_target_path, 2)
+ syncer(options.install_upgrade, apt_package_target_path, 2, bug_parse_required = True)
else:
log.err("%s file not found\n" % (options.install_upgrade))
sys.exit(1)
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|