[C-MPI-commits] SF.net SVN: c-mpi:[197] test
Status: Pre-Alpha
Brought to you by:
jmwozniak
|
From: <jmw...@us...> - 2011-04-28 19:53:24
|
Revision: 197
http://c-mpi.svn.sourceforge.net/c-mpi/?rev=197&view=rev
Author: jmwozniak
Date: 2011-04-28 19:53:16 +0000 (Thu, 28 Apr 2011)
Log Message:
-----------
Overhaul to get Kademlia working again
Modified Paths:
--------------
docs/manual/manual.txt
include/cmpi.h
include/dense-1.h
include/kda-2.h
include/kda_conn-A.h
include/mpi_tools.h
include/mpirpc.h
include/xtree.h
src/adts/lru_table.c
src/adts/xtree.c
src/cmpi/cmpi.c
src/cmpi/driver.c
src/cmpi/mode_mono.c
src/cmpi/node.c
src/dense-1/cmpi_dense.c
src/dense-1/dense.c
src/kda-2/cmpi_kademlia.c
src/kda-2/conn-A.c
src/kda-2/conn-B.c
src/kda-2/kademlia.c
src/kda-2/neighbor.c
src/mpi_tools/mpi_tools.c
src/mpirpc/mpirpc.c
test/adts/test-dpkm_list.c
test/adts/test-lru_table01.c
test/cmpi/test-manyputs.c
test/cmpi/test-startup.c
test/driver/test-cmd-sleep.c
test/driver/test.zsh
test/mpi_tools/test-tools01.c
test/mpirpc/test-args.c
test/mpirpc/test-blob.c
test/mpirpc/test-ping.c
test/mpirpc/test-returns.c
Modified: docs/manual/manual.txt
===================================================================
--- docs/manual/manual.txt 2011-04-27 18:19:30 UTC (rev 196)
+++ docs/manual/manual.txt 2011-04-28 19:53:16 UTC (rev 197)
@@ -65,6 +65,8 @@
(+cmpi-db+) and the user connects to it via a cp-like tool
(+cmpi-cp+).
+image:cluster.png[Cluster mode operation]
+
Commands executed on submit host:
--------------------------------------------------------
#!/bin/sh
@@ -449,12 +451,12 @@
~~~~
The tests are defined for each component in +module.mk+. For each
-+test-*.c+, a +test-*.x+ executable is produced and launched. The
++test-\*.c+, a +test-\*.x+ executable is produced and launched. The
launcher is ; +assert()+s and output parsing are used to confirm
-correctness. Output is collected in +test-*.out+. If the test is run
-from a +test-*.zsh+, debugging output is collected and post-processed
+correctness. Output is collected in +test-\*.out+. If the test is run
+from a +test-\*.zsh+, debugging output is collected and post-processed
by the ZSH script. If the test fails, the output is moved to
-+test-*.out.failed+ (so make does not consider it).
++test-\*.out.failed+ (so make does not consider it).
make D=1 test_results
@@ -463,6 +465,8 @@
Components
----------
+image:cmpi-connections.png[C-MPI components outlined below.]
+
cmpi::
The C-MPI interface. Some reusable functionality is defined.
@@ -526,7 +530,7 @@
Abstract data types: lists, hash tables, etc.
-+gossip+;;
++gossip+::
A logging library from Phil Carns.
Modified: include/cmpi.h
===================================================================
--- include/cmpi.h 2011-04-27 18:19:30 UTC (rev 196)
+++ include/cmpi.h 2011-04-28 19:53:16 UTC (rev 197)
@@ -124,9 +124,9 @@
void cmpi_params_add(char* key, char* value);
/**
- Copies out the result from the cache.
+ Obtain pointer to the result from the params table.
*/
-char* cmpi_params_search(char* key);
+char* cmpi_params_get(char* key);
//// Methods that the implementation must provide:
Modified: include/dense-1.h
===================================================================
--- include/dense-1.h 2011-04-27 18:19:30 UTC (rev 196)
+++ include/dense-1.h 2011-04-28 19:53:16 UTC (rev 197)
@@ -33,25 +33,4 @@
void rpc_bootping(void);
void listen_loop(void);
-void handle_bootping(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_info(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_ping(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_query_id(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_query_id_k(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_store(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_update(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_retrieve(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_debug_cache(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-void handle_quit(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
-
#endif
Modified: include/kda-2.h
===================================================================
--- include/kda-2.h 2011-04-27 18:19:30 UTC (rev 196)
+++ include/kda-2.h 2011-04-28 19:53:16 UTC (rev 197)
@@ -78,19 +78,21 @@
//// Helpers...
+void KDA_Join(int other_id, int other_rank);
+
void KDA_Setup_node_port(void);
-void KDA_Serve(void);
-void KDA_Serve_id(KDA_ID other_id, MPIRPC_Node node);
-MPIRPC_Node KDA_Serve_accept(void);
-void KDA_Serve_client(MPIRPC_Node node);
-void KDA_Serve_handshake(MPIRPC_Node node,
- KDA_ID* other_id, int* msg);
-void KDA_Serve_neighbor(KDA_ID other_id, MPIRPC_Node node);
-void KDA_Setup_connector_port(void);
-void KDA_Connect_port(char* port);
-MPIRPC_Node* KDA_Attach(void);
-MPIRPC_Node* KDA_Attach_port(char* port);
-MPIRPC_Node* KDA_Register_hub(MPI_Comm comm);
+// void KDA_Serve(void);
+// void KDA_Serve_id(KDA_ID other_id, MPIRPC_Node node);
+// MPIRPC_Node KDA_Serve_accept(void);
+// void KDA_Serve_client(MPIRPC_Node node);
+//void KDA_Serve_handshake(MPIRPC_Node node,
+// KDA_ID* other_id, int* msg);
+//void KDA_Serve_neighbor(KDA_ID other_id, MPIRPC_Node node);
+// void KDA_Setup_connector_port(void);
+// void KDA_Connect_port(char* port);
+void KDA_Attach(struct list* contacts);
+// MPIRPC_Node* KDA_Attach_port(char* port);
+// MPIRPC_Node* KDA_Register_hub(MPI_Comm comm);
void KDA_Comm_set_name(KDA_Neighbor* neighbor);
@@ -103,7 +105,8 @@
// void KDA_Send_neighbor(MPI_Comm comm);
void KDA_Serve_shutdown(void);
void KDA_Detach(MPIRPC_Node node);
-void KDA_Shutdown(void);
+void KDA_Detach_hubs();
+void KDA_Shutdown(struct list* contacts);
void KDA_Shutdown_port(char* hub_port);
/**
@@ -144,10 +147,13 @@
MPIRPC* KDA_Lookup(MPIRPC_Node node, char* key);
MPIRPC* KDA_Lookup_k(MPIRPC_Node node, char* key);
-MPIRPC* KDA_Store(KDA_Neighbor* neighbor, char* key, char* value, int length);
-
+MPIRPC* KDA_Store(KDA_Neighbor* neighbor, char* key, char* value,
+ int length);
MPIRPC* KDA_Retrieve(KDA_Neighbor* neighbor, char* key);
+MPIRPC* KDA_Update(KDA_Neighbor* neighbor, char* key, char* value,
+ int length, int offset);
+
void listen_loop(void);
bool neighbor_add(KDA_Neighbor* neighbor);
Modified: include/kda_conn-A.h
===================================================================
--- include/kda_conn-A.h 2011-04-27 18:19:30 UTC (rev 196)
+++ include/kda_conn-A.h 2011-04-28 19:53:16 UTC (rev 197)
@@ -10,9 +10,9 @@
/**
Translate a KDA_ID to a rank in MPI_COMM_WORLD.
*/
-void handle_get_rank(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length);
+MPIRPC_Handler(handle_get_rank);
/**
Return the local KDA_ID.
*/
-void handle_get_id(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length);
+MPIRPC_Handler(handle_get_id);
Modified: include/mpi_tools.h
===================================================================
--- include/mpi_tools.h 2011-04-27 18:19:30 UTC (rev 196)
+++ include/mpi_tools.h 2011-04-28 19:53:16 UTC (rev 197)
@@ -94,6 +94,8 @@
#define OK NOTE("ok")
+void setup_debug_file(void);
+
/**
Simplify debugging statements.
*/
Modified: include/mpirpc.h
===================================================================
--- include/mpirpc.h 2011-04-27 18:19:30 UTC (rev 196)
+++ include/mpirpc.h 2011-04-28 19:53:16 UTC (rev 197)
@@ -30,9 +30,9 @@
int status;
char name[MPIRPC_MAX_NAME];
char args[MPIRPC_MAX_ARGS];
- char* blob;
+ void* blob;
int blob_length;
- char* result;
+ void* result;
int result_length;
void* extras;
void (*proceed) (struct mpirpc* rpc);
@@ -67,6 +67,21 @@
MPI_Request request[4];
} MPIRPC_Value;
+/**
+ Defines MPIRPC handler "f"
+ Parameters:
+ the target node
+ the uniqifier
+ the args
+ the blob
+ the blob length
+ */
+#define MPIRPC_Handler_pointer(f) void \
+ (*f)(MPIRPC_Node,int,char*,void*,int)
+
+#define MPIRPC_Handler(f) void \
+ f(MPIRPC_Node caller, int unique, char* args, void* blob, int blob_length)
+
//// API...
void MPIRPC_Init(void);
@@ -75,8 +90,7 @@
void MPIRPC_Recv(MPI_Comm comm);
-void MPIRPC_Register(char* name,
- void (*f)(MPIRPC_Node,int,char*,char*,int));
+void MPIRPC_Register(char* name, MPIRPC_Handler_pointer(f));
bool MPIRPC_Comm_add(MPI_Comm comm);
@@ -86,20 +100,20 @@
void* extras, void (*proceed)(MPIRPC*));
MPIRPC* MPIRPC_Call_blob(MPIRPC_Node target, char* name, char* args,
- char* blob, int blob_length,
+ void* blob, int blob_length,
void* extras, void (*proceed)(MPIRPC*));
-char* MPIRPC_Block(MPIRPC_Node target, char* name, char* args);
+void* MPIRPC_Block(MPIRPC_Node target, char* name, char* args);
-char* MPIRPC_Block_blob(MPIRPC_Node target, char* name, char* args,
- char* blob, int blob_length);
+void* MPIRPC_Block_blob(MPIRPC_Node target, char* name, char* args,
+ void* blob, int blob_length);
-char* MPIRPC_Wait(MPIRPC* rpc);
+void* MPIRPC_Wait(MPIRPC* rpc);
void MPIRPC_Null(MPIRPC_Node caller, int unique);
void MPIRPC_Return(MPIRPC_Node caller, int unique,
- char* result, int rlength);
+ void* result, int rlength);
MPIRPC_Node* MPIRPC_Node_create(MPI_Comm comm, int rank);
Modified: include/xtree.h
===================================================================
--- include/xtree.h 2011-04-27 18:19:30 UTC (rev 196)
+++ include/xtree.h 2011-04-28 19:53:16 UTC (rev 197)
@@ -6,73 +6,67 @@
#ifndef XTREE_H
#define XTREE_H
-#include <stdbool.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
+#include <inlist.h>
-#include <ilist.h>
-#include <inlist.h>
-
-typedef int xtree_id;
+typedef int xtree_id;
#define XTREE_DEPTH (sizeof(xtree_id)*8)
-typedef struct xtree_object_s xtree_object;
+typedef struct xtree_object_s xtree_object;
typedef union xtree_thing_u xtree_thing;
-typedef enum
+typedef enum
{
XTREE_NODE,
XTREE_ITEM
-} xtree_tag;
+} xtree_tag;
typedef struct xtree_item_s
{
xtree_id id;
void* data;
xtree_object* smaller;
- xtree_object* bigger;
+ xtree_object* bigger;
} xtree_item;
typedef struct xtree_node_s
{
xtree_object* zero;
- xtree_object* one;
+ xtree_object* one;
} xtree_node;
union xtree_thing_u
{
xtree_node node;
- xtree_item item;
+ xtree_item item;
};
struct xtree_object_s
{
xtree_tag tag;
- xtree_thing thing;
+ xtree_thing thing;
};
struct xtree
{
- int size;
- xtree_object* root;
+ int size;
+ xtree_object* root;
};
struct xtree* xtree_create();
-void xtree_add(struct xtree* target, xtree_id id, void* data);
+void xtree_add(struct xtree* target, xtree_id id, void* data);
void* xtree_search(struct xtree* target, xtree_id id);
struct inlist* xtree_query(struct xtree* target,
- xtree_id id, int count);
+ xtree_id id, int count);
void xtree_printf(struct xtree* target);
-//// Internals:
+//// Internals:
-void xtree_replace(struct xtree* target, int depth,
+void xtree_replace(struct xtree* target, int depth,
xtree_object* old_object,
xtree_object* new_object);
-#endif
+#endif
Modified: src/adts/lru_table.c
===================================================================
--- src/adts/lru_table.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/adts/lru_table.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -7,11 +7,13 @@
return (i1>i2) ? i1 : i2;
}
+/*
static int
min(int i1, int i2)
{
return (i1<i2) ? i1 : i2;
}
+*/
struct lru_table*
lru_table_create(int capacity, int limit)
@@ -195,8 +197,6 @@
gossip_do(MASK_ADTS, lru_table_printf("%s", table));
DEBUG(printdata("old data: ", item->data, item->length));
- char* data = item->data;
-
int new_length = max(item->length, offset+length);
char* update = realloc(item->data, new_length);
Modified: src/adts/xtree.c
===================================================================
--- src/adts/xtree.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/adts/xtree.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -1,27 +1,34 @@
-#include <xtree.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <xtree.h>
+#include <ilist.h>
+
/**
@return The ith bit of id.
*/
int
bit(int i, xtree_id id)
{
- // printf("shift: %i\n", (1 << i));
+ // printf("shift: %i\n", (1 << i));
- return !! ((1 << i) & id);
+ return !! ((1 << i) & id);
}
struct xtree*
xtree_create()
{
struct xtree* result;
-
+
result = malloc(sizeof(struct xtree));
result->size = 0;
- result->root = NULL;
-
- return result;
+ result->root = NULL;
+
+ return result;
}
void
@@ -30,37 +37,37 @@
xtree_object* new_object;
xtree_object** p;
bool done;
- int depth;
+ int depth;
new_object = malloc(sizeof(xtree_object));
- new_object->tag = XTREE_ITEM;
+ new_object->tag = XTREE_ITEM;
new_object->thing.item.id = id;
new_object->thing.item.data = data;
if (target->size == 0)
{
- target->root = new_object;
- target->size++;
- return;
+ target->root = new_object;
+ target->size++;
+ return;
}
-
+
p = &(target->root);
- depth = XTREE_DEPTH-1;
+ depth = XTREE_DEPTH-1;
done = false;
while (!done)
{
if (*p == NULL)
{
- printf("inserting at NULL stub: depth: %i\n", depth);
- *p = new_object;
- target->size++;
- break;
+ printf("inserting at NULL stub: depth: %i\n", depth);
+ *p = new_object;
+ target->size++;
+ break;
}
else if ((*p)->tag == XTREE_ITEM)
{
xtree_replace(target, depth, *p, new_object);
- target->size++;
- break;
+ target->size++;
+ break;
}
else if ((*p)->tag == XTREE_NODE)
{
@@ -68,67 +75,65 @@
p = &((*p)->thing.node.zero);
else
p = &((*p)->thing.node.one);
- depth--;
+ depth--;
}
}
}
-#include <unistd.h>
-
void
-xtree_replace(struct xtree* target, int depth,
+xtree_replace(struct xtree* target, int depth,
xtree_object* old_object, xtree_object* new_object)
{
- xtree_item old_item;
+ xtree_item old_item = {0};
xtree_object* p;
xtree_id old_id;
- xtree_id new_id;
-
- xtree_object* new_node;
+ xtree_id new_id;
- // Shortcuts...
+ xtree_object* new_node;
+
+ // Shortcuts...
old_id = old_object->thing.item.id;
new_id = new_object->thing.item.id;
-
+
old_item.id = old_object->thing.item.id;
- old_item.data = old_object->thing.item.data;
+ old_item.data = old_object->thing.item.data;
- old_object->tag = XTREE_NODE;
+ old_object->tag = XTREE_NODE;
old_object->thing.node.zero = NULL;
- old_object->thing.node.one = NULL;
+ old_object->thing.node.one = NULL;
- printf("xtree_replace(%i)\n", depth);
+ printf("xtree_replace(%i)\n", depth);
printf("old %i: \n", old_id);
printf("new %i: \n", new_id);
-
+
p = old_object;
old_object = malloc(sizeof(xtree_object));
old_object->tag = XTREE_ITEM;
- old_object->thing.item = old_item;
-
+ old_object->thing.item = old_item;
+
while (true)
{
if (bit(depth,old_id) != bit(depth,new_id))
{
- printf("diff at: %i\n", depth);
-
+ printf("diff at: %i\n", depth);
+
if (bit(depth,old_id) == 0)
{
p->thing.node.zero = old_object;
- p->thing.node.one = new_object;
+ p->thing.node.one = new_object;
}
else
{
p->thing.node.zero = new_object;
- p->thing.node.one = old_object;
+ p->thing.node.one = old_object;
}
- break;
+ break;
}
else
{
- // printf("new_node\n");
+ // printf("new_node\n");
new_node = malloc(sizeof(xtree_object));
new_node->tag = XTREE_NODE;
new_node->thing.node.zero = NULL;
@@ -137,7 +142,7 @@
p->thing.node.zero = new_node;
else
p->thing.node.one = new_node;
- p = new_node;
+ p = new_node;
}
depth--;
}
@@ -145,19 +150,19 @@
/**
*/
-void
+void
xtree_find(xtree_object* p, xtree_id id, int count,
- int depth, struct ilist* objects)
+ int depth, struct ilist* objects)
{
if (p == NULL)
{
- return;
+ return;
}
-
+
if (p->tag == XTREE_ITEM)
{
- ilist_ordered_insert(objects, id ^ p->thing.item.id, p);
- return;
+ ilist_ordered_insert(objects, id ^ p->thing.item.id, p);
+ return;
}
if (bit(depth, id) == 0)
@@ -179,53 +184,53 @@
struct inlist*
xtree_query(struct xtree* target, xtree_id id, int count)
{
- struct inlist* result;
- struct ilist* objects;
+ struct inlist* result;
+ struct ilist* objects;
struct ilist_item* item;
- xtree_object* object;
-
+ xtree_object* object;
+
objects = ilist_create();
xtree_find(target->root, id, count, XTREE_DEPTH-1, objects);
- result = inlist_create();
+ result = inlist_create();
for (item = objects->head; item; item = item->next)
{
object = (xtree_object*) item->data;
inlist_add(result, object->thing.item.id);
}
- return result;
+ return result;
}
/**
- Return the data object for the matching id.
+ Return the data object for the matching id.
*/
void*
xtree_search(struct xtree* target, xtree_id id)
{
- void* result;
- struct ilist* objects;
- xtree_object* object;
+ void* result;
+ struct ilist* objects = ilist_create();
+ xtree_object* object;
result = NULL;
-
- xtree_find(target->root, id, 1, XTREE_DEPTH-1, objects);
+ xtree_find(target->root, id, 1, XTREE_DEPTH-1, objects);
+
if (objects->size > 0)
{
object = (xtree_object*) objects->head->data;
if (object->thing.item.id == id)
- result = object->thing.item.data;
+ result = object->thing.item.data;
}
-
- return result;
+
+ return result;
}
void
xtree_printf_item(xtree_item* item, int depth)
{
- printf(" %i", item->id);
+ printf(" %i", item->id);
}
void
@@ -234,27 +239,27 @@
char spaces[256];
memset(spaces, ' ', XTREE_DEPTH-depth);
- spaces[XTREE_DEPTH-depth] = '\0';
+ spaces[XTREE_DEPTH-depth] = '\0';
- // printf("depth: %i\n", depth)
-
+ // printf("depth: %i\n", depth)
+
if (object != NULL)
{
if (object->tag == XTREE_ITEM)
{
- printf("ITEM: ");
+ printf("ITEM: ");
xtree_printf_item(&(object->thing.item), depth);
}
else
{
if (object->thing.node.zero != NULL)
{
- printf("\n%s0 ", spaces);
+ printf("\n%s0 ", spaces);
xtree_printf_node(object->thing.node.zero, depth-1);
}
if (object->thing.node.one != NULL)
{
- printf("\n%s1 ", spaces);
+ printf("\n%s1 ", spaces);
xtree_printf_node(object->thing.node.one, depth-1);
}
}
@@ -264,10 +269,10 @@
void
xtree_printf(struct xtree* target)
{
- if (target->size > 0)
+ if (target->size > 0)
xtree_printf_node(target->root, XTREE_DEPTH-1);
else
printf("xtree: empty");
- printf("\n");
+ printf("\n");
}
Modified: src/cmpi/cmpi.c
===================================================================
--- src/cmpi/cmpi.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/cmpi/cmpi.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -48,9 +48,10 @@
Returns pointer to the matching value or NULL if not found.
*/
char*
-cmpi_params_search(char* key)
+cmpi_params_get(char* key)
{
char* result = hashtable_search(cmpi_params, key);
+ SHOW_FSS(key, result);
return result;
}
Modified: src/cmpi/driver.c
===================================================================
--- src/cmpi/driver.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/cmpi/driver.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -54,7 +54,7 @@
*/
void driver_setup_fifo_names()
{
- char* dir = cmpi_params_search("driver_dir");
+ char* dir = cmpi_params_get("driver_dir");
if (!dir)
dir = "/tmp";
Modified: src/cmpi/mode_mono.c
===================================================================
--- src/cmpi/mode_mono.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/cmpi/mode_mono.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -51,6 +51,7 @@
int cmpi_mode_first_client(int rank, int size, int nodes)
{
+ SHOW_FI(nodes);
return nodes;
}
Modified: src/cmpi/node.c
===================================================================
--- src/cmpi/node.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/cmpi/node.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -154,8 +154,8 @@
{
gossip_debug_init();
- //gossip_set_debug_mask(true,MASK_DEBUG|MASK_MPIRPC|MASK_ADTS|
- // MASK_CMPI|MASK_DRIVER);
+ gossip_set_debug_mask(true,MASK_DEBUG|MASK_MPIRPC|// MASK_ADTS|
+ MASK_CMPI|MASK_DRIVER);
gossip_enable_stdout();
@@ -171,12 +171,8 @@
whoami();
if (mpi_rank == 0)
- {
DEBUG(timestamp("START", NULL));
- }
- dmalloc_setup();
-
// Handle and store options...
options(argc, argv);
@@ -202,7 +198,7 @@
exit(1);
}
- char* tag = cmpi_params_search("tag");
+ char* tag = cmpi_params_get("tag");
if (tag)
note_s("tag: ", tag);
Modified: src/dense-1/cmpi_dense.c
===================================================================
--- src/dense-1/cmpi_dense.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/dense-1/cmpi_dense.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -255,7 +255,7 @@
DENSE_Read_params(void)
{
// NOTE_F;
- char* nodes_msg = cmpi_params_search("nodes");
+ char* nodes_msg = cmpi_params_get("nodes");
if (nodes_msg)
{
int n = sscanf(nodes_msg, "%i", &dense_nodes);
Modified: src/dense-1/dense.c
===================================================================
--- src/dense-1/dense.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/dense-1/dense.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -28,6 +28,17 @@
int k = 1;
+static MPIRPC_Handler(handle_bootping);
+static MPIRPC_Handler(handle_info);
+static MPIRPC_Handler(handle_ping);
+static MPIRPC_Handler(handle_query_id);
+static MPIRPC_Handler(handle_query_id_k);
+static MPIRPC_Handler(handle_store);
+static MPIRPC_Handler(handle_update);
+static MPIRPC_Handler(handle_retrieve);
+static MPIRPC_Handler(handle_debug_cache);
+static MPIRPC_Handler(handle_quit);
+
void
DENSE_Init(int k_in, MPI_Comm comm)
{
@@ -85,7 +96,7 @@
cmpi_mode_first_client(mpi_rank, mpi_size, dense_nodes);
NOTE("NOTIFYING");
int msg = -2;
- MPI_Send(&msg, 1, MPI_INT, client, 0, MPI_COMM_WORLD);
+ // MPI_Send(&msg, 1, MPI_INT, client, 0, MPI_COMM_WORLD);
}
}
@@ -217,31 +228,23 @@
MPIRPC_Block(node, "quit", NULL);
}
-void
-handle_bootping(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_bootping)
{
MPIRPC_Null(caller, unique);
rpc_bootping();
}
-void
-handle_info(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_info)
{
MPIRPC_Null(caller, unique);
}
-void
-handle_ping(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_ping)
{
MPIRPC_Null(caller, unique);
}
-void
-handle_query_id(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_query_id)
{
int object_id;
sscanf(args, "%X", &object_id);
@@ -251,9 +254,7 @@
MPIRPC_Return(caller, unique, result, length);
}
-void
-handle_query_id_k(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_query_id_k)
{
int i;
int object_id;
@@ -269,9 +270,7 @@
MPIRPC_Return(caller, unique, result, strlen(result)+1);
}
-void
-handle_store(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_store)
{
SHOW_FSI(args, blob_length);
@@ -283,9 +282,7 @@
DONE;
}
-void
-handle_update(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_update)
{
char* key = malloc(strlen(args)+1);
@@ -301,9 +298,7 @@
MPIRPC_Null(caller, unique);
}
-void
-handle_retrieve(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_retrieve)
{
NOTE_FS(args);
@@ -329,18 +324,14 @@
}
}
-void
-handle_debug_cache(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_debug_cache)
{
MPIRPC_Null(caller, unique);
puts("cmpi_cache:");
lru_table_printdata(cmpi_cache);
}
-void
-handle_quit(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+static MPIRPC_Handler(handle_quit)
{
MPIRPC_Null(caller, unique);
sleep(1);
Modified: src/kda-2/cmpi_kademlia.c
===================================================================
--- src/kda-2/cmpi_kademlia.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/kda-2/cmpi_kademlia.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -6,10 +6,13 @@
*/
struct list* contacts;
+static void cmpi_impl_parameters(void);
+
CMPI_RETURN
cmpi_init_impl()
{
// NOTE_F;
+ cmpi_impl_parameters();
KDA_Init(3, 3);
return CMPI_SUCCESS;
@@ -18,9 +21,13 @@
CMPI_RETURN
cmpi_init_impl_client()
{
+ NOTE_F;
+
contacts = list_create();
KDA_Init_client(3,3);
+ KDA_Attach(contacts);
+
return CMPI_SUCCESS;
}
@@ -37,19 +44,30 @@
return CMPI_SUCCESS;
}
+static void
+cmpi_impl_parameters()
+{
+ char* value = cmpi_params_get("nodes");
+ assert(value);
+ cmpi_impl_parameter("nodes", value);
+}
+
CMPI_RETURN
cmpi_attach()
{
+ /*
CMPI_RETURN result;
if (KDA_Attach())
{
- add_contacts();
result = CMPI_SUCCESS;
}
else
result = CMPI_ERROR_SERVICENAME;
return result;
+ */
+ printf("cmpi_attach(): DEPRECATED!\n");
+ return CMPI_SUCCESS;
}
void
@@ -61,20 +79,19 @@
for (i = 0; i < KDA_SPACE_SIZE; i++)
for (item = k_bucket[i]->head;
item; item = item->next)
- {
- KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
- KDA_Neighbor_dump(neighbor);
- list_add_unique(contacts, KDA_Neighbor_cmp, neighbor);
- KDA_Comm_set_name(neighbor);
- }
+ {
+ KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
+ KDA_Neighbor_dump(neighbor);
+ list_add_unique(contacts, KDA_Neighbor_cmp, neighbor);
+ KDA_Comm_set_name(neighbor);
+ }
}
KDA_Neighbor*
contact_lookup(KDA_Neighbor* neighbor)
{
- struct list_item* item;
NOTE_FX(neighbor->id);
- for (item = contacts->head;
+ for (struct list_item* item = contacts->head;
item; item = item->next)
{
KDA_Neighbor* n = (KDA_Neighbor*) item->data;
@@ -82,8 +99,7 @@
return n;
}
- NOTE("CONTACT LOOKUP FAILED: LINKING");
- KDA_Harpoon(KDA_Random_neighbor()->node, neighbor->id);
+ KDA_Join(neighbor->id, neighbor->node.comm);
add_contacts();
return contact_lookup(neighbor);
@@ -118,7 +134,7 @@
MPIRPC_Wait(rpc);
- printf("cmpi_lookup()@%X -> %s \n", n->id, rpc->result);
+ printf("cmpi_lookup()@%X -> %s \n", n->id, (char*) rpc->result);
return 0;
}
@@ -159,12 +175,10 @@
for (item = neighbors->head;
item; item = item->next)
{
- KDA_Neighbor* neighbor =
- contact_lookup((KDA_Neighbor*) item->data);
+ KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
MPIRPC* rpc = KDA_Store(neighbor, key, value, length);
MPIRPC_Wait(rpc);
MPIRPC_Free(rpc);
- NOTE("freed");
}
list_destroy(neighbors);
@@ -208,7 +222,47 @@
return CMPI_SUCCESS;
}
+CMPI_RETURN
+cmpi_update(char* key, char* value, int length, int offset)
+{
+ struct list_item* item;
+ NOTE_FSS(key, value);
+
+ // printf("contacts: %i \n", contacts);
+
+ KDA_Neighbor* n = (KDA_Neighbor*) list_random(contacts);
+ MPIRPC* lookup = KDA_Lookup_k(n->node, key);
+ MPIRPC_Wait(lookup);
+
+ struct list* neighbors = KDA_Neighbor_node_list(lookup->result);
+
+ for (item = neighbors->head; item; item = item->next)
+ {
+ KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
+ MPIRPC* rpc = KDA_Update(neighbor, key, value, length, offset);
+ MPIRPC_Wait(rpc);
+ MPIRPC_Free(rpc);
+ }
+
+ list_destroy(neighbors);
+
+ DONE;
+ return CMPI_SUCCESS;
+}
+
void
+cmpi_debug_tables()
+{
+ puts("debug_tables()");
+}
+
+void
+cmpi_debug_caches()
+{
+ puts("debug_caches()");
+}
+
+void
cmpi_detach()
{
while (contacts->size > 0)
@@ -219,19 +273,21 @@
free(neighbor);
}
- while (hubs->size > 0)
- {
- MPIRPC_Node* hub = (MPIRPC_Node*) list_poll(hubs);
- KDA_Detach(*hub);
- free(hub);
- }
-
+ KDA_Detach_hubs();
NOTE("DETACHED");
}
+static void free_contacts();
+
void
cmpi_shutdown()
{
- KDA_Shutdown();
+ KDA_Shutdown(contacts);
+ free_contacts();
}
+void
+free_contacts()
+{
+ list_destroy(contacts);
+}
Modified: src/kda-2/conn-A.c
===================================================================
--- src/kda-2/conn-A.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/kda-2/conn-A.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -1,5 +1,7 @@
#include "kda_conn-A.h"
+#include "kda_neighbor-2.h"
+#include "cmpi_mode.h"
/**
Maps KDA_IDs to ints.
@@ -10,6 +12,7 @@
void
KDA_Init_conn()
{
+ NOTE_F;
// Add self to world_ranks...
world_ranks = hashtable_create(mpi_size);
int* heap_mpi_rank = malloc(sizeof(int));
@@ -21,7 +24,7 @@
MPIRPC_Register("get_id", handle_get_id);
MPIRPC_Register("get_rank", handle_get_rank);
- sleep(mpi_rank * 10);
+ sleep(mpi_rank * 3);
if (mpi_rank > 0)
{
@@ -32,85 +35,76 @@
char* result = MPIRPC_Block(node, "get_id", NULL);
KDA_ID other_id;
sscanf(result, "%X", &other_id);
+ free(result);
KDA_Join(other_id, other_rank);
}
+ /*
+ int client =
+ cmpi_mode_first_client(mpi_rank, mpi_size, kda_nodes);
+ SHOW_I(client);
+ int msg = -2;
+ // MPI_Send(&msg, 1, MPI_INT, client, 0, MPI_COMM_WORLD);
+*/
listen_loop();
}
/**
- Attach
+ Form client->node connections
*/
-MPIRPC_Node*
-KDA_Attach()
-{
- int other_rank = rand_lt(mpi_rank);
- MPIRPC_Node node;
- MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &node);
- char* result = MPIRPC_Block(node, "get_id", NULL);
- KDA_ID other_id;
- sscanf(result, "%X", &other_id);
-
- char* args = malloc(20*sizeof(int));
- sprintf(args, "%X %i", KDA_ID_CLIENT, mpi_rank);
- MPIRPC_Block(node, "join", args);
-
- return NULL;
-}
-
void
-KDA_Harpoon(MPIRPC_Node reference, KDA_ID other_id)
+KDA_Attach(struct list* contacts)
{
- KDA_ID* heap_id = malloc(sizeof(KDA_ID));
- *heap_id = other_id;
- char* result = MPIRPC_Block(reference, "get_rank", xheap(other_id));
- int* other_rank = malloc(sizeof(int));
- sscanf(result, "%i", other_rank);
- hashtable_add(world_ranks, xheap(other_id), other_rank);
- KDA_Join(other_id, *other_rank);
+ NOTE_F;
+ MPIRPC_Comm_add(MPI_COMM_WORLD);
+ for (int i = 0; i < 3; i++)
+ {
+ int other_rank = rand_lt(mpi_size/2);
+ MPIRPC_Node node;
+ MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &node);
+ char* result = MPIRPC_Block(node, "get_id", NULL);
+ NOTE("got id");
+ KDA_ID other_id;
+ sscanf(result, "%X", &other_id);
+ free(result);
+ KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
+ if (list_add_unique(contacts, KDA_Neighbor_cmp, neighbor))
+ NOTE_XI("added neighbor: ", other_id, neighbor->node.comm);
+ else
+ free(neighbor);
+ }
}
+/**
+ Form node->node connections
+
+*/
void
KDA_Join(int other_id, int other_rank)
{
- MPIRPC_Node node1, node2;
- MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &node1);
- char* args = malloc(20*sizeof(int));
+ MPIRPC_Node other_node;
+ MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &other_node);
+ char args[64];
sprintf(args, "%X %i", id, mpi_rank);
- MPIRPC_Block(node1, "join", args);
+ MPIRPC_Block(other_node, "join", args);
- MPI_Comm newcomm = KDA_Comm_create(&other_rank);
- MPIRPC_Node_make(newcomm, 0, &node2);
- KDA_ID kda_id = (KDA_ID) other_id;
- KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node2);
+ KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, other_node);
neighbor_add(neighbor);
}
-MPI_Comm
-KDA_Comm_create(int* other_rank)
-{
- MPI_Group world_group, newgroup;
- MPI_Comm newcomm;
- MPI_Comm_group(MPI_COMM_WORLD, &world_group);
- MPI_Group_incl(world_group, 1, other_rank, &newgroup);
- MPI_Comm_create(MPI_COMM_WORLD, newgroup, &newcomm);
- MPIRPC_Comm_add(newcomm);
- return newcomm;
-}
-
void
-handle_join(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length)
+handle_join(MPIRPC_Node caller, int unique, char* args,
+ char* blob, int blob_length)
{
+ SHOW_FI(caller.rank);
KDA_ID other_id;
- int* other_rank = malloc(sizeof(int));
+ int* other_rank = malloc(sizeof(int));
sscanf(args, "%X %i", &other_id, other_rank);
- MPI_Comm newcomm = KDA_Comm_create(other_rank);
-
if (other_id != KDA_ID_CLIENT)
{
MPIRPC_Node node;
- MPIRPC_Node_make(newcomm, 0, &node);
+ MPIRPC_Node_make(caller.comm, caller.rank, &node);
KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
neighbor_add(neighbor);
hashtable_add(world_ranks, xheap(other_id), other_rank);
@@ -119,19 +113,18 @@
MPIRPC_Null(caller, unique);
}
-void
-handle_get_rank(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length)
+MPIRPC_Handler(handle_get_rank)
{
int* other_rank = (int*) hashtable_search(world_ranks, args);
- char* result = malloc(10*sizeof(char));
+ char* result = malloc(16*sizeof(char));
int length = sprintf(result, "%i", *other_rank);
MPIRPC_Return(caller, unique, result, length+1);
}
-void
-handle_get_id(MPIRPC_Node caller, int unique, char* args, char* blob, int blob_length)
+MPIRPC_Handler(handle_get_id)
{
- char* result = malloc(10*sizeof(char));
+ SHOW_FI(caller.rank);
+ char* result = malloc(16*sizeof(char));
int length = sprintf(result, "%X", id);
MPIRPC_Return(caller, unique, result, length+1);
}
@@ -141,5 +134,6 @@
{}
void
-KDA_Shutdown()
+KDA_Detach_hubs()
{}
+
Modified: src/kda-2/conn-B.c
===================================================================
--- src/kda-2/conn-B.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/kda-2/conn-B.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -15,6 +15,12 @@
//// Internal prototypes
int thirdhash(char* s);
+/**
+ List of MPIRPC_Node hubs.
+*/
+// struct list* hubs;
+
+
void
KDA_Init_conn()
{
@@ -38,6 +44,8 @@
KDA_Hub_init();
KDA_Hub_loop();
}
+
+ // hubs = list_create();
}
void
@@ -490,3 +498,13 @@
MPIRPC_Close(channel);
}
+void
+KDA_Detach_hubs()
+{
+ while (hubs->size > 0)
+ {
+ MPIRPC_Node* hub = (MPIRPC_Node*) list_poll(hubs);
+ KDA_Detach(*hub);
+ free(hub);
+ }
+}
Modified: src/kda-2/kademlia.c
===================================================================
--- src/kda-2/kademlia.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/kda-2/kademlia.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -21,11 +21,6 @@
*/
struct list* clients;
-/**
- List of MPIRPC_Node hubs.
-*/
-struct list* hubs;
-
MPI_Request request;
/**
@@ -95,8 +90,8 @@
char* buckets_tostring(void);
static KDA_ID make_id(int salt);
-static char* id_tostring(void);
-static void bootstrap(MPIRPC_Node node);
+// static char* id_tostring(void);
+// static void bootstrap(MPIRPC_Node node);
int KDA_Neighbor_table_size(void);
@@ -106,33 +101,30 @@
Generate info string for debugging
*/
void handle_info(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_join(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_link(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_find_node(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_neighbor(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_ping(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_query_id(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_query_id_k(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_store(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
+void handle_update(MPIRPC_Node node, int unique, char* args,
+ void* blob, int blob_length);
void handle_retrieve(MPIRPC_Node node, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
void handle_shutdown(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length);
+ void* blob, int blob_length);
-/* SPECIFIC TO CONN TYPE
- void handle_accept(MPIRPC_Node* caller, int unique, char* args, char* blob, int blob_length);
- void handle_connect(MPIRPC_Node* caller, int unique, char* args, char* blob, int blob_length);
-*/
-
// Synchronous methods...
struct ilist* find_node(KDA_ID id);
@@ -156,17 +148,13 @@
{
NOTE_F;
- int i;
alpha = alpha_in;
k = k_in;
- for (i = 0; i < KDA_SPACE_SIZE; i++)
+ for (int i = 0; i < KDA_SPACE_SIZE; i++)
k_bucket[i] = list_create();
operations = itable_create(CALL_TABLE_SIZE);
-
- clients = list_create();
- hubs = list_create();
}
/**
@@ -185,13 +173,16 @@
node.rank = 0;
id = make_id(debug_rank);
self = KDA_Neighbor_create_id(id, node);
- NOTE_XI("ID: ", id, id);
+ NOTE_XI("SELF: ", id, id);
port[0] = '\0';
+ clients = list_create();
+
MPIRPC_Register("info", handle_info);
// MPIRPC_Register("neighbor", handle_neighbor);
MPIRPC_Register("ping", handle_ping);
+ MPIRPC_Register("join", handle_join);
MPIRPC_Register("find_node", handle_find_node);
MPIRPC_Register("query_id", handle_query_id);
MPIRPC_Register("query_id_k", handle_query_id_k);
@@ -205,11 +196,10 @@
void
KDA_Init_client(int alpha_in, int k_in)
{
+ NOTE_F;
id = KDA_ID_CLIENT;
KDA_Data(alpha_in, k_in);
-
- KDA_Attach();
}
KDA_Neighbor*
@@ -276,8 +266,7 @@
struct ilist*
KDA_Find(KDA_ID id)
{
- KDA_Operation* op =
- KDA_Operate(id, NULL, NULL, NULL, NULL);
+ KDA_Operation* op = KDA_Operate(id, NULL, NULL, NULL, NULL);
KDA_Wait(op);
return op->k_closest;
}
@@ -302,8 +291,6 @@
struct ilist*
KDA_Closest(KDA_ID object_id)
{
- int i;
- struct list_item* item;
NOTE_FX(object_id);
struct ilist* result = ilist_create();
@@ -311,9 +298,9 @@
int d = XOR(self->id, object_id);
ilist_add(result, d, self);
- for (i = 0; i < KDA_SPACE_SIZE; i++)
+ for (int i = 0; i < KDA_SPACE_SIZE; i++)
{
- for (item = k_bucket[i]->head;
+ for (struct list_item* item = k_bucket[i]->head;
item; item = item->next)
{
KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
@@ -420,7 +407,7 @@
{
NOTE_FX(id);
MPIRPC* rpc =
- MPIRPC_Call(node, heap("query_id_k"), iheap(id),
+ MPIRPC_Call(node, "query_id_k", iheap(id),
NULL, MPIRPC_PROCEED_NULL);
return rpc;
}
@@ -433,7 +420,7 @@
{
NOTE_F;
- return MPIRPC_Call(node, heap("query_id"), xheap(id), NULL, NULL);
+ return MPIRPC_Call(node, "query_id", xheap(id), NULL, NULL);
}
/**
@@ -458,10 +445,25 @@
KDA_Retrieve(KDA_Neighbor* neighbor, char* key)
{
NOTE_F;
- return MPIRPC_Call(neighbor->node, heap("retrieve"), key,
+ return MPIRPC_Call(neighbor->node, "retrieve", key,
NULL, MPIRPC_PROCEED_NULL);
}
+MPIRPC*
+KDA_Update(KDA_Neighbor* neighbor, char* key, char* value,
+ int length, int offset)
+{
+ SHOW_FSI(key, length);
+
+ char args[MPIRPC_MAX_ARGS];
+ sprintf(args, "%s %i", key, offset);
+
+ MPIRPC* rpc =
+ MPIRPC_Call_blob(neighbor->node, "update", args,
+ value, length, NULL, MPIRPC_PROCEED_NULL);
+ return rpc;
+}
+
KDA_Query*
query_create(MPIRPC_Node caller, int unique,
void (*service)(KDA_Operation* op))
@@ -485,7 +487,7 @@
void
handle_info(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
// NOTE_F;
char* result = buckets_tostring();
@@ -500,7 +502,7 @@
*/
void
handle_store(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
NOTE_S("store: ", args);
cmpi_cached_store(args, blob, blob_length);
@@ -509,6 +511,24 @@
MPIRPC_Null(caller, unique);
}
+void
+handle_update(MPIRPC_Node caller, int unique, char* args,
+ void* blob, int blob_length)
+{
+ char* key = malloc(strlen(args)+1);
+
+ SHOW_FSI(args, blob_length);
+
+ int offset;
+ int n = sscanf(args, "%s %i", key, &offset);
+ assert(n == 2);
+
+ cmpi_cached_update(key, blob, offset, blob_length);
+ // DEBUG(lru_table_printdata("%s", cmpi_cache));
+
+ MPIRPC_Null(caller, unique);
+}
+
/**
Obtain value given key.
@param args The key
@@ -516,7 +536,7 @@
*/
void
handle_retrieve(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
NOTE_FS(args);
DEBUG(lru_table_printf("%s", cmpi_cache));
@@ -531,7 +551,7 @@
void
handle_shutdown(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
NOTE_F;
cmpi_quitting = true;
@@ -548,7 +568,7 @@
void
handle_find_node(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
struct ilist_item* item;
NOTE_FS(args);
@@ -573,7 +593,7 @@
void
handle_query_id(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
int object_id;
sscanf(args, "%X", &object_id);
@@ -600,7 +620,7 @@
void
handle_query_id_k(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
int object_id;
sscanf(args, "%X", &object_id);
@@ -690,8 +710,7 @@
p += n;
struct list* neighbors = KDA_Neighbor_node_list(p);
- for (item = neighbors->head;
- item; item = item->next)
+ for (item = neighbors->head; item; item = item->next)
{
KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
neighbor = check_neighbor(rpc->target, neighbor);
@@ -768,7 +787,7 @@
KDA_Neighbor* result;
if (neighbor_add(neighbor))
- KDA_Harpoon(node, neighbor->id);
+ KDA_Join(neighbor->id, node.rank);
result = neighbor_lookup(neighbor);
return result;
@@ -879,7 +898,7 @@
KDA_Operation* op = (KDA_Operation*) rpc->extras;
list_add(op->returned, target);
- dump_buckets();
+ DEBUG(dump_buckets());
// TODO: Deallocate op and rpc
// if (mpi_rank == mpi_size-1) NOTE("Highest rank ping completed.");
@@ -922,7 +941,7 @@
list_ordered_insert(k_bucket[i], KDA_Neighbor_time_cmp, (void*) n);
NOTE("After insert: ");
- dump_buckets();
+ DEBUG(dump_buckets());
return true;
}
@@ -961,11 +980,15 @@
*/
void
handle_ping(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
MPIRPC_Null(caller, unique);
}
+static void loop_work();
+static void shutdown_others();
+static void shutdown_loop();
+
void
listen_loop(void)
{
@@ -973,18 +996,54 @@
while (! cmpi_quitting)
{
- // NOTE("check");
- if (! MPIRPC_Check())
+ NOTE("loop");
+ loop_work();
+ }
+ shutdown_others();
+ shutdown_loop();
+ DONE;
+}
+
+static void
+loop_work()
+{
+ if (! MPIRPC_Check())
+ {
+ MPIRPC_Snooze();
+ MPIRPC_Flush_returns();
+ }
+ else
+ {
+ MPIRPC_Snooze_reset();
+ }
+}
+
+static void
+shutdown_others()
+{
+ NOTE_F;
+ DEBUG(dump_buckets());
+ for (int i = 0; i < KDA_SPACE_SIZE; i++)
+ for (struct list_item* item = k_bucket[i]->head; item;
+ item = item->next)
{
- MPIRPC_Snooze();
+ KDA_Neighbor* neighbor = item->data;
+ char* string = KDA_Neighbor_name(neighbor);
+ MPIRPC_Block(neighbor->node, "shutdown", NULL);
}
- else
- {
- MPIRPC_Snooze_reset();
- }
- }
+ DONE;
}
+static void
+shutdown_loop()
+{
+ NOTE_F;
+ time_t start = time(NULL);
+ while (time(NULL) - start < 2)
+ loop_work();
+ DONE;
+}
+
int KDA_Neighbor_table_size()
{
int i;
@@ -995,12 +1054,6 @@
}
char*
-id_tostring()
-{
- return KDA_id_tostring(id);
-}
-
-char*
KDA_id_tostring(KDA_ID other_id)
{
char* s_id = malloc(10*sizeof(char));
@@ -1008,6 +1061,10 @@
return s_id;
}
+/**
+ do not use this
+ @deprecated
+ */
void
dump_bucket_ids()
{
@@ -1016,38 +1073,33 @@
free(result);
}
+/*
+ do not use this
+ */
char*
bucket_ids_tostring()
{
- int i;
- struct list_item* item;
// NOTE_F;
char* result =
(char*) malloc(sizeof(char)*KDA_SPACE_SIZE*k*20);
- char* p = result;
- char* neighbor = NULL;
- bool bucket_named;
-
+ char* p = result;
p += sprintf(p, "Neighbor table for: %X \n", id);
- for (i = 0; i < KDA_SPACE_SIZE; i++)
+ for (int i = 0; i < KDA_SPACE_SIZE; i++)
{
- if (k_bucket[i]->size > 0)
+ bool bucket_named = false;
+ for (struct list_item* item = k_bucket[i]->head; item;
+ item = item->next)
{
- bucket_named = false;
- for (item = k_bucket[i]->head;
- item; item = item->next)
+ if (! bucket_named)
{
- if (! bucket_named)
- {
- bucket_named = true;
- p += sprintf(p, "bucket[%i]: \n", i);
- }
- p += sprintf(p, "\t");
- if (item->data == NULL)
- NOTE("ERROR NULL");
- neighbor = KDA_Neighbor_id_tostring(item->data);
- p += sprintf(p, "%s\n", neighbor);
+ bucket_named = true;
+ p += sprintf(p, "bucket[%i]:\n", i);
}
+ p += sprintf(p, "\t");
+ assert(item->data);
+ KDA_Neighbor* neighbor = item->data;
+ char* string = KDA_Neighbor_id_tostring(neighbor);
+ p += sprintf(p, "%s (%i)\n", string, neighbor->node.rank);
}
}
return result;
@@ -1066,37 +1118,42 @@
buckets_tostring(void)
{
// NOTE_F;
-
- int i;
- struct list_item* item;
- // NOTE_F;
char* result =
(char*) malloc(sizeof(char)*KDA_SPACE_SIZE*k*200);
- char* p = result;
- char* neighbor = NULL;
- bool bucket_named;
-
+ char* p = result;
p += sprintf(p, "Neighbor table for: %X \n", id);
- for (i = 0; i < KDA_SPACE_SIZE; i++)
+ for (int i = 0; i < KDA_SPACE_SIZE; i++)
{
- if (k_bucket[i]->size > 0)
+ bool bucket_named = false;
+ for (struct list_item* item = k_bucket[i]->head; item;
+ item = item->next)
{
- bucket_named = false;
- for (item = k_bucket[i]->head;
- item; item = item->next)
+ if (! bucket_named)
{
- if (! bucket_named)
- {
- bucket_named = true;
- p += sprintf(p, "bucket[%i]: \n", i);
- }
- p += sprintf(p, "\t");
- if (item->data == NULL)
- NOTE("ERROR NULL");
- neighbor = KDA_Neighbor_tostring(item->data);
- p += sprintf(p, "%s\n", neighbor);
+ bucket_named = true;
+ p += sprintf(p, "bucket[%i]:\n", i);
}
+ p += sprintf(p, "\t");
+ KDA_Neighbor* neighbor = item->data;
+ assert(neighbor);
+ char* string = KDA_Neighbor_tostring(neighbor);
+ p += sprintf(p, "%s\n", string);
}
}
return result;
}
+
+/**
+ Client call to shut down contacts
+ */
+void
+KDA_Shutdown(struct list* contacts)
+{
+ NOTE_F;
+ for (struct list_item* item = contacts->head; item;
+ item = item->next)
+ {
+ KDA_Neighbor* neighbor = item->data;
+ MPIRPC_Block(neighbor->node, "shutdown", NULL);
+ }
+}
Modified: src/kda-2/neighbor.c
===================================================================
--- src/kda-2/neighbor.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/kda-2/neighbor.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -8,7 +8,7 @@
static char output[100];
/**
- Create a single neighbor. Uses KDA_Neighborcreate_scan().
+ Create a single neighbor. Uses KDA_Neighbor_create_scan().
*/
KDA_Neighbor*
KDA_Neighbor_create_string(char* info)
@@ -33,9 +33,9 @@
}
/**
- Scan the stream data for neighbors.
- @return The new neighbor. n contains the number of chars consumed
- from data by sscanf().
+ Scan the data for neighbors.
+ @return The new neighbor. Output n containing the number of chars
+ consumed from data by sscanf().
*/
KDA_Neighbor*
KDA_Neighbor_create_scan(char* data, int* n)
@@ -194,7 +194,8 @@
MPI_Comm_get_name(neighbor->node.comm, name, &length);
}
- offset = sprintf(buffer, "[%X]@%s ", neighbor->id, name);
+ offset = sprintf(buffer, "[%X]=%i@%s ", neighbor->id,
+ neighbor->node.rank, name);
offset += strftime(buffer+offset, 99, "%F %T",
localtime(&neighbor->lastseen));
}
@@ -211,7 +212,8 @@
}
/**
- Scan the stream data for neighbors. Uses KDA_Neighborcreate_scan().
+ Scan the stream data for neighbors.
+ Uses KDA_Neighbor_create_scan().
@return A list of neighbors.
*/
struct list* KDA_Neighbor_node_list(char* data)
Modified: src/mpi_tools/mpi_tools.c
===================================================================
--- src/mpi_tools/mpi_tools.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/mpi_tools/mpi_tools.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -30,7 +30,7 @@
else
{
if (mpi_rank == 0)
- printf("Using single debug file \n");
+ DEBUG(puts("Debug to stdout"));
debug_file = stdout;
}
}
Modified: src/mpirpc/mpirpc.c
===================================================================
--- src/mpirpc/mpirpc.c 2011-04-27 18:19:30 UTC (rev 196)
+++ src/mpirpc/mpirpc.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -17,7 +17,7 @@
/**
For MPIRPC_Check() : Irecv()...
*/
-static MPI_Request check_request = MPI_REQUEST_NULL;
+// static MPI_Request check_request = MPI_REQUEST_NULL;
/**
For MPIRPC_Check() : Irecv()...
@@ -105,7 +105,7 @@
Register a new RPC for remote access.
*/
void
-MPIRPC_Register(char* name, void (*f)(MPIRPC_Node,int,char*,char*,int))
+MPIRPC_Register(char* name, MPIRPC_Handler_pointer(f))
{
hashtable_add(portmap, name, f);
}
@@ -258,7 +258,7 @@
*/
MPIRPC*
MPIRPC_Call_blob(MPIRPC_Node target, char* name, char* args,
- char* blob, int blob_length,
+ void* blob, int blob_length,
void* extras, void (*proceed)(MPIRPC* rpc))
{
assert(target.comm != MPI_COMM_NULL);
@@ -312,16 +312,16 @@
Copied into MPIRPC.
@param args The args to send. Copied into MPIRPC.
*/
-char*
+void*
MPIRPC_Block(MPIRPC_Node target, char* name, char* args)
{
gossip_do(MASK_MPIRPC, NOTE_FS(name));
return MPIRPC_Block_blob(target, name, args, NULL, 0);
}
-char*
+void*
MPIRPC_Block_blob(MPIRPC_Node target, char* name, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
char* result;
gossip_do(MASK_MPIRPC, NOTE_FS(name));
@@ -364,7 +364,7 @@
MPI_Recv(&args_length, 1, MPI_INT, caller.rank,
MPIRPC_TAG_LENGTH, caller.comm, &status);
- gossip_do(MASK_MPIRPC, NOTE_FS(name));
+ gossip_do(MASK_MPIRPC, SHOW_FSI(name,caller.rank));
if (args_length > 0)
{
@@ -420,7 +420,7 @@
*/
void
MPIRPC_Return(MPIRPC_Node caller, int unique,
- char* result, int length)
+ void* result, int length)
{
gossip_do(MASK_MPIRPC, SHOW_FIII(caller.rank, unique, length));
@@ -514,6 +514,7 @@
for (item = channels->head;
item; item = item->next)
{
+ NOTE("check channel");
MPIRPC_Channel* channel = (MPIRPC_Channel*) item->data;
// char* name = MPIRPC_Comm_get_name(channel->comm);
@@ -534,6 +535,8 @@
MPIRPC_Handle(check_msg, node);
}
}
+ sleep(1);
+
return event;
}
@@ -577,8 +580,7 @@
int i;
gossip_do(MASK_MPIRPC, NOTE_F);
MPI_Status status;
- for (item = garbage_values->head;
- item; item = item->next)
+ for (item = garbage_values->head; item; item = item->next)
{
MPIRPC_Value* value = (MPIRPC_Value*) item->data;
for (i = 0; i < 3; i++)
@@ -597,7 +599,7 @@
MPIRPC_Free() or MPIRPC_Destroy() as proceed-functions.
@return rpc's result when obtained.
*/
-char*
+void*
MPIRPC_Wait(MPIRPC* rpc)
{
int unique = rpc->unique;
@@ -714,8 +716,8 @@
{
if (value->result)
{
- NOTE_S("Free result: ", value->result);
- SHOW_P(value->result);
+ // NOTE_S("Free result: ", value->result);
+ // SHOW_P(value->result);
free(value->result);
}
Modified: test/adts/test-dpkm_list.c
===================================================================
--- test/adts/test-dpkm_list.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/adts/test-dpkm_list.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -139,7 +139,7 @@
dpkm_list_printdata(L2);
printf("size: %i\n", L2->size);
- while (item = dpkm_list_poll(L2))
+ while ((item = dpkm_list_poll(L2)))
extra_key_data(item);
printf("size: %i\n", L2->size);
Modified: test/adts/test-lru_table01.c
===================================================================
--- test/adts/test-lru_table01.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/adts/test-lru_table01.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -111,7 +111,7 @@
puts("draining...");
while ((kv = lru_table_poll(table)))
{
- printf("polled: %s\n", kv->key, kv->data);
+ printf("polled: %s\n", kv->key);
printdata("", kv->data, kv->length);
keyvalue_destroy(kv);
}
Modified: test/cmpi/test-manyputs.c
===================================================================
--- test/cmpi/test-manyputs.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/cmpi/test-manyputs.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -19,7 +19,7 @@
int i;
// Obtain the number of reps to perform:
- char* s = cmpi_params_search("reps");
+ char* s = cmpi_params_get("reps");
if (s == NULL || strlen(s) == 0)
{
puts("No reps given!");
Modified: test/cmpi/test-startup.c
===================================================================
--- test/cmpi/test-startup.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/cmpi/test-startup.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -4,6 +4,8 @@
Be sure to pass -n <nodes> !
*/
+#include <unistd.h>
+
#include "test_helpers.h"
void
@@ -13,9 +15,15 @@
gossip_set_debug_mask(1, MASK_MPIRPC|MASK_DHT);
- wait_for_notification();
- notify_next();
+ puts("sleeping...");
+ sleep(4);
+ // wait_for_notification();
+ // puts("notifying...");
+ // notify_next();
+
+ puts("running...");
+
// if (mpi_rank == mpi_size-1)
cmpi_shutdown();
}
Modified: test/driver/test-cmd-sleep.c
===================================================================
--- test/driver/test-cmd-sleep.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/driver/test-cmd-sleep.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -23,7 +23,7 @@
fflush(to_cmpi);
char output[32];
t = fscanf(from_cmpi, "%s", output);
- assert(t == 0);
+ assert(t == 1);
if (!(output[0] == 'o' &&
output[1] == 'k'))
Modified: test/driver/test.zsh
===================================================================
--- test/driver/test.zsh 2011-04-27 18:19:30 UTC (rev 196)
+++ test/driver/test.zsh 2011-04-28 19:53:16 UTC (rev 197)
@@ -30,7 +30,7 @@
${LAUNCH} -n ${TOTAL} ${DRIVER} -n ${NODES} ${CMPI_OPTS} > ${OUTPUT} &
DRIVER_PID=${!}
-tools/timebomb.zsh ${$} 20 ${OUTPUT} $0 &
+tools/timebomb.zsh ${$} 30 ${OUTPUT} $0 &
BOMB_PID=${!}
sleep 3
Modified: test/mpi_tools/test-tools01.c
===================================================================
--- test/mpi_tools/test-tools01.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/mpi_tools/test-tools01.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -11,6 +11,7 @@
MPI_Init(&argc, &argv);
whoami();
+ setup_debug_file();
int i;
for (i = 0; i < 20; i++)
Modified: test/mpirpc/test-args.c
===================================================================
--- test/mpirpc/test-args.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/mpirpc/test-args.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -11,7 +11,7 @@
void
handle_test(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
show_fs(args);
MPIRPC_Null(caller, unique);
@@ -25,9 +25,8 @@
MPI_Init(&argc, &argv);
MPIRPC_Init();
whoami();
+ setup_debug_file();
- dmalloc_setup();
-
MPIRPC_Comm_add(MPI_COMM_WORLD);
MPIRPC_Node neighbor;
Modified: test/mpirpc/test-blob.c
===================================================================
--- test/mpirpc/test-blob.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/mpirpc/test-blob.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -13,17 +13,20 @@
void
handle_test(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+ void* blob, int blob_length)
{
int a;
sscanf(args, "%i", &a);
printf("handle_test: %i\n", a);
- assert(blob[0] == '\n');
- assert(blob[1] == '\0');
- assert(blob[2] == '\t');
- assert(blob[3] == '\n');
+ assert(blob_length == 4);
+ char* data = (char*) blob;
+ assert(data[0] == '\n');
+ assert(data[1] == '\0');
+ assert(data[2] == '\t');
+ assert(data[3] == '\n');
+
printf("asserts ok\n");
MPIRPC_Null(caller, unique);
Modified: test/mpirpc/test-ping.c
===================================================================
--- test/mpirpc/test-ping.c 2011-04-27 18:19:30 UTC (rev 196)
+++ test/mpirpc/test-ping.c 2011-04-28 19:53:16 UTC (rev 197)
@@ -1,8 +1,7 @@
-
/**
- Simple two-processor ping pong test.
- No arguments or return values.
-*/
+ Simple two-processor ping pong test.
+ No arguments or return values.
+ */
#include <unistd.h>
@@ -12,15 +11,14 @@
#include "test_helpers.h"
-int count = 0;
+int count = 0;
bool running = true;
void
-handle_ping(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+handle_ping(MPIRPC_Node caller, int unique, char* args, void* blob,
+ int blob_length)
{
- NOTE_F;
- SHOW_S(args);
+ puts("handle_ping");
MPIRPC_Null(caller, unique);
if (++count < 3)
{
@@ -35,11 +33,10 @@
}
void
-handle_quit(MPIRPC_Node caller, int unique, char* args,
- char* blob, int blob_length)
+handle_quit(MPIRPC_Node caller, int unique, char* args, void* blob,
+ int blob_length)
{
- NOTE_F;
- SHOW_S(args);
+ puts("handle_quit");
running = false;
MPIRPC_Null(caller, unique);
}
@@ -52,12 +49,13 @@
...
[truncated message content] |