Performance improvements

  • bananabr
    2014-07-03

    Hi all,

    First I would like to say that I am no Python expert, so please go easy on me if what I did was stupid =]. I am a system administrator at a Brazilian company, currently looking for an offsite backup solution for one of our Linux-based applications running on AWS. I read about s3cmd and decided to give it a chance. My first choice was the 1.0.1 version, which seems to be the latest stable release, but it was far too slow. So I took a look at 1.5rc1 and noticed that it uses sub-processing for parallelism, but it was still too slow for my use case.

    As I said, I am no Python expert, but I am really into parallel programming, so I decided to give it a try and made a few changes to the cmd_sync_local2remote method in the 1.0.1 code.

    My test scenario for comparing my code with the latest release was syncing 7 directories, each containing 100 files of 2 MB, to an empty bucket from a t3.micro instance.
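
    For anyone who wants to reproduce the scenario, here is a minimal sketch of a generator for that directory layout (the make_test_tree helper and the ./sync-test path are hypothetical, chosen only to match the numbers above):

    import os

    def make_test_tree(base, n_dirs=7, files_per_dir=100, size_mb=2):
        # Create n_dirs directories, each holding files_per_dir files of size_mb MB
        chunk = b"x" * (1024 * 1024)  # 1 MB of filler data
        for d in range(n_dirs):
            dir_path = os.path.join(base, "dir%02d" % d)
            if not os.path.isdir(dir_path):
                os.makedirs(dir_path)
            for f in range(files_per_dir):
                with open(os.path.join(dir_path, "file%03d.bin" % f), "wb") as fh:
                    for _ in range(size_mb):
                        fh.write(chunk)

    make_test_tree("./sync-test")

    The tree can then be synced with something like s3cmd sync ./sync-test/ s3://your-bucket/ (bucket name hypothetical).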

    Here is the comparison between the modified 1.0.1 version and the original 1.5rc1 code (1,468,006,400 bytes = 7 × 100 × 2 MiB):

    1.5rc1 - Done. Uploaded 1468006400 bytes in 434.5 seconds, 3.22 MB/s.
    1.0.1mod - Done. Uploaded 1468006400 bytes in 84.1 seconds, 16.65 MB/s.

    Here is the code for the modified cmd_sync_local2remote. It still has some improvements to be made, such as proper error handling and a configurable/dynamic thread pool size (see the sketch after the code for one way to make the pool size configurable).

    def cmd_sync_local2remote(args):
        def _build_attr_header(src):
            import pwd, grp
            attrs = {}
            src = deunicodise(src)
            try:
                st = os.stat_result(os.stat(src))
            except OSError, e:
                raise InvalidFileError(u"%s: %s" % (unicodise(src), e.strerror))
            for attr in cfg.preserve_attrs_list:
                if attr == 'uname':
                    try:
                        val = pwd.getpwuid(st.st_uid).pw_name
                    except KeyError:
                        # Fall back to the numeric UID when the name lookup fails
                        attr = "uid"
                        val = st.st_uid
                        warning(u"%s: Owner username not known. Storing UID=%d instead." % (unicodise(src), val))
                elif attr == 'gname':
                    try:
                        val = grp.getgrgid(st.st_gid).gr_name
                    except KeyError:
                        # Fall back to the numeric GID when the name lookup fails
                        attr = "gid"
                        val = st.st_gid
                        warning(u"%s: Owner groupname not known. Storing GID=%d instead." % (unicodise(src), val))
                else:
                    val = getattr(st, 'st_' + attr)
                attrs[attr] = val
            result = ""
            for k in attrs: result += "%s:%s/" % (k, attrs[k])
            return { 'x-amz-meta-s3cmd-attrs' : result[:-1] }

        s3 = S3(cfg)

        if cfg.encrypt:
            error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
            error(u"Either use unconditional 's3cmd put --recursive'")
            error(u"or disable encryption with --no-encrypt parameter.")
            sys.exit(1)

        ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
        destination_base_uri = S3Uri(args[-1])
        if destination_base_uri.type != 's3':
            raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
        destination_base = str(destination_base_uri)

        local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
        remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)

        local_count = len(local_list)
        remote_count = len(remote_list)

        info(u"Found %d local files, %d remote files" % (local_count, remote_count))

        local_list, exclude_list = _filelist_filter_exclude_include(local_list)

        if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
            ## Make remote_key same as local_key for comparison if we're dealing with only one file
            remote_list_entry = remote_list[remote_list.keys()[0]]
            # Flush remote_list, by the way
            remote_list = { local_list.keys()[0] : remote_list_entry }

        local_list, remote_list, existing_list = _compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True)

        local_count = len(local_list)
        remote_count = len(remote_list)

        info(u"Summary: %d local files to upload, %d remote files to delete" % (local_count, remote_count))

        if local_count > 0:
            ## Populate 'remote_uri' only if we've got something to upload
            if not destination_base.endswith("/"):
                if not single_file_local:
                    raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
                local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
            else:
                for key in local_list:
                    local_list[key]['remote_uri'] = unicodise(destination_base + key)

        if cfg.dry_run:
            for key in exclude_list:
                output(u"exclude: %s" % unicodise(key))
            if cfg.delete_removed:
                for key in remote_list:
                    output(u"delete: %s" % remote_list[key]['object_uri_str'])
            for key in local_list:
                output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))

            warning(u"Exiting now because of --dry-run")
            return

        if cfg.delete_removed:
            for key in remote_list:
                uri = S3Uri(remote_list[key]['object_uri_str'])
                s3.object_delete(uri)
                output(u"deleted: '%s'" % uri)

        ## Shared state for the worker threads, each piece guarded by its own lock
        total_size = [0]
        total_size_lock = threading.Lock()
        total_elapsed = 0.0
        timestamp_start = time.time()
        seq = [0]
        seq_lock = threading.Lock()
        stdout_lock = threading.Lock()
        file_list = local_list.keys()
        file_list_lock = threading.Lock()
        file_list.sort()
        def _proc_file(file):
            # Atomically claim the next sequence number for the progress label
            seq_lock.acquire()
            seq[0] += 1
            seq_label = "[%d of %d]" % (seq[0], local_count)
            seq_lock.release()
            item = local_list[file]
            src = item['full_name']
            uri = S3Uri(item['remote_uri'])
            extra_headers = copy(cfg.extra_headers)
            try:
                if cfg.preserve_attrs:
                    attr_header = _build_attr_header(src)
                    stdout_lock.acquire()
                    debug(u"attr_header: %s" % attr_header)
                    stdout_lock.release()
                    extra_headers.update(attr_header)
                response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
            except InvalidFileError, e:
                stdout_lock.acquire()
                warning(u"File can not be uploaded: %s" % e)
                stdout_lock.release()
                return
            except S3UploadError, e:
                stdout_lock.acquire()
                error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
                stdout_lock.release()
                return
            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
            if not cfg.progress_meter:
                stdout_lock.acquire()
                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
                    (item['full_name_unicode'], uri, response["size"], response["elapsed"],
                    speed_fmt[0], speed_fmt[1], seq_label))
                stdout_lock.release()
            # '+=' on a list element is not atomic, so guard the running total
            total_size_lock.acquire()
            total_size[0] += response["size"]
            total_size_lock.release()
        class myThread(threading.Thread):
            def run(self):
                # Keep pulling file names off the shared list until it is empty
                while file_list:
                    file_list_lock.acquire()
                    if file_list:
                        f = file_list.pop(0)
                    else:
                        file_list_lock.release()
                        break
                    file_list_lock.release()
                    _proc_file(f)
    
        # Start up to 50 worker threads; each one drains the shared file_list,
        # then wait for all of them to finish
        threads = []
        for i in range(min(50, len(file_list))):
            t = myThread()
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
    
        total_elapsed = time.time() - timestamp_start
        total_speed = total_elapsed and total_size[0]/total_elapsed or 0.0
        speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)

        # Only print out the result if any work has been done or
        # if the user asked for verbose output
        outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s" % (total_size[0], total_elapsed, speed_fmt[0], speed_fmt[1])
        if total_size[0] > 0:
            output(outstr)
        else:
            info(outstr)
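
    As for the configurable pool size mentioned above, here is a minimal sketch of how the hand-rolled myThread loop could be replaced by a reusable worker pool (run_pool is a hypothetical helper, not part of s3cmd):

    import threading
    import Queue  # "queue" on Python 3

    def run_pool(items, worker, pool_size=50):
        # Run worker(item) over all items using at most pool_size threads
        q = Queue.Queue()
        for item in items:
            q.put(item)

        def _drain():
            # Each thread pulls work until the queue is empty
            while True:
                try:
                    item = q.get_nowait()
                except Queue.Empty:
                    return
                worker(item)

        threads = [threading.Thread(target=_drain) for _ in range(pool_size)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    With this, the thread management above would collapse to run_pool(file_list, _proc_file, pool_size=cfg.parallel_uploads), where parallel_uploads is a hypothetical new config option.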
    

    I would really appreciate some feedback on this, as I have never contributed to an open source project before.

    Thanks in advance,
    Daniel Santos