[C-MPI-commits] SF.net SVN: c-mpi:[201] test/cmpi
Status: Pre-Alpha
Brought to you by:
jmwozniak
|
From: <jmw...@us...> - 2011-05-02 22:55:55
|
Revision: 201
http://c-mpi.svn.sourceforge.net/c-mpi/?rev=201&view=rev
Author: jmwozniak
Date: 2011-05-02 22:55:48 +0000 (Mon, 02 May 2011)
Log Message:
-----------
test-putget works for KDA and is valgrind-clean
Modified Paths:
--------------
include/cmpi.h
include/cmpi_disk.h
include/gossip.h
include/kda-2.h
include/kda_conn-A.h
include/kda_neighbor-2.h
include/kda_types-2.h
include/list.h
src/adts/list.c
src/cmpi/cmpi.c
src/cmpi/cmpi_disk_printf.c
src/cmpi/cmpi_disk_void.c
src/cmpi/driver.c
src/kda-2/cmpi_kademlia.c
src/kda-2/conn-A.c
src/kda-2/conn-B.c
src/kda-2/kademlia.c
src/kda-2/neighbor.c
src/mpi_tools/mpi_tools.c
src/mpirpc/mpirpc.c
test/adts/test-lru_table01.c
test/cmpi/test-putget.c
test/cmpi/test-update01.c
test/cmpi/test-update02.c
Modified: include/cmpi.h
===================================================================
--- include/cmpi.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/cmpi.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -200,17 +200,17 @@
@param value Store pointer to value here
@param length OUT The value length
*/
-CMPI_RETURN cmpi_get(char* key, char** value, int* length);
+CMPI_RETURN cmpi_get(char* key, void** value, int* length);
/**
Store a key/value pair in CMPI.
*/
-CMPI_RETURN cmpi_put(char* key, char* value, int length);
+CMPI_RETURN cmpi_put(char* key, void* value, int length);
/**
Update byte region in CMPI key/value pair.
*/
-CMPI_RETURN cmpi_update(char* key, char* value,
+CMPI_RETURN cmpi_update(char* key, void* value,
int length, int offset);
/**
@@ -223,9 +223,9 @@
//// Cached disk operations:
-int cmpi_cached_retrieve(char* key, char** data);
-void cmpi_cached_store(char* key, char* data, int length);
-void cmpi_cached_update(char* key, char* data,
+int cmpi_cached_retrieve(char* key, void** data);
+void cmpi_cached_store(char* key, void* data, int length);
+void cmpi_cached_update(char* key, void* data,
int offset, int length);
/**
@@ -233,6 +233,13 @@
*/
void cmpi_shutdown(void);
+/**
+   Back end of cmpi_assert(). Does nothing when code is CMPI_SUCCESS;
+   otherwise prints the code name, message and call site and exits
+   the process with code as the exit status.
+ */
+void cmpi_assert_impl(CMPI_RETURN code, const char* message,
+ const char* func, const char* file, int line);
+/**
+   Check a CMPI_RETURN code, passing the caller's __func__, __FILE__
+   and __LINE__ to cmpi_assert_impl().
+ */
+#define cmpi_assert(code, message) \
+ cmpi_assert_impl(code,message,__func__,__FILE__,__LINE__)
+
+/**
+   Write the symbolic name of code into output with sprintf().
+   @return The length reported by sprintf()
+ */
+int cmpi_return_tostring(char* output, CMPI_RETURN code);
+
void cmpi_cleanup(void);
void cmpi_client_cleanup(void);
Modified: include/cmpi_disk.h
===================================================================
--- include/cmpi_disk.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/cmpi_disk.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -3,6 +3,6 @@
* Select disk operations based on configure-time value of DISK_TYPE.
* */
-void cmpi_disk_storepair(char* key, char* value, int length);
-int cmpi_disk_loadpair(char* key, char** value);
+void cmpi_disk_storepair(char* key, void* value, int length);
+int cmpi_disk_loadpair(char* key, void** value);
Modified: include/gossip.h
===================================================================
--- include/gossip.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/gossip.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -20,7 +20,9 @@
#ifndef __GOSSIP_H
#define __GOSSIP_H
+#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500
+#endif
#include <errno.h>
#include <stdio.h>
Modified: include/kda-2.h
===================================================================
--- include/kda-2.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/kda-2.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -78,7 +78,7 @@
//// Helpers...
-void KDA_Join(int other_id, int other_rank);
+void KDA_Join(MPIRPC_Node node, KDA_Neighbor* neighbor);
void KDA_Setup_node_port(void);
// void KDA_Serve(void);
@@ -91,6 +91,10 @@
// void KDA_Setup_connector_port(void);
// void KDA_Connect_port(char* port);
void KDA_Attach(struct list* contacts);
+/**
+ Attach to given neighbor if not already attached
+ */
+bool KDA_Attach_to(KDA_Neighbor* neighbor, struct list* contacts);
// MPIRPC_Node* KDA_Attach_port(char* port);
// MPIRPC_Node* KDA_Register_hub(MPI_Comm comm);
Modified: include/kda_conn-A.h
===================================================================
--- include/kda_conn-A.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/kda_conn-A.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -5,8 +5,6 @@
MPI_Comm KDA_Comm_create(int* other_rank);
-void KDA_Join(int other_id, int other_rank);
-
/**
Translate a KDA_ID to a rank in MPI_COMM_WORLD.
*/
Modified: include/kda_neighbor-2.h
===================================================================
--- include/kda_neighbor-2.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/kda_neighbor-2.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -4,29 +4,32 @@
#include <stdbool.h>
-#include "kda_types-2.h"
+#include "kda_types-2.h"
#include "kda-2.h"
/**
- Scan the stream data for neighbors.
+ Scan the stream data for neighbors.
@return The new neighbor. n contains the number of chars consumed
from data by sscanf().
*/
KDA_Neighbor* KDA_Neighbor_create_scan(char* data, int* n);
-KDA_Neighbor* KDA_Neighbor_create_string(char* info);
-KDA_Neighbor* KDA_Neighbor_create_id(KDA_ID id, MPIRPC_Node node);
+KDA_Neighbor* KDA_Neighbor_create_string(char* info);
+KDA_Neighbor* KDA_Neighbor_create_id(KDA_ID id, MPIRPC_Node node);
+void KDA_Neighbor_make_id(KDA_ID other_id, MPIRPC_Node node,
+ KDA_Neighbor* output);
+KDA_Neighbor* KDA_Neighbor_clone(KDA_Neighbor* original);
-//// cmp()s...
+//// cmp()s...
int KDA_Neighbor_node_cmp(void* object1, void* object2);
/**
Compare ids.
@return 1 iff n1 > n2, -1 iff n1 < n2, 0 iff n1 == n2
*/
-int KDA_Neighbor_cmp(void* object1, void* object2);
+int KDA_Neighbor_cmp(void* object1, void* object2);
-/**
- Compare lastseen times for neighbors n1, n2.
+/**
+ Compare lastseen times for neighbors n1, n2.
@return 1 iff n1 > n2, -1 iff n1 < n2, 0 iff n1 == n2
*/
int KDA_Neighbor_time_cmp(void* object1, void* object2);
@@ -37,9 +40,9 @@
*/
char* KDA_Neighbor_id_tostring(void* object);
char* KDA_Neighbor_name(void* object);
-// int KDA_Neighbor_name_sprint(char* buffer, void* object);
+// int KDA_Neighbor_name_sprint(char* buffer, void* object);
char* KDA_Neighbor_tostring(void* object);
-int KDA_Neighbor_sprint(char* buffer, void* object);
+int KDA_Neighbor_sprint(char* buffer, void* object);
void KDA_Neighbor_dump(void* object);
//// Input methods:
Modified: include/kda_types-2.h
===================================================================
--- include/kda_types-2.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/kda_types-2.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -2,22 +2,22 @@
#ifndef KADEMLIA_TYPES_2_H
#define KADEMLIA_TYPES_2_H
-#include <cmpi.h>
+#include <cmpi.h>
typedef CMPI_ID KDA_ID;
typedef struct neighbor
{
- MPIRPC_Node node;
+ MPIRPC_Node node;
KDA_ID id;
- time_t lastseen;
+ time_t lastseen;
} KDA_Neighbor;
/**
Contains neighbors. A bucket is all neighbors of a given
distance 2^i - 2^i+1
*/
-typedef struct list* K_BUCKET;
+typedef struct list* K_BUCKET;
typedef struct kda_query KDA_Query;
typedef struct kda_operation KDA_Operation;
@@ -30,8 +30,8 @@
MPIRPC_Node caller;
int unique;
char* result;
- void (*service)(KDA_Operation* op);
-};
+ void (*service)(KDA_Operation* op);
+};
/**
Performs an operation.
@@ -40,38 +40,38 @@
2) When proceed_find converges, it
calls map.
3) map either maps the function name out (if name given),
- proceeding with the proceed function, or
- returns the query.
+ proceeding with the proceed function, or
+ returns the query.
*/
struct kda_operation
{
KDA_ID object_id;
- int unique;
- // An MPIRPC function symbolic name to call on found nodes:
+ int id;
+ // An MPIRPC function symbolic name to call on found nodes:
char* name;
char* args;
- // Maps distance to neighbors:
+ // Maps distance to neighbors:
struct ilist* k_closest;
- // neighbors that have been searched but not heard from:
+ // neighbors that have been searched but not heard from:
struct list* outstanding;
- // neighbors that have been searched for this operation:
+ // neighbors that have been searched for this operation:
struct list* contacted;
- // neighbors that have returned the RPC for this operation:
+ // neighbors that have returned the RPC for this operation:
struct list* returned;
- // If the latest rpc_find_node calls improved our search.
+ // If the latest rpc_find_node calls improved our search.
bool improved;
int status;
- // How to proceed after map(name):
+ // How to proceed after map(name):
void (*proceed)(MPIRPC* op);
- // Incoming query to respond to that triggered this operation:
- KDA_Query* query;
-};
+ // Incoming query to respond to that triggered this operation:
+ KDA_Query* query;
+};
-typedef struct
+typedef struct
{
void (*method)(MPIRPC_Node,void*);
KDA_Neighbor* neighbor;
- void* args;
+ void* args;
} KDA_RPC_Extras;
#endif
Modified: include/list.h
===================================================================
--- include/list.h 2011-05-02 15:06:20 UTC (rev 200)
+++ include/list.h 2011-05-02 22:55:48 UTC (rev 201)
@@ -62,6 +62,12 @@
void* list_inspect(struct list* target, void* data, size_t n);
/**
+ True if the comparator finds a match for arg
+*/
+bool list_matches(struct list* target, int (*cmp)(void*,void*),
+ void* arg);
+
+/**
Removes only one item that points to given data.
Does not free the item data.
@return True iff the data pointer was matched
@@ -83,6 +89,12 @@
int (*cmp)(void*,void*), void* arg);
/**
+ Return the first data element from the list where cmp(data,arg) == 0,
+ or NULL if no element matches.
+*/
+void* list_select_one(struct list* target, int (*cmp)(void*,void*),
+ void* arg);
+
+/**
Remove the element from the list where f(data,arg).
*/
bool list_remove_where(struct list* target,
Modified: src/adts/list.c
===================================================================
--- src/adts/list.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/adts/list.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -354,6 +354,19 @@
return NULL;
}
+/**
+   Report whether any element of the list compares equal to arg.
+   @param target List to scan; must not be NULL
+   @param cmp Comparator called as cmp(element data, arg); 0 means match
+   @param arg Passed through to cmp as its second argument
+   @return true as soon as cmp returns 0 for an element, else false
+ */
+bool
+list_matches(struct list* target, int (*cmp)(void*,void*), void* arg)
+{
+  assert(target != NULL);
+
+  struct list_item* cursor = target->head;
+  while (cursor != NULL)
+  {
+    if (cmp(cursor->data, arg) == 0)
+      return true;
+    cursor = cursor->next;
+  }
+
+  return false;
+}
+
/**
Empty the list.
*/
@@ -460,6 +473,20 @@
return result;
}
+/**
+   Fetch the data of the first element that compares equal to arg.
+   @param target List to scan; must not be NULL
+   @param cmp Comparator called as cmp(element data, arg); 0 means match
+   @param arg Passed through to cmp as its second argument
+   @return The matching element's data pointer, or NULL when no
+           element matches
+ */
+void*
+list_select_one(struct list* target,
+                int (*cmp)(void*,void*), void* arg)
+{
+  assert(target != NULL);
+
+  struct list_item* cursor = target->head;
+  while (cursor != NULL)
+  {
+    if (cmp(cursor->data, arg) == 0)
+      return cursor->data;
+    cursor = cursor->next;
+  }
+
+  return NULL;
+}
+
/**
Remove the elements from the list where cmp(data,arg) == 0.
@return true if one or more items were deleted.
Modified: src/cmpi/cmpi.c
===================================================================
--- src/cmpi/cmpi.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/cmpi/cmpi.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -108,7 +108,7 @@
@return length of data, 0 if data is NULL
*/
int
-cmpi_cached_retrieve(char* key, char** data)
+cmpi_cached_retrieve(char* key, void** data)
{
int result;
NOTE_F;
@@ -151,7 +151,7 @@
/**
*/
void
-cmpi_cached_store(char* key, char* data, int length)
+cmpi_cached_store(char* key, void* data, int length)
{
struct keyvalue* kv;
SHOW_FSI(key, length);
@@ -169,7 +169,7 @@
Does not use cmpi_disk.
*/
void
-cmpi_cached_update(char* key, char* data, int offset, int length)
+cmpi_cached_update(char* key, void* data, int offset, int length)
{
SHOW_FSII(key,offset,length);
printdata("update: ", data, length);
@@ -231,3 +231,40 @@
hashtable_destroy(cmpi_params);
MPIRPC_Finalize();
}
+
+/**
+   Implementation behind the cmpi_assert() macro.
+   Returns immediately when code is CMPI_SUCCESS; otherwise prints the
+   code name, the message and the call site, then exits the process
+   using code as the exit status.
+   @param code Result to check
+   @param message Extra text for the report; NULL is printed as ""
+   @param func Name of the calling function
+   @param file Source file of the call
+   @param line Line number of the call
+ */
+void
+cmpi_assert_impl(CMPI_RETURN code, const char* message,
+                 const char* func, const char* file, int line)
+{
+  if (code == CMPI_SUCCESS)
+    return;
+
+  // Start empty so the report below never reads an unwritten buffer
+  // if cmpi_return_tostring() emits nothing for this code
+  char type[64] = "";
+  if (message == NULL)
+    message = "";
+  cmpi_return_tostring(type, code);
+  printf("c-mpi: assert failed: %s: %s\n", type, message);
+  printf(" in: %s (%s:%i)\n", func, file, line);
+  exit(code);
+}
+
+/**
+   Write the symbolic name of a CMPI_RETURN code into output.
+   @param output Caller-supplied buffer; the longest name written here
+                 needs 23 bytes including the terminator
+   @param code The code to name
+   @return Number of characters written, not counting the terminator
+ */
+int
+cmpi_return_tostring(char* output, CMPI_RETURN code)
+{
+  const char* name;
+
+  // One exclusive chain: previously the trailing else belonged only to
+  // the CMPI_ERROR_NEIGHBORS test, so every other code hit assert(false)
+  if (code == CMPI_SUCCESS)
+    name = "CMPI_SUCCESS";
+  else if (code == CMPI_ERROR_UNKNOWN)
+    name = "CMPI_ERROR_UNKNOWN";
+  else if (code == CMPI_DOESNT_EXIST)
+    name = "CMPI_DOESNT_EXIST";
+  else if (code == CMPI_ERROR_FAULT)
+    name = "CMPI_ERROR_FAULT";
+  // NOTE(review): this branch used to test CMPI_SUCCESS a second time;
+  // the constant name is taken from the string it prints - confirm it
+  // is declared in cmpi.h
+  else if (code == CMPI_ERROR_SERVICENAME)
+    name = "CMPI_ERROR_SERVICENAME";
+  else if (code == CMPI_ERROR_NEIGHBORS)
+    name = "CMPI_ERROR_NEIGHBORS";
+  else
+  {
+    // No name for this code: a programming error. The empty fallback
+    // keeps output terminated when asserts are compiled out.
+    assert(false);
+    name = "";
+  }
+
+  return sprintf(output, "%s", name);
+}
Modified: src/cmpi/cmpi_disk_printf.c
===================================================================
--- src/cmpi/cmpi_disk_printf.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/cmpi/cmpi_disk_printf.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -2,20 +2,20 @@
/**
* Dumps disk operations to the screen.
* Used if configure --with-disk-printf is used.
- * */
+ * */
-#include <stdio.h>
+#include <stdio.h>
-#include <mpi_tools.h>
+#include <mpi_tools.h>
+/**
+   Print the key of a store request instead of writing to disk.
+   Takes the same three parameters as the declaration in cmpi_disk.h;
+   value and length are accepted but not printed.
+ */
void
-cmpi_disk_storepair(char* key, char* value)
+cmpi_disk_storepair(char* key, void* value, int length)
{
- NOTE_F;
- note_ss("DISK_STOREPAIR: ", key, value);
+  NOTE_F;
+  printf("DISK_STOREPAIR: %s\n", key);
}
+/**
+   Print the key of a load request instead of reading from disk.
+   Returns int to match the declaration in cmpi_disk.h.
+   @param value OUT Always set to NULL: this backend stores nothing
+   @return 0 - presumably the loaded length, as for
+           cmpi_cached_retrieve(); confirm against cmpi_disk_void.c
+ */
-void cmpi_disk_loadpair(char* key, char** value)
+int cmpi_disk_loadpair(char* key, void** value)
{
- note_s("DISK_LOADPAIR: ", key);
+  printf("DISK_LOADPAIR: %s\n", key);
+  *value = NULL;
+  return 0;
}
Modified: src/cmpi/cmpi_disk_void.c
===================================================================
--- src/cmpi/cmpi_disk_void.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/cmpi/cmpi_disk_void.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -12,7 +12,7 @@
NOTE_FS(key);
}
-int cmpi_disk_loadpair(char* key, char** value)
+int cmpi_disk_loadpair(char* key, void** value)
{
NOTE_F;
*value = NULL;
Modified: src/cmpi/driver.c
===================================================================
--- src/cmpi/driver.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/cmpi/driver.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -153,7 +153,7 @@
NOTE_FS(key);
- char* value;
+ void* value;
int length;
cmpi_get(key, &value, &length);
Modified: src/kda-2/cmpi_kademlia.c
===================================================================
--- src/kda-2/cmpi_kademlia.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/kda-2/cmpi_kademlia.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -73,11 +73,9 @@
void
add_contacts()
{
- int i;
- struct list_item* item;
NOTE_F;
- for (i = 0; i < KDA_SPACE_SIZE; i++)
- for (item = k_bucket[i]->head;
+ for (int i = 0; i < KDA_SPACE_SIZE; i++)
+ for (struct list_item* item = k_bucket[i]->head;
item; item = item->next)
{
KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
@@ -87,22 +85,26 @@
}
}
+/**
+ If the neighbor is unknown, attach, copy and store in contacts list.
+ @param neighbor Neighbor to find via id
+ @return Internal copy of neighbor
+ */
KDA_Neighbor*
contact_lookup(KDA_Neighbor* neighbor)
{
NOTE_FX(neighbor->id);
- for (struct list_item* item = contacts->head;
- item; item = item->next)
- {
- KDA_Neighbor* n = (KDA_Neighbor*) item->data;
- if (n->id == neighbor->id)
- return n;
- }
- KDA_Join(neighbor->id, neighbor->node.comm);
+ KDA_Neighbor* copy =
+ list_select_one(contacts, KDA_Neighbor_cmp, neighbor);
+ if (copy)
+ return copy;
+
+ puts("cloneing");
+ copy = KDA_Neighbor_clone(neighbor);
+ KDA_Attach_to(copy, contacts);
add_contacts();
-
- return contact_lookup(neighbor);
+ return copy;
}
char*
@@ -159,23 +161,28 @@
}
CMPI_RETURN
-cmpi_put(char* key, char* value, int length)
+cmpi_put(char* key, void* value, int length)
{
- struct list_item* item;
- NOTE_FSS(key, value);
+ NOTE_FS(key);
// printf("contacts: %i \n", contacts);
KDA_Neighbor* n = (KDA_Neighbor*) list_random(contacts);
+ char tmp[64];
+ KDA_Neighbor_sprint(tmp, n);
+
MPIRPC* lookup = KDA_Lookup_k(n->node, key);
MPIRPC_Wait(lookup);
-
struct list* neighbors = KDA_Neighbor_node_list(lookup->result);
+ free(lookup->result);
+ MPIRPC_Free(lookup);
- for (item = neighbors->head;
- item; item = item->next)
+ for (struct list_item* item = neighbors->head; item;
+ item = item->next)
{
KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
+ DEBUG(KDA_Neighbor_sprint(tmp, neighbor);
+ printf("neighbor: %s\n", tmp););
MPIRPC* rpc = KDA_Store(neighbor, key, value, length);
MPIRPC_Wait(rpc);
MPIRPC_Free(rpc);
@@ -188,32 +195,37 @@
}
CMPI_RETURN
-cmpi_get(char* key, char** value, int* length)
+cmpi_get(char* key, void** value, int* length)
{
- struct list_item* item;
NOTE_FS(key);
KDA_Neighbor* n = (KDA_Neighbor*) list_random(contacts);
MPIRPC* lookup = KDA_Lookup_k(n->node, key);
MPIRPC_Wait(lookup);
-
struct list* neighbors = KDA_Neighbor_node_list(lookup->result);
- NOTE_I("neighbors->size ", neighbors->size);
+ free(lookup->result);
+ MPIRPC_Free(lookup);
+ SHOW_I(neighbors->size);
+ if (neighbors->size == 0)
+ return CMPI_ERROR_NEIGHBORS;
- for (item = neighbors->head;
- item; item = item->next)
+ for (struct list_item* item = neighbors->head; item;
+ item = item->next)
{
- KDA_Neighbor* neighbor_found = (KDA_Neighbor*) item->data;
- NOTE("retrieve from: ");
- KDA_Neighbor_dump(neighbor_found);
+ KDA_Neighbor* neighbor_found = item->data;
+ DEBUG(puts("retrieve from: ");
+ KDA_Neighbor_dump(neighbor_found););
KDA_Neighbor* neighbor = contact_lookup(neighbor_found);
- KDA_Neighbor_dump(neighbor);
+ DEBUG(KDA_Neighbor_dump(neighbor));
MPIRPC* rpc = KDA_Retrieve(neighbor, key);
MPIRPC_Wait(rpc);
*value = rpc->result;
+ *length = rpc->result_length;
NOTE_S("value: ", *value);
MPIRPC_Free(rpc);
+ // Assume first retrieval is successful:
+ break;
}
list_destroy(neighbors);
@@ -223,20 +235,20 @@
}
CMPI_RETURN
-cmpi_update(char* key, char* value, int length, int offset)
+cmpi_update(char* key, void* value, int length, int offset)
{
- struct list_item* item;
- NOTE_FSS(key, value);
+ NOTE_FS(key);
// printf("contacts: %i \n", contacts);
KDA_Neighbor* n = (KDA_Neighbor*) list_random(contacts);
MPIRPC* lookup = KDA_Lookup_k(n->node, key);
MPIRPC_Wait(lookup);
-
struct list* neighbors = KDA_Neighbor_node_list(lookup->result);
+ MPIRPC_Free(lookup);
- for (item = neighbors->head; item; item = item->next)
+ for (struct list_item* item = neighbors->head; item;
+ item = item->next)
{
KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
MPIRPC* rpc = KDA_Update(neighbor, key, value, length, offset);
Modified: src/kda-2/conn-A.c
===================================================================
--- src/kda-2/conn-A.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/kda-2/conn-A.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -1,4 +1,9 @@
+#ifndef _XOPEN_SOURCE
+#define _XOPEN_SOURCE
+#endif
+#include <time.h>
+
#include "kda_conn-A.h"
#include "kda_neighbor-2.h"
#include "cmpi_mode.h"
@@ -36,7 +41,11 @@
KDA_ID other_id;
sscanf(result, "%X", &other_id);
free(result);
- KDA_Join(other_id, other_rank);
+ KDA_Neighbor neighbor;
+ KDA_Neighbor_make_id(other_id, node, &neighbor);
+ bool added = neighbor_add(&neighbor);
+ MPIRPC_Node dummy = {0};
+ KDA_Join(dummy, &neighbor);
}
/*
@@ -68,28 +77,36 @@
sscanf(result, "%X", &other_id);
free(result);
KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
- if (list_add_unique(contacts, KDA_Neighbor_cmp, neighbor))
+ if (KDA_Attach_to(neighbor, contacts))
+ {
NOTE_XI("added neighbor: ", other_id, neighbor->node.comm);
+ assert(neighbor->node.comm != MPI_COMM_NULL);
+ }
else
free(neighbor);
}
}
+/**
+   Hand neighbor to list_add_unique() on the given contacts list,
+   using KDA_Neighbor_cmp as the comparator.
+   @return Whatever list_add_unique() reports; the caller in this
+           file treats true as "added" and frees the neighbor otherwise
+ */
+bool
+KDA_Attach_to(KDA_Neighbor* neighbor, struct list* contacts)
+{
+  bool added = list_add_unique(contacts, KDA_Neighbor_cmp, neighbor);
+  return added;
+}
+
/**
Form node->node connections
-
+ The neighbor should already be in the k_buckets; this function does not add it.
+ @param node Referring node - not used in conn-A (no 3-way)
+ @param neighbor The new neighbor
*/
void
-KDA_Join(int other_id, int other_rank)
+KDA_Join(MPIRPC_Node node, KDA_Neighbor* neighbor)
{
MPIRPC_Node other_node;
- MPIRPC_Node_make(MPI_COMM_WORLD, other_rank, &other_node);
+ MPIRPC_Node_make(MPI_COMM_WORLD, neighbor->node.rank, &other_node);
char args[64];
sprintf(args, "%X %i", id, mpi_rank);
MPIRPC_Block(other_node, "join", args);
-
- KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, other_node);
- neighbor_add(neighbor);
}
void
@@ -105,8 +122,10 @@
{
MPIRPC_Node node;
MPIRPC_Node_make(caller.comm, caller.rank, &node);
- KDA_Neighbor* neighbor = KDA_Neighbor_create_id(other_id, node);
- neighbor_add(neighbor);
+
+ KDA_Neighbor neighbor;
+ KDA_Neighbor_make_id(other_id, node, &neighbor);
+ neighbor_add(&neighbor);
hashtable_add(world_ranks, xheap(other_id), other_rank);
}
@@ -137,3 +156,32 @@
KDA_Detach_hubs()
{}
+/**
+   Scan the stream data for one neighbor written in the conn-A form
+   "[hex id]=rank@token date time".
+   @param data Text to parse; may be NULL
+   @param n OUT Number of chars consumed from data by sscanf();
+            meaningful only when a neighbor is returned
+   @return Fresh malloc()'d neighbor owned by the caller, or NULL if
+           data is NULL, the record is incomplete, or malloc() fails
+ */
+KDA_Neighbor*
+KDA_Neighbor_create_scan(char* data, int* n)
+{
+  NOTE_F;
+  if (data == NULL)
+    return NULL;
+  int other_rank;
+  KDA_ID other_id;
+  char junk[64];
+  char date[128];
+  char time[64];
+  // Field widths keep every %s inside its buffer; 63 + 1 + 63 + NUL
+  // also leaves exactly enough room to append " " and time to date
+  int count = sscanf(data, "[%X]=%i@%63s %63s %63s %n",
+                     &other_id, &other_rank, junk, date, time, n);
+
+  // date and time are both read below, so all five fields must have
+  // converted; this also guarantees that %n was reached
+  if (count < 5)
+    return NULL;
+  KDA_Neighbor* nbor = malloc(sizeof(KDA_Neighbor));
+  if (nbor == NULL)
+    return NULL;
+  nbor->id = other_id;
+  nbor->node.rank = other_rank;
+  nbor->node.comm = MPI_COMM_WORLD;
+  struct tm t = {0};
+  // Rejoin "date time" so strptime() can parse it in one pass
+  strcat(date, " ");
+  strcat(date, time);
+  SHOW(date);
+  char* c = strptime(date, "%F %T", &t);
+  assert(c);
+  nbor->lastseen = mktime(&t);
+  return nbor;
+}
Modified: src/kda-2/conn-B.c
===================================================================
--- src/kda-2/conn-B.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/kda-2/conn-B.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -508,3 +508,24 @@
free(hub);
}
}
+
+/**
+   Scan the stream data for one neighbor written in the conn-B form
+   "[hex id]@token date time". Only the id is kept; rank is set to 0
+   and comm to MPI_COMM_NULL.
+   @param data Text to parse; may be NULL
+   @param n OUT Number of chars consumed from data by sscanf()
+   @return Fresh malloc()'d neighbor owned by the caller, or NULL if
+           data is NULL, fewer than two fields convert, or malloc() fails
+ */
+KDA_Neighbor*
+KDA_Neighbor_create_scan(char* data, int* n)
+{
+  if (data == NULL)
+    return NULL;
+  KDA_ID other_id;
+  char junk[20];
+  char date[20];
+  char time[20];
+  // Field widths keep every %s inside its 20-byte buffer
+  int count = sscanf(data, "[%X]@%19s %19s %19s %n", &other_id,
+                     junk, date, time, n);
+
+  // NOTE(review): n is written only when all four fields convert, yet
+  // a neighbor is returned once two do - confirm callers never read n
+  // for such short records
+  if (count < 2)
+    return NULL;
+  KDA_Neighbor* nbor = malloc(sizeof(KDA_Neighbor));
+  if (nbor == NULL)
+    return NULL;
+  nbor->id = other_id;
+  nbor->node.rank = 0;
+  nbor->node.comm = MPI_COMM_NULL;
+  // NOTE(review): lastseen is left unset here - confirm no caller
+  // reads it before assigning
+  return nbor;
+}
Modified: src/kda-2/kademlia.c
===================================================================
--- src/kda-2/kademlia.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/kda-2/kademlia.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -88,6 +88,7 @@
char* bucket_ids_tostring(void);
void dump_buckets(void);
char* buckets_tostring(void);
+char* kda_op_tostring(KDA_Operation* op);
static KDA_ID make_id(int salt);
// static char* id_tostring(void);
@@ -406,9 +407,10 @@
KDA_Translate_k(MPIRPC_Node node, KDA_ID id)
{
NOTE_FX(id);
+ char sid[32];
+ sprintf(sid, "%X", id);
MPIRPC* rpc =
- MPIRPC_Call(node, "query_id_k", iheap(id),
- NULL, MPIRPC_PROCEED_NULL);
+ MPIRPC_Call(node, "query_id_k", sid, NULL, MPIRPC_PROCEED_NULL);
return rpc;
}
@@ -480,7 +482,7 @@
KDA_Comm_set_name(KDA_Neighbor* neighbor)
{
NOTE_FX(neighbor->id);
- char name[20];
+ char name[32];
sprintf(name, "%X", neighbor->id);
MPI_Comm_set_name(neighbor->node.comm, name);
}
@@ -505,9 +507,9 @@
void* blob, int blob_length)
{
NOTE_S("store: ", args);
- cmpi_cached_store(args, blob, blob_length);
+ char* key = strdup(args);
+ cmpi_cached_store(key, blob, blob_length);
DEBUG(lru_table_printf("%s", cmpi_cache));
-
MPIRPC_Null(caller, unique);
}
@@ -541,12 +543,14 @@
NOTE_FS(args);
DEBUG(lru_table_printf("%s", cmpi_cache));
- char* data;
+ void* data;
int length;
length = cmpi_cached_retrieve(args, &data);
+ void* copy = malloc(length);
+ memcpy(copy, data, length);
- SHOW_S(data);
- MPIRPC_Return(caller, unique, data, length);
+ SHOW_S(copy);
+ MPIRPC_Return(caller, unique, copy, length);
}
void
@@ -562,15 +566,16 @@
rpc_find_node(KDA_Neighbor* neighbor, KDA_Operation* op)
{
NOTE_FXX(neighbor->id, op->object_id);
- MPIRPC_Call(neighbor->node, heap("find_node"), xheap(op->object_id),
- op, proceed_find);
+ char xid[32];
+ sprintf(xid, "%X", op->object_id);
+ MPIRPC_Call(neighbor->node, "find_node", xid, op, proceed_find);
}
void
handle_find_node(MPIRPC_Node caller, int unique, char* args,
void* blob, int blob_length)
{
- struct ilist_item* item;
+ ;
NOTE_FS(args);
// Decode...
int object_id;
@@ -583,8 +588,8 @@
char* result = (char*) malloc(1024*sizeof(char));
char* s = result;
s += sprintf(s, "%i ", known->size);
- for (item = known->head;
- item; item = item->next)
+ for (struct ilist_item* item = known->head; item;
+ item = item->next)
s += sprintf(s, "%s ", KDA_Neighbor_name(item->data));
ilist_free(known);
@@ -646,12 +651,13 @@
char* p = result;
while (op->k_closest->size > 0)
{
- NOTE("while");
- KDA_Neighbor* n =
- (KDA_Neighbor*) ilist_poll(op->k_closest);
+ KDA_Neighbor* n = (KDA_Neighbor*) ilist_poll(op->k_closest);
p += KDA_Neighbor_sprint(p, n);
- *(p++) = ' ';
+ if (op->k_closest->size > 0)
+ p += sprintf(p, " ");
}
+
+ SHOW_S(result);
MPIRPC_Return(op->query->caller, op->query->unique,
result, strlen(result)+1);
// free op
@@ -663,13 +669,12 @@
void
KDA_Map(KDA_Operation* op)
{
- struct ilist_item* item;
- // NOTE_F;
+ NOTE_F;
if (op->name)
{
- // printf("map: %s \n", op->name);
- for (item = op->k_closest->head;
- item; item = item->next)
+ printf("map: %s \n", op->name);
+ for (struct ilist_item* item = op->k_closest->head; item;
+ item = item->next)
{
KDA_Neighbor* n = (KDA_Neighbor*) item->data;
if (n->id == id)
@@ -680,9 +685,10 @@
}
else if (op->query)
{
- // NOTE("op->query");
+ NOTE("op->query");
op->query->service(op);
}
+ else assert(false);
op->returned = list_create();
}
@@ -690,18 +696,17 @@
void
proceed_find(MPIRPC* rpc)
{
- struct list_item* item;
NOTE_F;
KDA_Operation* op = (KDA_Operation*) rpc->extras;
list_remove_where(op->outstanding, KDA_Neighbor_node_cmp,
&rpc->target);
- NOTE("from: ");
- NOTE(MPIRPC_Comm_get_name(rpc->target.comm));
+ // NOTE("from: ");
+ // NOTE(MPIRPC_Comm_get_name(rpc->target.comm));
NOTE("outstanding: ");
- list_output(KDA_Neighbor_tostring, op->outstanding);
+ DEBUG(list_output(KDA_Neighbor_tostring, op->outstanding));
int size;
int n;
@@ -710,7 +715,8 @@
p += n;
struct list* neighbors = KDA_Neighbor_node_list(p);
- for (item = neighbors->head; item; item = item->next)
+ for (struct list_item* item = neighbors->head; item;
+ item = item->next)
{
KDA_Neighbor* neighbor = (KDA_Neighbor*) item->data;
neighbor = check_neighbor(rpc->target, neighbor);
@@ -730,8 +736,11 @@
ilist_pop(op->k_closest);
}
}
- list_free(neighbors);
+ list_destroy(neighbors);
+ free(rpc->result);
+ MPIRPC_Free(rpc);
+
if (op->outstanding->size == 0)
{
if (op->improved)
@@ -739,7 +748,7 @@
if (! KDA_Find_node_again(op))
KDA_Map(op);
else
- NOTE("Doing another round. ");
+ NOTE("Doing another round.");
}
else
KDA_Map(op);
@@ -758,15 +767,13 @@
KDA_Neighbor*
neighbor_lookup(KDA_Neighbor* neighbor)
{
- int i;
- struct list_item* item;
NOTE_F;
if (neighbor->id == id)
return NULL;
- for (i = 0; i < KDA_SPACE_SIZE; i++)
- for (item = k_bucket[i]->head;
- item; item = item->next)
+ for (int i = 0; i < KDA_SPACE_SIZE; i++)
+ for (struct list_item* item = k_bucket[i]->head; item;
+ item = item->next)
{
KDA_Neighbor* n = (KDA_Neighbor*) item->data;
if (n->id == neighbor->id)
@@ -787,7 +794,7 @@
KDA_Neighbor* result;
if (neighbor_add(neighbor))
- KDA_Join(neighbor->id, node.rank);
+ KDA_Join(node, neighbor);
result = neighbor_lookup(neighbor);
return result;
@@ -853,7 +860,7 @@
(KDA_Operation*) malloc(sizeof(KDA_Operation));
op->object_id = object_id;
- op->unique = unique++;
+ op->id = unique++;
op->name = name;
op->args = args;
op->k_closest = NULL;
@@ -864,7 +871,7 @@
op->proceed = proceed;
op->query = query;
- itable_add(operations, op->unique, op);
+ itable_add(operations, op->id, op);
KDA_Find_node(op);
return op;
@@ -876,7 +883,7 @@
list_free(op->contacted);
list_free(op->outstanding);
ilist_free(op->k_closest);
- itable_remove(operations, op->unique);
+ itable_remove(operations, op->id);
free(op);
}
@@ -925,7 +932,8 @@
}
/**
- @return false if n is already in the neighbor table.
+ @return True if copy of n is added to the neighbor table -
+ use neighbor_lookup to obtain this copy after a KDA_Join
*/
bool
k_bucket_insert(int i, KDA_Neighbor* n)
@@ -938,7 +946,8 @@
return false;
}
- list_ordered_insert(k_bucket[i], KDA_Neighbor_time_cmp, (void*) n);
+ KDA_Neighbor* copy = KDA_Neighbor_clone(n);
+ list_ordered_insert(k_bucket[i], KDA_Neighbor_time_cmp, copy);
NOTE("After insert: ");
DEBUG(dump_buckets());
@@ -947,7 +956,7 @@
}
/**
- @return false if n is already in the neighbor table.
+ @return True if copy of neighbor is added to the neighbor table.
*/
bool
neighbor_add(KDA_Neighbor* neighbor)
@@ -996,7 +1005,7 @@
while (! cmpi_quitting)
{
- NOTE("loop");
+ // NOTE("loop");
loop_work();
}
shutdown_others();
@@ -1157,3 +1166,15 @@
MPIRPC_Block(neighbor->node, "shutdown", NULL);
}
}
+
+/**
+   Render an operation as text; currently only its id, as "Op{id=N}".
+   @param op Operation to describe
+   @return malloc()'d string owned by the caller, or NULL if malloc()
+           fails
+ */
+char*
+kda_op_tostring(KDA_Operation* op)
+{
+  char* result = malloc(1024);
+  if (result == NULL)
+    return NULL;
+  char* p = result;
+
+  p += sprintf(p, "Op{");
+  p += sprintf(p, "id=%i", op->id);
+  // Close the brace opened above so the text is balanced
+  sprintf(p, "}");
+
+  return result;
+}
Modified: src/kda-2/neighbor.c
===================================================================
--- src/kda-2/neighbor.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/kda-2/neighbor.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -26,37 +26,37 @@
{
NOTE_F;
KDA_Neighbor* result = malloc(sizeof(KDA_Neighbor));
- result->id = other_id;
- result->node = node;
- result->lastseen = time(NULL);
+ KDA_Neighbor_make_id(other_id, node, result);
return result;
}
+/**
+   Fill in caller-provided neighbor storage.
+   @param other_id Id to record
+   @param node Node to record, copied by value
+   @param output OUT Neighbor to initialize; lastseen is set to the
+                 current time
+ */
+void
+KDA_Neighbor_make_id(KDA_ID other_id, MPIRPC_Node node,
+                     KDA_Neighbor* output)
+{
+  time_t now = time(NULL);
+
+  output->lastseen = now;
+  output->node     = node;
+  output->id       = other_id;
+}
+
/**
+ Copy the original into fresh storage
+ */
+KDA_Neighbor*
+KDA_Neighbor_clone(KDA_Neighbor* original)
+{
+  KDA_Neighbor* duplicate =
+    KDA_Neighbor_create_id(original->id, original->node);
+  // create_id stamps the current time; carry over the original's
+  // lastseen instead
+  duplicate->lastseen = original->lastseen;
+  return duplicate;
+}
+
+/**
Scan the data for neighbors.
@return The new neighbor. Output n containing the number of chars
consumed from data by sscanf().
*/
-KDA_Neighbor*
-KDA_Neighbor_create_scan(char* data, int* n)
-{
- if (data == NULL)
- return NULL;
- KDA_ID other_id;
- char junk[20];
- char date[20];
- char time[20];
- int count = sscanf(data, "[%X]@%s %s %s %n", &other_id,
- junk, date, time, n);
- if (count < 2)
- return NULL;
- KDA_Neighbor* nbor = malloc(sizeof(KDA_Neighbor));
- nbor->id = other_id;
- nbor->node.rank = 0;
- nbor->node.comm = MPI_COMM_NULL;
- return nbor;
-}
/**
Neighbor-neighbor id comparison.
@@ -174,7 +174,7 @@
if (neighbor->id == id)
{
- offset = sprintf(buffer, "[%X]@SELF ", id);
+ offset = sprintf(buffer, "[%X]=%i@SELF ", id, mpi_rank);
time_t t = time(NULL);
offset += strftime(buffer+offset, 99, "%F %T",
localtime(&t));
@@ -218,8 +218,9 @@
*/
struct list* KDA_Neighbor_node_list(char* data)
{
+ SHOW_FS(data);
+
struct list* result = list_create();
-
char* end = data+strlen(data);
while (data < end-3)
{
Modified: src/mpi_tools/mpi_tools.c
===================================================================
--- src/mpi_tools/mpi_tools.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/mpi_tools/mpi_tools.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -222,6 +222,7 @@
void
note_s(char* msg, char* s)
{
+ assert(debug_file);
fprintf(debug_file, "[%i] %s%s\n", debug_rank, msg, s);
fflush(debug_file);
}
Modified: src/mpirpc/mpirpc.c
===================================================================
--- src/mpirpc/mpirpc.c 2011-05-02 15:06:20 UTC (rev 200)
+++ src/mpirpc/mpirpc.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -506,15 +506,13 @@
bool
MPIRPC_Check()
{
- struct list_item* item;
bool event = false;
int recvd = false;
MPI_Status status;
- for (item = channels->head;
+ for (struct list_item* item = channels->head;
item; item = item->next)
{
- NOTE("check channel");
MPIRPC_Channel* channel = (MPIRPC_Channel*) item->data;
// char* name = MPIRPC_Comm_get_name(channel->comm);
@@ -535,7 +533,6 @@
MPIRPC_Handle(check_msg, node);
}
}
- sleep(1);
return event;
}
@@ -576,14 +573,13 @@
void
MPIRPC_Flush_returns()
{
- struct list_item* item;
- int i;
- gossip_do(MASK_MPIRPC, NOTE_F);
+ // gossip_do(MASK_MPIRPC, NOTE_F);
MPI_Status status;
- for (item = garbage_values->head; item; item = item->next)
+ for (struct list_item* item = garbage_values->head; item;
+ item = item->next)
{
MPIRPC_Value* value = (MPIRPC_Value*) item->data;
- for (i = 0; i < 3; i++)
+ for (int i = 0; i < 3; i++)
{
MPI_Wait(&value->request[i], &status);
}
Modified: test/adts/test-lru_table01.c
===================================================================
--- test/adts/test-lru_table01.c 2011-05-02 15:06:20 UTC (rev 200)
+++ test/adts/test-lru_table01.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -38,7 +38,7 @@
MPI_Init(&argc, &argv);
whoami();
- struct lru_table* table = lru_table_create(5, 4);
+ struct lru_table* table = lru_table_create(5, 4);
int* one = malloc(sizeof(int));
int* two = malloc(sizeof(int));
Modified: test/cmpi/test-putget.c
===================================================================
--- test/cmpi/test-putget.c 2011-05-02 15:06:20 UTC (rev 200)
+++ test/cmpi/test-putget.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -12,9 +12,6 @@
{
NOTE_F;
- wait_for_notification();
- notify_next();
-
char key1[10];
char key2[10];
@@ -36,9 +33,11 @@
sleep(2);
int length;
- char* result;
- cmpi_get(key1, &result, &length);
- printf("result(%i): %s\n", length, result);
+ void* result;
+ CMPI_RETURN code = cmpi_get(key1, &result, &length);
+ cmpi_assert(code, "get() failed!");
+ printf("result(%i): %s\n", length, (char*) result);
+ free(result);
sleep(3);
Modified: test/cmpi/test-update01.c
===================================================================
--- test/cmpi/test-update01.c 2011-05-02 15:06:20 UTC (rev 200)
+++ test/cmpi/test-update01.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -35,10 +35,10 @@
cmpi_update(key2, "X", 1, 3);
- char* result;
+ void* result;
int length;
cmpi_get(key2, &result, &length);
- printf("result: %s\n", result);
+ printf("result: %s\n", (char*) result);
printf("GET SUCCEEDED\n");
sleep(2);
Modified: test/cmpi/test-update02.c
===================================================================
--- test/cmpi/test-update02.c 2011-05-02 15:06:20 UTC (rev 200)
+++ test/cmpi/test-update02.c 2011-05-02 22:55:48 UTC (rev 201)
@@ -44,16 +44,16 @@
sleep(2);
- char* result;
+ void* result;
int length;
cmpi_get(key1, &result, &length);
- printf("RESULT_1: %s\n", result);
+ printf("RESULT_1: %s\n", (char*) result);
sleep(1);
cmpi_get(key2, &result, &length);
- printf("RESULT_2: %s\n", result);
+ printf("RESULT_2: %s\n", (char*) result);
sleep(1);
cmpi_get(key3, &result, &length);
- printf("RESULT_3: %s\n", result);
+ printf("RESULT_3: %s\n", (char*) result);
NOTE("GET SUCCEEDED");
sleep(2);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|