From: <lu...@us...> - 2008-11-25 01:15:45
|
Revision: 265 http://s3tools.svn.sourceforge.net/s3tools/?rev=265&view=rev Author: ludvigm Date: 2008-11-25 01:15:39 +0000 (Tue, 25 Nov 2008) Log Message: ----------- * s3/s3.py: improved retrying in send_request() and send_file() Modified Paths: -------------- s3cmd/trunk/ChangeLog s3cmd/trunk/S3/S3.py Modified: s3cmd/trunk/ChangeLog =================================================================== --- s3cmd/trunk/ChangeLog 2008-11-25 00:09:47 UTC (rev 264) +++ s3cmd/trunk/ChangeLog 2008-11-25 01:15:39 UTC (rev 265) @@ -1,5 +1,9 @@ 2008-11-24 Michal Ludvig <mi...@lo...> + * s3/s3.py: improved retrying in send_request() and send_file() + +2008-11-24 Michal Ludvig <mi...@lo...> + * s3cmd, S3/S3.py, NEWS: "s3cmd mv" for moving objects 2008-11-24 Michal Ludvig <mi...@lo...> Modified: s3cmd/trunk/S3/S3.py =================================================================== --- s3cmd/trunk/S3/S3.py 2008-11-25 00:09:47 UTC (rev 264) +++ s3cmd/trunk/S3/S3.py 2008-11-25 01:15:39 UTC (rev 265) @@ -6,6 +6,7 @@ import sys import os, os.path import base64 +import time import md5 import sha import hmac @@ -58,6 +59,9 @@ ## S3 sometimes sends HTTP-307 response redir_map = {} + ## Maximum attempts of re-issuing failed requests + _max_retries = 5 + def __init__(self, config): self.config = config @@ -328,7 +332,11 @@ debug("CreateRequest: resource[uri]=" + resource['uri']) return (method_string, resource, headers) - def send_request(self, request, body = None, retries = 5): + def _fail_wait(self, retries): + # Wait a few seconds. The more it fails the more we wait. + return (self._max_retries - retries + 1) * 3 + + def send_request(self, request, body = None, retries = _max_retries): method_string, resource, headers = request debug("Processing request, please wait...") try: @@ -345,6 +353,8 @@ except Exception, e: if retries: warning("Retrying failed request: %s (%s)" % (resource['uri'], e)) + warning("Waiting %d sec..." % self._fail_wait(retries)) + time.sleep(self._fail_wait(retries)) return self.send_request(request, body, retries - 1) else: raise S3RequestError("Request failed for: %s" % resource['uri']) @@ -362,6 +372,8 @@ if retries: warning(u"Retrying failed request: %s" % resource['uri']) warning(unicode(e)) + warning("Waiting %d sec..." % self._fail_wait(retries)) + time.sleep(self._fail_wait(retries)) return self.send_request(request, body, retries - 1) else: raise e @@ -371,7 +383,7 @@ return response - def send_file(self, request, file, throttle = 0, retries = 3): + def send_file(self, request, file, throttle = 0, retries = _max_retries): method_string, resource, headers = request size_left = size_total = headers.get("content-length") if self.config.progress_meter: @@ -379,56 +391,67 @@ else: info("Sending file '%s', please wait..." % file.name) timestamp_start = time.time() - conn = self.get_connection(resource['bucket']) - conn.connect() - conn.putrequest(method_string, self.format_uri(resource)) - for header in headers.keys(): - conn.putheader(header, str(headers[header])) - conn.endheaders() + try: + conn = self.get_connection(resource['bucket']) + conn.connect() + conn.putrequest(method_string, self.format_uri(resource)) + for header in headers.keys(): + conn.putheader(header, str(headers[header])) + conn.endheaders() + except Exception, e: + if retries: + warning("Retrying failed request: %s (%s)" % (resource['uri'], e)) + warning("Waiting %d sec..." % self._fail_wait(retries)) + time.sleep(self._fail_wait(retries)) + # Connection error -> same throttle value + return self.send_file(request, file, throttle, retries - 1) + else: + raise S3UploadError("Request failed for: %s" % resource['uri']) file.seek(0) md5_hash = md5.new() - while (size_left > 0): - debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name)) - data = file.read(self.config.send_chunk) - md5_hash.update(data) + try: + while (size_left > 0): + debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name)) + data = file.read(self.config.send_chunk) + md5_hash.update(data) + if self.config.progress_meter: + progress.update(delta_position = len(data)) + else: + debug("SendFile: Sending %d bytes to the server" % len(data)) + conn.send(data) + + size_left -= len(data) + if throttle: + time.sleep(throttle) + ## Call progress meter from here + debug("Sent %d bytes (%d %% of %d)" % ( + (size_total - size_left), + (size_total - size_left) * 100 / size_total, + size_total)) + md5_computed = md5_hash.hexdigest() + response = {} + http_response = conn.getresponse() + response["status"] = http_response.status + response["reason"] = http_response.reason + response["headers"] = convertTupleListToDict(http_response.getheaders()) + response["data"] = http_response.read() + response["size"] = size_total + conn.close() + except Exception, e: if self.config.progress_meter: - progress.update(delta_position = len(data)) + progress.done("failed") + if retries: + throttle = throttle and throttle * 5 or 0.01 + warning("Request failed: %s (%s)" % (resource['uri'], e)) + warning("Retrying on lower speed (throttle=%0.2f)" % throttle) + warning("Waiting %d sec..." % self._fail_wait(retries)) + time.sleep(self._fail_wait(retries)) + # Connection error -> same throttle value + return self.send_file(request, file, throttle, retries - 1) else: - debug("SendFile: Sending %d bytes to the server" % len(data)) - try: - conn.send(data) - except Exception, e: - if self.config.progress_meter: - progress.done("failed") - ## When an exception occurs insert a - if retries: - conn.close() - warning("Upload of '%s' failed %s " % (file.name, e)) - throttle = throttle and throttle * 5 or 0.01 - warning("Retrying on lower speed (throttle=%0.2f)" % throttle) - return self.send_file(request, file, throttle, retries - 1) - else: - debug("Giving up on '%s' %s" % (file.name, e)) - raise S3UploadError + debug("Giving up on '%s' %s" % (file.name, e)) + raise S3UploadError("Request failed for: %s" % resource['uri']) - size_left -= len(data) - if throttle: - time.sleep(throttle) - ## Call progress meter from here - debug("Sent %d bytes (%d %% of %d)" % ( - (size_total - size_left), - (size_total - size_left) * 100 / size_total, - size_total)) - md5_computed = md5_hash.hexdigest() - response = {} - http_response = conn.getresponse() - response["status"] = http_response.status - response["reason"] = http_response.reason - response["headers"] = convertTupleListToDict(http_response.getheaders()) - response["data"] = http_response.read() - response["size"] = size_total - conn.close() - timestamp_end = time.time() response["elapsed"] = timestamp_end - timestamp_start response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |