libpdtp-svn Mailing List for libpdtp
Status: Alpha
Brought to you by:
bascule
You can subscribe to this list here.
2004 | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep (2) | Oct (3) | Nov (3) | Dec (13) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2005 | Jan (6) | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
From: <tar...@pd...> - 2005-01-12 00:16:59
|
Author: tarcieri Date: 2005-01-11 17:18:20 -0700 (Tue, 11 Jan 2005) New Revision: 66 Modified: download.c Log: Fixes to piece length calculation logic Modified: download.c =================================================================== --- download.c 2005-01-12 00:10:38 UTC (rev 65) +++ download.c 2005-01-12 00:18:20 UTC (rev 66) @@ -375,7 +375,7 @@ piece_count++; /* Calculate length of piece to transfer */ - if(piece_index < piece_count) + if(piece_index < piece_count - 1) params->bytes_total = dl->piece_length; else params->bytes_total = dl->length % dl->piece_length; |
From: <tar...@pd...> - 2005-01-12 00:09:23
|
Author: tarcieri Date: 2005-01-11 17:10:38 -0700 (Tue, 11 Jan 2005) New Revision: 65 Modified: download.c Log: Fixes to piece length calculation logic Modified: download.c =================================================================== --- download.c 2005-01-10 21:53:35 UTC (rev 64) +++ download.c 2005-01-12 00:10:38 UTC (rev 65) @@ -371,14 +371,16 @@ /* Calculate number of pieces in file */ piece_count = dl->length / dl->piece_length; - if(dl->length % dl->piece_length) + if(dl->length % dl->piece_length) { piece_count++; - /* Calculate length of piece to transfer */ - if(piece_index < piece_count - 1) - params->bytes_total = dl->piece_length; - else - params->bytes_total = dl->length % dl->piece_length; + /* Calculate length of piece to transfer */ + if(piece_index < piece_count) + params->bytes_total = dl->piece_length; + else + params->bytes_total = dl->length % dl->piece_length; + } else + params->bytes_total = dl->piece_length; pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, params); |
From: <tar...@pd...> - 2005-01-10 21:52:16
|
Author: tarcieri Date: 2005-01-10 14:53:35 -0700 (Mon, 10 Jan 2005) New Revision: 64 Modified: configure Log: OpenSSL detection fixes Modified: configure =================================================================== --- configure 2005-01-04 02:08:49 UTC (rev 63) +++ configure 2005-01-10 21:53:35 UTC (rev 64) @@ -12,8 +12,9 @@ if [ "$OPENSSL_BASE" = "" ]; then printf "couldn't find OpenSSL!\n" - printf "Please make sure you have the OpenSSL libraries and headers installed" - printf "If you do, set the OPENSSL_BASE environment variable to the correct base path" + printf "Please make sure you have the OpenSSL libraries and headers installed.\n" + printf "If you do, set the OPENSSL_BASE environment variable to the correct base path.\n" + exit 1 fi fi |
From: <tar...@pd...> - 2005-01-04 02:08:12
|
Author: tarcieri Date: 2005-01-03 19:08:49 -0700 (Mon, 03 Jan 2005) New Revision: 63 Modified: configure Log: Fixed OpenSSL location script bug Modified: configure =================================================================== --- configure 2005-01-03 23:13:29 UTC (rev 62) +++ configure 2005-01-04 02:08:49 UTC (rev 63) @@ -10,7 +10,7 @@ if [ "$OPENSSL_BASE" = "" ]; then OPENSSL_BASE=`./find-openssl.sh` - if [ $OPENSSL_BASE = "" ]; then + if [ "$OPENSSL_BASE" = "" ]; then printf "couldn't find OpenSSL!\n" printf "Please make sure you have the OpenSSL libraries and headers installed" printf "If you do, set the OPENSSL_BASE environment variable to the correct base path" |
From: <tar...@pd...> - 2005-01-03 23:13:00
|
Author: tarcieri Date: 2005-01-03 16:13:29 -0700 (Mon, 03 Jan 2005) New Revision: 62 Modified: download.c Log: Readded success and failure reporting Modified: download.c =================================================================== --- download.c 2005-01-03 20:41:37 UTC (rev 61) +++ download.c 2005-01-03 23:13:29 UTC (rev 62) @@ -281,7 +281,6 @@ { } -#if 0 static void transfer_success_notify(piece_transfer_params_t params) { pdtp_transaction_out_t txn; @@ -305,7 +304,6 @@ pdtp__transaction_write(params->dl->conn, txn); pdtp__transaction_out_destroy(txn); } -#endif static int download_transaction_handler(pdtp_multiplexer_t mplx, const pdtp_transaction_in_t txn, void *arg) { @@ -405,6 +403,7 @@ #ifndef NDEBUG debug("Error: Connection refused"); #endif + transfer_failure_notify(arg); pdtp__free(arg); pdtp__socket_close(sock); @@ -431,6 +430,7 @@ #ifndef NDEBUG debug("Write error sending PTPP handshake"); #endif + transfer_failure_notify(arg); pdtp__free(arg); pdtp__socket_close(sock); @@ -501,6 +501,8 @@ return 1; err: + transfer_failure_notify(arg); + pdtp__free(arg); pdtp__socket_close(sock); return 1; @@ -521,7 +523,10 @@ uint8_t piece_request[8]; uint32_t value; + /* Check to see length of response was sufficient */ if(bytes != 18) { + transfer_failure_notify(arg); + pdtp__free(arg); pdtp__socket_close(sock); @@ -618,7 +623,7 @@ params->bytes_transferred += bytes; if(params->bytes_transferred == params->bytes_total) { - /* XXX transfer_success_notify(params); */ + transfer_success_notify(params); goto done; } @@ -641,9 +646,8 @@ #ifndef DEBUG debug("Transfer failure!"); #endif - /* XXX transfer_failure_notify(params); */ + transfer_failure_notify(params); done: - pdtp__multiplexer_remove_socket(mplx, sock); pdtp__free(params); pdtp__socket_close(sock); |
From: <tar...@pd...> - 2005-01-03 20:41:01
|
Author: tarcieri Date: 2005-01-03 13:41:37 -0700 (Mon, 03 Jan 2005) New Revision: 61 Modified: download.c piecelist.c transaction.c Log: Download, transaction handling, and piece list bugfixes Modified: download.c =================================================================== --- download.c 2004-12-23 08:42:38 UTC (rev 60) +++ download.c 2005-01-03 20:41:37 UTC (rev 61) @@ -1,6 +1,6 @@ /* download.c - High level download control interface - Copyright (C)2003-04 Anthony Arcieri + Copyright (C)2003-05 Anthony Arcieri All rights reserved. Redistribution and use in source and binary forms, with or without Modified: piecelist.c =================================================================== --- piecelist.c 2004-12-23 08:42:38 UTC (rev 60) +++ piecelist.c 2005-01-03 20:41:37 UTC (rev 61) @@ -1,6 +1,6 @@ /* piecelist.c - Piece list generation and management code - Copyright (C)2003-04 Anthony Arcieri + Copyright (C)2003-05 Anthony Arcieri All rights reserved. Redistribution and use in source and binary forms, with or without @@ -37,6 +37,7 @@ #include <string.h> #include "alloc.h" +#include "debug.h" #include "fileops.h" #include "md5.h" #include "piecelist.h" @@ -46,17 +47,21 @@ #define CHECKSUM_SIZE 16 typedef struct pdtp_piecelistnode { - int start, end; + unsigned start, end; struct pdtp_piecelistnode *next; } *pdtp_piecelistnode_t; struct pdtp_piecelist { pdtp_piecelistnode_t head, tail; - int count; + unsigned count; }; -static void piecelist_append(pdtp_piecelist_t list, int i) +static void piecelist_append(pdtp_piecelist_t list, unsigned i) { +#ifndef NDEBUG + debug("Appending piece %d", i); +#endif + if(!list->head) { list->head = list->tail = NEW(pdtp_piecelistnode); list->head->start = i; @@ -187,11 +192,19 @@ pdtp_piecelistnode_t cur; uint32_t *buffer; +#ifndef NDEBUG + debug("Allocating %d element piece list", list->count); +#endif + buffer = (uint32_t *)pdtp__alloc(0, list->count * 8); for(cur = list->head; cur; cur = cur->next) { - buffer[i++] = 
cur->start; - buffer[i++] = cur->end; + buffer[i++] = htonl(cur->start); + buffer[i++] = htonl(cur->end); + +#ifndef NDEBUG + debug("Adding piece range from %d to %d", cur->start, cur->end); +#endif } pdtp__transaction_add_object(txn, OBJ_PIECELIST, buffer, list->count * 8); Modified: transaction.c =================================================================== --- transaction.c 2004-12-23 08:42:38 UTC (rev 60) +++ transaction.c 2005-01-03 20:41:37 UTC (rev 61) @@ -1,6 +1,6 @@ /* transaction.c - Code for constructing and processing transactions - Copyright (C)2003-04 Anthony Arcieri + Copyright (C)2003-05 Anthony Arcieri All rights reserved. Redistribution and use in source and binary forms, with or without @@ -973,6 +973,10 @@ if(!(*in = pdtp__transact(conn, out, PDTP_STORE))) return -1; + /* Check for error */ + if(pdtp__transaction_id(*in) != TXN_DOWNLOAD) + return -1; + /* XXX Need to figure out a way to GC this */ params = NEW(async_transaction_params); params->conn = conn; |
From: <tar...@pd...> - 2004-12-23 08:41:57
|
Author: tarcieri Date: 2004-12-23 01:42:38 -0700 (Thu, 23 Dec 2004) New Revision: 60 Modified: download.c Log: Download support updates Modified: download.c =================================================================== --- download.c 2004-12-23 08:26:54 UTC (rev 59) +++ download.c 2004-12-23 08:42:38 UTC (rev 60) @@ -593,6 +593,7 @@ static int download_piece_callback(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) { piece_transfer_params_t params = (piece_transfer_params_t)arg; + uint32_t count; #ifndef NDEBUG debug("download_piece_callback invoked"); @@ -626,8 +627,14 @@ debug("Doing read_next_chunk"); #endif + /* Calculate bytes to read */ + count = params->bytes_total - params->bytes_transferred; + + if(count > TRANSFER_CHUNK_SIZE) + count = TRANSFER_CHUNK_SIZE; + /* XXX Add proper size calculation here */ - pdtp__async_read(mplx, sock, TRANSFER_CHUNK_SIZE, download_piece_callback, arg); + pdtp__async_read(mplx, sock, count, download_piece_callback, arg); return 1; err: |
From: <tar...@pd...> - 2004-12-23 08:26:17
|
Author: tarcieri Date: 2004-12-23 01:26:54 -0700 (Thu, 23 Dec 2004) New Revision: 59 Modified: download.c Log: Download support improvements Modified: download.c =================================================================== --- download.c 2004-12-22 00:01:23 UTC (rev 58) +++ download.c 2004-12-23 08:26:54 UTC (rev 59) @@ -229,9 +229,11 @@ debug("pdtp_download_run() starting"); + dl->file = file; + /* pdtp_piecelist_create() handles not only MD5 checksumming but also truncating the file to the specified length */ - if(!(list = pdtp__piecelist_create(file, (pdtp_filelen_t)dl->length, dl->piece_length, dl->hashes))) { + if(!(list = pdtp__piecelist_create(dl->file, (pdtp_filelen_t)dl->length, dl->piece_length, dl->hashes))) { /* XXX Some sort of error handling here */ debug("Couldn't hash existing file"); @@ -309,7 +311,7 @@ { int i; uint16_t id = pdtp__transaction_id(txn); - uint32_t ip, piece_index; + uint32_t ip, piece_index, piece_count; uint16_t port; pdtp_sock_t sock; @@ -368,9 +370,18 @@ params->bytes_transferred = 0; params->bytes_offset = 0; - /* XXX Initialize this value properly */ - params->bytes_total = dl->piece_length; + /* Calculate number of pieces in file */ + piece_count = dl->length / dl->piece_length; + if(dl->length % dl->piece_length) + piece_count++; + + /* Calculate length of piece to transfer */ + if(piece_index < piece_count - 1) + params->bytes_total = dl->piece_length; + else + params->bytes_total = dl->length % dl->piece_length; + pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, params); return 1; @@ -525,6 +536,10 @@ value = htonl(params->bytes_total); memcpy(piece_request + 4, &value, 4); +#ifndef DEBUG + debug("Piece download beginning"); +#endif + pdtp__async_write(mplx, sock, piece_request, 8, download_piece_callback, arg); return 1; @@ -579,6 +594,10 @@ { piece_transfer_params_t params = (piece_transfer_params_t)arg; +#ifndef NDEBUG + debug("download_piece_callback invoked"); +#endif + 
if(bytes < 0) goto err; @@ -603,11 +622,18 @@ } read_next_chunk: +#ifndef NDEBUG + debug("Doing read_next_chunk"); +#endif + /* XXX Add proper size calculation here */ pdtp__async_read(mplx, sock, TRANSFER_CHUNK_SIZE, download_piece_callback, arg); - goto done; + return 1; err: +#ifndef DEBUG + debug("Transfer failure!"); +#endif /* XXX transfer_failure_notify(params); */ done: pdtp__multiplexer_remove_socket(mplx, sock); |
From: <tar...@pd...> - 2004-12-22 00:00:44
|
Author: tarcieri Date: 2004-12-21 17:01:23 -0700 (Tue, 21 Dec 2004) New Revision: 58 Modified: async.c download.c fileops.c fileops.h socketops.c transaction.c transaction.h Log: Download support improvements Modified: async.c =================================================================== --- async.c 2004-12-21 00:19:43 UTC (rev 57) +++ async.c 2004-12-22 00:01:23 UTC (rev 58) @@ -85,14 +85,9 @@ int n, ret; async_io_parameters_t params = (async_io_parameters_t)arg; -#ifndef NDEBUG - debug("async_read_handler invoked, trying to get %d bytes", params->bytes_total - params->bytes_transferred); -#endif - if((n = pdtp__atomic_readsock(sock, params->buffer + params->bytes_transferred, params->bytes_total - params->bytes_transferred)) < 1) { #ifndef NDEBUG debug("async_read_handler: Read error (%d)", n); - perror("pdtp__atomic_readsock"); #endif pdtp__multiplexer_remove_socket(mplx, sock); ret = params->callback(mplx, sock, 0, -1, params->arg); Modified: download.c =================================================================== --- download.c 2004-12-21 00:19:43 UTC (rev 57) +++ download.c 2004-12-22 00:01:23 UTC (rev 58) @@ -35,6 +35,7 @@ #include "pdtp.h" #include "sockets.h" +#include <assert.h> #include <stdio.h> #include <string.h> @@ -81,19 +82,19 @@ }; /* This structure stores piece transfer initialization parameters */ -typedef struct piece_transfer_parameters { +typedef struct piece_transfer_params { pdtp_download_t dl; uint32_t peer_address; uint16_t opcode; uint32_t piece_index; - unsigned bytes_transferred, bytes_total; + unsigned bytes_transferred, bytes_offset, bytes_total; uint8_t started; -} *piece_transfer_parameters_t; +} *piece_transfer_params_t; /* This structure stores incoming transfer parameters */ typedef struct incoming_connection_parameters { int fd; - piece_transfer_parameters_t params; + piece_transfer_params_t params; } *incoming_connection_parameters_t; /*** PROTOTYPES */ @@ -104,15 +105,24 @@ /* Handle a completed outbound 
connection */ static int download_connect_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg); -/* Read the response of a host sent a PTPP request */ +/* Read the response of a host sent a PTPP handshake */ static int download_handshake_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); -/* Handle the response to a PTPP request */ -static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); +/* Send a piece transfer request after receiving a PTPP response */ +static int download_send_request(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); +/* Upload a piece to the given socket using asynchronous I/O */ +static int upload_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + +/* Download a piece from the given socket using asynchronous I/O */ +static int download_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + +/* Self-invoking callback which transfers a piece via asynchronous I/O */ +static int download_piece_callback(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + /* The high level interface is defined first. Event handlers and actual - mechanics of the download engine follow below as a series of static - functions which are internal to this module. */ + mechanics of the download engine follow below as the series of static + functions whose prototypes are defined above. 
*/ pdtp_download_t pdtp_download_create(pdtp_connection_t conn, const char *path) { unsigned i, piece_count; @@ -212,8 +222,10 @@ void pdtp_download_run(pdtp_download_t dl, pdtp_file_t file) { + int i; pdtp_piecelist_t list; - pdtp_transaction_out_t txn; + pdtp_transaction_out_t out; + pdtp_transaction_in_t in; debug("pdtp_download_run() starting"); @@ -227,15 +239,37 @@ return; } - txn = pdtp__transaction_create(TXN_DOWNLOAD, 2); - pdtp__transaction_add_string(txn, OBJ_PATH, dl->path); - pdtp__piecelist_add_to_transaction(txn, list); + out = pdtp__transaction_create(TXN_DOWNLOAD, 2); + pdtp__transaction_add_string(out, OBJ_PATH, dl->path); + pdtp__piecelist_add_to_transaction(out, list); dl->mplx = pdtp__multiplexer_create(dl->conn); - pdtp__transact_async(dl->conn, dl->mplx, txn, download_transaction_handler, 0, 0); + if(pdtp__transact_async(dl->conn, dl->mplx, out, &in, download_transaction_handler, dl) < 0) { + /* XXX Handle error properly */ +#ifndef DEBUG + debug("Download initialization failed"); +#endif + + if(in) + pdtp__transaction_in_destroy(in); + + return; + } + + /* Since the asynchronous filter was created successfully this should + never fail unless there's a bug in the async transaction code */ + assert((i = pdtp__transaction_object_index(in, OBJ_HANDLE)) == 0); + + /* Extract the file handle from the response transaction */ + assert(pdtp__transaction_object_uint32(in, i, &dl->handle) == 0); + + pdtp__transaction_in_destroy(in); + + /* Run the main multiplexer loop, executing the download */ pdtp__multiplexer_run(dl->mplx); + /* Destroy the multiplexer and its associated filter rules */ pdtp__multiplexer_destroy(dl->mplx); return; @@ -246,7 +280,7 @@ } #if 0 -static void transfer_success_notify(piece_transfer_parameters_t params) +static void transfer_success_notify(piece_transfer_params_t params) { pdtp_transaction_out_t txn; @@ -258,7 +292,7 @@ pdtp__transaction_out_destroy(txn); } -static void transfer_failure_notify(piece_transfer_parameters_t 
params) +static void transfer_failure_notify(piece_transfer_params_t params) { pdtp_transaction_out_t txn; @@ -281,6 +315,9 @@ pdtp_sock_t sock; pdtp_addr_t addr; + piece_transfer_params_t params; + pdtp_download_t dl = (pdtp_download_t)arg; + #ifndef NDEBUG debug("Download: Incoming transaction (%x)", id); #endif @@ -322,12 +359,24 @@ if(sock < 0) goto err; - pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, 0); + params = NEW(piece_transfer_params); + params->started = 0; + params->dl = dl; + params->peer_address = ip; + params->opcode = id; + params->piece_index = piece_index; + params->bytes_transferred = 0; + params->bytes_offset = 0; + /* XXX Initialize this value properly */ + params->bytes_total = dl->piece_length; + + pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, params); + return 1; err: #ifndef NDEBUG - debug("An error occured!"); + debug("An error occured processing a download transaction!"); #endif return 0; @@ -345,6 +394,8 @@ #ifndef NDEBUG debug("Error: Connection refused"); #endif + + pdtp__free(arg); pdtp__socket_close(sock); return 1; } @@ -369,43 +420,199 @@ #ifndef NDEBUG debug("Write error sending PTPP handshake"); #endif + + pdtp__free(arg); pdtp__socket_close(sock); return 1; } - pdtp__async_read(mplx, sock, 8, download_handshake_response_handler, arg); + pdtp__async_read(mplx, sock, 8, download_send_request, arg); return 1; } -static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +static int download_send_request(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) { + piece_transfer_params_t params = (piece_transfer_params_t)arg; + pdtp_async_handler_t transfer_function = 0; + uint16_t v16; + uint32_t v32; + uint8_t request[18]; + + if(bytes != 8) + goto err; + + /* Check the magic number */ + if(memcmp(buffer, "PTPP", 4)) + goto err; + + /* Check if the 
minimum supported protocol version is 1 */ + memcpy(&v16, (uint8_t *)buffer + 4, 2); + + if(htons(v16) != 1) + goto err; + + /* Select the protocol version. Currently hardcoded to 1 */ + v16 = htons(1); + memcpy(request, &v16, 2); + + /* Set operation code. */ + v16 = htons(params->opcode); + memcpy(request + 2, &v16, 2); + + /* Set address of coordinating server */ + v32 = htonl(pdtp__addr_ipv4(params->dl->conn->addr)); + memcpy(request + 4, &v32, 4); + + /* Set port of coordinating server. Initialize properly */ + v16 = htons(pdtp__addr_port(params->dl->conn->addr)); + memcpy(request + 8, &v16, 2); + + /* Set file handle ID. */ + v32 = htonl(params->dl->handle); + memcpy(request + 10, &v32, 4); + + /* Set piece ID. */ + v32 = htonl(params->piece_index); + memcpy(request + 14, &v32, 4); + + switch(params->opcode) { + case TXN_SENDPIECE: + transfer_function = upload_piece; + break; + case TXN_RECVPIECE: + transfer_function = download_piece; + break; + } + + pdtp__async_write(mplx, sock, request, 18, transfer_function, params); + + return 1; + +err: + pdtp__free(arg); + pdtp__socket_close(sock); + return 1; +} + +static int upload_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ #ifndef NDEBUG - debug("Received handshake response"); + debug("Error: Piece uploads are not yet implemented"); #endif - /* XXX Handle read error */ - if(bytes != 8) { -#ifndef NDEBUG - debug("Read error/connection closed"); -#endif - goto err; + return 0; +} + +static int download_piece(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ + piece_transfer_params_t params = (piece_transfer_params_t)arg; + uint8_t piece_request[8]; + uint32_t value; + + if(bytes != 18) { + pdtp__free(arg); + pdtp__socket_close(sock); + + return 1; } - if(memcmp(buffer, "PTPP", 4)) - goto err; + /* Offset of piece chunk to transfer */ + value = htonl(params->bytes_offset); + memcpy(piece_request, &value, 4); -#ifndef NDEBUG - 
debug("Received valid PTPP response"); -#endif + /* Size of chunk to transfer */ + value = htonl(params->bytes_total); + memcpy(piece_request + 4, &value, 4); + pdtp__async_write(mplx, sock, piece_request, 8, download_piece_callback, arg); + return 1; +} +#if 0 +static int upload_piece_callback(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ + int nbytes; + uint8_t chunk[TRANSFER_CHUNK_SIZE]; + + piece_transfer_params_t params = (piece_transfer_params_t)arg; + + if(bytes < 0) + goto err; + + /* Check and set flag for if the transfer has started */ + if(!params->started) + params->started = 1; + else + params->bytes_transferred += bytes; + + if(params->bytes_transferred == params->bytes_total) { + /* XXX transfer_success_notify(params); */ + goto done; + } + + /* XXX Input validation on the piece index should occur earlier on */ + if(pdtp__file_seek(params->dl->file, (uint64_t)params->dl->piece_length * params->piece_index + params->bytes_transferred) < 0) + goto err; + + /* XXX Implement proper calculation of the number of bytes to read */ + if((nbytes = pdtp__file_read(params->dl->file, chunk, TRANSFER_CHUNK_SIZE)) < 0) + goto err; + + pdtp__async_write(mplx, sock, chunk, nbytes, upload_piece_callback, arg); + + return 1; + err: -#ifndef NDEBUG - debug("Error reading PTPP response"); + /* XXX transfer_failure_notify(params); */ +done: + /* XXX Add support for persistent connections */ + pdtp__free(params); + pdtp__socket_close(sock); + + return 1; +} #endif - - pdtp__socket_close(sock); - return 1; + +static int download_piece_callback(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ + piece_transfer_params_t params = (piece_transfer_params_t)arg; + + if(bytes < 0) + goto err; + + /* XXX Add proper size calculation here */ + if(!params->started) { + params->started = 1; + goto read_next_chunk; + } + + /* XXX Input validation on the piece index should occur earlier on */ + 
if(pdtp__file_seek(params->dl->file, (uint64_t)params->dl->piece_length * params->piece_index + params->bytes_transferred) < 0) + goto err; + + if(pdtp__file_write(params->dl->file, buffer, bytes) < 0) + goto err; + + params->bytes_transferred += bytes; + + if(params->bytes_transferred == params->bytes_total) { + /* XXX transfer_success_notify(params); */ + goto done; + } + +read_next_chunk: + /* XXX Add proper size calculation here */ + pdtp__async_read(mplx, sock, TRANSFER_CHUNK_SIZE, download_piece_callback, arg); + + goto done; +err: + /* XXX transfer_failure_notify(params); */ +done: + pdtp__multiplexer_remove_socket(mplx, sock); + pdtp__free(params); + pdtp__socket_close(sock); + + return 1; } Modified: fileops.c =================================================================== --- fileops.c 2004-12-21 00:19:43 UTC (rev 57) +++ fileops.c 2004-12-22 00:01:23 UTC (rev 58) @@ -154,7 +154,7 @@ return ret; } -int pdtp__file_write(pdtp_file_t file, void *buf, unsigned int nbytes) +int pdtp__file_write(pdtp_file_t file, const void *buf, unsigned int nbytes) { #ifdef WIN32 DWORD ret; Modified: fileops.h =================================================================== --- fileops.h 2004-12-21 00:19:43 UTC (rev 57) +++ fileops.h 2004-12-22 00:01:23 UTC (rev 58) @@ -21,6 +21,6 @@ int pdtp__file_read(pdtp_file_t file, void *buf, unsigned int nbytes); /** This function writes the given number of bytes (no 64-bit support) */ -int pdtp__file_write(pdtp_file_t file, void *buf, unsigned int nbytes); +int pdtp__file_write(pdtp_file_t file, const void *buf, unsigned int nbytes); #endif Modified: socketops.c =================================================================== --- socketops.c 2004-12-21 00:19:43 UTC (rev 57) +++ socketops.c 2004-12-22 00:01:23 UTC (rev 58) @@ -367,7 +367,7 @@ if(select((int)sock + 1, 0, &fdset, 0, &tv) < 1) return -1; - return send(sock, buf, nbytes, 0); + return pdtp__atomic_writesock(sock, buf, nbytes); } /* Write function. 
This uses a loop to ensure the data is written completely, Modified: transaction.c =================================================================== --- transaction.c 2004-12-21 00:19:43 UTC (rev 57) +++ transaction.c 2004-12-22 00:01:23 UTC (rev 58) @@ -803,7 +803,7 @@ uint16_t obj_count; txn->serial = htonl(txn->serial); - txn->id = htons(txn->id); + txn->id = htons(txn->id); length = htonl(txn->length); obj_count = htons(txn->obj_count); @@ -820,7 +820,7 @@ DWORD bytes = 0; txn->serial = htonl(txn->serial); - txn->id = htons(txn->id); + txn->id = htons(txn->id); length = htonl(txn->length); obj_count = htons(txn->obj_count); @@ -847,7 +847,7 @@ v32 = htonl(txn->serial); memcpy(buffer + 4, &v32, 4); - + v16 = htons(txn->id); memcpy(buffer + 8, &v16, 2); @@ -881,10 +881,10 @@ pdtp_transaction_in_t pdtp__transact(pdtp_connection_t conn, pdtp_transaction_out_t out, pdtp_read_mode_t mode) { pdtp_transaction_in_t in; - + /* XXX This implementation is temporary as it is incompatible with pdtp__transact_async */ - + out->serial = pdtp__txnmgr_get_serial(conn->txnmgr); pdtp__transaction_write(conn, out); @@ -900,12 +900,12 @@ if(pdtp__transaction_read_to_object(in, in->total_objs - 1) < 0) goto err; } - + return in; err: - /* XXX Mark the connection as bad and return 0. IMPORTANT: Ensure - conn->socket_lock is released before function returns. */ - return 0; + /* XXX Mark the connection as bad and return 0. IMPORTANT: Ensure + conn->socket_lock is released before function returns. 
*/ + return 0; } typedef struct async_transaction_params { @@ -920,18 +920,18 @@ pdtp_transaction_in_t txn; async_transaction_params_t params = (async_transaction_params_t)arg; - pdtp__mutex_lock(params->conn->socket_lock); + pdtp__mutex_lock(params->conn->socket_lock); #ifndef NDEBUG - debug("+++ socket_lock: pdtp__transaction_read_direct()"); + debug("+++ socket_lock: pdtp__transaction_read_direct()"); #endif /* XXX Error handling needed here */ - assert((txn = pdtp__transaction_read_header(params->conn))); + assert((txn = pdtp__transaction_read_header(params->conn))); - if(txn->total_objs) { + if(txn->total_objs) { /* XXX Error handling needed here */ - assert(!pdtp__transaction_read_to_object(txn, txn->total_objs - 1)); - } + assert(!pdtp__transaction_read_to_object(txn, txn->total_objs - 1)); + } params->success_callback(mplx, txn, params->arg); pdtp__transaction_in_destroy(txn); @@ -940,33 +940,38 @@ } /* Asynchronous transaction interface. A connection and transaction are - specified, as well as a multiplexer, success callback, and error callback. + specified, as well as a multiplexer, success callback, and callback argument. This function is currently hardcoded only to support TXN_DOWNLOAD. Passing a transaction of any other type will result in an assertion failure. - The specified transaction will be synchronously sent to the server. If - an error response is received, the specified error callback will be - invoked with the server's error message. The error callback will also - be invoked if the format of the response transaction is wrong. The - response transaction should include one 32-bit unsigned integer object - which will be used as the key for receiving asynchronous transactions. 
- If a successful response of the proper format is received, then a filter - is created for the given multiplexer which will invoke the specified - success_callback whenever a transaction is received containing the same - object as received in the initial successful transaction response. + The specified transaction will be synchronously sent to the server. If an + error occurs receiving a response, an error response is received, or the + response transaction is malformatted, the function will return -1. + Otherwise, the function will return 0. If a response is successfully + received and the pdtp_transaction_in_t pointer is non-NULL, then the + response will be returned by assigning it to the given object reference. + In order for the filter to be created, he response transaction should + include only a single 32-bit unsigned integer object which will be used as + the key for receiving asynchronous transactions. If a successful response + of the proper format is received, then a filter is created for the given + multiplexer which will invoke the specified success_callback whenever a + transaction is received containing the same object as received in the + initial successful transaction response. + The interface currently specifies no way to destroy filter rules for a given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. 
- */ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) + */ +int pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, pdtp_transaction_in_t *in, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void *arg) { - pdtp_transaction_in_t in; async_transaction_params_t params; - + assert(out->id == TXN_DOWNLOAD); - assert((in = pdtp__transact(conn, out, PDTP_STORE))); + /* If we receive no transaction, return -1 */ + if(!(*in = pdtp__transact(conn, out, PDTP_STORE))) + return -1; /* XXX Need to figure out a way to GC this */ params = NEW(async_transaction_params); @@ -975,5 +980,6 @@ params->arg = arg; pdtp__multiplexer_add_socket(mplx, conn->sock, PDTP_MPLX_READ, async_transaction_handler, params); + + return 0; } - Modified: transaction.h =================================================================== --- transaction.h 2004-12-21 00:19:43 UTC (rev 57) +++ transaction.h 2004-12-22 00:01:23 UTC (rev 58) @@ -24,26 +24,30 @@ pdtp_transaction_in_t pdtp__transact(pdtp_connection_t, pdtp_transaction_out_t, pdtp_read_mode_t); /* Asynchronous transaction interface. A connection and transaction are - specified, as well as a multiplexer, success callback, and error callback. + specified, as well as a multiplexer, success callback, and callback argument. This function is currently hardcoded only to support TXN_DOWNLOAD. Passing a transaction of any other type will result in an assertion failure. - The specified transaction will be synchronously sent to the server. If - an error response is received, the specified error callback will be - invoked with the server's error message. The error callback will also - be invoked if the format of the response transaction is wrong. 
The - response transaction should include one 32-bit unsigned integer object - which will be used as the key for receiving asynchronous transactions. - If a successful response of the proper format is received, then a filter - is created for the given multiplexer which will invoke the specified - success_callback whenever a transaction is received containing the same - object as received in the initial successful transaction response. + The specified transaction will be synchronously sent to the server. If an + error occurs receiving a response, an error response is received, or the + response transaction is malformatted, the function will return -1. + Otherwise, the function will return 0. If a response is successfully + received and the pdtp_transaction_in_t pointer is non-NULL, then the + response will be returned by assigning it to the given object reference. + + In order for the filter to be created, the response transaction should + include only a single 32-bit unsigned integer object which will be used as + the key for receiving asynchronous transactions. If a successful response + of the proper format is received, then a filter is created for the given + multiplexer which will invoke the specified success_callback whenever a + transaction is received containing the same object as received in the + initial successful transaction response. The interface currently specifies no way to destroy filter rules for a given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. 
*/ -void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); +int pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, pdtp_transaction_in_t *, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void *arg); void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); |
From: <tar...@pd...> - 2004-12-21 00:19:10
|
Author: tarcieri Date: 2004-12-20 17:19:43 -0700 (Mon, 20 Dec 2004) New Revision: 57 Added: async.c async.h Log: Inclusion of previously missing asynchronous I/O code Added: async.c =================================================================== --- async.c 2004-12-21 00:12:02 UTC (rev 56) +++ async.c 2004-12-21 00:19:43 UTC (rev 57) @@ -0,0 +1,155 @@ +/* + async.c - Asynchronous read/write functions + Copyright (C)2003-04 Anthony Arcieri + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * The name of Anthony Arcieri may not be used to endorse or promote + products derived from this software without specific prior written + permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include "machdep.h" +#include "pdtp.h" + +#include <string.h> + +#include "alloc.h" +#include "async.h" +#include "debug.h" +#include "multiplexer.h" +#include "socketops.h" + +typedef struct async_io_parameters { + uint8_t *buffer; + unsigned bytes_transferred, bytes_total; + pdtp_async_handler_t callback; + void *arg; +} *async_io_parameters_t; + +static int async_read_handler(pdtp_multiplexer_t, pdtp_sock_t, void *arg); +static int async_write_handler(pdtp_multiplexer_t, pdtp_sock_t, void *arg); + +void pdtp__async_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, unsigned length, pdtp_async_handler_t completion_callback, void *arg) +{ + async_io_parameters_t params = NEW(async_io_parameters); + + params->buffer = pdtp__alloc(0, length); + params->bytes_transferred = 0; + params->bytes_total = length; + params->callback = completion_callback; + params->arg = arg; + + pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_READ, async_read_handler, params); +} + +void pdtp__async_write(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *buffer, unsigned length, pdtp_async_handler_t completion_callback, void *arg) +{ + async_io_parameters_t params = NEW(async_io_parameters); + + params->buffer = pdtp__alloc(0, length); + params->bytes_transferred = 0; + params->bytes_total = length; + params->callback = completion_callback; + params->arg = arg; + + memcpy(params->buffer, buffer, length); + + pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, async_write_handler, params); +} + +static int async_read_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg) +{ + int n, ret; + async_io_parameters_t params = (async_io_parameters_t)arg; + +#ifndef NDEBUG + debug("async_read_handler invoked, trying to get %d bytes", params->bytes_total - params->bytes_transferred); +#endif + + if((n = pdtp__atomic_readsock(sock, params->buffer + params->bytes_transferred, params->bytes_total - params->bytes_transferred)) < 1) { +#ifndef NDEBUG + debug("async_read_handler: Read 
error (%d)", n); + perror("pdtp__atomic_readsock"); +#endif + pdtp__multiplexer_remove_socket(mplx, sock); + ret = params->callback(mplx, sock, 0, -1, params->arg); + + pdtp__free(params->buffer); + pdtp__free(params); + + return ret; + } + + params->bytes_transferred += n; + +#ifndef NDEBUG + debug("async_read_handler: %d bytes read, %d remain", n, params->bytes_total - params->bytes_transferred); +#endif + + if(params->bytes_transferred < params->bytes_total) + return 1; + + pdtp__multiplexer_remove_socket(mplx, sock); + ret = params->callback(mplx, sock, params->buffer, params->bytes_total, params->arg); + + pdtp__free(params->buffer); + pdtp__free(params); + + return ret; +} + +static int async_write_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg) +{ + int n, ret; + async_io_parameters_t params = (async_io_parameters_t)arg; + +#ifndef NDEBUG + debug("async_write_handler invoked"); +#endif + + if((n = pdtp__atomic_writesock(sock, params->buffer + params->bytes_transferred, params->bytes_total - params->bytes_transferred)) < 1) { + pdtp__multiplexer_remove_socket(mplx, sock); + ret = params->callback(mplx, sock, 0, -1, params->arg); + + pdtp__free(params->buffer); + pdtp__free(params); + + return ret; + } + + params->bytes_transferred += n; + + if(params->bytes_transferred < params->bytes_total) + return 1; + + pdtp__multiplexer_remove_socket(mplx, sock); + ret = params->callback(mplx, sock, params->buffer, params->bytes_total, params->arg); + + pdtp__free(params->buffer); + pdtp__free(params); + + return ret; +} Added: async.h =================================================================== --- async.h 2004-12-21 00:12:02 UTC (rev 56) +++ async.h 2004-12-21 00:19:43 UTC (rev 57) @@ -0,0 +1,24 @@ +#ifndef ASYNC_H +#define ASYNC_H + +#include "machdep.h" +#include "pdtp.h" + +#include "multiplexer.h" +#include "socketops.h" + +typedef int (*pdtp_async_handler_t)(pdtp_multiplexer_t, pdtp_sock_t, const void *buffer, int bytes, void *arg); + +/* 
Asynchronous read function. The specified amount of data will be read + from the given socket. Once this has been completed, the specified + completion_callback will be invoked with the data and the given argument */ +void pdtp__async_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, unsigned length, pdtp_async_handler_t completion_callback, void *arg); + +/* Asynchronous write function. The specified amount of data will be + written from the specified buffer to the given socket. Once this has + been completed, the specified completion_callback will be invoked with + the given argument */ +void pdtp__async_write(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *buffer, +unsigned length, pdtp_async_handler_t completion_callback, void *arg); + +#endif /* ASYNC_H */ |
From: <tar...@pd...> - 2004-12-21 00:11:24
|
Author: tarcieri Date: 2004-12-20 17:12:02 -0700 (Mon, 20 Dec 2004) New Revision: 56 Modified: download.c multiplexer.c Log: Improved debugging Modified: download.c =================================================================== --- download.c 2004-12-20 23:15:23 UTC (rev 55) +++ download.c 2004-12-21 00:12:02 UTC (rev 56) @@ -365,7 +365,7 @@ #endif /* XXX Handle write error */ - if(bytes < 0) { + if(bytes != 4) { #ifndef NDEBUG debug("Write error sending PTPP handshake"); #endif @@ -385,8 +385,12 @@ #endif /* XXX Handle read error */ - if(bytes < 0) + if(bytes != 8) { +#ifndef NDEBUG + debug("Read error/connection closed"); +#endif goto err; + } if(memcmp(buffer, "PTPP", 4)) goto err; Modified: multiplexer.c =================================================================== --- multiplexer.c 2004-12-20 23:15:23 UTC (rev 55) +++ multiplexer.c 2004-12-21 00:12:02 UTC (rev 56) @@ -173,6 +173,10 @@ continue; } +#ifndef DEBUG + debug("multiplexer: Socket %d has revents %d", mplx->fdtable[mplx->current_entry].fd, mplx->fdtable[mplx->current_entry].revents); +#endif + mplx->remaining_events--; fd = mplx->fdtable[mplx->current_entry].fd; handler = mplx->callback_table[mplx->current_entry].handler; |
From: <tar...@pd...> - 2004-12-20 23:14:44
|
Author: tarcieri Date: 2004-12-20 16:15:23 -0700 (Mon, 20 Dec 2004) New Revision: 55 Modified: Makefile.in download.c multiplexer.c multiplexer.h socketops.c transaction.c transaction.h Log: Reinclusion of old download and asynchronous I/O code, various minor cleanups Modified: Makefile.in =================================================================== --- Makefile.in 2004-12-17 23:31:55 UTC (rev 54) +++ Makefile.in 2004-12-20 23:15:23 UTC (rev 55) @@ -8,6 +8,7 @@ SHLIB_NAME=libpdtp.so.0 OBJS= alloc.o \ + async.o \ collection.o \ connection.o \ directory.o \ Modified: download.c =================================================================== --- download.c 2004-12-17 23:31:55 UTC (rev 54) +++ download.c 2004-12-20 23:15:23 UTC (rev 55) @@ -39,6 +39,7 @@ #include <string.h> #include "alloc.h" +#include "async.h" #include "connection.h" #include "debug.h" #include "errmsg.h" @@ -95,9 +96,20 @@ piece_transfer_parameters_t params; } *incoming_connection_parameters_t; -/* Prototypes */ -static void download_transaction_handler(const pdtp_transaction_in_t, void *arg); +/*** PROTOTYPES */ +/* Handle all incoming transactions relating to the download */ +static int download_transaction_handler(pdtp_multiplexer_t mplx, const pdtp_transaction_in_t, void *arg); + +/* Handle a completed outbound connection */ +static int download_connect_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg); + +/* Read the response of a host sent a PTPP request */ +static int download_handshake_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + +/* Handle the response to a PTPP request */ +static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg); + /* The high level interface is defined first. Event handlers and actual mechanics of the download engine follow below as a series of static functions which are internal to this module. 
*/ @@ -259,13 +271,16 @@ } #endif -static void download_transaction_handler(const pdtp_transaction_in_t txn, void *arg) +static int download_transaction_handler(pdtp_multiplexer_t mplx, const pdtp_transaction_in_t txn, void *arg) { int i; uint16_t id = pdtp__transaction_id(txn); uint32_t ip, piece_index; uint16_t port; + pdtp_sock_t sock; + pdtp_addr_t addr; + #ifndef NDEBUG debug("Download: Incoming transaction (%x)", id); #endif @@ -300,11 +315,93 @@ debug("Getting piece %d from %08x:%d", piece_index, ip, port); #endif - return; + addr = pdtp__ipv4_addr_create(ip, port); + sock = pdtp__nonblock_connect(addr); + pdtp__addr_destroy(addr); + + if(sock < 0) + goto err; + + pdtp__multiplexer_add_socket(mplx, sock, PDTP_MPLX_WRITE, download_connect_handler, 0); + + return 1; err: #ifndef NDEBUG debug("An error occured!"); #endif - return; + return 0; } + +static int download_connect_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg) +{ + pdtp__multiplexer_remove_socket(mplx, sock); + +#ifndef NDEBUG + debug("Nonblock connect complete!"); +#endif + + if(pdtp__nonblock_connect_complete(sock) < 0) { +#ifndef NDEBUG + debug("Error: Connection refused"); +#endif + pdtp__socket_close(sock); + return 1; + } + +#ifndef NDEBUG + debug("Sending PTPP request..."); +#endif + + pdtp__async_write(mplx, sock, "PTPP", 4, download_handshake_read, arg); + + return 1; +} + +static int download_handshake_read(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ +#ifndef NDEBUG + debug("Handshake request sent!"); +#endif + + /* XXX Handle write error */ + if(bytes < 0) { +#ifndef NDEBUG + debug("Write error sending PTPP handshake"); +#endif + pdtp__socket_close(sock); + return 1; + } + + pdtp__async_read(mplx, sock, 8, download_handshake_response_handler, arg); + + return 1; +} + +static int download_handshake_response_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, const void *buffer, int bytes, void *arg) +{ +#ifndef NDEBUG + 
debug("Received handshake response"); +#endif + + /* XXX Handle read error */ + if(bytes < 0) + goto err; + + if(memcmp(buffer, "PTPP", 4)) + goto err; + +#ifndef NDEBUG + debug("Received valid PTPP response"); +#endif + + return 1; + +err: +#ifndef NDEBUG + debug("Error reading PTPP response"); +#endif + + pdtp__socket_close(sock); + return 1; +} Modified: multiplexer.c =================================================================== --- multiplexer.c 2004-12-17 23:31:55 UTC (rev 54) +++ multiplexer.c 2004-12-20 23:15:23 UTC (rev 55) @@ -137,12 +137,12 @@ pdtp__free(mplx); } -void pdtp__multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) +void pdtp__multiplexer_add_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) { add_entry(mplx, sock, type, handler, arg); } -void pdtp__multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock) +void pdtp__multiplexer_remove_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock) { unsigned i; @@ -180,7 +180,7 @@ mplx->current_entry++; - if(!handler(fd, arg)) + if(!handler(mplx, fd, arg)) return; } } Modified: multiplexer.h =================================================================== --- multiplexer.h 2004-12-17 23:31:55 UTC (rev 54) +++ multiplexer.h 2004-12-20 23:15:23 UTC (rev 55) @@ -8,13 +8,13 @@ typedef struct pdtp_multiplexer *pdtp_multiplexer_t; typedef enum { PDTP_MPLX_READ, PDTP_MPLX_WRITE } pdtp_event_t; -typedef int (*pdtp_event_handler_t)(pdtp_sock_t sock, void *arg); +typedef int (*pdtp_event_handler_t)(pdtp_multiplexer_t, pdtp_sock_t, void *arg); pdtp_multiplexer_t pdtp__multiplexer_create(pdtp_connection_t); void pdtp__multiplexer_destroy(pdtp_multiplexer_t); -void pdtp__multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg); -void pdtp__multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock); 
+void pdtp__multiplexer_add_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg); +void pdtp__multiplexer_remove_socket(pdtp_multiplexer_t mplx, pdtp_sock_t sock); void pdtp__multiplexer_run(pdtp_multiplexer_t mplx); Modified: socketops.c =================================================================== --- socketops.c 2004-12-17 23:31:55 UTC (rev 54) +++ socketops.c 2004-12-20 23:15:23 UTC (rev 55) @@ -388,7 +388,6 @@ return 0; } - int pdtp__writesock_int16(pdtp_sock_t sock, uint16_t value) { uint16_t v = htons(value); @@ -444,3 +443,5 @@ return connection_sock; } + + Modified: transaction.c =================================================================== --- transaction.c 2004-12-17 23:31:55 UTC (rev 54) +++ transaction.c 2004-12-20 23:15:23 UTC (rev 55) @@ -910,12 +910,12 @@ typedef struct async_transaction_params { pdtp_connection_t conn; - void (*success_callback)(const pdtp_transaction_in_t, void *); + int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *); void *arg; } *async_transaction_params_t; /* Callback for receiving transactions asynchronously */ -static int async_transaction_handler(pdtp_sock_t sock, void *arg) +static int async_transaction_handler(pdtp_multiplexer_t mplx, pdtp_sock_t sock, void *arg) { pdtp_transaction_in_t txn; async_transaction_params_t params = (async_transaction_params_t)arg; @@ -933,7 +933,7 @@ assert(!pdtp__transaction_read_to_object(txn, txn->total_objs - 1)); } - params->success_callback(txn, params->arg); + params->success_callback(mplx, txn, params->arg); pdtp__transaction_in_destroy(txn); return 1; @@ -959,7 +959,7 @@ given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. 
*/ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, void (*success_callback)(const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) +void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) { pdtp_transaction_in_t in; async_transaction_params_t params; @@ -974,6 +974,6 @@ params->success_callback = success_callback; params->arg = arg; - pdtp__multiplexer_add_sock(mplx, conn->sock, PDTP_MPLX_READ, async_transaction_handler, params); + pdtp__multiplexer_add_socket(mplx, conn->sock, PDTP_MPLX_READ, async_transaction_handler, params); } Modified: transaction.h =================================================================== --- transaction.h 2004-12-17 23:31:55 UTC (rev 54) +++ transaction.h 2004-12-20 23:15:23 UTC (rev 55) @@ -43,7 +43,7 @@ given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. */ -void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, void (*success_callback)(const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); +void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, int (*success_callback)(pdtp_multiplexer_t, const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); |
From: <tar...@pd...> - 2004-12-17 23:31:17
|
Author: tarcieri Date: 2004-12-17 16:31:55 -0700 (Fri, 17 Dec 2004) New Revision: 54 Modified: download.c multiplexer.c multiplexer.h transaction.c transaction.h Log: Initial reimportation of old download code Modified: download.c =================================================================== --- download.c 2004-12-16 00:00:07 UTC (rev 53) +++ download.c 2004-12-17 23:31:55 UTC (rev 54) @@ -56,10 +56,10 @@ struct pdtp_download { /* File path */ char *path; - - /* Handle of file */ - uint32_t handle; - + + /* Handle of file */ + uint32_t handle; + /* Length of file */ uint64_t length; @@ -68,7 +68,7 @@ /* List of hashes */ uint8_t *hashes; - + /* Connection associated with download */ pdtp_connection_t conn; @@ -95,6 +95,9 @@ piece_transfer_parameters_t params; } *incoming_connection_parameters_t; +/* Prototypes */ +static void download_transaction_handler(const pdtp_transaction_in_t, void *arg); + /* The high level interface is defined first. Event handlers and actual mechanics of the download engine follow below as a series of static functions which are internal to this module. 
*/ @@ -104,7 +107,7 @@ uint32_t length; void *data; char *errstr; - + pdtp_transaction_out_t out; pdtp_transaction_in_t in; pdtp_download_t dl; @@ -130,21 +133,21 @@ pdtp__transaction_in_destroy(in); return 0; } - + dl = NEW(pdtp_download); dl->path = pdtp__strdup(path); if((i = pdtp__transaction_object_index(in, OBJ_FILELEN)) < 0) goto err; - + if(pdtp__transaction_object_uint64(in, i, &dl->length) < 0) goto err; #ifndef NDEBUG debug("dl->length is: %d", (int)dl->length); #endif - + if((i = pdtp__transaction_object_index(in, OBJ_PIECELEN)) < 0) goto err; @@ -161,13 +164,13 @@ /* Check if the hashes block is the expected length */ if(pdtp__transaction_object_length(in, i, &length) < 0) goto err; - + if(piece_count * 16 != length) goto err; if(!(data = pdtp__transaction_object_data(in, i))) goto err; - + dl->hashes = (uint8_t *)pdtp__alloc(0, length); memcpy(dl->hashes, data, length); @@ -201,7 +204,7 @@ pdtp_transaction_out_t txn; debug("pdtp_download_run() starting"); - + /* pdtp_piecelist_create() handles not only MD5 checksumming but also truncating the file to the specified length */ if(!(list = pdtp__piecelist_create(file, (pdtp_filelen_t)dl->length, dl->piece_length, dl->hashes))) { @@ -217,8 +220,12 @@ pdtp__piecelist_add_to_transaction(txn, list); dl->mplx = pdtp__multiplexer_create(dl->conn); - pdtp__transact_async(dl->conn, dl->mplx, txn, 0, 0, 0); + pdtp__transact_async(dl->conn, dl->mplx, txn, download_transaction_handler, 0, 0); + pdtp__multiplexer_run(dl->mplx); + + pdtp__multiplexer_destroy(dl->mplx); + return; } @@ -229,14 +236,14 @@ #if 0 static void transfer_success_notify(piece_transfer_parameters_t params) { - pdtp_transaction_out_t txn; + pdtp_transaction_out_t txn; - txn = pdtp__transaction_create(TXN_XFERSUCCESS, 3); - pdtp__transaction_add_int32(txn, OBJ_HANDLE, params->dl->handle); - pdtp__transaction_add_int32(txn, OBJ_PIECEID, params->piece_index); - pdtp__transaction_add_int32(txn, OBJ_IPV4ADDR, params->peer_address); - 
pdtp__transaction_write(params->dl->conn, txn); - pdtp__transaction_out_destroy(txn); + txn = pdtp__transaction_create(TXN_XFERSUCCESS, 3); + pdtp__transaction_add_int32(txn, OBJ_HANDLE, params->dl->handle); + pdtp__transaction_add_int32(txn, OBJ_PIECEID, params->piece_index); + pdtp__transaction_add_int32(txn, OBJ_IPV4ADDR, params->peer_address); + pdtp__transaction_write(params->dl->conn, txn); + pdtp__transaction_out_destroy(txn); } static void transfer_failure_notify(piece_transfer_parameters_t params) @@ -251,3 +258,53 @@ pdtp__transaction_out_destroy(txn); } #endif + +static void download_transaction_handler(const pdtp_transaction_in_t txn, void *arg) +{ + int i; + uint16_t id = pdtp__transaction_id(txn); + uint32_t ip, piece_index; + uint16_t port; + +#ifndef NDEBUG + debug("Download: Incoming transaction (%x)", id); +#endif + + /* XXX More types will definitely be added in the future */ + if(id != TXN_SENDPIECE && id != TXN_RECVPIECE) { +#ifndef NDEBUG + debug("Received bogus transaction through download filter"); +#endif + goto err; + } + + if((i = pdtp__transaction_object_index(txn, OBJ_PIECEID)) < 0) + goto err; + + if(pdtp__transaction_object_uint32(txn, i, &piece_index) < 0) + goto err; + + if((i = pdtp__transaction_object_index(txn, OBJ_IPV4ADDR)) < 0) + goto err; + + if(pdtp__transaction_object_uint32(txn, i, &ip) < 0) + goto err; + + if((i = pdtp__transaction_object_index(txn, OBJ_PORT)) < 0) + goto err; + + if(pdtp__transaction_object_uint16(txn, i, &port) < 0) + goto err; + +#ifndef NDEBUG + debug("Getting piece %d from %08x:%d", piece_index, ip, port); +#endif + + return; +err: +#ifndef NDEBUG + debug("An error occured!"); +#endif + + return; +} Modified: multiplexer.c =================================================================== --- multiplexer.c 2004-12-16 00:00:07 UTC (rev 53) +++ multiplexer.c 2004-12-17 23:31:55 UTC (rev 54) @@ -126,10 +126,10 @@ resize_table(mplx, INITIAL_TABLE_SIZE); - return 0; + return mplx; } -void 
pdtp_multiplexer_destroy(pdtp_multiplexer_t mplx) +void pdtp__multiplexer_destroy(pdtp_multiplexer_t mplx) { pdtp__free(mplx->fdtable); pdtp__free(mplx->callback_table); @@ -137,12 +137,12 @@ pdtp__free(mplx); } -void pdtp_multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) +void pdtp__multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) { add_entry(mplx, sock, type, handler, arg); } -void pdtp_multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock) +void pdtp__multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock) { unsigned i; @@ -158,7 +158,7 @@ #endif } -void pdtp_multiplexer_run(pdtp_multiplexer_t mplx) +void pdtp__multiplexer_run(pdtp_multiplexer_t mplx) { int fd; pdtp_event_handler_t handler; Modified: multiplexer.h =================================================================== --- multiplexer.h 2004-12-16 00:00:07 UTC (rev 53) +++ multiplexer.h 2004-12-17 23:31:55 UTC (rev 54) @@ -11,9 +11,11 @@ typedef int (*pdtp_event_handler_t)(pdtp_sock_t sock, void *arg); pdtp_multiplexer_t pdtp__multiplexer_create(pdtp_connection_t); -void pdtp_multiplexer_destroy(pdtp_multiplexer_t); +void pdtp__multiplexer_destroy(pdtp_multiplexer_t); -void pdtp_multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg); -void pdtp_multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock); +void pdtp__multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg); +void pdtp__multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock); +void pdtp__multiplexer_run(pdtp_multiplexer_t mplx); + #endif Modified: transaction.c =================================================================== --- transaction.c 2004-12-16 00:00:07 UTC (rev 53) +++ transaction.c 2004-12-17 23:31:55 
UTC (rev 54) @@ -908,6 +908,37 @@ return 0; } +typedef struct async_transaction_params { + pdtp_connection_t conn; + void (*success_callback)(const pdtp_transaction_in_t, void *); + void *arg; +} *async_transaction_params_t; + +/* Callback for receiving transactions asynchronously */ +static int async_transaction_handler(pdtp_sock_t sock, void *arg) +{ + pdtp_transaction_in_t txn; + async_transaction_params_t params = (async_transaction_params_t)arg; + + pdtp__mutex_lock(params->conn->socket_lock); +#ifndef NDEBUG + debug("+++ socket_lock: pdtp__transaction_read_direct()"); +#endif + + /* XXX Error handling needed here */ + assert((txn = pdtp__transaction_read_header(params->conn))); + + if(txn->total_objs) { + /* XXX Error handling needed here */ + assert(!pdtp__transaction_read_to_object(txn, txn->total_objs - 1)); + } + + params->success_callback(txn, params->arg); + pdtp__transaction_in_destroy(txn); + + return 1; +} + /* Asynchronous transaction interface. A connection and transaction are specified, as well as a multiplexer, success callback, and error callback. This function is currently hardcoded only to support TXN_DOWNLOAD. Passing @@ -928,12 +959,21 @@ given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. 
*/ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, void (*success_callback)(pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) +void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, void (*success_callback)(const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) { pdtp_transaction_in_t in; + async_transaction_params_t params; assert(out->id == TXN_DOWNLOAD); - if(!(in = pdtp__transact(conn, out, PDTP_STORE))) { - } + assert((in = pdtp__transact(conn, out, PDTP_STORE))); + + /* XXX Need to figure out a way to GC this */ + params = NEW(async_transaction_params); + params->conn = conn; + params->success_callback = success_callback; + params->arg = arg; + + pdtp__multiplexer_add_sock(mplx, conn->sock, PDTP_MPLX_READ, async_transaction_handler, params); } + Modified: transaction.h =================================================================== --- transaction.h 2004-12-16 00:00:07 UTC (rev 53) +++ transaction.h 2004-12-17 23:31:55 UTC (rev 54) @@ -43,7 +43,7 @@ given multiplexer because it should be unnecessary. Filter rules will be destroyed along with the multiplexer. */ -void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, void (*success_callback)(pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); +void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, void (*success_callback)(const pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); |
From: <tar...@pd...> - 2004-12-15 23:59:34
|
Author: tarcieri Date: 2004-12-15 17:00:07 -0700 (Wed, 15 Dec 2004) New Revision: 53 Modified: alloc.c alloc.h collection.c download.c error.c hmac.c multiplexer.c multiplexer.h pdtp.h piecelist.c socketops.c threadops.c transaction.c Log: Changed allocator to realloc semantics, transaction cleanups/bugfixes, and poll()-based multiplexer implementation Modified: alloc.c =================================================================== --- alloc.c 2004-12-14 23:39:56 UTC (rev 52) +++ alloc.c 2004-12-16 00:00:07 UTC (rev 53) @@ -38,7 +38,7 @@ #include <string.h> /* Prototype for the internal allocator */ -static void *pdtp__internal_alloc(int size); +static void *pdtp__internal_alloc(void *ptr, int size); static void pdtp__internal_oom_handler(const char *allocator_name); #ifdef WIN32 @@ -46,7 +46,7 @@ #endif /* Symbols for the internal allocator/deallocator/oom handler */ -void *(*pdtp__alloc)(int size) = pdtp__internal_alloc; +void *(*pdtp__alloc)(void *, int size) = pdtp__internal_alloc; #ifdef WIN32 void (*pdtp__free)(void *ptr) = pdtp__internal_free; #else @@ -60,16 +60,21 @@ abort(); } -static void *pdtp__internal_alloc(int size) +static void *pdtp__internal_alloc(void *ptr, int size) { void *ret; #ifdef WIN32 - if(!(ret = HeapAlloc(GetProcessHeap(), 0, size))) - pdtp__oom_handler("HeapAlloc"); + if(!ptr) { + if(!(ret = HeapAlloc(GetProcessHeap(), 0, size))) + pdtp__oom_handler("HeapAlloc"); + } else { + if(!(ret = HeapReAlloc(GetProcessHeap(), 0, ptr, size))) + pdtp__oom_handler("HeapReAlloc"); + } #else - if(!(ret = malloc(size))) - pdtp__oom_handler("malloc"); + if(!(ret = realloc(ptr, size))) + pdtp__oom_handler("realloc"); #endif /* WIN32 */ return ret; @@ -82,7 +87,7 @@ } #endif -void pdtp_alloc_set(void *(*function)(int)) +void pdtp_alloc_set(void *(*function)(void *, int)) { pdtp__alloc = function; } @@ -99,7 +104,7 @@ char *pdtp__strdup(const char *str) { - char *ret = (char *)pdtp__alloc((unsigned)strlen(str) + 1); + char *ret = (char 
*)pdtp__alloc(0, (unsigned)strlen(str) + 1); strcpy(ret, str); Modified: alloc.h =================================================================== --- alloc.h 2004-12-14 23:39:56 UTC (rev 52) +++ alloc.h 2004-12-16 00:00:07 UTC (rev 53) @@ -1,20 +1,25 @@ #ifndef PDTP_ALLOC_H #define PDTP_ALLOC_H + /** @file alloc.h - @brief Function pointers to the allocator and deallocator functions used by libpdtp. - These can be changed if necessary using the pdalloc_set() and pdfree_set() functions. + @brief Function pointers to the allocator and deallocator functions used by + libpdtp. These can be changed if necessary using the pdtp_alloc_set() and + pdtp_free_set() functions. */ + /** allocation callback function */ -extern void *(*pdtp__alloc)(int size); +extern void *(*pdtp__alloc)(void *ptr, int size); + /** free callback function */ extern void (*pdtp__free)(void *ptr); + /** out of memory handler callback */ extern void (*pdtp__oom_handler(const char *allocator_name)); /** Macro for allocating an internal structure */ -#define NEW(type) (type##_t)pdtp__alloc(sizeof(struct type)) +#define NEW(type) (type##_t)pdtp__alloc(0, sizeof(struct type)) /* Convenience functions built around the allocator */ /** pdtp version of strdup */ Modified: collection.c =================================================================== --- collection.c 2004-12-14 23:39:56 UTC (rev 52) +++ collection.c 2004-12-16 00:00:07 UTC (rev 53) @@ -64,7 +64,7 @@ ret->n_nodes = 0; ret->n_buckets = primes[0]; - ret->buckets = (pdtp_collection_node_t *)pdtp__alloc(sizeof(pdtp_collection_node_t) * primes[0]); + ret->buckets = (pdtp_collection_node_t *)pdtp__alloc(0, sizeof(pdtp_collection_node_t) * primes[0]); for(i = 0; i < primes[0]; i++) ret->buckets[i] = NULL; @@ -117,7 +117,7 @@ { int i; unsigned j; - pdtp_collection_node_t n, t, *new_buckets = (pdtp_collection_node_t *)pdtp__alloc(sizeof(pdtp_collection_node_t) * bucket_count); + pdtp_collection_node_t n, t, *new_buckets = (pdtp_collection_node_t 
*)pdtp__alloc(0, sizeof(pdtp_collection_node_t) * bucket_count); for(i = 0; i < bucket_count; i++) new_buckets[i] = NULL; Modified: download.c =================================================================== --- download.c 2004-12-14 23:39:56 UTC (rev 52) +++ download.c 2004-12-16 00:00:07 UTC (rev 53) @@ -168,8 +168,7 @@ if(!(data = pdtp__transaction_object_data(in, i))) goto err; - dl->hashes = (uint8_t *)pdtp__alloc(length); - + dl->hashes = (uint8_t *)pdtp__alloc(0, length); memcpy(dl->hashes, data, length); pdtp__transaction_in_destroy(in); @@ -198,10 +197,8 @@ void pdtp_download_run(pdtp_download_t dl, pdtp_file_t file) { - unsigned i; pdtp_piecelist_t list; - pdtp_transaction_in_t in; - pdtp_transaction_out_t out; + pdtp_transaction_out_t txn; debug("pdtp_download_run() starting"); @@ -215,65 +212,14 @@ return; } - out = pdtp__transaction_create(TXN_DOWNLOAD, 2); - pdtp__transaction_add_string(out, OBJ_PATH, dl->path); - pdtp__piecelist_add_to_transaction(out, list); + txn = pdtp__transaction_create(TXN_DOWNLOAD, 2); + pdtp__transaction_add_string(txn, OBJ_PATH, dl->path); + pdtp__piecelist_add_to_transaction(txn, list); - in = pdtp__transact(dl->conn, out, PDTP_STORE); - pdtp__transaction_out_destroy(out); - - if(!in) { -#ifndef NDEBUG - debug("pdtp_download_run: pdtp__transaciton_wait returned NULL"); -#endif - return; - } - - if(!pdtp__transaction_id(in)) { -#ifndef NDEBUG - debug("pdtp_download_run: Server error beginning download"); -#endif - /* XXX Fix error handling */ - - pdtp__transaction_in_destroy(in); - return; - } - - if((i = pdtp__transaction_object_index(in, OBJ_HANDLE)) < 0) { -#ifndef NDEBUG - debug("No handle object!"); -#endif - goto err; - } - - if(pdtp__transaction_object_uint32(in, i, &dl->handle) < 0) { -#ifndef NDEBUG - debug("Invalid handle object!"); -#endif - goto err; - } - -#ifndef NDEBUG - debug("dl->handle is: %d", dl->handle); -#endif - - pdtp__transaction_in_destroy(in); - dl->mplx = pdtp__multiplexer_create(dl->conn); + 
pdtp__transact_async(dl->conn, dl->mplx, txn, 0, 0, 0); return; -err: - pdtp_set_last_error(ERROR_DOWNLOAD_INIT); - -#ifndef NDEBUG - debug("Freeing transaction"); -#endif - - pdtp__transaction_in_destroy(in); - -#ifndef NDEBUG - debug("Error running download"); -#endif } void pdtp_download_stop(pdtp_download_t dl) Modified: error.c =================================================================== --- error.c 2004-12-14 23:39:56 UTC (rev 52) +++ error.c 2004-12-16 00:00:07 UTC (rev 53) @@ -188,7 +188,7 @@ #ifdef WIN32 int size = INITIAL_BUFFER; - msg = (char *)pdtp__alloc(size); + msg = (char *)pdtp__alloc(0, size); va_start(ap, format); while(_vsnprintf(msg, size, format, ap) < 0) { @@ -198,7 +198,7 @@ if((size *= 2) > MAX_BUFFER) return; - msg = (char *)pdtp__alloc(size); + msg = (char *)pdtp__alloc(0, size); va_start(ap, format); } @@ -217,7 +217,7 @@ #if !defined(WIN32) && !defined(HAVE_VASPRINTF) int l; - msg = (char *)pdtp__alloc(INITIAL_BUFFER); + msg = (char *)pdtp__alloc(0, INITIAL_BUFFER); va_start(ap, format); l = vsnprintf(msg, INITIAL_BUFFER, format, ap); @@ -230,7 +230,7 @@ l++; pdtp__free(msg); - msg = (char *)pdtp__alloc(l); + msg = (char *)pdtp__alloc(0, l); va_start(ap, format); vsnprintf(msg, l, format, ap); Modified: hmac.c =================================================================== --- hmac.c 2004-12-14 23:39:56 UTC (rev 52) +++ hmac.c 2004-12-16 00:00:07 UTC (rev 53) @@ -54,7 +54,7 @@ pdtp__hasher_add_data(hasher, key, klen); klen = pdtp__hasher_digest_size(hasher); - kptr = (uint8_t *)pdtp__alloc(klen); + kptr = (uint8_t *)pdtp__alloc(0, klen); pdtp__hasher_get_digest(hasher, kptr); } else Modified: multiplexer.c =================================================================== --- multiplexer.c 2004-12-14 23:39:56 UTC (rev 52) +++ multiplexer.c 2004-12-16 00:00:07 UTC (rev 53) @@ -34,7 +34,9 @@ #include "machdep.h" #include "pdtp.h" +#include "alloc.h" #include "connection.h" +#include "debug.h" #include "multiplexer.h" #if 
defined(HAVE_POLL) && !defined(HAVE_MULTIPLEXER) @@ -46,13 +48,146 @@ /* Initial size of the file descriptor table */ #define INITIAL_TABLE_SIZE 16 +struct callback_table_entry { + pdtp_event_handler_t handler; + void *arg; +}; + +struct pdtp_multiplexer { + int remaining_events, current_entry; + int entry_count, table_size; + struct pollfd *fdtable; + struct callback_table_entry *callback_table; +}; + +static void resize_table(pdtp_multiplexer_t mplx, unsigned size) +{ + mplx->table_size = size; + mplx->fdtable = pdtp__alloc(mplx->fdtable, size * sizeof(struct pollfd)); + mplx->callback_table = pdtp__alloc(mplx->callback_table, size * sizeof(struct callback_table_entry)); +} + +static void add_entry(pdtp_multiplexer_t mplx, int fd, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) +{ + /* If the table is full double its size */ + if(mplx->entry_count == mplx->table_size) + resize_table(mplx, mplx->table_size * 2); + + mplx->fdtable[mplx->entry_count].fd = fd; + mplx->fdtable[mplx->entry_count].revents = 0; + + switch(type) { + case PDTP_MPLX_READ: + mplx->fdtable[mplx->entry_count].events = POLLIN; + break; + case PDTP_MPLX_WRITE: + mplx->fdtable[mplx->entry_count].events = POLLOUT; + break; + } + + mplx->callback_table[mplx->entry_count].handler = handler; + mplx->callback_table[mplx->entry_count].arg = arg; + + mplx->entry_count++; +} + +static void remove_entry(pdtp_multiplexer_t mplx, unsigned index) +{ + /* If the entry isn't the last one plug the hole in the table with + the last entry */ + if(index != --mplx->entry_count) { + mplx->fdtable[index].fd = mplx->fdtable[mplx->entry_count].fd; + mplx->fdtable[index].events = mplx->fdtable[mplx->entry_count].events; + mplx->fdtable[index].revents = mplx->fdtable[mplx->entry_count].revents; + + mplx->callback_table[index].handler = mplx->callback_table[mplx->entry_count].handler; + mplx->callback_table[index].arg = mplx->callback_table[mplx->entry_count].arg; + + /* Ensure events for the last descriptor 
are processed */ + if(mplx->fdtable[index].revents && mplx->current_entry > index) + mplx->current_entry = index; + } + + /* If table is less than 1/4 full cut its size in half */ + if(mplx->table_size > mplx->entry_count * 4 && mplx->table_size > INITIAL_TABLE_SIZE) + resize_table(mplx, mplx->table_size / 2); +} + pdtp_multiplexer_t pdtp__multiplexer_create(pdtp_connection_t conn) { + pdtp_multiplexer_t mplx = NEW(pdtp_multiplexer); + + mplx->current_entry = 0; + mplx->entry_count = 0; + + /* Initialize to NULL so they'll be allocated in the initial resize */ + mplx->fdtable = 0; + mplx->callback_table = 0; + + resize_table(mplx, INITIAL_TABLE_SIZE); + return 0; } void pdtp_multiplexer_destroy(pdtp_multiplexer_t mplx) { + pdtp__free(mplx->fdtable); + pdtp__free(mplx->callback_table); + + pdtp__free(mplx); } +void pdtp_multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg) +{ + add_entry(mplx, sock, type, handler, arg); +} + +void pdtp_multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock) +{ + unsigned i; + + for(i = 0; i < mplx->entry_count; i++) { + if(mplx->fdtable[i].fd == sock) { + remove_entry(mplx, i); + return; + } + } + +#ifndef NDEBUG + debug("Warning: Request to remove a socket (%d) from a multiplexer it isn't present in", sock); +#endif +} + +void pdtp_multiplexer_run(pdtp_multiplexer_t mplx) +{ + int fd; + pdtp_event_handler_t handler; + void *arg; + + while((mplx->remaining_events = poll(mplx->fdtable, mplx->entry_count, -1)) > 0) { + mplx->current_entry = 0; + + while(mplx->remaining_events > 0 && mplx->current_entry < mplx->entry_count) { + if(!mplx->fdtable[mplx->current_entry].revents) { + mplx->current_entry++; + continue; + } + + mplx->remaining_events--; + fd = mplx->fdtable[mplx->current_entry].fd; + handler = mplx->callback_table[mplx->current_entry].handler; + arg = mplx->callback_table[mplx->current_entry].arg; + + mplx->current_entry++; + + if(!handler(fd, 
arg)) + return; + } + } + +#ifndef NDEBUG + debug("Multiplexer failure"); +#endif +} + #endif /* defined(HAVE_POLL) && !defined(HAVE_MULTIPLEXER) */ Modified: multiplexer.h =================================================================== --- multiplexer.h 2004-12-14 23:39:56 UTC (rev 52) +++ multiplexer.h 2004-12-16 00:00:07 UTC (rev 53) @@ -7,8 +7,13 @@ #include "connection.h" typedef struct pdtp_multiplexer *pdtp_multiplexer_t; +typedef enum { PDTP_MPLX_READ, PDTP_MPLX_WRITE } pdtp_event_t; +typedef int (*pdtp_event_handler_t)(pdtp_sock_t sock, void *arg); pdtp_multiplexer_t pdtp__multiplexer_create(pdtp_connection_t); void pdtp_multiplexer_destroy(pdtp_multiplexer_t); +void pdtp_multiplexer_add_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock, pdtp_event_t type, pdtp_event_handler_t handler, void *arg); +void pdtp_multiplexer_remove_sock(pdtp_multiplexer_t mplx, pdtp_sock_t sock); + #endif Modified: pdtp.h =================================================================== --- pdtp.h 2004-12-14 23:39:56 UTC (rev 52) +++ pdtp.h 2004-12-16 00:00:07 UTC (rev 53) @@ -428,11 +428,17 @@ this function will never return. */ void pdtp_oom_handler_set(void (*function)(const char *allocator_name)); -/** This allows you to override the internal allocation function used by - libpdtp, which is normally a wrapper around the system malloc(). libpdtp - will not check for a NULL return from the specified allocator function. - The default behavior will be to call abort() if malloc() returns NULL. */ -void pdtp_alloc_set(void *(*function)(int nbytes)); +/** This allows you to override the internal allocator function used by + libpdtp. This function should have the semantics of realloc(). If ptr + is null, a new memory block of size nbytes should be allocated, otherwise + it should return a memory block of size nbytes which includes at least + nbytes worth of original data, and the pointer returned may or may not + be equal to the original pointer passed. 
This function should never + return NULL, and an out of memory event needs to be handled internally. + The default behavior will be to call abort() if an out of memory event + occurs, or if an OOM handler has been set with pdtp_oom_handler_set then + that function will be invoked if an OOM event occurs */ +void pdtp_alloc_set(void *(*function)(void *ptr, int nbytes)); /** This allows you to override the internal free function, normally the system's free() */ Modified: piecelist.c =================================================================== --- piecelist.c 2004-12-14 23:39:56 UTC (rev 52) +++ piecelist.c 2004-12-16 00:00:07 UTC (rev 53) @@ -187,7 +187,7 @@ pdtp_piecelistnode_t cur; uint32_t *buffer; - buffer = (uint32_t *)pdtp__alloc(list->count * 8); + buffer = (uint32_t *)pdtp__alloc(0, list->count * 8); for(cur = list->head; cur; cur = cur->next) { buffer[i++] = cur->start; Modified: socketops.c =================================================================== --- socketops.c 2004-12-14 23:39:56 UTC (rev 52) +++ socketops.c 2004-12-16 00:00:07 UTC (rev 53) @@ -63,7 +63,7 @@ pdtp_addr_t ret = NEW(pdtp_addr); struct sockaddr_in *sin; - ret->saddr = (struct sockaddr *)pdtp__alloc(sizeof(struct sockaddr_in)); + ret->saddr = (struct sockaddr *)pdtp__alloc(0, sizeof(struct sockaddr_in)); ret->slen = sizeof(struct sockaddr_in); memset(ret->saddr, 0, sizeof(struct sockaddr_in)); sin = (struct sockaddr_in *)ret->saddr; @@ -92,7 +92,7 @@ } ret = NEW(pdtp_addr); - ret->saddr = (struct sockaddr *)pdtp__alloc(sizeof(struct sockaddr_in)); + ret->saddr = (struct sockaddr *)pdtp__alloc(0, sizeof(struct sockaddr_in)); ret->slen = sizeof(struct sockaddr_in); memcpy(ret->saddr, res->ai_addr, sizeof(struct sockaddr_in)); @@ -110,7 +110,7 @@ struct hostent *host; ret = NEW(pdtp_addr); - ret->saddr = (struct sockaddr *)pdtp__alloc(sizeof(struct sockaddr_in)); + ret->saddr = (struct sockaddr *)pdtp__alloc(0, sizeof(struct sockaddr_in)); ret->slen = sizeof(struct sockaddr_in); 
memset(ret->saddr, 0, sizeof(struct sockaddr_in)); sin = (struct sockaddr_in *)ret->saddr; Modified: threadops.c =================================================================== --- threadops.c 2004-12-14 23:39:56 UTC (rev 52) +++ threadops.c 2004-12-16 00:00:07 UTC (rev 53) @@ -42,13 +42,13 @@ pdtp_mutex_t pdtp__mutex_create(void) { #ifdef HAVE_PTHREADS - pdtp_mutex_t mutex = (pdtp_mutex_t)pdtp__alloc(sizeof(pthread_mutex_t)); + pdtp_mutex_t mutex = (pdtp_mutex_t)pdtp__alloc(0, sizeof(pthread_mutex_t)); pthread_mutex_init(mutex, 0); return mutex; #endif #ifdef WIN32 - pdtp_mutex_t mutex = (pdtp_mutex_t)pdtp__alloc(sizeof(CRITICAL_SECTION)); + pdtp_mutex_t mutex = (pdtp_mutex_t)pdtp__alloc(0, sizeof(CRITICAL_SECTION)); InitializeCriticalSection(mutex); @@ -75,7 +75,7 @@ pdtp_cond_t pdtp__cond_create(void) { #ifdef HAVE_PTHREADS - pdtp_cond_t cond = (pdtp_cond_t)pdtp__alloc(sizeof(pthread_cond_t)); + pdtp_cond_t cond = (pdtp_cond_t)pdtp__alloc(0, sizeof(pthread_cond_t)); pthread_cond_init(cond, 0); return cond; #endif Modified: transaction.c =================================================================== --- transaction.c 2004-12-14 23:39:56 UTC (rev 52) +++ transaction.c 2004-12-16 00:00:07 UTC (rev 53) @@ -48,6 +48,7 @@ #include "socketops.h" #include "threadops.h" #include "transaction.h" +#include "txid.h" /* Include uio.h if we are using UIO */ #ifdef HAVE_UIO @@ -168,8 +169,8 @@ return txn; } - txn->buffer = (uint8_t *)pdtp__alloc(txn->length); - txn->object = (struct pdtp_transaction_object *)pdtp__alloc(txn->total_objs * sizeof(struct pdtp_transaction_object)); + txn->buffer = (uint8_t *)pdtp__alloc(0, txn->length); + txn->object = (struct pdtp_transaction_object *)pdtp__alloc(0, txn->total_objs * sizeof(struct pdtp_transaction_object)); return txn; @@ -443,7 +444,7 @@ if(!(data = pdtp__transaction_object_data(txn, objno))) return -1; - *string = (char *)pdtp__alloc(length + 1); + *string = (char *)pdtp__alloc(0, length + 1); memcpy(*string, data, 
length); (*string)[length] = '\0'; @@ -536,7 +537,7 @@ if(length < v16) goto done; - str = (char *)pdtp__alloc(v16 + 1); + str = (char *)pdtp__alloc(0, v16 + 1); memcpy(str, bptr, v16); str[v16] = '\0'; *va_arg(ap, char **) = str; @@ -559,14 +560,14 @@ pdtp_transaction_out_t txn = NEW(pdtp_transaction_out); txn->serial = 0; - txn->id = htons(tid); + txn->id = tid; txn->length = 8; txn->obj_count = 0; txn->max_objects = obj_count; #ifdef HAVE_UIO - txn->iov = (struct iovec *)pdtp__alloc((4 + txn->max_objects) * sizeof(struct iovec)); + txn->iov = (struct iovec *)pdtp__alloc(0, (4 + txn->max_objects) * sizeof(struct iovec)); /* This iov_base field gets filled in before the writev call */ txn->iov[0].iov_len = 4; @@ -582,7 +583,7 @@ #endif #ifdef WIN32 - txn->buf = (LPWSABUF)pdtp__alloc((4 + txn->max_objects) * sizeof(WSABUF)); + txn->buf = (LPWSABUF)pdtp__alloc(0, (4 + txn->max_objects) * sizeof(WSABUF)); /* This buf field gets filled in before the WSASend call */ txn->buf[0].len = 4; @@ -598,7 +599,7 @@ #endif #if !defined(HAVE_UIO) && !defined(WIN32) - txn->object = (struct pdtp_transaction_object *)pdtp__alloc(txn->max_objects * sizeof(struct pdtp_transaction_object)); + txn->object = (struct pdtp_transaction_object *)pdtp__alloc(0, txn->max_objects * sizeof(struct pdtp_transaction_object)); #endif return txn; @@ -642,7 +643,7 @@ return; #if defined(HAVE_UIO) || defined(WIN32) - obj_buf = (uint8_t *)pdtp__alloc(6 + length); + obj_buf = (uint8_t *)pdtp__alloc(0, 6 + length); v32 = htonl(length); memcpy(obj_buf, &v32, 4); @@ -665,7 +666,7 @@ #else txn->object[txn->obj_count].obj_len = length; txn->object[txn->obj_count].obj_id = id; - txn->object[txn->obj_count].obj_data = pdtp__alloc(length); + txn->object[txn->obj_count].obj_data = pdtp__alloc(0, length); memcpy(txn->object[txn->obj_count].obj_data, data, length); #endif @@ -723,27 +724,27 @@ case '1': length += 1; node->length = 1; - node->buf = (uint8_t *)pdtp__alloc(1); + node->buf = (uint8_t *)pdtp__alloc(0, 
1); *node->buf = va_arg(ap, int); break; case '2': length += 2; node->length = 2; - node->buf = (uint8_t *)pdtp__alloc(2); + node->buf = (uint8_t *)pdtp__alloc(0, 2); v16 = htons((uint16_t)va_arg(ap, int)); memcpy(node->buf, &v16, 2); break; case '4': length += 4; node->length = 4; - node->buf = (uint8_t *)pdtp__alloc(4); + node->buf = (uint8_t *)pdtp__alloc(0, 4); v32 = htonl(va_arg(ap, uint32_t)); memcpy(node->buf, &v32, 4); break; case '8': length += 8; node->length = 8; - node->buf = (uint8_t *)pdtp__alloc(8); + node->buf = (uint8_t *)pdtp__alloc(0, 8); v64 = va_arg(ap, uint64_t); #ifdef HOST_BIGENDIAN memcpy(node->buf, &v64, 8); @@ -759,7 +760,7 @@ l = (unsigned)strlen((char *)bptr); length += l + 2; node->length = l + 2; - node->buf = (uint8_t *)pdtp__alloc(l + 2); + node->buf = (uint8_t *)pdtp__alloc(0, l + 2); v16 = htons((uint16_t)l); memcpy(node->buf, &v16, 2); memcpy(node->buf + 2, bptr, l); @@ -776,7 +777,7 @@ format++; } - bptr = buf = pdtp__alloc(length); + bptr = buf = pdtp__alloc(0, length); while((node = head)) { head = node->link; @@ -801,6 +802,9 @@ uint32_t length; uint16_t obj_count; + txn->serial = htonl(txn->serial); + txn->id = htons(txn->id); + length = htonl(txn->length); obj_count = htons(txn->obj_count); @@ -815,6 +819,9 @@ uint16_t obj_count; DWORD bytes = 0; + txn->serial = htonl(txn->serial); + txn->id = htons(txn->id); + length = htonl(txn->length); obj_count = htons(txn->obj_count); @@ -833,13 +840,16 @@ uint16_t v16; uint32_t v32; - buffer = (uint8_t *)pdtp__alloc(txn->length + 4); + buffer = (uint8_t *)pdtp__alloc(0, txn->length + 4); v32 = htonl(txn->length); memcpy(buffer, &v32, 4); - memcpy(buffer + 4, &txn->serial, 4); - memcpy(buffer + 8, &txn->id, 2); + v32 = htonl(txn->serial); + memcpy(buffer + 4, &v32, 4); + + v16 = htons(txn->id); + memcpy(buffer + 8, &v16, 2); v16 = htons(txn->obj_count); memcpy(buffer + 10, &v16, 2); @@ -920,4 +930,10 @@ */ void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, 
pdtp_transaction_out_t out, void (*success_callback)(pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) { + pdtp_transaction_in_t in; + + assert(out->id == TXN_DOWNLOAD); + + if(!(in = pdtp__transact(conn, out, PDTP_STORE))) { + } } |
From: <tar...@pd...> - 2004-12-14 23:39:33
|
Author: tarcieri Date: 2004-12-14 16:39:56 -0700 (Tue, 14 Dec 2004) New Revision: 52 Modified: connection.h download.c transaction.c transaction.h txnmgr.c Log: New asynchronous transaction interface, dependency cleanups, and strict aliasing cleanups Modified: connection.h =================================================================== --- connection.h 2004-12-08 00:12:22 UTC (rev 51) +++ connection.h 2004-12-14 23:39:56 UTC (rev 52) @@ -5,7 +5,6 @@ #include "pdtp.h" #include "socketops.h" #include "threadops.h" -#include "transaction.h" #include "txnmgr.h" /** Modified: download.c =================================================================== --- download.c 2004-12-08 00:12:22 UTC (rev 51) +++ download.c 2004-12-14 23:39:56 UTC (rev 52) @@ -165,7 +165,7 @@ if(piece_count * 16 != length) goto err; - if(pdtp__transaction_object_data(in, i, &data) < 0) + if(!(data = pdtp__transaction_object_data(in, i))) goto err; dl->hashes = (uint8_t *)pdtp__alloc(length); Modified: transaction.c =================================================================== --- transaction.c 2004-12-08 00:12:22 UTC (rev 51) +++ transaction.c 2004-12-14 23:39:56 UTC (rev 52) @@ -351,14 +351,12 @@ return 0; } -int pdtp__transaction_object_data(pdtp_transaction_in_t txn, unsigned objno, void **data) +void *pdtp__transaction_object_data(pdtp_transaction_in_t txn, unsigned objno) { if(pdtp__transaction_read_to_object(txn, objno) < 0) - return -1; + return 0; - *data = txn->object[objno].obj_data; - - return 0; + return txn->object[objno].obj_data; } int pdtp__transaction_object_uint16(pdtp_transaction_in_t txn, unsigned objno, uint16_t *object) @@ -373,7 +371,7 @@ if(length != 2) return -1; - if(pdtp__transaction_object_data(txn, objno, &data) < 0) + if(!(data = pdtp__transaction_object_data(txn, objno))) return -1; memcpy(&value, data, 2); @@ -393,7 +391,7 @@ if(length != 4) return -1; - if(pdtp__transaction_object_data(txn, objno, &data) < 0) + if(!(data = 
pdtp__transaction_object_data(txn, objno))) return -1; memcpy(&value, data, 4); @@ -418,7 +416,7 @@ if(length != 8) return -1; - if(pdtp__transaction_object_data(txn, objno, (void **)&data) < 0) + if(!(data = pdtp__transaction_object_data(txn, objno))) return -1; #ifdef HOST_BIGENDIAN @@ -442,7 +440,7 @@ if(pdtp__transaction_object_length(txn, objno, &length) < 0) return -1; - if(pdtp__transaction_object_data(txn, objno, &data) < 0) + if(!(data = pdtp__transaction_object_data(txn, objno))) return -1; *string = (char *)pdtp__alloc(length + 1); @@ -469,7 +467,7 @@ if(pdtp__transaction_object_length(txn, objno, &length) < 0) return -1; - if(pdtp__transaction_object_data(txn, objno, (void **)&bptr) < 0) + if(!(bptr = pdtp__transaction_object_data(txn, objno))) return -1; va_start(ap, format); @@ -867,30 +865,6 @@ #endif } -/* This could likely be implemented a bit more cleanly */ -static uint16_t pdtp__transaction_out_object_id(pdtp_transaction_out_t txn, unsigned objno) -{ - uint16_t ret; - -#ifdef HAVE_UIO - assert(objno < txn->obj_count); - - memcpy(&ret, (uint8_t *)txn->iov[objno + 4].iov_base + 4, 2); - ret = htons(ret); -#endif /* HAVE_UIO */ - -#ifdef WIN32 - memcpy(&ret, (uint8_t *)txn->buf[objno + 4].buf + 4, 2); - ret = htons(ret); -#endif /* WIN32 */ - -#if !defined(HAVE_UIO) && !defined(WIN32) - ret = txn->object[txn->obj_count].obj_id; -#endif /* !defined(HAVE_UIO) && !defined(WIN32) */ - - return ret; -} - /* Synchronous transaction interface. A connection and transaction are specified. The returned transaction is the response to the request transaction being sent */ @@ -925,30 +899,25 @@ } /* Asynchronous transaction interface. A connection and transaction are - specified, as well as a multiplexer, object type, and callback. The - specified transaction is sent to the specified connection. 
When the - specified multiplexer is running, it will invoke the given callback - whenever a transaction is received with an object of the specified - type which is identical to the same object in the request */ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, uint16_t obj, void (*callback)(pdtp_transaction_in_t, void *arg), void *arg) -{ - int filter_object_index; + specified, as well as a multiplexer, success callback, and error callback. + This function is currently hardcoded only to support TXN_DOWNLOAD. Passing + a transaction of any other type will result in an assertion failure. - /* Locate the object we'll be filtering by */ - for(filter_object_index = 0; filter_object_index < out->obj_count; filter_object_index++) { - if(pdtp__transaction_out_object_id(out, filter_object_index) == obj) - break; - } + The specified transaction will be synchronously sent to the server. If + an error response is received, the specified error callback will be + invoked with the server's error message. The error callback will also + be invoked if the format of the response transaction is wrong. The + response transaction should include one 32-bit unsigned integer object + which will be used as the key for receiving asynchronous transactions. + If a successful response of the proper format is received, then a filter + is created for the given multiplexer which will invoke the specified + success_callback whenever a transaction is received containing the same + object as received in the initial successful transaction response. - /* assert that at least one of the objects in our outgoing transaction - matches the passed ID. 
This is what we'll be filtering asynchronous - transaction responses by, so if this doesn't match either the - given transaction was miscontructed or the object ID is wrong */ - assert(filter_object_index < out->obj_count); - - /* XXX Set up filter rules here */ - - pdtp__transaction_write(conn, out); - - return; + The interface currently specifies no way to destroy filter rules for a + given multiplexer because it should be unnecessary. Filter rules will + be destroyed along with the multiplexer. + */ +void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, void (*success_callback)(pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg) +{ } Modified: transaction.h =================================================================== --- transaction.h 2004-12-08 00:12:22 UTC (rev 51) +++ transaction.h 2004-12-14 23:39:56 UTC (rev 52) @@ -24,13 +24,27 @@ pdtp_transaction_in_t pdtp__transact(pdtp_connection_t, pdtp_transaction_out_t, pdtp_read_mode_t); /* Asynchronous transaction interface. A connection and transaction are - specified, as well as a multiplexer, object type, and callback. The - specified transaction is sent to the specified connection. When the - specified multiplexer is running, it will invoke the given callback - whenever a transaction is received with an object of the specified - type which is identical to the same object in the request */ -void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, uint16_t obj, void (*callback)(pdtp_transaction_in_t, void *arg), void *arg); + specified, as well as a multiplexer, success callback, and error callback. + This function is currently hardcoded only to support TXN_DOWNLOAD. Passing + a transaction of any other type will result in an assertion failure. + The specified transaction will be synchronously sent to the server. 
If + an error response is received, the specified error callback will be + invoked with the server's error message. The error callback will also + be invoked if the format of the response transaction is wrong. The + response transaction should include one 32-bit unsigned integer object + which will be used as the key for receiving asynchronous transactions. + If a successful response of the proper format is received, then a filter + is created for the given multiplexer which will invoke the specified + success_callback whenever a transaction is received containing the same + object as received in the initial successful transaction response. + + The interface currently specifies no way to destroy filter rules for a + given multiplexer because it should be unnecessary. Filter rules will + be destroyed along with the multiplexer. + */ +void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, void (*success_callback)(pdtp_transaction_in_t, void *arg), void (*error_callback)(const char *msg, void *arg), void *arg); + void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); uint16_t pdtp__transaction_id(pdtp_transaction_in_t); @@ -39,7 +53,7 @@ int pdtp__transaction_object_index(pdtp_transaction_in_t, uint16_t type); int pdtp__transaction_object_id(pdtp_transaction_in_t, unsigned objno, uint16_t *id); int pdtp__transaction_object_length(pdtp_transaction_in_t, unsigned objno, uint32_t *length); -int pdtp__transaction_object_data(pdtp_transaction_in_t, unsigned objno, void **data); +void *pdtp__transaction_object_data(pdtp_transaction_in_t txn, unsigned objno); int pdtp__transaction_object_uint16(pdtp_transaction_in_t, unsigned objno, uint16_t *object); int pdtp__transaction_object_uint32(pdtp_transaction_in_t, unsigned objno, uint32_t *object); int pdtp__transaction_object_uint64(pdtp_transaction_in_t, unsigned objno, uint64_t *object); Modified: txnmgr.c 
=================================================================== --- txnmgr.c 2004-12-08 00:12:22 UTC (rev 51) +++ txnmgr.c 2004-12-14 23:39:56 UTC (rev 52) @@ -35,6 +35,7 @@ #include "pdtp.h" #include "alloc.h" +#include "collection.h" #include "threadops.h" #include "txnmgr.h" @@ -42,6 +43,12 @@ /* Connection serial number and respective lock */ uint32_t serial; pdtp_mutex_t serial_lock; + + pdtp_collection_t serial_collection; + pdtp_mutex_t serial_collection_lock; + + pdtp_collection_t handle_collection; + pdtp_mutex_t handle_collection_lock; }; pdtp_txnmgr_t pdtp__txnmgr_create(void) |
From: <tar...@pd...> - 2004-12-08 00:11:56
|
Author: tarcieri Date: 2004-12-07 17:12:22 -0700 (Tue, 07 Dec 2004) New Revision: 51 Modified: download.c make-machdep.h.sh multiplexer.c multiplexer.h transaction.c transaction.h txnmgr.h Log: More modifications for download support Modified: download.c =================================================================== --- download.c 2004-12-06 19:37:43 UTC (rev 50) +++ download.c 2004-12-08 00:12:22 UTC (rev 51) @@ -44,6 +44,7 @@ #include "errmsg.h" #include "error.h" #include "fileops.h" +#include "multiplexer.h" #include "piecelist.h" #include "transaction.h" #include "txid.h" @@ -71,6 +72,9 @@ /* Connection associated with download */ pdtp_connection_t conn; + /* Multiplexer associated with download */ + pdtp_multiplexer_t mplx; + /* File being downloaded */ pdtp_file_t file; }; @@ -255,6 +259,8 @@ pdtp__transaction_in_destroy(in); + dl->mplx = pdtp__multiplexer_create(dl->conn); + return; err: pdtp_set_last_error(ERROR_DOWNLOAD_INIT); @@ -265,7 +271,6 @@ pdtp__transaction_in_destroy(in); - /* XXX Yipe, we really need to do something here */ #ifndef NDEBUG debug("Error running download"); #endif Modified: make-machdep.h.sh =================================================================== --- make-machdep.h.sh 2004-12-06 19:37:43 UTC (rev 50) +++ make-machdep.h.sh 2004-12-08 00:12:22 UTC (rev 51) @@ -12,7 +12,7 @@ echo "#ifndef MACHDEP_H" > machdep.h echo "#define MACHDEP_H" >> machdep.h echo "/** @file machdep.h" >> machdep.h -echo "* @brief machine dependancys (auto-generated)">> machdep.h +echo "* @brief machine dependancies (auto-generated)">> machdep.h echo "*/" >> machdep.h echo >> machdep.h Modified: multiplexer.c =================================================================== --- multiplexer.c 2004-12-06 19:37:43 UTC (rev 50) +++ multiplexer.c 2004-12-08 00:12:22 UTC (rev 51) @@ -34,4 +34,25 @@ #include "machdep.h" #include "pdtp.h" +#include "connection.h" #include "multiplexer.h" + +#if defined(HAVE_POLL) && !defined(HAVE_MULTIPLEXER) +#define 
HAVE_MULTIPLEXER + +#include <sys/poll.h> +#include <unistd.h> + +/* Initial size of the file descriptor table */ +#define INITIAL_TABLE_SIZE 16 + +pdtp_multiplexer_t pdtp__multiplexer_create(pdtp_connection_t conn) +{ + return 0; +} + +void pdtp_multiplexer_destroy(pdtp_multiplexer_t mplx) +{ +} + +#endif /* defined(HAVE_POLL) && !defined(HAVE_MULTIPLEXER) */ Modified: multiplexer.h =================================================================== --- multiplexer.h 2004-12-06 19:37:43 UTC (rev 50) +++ multiplexer.h 2004-12-08 00:12:22 UTC (rev 51) @@ -1,6 +1,14 @@ #ifndef MULTIPLEXER_H #define MULTIPLEXER_H +#include "machdep.h" +#include "pdtp.h" + +#include "connection.h" + typedef struct pdtp_multiplexer *pdtp_multiplexer_t; +pdtp_multiplexer_t pdtp__multiplexer_create(pdtp_connection_t); +void pdtp_multiplexer_destroy(pdtp_multiplexer_t); + #endif Modified: transaction.c =================================================================== --- transaction.c 2004-12-06 19:37:43 UTC (rev 50) +++ transaction.c 2004-12-08 00:12:22 UTC (rev 51) @@ -36,6 +36,7 @@ #include "sockets.h" #include <sys/types.h> +#include <assert.h> #include <stdarg.h> #include <string.h> #include <time.h> @@ -186,7 +187,7 @@ /* Ensure that the transaction's objects are populated to at least the specified object index. 
*/ -int pdtp__transaction_read_to_object(pdtp_transaction_in_t txn, unsigned obj) +static int pdtp__transaction_read_to_object(pdtp_transaction_in_t txn, unsigned obj) { int len; uint16_t v16; @@ -655,7 +656,7 @@ #ifdef HAVE_UIO txn->iov[txn->obj_count + 4].iov_len = 6 + length; - txn->iov[txn->obj_count + 4].iov_base = (char *)obj_buf; + txn->iov[txn->obj_count + 4].iov_base = obj_buf; #endif #ifdef WIN32 @@ -805,11 +806,11 @@ length = htonl(txn->length); obj_count = htons(txn->obj_count); - txn->iov[0].iov_base = (char *)&length; - txn->iov[3].iov_base = (char *)&obj_count; + txn->iov[0].iov_base = &length; + txn->iov[3].iov_base = &obj_count; return writev(conn->sock, txn->iov, 4 + txn->obj_count); -#endif +#endif /* HAVE_UIO */ #ifdef WIN32 uint32_t length; @@ -826,7 +827,7 @@ return -1; return bytes; -#endif +#endif /* WIN32 */ #if !defined(HAVE_UIO) && !defined(WIN32) int i, ret; @@ -866,6 +867,30 @@ #endif } +/* This could likely be implemented a bit more cleanly */ +static uint16_t pdtp__transaction_out_object_id(pdtp_transaction_out_t txn, unsigned objno) +{ + uint16_t ret; + +#ifdef HAVE_UIO + assert(objno < txn->obj_count); + + memcpy(&ret, (uint8_t *)txn->iov[objno + 4].iov_base + 4, 2); + ret = htons(ret); +#endif /* HAVE_UIO */ + +#ifdef WIN32 + memcpy(&ret, (uint8_t *)txn->buf[objno + 4].buf + 4, 2); + ret = htons(ret); +#endif /* WIN32 */ + +#if !defined(HAVE_UIO) && !defined(WIN32) + ret = txn->object[txn->obj_count].obj_id; +#endif /* !defined(HAVE_UIO) && !defined(WIN32) */ + + return ret; +} + /* Synchronous transaction interface. A connection and transaction are specified. 
The returned transaction is the response to the request transaction being sent */ @@ -905,6 +930,25 @@ specified multiplexer is running, it will invoke the given callback whenever a transaction is received with an object of the specified type which is identical to the same object in the request */ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t txn, uint16_t obj, void (*callback)(void *arg), void *arg) +void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t out, uint16_t obj, void (*callback)(pdtp_transaction_in_t, void *arg), void *arg) { + int filter_object_index; + + /* Locate the object we'll be filtering by */ + for(filter_object_index = 0; filter_object_index < out->obj_count; filter_object_index++) { + if(pdtp__transaction_out_object_id(out, filter_object_index) == obj) + break; + } + + /* assert that at least one of the objects in our outgoing transaction + matches the passed ID. This is what we'll be filtering asynchronous + transaction responses by, so if this doesn't match either the + given transaction was misconstructed or the object ID is wrong */ + assert(filter_object_index < out->obj_count); + + /* XXX Set up filter rules here */ + + pdtp__transaction_write(conn, out); + + return; } Modified: transaction.h =================================================================== --- transaction.h 2004-12-06 19:37:43 UTC (rev 50) +++ transaction.h 2004-12-08 00:12:22 UTC (rev 51) @@ -29,7 +29,7 @@ specified multiplexer is running, it will invoke the given callback whenever a transaction is received with an object of the specified type which is identical to the same object in the request */ -void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, uint16_t obj, void (*callback)(void *arg), void *arg); +void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, uint16_t obj, void 
(*callback)(pdtp_transaction_in_t, void *arg), void *arg); void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); Modified: txnmgr.h =================================================================== --- txnmgr.h 2004-12-06 19:37:43 UTC (rev 50) +++ txnmgr.h 2004-12-08 00:12:22 UTC (rev 51) @@ -1,8 +1,6 @@ #ifndef TXNMGR_H #define TXNMGR_H -#include "transaction.h" - typedef struct pdtp_txnmgr *pdtp_txnmgr_t; pdtp_txnmgr_t pdtp__txnmgr_create(void); |
From: <tar...@pd...> - 2004-12-06 19:37:20
|
Author: tarcieri Date: 2004-12-06 12:37:43 -0700 (Mon, 06 Dec 2004) New Revision: 50 Modified: pdtp.c transaction.c Log: Fixed PDTP_FETCH read mode and made it the default for directory listings Modified: pdtp.c =================================================================== --- pdtp.c 2004-12-06 06:27:02 UTC (rev 49) +++ pdtp.c 2004-12-06 19:37:43 UTC (rev 50) @@ -143,7 +143,7 @@ printf("Listing directory: %s\n", cwd); - if(!(dir = pdtp_get_directory_listing(conn, cwd, PDTP_STORE))) { + if(!(dir = pdtp_get_directory_listing(conn, cwd, PDTP_FETCH))) { print_error(); return; } Modified: transaction.c =================================================================== --- transaction.c 2004-12-06 06:27:02 UTC (rev 49) +++ transaction.c 2004-12-06 19:37:43 UTC (rev 50) @@ -305,7 +305,7 @@ unsigned pdtp__transaction_object_count(pdtp_transaction_in_t txn) { - return txn->obj_count; + return txn->total_objs; } int pdtp__transaction_object_index(pdtp_transaction_in_t txn, uint16_t id) |
From: <tar...@pd...> - 2004-12-06 06:26:50
|
Author: tarcieri Date: 2004-12-05 23:27:02 -0700 (Sun, 05 Dec 2004) New Revision: 49 Modified: directory.c download.c transaction.c transaction.h Log: Migration of download and directory code to new transaction interface Modified: directory.c =================================================================== --- directory.c 2004-12-04 23:33:16 UTC (rev 48) +++ directory.c 2004-12-06 06:27:02 UTC (rev 49) @@ -57,22 +57,16 @@ /* Send a request for a directory listing */ out = pdtp__transaction_create(TXN_DIRLIST, 1); - - /* XXX Disabled pending new transaction handling interface */ -#if 0 - notifier = pdtp__transaction_set_serial(out, conn); pdtp__transaction_add_string(out, OBJ_PATH, path); - pdtp__transaction_write(conn, out); + + in = pdtp__transact(conn, out, mode); pdtp__transaction_out_destroy(out); - /* Retrieve the transaction by the notifier interface */ - if(!(in = pdtp__transaction_wait(conn, notifier, mode))) { + if(!in) { /* XXX The connection was lost, handle this */ return 0; } -#endif - return 0; /* XXX Can't do anything for now */ - + if(!pdtp__transaction_id(in)) { if((index = pdtp__transaction_object_index(in, OBJ_ERRMSG)) < 0 || pdtp__transaction_object_string(in, index, &errstr) < 0) pdtp_set_last_error(ERROR_DIRLIST); Modified: download.c =================================================================== --- download.c 2004-12-04 23:33:16 UTC (rev 48) +++ download.c 2004-12-06 06:27:02 UTC (rev 49) @@ -107,16 +107,12 @@ /* Send a request for file information */ out = pdtp__transaction_create(TXN_FILEINFO, 1); - - /* XXX Disabled pending new transaction handling interface */ -#if 0 - notifier = pdtp__transaction_set_serial(out, conn); pdtp__transaction_add_string(out, OBJ_PATH, path); - pdtp__transaction_write(conn, out); + + in = pdtp__transact(conn, out, PDTP_STORE); pdtp__transaction_out_destroy(out); - /* Read the server's reply */ - if(!(in = pdtp__transaction_wait(conn, notifier, PDTP_STORE))) { + if(!in) { 
pdtp_set_last_error(ERROR_FILEINFO); return 0; } @@ -130,8 +126,6 @@ pdtp__transaction_in_destroy(in); return 0; } -#endif - return 0; /* Can't do anything for now */ dl = NEW(pdtp_download); @@ -201,7 +195,6 @@ void pdtp_download_run(pdtp_download_t dl, pdtp_file_t file) { unsigned i; - uint32_t value; pdtp_piecelist_t list; pdtp_transaction_in_t in; pdtp_transaction_out_t out; @@ -219,24 +212,18 @@ } out = pdtp__transaction_create(TXN_DOWNLOAD, 2); - - /* XXX Disabled pending new transaction handling interface */ -#if 0 - notifier = pdtp__transaction_set_serial(out, dl->conn); pdtp__transaction_add_string(out, OBJ_PATH, dl->path); pdtp__piecelist_add_to_transaction(out, list); - pdtp__transaction_write(dl->conn, out); - pdtp__transaction_out_destroy(out); - pdtp__piecelist_destroy(list); - if(!(in = pdtp__transaction_wait(dl->conn, notifier, PDTP_STORE))) { + in = pdtp__transact(dl->conn, out, PDTP_STORE); + pdtp__transaction_out_destroy(out); + + if(!in) { #ifndef NDEBUG debug("pdtp_download_run: pdtp__transaciton_wait returned NULL"); #endif return; } -#endif - return; if(!pdtp__transaction_id(in)) { #ifndef NDEBUG Modified: transaction.c =================================================================== --- transaction.c 2004-12-04 23:33:16 UTC (rev 48) +++ transaction.c 2004-12-06 06:27:02 UTC (rev 49) @@ -275,32 +275,6 @@ return -1; } -/* Synchronous transaction interface. A connection and transaction are - specified. The returned transaction is the response to the request - transaction being sent */ -pdtp_transaction_in_t pdtp__transact(pdtp_connection_t conn, pdtp_transaction_out_t out) -{ - /* XXX This implementation is temporary as it is incompatible with - pdtp__transact_async */ - - uint32_t serial; - pdtp_transaction_in_t in = 0; - - serial = pdtp__txnmgr_get_serial(conn->txnmgr); - - return in; -} - -/* Asynchronous transaction interface. A connection and transaction are - specified, as well as a multiplexer, object type, and callback. 
The - specified transaction is sent to the specified connection. When the - specified multiplexer is running, it will invoke the given callback - whenever a transaction is received with an object of the specified - type which is identical to the same object in the request */ -void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t txn, uint16_t obj, void (*callback)(void *arg), void *arg) -{ -} - void pdtp__transaction_in_destroy(pdtp_transaction_in_t txn) { if(txn->total_objs) { @@ -891,3 +865,46 @@ return ret; #endif } + +/* Synchronous transaction interface. A connection and transaction are + specified. The returned transaction is the response to the request + transaction being sent */ +pdtp_transaction_in_t pdtp__transact(pdtp_connection_t conn, pdtp_transaction_out_t out, pdtp_read_mode_t mode) +{ + pdtp_transaction_in_t in; + + /* XXX This implementation is temporary as it is incompatible with + pdtp__transact_async */ + + out->serial = pdtp__txnmgr_get_serial(conn->txnmgr); + pdtp__transaction_write(conn, out); + + pdtp__mutex_lock(conn->socket_lock); +#ifndef NDEBUG + debug("+++ socket_lock: pdtp__transaction_read_direct()"); +#endif + + if(!(in = pdtp__transaction_read_header(conn))) + goto err; + + if(mode == PDTP_STORE && in->total_objs) { + if(pdtp__transaction_read_to_object(in, in->total_objs - 1) < 0) + goto err; + } + + return in; +err: + /* XXX Mark the connection as bad and return 0. IMPORTANT: Ensure + conn->socket_lock is released before function returns. */ + return 0; +} + +/* Asynchronous transaction interface. A connection and transaction are + specified, as well as a multiplexer, object type, and callback. The + specified transaction is sent to the specified connection. 
When the + specified multiplexer is running, it will invoke the given callback + whenever a transaction is received with an object of the specified + type which is identical to the same object in the request */ +void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t txn, uint16_t obj, void (*callback)(void *arg), void *arg) +{ +} Modified: transaction.h =================================================================== --- transaction.h 2004-12-04 23:33:16 UTC (rev 48) +++ transaction.h 2004-12-06 06:27:02 UTC (rev 49) @@ -21,7 +21,7 @@ /* Synchronous transaction interface. A connection and transaction are specified. The returned transaction is the response to the request transaction being sent */ -pdtp_transaction_in_t pdtp__transact(pdtp_connection_t, pdtp_transaction_out_t); +pdtp_transaction_in_t pdtp__transact(pdtp_connection_t, pdtp_transaction_out_t, pdtp_read_mode_t); /* Asynchronous transaction interface. A connection and transaction are specified, as well as a multiplexer, object type, and callback. The |
From: <tar...@pd...> - 2004-12-04 23:32:59
|
Author: tarcieri Date: 2004-12-04 16:33:16 -0700 (Sat, 04 Dec 2004) New Revision: 48 Modified: connection.c connection.h transaction.c transaction.h txnmgr.c txnmgr.h Log: New transaction handling code improvements Modified: connection.c =================================================================== --- connection.c 2004-11-23 23:33:02 UTC (rev 47) +++ connection.c 2004-12-04 23:33:16 UTC (rev 48) @@ -104,7 +104,7 @@ connection->addr = addr; connection->socket_lock = pdtp__mutex_create(); - connection->txnmgr = pdtp__transaction_manager_create(); + connection->txnmgr = pdtp__txnmgr_create(); pdtp__set_error_ptr(0); return connection; @@ -124,7 +124,7 @@ pdtp__socket_close(connection->sock); pdtp__mutex_destroy(connection->socket_lock); - pdtp__transaction_manager_destroy(connection->txnmgr); + pdtp__txnmgr_destroy(connection->txnmgr); pdtp__free(connection); } Modified: connection.h =================================================================== --- connection.h 2004-11-23 23:33:02 UTC (rev 47) +++ connection.h 2004-12-04 23:33:16 UTC (rev 48) @@ -24,7 +24,7 @@ pdtp_addr_t addr; pdtp_mutex_t socket_lock; - pdtp_transaction_manager_t txnmgr; + pdtp_txnmgr_t txnmgr; }; #endif /* CONNECTION_H */ Modified: transaction.c =================================================================== --- transaction.c 2004-11-23 23:33:02 UTC (rev 47) +++ transaction.c 2004-12-04 23:33:16 UTC (rev 48) @@ -158,10 +158,10 @@ if(txn->length > 0) goto err1; - /* Nothing left to read. Unlock the connection */ - pdtp__mutex_unlock(conn->socket_lock); + /* Nothing left to read. 
Unlock the connection */ + pdtp__mutex_unlock(conn->socket_lock); #ifndef NDEBUG - debug("--- socket_lock: pdtp__transaction_read_direct()"); + debug("--- socket_lock: pdtp__transaction_read_direct()"); #endif return txn; @@ -264,17 +264,43 @@ /* XXX At this point we've lost the connection or encountered a protocol error at which point we should mark the connection as lost */ - + pdtp__mutex_unlock(txn->connection->socket_lock); #ifndef NDEBUG debug("--- socket_lock: pdtp__transaction_read_to_object()"); #endif - + txn->connection = 0; return -1; } +/* Synchronous transaction interface. A connection and transaction are + specified. The returned transaction is the response to the request + transaction being sent */ +pdtp_transaction_in_t pdtp__transact(pdtp_connection_t conn, pdtp_transaction_out_t out) +{ + /* XXX This implementation is temporary as it is incompatible with + pdtp__transact_async */ + + uint32_t serial; + pdtp_transaction_in_t in = 0; + + serial = pdtp__txnmgr_get_serial(conn->txnmgr); + + return in; +} + +/* Asynchronous transaction interface. A connection and transaction are + specified, as well as a multiplexer, object type, and callback. The + specified transaction is sent to the specified connection. 
When the + specified multiplexer is running, it will invoke the given callback + whenever a transaction is received with an object of the specified + type which is identical to the same object in the request */ +void pdtp__transact_async(pdtp_connection_t conn, pdtp_multiplexer_t mplx, pdtp_transaction_out_t txn, uint16_t obj, void (*callback)(void *arg), void *arg) +{ +} + void pdtp__transaction_in_destroy(pdtp_transaction_in_t txn) { if(txn->total_objs) { @@ -322,11 +348,11 @@ if(txn->current_obj == txn->total_objs || txn->object[txn->current_obj].obj_id != id) return -1; - + #ifndef NDEBUG debug("txn->object[%d] is %d", txn->current_obj, id); #endif - + return txn->current_obj; } @@ -437,7 +463,7 @@ uint32_t length; void *data; - + if(pdtp__transaction_object_length(txn, objno, &length) < 0) return -1; @@ -633,20 +659,6 @@ pdtp__free(txn); } -void pdtp__transaction_set_serial(pdtp_transaction_out_t txn, pdtp_connection_t conn) -{ - uint32_t serial; - -#if 0 - pdtp__mutex_lock(conn->txnmgr->serial_lock); - serial = conn->txnmgr->serial++; - txn->serial = htonl(serial); - pdtp__mutex_unlock(conn->txnmgr->serial_lock); - - return pdtp__create_serial_notifier(conn, serial); -#endif -} - void pdtp__transaction_add_object(pdtp_transaction_out_t txn, uint16_t id, const void *data, uint32_t length) { uint16_t v16; Modified: transaction.h =================================================================== --- transaction.h 2004-11-23 23:33:02 UTC (rev 47) +++ transaction.h 2004-12-04 23:33:16 UTC (rev 48) @@ -19,8 +19,8 @@ } pdtp_transaction_type_t; /* Synchronous transaction interface. A connection and transaction are - specified, and when a response transaction whose serial matches the - request is received on the connection, it is returned */ + specified. The returned transaction is the response to the request + transaction being sent */ pdtp_transaction_in_t pdtp__transact(pdtp_connection_t, pdtp_transaction_out_t); /* Asynchronous transaction interface. 
A connection and transaction are @@ -48,7 +48,6 @@ pdtp_transaction_out_t pdtp__transaction_create(uint16_t tid, uint16_t obj_count); void pdtp__transaction_out_destroy(pdtp_transaction_out_t); -void pdtp__transaction_set_serial(pdtp_transaction_out_t txn, pdtp_connection_t conn); void pdtp__transaction_add_object(pdtp_transaction_out_t, uint16_t id, const void *data, uint32_t length); void pdtp__transaction_add_int16(pdtp_transaction_out_t, uint16_t id, uint16_t value); void pdtp__transaction_add_int32(pdtp_transaction_out_t, uint16_t id, uint32_t value); Modified: txnmgr.c =================================================================== --- txnmgr.c 2004-11-23 23:33:02 UTC (rev 47) +++ txnmgr.c 2004-12-04 23:33:16 UTC (rev 48) @@ -38,17 +38,17 @@ #include "threadops.h" #include "txnmgr.h" -struct pdtp_transaction_manager { +struct pdtp_txnmgr { /* Connection serial number and respective lock */ uint32_t serial; pdtp_mutex_t serial_lock; }; -pdtp_transaction_manager_t pdtp__transaction_manager_create(void) +pdtp_txnmgr_t pdtp__txnmgr_create(void) { - pdtp_transaction_manager_t txnmgr; + pdtp_txnmgr_t txnmgr; - txnmgr = NEW(pdtp_transaction_manager); + txnmgr = NEW(pdtp_txnmgr); /* Initialize serial counter and respective lock */ txnmgr->serial = 1; @@ -57,10 +57,30 @@ return txnmgr; } -void pdtp__transaction_manager_destroy(pdtp_transaction_manager_t txnmgr) +void pdtp__txnmgr_destroy(pdtp_txnmgr_t txnmgr) { pdtp__mutex_destroy(txnmgr->serial_lock); /* Free the structure itself */ pdtp__free(txnmgr); } + +/* XXX This implementation should be replaced by one that recycles used + transaction serial numbers */ +uint32_t pdtp__txnmgr_get_serial(pdtp_txnmgr_t txnmgr) +{ + uint32_t serial; + + pdtp__mutex_lock(txnmgr->serial_lock); + serial = txnmgr->serial++; + pdtp__mutex_unlock(txnmgr->serial_lock); + + return serial; +} + +/* XXX This will be used by a new implementation that recycles used + transaction serial numbers. 
Since we aren't doing that, right now + this is just a stub */ +void pdtp__txnmgr_free_serial(pdtp_txnmgr_t txnmgr, uint32_t serial) +{ +} Modified: txnmgr.h =================================================================== --- txnmgr.h 2004-11-23 23:33:02 UTC (rev 47) +++ txnmgr.h 2004-12-04 23:33:16 UTC (rev 48) @@ -3,9 +3,12 @@ #include "transaction.h" -typedef struct pdtp_transaction_manager *pdtp_transaction_manager_t; +typedef struct pdtp_txnmgr *pdtp_txnmgr_t; -pdtp_transaction_manager_t pdtp__transaction_manager_create(void); -void pdtp__transaction_manager_destroy(pdtp_transaction_manager_t txnmgr); +pdtp_txnmgr_t pdtp__txnmgr_create(void); +void pdtp__txnmgr_destroy(pdtp_txnmgr_t); +uint32_t pdtp__txnmgr_get_serial(pdtp_txnmgr_t); +void pdtp__txnmgr_free_serial(pdtp_txnmgr_t, uint32_t serial); + #endif /* TXNMGR_H */ |
From: <tar...@pd...> - 2004-11-23 23:33:15
|
Author: tarcieri Date: 2004-11-23 16:33:02 -0700 (Tue, 23 Nov 2004) New Revision: 47 Modified: download.c pdtp.c pdtp.h Log: Library interface changes Modified: download.c =================================================================== --- download.c 2004-11-23 22:57:56 UTC (rev 46) +++ download.c 2004-11-23 23:33:02 UTC (rev 47) @@ -73,9 +73,6 @@ /* File being downloaded */ pdtp_file_t file; - - /* Callback table associated with download */ - pdtp_callback_table_t *callback; }; /* This structure stores piece transfer initialization parameters */ @@ -181,7 +178,6 @@ dl->conn = conn; dl->file = 0; - dl->callback = 0; return dl; err: @@ -202,7 +198,7 @@ { } -void pdtp_download_run(pdtp_download_t dl, pdtp_file_t file, pdtp_callback_table_t *table, void *ptr) +void pdtp_download_run(pdtp_download_t dl, pdtp_file_t file) { unsigned i; uint32_t value; Modified: pdtp.c =================================================================== --- pdtp.c 2004-11-23 22:57:56 UTC (rev 46) +++ pdtp.c 2004-11-23 23:33:02 UTC (rev 47) @@ -271,7 +271,7 @@ return; } - pdtp_download_run(dl, file, 0, 0); + pdtp_download_run(dl, file); pdtp_download_destroy(dl); pdtp_fclose(file); Modified: pdtp.h =================================================================== --- pdtp.h 2004-11-23 22:57:56 UTC (rev 46) +++ pdtp.h 2004-11-23 23:33:02 UTC (rev 47) @@ -228,72 +228,7 @@ * @{ */ - -/** Function table for event notification. This table is passed to the - pdtp_download_run() function. The last argument of pdtp_download_run() - is a void pointer which will be passed as the last argument to any - function in this table. */ -typedef struct { - /** This is invoked when we actually begin the download */ - void (*download_begin)(int pieces_remaining, int pieces_total, void *ptr); - - /** This is invoked when the download has been successfully completed */ - void (*download_complete)(void *ptr); - - /** This is invoked if a download fails. 
If this occurs, - pdtp_download_run() will also exit */ - void (*download_fail)(const char *message, void *ptr); - - /** This is invoked when we attempt to connect to a peer */ - void (*peer_connect)(const char *address, int port, void *ptr); - - /** This is invoked when we successfully connect to a peer */ - void (*peer_connect_complete)(const char *address, int port, void *ptr); - - /** This is invoked if a connection to a peer fails */ - void (*peer_connect_fail)(const char *address, int port, const char *errmsg, void *ptr); - - /** This is invoked when we disconnect from a peer */ - void (*peer_disconnect)(char *address, int port, void *ptr); - - /** This is invoked when a peer connects to us */ - void (*incoming_connect)(char *address, int port, void *ptr); - - /** This is invoked when a peer disconnects from us */ - void (*incoming_disconnect)(char *address, int port, void *ptr); - - /** This is invoked when we begin to download a piece */ - void (*piece_download)(char *address, int port, int piece, int length, void *ptr); - - /** This is invoked when we begin to upload a piece */ - void (*piece_upload)(char *address, int port, int piece, int length, void *ptr); - - /** This is invoked when a piece download completes */ - void (*piece_download_complete)(char *address, int port, int piece, void *ptr); - - /** This is invoked when a piece upload completes */ - void (*piece_upload_complete)(char *address, int port, int piece, void *ptr); - - /** This is invoked whenever we download a portion of a piece */ - void (*piece_chunk_download)(char *address, int port, int piece, int length, void *ptr); - - /** This is invoked whenever we upload a portion of a piece */ - void (*piece_chunk_upload)(char *address, int port, int piece, int length, void *ptr); - - /** This is invoked when we begin checksumming a piece */ - void (*piece_checksum)(int piece, int length, void *ptr); - - /** This is invoked if a piece checksum matches */ - void (*piece_checksum_complete)(int 
piece, void *ptr); - - /** This is invoked if a piece checksum mismatches */ - void (*piece_checksum_fail)(int piece, void *ptr); - - /** This is invoked whenever we MD5 checksum a piece */ - void (*piece_chunk_scan)(int piece, int length, void *ptr); -} pdtp_callback_table_t; - -/** This initiates a new download */ +/** This initiates a new download for the given file on the PDTP server */ pdtp_download_t pdtp_download_create(pdtp_connection_t, const char *path); /** This will free the information associated with a download */ @@ -337,12 +272,107 @@ the given file while this function is active. This function does not handle closing the given file pointer, that is left up to the programmer using the library. */ -void pdtp_download_run(pdtp_download_t, pdtp_file_t, pdtp_callback_table_t *, void *ptr); +void pdtp_download_run(pdtp_download_t, pdtp_file_t); /** This function may be invoked from another thread to gracefully terminate an active transfer */ void pdtp_download_stop(pdtp_download_t); +/** The following functions let you define callbacks which are invoked when + various events occur during the download. */ + +/** This is invoked when the download actually begins. It specifies how many + total pieces remain in the download and how many have been completed */ +typedef void (*pdtp_download_begin_handler_t)(int pieces_remaining, int pieces_total, void *ptr); + +void pdtp_download_set_begin_handler(pdtp_download_t, pdtp_download_begin_handler_t, void *ptr); + +/** This is invoked when the download has been successfully completed */ +typedef void (*pdtp_download_complete_handler_t)(void *ptr); + +void pdtp_download_set_complete_handler(pdtp_download_t, pdtp_download_complete_handler_t, void *ptr); + +/** This is invoked if a download fails. 
If this occurs, pdtp_download_run() + will also exit */ +typedef void (*pdtp_download_fail_handler_t)(const char *message, void *ptr); + +void pdtp_download_set_fail_handler(pdtp_download_t, pdtp_download_fail_handler_t, void *ptr); + +/** This is invoked when we begin checksumming a piece */ +typedef void (*pdtp_piece_checksum_handler_t)(int piece, int length, void *ptr); + +void pdtp_download_set_piece_checksum_handler(pdtp_download_t, pdtp_piece_checksum_handler_t, void *ptr); + +/** This is invoked if a piece checksum matches */ +typedef void (*pdtp_piece_checksum_pass_handler_t)(int piece, void *ptr); + +void pdtp_download_set_piece_checksum_pass_handler(pdtp_download_t, pdtp_piece_checksum_pass_handler_t, void *ptr); + +/** This is invoked if a piece checksum mismatches */ +typedef void (*pdtp_piece_checksum_fail_handler_t)(int piece, void *ptr); + +void pdtp_download_set_piece_checksum_fail_handler(pdtp_download_t, pdtp_piece_checksum_fail_handler_t, void *ptr); + +/** This is invoked when we attempt to connect to a peer */ +typedef void (*pdtp_peer_connect_handler_t)(const char *address, int port, void *ptr); + +void pdtp_download_set_peer_connect_handler(pdtp_download_t, pdtp_peer_connect_handler_t, void *ptr); + +/** This is invoked when we successfully connect to a peer */ +typedef void (*pdtp_peer_connect_complete_handler_t)(const char *address, int port, void *ptr); + +void pdtp_download_set_peer_connect_complete_handler(pdtp_download_t, pdtp_peer_connect_complete_handler_t, void *ptr); + +/** This is invoked if a connection to a peer fails */ +typedef void (*pdtp_peer_connect_fail_handler_t)(const char *address, int port, const char *errmsg, void *ptr); + +void pdtp_download_set_peer_connect_fail_handler(pdtp_download_t, pdtp_peer_connect_fail_handler_t, void *ptr); + +/** This is invoked when we disconnect from a peer */ +typedef void (*pdtp_peer_disconnect_handler_t)(char *address, int port, void *ptr); + +void 
pdtp_download_set_peer_disconnect_handler(pdtp_download_t, pdtp_peer_disconnect_handler_t, void *ptr); + +/** This is invoked when a peer connects to us */ +typedef void (*pdtp_incoming_connect_handler_t)(char *address, int port, void *ptr); + +void pdtp_download_set_incoming_connect_handler(pdtp_download_t, pdtp_incoming_connect_handler_t, void *ptr); + +/** This is invoked when a peer disconnects from us */ +typedef void (*pdtp_incoming_disconnect_handler_t)(char *address, int port, void *ptr); + +void pdtp_download_set_incoming_disconnect_handler(pdtp_download_t, pdtp_incoming_disconnect_handler_t, void *ptr); + +/** This is invoked when we begin to download a piece */ +typedef void (*pdtp_piece_download_handler_t)(char *address, int port, int piece, int length, void *ptr); + +void pdtp_download_set_piece_download_handler(pdtp_download_t, pdtp_piece_download_handler_t, void *ptr); + +/** This is invoked when we begin to upload a piece */ +typedef void (*pdtp_piece_upload_handler_t)(char *address, int port, int piece, int length, void *ptr); + +void pdtp_download_set_piece_upload_handler(pdtp_download_t, pdtp_piece_upload_handler_t, void *ptr); + +/** This is invoked when a piece download completes */ +typedef void (*pdtp_piece_download_complete_handler_t)(char *address, int port, int piece, void *ptr); + +void pdtp_download_set_piece_download_complete_handler(pdtp_download_t, pdtp_piece_download_complete_handler_t, void *ptr); + +/** This is invoked when a piece upload completes */ +typedef void (*pdtp_piece_upload_complete_handler_t)(char *address, int port, int piece, void *ptr); + +void pdtp_download_set_piece_upload_complete_handler(pdtp_download_t, pdtp_piece_upload_complete_handler_t, void *ptr); + +/** This is invoked whenever we download a portion of a piece */ +typedef void (*pdtp_piece_chunk_download_handler_t)(char *address, int port, int piece, int length, void *ptr); + +void pdtp_download_set_piece_chunk_download_handler(pdtp_download_t, 
pdtp_piece_chunk_download_handler_t, void *ptr); + +/** This is invoked whenever we upload a portion of a piece */ +typedef void (*pdtp_piece_chunk_upload_handler_t)(char *address, int port, int piece, int length, void *ptr); + +void pdtp_download_set_piece_chunk_upload_handler(pdtp_download_t, pdtp_piece_chunk_upload_handler_t, void *ptr); + /* @} */ /****************** * File Interface * |
From: <tar...@pd...> - 2004-11-23 22:58:06
|
Author: tarcieri Date: 2004-11-23 15:57:56 -0700 (Tue, 23 Nov 2004) New Revision: 46 Added: multiplexer.c multiplexer.h Modified: Makefile.in transaction.h txnmgr.h Log: Initial specification of new transaction interface Modified: Makefile.in =================================================================== --- Makefile.in 2004-11-23 22:37:59 UTC (rev 45) +++ Makefile.in 2004-11-23 22:57:56 UTC (rev 46) @@ -18,6 +18,7 @@ hasher.o \ hmac.o \ md5.o \ + multiplexer.o \ piecelist.o \ proxy.o \ sha1.o \ Added: multiplexer.c =================================================================== --- multiplexer.c 2004-11-23 22:37:59 UTC (rev 45) +++ multiplexer.c 2004-11-23 22:57:56 UTC (rev 46) @@ -0,0 +1,37 @@ +/* + multiplexer.c - Code for multiplexing multiple network connections + Copyright (C)2004 Anthony Arcieri + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * The name of Anthony Arcieri may not be used to endorse or promote + products derived from this software without specific prior written + permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "machdep.h" +#include "pdtp.h" + +#include "multiplexer.h" Added: multiplexer.h =================================================================== --- multiplexer.h 2004-11-23 22:37:59 UTC (rev 45) +++ multiplexer.h 2004-11-23 22:57:56 UTC (rev 46) @@ -0,0 +1,6 @@ +#ifndef MULTIPLEXER_H +#define MULTIPLEXER_H + +typedef struct pdtp_multiplexer *pdtp_multiplexer_t; + +#endif Modified: transaction.h =================================================================== --- transaction.h 2004-11-23 22:37:59 UTC (rev 45) +++ transaction.h 2004-11-23 22:57:56 UTC (rev 46) @@ -8,6 +8,8 @@ #include "machdep.h" #include "pdtp.h" +#include "multiplexer.h" + typedef struct pdtp_transaction_in *pdtp_transaction_in_t; typedef struct pdtp_transaction_out *pdtp_transaction_out_t; @@ -16,6 +18,19 @@ TXN_REPLY } pdtp_transaction_type_t; +/* Synchronous transaction interface. A connection and transaction are + specified, and when a response transaction whose serial matches the + request is received on the connection, it is returned */ +pdtp_transaction_in_t pdtp__transact(pdtp_connection_t, pdtp_transaction_out_t); + +/* Asynchronous transaction interface. A connection and transaction are + specified, as well as a multiplexer, object type, and callback. The + specified transaction is sent to the specified connection. 
When the + specified multiplexer is running, it will invoke the given callback + whenever a transaction is received with an object of the specified + type which is identical to the same object in the request */ +void pdtp__transact_async(pdtp_connection_t, pdtp_multiplexer_t, pdtp_transaction_out_t, uint16_t obj, void (*callback)(void *arg), void *arg); + void pdtp__transaction_in_destroy(pdtp_transaction_in_t); pdtp_transaction_type_t pdtp__transaction_type(pdtp_transaction_in_t); uint16_t pdtp__transaction_id(pdtp_transaction_in_t); Modified: txnmgr.h =================================================================== --- txnmgr.h 2004-11-23 22:37:59 UTC (rev 45) +++ txnmgr.h 2004-11-23 22:57:56 UTC (rev 46) @@ -1,6 +1,8 @@ #ifndef TXNMGR_H #define TXNMGR_H +#include "transaction.h" + typedef struct pdtp_transaction_manager *pdtp_transaction_manager_t; pdtp_transaction_manager_t pdtp__transaction_manager_create(void); |
From: <tar...@pd...> - 2004-11-23 22:38:03
|
Author: tarcieri Date: 2004-11-23 15:37:59 -0700 (Tue, 23 Nov 2004) New Revision: 45 Added: txnmgr.c txnmgr.h Removed: notifier.c notifier.h test.c Modified: Makefile.in connection.c connection.h directory.c download.c pdtp.c transaction.c transaction.h Log: Completion of slash-n-burn refactoring, codebase compiles again Modified: Makefile.in =================================================================== --- Makefile.in 2004-11-23 21:26:11 UTC (rev 44) +++ Makefile.in 2004-11-23 22:37:59 UTC (rev 45) @@ -18,13 +18,13 @@ hasher.o \ hmac.o \ md5.o \ - notifier.o \ piecelist.o \ proxy.o \ sha1.o \ socketops.o \ threadops.o \ transaction.o \ + txnmgr.o \ .PHONY: all clean distclean depend @@ -50,16 +50,12 @@ pdtp: pdtp.o $(STLIB_NAME) $(LD) -o pdtp pdtp.o $(STLIB_NAME) -test: test.o $(STLIB_NAME) - $(LD) -o test test.o $(STLIB_NAME) - ./test - dox:: mkdir -p ./docs doxygen doxygen.conf clean: - rm -f pdtp test $(STLIB_NAME) $(SHLIB_NAME) *.o + rm -f pdtp $(STLIB_NAME) $(SHLIB_NAME) *.o distclean: clean rm -f compile link buildlib machdep.h .depend Makefile Modified: connection.c =================================================================== --- connection.c 2004-11-23 21:26:11 UTC (rev 44) +++ connection.c 2004-11-23 22:37:59 UTC (rev 45) @@ -41,7 +41,6 @@ #include "connection.h" #include "collection.h" #include "error.h" -#include "notifier.h" #include "socketops.h" #include "threadops.h" Modified: connection.h =================================================================== --- connection.h 2004-11-23 21:26:11 UTC (rev 44) +++ connection.h 2004-11-23 22:37:59 UTC (rev 45) @@ -6,6 +6,8 @@ #include "socketops.h" #include "threadops.h" #include "transaction.h" +#include "txnmgr.h" + /** * @file connection.h * @brief pdtp_connection Modified: directory.c =================================================================== --- directory.c 2004-11-23 21:26:11 UTC (rev 44) +++ directory.c 2004-11-23 22:37:59 UTC (rev 45) @@ -53,11 +53,13 @@ char *errstr; 
pdtp_transaction_out_t out; pdtp_transaction_in_t in; - pdtp_notifier_t notifier; pdtp_directory_t dir; /* Send a request for a directory listing */ out = pdtp__transaction_create(TXN_DIRLIST, 1); + + /* XXX Disabled pending new transaction handling interface */ +#if 0 notifier = pdtp__transaction_set_serial(out, conn); pdtp__transaction_add_string(out, OBJ_PATH, path); pdtp__transaction_write(conn, out); @@ -68,6 +70,8 @@ /* XXX The connection was lost, handle this */ return 0; } +#endif + return 0; /* XXX Can't do anything for now */ if(!pdtp__transaction_id(in)) { if((index = pdtp__transaction_object_index(in, OBJ_ERRMSG)) < 0 || pdtp__transaction_object_string(in, index, &errstr) < 0) Modified: download.c =================================================================== --- download.c 2004-11-23 21:26:11 UTC (rev 44) +++ download.c 2004-11-23 22:37:59 UTC (rev 45) @@ -106,11 +106,12 @@ pdtp_transaction_out_t out; pdtp_transaction_in_t in; - pdtp_notifier_t notifier; pdtp_download_t dl; /* Send a request for file information */ out = pdtp__transaction_create(TXN_FILEINFO, 1); + + /* XXX Disabled pending new transaction handling interface */ #if 0 notifier = pdtp__transaction_set_serial(out, conn); pdtp__transaction_add_string(out, OBJ_PATH, path); @@ -133,7 +134,7 @@ return 0; } #endif - return 0; + return 0; /* Can't do anything for now */ dl = NEW(pdtp_download); @@ -208,7 +209,6 @@ pdtp_piecelist_t list; pdtp_transaction_in_t in; pdtp_transaction_out_t out; - pdtp_notifier_t notifier; debug("pdtp_download_run() starting"); @@ -223,6 +223,9 @@ } out = pdtp__transaction_create(TXN_DOWNLOAD, 2); + + /* XXX Disabled pending new transaction handling interface */ +#if 0 notifier = pdtp__transaction_set_serial(out, dl->conn); pdtp__transaction_add_string(out, OBJ_PATH, dl->path); pdtp__piecelist_add_to_transaction(out, list); @@ -236,6 +239,8 @@ #endif return; } +#endif + return; if(!pdtp__transaction_id(in)) { #ifndef NDEBUG @@ -267,6 +272,7 @@ 
pdtp__transaction_in_destroy(in); + return; err: pdtp_set_last_error(ERROR_DOWNLOAD_INIT); Deleted: notifier.c =================================================================== --- notifier.c 2004-11-23 21:26:11 UTC (rev 44) +++ notifier.c 2004-11-23 22:37:59 UTC (rev 45) @@ -1,208 +0,0 @@ -/* - notifier.c - Portable, multiplexable interthread messaging mechanism - Copyright (C)2003-04 Anthony Arcieri - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions - are met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * The name of Anthony Arcieri may not be used to endorse or promote - products derived from this software without specific prior written - permission. - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED - TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- */ - -#include "machdep.h" -#include "sockets.h" - -#include "alloc.h" -#include "notifier.h" -#include "threadops.h" - -/* Maximum number of spare notifiers allowed in a given notifier pool */ -#define MAX_NOTIFIER_POOL_SIZE 8 - -/* The notifier structure. On Win32 we can multiplex event objects with - sockets, allowing us to multiplex the pdtp_notifier_t object with a - socket. Unfortunately, POSIX does not provide event objects with - file descriptors, so we are forced to use pipes for interthread - communication as they can be multiplexed with the select() or poll() - system calls (and will bring the thread out of the respective system - call when we write to them, which is where the destination thread - for the message will be blocking */ -struct pdtp_notifier { -#ifdef WIN32 - void *message; - pdtp_mutex_t mutex; - pdtp_cond_t cond; -#else - int pipe[2]; -#endif /* WIN32 */ -}; - -/* To mitigate the overhead of the system calls required to create or - destroy notifier objects, we store notifier objects in a shared - resource pool for each connection. 
*/ -typedef struct pdtp_notifier_pool_frame { - pdtp_notifier_t notifier; - struct pdtp_notifier_pool_frame *next; -} *pdtp_notifier_pool_frame_t; - -struct pdtp_notifier_pool { - pdtp_notifier_pool_frame_t root; - int count; -}; - -static pdtp_notifier_t notifier_create(void) -{ - pdtp_notifier_t notifier; - - notifier = NEW(pdtp_notifier); - -#ifdef WIN32 - notifier->message = 0; - notifier->mutex = pdtp__mutex_create(); - notifier->cond = pdtp__cond_create(); -#else - if(pipe(notifier->pipe) < 0) { - pdtp__free(notifier); - return 0; - } -#endif /* WIN32 */ - - return notifier; -} - -static void notifier_destroy(pdtp_notifier_t notifier) -{ -#ifdef WIN32 - pdtp__mutex_destroy(notifier->mutex); - pdtp__cond_destroy(notifier->cond); -#else - close(notifier->pipe[0]); - close(notifier->pipe[1]); -#endif - - pdtp__free(notifier); -} - -pdtp_notifier_pool_t pdtp__notifier_pool_create(void) -{ - pdtp_notifier_pool_t pool = NEW(pdtp_notifier_pool); - - pool->root = 0; - pool->count = 0; - - return pool; -} - -void pdtp__notifier_pool_destroy(pdtp_notifier_pool_t pool) -{ - pdtp_notifier_pool_frame_t frame; - - while(pool->root) { - frame = pool->root; - pool->root = frame->next; - - notifier_destroy(frame->notifier); - pdtp__free(frame); - } - - pdtp__free(pool); -} - -pdtp_notifier_t pdtp__notifier_get(pdtp_notifier_pool_t pool) -{ - pdtp_notifier_t notifier; - pdtp_notifier_pool_frame_t frame; - - if(!pool->root) - return notifier_create(); - - frame = pool->root; - pool->root = frame->next; - - notifier = frame->notifier; - pdtp__free(frame); - - pool->count--; - - return notifier; -} - -void pdtp__notifier_return(pdtp_notifier_pool_t pool, pdtp_notifier_t notifier) -{ - pdtp_notifier_pool_frame_t frame; - - if(pool->count >= MAX_NOTIFIER_POOL_SIZE) { - notifier_destroy(notifier); - return; - } - - frame = NEW(pdtp_notifier_pool_frame); - - frame->notifier = notifier; - frame->next = pool->root; - pool->root = frame; - - pool->count++; -} - -void 
pdtp__notifier_send_message(pdtp_notifier_t notifier, void *message) -{ -#ifdef WIN32 - pdtp__mutex_lock(notifier->mutex); - notifier->message = message; - pdtp__cond_signal(notifier->cond); - pdtp__mutex_unlock(notifier->mutex); -#else - write(notifier->pipe[1], &message, sizeof(void *)); -#endif /* WIN32 */ -} - -void *pdtp__notifier_recv_message(pdtp_notifier_t notifier) -{ - void *ret; -#ifdef WIN32 - pdtp__mutex_lock(notifier->mutex); - pdtp__cond_wait(notifier->cond, notifier->mutex); - ret = notifier->message; - pdtp__mutex_unlock(notifier->mutex); -#else - if(read(notifier->pipe[0], &ret, sizeof(void *)) < 0) - return 0; -#endif /* WIN32 */ - - return ret; -} - -#ifdef WIN32 -WSAEVENT pdtp__notifier_multiplex_object(pdtp_notifier_t notifier) -{ - return (WSAEVENT)notifier->cond; -} -#else -int pdtp__notifier_multiplex_object(pdtp_notifier_t notifier) -{ - return notifier->pipe[0]; -} -#endif /* WIN32 */ Deleted: notifier.h =================================================================== --- notifier.h 2004-11-23 21:26:11 UTC (rev 44) +++ notifier.h 2004-11-23 22:37:59 UTC (rev 45) @@ -1,32 +0,0 @@ -#ifndef NOTIFIER_H -#define NOTIFIER_H - -#include "machdep.h" - -/** - * @file notifier.h - * @brief Portable, multiplexable interthread messaging mechanism - */ -typedef struct pdtp_notifier_pool *pdtp_notifier_pool_t; -typedef struct pdtp_notifier *pdtp_notifier_t; - -/* Functions for managing notifier pools (each connection will have one) */ -pdtp_notifier_pool_t pdtp__notifier_pool_create(void); -void pdtp__notifier_pool_destroy(pdtp_notifier_pool_t pool); - -/* Functions for retrieving a notifier from a pool */ -pdtp_notifier_t pdtp__notifier_get(pdtp_notifier_pool_t pool); -void pdtp__notifier_return(pdtp_notifier_pool_t pool, pdtp_notifier_t notifier); - -/* Messaging functions */ -void pdtp__notifier_send_message(pdtp_notifier_t notifier, void *message); -void *pdtp__notifier_recv_message(pdtp_notifier_t notifier); - -/** Multiplex object accessor 
methods */ -#ifdef WIN32 -WSAEVENT pdtp__notifier_multiplex_object(pdtp_notifier_t notifier); -#else -int pdtp__notifier_multiplex_object(pdtp_notifier_t notifier); -#endif /* WIN32 */ - -#endif /* NOTIFIER_H */ Modified: pdtp.c =================================================================== --- pdtp.c 2004-11-23 21:26:11 UTC (rev 44) +++ pdtp.c 2004-11-23 22:37:59 UTC (rev 45) @@ -279,6 +279,10 @@ static void do_port(char *arg) { + puts("Currently broken"); + + /* XXX Currently broken */ +#if 0 int port = atoi(arg); if(port < 1) { @@ -295,6 +299,7 @@ puts("failed."); else puts("ok."); +#endif } static void do_help(char *arg) Deleted: test.c =================================================================== --- test.c 2004-11-23 21:26:11 UTC (rev 44) +++ test.c 2004-11-23 22:37:59 UTC (rev 45) @@ -1,177 +0,0 @@ -/* Test driver application for libpdtp - - Exit Codes: - 0 - All operations completed successfully - 1 - Fatal library error - 2 - Fatal system error - -*/ - -#include "machdep.h" -#include "sockets.h" - -#include <stdio.h> -#include <stdlib.h> - -#include "alloc.h" -#include "pdtp.h" -#include "socketops.h" -#include "connection.h" - -#ifndef WIN32 -#include <pthread.h> -#endif - -#define LISTEN_ADDR "127.0.0.1" -#define COMMAND_PORT 2555 -#define DATA_PORT 2556 - -#define PDTP_MAGIC 0x50445450 - -#ifdef WIN32 -typedef HANDLE pdtp_thread_t; -#else -typedef pthread_t *pdtp_thread_t; -#endif /* WIN32 */ - -pdtp_thread_t thr_create(void *(*thread_func)(void *arg), void *arg) -{ -#ifdef WIN32 - return CreateThread(0, 0, thread_func, arg, 0, 0); -#else - pdtp_thread_t thread = (pdtp_thread_t)pdtp__alloc(sizeof(pthread_t)); - - if(pthread_create(thread, 0, thread_func, arg)) { - free(thread); - return 0; - } - - return thread; -#endif /* WIN32 */ -} - -void thr_exit(void *ptr) -{ -#ifdef WIN32 - ExitThread(ptr); -#else - pthread_exit(ptr); -#endif /* WIN32 */ -} - -void *testlib_func(void *arg) -{ - pdtp_connection_t conn; - pdtp_download_t dl; - - 
printf("--- libpdtp: Connecting to %s:%d\n", LISTEN_ADDR, COMMAND_PORT); - - if(!(conn = pdtp_connect(LISTEN_ADDR, COMMAND_PORT))) { - puts("failed"); - exit(1); - } - - puts("--- libpdtp: Requesting the file '/foo'"); - - if(!(dl = pdtp_download_create(conn, "/foo"))) { - puts("--- libpdtp error: Couldn't retrieve '/foo'"); - exit(1); - } - - thr_exit(0); - - /* Unreached, but to supress warnings */ - return 0; -} - -int cmdfd_handshake(int sock) -{ - uint16_t v16; - uint32_t v32; - - /* Read the magic number from the client */ - if(pdtp__readsock_int32(sock, &v32) < 0) - return -1; - - if(v32 != PDTP_MAGIC) - return -1; - - pdtp__writesock_int32(sock, PDTP_MAGIC); - - /* Minimum protocol version supported, currently hardcoded to 1 */ - pdtp__writesock_int16(sock, 1); - - /* Maximum protocol version supported, currently hardcoded to 1 */ - pdtp__writesock_int16(sock, 1); - - /* Read the client's selected protocol version */ - if(pdtp__readsock_int16(sock, &v16) < 0) - return -1; - - if(v16 != 1) - return -1; - - return 0; -} - -#if 0 -static pdtp_transaction_in_t txn_read(int sock) -{ - pdtp_transaction_in_t txn; - pdtp_connection_t conn = NEW(pdtp_connection); - pdtp_notifier_t notifier; - - conn->sock = sock; - conn->addr = 0; - conn->socket_lock = pdtp__mutex_create(); - conn->txnmgr = pdtp__transaction_manager_create(); - - pdtp__mutex_destroy(conn->socket_lock); - pdtp__transaction_manager_destroy(conn->txnmgr); -} - -static int txn_send(int sock, pdtp_transaction_out_t txn) -{ -} -#endif - -int main() -{ - pdtp_thread_t testlib_thread; - int listenfd, cmdfd; - - puts("--- | libpdtp test driver starting | ---"); - - printf("*** Creating a listener socket on %s:%d... 
", LISTEN_ADDR, COMMAND_PORT); - fflush(stdout); - - if((listenfd = pdtp__ipv4_tcp_listener_create(inet_addr(LISTEN_ADDR), COMMAND_PORT)) < 0) { - puts("failed."); - exit(2); - } else - puts("done."); - - puts("*** Running library functions thread"); - - if(!(testlib_thread = thr_create(testlib_func, 0))) { - puts("Error: Couldn't start thread"); - exit(1); - } else - - if((cmdfd = pdtp__ipv4_listener_accept(listenfd, 0)) < 0) { - printf("!!! Listener socket error"); - exit(2); - } - - close(listenfd); - puts("*** Connection completed. Doing handshaking..."); - - if(cmdfd_handshake(cmdfd) < 0) { - puts("Error: Handshake failed"); - exit(1); - } - - puts("*** Handshake succeeded"); - - exit(0); -} Modified: transaction.c =================================================================== --- transaction.c 2004-11-23 21:26:11 UTC (rev 44) +++ transaction.c 2004-11-23 22:37:59 UTC (rev 45) @@ -44,7 +44,6 @@ #include "debug.h" #include "collection.h" #include "connection.h" -#include "notifier.h" #include "socketops.h" #include "threadops.h" #include "transaction.h" @@ -634,7 +633,7 @@ pdtp__free(txn); } -pdtp_notifier_t pdtp__transaction_set_serial(pdtp_transaction_out_t txn, pdtp_connection_t conn) +void pdtp__transaction_set_serial(pdtp_transaction_out_t txn, pdtp_connection_t conn) { uint32_t serial; @@ -643,9 +642,9 @@ serial = conn->txnmgr->serial++; txn->serial = htonl(serial); pdtp__mutex_unlock(conn->txnmgr->serial_lock); -#endif return pdtp__create_serial_notifier(conn, serial); +#endif } void pdtp__transaction_add_object(pdtp_transaction_out_t txn, uint16_t id, const void *data, uint32_t length) Modified: transaction.h =================================================================== --- transaction.h 2004-11-23 21:26:11 UTC (rev 44) +++ transaction.h 2004-11-23 22:37:59 UTC (rev 45) @@ -8,10 +8,6 @@ #include "machdep.h" #include "pdtp.h" -#include "notifier.h" - -typedef struct pdtp_transaction_manager *pdtp_transaction_manager_t; -typedef struct 
pdtp_transaction_listener *pdtp_transaction_listener_t; typedef struct pdtp_transaction_in *pdtp_transaction_in_t; typedef struct pdtp_transaction_out *pdtp_transaction_out_t; @@ -37,7 +33,7 @@ pdtp_transaction_out_t pdtp__transaction_create(uint16_t tid, uint16_t obj_count); void pdtp__transaction_out_destroy(pdtp_transaction_out_t); -pdtp_notifier_t pdtp__transaction_set_serial(pdtp_transaction_out_t txn, pdtp_connection_t conn); +void pdtp__transaction_set_serial(pdtp_transaction_out_t txn, pdtp_connection_t conn); void pdtp__transaction_add_object(pdtp_transaction_out_t, uint16_t id, const void *data, uint32_t length); void pdtp__transaction_add_int16(pdtp_transaction_out_t, uint16_t id, uint16_t value); void pdtp__transaction_add_int32(pdtp_transaction_out_t, uint16_t id, uint32_t value); Added: txnmgr.c =================================================================== --- txnmgr.c 2004-11-23 21:26:11 UTC (rev 44) +++ txnmgr.c 2004-11-23 22:37:59 UTC (rev 45) @@ -0,0 +1,66 @@ +/* + txnmgr.c - Code for managing routing of transactions between threads + Copyright (C)2004 Anthony Arcieri + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * The name of Anthony Arcieri may not be used to endorse or promote + products derived from this software without specific prior written + permission. 
+ + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "machdep.h" +#include "pdtp.h" + +#include "alloc.h" +#include "threadops.h" +#include "txnmgr.h" + +struct pdtp_transaction_manager { + /* Connection serial number and respective lock */ + uint32_t serial; + pdtp_mutex_t serial_lock; +}; + +pdtp_transaction_manager_t pdtp__transaction_manager_create(void) +{ + pdtp_transaction_manager_t txnmgr; + + txnmgr = NEW(pdtp_transaction_manager); + + /* Initialize serial counter and respective lock */ + txnmgr->serial = 1; + txnmgr->serial_lock = pdtp__mutex_create(); + + return txnmgr; +} + +void pdtp__transaction_manager_destroy(pdtp_transaction_manager_t txnmgr) +{ + pdtp__mutex_destroy(txnmgr->serial_lock); + + /* Free the structure itself */ + pdtp__free(txnmgr); +} Added: txnmgr.h =================================================================== --- txnmgr.h 2004-11-23 21:26:11 UTC (rev 44) +++ txnmgr.h 2004-11-23 22:37:59 UTC (rev 45) @@ -0,0 +1,9 @@ +#ifndef TXNMGR_H +#define TXNMGR_H + +typedef struct pdtp_transaction_manager *pdtp_transaction_manager_t; + +pdtp_transaction_manager_t pdtp__transaction_manager_create(void); +void pdtp__transaction_manager_destroy(pdtp_transaction_manager_t txnmgr); + +#endif /* TXNMGR_H */ |
From: <tar...@pd...> - 2004-10-25 20:50:53
|
Author: tarcieri Date: 2004-10-25 14:50:43 -0600 (Mon, 25 Oct 2004) New Revision: 43 Modified: connection.c Log: VHOST bugfixes Modified: connection.c =================================================================== --- connection.c 2004-10-25 20:45:29 UTC (rev 42) +++ connection.c 2004-10-25 20:50:43 UTC (rev 43) @@ -35,6 +35,8 @@ #include "sockets.h" #include "pdtp.h" +#include <string.h> + #include "alloc.h" #include "connection.h" #include "collection.h" @@ -92,10 +94,10 @@ goto err1; /* Select virtual host */ - if(pdtp__writesock_int16(sock, sizeof(hostname)) < 0) + if(pdtp__writesock_int16(sock, strlen(hostname)) < 0) goto err1; - if(pdtp__writesock(sock, hostname, sizeof(hostname)) < 0) + if(pdtp__writesock(sock, hostname, strlen(hostname)) < 0) goto err1; connection = NEW(pdtp_connection); |
From: <tar...@pd...> - 2004-10-25 20:45:44
|
Author: tarcieri Date: 2004-10-25 14:45:29 -0600 (Mon, 25 Oct 2004) New Revision: 42 Modified: connection.c socketops.c socketops.h Log: const correctness fixes Modified: connection.c =================================================================== --- connection.c 2004-10-25 01:39:57 UTC (rev 41) +++ connection.c 2004-10-25 20:45:29 UTC (rev 42) @@ -95,7 +95,7 @@ if(pdtp__writesock_int16(sock, sizeof(hostname)) < 0) goto err1; - if(pdtp__timeout_writesock(sock, hostname, sizeof(hostname)) < 0) + if(pdtp__writesock(sock, hostname, sizeof(hostname)) < 0) goto err1; connection = NEW(pdtp_connection); Modified: socketops.c =================================================================== --- socketops.c 2004-10-25 01:39:57 UTC (rev 41) +++ socketops.c 2004-10-25 20:45:29 UTC (rev 42) @@ -356,12 +356,12 @@ return 0; } -ssize_t pdtp__atomic_writesock(pdtp_sock_t sock, void *buf, size_t nbytes) +ssize_t pdtp__atomic_writesock(pdtp_sock_t sock, const void *buf, size_t nbytes) { return send(sock, buf, nbytes, 0); } -ssize_t pdtp__timeout_writesock(pdtp_sock_t sock, void *buf, size_t nbytes) +ssize_t pdtp__timeout_writesock(pdtp_sock_t sock, const void *buf, size_t nbytes) { fd_set fdset; struct timeval tv; @@ -380,10 +380,10 @@ /* Write function. 
This uses a loop to ensure the data is written completely, as the sockets involved will be set non-blocking */ -int pdtp__writesock(pdtp_sock_t sock, void *buf, size_t nbytes) +int pdtp__writesock(pdtp_sock_t sock, const void *buf, size_t nbytes) { ssize_t c; - uint8_t *bptr = buf; + uint8_t *bptr = (void *)buf; while(nbytes > 0) { if((c = pdtp__timeout_writesock(sock, bptr, nbytes)) < 1) Modified: socketops.h =================================================================== --- socketops.h 2004-10-25 01:39:57 UTC (rev 41) +++ socketops.h 2004-10-25 20:45:29 UTC (rev 42) @@ -37,9 +37,9 @@ int pdtp__readsock_int16(pdtp_sock_t sock, uint16_t *value); int pdtp__readsock_int32(pdtp_sock_t sock, uint32_t *value); -ssize_t pdtp__atomic_writesock(pdtp_sock_t sock, void *buf, size_t nbytes); -ssize_t pdtp__timeout_writesock(pdtp_sock_t sock, void *buf, size_t nbytes); -int pdtp__writesock(pdtp_sock_t sock, void *buf, size_t nbytes); +ssize_t pdtp__atomic_writesock(pdtp_sock_t sock, const void *buf, size_t nbytes); +ssize_t pdtp__timeout_writesock(pdtp_sock_t sock, const void *buf, size_t nbytes); +int pdtp__writesock(pdtp_sock_t sock, const void *buf, size_t nbytes); int pdtp__writesock_int16(pdtp_sock_t sock, uint16_t value); int pdtp__writesock_int32(pdtp_sock_t sock, uint32_t value); |
From: <tar...@pd...> - 2004-10-25 01:40:12
|
Author: tarcieri Date: 2004-10-24 19:39:57 -0600 (Sun, 24 Oct 2004) New Revision: 41 Modified: connection.c Log: Updates to support latest protocol revision Modified: connection.c =================================================================== --- connection.c 2004-09-21 19:45:35 UTC (rev 40) +++ connection.c 2004-10-25 01:39:57 UTC (rev 41) @@ -91,6 +91,13 @@ if(pdtp__writesock_int16(sock, 1) < 0) goto err1; + /* Select virtual host */ + if(pdtp__writesock_int16(sock, sizeof(hostname)) < 0) + goto err1; + + if(pdtp__timeout_writesock(sock, hostname, sizeof(hostname)) < 0) + goto err1; + connection = NEW(pdtp_connection); connection->sock = sock; connection->addr = addr; |