[Libpdtp-svn] r55 - /
Status: Alpha
Brought to you by:
bascule
From: <tar...@pd...> - 2004-12-20 23:14:44
|
Author: tarcieri Date: 2004-12-20 16:15:23 -0700 (Mon, 20 Dec 2004) New Revision: 55 Modified: Makefile.in download.c multiplexer.c multiplexer.h socketops.c transaction.c transaction.h Log: Reinclusion of old download and asynchronous I/O code, various minor cleanups Modified: Makefile.in =================================================================== --- Makefile.in 2004-12-17 23:31:55 UTC (rev 54) +++ Makefile.in 2004-12-20 23:15:23 UTC (rev 55) @@ -8,6 +8,7 @@ SHLIB_NAME=libpdtp.so.0 OBJS= alloc.o \ + async.o \ collection.o \ connection.o \ directory.o \ Modified: download.c =================================================================== --- download.c 2004-12-17 23:31:55 UTC (rev 54) +++ download.c 2004-12-20 23:15:23 UTC (rev 55) @@ -39,6 +39,7 @@ #include <string.h> #include "alloc.h" +#include "async.h" #include "connection.h" #include "debug.h" #include "errmsg.h" @@ -95,9 +96,20 @@ piece_transfer_parameters_t params; } *incoming_connection_parameters_t; -/* Prototypes */ -static void download_transaction_handler(const pdtp_transaction_in_t, void *arg); +/*** PROTOTYPES */ +/* Handle all incoming transactions relating to the download */ +static int download_transaction_handler(pdtp_multiplexer_t mplx, const pdtp_transaction_in_t, void *arg); + +/* Handle a completed outbound connection */ +static int download_connect_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg); + +/* Read the response of a host sent a PTPP request */ +static int download_handshake_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + +/* Handle the response to a PTPP request */ +static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + /* The high level interface is defined first. Event handlers and actual mechanics of the download engine follow below as a series of static functions which are internal to this module. */ @@ -259,13 +271,16 @@ } #endif -static void download_transaction_handler(const pdtp_transaction_in_t txn, void *arg) +static int download_transaction_handler(pdtp_multiplexer_t mplx, const pdtp_transaction_in_t txn, void *arg) { int i; uint16_t id = pdtp__transaction_id(txn); uint32_t ip, piece_index; uint16_t port; + pdtp_sock_t sock; + pdtp_addr_t addr; + #ifndef NDEBUG debug("Download: Incoming transaction (%x)", id); #endif @@ -300,11 +315,93 @@ debug("Getting piece %d from %08x:%d", piece_index, ip, port); #endif - return; + addr = pdtp__ipv4_addr_create(ip, port); + sock = pdtp__nonblock_connect(addr); + pdtp__addr_destroy(addr); + + if(sock < 0) + goto err; + + pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, 0); + + return 1; err: #ifndef NDEBUG debug("An error occured!"); #endif - return; + return 0; } + +static int download_connect_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg) +{ + pdtp__multiplexer_remove_socket(mplx, sock); + +#ifndef NDEBUG + debug("Nonblock connect complete!"); +#endif + + if(pdtp__nonblock_connect_complete(sock) < 0) { +#ifndef NDEBUG + debug("Error: Connection refused"); +#endif + pdtp__socket_close(sock); + return 1; + } + +#ifndef NDEBUG + debug("Sending PTPP request..."); +#endif + + pdtp__async_write(mplx, sock, "PTPP", 4, download_handshake_read, arg); + + return 1; +} + +static int download_handshake_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ +#ifndef NDEBUG + debug("Handshake request sent!"); +#endif + + /* XXX Handle write error */ + if(bytes < 0) { +#ifndef NDEBUG + debug("Write error sending PTPP handshake"); +#endif + pdtp__socket_close(sock); + return 1; + } + + pdtp__async_read(mplx, sock, 8, download_handshake_response_handler, arg); + + return 1; +} + +static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ +#ifndef NDEBUG + debug("Received handshake response"); +#endif + + /* XXX Handle read error */ + if(bytes < 0) + goto err; + + if(memcmp(buffer, "PTPP", 4)) + goto err; + +#ifndef NDEBUG + debug("Received valid PTPP response"); +#endif + + return 1; + +err: +#ifndef NDEBUG + debug("Error reading PTPP response"); +#endif + + pdtp__socket_close(sock); + return 1; +} Modified: multiplexer.c =================================================================== --- multiplexer.c 2004-12-17 23:31:55 UTC (rev 54) +++ multiplexer.c 2004-12-20 23:15:23 UTC (rev 55) @@ -137,12 +137,12 @@ pdtp__free(mplx); } -void pdtp__multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) +void pdtp__multiplexer_add_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) { add_entry(mplx, sock, type, handler, arg); } -void pdtp__multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock) +void pdtp__multiplexer_remove_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock) { unsigned i; @@ -180,7 +180,7 @@ mplx->current_entry++; - if(!handler(fd, arg)) + if(!handler(mplx, fd, arg)) return; } } Modified: multiplexer.h =================================================================== --- multiplexer.h 2004-12-17 23:31:55 UTC (rev 54) +++ multiplexer.h 2004-12-20 23:15:23 UTC (rev 55) @@ -8,13 +8,13 @@ typedef struct pdtp_multiplexer *pdtp_multiplexer_t; typedef enum { PDTP_MPLX_READ, PDTP_MPLX_WRITE } pdtp_event_t; -typedef int (*pdtp_event_handler_t)(pdtp_sock_t sock, void *arg); +typedef int (*pdtp_event_handler_t)(pdtp_multiplexer_t, pdtp_sock_t, void *arg); pdtp_multiplexer_t pdtp__multiplexer_create(pdtp_connection_t); void pdtp__multiplexer_destroy(pdtp_multiplexer_t); -void pdtp__multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg); -void pdtp__multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock); +void pdtp__multiplexer_add_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg); +void pdtp__multiplexer_remove_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock); void pdtp__multiplexer_run(pdtp_multiplexer_t mplx); Modified: socketops.c =================================================================== --- socketops.c 2004-12-17 23:31:55 UTC (rev 54) +++ socketops.c 2004-12-20 23:15:23 UTC (rev 55) @@ -388,7 +388,6 @@ return 0; } - int pdtp__writesock_int16(pdtp_sock_t sock, uint16_t value) { uint16_t v = htons(value); @@ -444,3 +443,5 @@ return connection_sock; } + + Modified: transaction.c =================================================================== --- transaction.c 2004-12-17 23:31:55 UTC (rev 54) +++ transaction.c 2004-12-20 23:15:23 UTC (rev 55) @@ -910,12 +910,12 @@ typedef struct async_transaction_params { pdtp_connection_t conn; - void (*success_callback)(const pdtp_transaction_in_t, void *); + int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *); void *arg; } *async_transaction_params_t; /* Callback for receiving transactions asynchronously */ -static int async_transaction_handler(pdtp_sock_t sock, void *arg) +static int async_transaction_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg) { pdtp_transaction_in_t txn; async_transaction_params_t params = (async_transaction_params_t)arg; @@ -933,7 +933,7 @@ assert(!pdtp__transaction_read_to_object(txn, txn->total_objs - 1)); } - params->success_callback(txn, params->arg); + params->success_callback(mplx, txn, params->arg); pdtp__transaction_in_destroy(txn); return 1; @@ -959,7 +959,7 @@ given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. */ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, void (*success_callback)(const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) +void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) { pdtp_transaction_in_t in; async_transaction_params_t params; @@ -974,6 +974,6 @@ params->success_callback = success_callback; params->arg = arg; - pdtp__multiplexer_add_sock(mplx, conn->sock, PDTP_MPLX_READ, async_transaction_handler, params); + pdtp__multiplexer_add_socket(mplx, conn->sock, PDTP_MPLX_READ, async_transaction_handler, params); } Modified: transaction.h =================================================================== --- transaction.h 2004-12-17 23:31:55 UTC (rev 54) +++ transaction.h 2004-12-20 23:15:23 UTC (rev 55) @@ -43,7 +43,7 @@ given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. */ -void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, void (*success_callback)(const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); +void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); |