[Libpdtp-svn] r58 - /
Status: Alpha
Brought to you by:
bascule
From: <tar...@pd...> - 2004-12-22 00:00:44
|
Author: tarcieri Date: 2004-12-21 17:01:23 -0700 (Tue, 21 Dec 2004) New Revision: 58 Modified: async.c download.c fileops.c fileops.h socketops.c transaction.c transaction.h Log: Download support improvements Modified: async.c =================================================================== --- async.c 2004-12-21 00:19:43 UTC (rev 57) +++ async.c 2004-12-22 00:01:23 UTC (rev 58) @@ -85,14 +85,9 @@ int n, ret; async_io_parameters_t params = (async_io_parameters_t)arg; -#ifndef NDEBUG - debug("async_read_handler invoked, trying to get %d bytes", params->bytes_total - params->bytes_transferred); -#endif - if((n = pdtp__atomic_readsock(sock, params->buffer + params->bytes_transferred, params->bytes_total - params->bytes_transferred)) < 1) { #ifndef NDEBUG debug("async_read_handler: Read error (%d)", n); - perror("pdtp__atomic_readsock"); #endif pdtp__multiplexer_remove_socket(mplx, sock); ret = params->callback(mplx, sock, 0, -1, params->arg); Modified: download.c =================================================================== --- download.c 2004-12-21 00:19:43 UTC (rev 57) +++ download.c 2004-12-22 00:01:23 UTC (rev 58) @@ -35,6 +35,7 @@ #include "pdtp.h" #include "sockets.h" +#include <assert.h> #include <stdio.h> #include <string.h> @@ -81,19 +82,19 @@ }; /* This structure stores piece transfer initialization parameters */ -typedef struct piece_transfer_parameters { +typedef struct piece_transfer_params { pdtp_download_t dl; uint32_t peer_address; uint16_t opcode; uint32_t piece_index; - unsigned bytes_transferred, bytes_total; + unsigned bytes_transferred, bytes_offset, bytes_total; uint8_t started; -} *piece_transfer_parameters_t; +} *piece_transfer_params_t; /* This structure stores incoming transfer parameters */ typedef struct incoming_connection_parameters { int fd; - piece_transfer_parameters_t params; + piece_transfer_params_t params; } *incoming_connection_parameters_t; /*** PROTOTYPES */ @@ -104,15 +105,24 @@ /* Handle a completed outbound connection */ static int download_connect_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg); -/* Read the response of a host sent a PTPP request */ +/* Read the response of a host sent a PTPP handshake */ static int download_handshake_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); -/* Handle the response to a PTPP request */ -static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); +/* Send a piece transfer request after receiving a PTPP response */ +static int download_send_request(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); +/* Upload a piece to the given socket using asynchronous I/O */ +static int upload_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + +/* Download a piece from the given socket using asynchronous I/O */ +static int download_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + +/* Self-invoking callback which transfers a piece via asynchronous I/O */ +static int download_piece_callback(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + /* The high level interface is defined first. Event handlers and actual - mechanics of the download engine follow below as a series of static - functions which are internal to this module. */ + mechanics of the download engine follow below as the series of static + functions whose prototypes are defined above. */ pdtp_download_t pdtp_download_create(pdtp_connection_t conn, const char *path) { unsigned i, piece_count; @@ -212,8 +222,10 @@ void pdtp_download_run(pdtp_download_t dl, pdtp_file_t file) { + int i; pdtp_piecelist_t list; - pdtp_transaction_out_t txn; + pdtp_transaction_out_t out; + pdtp_transaction_in_t in; debug("pdtp_download_run() starting"); @@ -227,15 +239,37 @@ return; } - txn = pdtp__transaction_create(TXN_DOWNLOAD, 2); - pdtp__transaction_add_string(txn, OBJ_PATH, dl->path); - pdtp__piecelist_add_to_transaction(txn, list); + out = pdtp__transaction_create(TXN_DOWNLOAD, 2); + pdtp__transaction_add_string(out, OBJ_PATH, dl->path); + pdtp__piecelist_add_to_transaction(out, list); dl->mplx = pdtp__multiplexer_create(dl->conn); - pdtp__transact_async(dl->conn, dl->mplx, txn, download_transaction_handler, 0, 0); + if(pdtp__transact_async(dl->conn, dl->mplx, out, &in, download_transaction_handler, dl) < 0) { + /* XXX Handle error properly */ +#ifndef DEBUG + debug("Download initialization failed"); +#endif + + if(in) + pdtp__transaction_in_destroy(in); + + return; + } + + /* Since the asynchronous filter was created successfully this should + never fail unless there's a bug in the async transaction code */ + assert((i = pdtp__transaction_object_index(in, OBJ_HANDLE)) == 0); + + /* Extract the file handle from the response transaction */ + assert(pdtp__transaction_object_uint32(in, i, &dl->handle) == 0); + + pdtp__transaction_in_destroy(in); + + /* Run the main multiplexer loop, executing the download */ pdtp__multiplexer_run(dl->mplx); + /* Destroy the multiplexer and its associated filter rules */ pdtp__multiplexer_destroy(dl->mplx); return; @@ -246,7 +280,7 @@ } #if 0 -static void transfer_success_notify(piece_transfer_parameters_t params) +static void transfer_success_notify(piece_transfer_params_t params) { pdtp_transaction_out_t txn; @@ -258,7 +292,7 @@ pdtp__transaction_out_destroy(txn); } -static void transfer_failure_notify(piece_transfer_parameters_t params) +static void transfer_failure_notify(piece_transfer_params_t params) { pdtp_transaction_out_t txn; @@ -281,6 +315,9 @@ pdtp_sock_t sock; pdtp_addr_t addr; + piece_transfer_params_t params; + pdtp_download_t dl = (pdtp_download_t)arg; + #ifndef NDEBUG debug("Download: Incoming transaction (%x)", id); #endif @@ -322,12 +359,24 @@ if(sock < 0) goto err; - pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, 0); + params = NEW(piece_transfer_params); + params->started = 0; + params->dl = dl; + params->peer_address = ip; + params->opcode = id; + params->piece_index = piece_index; + params->bytes_transferred = 0; + params->bytes_offset = 0; + /* XXX Initialize this value properly */ + params->bytes_total = dl->piece_length; + + pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, params); + return 1; err: #ifndef NDEBUG - debug("An error occured!"); + debug("An error occured processing a download transaction!"); #endif return 0; @@ -345,6 +394,8 @@ #ifndef NDEBUG debug("Error: Connection refused"); #endif + + pdtp__free(arg); pdtp__socket_close(sock); return 1; } @@ -369,43 +420,199 @@ #ifndef NDEBUG debug("Write error sending PTPP handshake"); #endif + + pdtp__free(arg); pdtp__socket_close(sock); return 1; } - pdtp__async_read(mplx, sock, 8, download_handshake_response_handler, arg); + pdtp__async_read(mplx, sock, 8, download_send_request, arg); return 1; } -static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +static int download_send_request(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) { + piece_transfer_params_t params = (piece_transfer_params_t)arg; + pdtp_async_handler_t transfer_function = 0; + uint16_t v16; + uint32_t v32; + uint8_t request[18]; + + if(bytes != 8) + goto err; + + /* Check the magic number */ + if(memcmp(buffer, "PTPP", 4)) + goto err; + + /* Check if the minimum supported protocol version is 1 */ + memcpy(&v16, (uint8_t *)buffer + 4, 2); + + if(htons(v16) != 1) + goto err; + + /* Select the protocol version. Currently hardcoded to 1 */ + v16 = htons(1); + memcpy(request, &v16, 2); + + /* Set operation code. */ + v16 = htons(params->opcode); + memcpy(request + 2, &v16, 2); + + /* Set address of coordinating server */ + v32 = htonl(pdtp__addr_ipv4(params->dl->conn->addr)); + memcpy(request + 4, &v32, 4); + + /* Set port of coordinating server. Initialize properly */ + v16 = htons(pdtp__addr_port(params->dl->conn->addr)); + memcpy(request + 8, &v16, 2); + + /* Set file handle ID. */ + v32 = htonl(params->dl->handle); + memcpy(request + 10, &v32, 4); + + /* Set piece ID. */ + v32 = htonl(params->piece_index); + memcpy(request + 14, &v32, 4); + + switch(params->opcode) { + case TXN_SENDPIECE: + transfer_function = upload_piece; + break; + case TXN_RECVPIECE: + transfer_function = download_piece; + break; + } + + pdtp__async_write(mplx, sock, request, 18, transfer_function, params); + + return 1; + +err: + pdtp__free(arg); + pdtp__socket_close(sock); + return 1; +} + +static int upload_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ #ifndef NDEBUG - debug("Received handshake response"); + debug("Error: Piece uploads are not yet implemented"); #endif - /* XXX Handle read error */ - if(bytes != 8) { -#ifndef NDEBUG - debug("Read error/connection closed"); -#endif - goto err; + return 0; +} + +static int download_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ + piece_transfer_params_t params = (piece_transfer_params_t)arg; + uint8_t piece_request[8]; + uint32_t value; + + if(bytes != 18) { + pdtp__free(arg); + pdtp__socket_close(sock); + + return 1; } - if(memcmp(buffer, "PTPP", 4)) - goto err; + /* Offset of piece chunk to transfer */ + value = htonl(params->bytes_offset); + memcpy(piece_request, &value, 4); -#ifndef NDEBUG - debug("Received valid PTPP response"); -#endif + /* Size of chunk to transfer */ + value = htonl(params->bytes_total); + memcpy(piece_request + 4, &value, 4); + pdtp__async_write(mplx, sock, piece_request, 8, download_piece_callback, arg); + return 1; +} +#if 0 +static int upload_piece_callback(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ + int nbytes; + uint8_t chunk[TRANSFER_CHUNK_SIZE]; + + piece_transfer_params_t params = (piece_transfer_params_t)arg; + + if(bytes < 0) + goto err; + + /* Check and set flag for if the transfer has started */ + if(!params->started) + params->started = 1; + else + params->bytes_transferred += bytes; + + if(params->bytes_transferred == params->bytes_total) { + /* XXX transfer_success_notify(params); */ + goto done; + } + + /* XXX Input validation on the piece index should occur earlier on */ + if(pdtp__file_seek(params->dl->file, (uint64_t)params->dl->piece_length * params->piece_index + params->bytes_transferred) < 0) + goto err; + + /* XXX Implement proper calculation of the number of bytes to read */ + if((nbytes = pdtp__file_read(params->dl->file, chunk, TRANSFER_CHUNK_SIZE)) < 0) + goto err; + + pdtp__async_write(mplx, sock, chunk, nbytes, upload_piece_callback, arg); + + return 1; + err: -#ifndef NDEBUG - debug("Error reading PTPP response"); + /* XXX transfer_failure_notify(params); */ +done: + /* XXX Add support for persistent connections */ + pdtp__free(params); + pdtp__socket_close(sock); + + return 1; +} #endif - - pdtp__socket_close(sock); - return 1; + +static int download_piece_callback(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ + piece_transfer_params_t params = (piece_transfer_params_t)arg; + + if(bytes < 0) + goto err; + + /* XXX Add proper size calculation here */ + if(!params->started) { + params->started = 1; + goto read_next_chunk; + } + + /* XXX Input validation on the piece index should occur earlier on */ + if(pdtp__file_seek(params->dl->file, (uint64_t)params->dl->piece_length * params->piece_index + params->bytes_transferred) < 0) + goto err; + + if(pdtp__file_write(params->dl->file, buffer, bytes) < 0) + goto err; + + params->bytes_transferred += bytes; + + if(params->bytes_transferred == params->bytes_total) { + /* XXX transfer_success_notify(params); */ + goto done; + } + +read_next_chunk: + /* XXX Add proper size calculation here */ + pdtp__async_read(mplx, sock, TRANSFER_CHUNK_SIZE, download_piece_callback, arg); + + goto done; +err: + /* XXX transfer_failure_notify(params); */ +done: + pdtp__multiplexer_remove_socket(mplx, sock); + pdtp__free(params); + pdtp__socket_close(sock); + + return 1; } Modified: fileops.c =================================================================== --- fileops.c 2004-12-21 00:19:43 UTC (rev 57) +++ fileops.c 2004-12-22 00:01:23 UTC (rev 58) @@ -154,7 +154,7 @@ return ret; } -int pdtp__file_write(pdtp_file_t file, void *buf, unsigned int nbytes) +int pdtp__file_write(pdtp_file_t file, const void *buf, unsigned int nbytes) { #ifdef WIN32 DWORD ret; Modified: fileops.h =================================================================== --- fileops.h 2004-12-21 00:19:43 UTC (rev 57) +++ fileops.h 2004-12-22 00:01:23 UTC (rev 58) @@ -21,6 +21,6 @@ int pdtp__file_read(pdtp_file_t file, void *buf, unsigned int nbytes); /** This function writes the given number of bytes (no 64-bit support) */ -int pdtp__file_write(pdtp_file_t file, void *buf, unsigned int nbytes); +int pdtp__file_write(pdtp_file_t file, const void *buf, unsigned int nbytes); #endif Modified: socketops.c =================================================================== --- socketops.c 2004-12-21 00:19:43 UTC (rev 57) +++ socketops.c 2004-12-22 00:01:23 UTC (rev 58) @@ -367,7 +367,7 @@ if(select((int)sock + 1, 0, &fdset, 0, &tv) < 1) return -1; - return send(sock, buf, nbytes, 0); + return pdtp__atomic_writesock(sock, buf, nbytes); } /* Write function. This uses a loop to ensure the data is written completely, Modified: transaction.c =================================================================== --- transaction.c 2004-12-21 00:19:43 UTC (rev 57) +++ transaction.c 2004-12-22 00:01:23 UTC (rev 58) @@ -803,7 +803,7 @@ uint16_t obj_count; txn->serial = htonl(txn->serial); - txn->id = htons(txn->id); + txn->id = htons(txn->id); length = htonl(txn->length); obj_count = htons(txn->obj_count); @@ -820,7 +820,7 @@ DWORD bytes = 0; txn->serial = htonl(txn->serial); - txn->id = htons(txn->id); + txn->id = htons(txn->id); length = htonl(txn->length); obj_count = htons(txn->obj_count); @@ -847,7 +847,7 @@ v32 = htonl(txn->serial); memcpy(buffer + 4, &v32, 4); - + v16 = htons(txn->id); memcpy(buffer + 8, &v16, 2); @@ -881,10 +881,10 @@ pdtp_transaction_in_t pdtp__transact(pdtp_connection_t conn, pdtp_transaction_out_t out, pdtp_read_mode_t mode) { pdtp_transaction_in_t in; - + /* XXX This implementation is temporary as it is incompatible with pdtp__transact_async */ - + out->serial = pdtp__txnmgr_get_serial(conn->txnmgr); pdtp__transaction_write(conn, out); @@ -900,12 +900,12 @@ if(pdtp__transaction_read_to_object(in, in->total_objs - 1) < 0) goto err; } - + return in; err: - /* XXX Mark the connection as bad and return 0. IMPORTANT: Ensure - conn->socket_lock is released before function returns. */ - return 0; + /* XXX Mark the connection as bad and return 0. IMPORTANT: Ensure + conn->socket_lock is released before function returns. */ + return 0; } typedef struct async_transaction_params { @@ -920,18 +920,18 @@ pdtp_transaction_in_t txn; async_transaction_params_t params = (async_transaction_params_t)arg; - pdtp__mutex_lock(params->conn->socket_lock); + pdtp__mutex_lock(params->conn->socket_lock); #ifndef NDEBUG - debug("+++ socket_lock: pdtp__transaction_read_direct()"); + debug("+++ socket_lock: pdtp__transaction_read_direct()"); #endif /* XXX Error handling needed here */ - assert((txn = pdtp__transaction_read_header(params->conn))); + assert((txn = pdtp__transaction_read_header(params->conn))); - if(txn->total_objs) { + if(txn->total_objs) { /* XXX Error handling needed here */ - assert(!pdtp__transaction_read_to_object(txn, txn->total_objs - 1)); - } + assert(!pdtp__transaction_read_to_object(txn, txn->total_objs - 1)); + } params->success_callback(mplx, txn, params->arg); pdtp__transaction_in_destroy(txn); @@ -940,33 +940,38 @@ } /* Asynchronous transaction interface. A connection and transaction are - specified, as well as a multiplexer, success callback, and error callback. + specified, as well as a multiplexer, success callback, and callback argument. This function is currently hardcoded only to support TXN_DOWNLOAD. Passing a transaction of any other type will result in an assertion failure. - The specified transaction will be synchronously sent to the server. If - an error response is received, the specified error callback will be - invoked with the server's error message. The error callback will also - be invoked if the format of the response transaction is wrong. The - response transaction should include one 32-bit unsigned integer object - which will be used as the key for receiving asynchronous transactions. - If a successful response of the proper format is received, then a filter - is created for the given multiplexer which will invoke the specified - success_callback whenever a transaction is received containing the same - object as received in the initial successful transaction response. + The specified transaction will be synchronously sent to the server. If an + error occurs receiving a response, an error response is received, or the + response transaction is malformatted, the function will return -1. + Otherwise, the function will return 0. If a response is successfully + received and the pdtp_transaction_in_t pointer is non-NULL, then the + response will be returned by assigning it to the given object reference. + In order for the filter to be created, he response transaction should + include only a single 32-bit unsigned integer object which will be used as + the key for receiving asynchronous transactions. If a successful response + of the proper format is received, then a filter is created for the given + multiplexer which will invoke the specified success_callback whenever a + transaction is received containing the same object as received in the + initial successful transaction response. + The interface currently specifies no way to destroy filter rules for a given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. - */ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) + */ +int pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, pdtp_transaction_in_t *in, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void *arg) { - pdtp_transaction_in_t in; async_transaction_params_t params; - + assert(out->id == TXN_DOWNLOAD); - assert((in = pdtp__transact(conn, out, PDTP_STORE))); + /* If we receive no transaction, return -1 */ + if(!(*in = pdtp__transact(conn, out, PDTP_STORE))) + return -1; /* XXX Need to figure out a way to GC this */ params = NEW(async_transaction_params); @@ -975,5 +980,6 @@ params->arg = arg; pdtp__multiplexer_add_socket(mplx, conn->sock, PDTP_MPLX_READ, async_transaction_handler, params); + + return 0; } - Modified: transaction.h =================================================================== --- transaction.h 2004-12-21 00:19:43 UTC (rev 57) +++ transaction.h 2004-12-22 00:01:23 UTC (rev 58) @@ -24,26 +24,30 @@ pdtp_transaction_in_t pdtp__transact(pdtp_connection_t, pdtp_transaction_out_t, pdtp_read_mode_t); /* Asynchronous transaction interface. A connection and transaction are - specified, as well as a multiplexer, success callback, and error callback. + specified, as well as a multiplexer, success callback, and callback argument. This function is currently hardcoded only to support TXN_DOWNLOAD. Passing a transaction of any other type will result in an assertion failure. - The specified transaction will be synchronously sent to the server. If - an error response is received, the specified error callback will be - invoked with the server's error message. The error callback will also - be invoked if the format of the response transaction is wrong. The - response transaction should include one 32-bit unsigned integer object - which will be used as the key for receiving asynchronous transactions. - If a successful response of the proper format is received, then a filter - is created for the given multiplexer which will invoke the specified - success_callback whenever a transaction is received containing the same - object as received in the initial successful transaction response. + The specified transaction will be synchronously sent to the server. If an + error occurs receiving a response, an error response is received, or the + response transaction is malformatted, the function will return -1. + Otherwise, the function will return 0. If a response is successfully + received and the pdtp_transaction_in_t pointer is non-NULL, then the + response will be returned by assigning it to the given object reference. + + In order for the filter to be created, he response transaction should + include only a single 32-bit unsigned integer object which will be used as + the key for receiving asynchronous transactions. If a successful response + of the proper format is received, then a filter is created for the given + multiplexer which will invoke the specified success_callback whenever a + transaction is received containing the same object as received in the + initial successful transaction response. The interface currently specifies no way to destroy filter rules for a given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. */ -void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); +int pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, pdtp_transaction_in_t *, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void *arg); void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); |