[C-mpi-commits] SF.net SVN: c-mpi:[40] src
Status: Pre-Alpha
Brought to you by:
jmwozniak
|
From: <jmw...@us...> - 2010-04-27 21:27:33
|
Revision: 40
http://c-mpi.svn.sourceforge.net/c-mpi/?rev=40&view=rev
Author: jmwozniak
Date: 2010-04-27 21:27:26 +0000 (Tue, 27 Apr 2010)
Log Message:
-----------
KDA-2A and KDA-2B compile.
Modified Paths:
--------------
include/cmpi.h
include/kda-2.h
include/kda_conn-A.h
include/kda_conn-B.h
src/cmpi/cmpi.c
src/cmpi-db/cmpi-db-fifo.c
src/kda-2/cmpi_kademlia.c
src/kda-2/conn-A.c
src/kda-2/conn-B.c
src/kda-2/conn-C.c
src/kda-2/kademlia.c
src/kda-2/module.mk.in
src/kda-2/neighbor.c
src/mpi_tools/mpi_tools.c
src/mpirpc/mpirpc.c
Modified: include/cmpi.h
===================================================================
--- include/cmpi.h 2010-04-20 20:43:44 UTC (rev 39)
+++ include/cmpi.h 2010-04-27 21:27:26 UTC (rev 40)
@@ -226,6 +226,9 @@
/**
Retrieve a key/value pair from CMPI.
+ @param key The key
+ @param value Store pointer to value here
+ @param length OUT The value length
*/
CMPI_RETURN cmpi_get(char* key, char** value, int* length);
Modified: include/kda-2.h
===================================================================
--- include/kda-2.h 2010-04-20 20:43:44 UTC (rev 39)
+++ include/kda-2.h 2010-04-27 21:27:26 UTC (rev 40)
@@ -2,13 +2,13 @@
#ifndef KDA_2_H
#define KDA_2_H
-#include <limits.h>
-#include <signal.h>
-#include <stdbool.h>
+#include <limits.h>
+#include <signal.h>
+#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
-#include <time.h>
-#include <unistd.h>
+#include <time.h>
+#include <unistd.h>
#include <mpi.h>
@@ -16,81 +16,81 @@
#include <itable.h>
#include <list.h>
#include <ilist.h>
-#include <inlist.h>
+#include <inlist.h>
#include <cmpi.h>
-#include <mpi_tools.h>
+#include <mpi_tools.h>
-#include "kda_types-2.h"
+#include "kda_types-2.h"
-#include "connection.h"
+#include "connection.h"
#include "kda_neighbor-2.h"
-#define KDA_SPACE_SIZE (sizeof(KDA_ID)*8) // 160 in paper
-#define KDA_HASH_SPACE UINT_MAX //
+#define KDA_SPACE_SIZE (sizeof(KDA_ID)*8) // 160 in paper
+#define KDA_HASH_SPACE UINT_MAX //
/**
Obtain the Kademlia distance between two ids.
*/
-#define XOR(id1, id2) (((unsigned int) id1) ^ ((unsigned int) id2))
+#define XOR(id1, id2) (((unsigned int) id1) ^ ((unsigned int) id2))
-extern KDA_ID id;
-extern K_BUCKET k_bucket[];
-extern struct list* hubs;
+extern KDA_ID id;
+extern K_BUCKET k_bucket[];
+extern struct list* hubs;
extern int kda_nodes;
enum
{
KDA_STATUS_SEARCHING,
KDA_STATUS_CALLING,
- KDA_STATUS_COMPLETE
+ KDA_STATUS_COMPLETE
};
/**
KDA_ID_NULL=0 is an invalid KDA_ID used for semantic purposes.
- make_id must not create this id.
+ make_id must not create this id.
*/
#define KDA_ID_NULL 0
/**
- KDA_ID_CLIENT=1 is an invalid KDA_ID used to identify client comms.
+ KDA_ID_CLIENT=1 is an invalid KDA_ID used to identify client comms.
*/
#define KDA_ID_CLIENT 1
void KDA_Init(int alpha_in, int k_in);
void KDA_Init_client(int alpha_in, int k_in);
-void KDA_Init_conn(void);
-void KDA_Init_conn_client(void);
+void KDA_Init_conn(void);
+void KDA_Init_conn_client(void);
//// General API...
/**
Return once op->status is KDA_STATUS_COMPLETE
*/
-void KDA_Wait(KDA_Operation* op);
+void KDA_Wait(KDA_Operation* op);
//// In-DHT API...
-void KDA_Ping(KDA_ID other_id);
-int KDA_Rank(KDA_ID other_id);
+void KDA_Ping(KDA_ID other_id);
+int KDA_Rank(KDA_ID other_id);
struct ilist* KDA_Closest(KDA_ID object_id);
struct ilist* KDA_Find_key(char* key);
-struct ilist* KDA_Find(KDA_ID object_id);
-MPIRPC_Node* KDA_Node(KDA_ID other_id);
+struct ilist* KDA_Find(KDA_ID object_id);
+MPIRPC_Node* KDA_Node(KDA_ID other_id);
//// Helpers...
-void KDA_Setup_node_port(void);
+void KDA_Setup_node_port(void);
void KDA_Serve(void);
-void KDA_Serve_id(KDA_ID other_id, MPIRPC_Node node);
+void KDA_Serve_id(KDA_ID other_id, MPIRPC_Node node);
MPIRPC_Node KDA_Serve_accept(void);
void KDA_Serve_client(MPIRPC_Node node);
void KDA_Serve_handshake(MPIRPC_Node node,
- KDA_ID* other_id, int* msg);
+ KDA_ID* other_id, int* msg);
void KDA_Serve_neighbor(KDA_ID other_id, MPIRPC_Node node);
void KDA_Setup_connector_port(void);
void KDA_Connect_port(char* port);
MPIRPC_Node* KDA_Attach(void);
MPIRPC_Node* KDA_Attach_port(char* port);
-MPIRPC_Node* KDA_Register_hub(MPI_Comm comm);
+MPIRPC_Node* KDA_Register_hub(MPI_Comm comm);
void KDA_Comm_set_name(KDA_Neighbor* neighbor);
@@ -98,34 +98,34 @@
void (*proceed)(MPIRPC* rpc),
KDA_Query* query);
bool KDA_Find_node(KDA_Operation* op);
-bool KDA_Find_node_again(KDA_Operation* op);
+bool KDA_Find_node_again(KDA_Operation* op);
// void KDA_Send_neighbor(MPI_Comm comm);
void KDA_Serve_shutdown(void);
-void KDA_Detach(MPIRPC_Node node);
-void KDA_Shutdown(void);
+void KDA_Detach(MPIRPC_Node node);
+void KDA_Shutdown(void);
void KDA_Shutdown_port(char* hub_port);
/**
- Return the KDA_Neighbor for this MPIRPC_Node.
+ Return the KDA_Neighbor for this MPIRPC_Node.
Could fail if other_id is not in local neighbor table.
*/
-KDA_Neighbor* KDA_Neighbor_node(MPIRPC_Node node);
+KDA_Neighbor* KDA_Neighbor_node(MPIRPC_Node node);
/**
Could fail if other_id is not in local neighbor table.
- */
-KDA_Neighbor* KDA_Neighbor_ID(KDA_ID other_id);
+ */
+KDA_Neighbor* KDA_Neighbor_ID(KDA_ID other_id);
/**
Attempt to contact the node other_id using a node as a
- reference.
+ reference.
*/
-void KDA_Harpoon(MPIRPC_Node reference, KDA_ID other_id);
+void KDA_Harpoon(MPIRPC_Node reference, KDA_ID other_id);
/**
Connect two neighbors in the DHT.
- @param node1 The neighbor to issue the accept().
+ @param node1 The neighbor to issue the accept().
@param node2 The neighbor to issue the connect().
*/
void KDA_Link(MPIRPC_Node node1, MPIRPC_Node node2);
@@ -133,57 +133,56 @@
/**
@return A description of the form "id".
*/
-char* KDA_Description(void);
-char* KDA_id_tostring(KDA_ID other_id);
+char* KDA_Description(void);
+char* KDA_id_tostring(KDA_ID other_id);
-//// Remote API:
+//// Remote API:
MPIRPC* KDA_Translate(MPIRPC_Node node, KDA_ID id);
MPIRPC* KDA_Translate_k(MPIRPC_Node node, KDA_ID id);
MPIRPC* KDA_Lookup(MPIRPC_Node node, char* key);
MPIRPC* KDA_Lookup_k(MPIRPC_Node node, char* key);
+
+MPIRPC* KDA_Store(KDA_Neighbor* neighbor, char* key, char* value, int length);
+
/**
- Instruct a node to store a value.
-*/
-MPIRPC* KDA_Store(KDA_Neighbor* neighbor, char* key, char* value);
-/**
Instruct a node to retrieve a value.
*/
MPIRPC* KDA_Retrieve(KDA_Neighbor* neighbor, char* key);
//// Convenience prototypes to keep functions in decent order...
KDA_ID make_id(int salt);
-// void join(MPIRPC_Node* node);
+// void join(MPIRPC_Node* node);
char* id_tostring(void);
-void listen_loop(void);
+void listen_loop(void);
void bootstrap(MPIRPC_Node node);
KDA_Neighbor* random_neighbor(void);
-int KDA_Neighbor_table_size(void);
+int KDA_Neighbor_table_size(void);
-//// Neighbor management...
+//// Neighbor management...
/**
- Add neighbor to the neighbor table.
+ Add neighbor to the neighbor table.
@return false iff n is already in the neighbor table.
*/
bool neighbor_add(KDA_Neighbor* neighbor);
/**
- Add neighbor to the client list.
+ Add neighbor to the client list.
*/
void client_add(KDA_Neighbor* client);
/**
- Find neighbor in buckets and set its comm.
+ Find neighbor in buckets and set its comm.
*/
void neighbor_set_node(KDA_Neighbor* neighbor, MPIRPC_Node node);
/**
Translate a dummy neighbor that is set to MPI_COMM_NULL to
a good one in a bucket.
- TODO: Speed this up with an index or something.
+ TODO: Speed this up with an index or something.
*/
-KDA_Neighbor* neighbor_lookup(KDA_Neighbor* neighbor);
+KDA_Neighbor* neighbor_lookup(KDA_Neighbor* neighbor);
/**
Use node to connect to neighbor.
@@ -192,13 +191,13 @@
/**
Add neighbor to the neighbor table.
- Use node to link to the neighbor.
+ Use node to link to the neighbor.
*/
-void connect_neighbor(MPIRPC_Node* node, KDA_Neighbor* neighbor);
+void connect_neighbor(MPIRPC_Node* node, KDA_Neighbor* neighbor);
-void dump_bucket_ids(void);
+void dump_bucket_ids(void);
char* bucket_ids_tostring(void);
-void dump_buckets(void);
+void dump_buckets(void);
char* buckets_tostring(void);
//// Handlers...
@@ -206,42 +205,53 @@
/**
Generate info string for debugging
*/
-void handle_info(MPIRPC_Node node, int unique, char* args);
-void handle_join(MPIRPC_Node caller, int unique, char* args);
-void handle_link(MPIRPC_Node caller, int unique, char* args);
-void handle_find_node(MPIRPC_Node node, int unique, char* args);
-void handle_neighbor(MPIRPC_Node node, int unique, char* args);
-void handle_ping(MPIRPC_Node node, int unique, char* args);
-void handle_query_id(MPIRPC_Node node, int unique, char* args);
-void handle_query_id_k(MPIRPC_Node node, int unique, char* args);
-void handle_store(MPIRPC_Node node, int unique, char* args);
-void handle_retrieve(MPIRPC_Node node, int unique, char* args);
-void handle_shutdown(MPIRPC_Node caller, int unique, char* args);
+void handle_info(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_join(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length);
+void handle_link(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length);
+void handle_find_node(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_neighbor(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_ping(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_query_id(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_query_id_k(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_store(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_retrieve(MPIRPC_Node node, int unique, char* args,
+ char* blob, int blob_length);
+void handle_shutdown(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length);
/* SPECIFIC TO CONN TYPE
- void handle_accept(MPIRPC_Node* caller, int unique, char* args);
- void handle_connect(MPIRPC_Node* caller, int unique, char* args);
+ void handle_accept(MPIRPC_Node* caller, int unique, char* args, char* blob, int blob_length);
+ void handle_connect(MPIRPC_Node* caller, int unique, char* args, char* blob, int blob_length);
*/
// Synchronous methods...
-struct ilist* find_node(KDA_ID id);
+struct ilist* find_node(KDA_ID id);
-// Asynchronous methods and return services...
+// Asynchronous methods and return services...
void query_id(MPIRPC_Node node, int unique, KDA_ID id);
-void return_query_id(KDA_Operation* op);
+void return_query_id(KDA_Operation* op);
void query_id_k(MPIRPC_Node node, int unique, KDA_ID id);
-void return_query_id_k(KDA_Operation* op);
+void return_query_id_k(KDA_Operation* op);
// RPCs...
-void rpc_find_node(KDA_Neighbor* neighbor, KDA_Operation* op);
+void rpc_find_node(KDA_Neighbor* neighbor, KDA_Operation* op);
// Proceeds...
void proceed_find(MPIRPC* rpc);
-void proceed_ping(MPIRPC* rpc);
-void proceed_link(MPIRPC* rpc);
+void proceed_ping(MPIRPC* rpc);
+void proceed_link(MPIRPC* rpc);
// void proceed_
-#endif
+#endif
Modified: include/kda_conn-A.h
===================================================================
--- include/kda_conn-A.h 2010-04-20 20:43:44 UTC (rev 39)
+++ include/kda_conn-A.h 2010-04-27 21:27:26 UTC (rev 40)
@@ -1,23 +1,18 @@
-#include "kademlia.h"
+#include "kda-2.h"
void KDA_Init_conn(void);
MPI_Comm KDA_Comm_create(int* other_rank);
-void KDA_Join(int other_id, int other_rank);
+void KDA_Join(int other_id, int other_rank);
/**
- Initiate a connection with the caller.
+ Translate a KDA_ID to a rank in MPI_COMM_WORLD.
*/
-void handle_join(MPIRPC_Node* caller, int unique, char* args);
+void handle_get_rank(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length);
/**
- Translate a KDA_ID to a rank in MPI_COMM_WORLD.
+ Return the local KDA_ID.
*/
-void handle_get_rank(MPIRPC_Node* caller, int unique, char* args);
-
-/**
- Return the local KDA_ID.
-*/
-void handle_get_id(MPIRPC_Node* caller, int unique, char* args);
+void handle_get_id(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length);
Modified: include/kda_conn-B.h
===================================================================
--- include/kda_conn-B.h 2010-04-20 20:43:44 UTC (rev 39)
+++ include/kda_conn-B.h 2010-04-27 21:27:26 UTC (rev 40)
@@ -2,65 +2,67 @@
/**
Connection techniques for KDA-2B DHT.
Uses HUBs to manage initial bootstrap connections.
- Uses three-way link/accept/connect/join to add neighbors.
+ Uses three-way link/accept/connect/join to add neighbors.
*/
-
#include "kda-2.h"
-extern char port[];
+extern char port[];
-void KDA_Init_conn(void);
+void KDA_Init_conn(void);
-void KDA_Connect_port(char* port);
+void KDA_Connect_port(char* port);
-//// Hub methods:
-void KDA_Hub_init(void);
-void KDA_Hub_loop(void);
+//// Hub methods:
+void KDA_Hub_init(void);
+void KDA_Hub_loop(void);
MPIRPC_Node KDA_Serve_accept(void);
void KDA_Serve_handshake(MPIRPC_Node node, KDA_ID* other_id,
- int* msg);
-void KDA_Serve_id(KDA_ID other_id, MPIRPC_Node node);
-void KDA_Serve_client(MPIRPC_Node node);
-void KDA_Serve_neighbor(KDA_ID other_id, MPIRPC_Node node);
+ int* msg);
+void KDA_Serve_id(KDA_ID other_id, MPIRPC_Node node);
+void KDA_Serve_client(MPIRPC_Node node);
+void KDA_Serve_neighbor(KDA_ID other_id, MPIRPC_Node node);
void KDA_Node_init(void);
-void KDA_Init_conn_client(void);
+void KDA_Init_conn_client(void);
void KDA_Setup_node_port(void);
/**
After establishing connection via MPI-2,
- connector calls acceptor to update neighbor tables.
+ connector calls acceptor to update neighbor tables.
*/
-void KDA_Join(MPI_Comm comm);
+void KDA_Join(MPI_Comm comm);
-MPIRPC_Node* KDA_Attach(void);
+MPIRPC_Node* KDA_Attach(void);
-MPIRPC_Node* KDA_Attach_port(char* hub_port);
+MPIRPC_Node* KDA_Attach_port(char* hub_port);
/**
- Register this comm as a hub.
-*/
-MPIRPC_Node* KDA_Register_hub(MPI_Comm comm);
+ Register this comm as a hub.
+*/
+MPIRPC_Node* KDA_Register_hub(MPI_Comm comm);
/**
- Link the caller node to the requested node.
+ Link the caller node to the requested node.
*/
-void handle_link(MPIRPC_Node caller, int unique, char* args);
+void handle_link(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length);
/**
- Connect two neighbors in the DHT, if neither is NULL.
- @param n1 The neighbor to issue the accept().
+ Connect two neighbors in the DHT, if neither is NULL.
+ @param n1 The neighbor to issue the accept().
@param n2 The neighbor to issue the connect().
*/
-void KDA_Link(MPIRPC_Node node1, MPIRPC_Node node2);
+void KDA_Link(MPIRPC_Node node1, MPIRPC_Node node2);
-void handle_accept(MPIRPC_Node caller, int unique, char* args);
+void handle_accept(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length);
-void handle_connect(MPIRPC_Node caller, int unique, char* args);
+void handle_connect(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length);
KDA_ID rpc_join(MPIRPC_Node node);
-void rpc_link_neighbor(MPIRPC_Node node, KDA_ID other_id);
+void rpc_link_neighbor(MPIRPC_Node node, KDA_ID other_id);
Modified: src/cmpi/cmpi.c
===================================================================
--- src/cmpi/cmpi.c 2010-04-20 20:43:44 UTC (rev 39)
+++ src/cmpi/cmpi.c 2010-04-27 21:27:26 UTC (rev 40)
@@ -104,7 +104,10 @@
}
/**
-
+ Obtain pointer to requested data
+ @param key Locate the data for this key
+ @param data OUT Store data pointer here
+ @return length of data
*/
int
cmpi_cached_retrieve(char* key, char** data)
Modified: src/cmpi-db/cmpi-db-fifo.c
===================================================================
--- src/cmpi-db/cmpi-db-fifo.c 2010-04-20 20:43:44 UTC (rev 39)
+++ src/cmpi-db/cmpi-db-fifo.c 2010-04-27 21:27:26 UTC (rev 40)
@@ -1,17 +1,17 @@
-#include <unistd.h>
+#include <unistd.h>
-#include <node.h>
-#include <driver.h>
+#include <node.h>
+#include <driver.h>
void
wait_for_notification(void)
{
int tmp;
- MPI_Status status;
+ MPI_Status status;
MPI_Recv(&tmp, 1, MPI_INT,
- MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
- assert(tmp == -2);
+ MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+ assert(tmp == -2);
}
void
@@ -19,27 +19,27 @@
{
if (mpi_rank < mpi_size-1)
{
- int msg = -2;
+ int msg = -2;
MPI_Send(&msg, 1, MPI_INT, mpi_rank+1, 0, MPI_COMM_WORLD);
}
else
{
- // NOTE("NOTIFICATION COMPLETE");
+ // NOTE("NOTIFICATION COMPLETE");
}
}
void
cmpi_client_code()
{
- char hostname[100];
+ char hostname[100];
wait_for_notification();
- notify_next();
-
- NOTE("CMPI-DB-FIFO...");
- gethostname(hostname, 100);
- SHOW_S(hostname);
-
+ notify_next();
+
+ NOTE("CMPI-DB-FIFO...");
+ gethostname(hostname, 100);
+ SHOW_S(hostname);
+
cmpi_driver* driver = driver_create();
while (! driver->quitting)
Modified: src/kda-2/cmpi_kademlia.c
===================================================================
--- src/kda-2/cmpi_kademlia.c 2010-04-20 20:43:44 UTC (rev 39)
+++ src/kda-2/cmpi_kademlia.c 2010-04-27 21:27:26 UTC (rev 40)
@@ -1,18 +1,18 @@
-#include "cmpi_kda-2.h"
+#include "cmpi_kda-2.h"
/**
List of KDA_Neighbor*s.
*/
-struct list* contacts;
+struct list* contacts;
CMPI_RETURN
cmpi_init_impl()
{
- // NOTE_F;
+ // NOTE_F;
KDA_Init(3, 3);
- return CMPI_SUCCESS;
+ return CMPI_SUCCESS;
}
CMPI_RETURN
@@ -21,7 +21,7 @@
contacts = list_create();
KDA_Init_client(3,3);
- return CMPI_SUCCESS;
+ return CMPI_SUCCESS;
}
CMPI_RETURN
@@ -34,13 +34,13 @@
kda_nodes = n;
}
- return CMPI_SUCCESS;
+ return CMPI_SUCCESS;
}
CMPI_RETURN
cmpi_attach()
{
- CMPI_RETURN result;
+ CMPI_RETURN result;
if (KDA_Attach())
{
add_contacts();
@@ -48,8 +48,8 @@
}
else
result = CMPI_ERROR_SERVICENAME;
-
- return result;
+
+ return result;
}
void
@@ -57,23 +57,23 @@
{
int i;
struct list_item* item;
- NOTE_F;
+ NOTE_F;
for (i = 0; i < KDA_SPACE_SIZE; i++)
- for (item = k_bucket[i]->head;
+ for (item = k_bucket[i]->head;
item; item = item->next)
{
KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
- KDA_Neighbor_dump(neighbor);
+ KDA_Neighbor_dump(neighbor);
list_add_unique(contacts, KDA_Neighbor_cmp, neighbor);
- KDA_Comm_set_name(neighbor);
+ KDA_Comm_set_name(neighbor);
}
}
-KDA_Neighbor*
+KDA_Neighbor*
contact_lookup(KDA_Neighbor* neighbor)
{
struct list_item* item;
- NOTE_FX(neighbor->id);
+ NOTE_FX(neighbor->id);
for (item = contacts->head;
item; item = item->next)
{
@@ -81,45 +81,45 @@
if (n->id == neighbor->id)
return n;
}
-
+
NOTE("CONTACT LOOKUP FAILED: LINKING");
- KDA_Harpoon(random_neighbor()->node, neighbor->id);
- add_contacts();
-
- return contact_lookup(neighbor);
+ KDA_Harpoon(random_neighbor()->node, neighbor->id);
+ add_contacts();
+
+ return contact_lookup(neighbor);
}
char*
cmpi_info(CMPI_ID key)
{
char* result = NULL;
- KDA_Neighbor* neighbor = (KDA_Neighbor*) list_random(contacts);
+ KDA_Neighbor* neighbor = (KDA_Neighbor*) list_random(contacts);
MPIRPC* rpc = MPIRPC_Call(neighbor->node, "info",
- xheap(key),
- "", MPIRPC_PROCEED_NULL);
+ xheap(key),
+ "", MPIRPC_PROCEED_NULL);
MPIRPC_Wait(rpc);
- result = rpc->result;
- MPIRPC_Free(rpc);
- return result;
+ result = rpc->result;
+ MPIRPC_Free(rpc);
+ return result;
}
/**
Ex-DHT search.
- @return The MPI rank closest to the given key.
+ @return The MPI rank closest to the given key.
*/
CMPI_ID
cmpi_lookup(char* key)
{
- NOTE_FS(key);
+ NOTE_FS(key);
KDA_Neighbor* n = (KDA_Neighbor*) list_random(contacts);
- MPIRPC* rpc = KDA_Lookup(n->node, key);
+ MPIRPC* rpc = KDA_Lookup(n->node, key);
MPIRPC_Wait(rpc);
- printf("cmpi_lookup()@%X -> %s \n", n->id, rpc->result);
-
+ printf("cmpi_lookup()@%X -> %s \n", n->id, rpc->result);
+
return 0;
}
@@ -128,8 +128,8 @@
{
NOTE_F;
- NOTE_XI("TRANSLATE: ", id, id);
-
+ NOTE_XI("TRANSLATE: ", id, id);
+
MPIRPC_Node* node = (MPIRPC_Node*) list_random(contacts);
if (node == NULL)
return CMPI_ERROR_NEIGHBORS;
@@ -137,18 +137,18 @@
MPIRPC* rpc = KDA_Translate(*node, (KDA_ID) id);
MPIRPC_Wait(rpc);
- int result = 0;
+ int result = 0;
sscanf(rpc->result, "%X", &result);
- return result;
+ return result;
}
CMPI_RETURN
-cmpi_put(char* key, char* value)
+cmpi_put(char* key, char* value, int length)
{
struct list_item* item;
NOTE_FSS(key, value);
- // printf("contacts: %i \n", contacts);
+ // printf("contacts: %i \n", contacts);
KDA_Neighbor* n = (KDA_Neighbor*) list_random(contacts);
MPIRPC* lookup = KDA_Lookup_k(n->node, key);
@@ -160,21 +160,21 @@
item; item = item->next)
{
KDA_Neighbor* neighbor =
- contact_lookup((KDA_Neighbor*) item->data);
- MPIRPC* rpc = KDA_Store(neighbor, key, value);
+ contact_lookup((KDA_Neighbor*) item->data);
+ MPIRPC* rpc = KDA_Store(neighbor, key, value, length);
MPIRPC_Wait(rpc);
MPIRPC_Free(rpc);
- NOTE("freed");
+ NOTE("freed");
}
- list_destroy(neighbors);
+ list_destroy(neighbors);
- DONE;
- return CMPI_SUCCESS;
+ DONE;
+ return CMPI_SUCCESS;
}
CMPI_RETURN
-cmpi_get(char* key, char** value)
+cmpi_get(char* key, char** value, int* length)
{
struct list_item* item;
NOTE_FS(key);
@@ -184,9 +184,9 @@
MPIRPC* lookup = KDA_Lookup_k(n->node, key);
MPIRPC_Wait(lookup);
- struct list* neighbors = KDA_Neighbor_node_list(lookup->result);
- NOTE_I("neighbors->size ", neighbors->size);
-
+ struct list* neighbors = KDA_Neighbor_node_list(lookup->result);
+ NOTE_I("neighbors->size ", neighbors->size);
+
for (item = neighbors->head;
item; item = item->next)
{
@@ -198,13 +198,13 @@
MPIRPC* rpc = KDA_Retrieve(neighbor, key);
MPIRPC_Wait(rpc);
*value = rpc->result;
- NOTE_S("value: ", *value);
+ NOTE_S("value: ", *value);
MPIRPC_Free(rpc);
}
- list_destroy(neighbors);
+ list_destroy(neighbors);
- NOTE("cmpi_get() done \n");
+ NOTE("cmpi_get() done \n");
return CMPI_SUCCESS;
}
@@ -213,25 +213,25 @@
{
while (contacts->size > 0)
{
- NOTE("LOOP");
- KDA_Neighbor* neighbor = (KDA_Neighbor*) list_poll(contacts);
+ NOTE("LOOP");
+ KDA_Neighbor* neighbor = (KDA_Neighbor*) list_poll(contacts);
KDA_Detach(neighbor->node);
- free(neighbor);
+ free(neighbor);
}
while (hubs->size > 0)
{
MPIRPC_Node* hub = (MPIRPC_Node*) list_poll(hubs);
KDA_Detach(*hub);
- free(hub);
+ free(hub);
}
-
- NOTE("DETACHED");
+
+ NOTE("DETACHED");
}
void
cmpi_shutdown()
{
- KDA_Shutdown();
+ KDA_Shutdown();
}
Modified: src/kda-2/conn-A.c
===================================================================
--- src/kda-2/conn-A.c 2010-04-20 20:43:44 UTC (rev 39)
+++ src/kda-2/conn-A.c 2010-04-27 21:27:26 UTC (rev 40)
@@ -1,87 +1,88 @@
-#include "conn-A.h"
+#include "kda_conn-A.h"
/**
Maps KDA_IDs to ints.
- Stores MPI ranks in MPI_COMM_WORLD for all known KDA_IDs.
+ Stores MPI ranks in MPI_COMM_WORLD for all known KDA_IDs.
*/
-struct hashtable* world_ranks;
+struct hashtable* world_ranks;
void
KDA_Init_conn()
{
- // Add self to world_ranks...
+ // Add self to world_ranks...
world_ranks = hashtable_create(mpi_size);
int* heap_mpi_rank = malloc(sizeof(int));
- *heap_mpi_rank = mpi_size;
- hashtable_add(world_ranks, xheap(id), heap_mpi_rank);
-
- MPIRPC_Comm_add(MPI_COMM_WORLD);
+ *heap_mpi_rank = mpi_size;
+ hashtable_add(world_ranks, xheap(id), heap_mpi_rank);
+ MPIRPC_Comm_add(MPI_COMM_WORLD);
+
MPIRPC_Register("get_id", handle_get_id);
- MPIRPC_Register("get_rank", handle_get_rank);
-
- sleep(mpi_rank * 10);
+ MPIRPC_Register("get_rank", handle_get_rank);
- if (mpi_rank > 0)
+ sleep(mpi_rank * 10);
+
+ if (mpi_rank > 0)
{
- int other_rank = rand_lt(mpi_rank);
- MPIRPC_Node* node = MPIRPC_Node_create(MPI_COMM_WORLD,
- other_rank);
- SHOW_I(other_rank);
+ int other_rank = rand_lt(mpi_rank);
+ MPIRPC_Node node;
+ MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &node);
+ SHOW_I(other_rank);
char* result = MPIRPC_Block(node, "get_id", NULL);
KDA_ID other_id;
- sscanf(result, "%X", &other_id);
+ sscanf(result, "%X", &other_id);
KDA_Join(other_id, other_rank);
}
-
- listen_loop();
+
+ listen_loop();
}
/**
- Attach
+ Attach
*/
-MPIRPC_Node*
+MPIRPC_Node*
KDA_Attach()
{
- int other_rank = rand_lt(mpi_rank);
- MPIRPC_Node* node = MPIRPC_Node_create(MPI_COMM_WORLD,
- other_rank);
+ int other_rank = rand_lt(mpi_rank);
+ MPIRPC_Node node;
+ MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &node);
char* result = MPIRPC_Block(node, "get_id", NULL);
KDA_ID other_id;
- sscanf(result, "%X", &other_id);
+ sscanf(result, "%X", &other_id);
char* args = malloc(20*sizeof(int));
- sprintf(args, "%X %i", KDA_ID_CLIENT, mpi_rank);
- MPIRPC_Block(node, "join", args);
-
- return NULL;
+ sprintf(args, "%X %i", KDA_ID_CLIENT, mpi_rank);
+ MPIRPC_Block(node, "join", args);
+
+ return NULL;
}
void
-KDA_Harpoon(MPIRPC_Node* reference, KDA_ID other_id)
+KDA_Harpoon(MPIRPC_Node reference, KDA_ID other_id)
{
KDA_ID* heap_id = malloc(sizeof(KDA_ID));
- *heap_id = other_id;
- char* result = MPIRPC_Block(reference,
- "get_rank", xheap(other_id));
+ *heap_id = other_id;
+ char* result = MPIRPC_Block(reference, "get_rank", xheap(other_id));
int* other_rank = malloc(sizeof(int));
- sscanf(result, "%i", other_rank);
+ sscanf(result, "%i", other_rank);
hashtable_add(world_ranks, xheap(other_id), other_rank);
- KDA_Join(other_id, *other_rank);
+ KDA_Join(other_id, *other_rank);
}
void
KDA_Join(int other_id, int other_rank)
{
- MPIRPC_Node* node1 = MPIRPC_Node_create(MPI_COMM_WORLD, other_rank);
+ MPIRPC_Node node1, node2;
+ MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &node1);
char* args = malloc(20*sizeof(int));
- sprintf(args, "%X %i", id, mpi_rank);
- MPIRPC_Block(node1, "join", args);
+ sprintf(args, "%X %i", id, mpi_rank);
+ MPIRPC_Block(node1, "join", args);
- MPI_Comm newcomm = KDA_Comm_create(&other_rank);
- MPIRPC_Node* node2 = MPIRPC_Node_create(newcomm, 0);
- KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node2);
+ MPI_Comm newcomm = KDA_Comm_create(&other_rank);
+ MPIRPC_Node_make(newcomm, 0, &node2);
+ KDA_ID kda_id = (KDA_ID) other_id;
+ KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node2);
neighbor_add(neighbor);
}
@@ -89,49 +90,48 @@
KDA_Comm_create(int* other_rank)
{
MPI_Group world_group, newgroup;
- MPI_Comm newcomm;
+ MPI_Comm newcomm;
MPI_Comm_group(MPI_COMM_WORLD, &world_group);
MPI_Group_incl(world_group, 1, other_rank, &newgroup);
MPI_Comm_create(MPI_COMM_WORLD, newgroup, &newcomm);
MPIRPC_Comm_add(newcomm);
- return newcomm;
+ return newcomm;
}
void
-handle_join(MPIRPC_Node* caller, int unique, char* args)
+handle_join(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length)
{
KDA_ID other_id;
- int* other_rank = malloc(sizeof(int));
+ int* other_rank = malloc(sizeof(int));
sscanf(args, "%X %i", &other_id, other_rank);
- MPI_Comm newcomm = KDA_Comm_create(other_rank);
+ MPI_Comm newcomm = KDA_Comm_create(other_rank);
if (other_id != KDA_ID_CLIENT)
{
- MPIRPC_Node* node = MPIRPC_Node_create(newcomm, 0);
- KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
- neighbor_add(neighbor);
+ MPIRPC_Node node;
+ MPIRPC_Node_make(newcomm, 0, &node);
+ KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
+ neighbor_add(neighbor);
hashtable_add(world_ranks, xheap(other_id), other_rank);
}
-
- MPIRPC_Return(caller, unique, NULL);
+
+ MPIRPC_Null(caller, unique);
}
void
-handle_get_rank(MPIRPC_Node* caller, int unique, char* args)
+handle_get_rank(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length)
{
- int* other_rank = (int*) hashtable_search(world_ranks, args);
+ int* other_rank = (int*) hashtable_search(world_ranks, args);
char* result = malloc(10*sizeof(char));
- sprintf(result, "%i", *other_rank);
- MPIRPC_Return(caller, unique, result);
+ int length = sprintf(result, "%i", *other_rank);
+ MPIRPC_Return(caller, unique, result, length+1);
}
void
-handle_get_id(MPIRPC_Node* caller, int unique, char* args)
+handle_get_id(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length)
{
char* result = malloc(10*sizeof(char));
- sprintf(result, "%X", id);
- MPIRPC_Return(caller, unique, result);
+ int length = sprintf(result, "%X", id);
+ MPIRPC_Return(caller, unique, result, length+1);
}
-
-
Modified: src/kda-2/conn-B.c
===================================================================
--- src/kda-2/conn-B.c 2010-04-20 20:43:44 UTC (rev 39)
+++ src/kda-2/conn-B.c 2010-04-27 21:27:26 UTC (rev 40)
@@ -1,40 +1,42 @@
-/** Connection techniques for KDA-2B DHT.
+/**
+ Connection techniques for KDA-2B DHT.
Uses HUBs to manage initial bootstrap connections.
- Uses three-way link/accept/connect/join to add neighbors.
- */
+ Uses three-way link/accept/connect/join to add neighbors.
+*/
-#include <mpi.h>
-#include "kda_conn-B.h"
+#include <kda_conn-B.h>
-int unique_client;
-char port[MPI_MAX_PORT_NAME];
+#include <string.h>
+int unique_client;
+char port[MPI_MAX_PORT_NAME];
+
//// Internal prototypes
-int thirdhash(char* s);
+int thirdhash(char* s);
void
KDA_Init_conn()
{
- NOTE_F;
-
- KDA_Setup_node_port();
-
- char kda_port[MPI_MAX_PORT_NAME];
+ NOTE_F;
+
+ KDA_Setup_node_port();
+
+ char kda_port[MPI_MAX_PORT_NAME];
int err = MPI_Lookup_name("kademlia", MPI_INFO_NULL, kda_port);
if (err == MPI_SUCCESS)
{
// This node will be a DHT node:
- KDA_Node_init();
+ KDA_Node_init();
KDA_Connect_port(kda_port);
- KDA_Ping(id);
- listen_loop();
+ KDA_Ping(id);
+ listen_loop();
}
else
{
// This node is a HUB:
- KDA_Hub_init();
- KDA_Hub_loop();
+ KDA_Hub_init();
+ KDA_Hub_loop();
}
}
@@ -43,7 +45,7 @@
{
MPIRPC_Register("accept", handle_accept);
MPIRPC_Register("connect", handle_connect);
- MPIRPC_Register("join", handle_join);
+ MPIRPC_Register("join", handle_join);
}
void
@@ -56,8 +58,8 @@
void
KDA_Connect_port(char* port)
{
- NOTE_F;
- MPI_Comm newcomm;
+ NOTE_F;
+ MPI_Comm newcomm;
MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &newcomm);
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
MPIRPC_Comm_add(newcomm);
@@ -65,22 +67,22 @@
MPI_Send(&id, 1, MPI_INT, 0,
CMPI_TAG_ID, newcomm);
MPI_Send(&cmpi_msg_join, 1, MPI_INT, 0,
- CMPI_TAG_CONTROL, newcomm);
+ CMPI_TAG_CONTROL, newcomm);
}
void
KDA_Hub_init(void)
{
- NOTE_F;
- MPI_Open_port(MPI_INFO_NULL, port);
+ NOTE_F;
+ MPI_Open_port(MPI_INFO_NULL, port);
MPI_Publish_name("kademlia", MPI_INFO_NULL, port);
- NOTE_S("Published kademlia on: %s", port);
+ NOTE_S("Published kademlia on: %s", port);
}
void
KDA_Init_conn_client()
{
- NOTE_F;
+ NOTE_F;
KDA_Setup_node_port();
}
@@ -91,9 +93,9 @@
MPI_Open_port(MPI_INFO_NULL, port);
char portnumber[10];
memset(portnumber, '\0', (size_t) 10);
- int p = thirdhash(port);
+ int p = thirdhash(port);
strncpy(portnumber, port+p+1, 5);
- NOTE_S("Using port: ", portnumber);
+ NOTE_S("Using port: ", portnumber);
}
void
@@ -105,50 +107,50 @@
KDA_ID other_id;
while (! cmpi_quitting)
{
- MPIRPC_Node node = KDA_Serve_accept();
- KDA_Serve_handshake(node, &other_id, &msg);
+ MPIRPC_Node node = KDA_Serve_accept();
+ KDA_Serve_handshake(node, &other_id, &msg);
switch (msg)
{
case CMPI_MSG_JOIN:
KDA_Serve_id(other_id, node);
- break;
+ break;
case CMPI_MSG_SHUTDOWN:
- KDA_Serve_shutdown();
+ KDA_Serve_shutdown();
break;
}
}
}
MPIRPC_Node
-KDA_Serve_accept()
+KDA_Serve_accept()
{
- NOTE_F;
+ NOTE_F;
MPI_Comm newcomm;
MPI_Comm_accept(port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
char name[10];
- sprintf(name, "client_%i", unique_client++);
+ sprintf(name, "client_%i", unique_client++);
MPI_Comm_set_name(newcomm, name);
- MPIRPC_Node node;
- MPIRPC_Node_make(newcomm, 0, &node);
+ MPIRPC_Node node;
+ MPIRPC_Node_make(newcomm, 0, &node);
SHOW_S(name);
- SHOW_X(newcomm);
-
+ SHOW_X(newcomm);
+
return node;
}
/**
TODO: Replace Recv()s with new reliable timeout Trecv()s.
@param other_id OUT
- @param msg OUT
+ @param msg OUT
*/
void
-KDA_Serve_handshake(MPIRPC_Node node, KDA_ID* other_id, int* msg)
+KDA_Serve_handshake(MPIRPC_Node node, KDA_ID* other_id, int* msg)
{
MPI_Status status;
- MPI_Comm comm = node.comm;
+ MPI_Comm comm = node.comm;
MPI_Recv(other_id, 1, MPI_INT, 0, CMPI_TAG_ID, comm, &status);
MPI_Recv(msg, 1, MPI_INT, 0, CMPI_TAG_CONTROL, comm, &status);
}
@@ -156,55 +158,55 @@
void
KDA_Serve_id(KDA_ID other_id, MPIRPC_Node node)
{
- assert(other_id != KDA_ID_NULL);
-
+ assert(other_id != KDA_ID_NULL);
+
if (other_id == KDA_ID_CLIENT)
KDA_Serve_client(node);
else
- KDA_Serve_neighbor(other_id, node);
+ KDA_Serve_neighbor(other_id, node);
}
void
KDA_Serve_client(MPIRPC_Node node)
{
- NOTE_F;
+ NOTE_F;
MPIRPC_Comm_add(node.comm);
KDA_Neighbor* neighbor = random_neighbor();
if (neighbor)
KDA_Link(neighbor->node, node);
else
- NOTE("Could not get random neighbor.");
+ NOTE("Could not get random neighbor.");
}
void
KDA_Serve_neighbor(KDA_ID other_id, MPIRPC_Node node)
{
- NOTE_F;
+ NOTE_F;
KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
MPIRPC_Comm_add(node.comm);
KDA_Neighbor* rand_neighbor = random_neighbor();
if (rand_neighbor)
KDA_Link(rand_neighbor->node, node);
else
- NOTE("Could not get random neighbor.");
+ NOTE("Could not get random neighbor.");
if (! neighbor_add(neighbor))
free(neighbor);
dump_buckets();
}
/**
- Attach
+ Attach
*/
MPIRPC_Node*
KDA_Attach()
{
- NOTE_F;
+ NOTE_F;
- KDA_Client_init();
-
- MPIRPC_Node* result = NULL;
-
- char kda_port[MPI_MAX_PORT_NAME];
+ KDA_Client_init();
+
+ MPIRPC_Node* result = NULL;
+
+ char kda_port[MPI_MAX_PORT_NAME];
int err = MPI_Lookup_name("kademlia", MPI_INFO_NULL, kda_port);
if (err == MPI_SUCCESS)
{
@@ -212,183 +214,190 @@
while (! MPIRPC_Check()); // connect call
while (! MPIRPC_Check()); // join call
}
-
- return result;
+
+ return result;
}
MPIRPC_Node*
KDA_Attach_port(char* port)
{
NOTE_F;
-
- MPI_Comm newcomm;
+
+ MPI_Comm newcomm;
MPI_Comm_connect(port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
NOTE("connected to HUB");
NOTE_I("id: ", id);
- MPIRPC_Node* hub = KDA_Register_hub(newcomm);
-
+ MPIRPC_Node* hub = KDA_Register_hub(newcomm);
+
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
- MPI_Send(&id, 1, MPI_INT, 0,
+ MPI_Send(&id, 1, MPI_INT, 0,
CMPI_TAG_ID, newcomm);
- MPI_Send(&cmpi_msg_join, 1, MPI_INT, 0,
+ MPI_Send(&cmpi_msg_join, 1, MPI_INT, 0,
CMPI_TAG_CONTROL, newcomm);
- return hub;
+ return hub;
}
/**
- Register this comm as a hub.
-*/
-MPIRPC_Node*
-KDA_Register_hub(MPI_Comm comm)
+ Register this comm as a hub.
+*/
+MPIRPC_Node*
+KDA_Register_hub(MPI_Comm comm)
{
MPI_Comm_set_name(comm, "HUB");
MPIRPC_Node* hub = MPIRPC_Node_create(comm, 0);
- list_add(hubs, hub);
+ list_add(hubs, hub);
MPIRPC_Comm_add(comm);
- return hub;
+ return hub;
}
/**
Link the caller node to the requested node.
*/
void
-handle_link(MPIRPC_Node caller, int unique, char* args)
+handle_link(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length)
{
- NOTE_F;
+ NOTE_F;
KDA_ID rID;
sscanf(args, "%X", &rID);
NOTE_FX(rID);
- // dump_buckets();
+ // dump_buckets();
// KDA_Neighbor* neighbor = KDA_Neighbor_ID(rID);
// KDA_Perform_link(caller, neighbor->node);
- MPIRPC_Return(caller, unique, NULL);
+ MPIRPC_Null(caller, unique);
}
void
KDA_Harpoon(MPIRPC_Node node, KDA_ID other_id)
{
MPIRPC_Block(node, "link", xheap(other_id));
- NOTE("rpc_link_neighbor() done");
+ NOTE("rpc_link_neighbor() done");
}
/**
- Connect two neighbors in the DHT, if neither is NULL.
- @param n1 The neighbor to issue the accept().
+ Connect two neighbors in the DHT, if neither is NULL.
+ @param n1 The neighbor to issue the accept().
@param n2 The neighbor to issue the connect().
*/
void
KDA_Link(MPIRPC_Node node1, MPIRPC_Node node2)
{
NOTE_F;
-
- char* port = MPIRPC_Block(node1, "accept", NULL);
- char* result = MPIRPC_Block(node2, "connect", port);
- dump_buckets();
+ char* port = MPIRPC_Block(node1, "accept", NULL);
+ char* result = MPIRPC_Block(node2, "connect", port);
+ dump_buckets();
+
free(port);
if (result)
- free(result);
+ free(result);
}
void
-handle_accept(MPIRPC_Node caller, int unique, char* args)
+handle_accept(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length)
{
- NOTE_F;
- MPIRPC_Return(caller, unique, port);
- MPIRPC_Flush_returns();
- MPI_Comm newcomm;
+ NOTE_F;
+ MPIRPC_Return(caller, unique, heap(port), strlen(port)+1);
+ MPIRPC_Flush_returns();
+ MPI_Comm newcomm;
MPI_Comm_accept(port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
NOTE("accepted newcomm");
- MPI_Comm_set_name(newcomm, "COMM_ACCEPTED");
+ MPI_Comm_set_name(newcomm, "COMM_ACCEPTED");
MPIRPC_Comm_add(newcomm);
KDA_Join(newcomm);
}
void
-handle_connect(MPIRPC_Node caller, int unique, char* args)
+handle_connect(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length)
{
- NOTE_FS(args);
- MPI_Comm newcomm;
+ NOTE_FS(args);
+ MPI_Comm newcomm;
int err = MPI_Comm_connect(args, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
- MPI_Comm_set_name(newcomm, "COMM_CONNECTED");
+ MPI_Comm_set_name(newcomm, "COMM_CONNECTED");
+ char* result = malloc(16*sizeof(char));
+ int length;
if (err == MPI_SUCCESS)
{
MPIRPC_Comm_add(newcomm);
- MPIRPC_Return(caller, unique, "success");
+ length = sprintf(result, "success");
}
else
- MPIRPC_Return(caller, unique, "fail");
+ length = sprintf(result, "fail");
+ MPIRPC_Return(caller, unique, result, strlen(result)+1);
}
KDA_ID
rpc_join(MPIRPC_Node node)
{
KDA_ID other_id;
- char* result = MPIRPC_Block(node, "join", id_tostring());
+ char* result = MPIRPC_Block(node, "join", id_tostring());
sscanf(result, "%X", &other_id);
free(result);
- NOTE("rpc_join returning");
-
- return other_id;
+ NOTE("rpc_join returning");
+
+ return other_id;
}
void
-handle_join(MPIRPC_Node caller, int unique, char* args)
+handle_join(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length)
{
NOTE_FS(args);
KDA_ID other_id;
sscanf(args, "%X", &other_id);
- KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, caller);
+ KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, caller);
if (other_id != KDA_ID_CLIENT)
{
if (! neighbor_add(neighbor))
- free(neighbor);
+ free(neighbor);
}
else
{
- client_add(neighbor);
+ client_add(neighbor);
}
- KDA_Comm_set_name(neighbor);
+ KDA_Comm_set_name(neighbor);
- dump_buckets();
-
- MPIRPC_Return(caller, unique, xheap(id));
- MPIRPC_Flush_returns();
- // KDA_Ping(id);
+ dump_buckets();
+
+ char* result = xheap(id);
+ MPIRPC_Return(caller, unique, result, strlen(result)+1);
+ MPIRPC_Flush_returns();
+ // KDA_Ping(id);
}
void
KDA_Join(MPI_Comm comm)
{
NOTE_F;
-
+
MPIRPC_Node node;
- MPIRPC_Node_make(comm, 0, &node);
+ MPIRPC_Node_make(comm, 0, &node);
KDA_ID other_id = rpc_join(node);
- assert(other_id != KDA_ID_NULL);
-
+ assert(other_id != KDA_ID_NULL);
+
if (other_id != KDA_ID_CLIENT)
{
KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
- // dump_buckets();
-
+ // dump_buckets();
+
if (! neighbor_add(neighbor))
{
NOTE("ALREADY HAVE NEIGHBOR? ");
- KDA_Neighbor* tmp = neighbor;
- neighbor = neighbor_lookup(neighbor);
+ KDA_Neighbor* tmp = neighbor;
+ neighbor = neighbor_lookup(neighbor);
neighbor_set_node(neighbor, node);
free(tmp);
}
@@ -398,37 +407,37 @@
}
else
{
- MPI_Comm_set_name(node.comm, "KDA_CLIENT");
+ MPI_Comm_set_name(node.comm, "KDA_CLIENT");
}
}
/**
Finds index of the third hash character in the string.
- Useful for obtaining the port number from an MPI port name.
+ Useful for obtaining the port number from an MPI port name.
*/
int
thirdhash(char* s)
{
char* p = s;
- int hashes = 0;
+ int hashes = 0;
while (p++)
{
if (*p == '#')
hashes++;
if (hashes == 3)
- return p-s;
+ return p-s;
}
- printf("ERROR in thirdhash() \n");
- exit(1);
- return -1;
+ printf("ERROR in thirdhash() \n");
+ exit(1);
+ return -1;
}
void
KDA_Shutdown()
{
- NOTE_F;
- char hub_port[MPI_MAX_PORT_NAME];
+ NOTE_F;
+ char hub_port[MPI_MAX_PORT_NAME];
int err = MPI_Lookup_name("kademlia", MPI_INFO_NULL, hub_port);
if (err == MPI_SUCCESS)
{
@@ -436,17 +445,17 @@
}
else
{
- note("shutdown error.");
+ note("shutdown error.");
}
}
void
KDA_Shutdown_port(char* hub_port)
{
- MPI_Comm newcomm;
+ MPI_Comm newcomm;
MPI_Comm_connect(hub_port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
- NOTE("connected");
+ NOTE("connected");
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
MPI_Comm_set_name(newcomm, "HUB_SHUTDOWN");
MPI_Send(&id, 1, MPI_INT, 0,
@@ -458,26 +467,26 @@
void
KDA_Serve_shutdown()
{
- NOTE_F;
-
+ NOTE_F;
+
int i;
struct list_item* item;
- NOTE_F;
- cmpi_quitting = true;
+ NOTE_F;
+ cmpi_quitting = true;
for (i = 0; i < KDA_SPACE_SIZE; i++)
for (item = k_bucket[i]->head;
item; item = item->next)
{
- KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
+ KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
MPIRPC_Block(neighbor->node, "shutdown", NULL);
- //KDA_Neighbor_dump(neighbor);
+ //KDA_Neighbor_dump(neighbor);
}
}
void
KDA_Detach(MPIRPC_Node node)
{
- MPIRPC_Channel* channel = MPIRPC_Channel_for_node(node);
- MPIRPC_Close(channel);
+ MPIRPC_Channel* channel = MPIRPC_Channel_for_node(node);
+ MPIRPC_Close(channel);
}
Modified: src/kda-2/conn-C.c
===================================================================
--- src/kda-2/conn-C.c 2010-04-20 20:43:44 UTC (rev 39)
+++ src/kda-2/conn-C.c 2010-04-27 21:27:26 UTC (rev 40)
@@ -1,40 +1,41 @@
-/** Connection techniques for KDA-2B DHT.
+/**
+ Connection techniques for KDA-2B DHT.
Uses HUBs to manage initial bootstrap connections.
- Uses three-way link/accept/connect/join to add neighbors.
+ Uses three-way link/accept/connect/join to add neighbors.
*/
-#include "kademlia.h"
+#include "kda-2.h"
#include "conn-B.h"
-int unique_client;
-char port[MPI_MAX_PORT_NAME];
+int unique_client;
+char port[MPI_MAX_PORT_NAME];
//// Internal prototypes
-int thirdhash(char* s);
+int thirdhash(char* s);
void
KDA_Init_conn()
{
- NOTE_F;
-
- KDA_Setup_node_port();
-
- char kda_port[MPI_MAX_PORT_NAME];
+ NOTE_F;
+
+ KDA_Setup_node_port();
+
+ char kda_port[MPI_MAX_PORT_NAME];
int err = MPI_Lookup_name("kademlia", MPI_INFO_NULL, kda_port);
if (err == MPI_SUCCESS)
{
// This node will be a DHT node:
- KDA_Node_init();
+ KDA_Node_init();
KDA_Connect_port(kda_port);
- KDA_Ping(id);
- listen_loop();
+ KDA_Ping(id);
+ listen_loop();
}
else
{
// This node is a HUB:
- KDA_Hub_init();
- KDA_Hub_loop();
+ KDA_Hub_init();
+ KDA_Hub_loop();
}
}
@@ -56,8 +57,8 @@
void
KDA_Connect_port(char* port)
{
- NOTE_F;
- MPI_Comm newcomm;
+ NOTE_F;
+ MPI_Comm newcomm;
MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &newcomm);
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
MPIRPC_Comm_add(newcomm);
@@ -65,22 +66,22 @@
MPI_Send(&id, 1, MPI_INT, 0,
CMPI_TAG_ID, newcomm);
MPI_Send(&cmpi_msg_join, 1, MPI_INT, 0,
- CMPI_TAG_CONTROL, newcomm);
+ CMPI_TAG_CONTROL, newcomm);
}
void
KDA_Hub_init(void)
{
- NOTE_F;
- MPI_Open_port(MPI_INFO_NULL, port);
+ NOTE_F;
+ MPI_Open_port(MPI_INFO_NULL, port);
MPI_Publish_name("kademlia", MPI_INFO_NULL, port);
- NOTE_S("Published kademlia on: %s", port);
+ NOTE_S("Published kademlia on: %s", port);
}
void
KDA_Init_conn_client()
{
- NOTE_F;
+ NOTE_F;
KDA_Setup_node_port();
}
@@ -91,9 +92,9 @@
MPI_Open_port(MPI_INFO_NULL, port);
char portnumber[10];
memset(portnumber, '\0', (size_t) 10);
- int p = thirdhash(port);
+ int p = thirdhash(port);
strncpy(portnumber, port+p+1, 5);
- NOTE_S("Using port: ", portnumber);
+ NOTE_S("Using port: ", portnumber);
}
void
@@ -105,36 +106,36 @@
KDA_ID other_id;
while (! cmpi_quitting)
{
- MPIRPC_Node* node = KDA_Serve_accept();
- KDA_Serve_handshake(node, &other_id, &msg);
+ MPIRPC_Node* node = KDA_Serve_accept();
+ KDA_Serve_handshake(node, &other_id, &msg);
switch (msg)
{
case CMPI_MSG_JOIN:
KDA_Serve_id(other_id, node);
- break;
+ break;
case CMPI_MSG_SHUTDOWN:
- KDA_Serve_shutdown();
+ KDA_Serve_shutdown();
break;
}
}
}
MPIRPC_Node*
-KDA_Serve_accept()
+KDA_Serve_accept()
{
- NOTE_F;
+ NOTE_F;
MPI_Comm newcomm;
MPI_Comm_accept(port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
char name[10];
- sprintf(name, "client_%i", unique_client++);
+ sprintf(name, "client_%i", unique_client++);
MPI_Comm_set_name(newcomm, name);
- MPIRPC_Node* node = MPIRPC_Node_create(newcomm, 0);
+ MPIRPC_Node* node = MPIRPC_Node_create(newcomm, 0);
SHOW_S(name);
- SHOW_X(newcomm);
-
+ SHOW_X(newcomm);
+
return node;
}
@@ -142,10 +143,10 @@
TODO: Replace Recv()s with new reliable timeout Trecv()s.
*/
void
-KDA_Serve_handshake(MPIRPC_Node* node, KDA_ID* other_id, int* msg)
+KDA_Serve_handshake(MPIRPC_Node* node, KDA_ID* other_id, int* msg)
{
MPI_Status status;
- MPI_Comm comm = node->comm;
+ MPI_Comm comm = node->comm;
MPI_Recv(other_id, 1, MPI_INT, 0, CMPI_TAG_ID, comm, &status);
MPI_Recv(msg, 1, MPI_INT, 0, CMPI_TAG_CONTROL, comm, &status);
}
@@ -153,55 +154,55 @@
void
KDA_Serve_id(KDA_ID other_id, MPIRPC_Node* node)
{
- assert(other_id != KDA_ID_NULL);
-
+ assert(other_id != KDA_ID_NULL);
+
if (other_id == KDA_ID_CLIENT)
KDA_Serve_client(node);
else
- KDA_Serve_neighbor(other_id, node);
+ KDA_Serve_neighbor(other_id, node);
}
void
KDA_Serve_client(MPIRPC_Node* node)
{
- NOTE_F;
+ NOTE_F;
MPIRPC_Comm_add(node->comm);
KDA_Neighbor* neighbor = random_neighbor();
if (neighbor)
KDA_Link(neighbor->node, node);
else
- NOTE("Could not get random neighbor.");
+ NOTE("Could not get random neighbor.");
}
void
KDA_Serve_neighbor(KDA_ID other_id, MPIRPC_Node* node)
{
- NOTE_F;
+ NOTE_F;
KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
MPIRPC_Comm_add(node->comm);
KDA_Neighbor* rand_neighbor = random_neighbor();
if (rand_neighbor)
KDA_Link(rand_neighbor->node, node);
else
- NOTE("Could not get random neighbor.");
+ NOTE("Could not get random neighbor.");
if (! neighbor_add(neighbor))
free(neighbor);
dump_buckets();
}
/**
- Attach
+ Attach
*/
MPIRPC_Node*
KDA_Attach()
{
- NOTE_F;
+ NOTE_F;
- KDA_Client_init();
-
- MPIRPC_Node* result = NULL;
-
- char kda_port[MPI_MAX_PORT_NAME];
+ KDA_Client_init();
+
+ MPIRPC_Node* result = NULL;
+
+ char kda_port[MPI_MAX_PORT_NAME];
int err = MPI_Lookup_name("kademlia", MPI_INFO_NULL, kda_port);
if (err == MPI_SUCCESS)
{
@@ -209,43 +210,43 @@
while (! MPIRPC_Check()); // connect call
while (! MPIRPC_Check()); // join call
}
-
- return result;
+
+ return result;
}
MPIRPC_Node*
KDA_Attach_port(char* port)
{
NOTE_F;
-
- MPI_Comm newcomm;
+
+ MPI_Comm newcomm;
MPI_Comm_connect(port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
NOTE("connected to HUB");
NOTE_I("id: ", id);
- MPIRPC_Node* hub = KDA_Register_hub(newcomm);
-
+ MPIRPC_Node* hub = KDA_Register_hub(newcomm);
+
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
- MPI_Send(&id, 1, MPI_INT, 0,
+ MPI_Send(&id, 1, MPI_INT, 0,
CMPI_TAG_ID, newcomm);
- MPI_Send(&cmpi_msg_join, 1, MPI_INT, 0,
+ MPI_Send(&cmpi_msg_join, 1, MPI_INT, 0,
CMPI_TAG_CONTROL, newcomm);
- return hub;
+ return hub;
}
/**
- Register this comm as a hub.
-*/
-MPIRPC_Node*
-KDA_Register_hub(MPI_Comm comm)
+ Register this comm as a hub.
+*/
+MPIRPC_Node*
+KDA_Register_hub(MPI_Comm comm)
{
MPI_Comm_set_name(comm, "HUB");
MPIRPC_Node* hub = MPIRPC_Node_create(comm, 0);
- list_add(hubs, hub);
+ list_add(hubs, hub);
MPIRPC_Comm_add(comm);
- return hub;
+ return hub;
}
/**
@@ -254,11 +255,11 @@
void
handle_link(MPIRPC_Node* caller, int unique, char* args)
{
- NOTE_F;
+ NOTE_F;
KDA_ID rID;
sscanf(args, "%X", &rID);
NOTE_FX(rID);
- // dump_buckets();
+ // dump_buckets();
// KDA_Neighbor* neighbor = KDA_Neighbor_ID(rID);
// KDA_Perform_link(caller, neighbor->node);
MPIRPC_Return(caller, unique, NULL);
@@ -268,42 +269,42 @@
KDA_Harpoon(MPIRPC_Node* node, KDA_ID other_id)
{
MPIRPC_Block(node, "link", xheap(other_id));
- NOTE("rpc_link_neighbor() done");
+ NOTE("rpc_link_neighbor() done");
}
/**
- Connect two neighbors in the DHT, if neither is NULL.
- @param n1 The neighbor to issue the accept().
+ Connect two neighbors in the DHT, if neither is NULL.
+ @param n1 The neighbor to issue the accept().
@param n2 The neighbor to issue the connect().
*/
void
KDA_Link(MPIRPC_Node* node1, MPIRPC_Node* node2)
{
NOTE_F;
-
+
if (node1 == NULL || node2 == NULL)
- return;
+ return;
- char* port = MPIRPC_Block(node1, "accept", NULL);
- char* result = MPIRPC_Block(node2, "connect", port);
+ char* port = MPIRPC_Block(node1, "accept", NULL);
+ char* result = MPIRPC_Block(node2, "connect", port);
- dump_buckets();
+ dump_buckets();
free(port);
- free(result);
+ free(result);
}
void
handle_accept(MPIRPC_Node* caller, int unique, char* args)
{
- NOTE_F;
+ NOTE_F;
MPIRPC_Return(caller, unique, port);
- MPIRPC_Flush_returns();
- MPI_Comm newcomm;
+ MPIRPC_Flush_returns();
+ MPI_Comm newcomm;
MPI_Comm_accept(port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
NOTE("accepted newcomm");
- MPI_Comm_set_name(newcomm, "COMM_ACCEPTED");
+ MPI_Comm_set_name(newcomm, "COMM_ACCEPTED");
MPIRPC_Comm_add(newcomm);
KDA_Join(newcomm);
}
@@ -311,12 +312,12 @@
void
handle_connect(MPIRPC_Node* caller, int unique, char* args)
{
- NOTE_FS(args);
- MPI_Comm newcomm;
+ NOTE_FS(args);
+ MPI_Comm newcomm;
int err = MPI_Comm_connect(args, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
- MPI_Comm_set_name(newcomm, "COMM_CONNECTED");
+ MPI_Comm_set_name(newcomm, "COMM_CONNECTED");
if (err == MPI_SUCCESS)
{
MPIRPC_Comm_add(newcomm);
@@ -330,13 +331,13 @@
rpc_join(MPIRPC_Node* node)
{
KDA_ID other_id;
- char* result = MPIRPC_Block(node, "join", id_tostring());
+ char* result = MPIRPC_Block(node, "join", id_tostring());
sscanf(result, "%X", &other_id);
free(result);
- NOTE("rpc_join returning");
-
- return other_id;
+ NOTE("rpc_join returning");
+
+ return other_id;
}
void
@@ -347,47 +348,47 @@
KDA_ID other_id;
sscanf(args, "%X", &other_id);
- KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, caller);
+ KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, caller);
if (other_id != KDA_ID_CLIENT)
{
if (! neighbor_add(neighbor))
- free(neighbor);
+ free(neighbor);
}
else
{
- client_add(neighbor);
+ client_add(neighbor);
}
- KDA_Comm_set_name(neighbor);
+ KDA_Comm_set_name(neighbor);
- dump_buckets();
-
+ dump_buckets();
+
MPIRPC_Return(caller, unique, xheap(id));
- MPIRPC_Flush_returns();
- // KDA_Ping(id);
+ MPIRPC_Flush_returns();
+ // KDA_Ping(id);
}
void
KDA_Join(MPI_Comm comm)
{
NOTE_F;
-
- MPIRPC_Node* node = MPIRPC_Node_create(comm, 0);
+
+ MPIRPC_Node* node = MPIRPC_Node_create(comm, 0);
KDA_ID other_id = rpc_join(node);
- assert(other_id != KDA_ID_NULL);
-
+ assert(other_id != KDA_ID_NULL);
+
if (other_id != KDA_ID_CLIENT)
{
KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
- // dump_buckets();
-
+ // dump_buckets();
+
if (! neighbor_add(neighbor))
{
NOTE("ALREADY HAVE NEIGHBOR? ");
- KDA_Neighbor* tmp = neighbor;
- neighbor = neighbor_lookup(neighbor);
- neighbor_set_node(neighbor, node);
+ KDA_Neighbor* tmp = neighbor;
+ neighbor = neighbor_lookup(neighbor);
+ neighbor_set_node(neighbor, node);
free(tmp);
}
KDA_Comm_set_name(neighbor);
@@ -396,37 +397,37 @@
}
else
{
- MPI_Comm_set_name(node->comm, "KDA_CLIENT");
+ MPI_Comm_set_name(node->comm, "KDA_CLIENT");
}
}
/**
Finds index of the third hash character in the string.
- Useful for obtaining the port number from an MPI port name.
+ Useful for obtaining the port number from an MPI port name.
*/
int
thirdhash(char* s)
{
char* p = s;
- int hashes = 0;
+ int hashes = 0;
while (p++)
{
if (*p == '#')
hashes++;
if (hashes == 3)
- return p-s;
+ return p-s;
}
- printf("ERROR in thirdhash() \n");
- exit(1);
- return -1;
+ printf("ERROR in thirdhash() \n");
+ exit(1);
+ return -1;
}
void
KDA_Shutdown()
{
- NOTE_F;
- char hub_port[MPI_MAX_PORT_NAME];
+ NOTE_F;
+ char hub_port[MPI_MAX_PORT_NAME];
int err = MPI_Lookup_name("kademlia", MPI_INFO_NULL, hub_port);
if (err == MPI_SUCCESS)
{
@@ -434,17 +435,17 @@
}
else
{
- note("shutdown error.");
+ note("shutdown error.");
}
}
void
KDA_Shutdown_port(char* hub_port)
{
- MPI_Comm newcomm;
+ MPI_Comm newcomm;
MPI_Comm_connect(hub_port, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &newcomm);
- NOTE("connected");
+ NOTE("connected");
// MPI_Comm_set_errhandler(newcomm, MPI_ERRORS_RETURN);
MPI_Comm_set_name(newcomm, "HUB_SHUTDOWN");
MPI_Send(&id, 1, MPI_INT, 0,
@@ -456,26 +457,26 @@
void
KDA_Serve_shutdown()
{
- NOTE_F;
-
+ NOTE_F;
+
int i;
struct list_item* item;
- NOTE_F;
- cmpi_quitting = true;
+ NOTE_F;
+ cmpi_quitting = true;
for (i = 0; i < KDA_SPACE_SIZE; i++)
for (item = k_bucket[i]->head;
item; item = item->next)
{
- KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
+ KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
MPIRPC_Block(neighbor->node, "shutdown", NULL);
- //KDA_Neighbor_dump(neighbor);
+ //KDA_Neighbor_dump(neighbor);
}
}
void
KDA_Detach(MPIRPC_Node* node)
{
- MPIRPC_Channel* channel = MPIRPC_Channel_for_node(node);
- MPIRPC_Close(channel);
+ MPIRPC_Channel* channel = MPIRPC_Channel_for_node(node);
+ MPIRPC_Close(channel);
}
Modified: src/kda-2/kademlia.c
===================================================================
--- src/kda-2/kademlia.c 2010-04-20 20:43:44 UTC (rev 39)
+++ src/kda-2/kademlia.c 2010-04-27 21:27:26 UTC (rev 40)
@@ -1,30 +1,32 @@
-#include <kda-2.h>
+#include <kda-2.h>
-KDA_ID id;
-K_BUCKET k_bucket[KDA_SPACE_SIZE];
+#include <strings.h>
+KDA_ID id;
+K_BUCKET k_bucket[KDA_SPACE_SIZE];
+
/**
- Number of nodes from MPI_COMM_WORLD that will be DHT nodes.
+ Number of nodes from MPI_COMM_WORLD that will be DHT nodes.
*/
-int kda_nodes;
+int kda_nodes;
/**
- List of KDA_Neighbor clients.
+ List of KDA_Neighbor clients.
*/
-struct list* clients;
+struct list* clients;
/**
- List of MPIRPC_Node hubs.
+ List of MPIRPC_Node hubs.
*/
-struct list* hubs;
+struct list* hubs;
-MPI_Request request;
+MPI_Request request;
/**
Outstanding KDA_Operations:
*/
-struct itable* operations;
+struct itable* operations;
/**
Kademlia parameter alpha:
@@ -34,39 +36,39 @@
/**
Kademlia parameter k:
*/
-int k;
+int k;
/**
A neighbor pointer to myself.
*/
-KDA_Neighbor* self;
+KDA_Neighbor* self;
/**
My port name.
*/
-char port[MPI_MAX_PORT_NAME];
+char port[MPI_MAX_PORT_NAME];
/**
- A uniquifier for client numbers. (Debugging only).
+ A uniquifier for client numbers. (Debugging only).
*/
-int unique_client = 1;
+int unique_client = 1;
void
KDA_Data(int alpha_in, int k_in)
{
- NOTE_F;
-
+ NOTE_F;
+
int i;
alpha = alpha_in;
k = k_in;
-
+
for (i = 0; i < KDA_SPACE_SIZE; i++)
k_bucket[i] = list_create();
-
+
operations = itable_create(CALL_TABLE_SIZE);
clients = list_create();
- hubs = list_create();
+ hubs = list_create();
}
/**
@@ -76,21 +78,21 @@
void
KDA_Init(int alpha_in, int k_in)
{
- NOTE_F;
+ NOTE_F;
- KDA_Data(alpha_in, k_in);
+ KDA_Data(alpha_in, k_in);
MPIRPC_Node node;
node.comm = MPI_COMM_NULL;
- node.rank = 0;
+ node.rank = 0;
id = make_id(debug_rank);
- self = KDA_Neighbor_create_id(id, node);
+ self = KDA_Neighbor_create_id(id, node);
NOTE_XI("ID: ", id, id);
- port[0] = '\0';
+ port[0] = '\0';
MPIRPC_Register("info", handle_info);
- // MPIRPC_Register("neighbor", handle_neighbor);
+ // MPIRPC_Register("neighbor", handle_neighbor);
MPIRPC_Register("ping", handle_ping);
MPIRPC_Register("find_node", handle_find_node);
MPIRPC_Register("query_id", handle_query_id);
@@ -105,35 +107,35 @@
void
KDA_Init_client(int alpha_in, int k_in)
{
- id = KDA_ID_CLIENT;
-
+ id = KDA_ID_CLIENT;
+
KDA_Data(alpha_in, k_in);
- KDA_Attach();
+ KDA_Attach();
}
-KDA_Neighbor*
+KDA_Neighbor*
random_neighbor()
{
int i;
struct list_item*...
[truncated message content] |