[C-mpi-commits] SF.net SVN: c-mpi:[2]
Status: Pre-Alpha
Brought to you by:
jmwozniak
|
From: <jmw...@us...> - 2010-04-19 19:54:32
|
Revision: 2
http://c-mpi.svn.sourceforge.net/c-mpi/?rev=2&view=rev
Author: jmwozniak
Date: 2010-04-19 19:54:23 +0000 (Mon, 19 Apr 2010)
Log Message:
-----------
Whitespace cleanups, etc.
Modified Paths:
--------------
Makefile.in
configure.ac
include/gossip.h
include/kda-1.h
include/mpirpc-1.h
setup.sh
src/kda-1/kademlia.c
Modified: Makefile.in
===================================================================
--- Makefile.in 2010-04-19 18:18:16 UTC (rev 1)
+++ Makefile.in 2010-04-19 19:54:23 UTC (rev 2)
@@ -293,7 +293,7 @@
$(E)echo CFLAGS: $(CFLAGS)
$(E)echo IFLAGS: $(IFLAGS)
$(E)echo LIBS: $(LIBS)
- $(E)echo DEPENDS: $(DEPENDS)
+ $(E)echo DEPENDS: $(DEPENDS)
$(E)echo OPENSSL: $(OPENSSL_LOCATION)
# $(E)echo USE_MPIRPC_1: @USE_MPIRPC_1@
# $(E)echo USE_MPIRPC_2: @USE_MPIRPC_2@
@@ -317,16 +317,12 @@
CMPI := lib/libcmpi.a
MPIRPC := lib/libmpirpc.a
CMPI_IO := lib/libcmpi-io.so
-GOBS = lib/libgobs.a
-GACK = lib/libgack.a
-SKULLFS = lib/libskullfs.a
################################################################
# Makefile includes
# this is how we pull build information from all of the project
# subdirectories, make sure to catch top level module.mk as well
DIR :=
-include module.mk
include $(MODULES)
TEST_OBJS += $(patsubst %.c, %.o, $(TEST_SRC))
@@ -377,12 +373,6 @@
mpirpc: $(MPIRPC)
-gobs: $(GOBS)
-
-gack: $(GACK)
-
-skullfs: $(SKULLFS)
-
# Just like dir, but strip the slash off the end, to be pretty.
dirname = $(patsubst %/,%,$(dir $(1)))
Modified: configure.ac
===================================================================
--- configure.ac 2010-04-19 18:18:16 UTC (rev 1)
+++ configure.ac 2010-04-19 19:54:23 UTC (rev 2)
@@ -475,7 +475,6 @@
dnl output final version of top level makefile and subdirectory
dnl makefile includes
AC_CONFIG_FILES(Makefile
-module.mk
src/gossip/module.mk
src/adts/module.mk
src/cmpi/module.mk
Modified: include/gossip.h
===================================================================
--- include/gossip.h 2010-04-19 18:18:16 UTC (rev 1)
+++ include/gossip.h 2010-04-19 19:54:23 UTC (rev 2)
@@ -37,9 +37,7 @@
#define MASK_CMPI ((uint64_t)1 << 6)
#define MASK_ARCH ((uint64_t)1 << 7)
#define MASK_STORAGE ((uint64_t)1 << 8)
-#define MASK_GUM ((uint64_t)1 << 9)
-#define MASK_GOBS ((uint64_t)1 << 10)
-#define MASK_FS ((uint64_t)1 << 11)
+#define MASK_KDA ((uint64_t)1 << 9)
#define MASK_CLIENT ((uint64_t)1 << 12)
/********************************************************************
@@ -171,7 +169,7 @@
#define gossip_do(mask, x) \
if ((gossip_debug_on) && (gossip_debug_mask & mask) && \
(gossip_facility)) \
- { x; }
+ { x; }
#endif /* GOSSIP_DISABLE_DEBUG */
Modified: include/kda-1.h
===================================================================
--- include/kda-1.h 2010-04-19 18:18:16 UTC (rev 1)
+++ include/kda-1.h 2010-04-19 19:54:23 UTC (rev 2)
@@ -2,13 +2,13 @@
#ifndef KADEMLIA_1_H
#define KADEMLIA_1_H
-#include <limits.h>
-#include <signal.h>
-#include <stdbool.h>
+#include <limits.h>
+#include <signal.h>
+#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
-#include <time.h>
-#include <unistd.h>
+#include <time.h>
+#include <unistd.h>
#include <mpi.h>
@@ -16,11 +16,11 @@
#include <itable.h>
#include <list.h>
#include <ilist.h>
-#include <inlist.h>
-#include <mpi_tools.h>
+#include <inlist.h>
+#include <mpi_tools.h>
#include <mpirpc.h>
-#include <cmpi.h>
-// #include <driver.h>
+#include <cmpi.h>
+// #include <driver.h>
#ifdef DISKSIM_LOCATION
#include <disksim.h>
@@ -28,18 +28,18 @@
typedef int KDA_ID;
-#include "kda_types-1.h"
+#include "kda_types-1.h"
#include "kda_neighbor-1.h"
-#define KDA_SPACE_SIZE (sizeof(KDA_ID)*8) // 160 in paper
-#define KDA_HASH_SPACE INT_MAX //
+#define KDA_SPACE_SIZE (sizeof(KDA_ID)*8) // 160 in paper
+#define KDA_HASH_SPACE INT_MAX //
/**
KDA_ID_NULL=0 is an invalid KDA_ID used for semantic purposes.
- make_id must not create this id.
+ make_id must not create this id.
*/
-#define KDA_ID_NULL 0;
+#define KDA_ID_NULL 0;
/**
Obtain the Kademlia distance between two ids.
@@ -48,12 +48,12 @@
/**
Contains neighbors. A bucket is all neighbors of a given
- distance 2^i - 2^i+1
+ distance 2^i - 2^i+1
*/
-typedef struct list* K_BUCKET;
+typedef struct list* K_BUCKET;
-extern int kda_nodes;
-extern struct itable* operations;
+extern int kda_nodes;
+extern struct itable* operations;
void KDA_Init(int alpha_in, int k_in);
@@ -62,14 +62,14 @@
/**
Return once op->status is KDA_STATUS_COMPLETE
*/
-void KDA_Wait(KDA_Operation* op);
+void KDA_Wait(KDA_Operation* op);
//// Local API...
int KDA_Local_rank(KDA_ID other_id);
-KDA_Neighbor* KDA_Local_ID(int other_rank);
+KDA_Neighbor* KDA_Local_ID(int other_rank);
-//// Client API:
+//// Client API:
MPIRPC* KDA_Translate(int rank, KDA_ID id);
MPIRPC* KDA_Translate_k(int rank, KDA_ID id);
MPIRPC* KDA_Lookup(int rank, char* key);
@@ -79,10 +79,10 @@
MPIRPC* KDA_Update(int rank, char* key, char* value, int length, int offset);
//// In-DHT API...
-void KDA_Ping(KDA_ID other_id);
-void KDA_Announce(void);
+void KDA_Ping(KDA_ID other_id);
+void KDA_Announce(void);
struct ilist* KDA_Find_key(char* key);
-struct ilist* KDA_Find(KDA_ID object_id);
+struct ilist* KDA_Find(KDA_ID object_id);
struct ilist* KDA_Closest(KDA_ID object_id);
//// Helpers...
@@ -90,56 +90,56 @@
void (*proceed)(MPIRPC* rpc),
KDA_Query* query);
bool KDA_Find_node(KDA_Operation* op);
-bool KDA_Find_node_again(KDA_Operation* op);
+bool KDA_Find_node_again(KDA_Operation* op);
void KDA_Operation_free(KDA_Operation* op);
-char* KDA_Description(void);
-char* KDA_id_tostring(KDA_ID other_id);
+char* KDA_Description(void);
+char* KDA_id_tostring(KDA_ID other_id);
KDA_ID make_id(void);
-void join(int other_rank);
+void join(int other_rank);
char* id_tostring(void);
-void listen_loop(void);
+void listen_loop(void);
void bootstrap(int other_rank);
void bootjoin(int other_id, int other_rank);
void rpc_bootjoin(int other_rank);
-void bootping(void);
+void bootping(void);
void dump_buckets(void);
-char* bucket_usage(void);
+char* bucket_usage(void);
char* buckets_tostring(void);
-bool add_neighbor(KDA_Neighbor* neighbor);
+bool add_neighbor(KDA_Neighbor* neighbor);
//// Handlers...
-void handle_announce(int caller, int unique, char* args);
+void handle_announce(int caller, int unique, char* args);
void handle_info(int caller, int unique, char* args);
void handle_debug_tables(int caller, int unique, char* args);
-void handle_bucket_usage(int caller, int unique, char* args);
+void handle_bucket_usage(int caller, int unique, char* args);
void handle_bootjoin(int caller, int unique, char* args);
void handle_bootping(int caller, int unique, char* args);
-void handle_update(int caller, int unique, char* args);
-void handle_find_node(int caller, int unique, char* args);
+void handle_update(int caller, int unique, char* args);
+void handle_find_node(int caller, int unique, char* args);
void handle_ping(int caller, int unique, char* args);
void handle_query_id(int caller, int unique, char* args);
void handle_query_id_k(int caller, int unique, char* args);
void handle_store(int caller, int unique, char* args);
void handle_retrieve(int caller, int unique, char* args);
-void handle_quit(int caller, int unique, char* args);
+void handle_quit(int caller, int unique, char* args);
//// Synchronous methods...
-struct ilist* find_node(KDA_ID id);
+struct ilist* find_node(KDA_ID id);
-//// Asynchronous methods and return services...
+//// Asynchronous methods and return services...
void query_id(int caller, int unique, KDA_ID id);
-void return_query_id(KDA_Operation* op);
+void return_query_id(KDA_Operation* op);
void query_id_k(int caller, int unique, KDA_ID id);
-void return_query_id_k(KDA_Operation* op);
+void return_query_id_k(KDA_Operation* op);
//// Proceeds...
-void proceed_bootjoin(MPIRPC* rpc);
+void proceed_bootjoin(MPIRPC* rpc);
void proceed_find(MPIRPC* rpc);
-void proceed_ping(MPIRPC* rpc);
+void proceed_ping(MPIRPC* rpc);
-#endif
+#endif
Modified: include/mpirpc-1.h
===================================================================
--- include/mpirpc-1.h 2010-04-19 18:18:16 UTC (rev 1)
+++ include/mpirpc-1.h 2010-04-19 19:54:23 UTC (rev 2)
@@ -2,8 +2,6 @@
#ifndef MPIRPC_1_H
#define MPIRPC_1_H
-#warning
-
#define _XOPEN_SOURCE 500
#include <assert.h>
Modified: setup.sh
===================================================================
--- setup.sh 2010-04-19 18:18:16 UTC (rev 1)
+++ setup.sh 2010-04-19 19:54:23 UTC (rev 2)
@@ -1,6 +1,6 @@
#!/bin/sh
-rm -fv config.cache
+rm -fv config.cache
mkdir -p maint/config
Modified: src/kda-1/kademlia.c
===================================================================
--- src/kda-1/kademlia.c 2010-04-19 18:18:16 UTC (rev 1)
+++ src/kda-1/kademlia.c 2010-04-19 19:54:23 UTC (rev 2)
@@ -2,56 +2,56 @@
#include "kda-1.h"
#ifdef DISKSIM_LOCATION
-#endif
+#endif
/**
My kademlia ID.
- Normally represented in hex.
-*/
+ Normally represented in hex.
+*/
KDA_ID id;
/**
- The neighbor table.
+ The neighbor table.
*/
-K_BUCKET k_bucket[KDA_SPACE_SIZE];
+K_BUCKET k_bucket[KDA_SPACE_SIZE];
/**
- Outstanding KDA_Operations.
+ Outstanding KDA_Operations.
*/
-struct itable* operations;
+struct itable* operations;
/**
- Kademlia parameter alpha.
+ Kademlia parameter alpha.
*/
int alpha;
/**
Kademlia parameter k.
*/
-int k;
+int k;
/**
- Number of nodes in boot-up time original DHT.
+ Number of nodes in boot-up time original DHT.
*/
-int kda_nodes;
+int kda_nodes;
/**
ID for special sequential global operations such as
- debug dumps and shutdowns.
+ debug dumps and shutdowns.
*/
-int last_global_opid = 0;
+int last_global_opid = 0;
/**
Number of times buckets have been dumped.
- Useful for debugging only.
-*/
-int dumps = 0;
+ Useful for debugging only.
+*/
+int dumps = 0;
/**
- Self. Useful in KDA_Closest().
-*/
-KDA_Neighbor self;
-
+ Self. Useful in KDA_Closest().
+*/
+KDA_Neighbor self;
+
/**
Setup Kademlia.
Must be called after API_Init() and whoami().
@@ -59,26 +59,26 @@
void
KDA_Init(int alpha_in, int k_in)
{
- gossip_do(MASK_KDA, NOTE_F);
-
+ gossip_do(MASK_KDA, NOTE_F);
+
int i;
alpha = alpha_in;
k = k_in;
-
- id = make_id();
+
+ id = make_id();
for (i = 0; i < KDA_SPACE_SIZE; i++)
k_bucket[i] = list_create();
- operations = itable_create(CALL_TABLE_SIZE);
+ operations = itable_create(CALL_TABLE_SIZE);
gossip_do(MASK_KDA, NOTE_XI("ID: ", id, id));
- neighbor_make(id, mpi_rank, &self);
-
+ neighbor_make(id, mpi_rank, &self);
+
MPIRPC_Register("info", handle_info);
MPIRPC_Register("debug_tables", handle_debug_tables);
- MPIRPC_Register("debug_table_usage", handle_bucket_usage);
+ MPIRPC_Register("debug_table_usage", handle_bucket_usage);
MPIRPC_Register("bootjoin", handle_bootjoin);
- MPIRPC_Register("bootping", handle_bootping);
+ MPIRPC_Register("bootping", handle_bootping);
MPIRPC_Register("ping", handle_ping);
MPIRPC_Register("update", handle_update);
MPIRPC_Register("find_node", handle_find_node);
@@ -88,23 +88,23 @@
MPIRPC_Register("retrieve", handle_retrieve);
MPIRPC_Register("quit", handle_quit);
- // Initialize disksim emulator:
+ // Initialize disksim emulator:
DISKSIM(disksim_init());
if (mpi_rank == 0)
{
// NOTE("bootstrap...");
- bootstrap(1);
- listen_loop();
+ bootstrap(1);
+ listen_loop();
}
- else
+ else
{
listen_loop();
}
}
/**
- Perform a local search of the neighbor table for the id.
+ Perform a local search of the neighbor table for the id.
@return The MPI rank of the given Kademlia id or -1 if not found.
*/
int
@@ -114,25 +114,25 @@
int i = ilog2(d);
struct list_item* item;
- // printf("rank : %i \n", other_id);
- // printf("i: %i \n", i);
-
+ // printf("rank : %i \n", other_id);
+ // printf("i: %i \n", i);
+
for (item = k_bucket[i]->head;
item->next; item = item->next)
{
if (((KDA_Neighbor*) item->data)->id == other_id)
- return ((KDA_Neighbor*) item->data)->rank;
+ return ((KDA_Neighbor*) item->data)->rank;
}
-
- return -1;
+
+ return -1;
}
/**
- Perform a local search of the neighbor table for the MPI rank.
- @return The Kademlia neighbor with the given rank.
+ Perform a local search of the neighbor table for the MPI rank.
+ @return The Kademlia neighbor with the given rank.
*/
KDA_Neighbor*
-KDA_Local_neighbor(int other_rank)
+KDA_Local_neighbor(int other_rank)
{
int i;
struct list_item* item;
@@ -143,22 +143,22 @@
{
KDA_Neighbor* n = (KDA_Neighbor*) item->data;
if (n->rank == other_rank)
- return n;
+ return n;
}
}
- return NULL;
+ return NULL;
}
/**
Use DHT to synchronously find the given key.
*/
-struct ilist*
+struct ilist*
KDA_Find_key(char* key)
{
NOTE_F;
-
+
int id = hash_string(key, KDA_HASH_SPACE);
- return KDA_Find(id);
+ return KDA_Find(id);
}
/**
@@ -167,25 +167,25 @@
struct ilist*
KDA_Find(KDA_ID id)
{
- KDA_Operation* op =
+ KDA_Operation* op =
KDA_Operate(id, NULL, NULL, NULL, NULL);
- KDA_Wait(op);
- return op->k_closest;
+ KDA_Wait(op);
+ return op->k_closest;
}
void
KDA_Wait(KDA_Operation* op)
{
- MPIRPC_Snooze_reset();
+ MPIRPC_Snooze_reset();
while (op->status != KDA_STATUS_COMPLETE)
{
if (MPIRPC_Check())
MPIRPC_Snooze_reset();
else
- MPIRPC_Snooze();
+ MPIRPC_Snooze();
}
}
-
+
/**
ordered local closest node lookup
@return k neighbors in a list
@@ -193,11 +193,11 @@
struct ilist*
KDA_Closest(KDA_ID object_id)
{
- gossip_do(MASK_KDA, NOTE_FX(object_id));
+ gossip_do(MASK_KDA, NOTE_FX(object_id));
struct ilist* result = ilist_create();
int d;
- d = XOR(id, object_id);
- ilist_add(result, d, neighbor_clone(&self));
+ d = XOR(id, object_id);
+ ilist_add(result, d, neighbor_clone(&self));
int i;
struct list_item* item;
for (i = 0; i < KDA_SPACE_SIZE; i++)
@@ -208,38 +208,38 @@
d = XOR(neighbor->id, object_id);
if (result->size < k)
{
- // DEBUG(ilist_fdumpkeys(debug_file, result));
+ // DEBUG(ilist_fdumpkeys(debug_file, result));
ilist_ordered_insert(result, d, neighbor_clone(neighbor));
- // DEBUG(ilist_fdumpkeys(debug_file, result));
- // DEBUG(char* s = ilist_serialize_ptrs(result);NOTE(s);free(s););
+ // DEBUG(ilist_fdumpkeys(debug_file, result));
+ // DEBUG(char* s = ilist_serialize_ptrs(result);NOTE(s);free(s););
}
else if (d < result->tail->key)
{
- KDA_Neighbor* victim = ilist_pop(result);
- // NOTE_I("POPPED: ", victim->rank);
+ KDA_Neighbor* victim = ilist_pop(result);
+ // NOTE_I("POPPED: ", victim->rank);
ilist_ordered_insert(result, d, neighbor_clone(neighbor));
- // SHOW_P(victim);
- free(victim);
+ // SHOW_P(victim);
+ free(victim);
}
}
}
- // DEBUG(char* s = ilist_serialize_ptrs(result);NOTE(s);free(s););
- return result;
+ // DEBUG(char* s = ilist_serialize_ptrs(result);NOTE(s);free(s););
+ return result;
}
/**
- Query node at rank for key location.
+ Query node at rank for key location.
*/
MPIRPC*
KDA_Lookup(int rank, char* key)
{
- gossip_do(MASK_KDA, NOTE_F);
-
+ gossip_do(MASK_KDA, NOTE_F);
+
KDA_ID object_id = hash_string(key, KDA_HASH_SPACE);
- // printf("id: %i \n", object_id);
-
- return KDA_Translate(rank, object_id);
+ // printf("id: %i \n", object_id);
+
+ return KDA_Translate(rank, object_id);
}
/**
@@ -248,11 +248,11 @@
MPIRPC*
KDA_Lookup_k(int rank, char* key)
{
- KDA_ID object_id = SHA1_mod(key);
- gossip_do(MASK_KDA, SHOW_FIS_X(rank, key, object_id));
- // NOTE_SXI("TRANSLATE: ", key, object_id, object_id);
-
- return KDA_Translate_k(rank, object_id);
+ KDA_ID object_id = SHA1_mod(key);
+ gossip_do(MASK_KDA, SHOW_FIS_X(rank, key, object_id));
+ // NOTE_SXI("TRANSLATE: ", key, object_id, object_id);
+
+ return KDA_Translate_k(rank, object_id);
}
/**
@@ -276,11 +276,11 @@
// NOTE_F;
#ifdef ENABLE_TEST01
- if (cmpi_status == CMPI_STATUS_READY)
+ if (cmpi_status == CMPI_STATUS_READY)
printf("[%i] MPIRPC_Call(query_id_k)\n", debug_rank);
#endif
-
+
MPIRPC* rpc =
MPIRPC_Call(rank, heap("query_id_k"), xheap(id),
NULL, MPIRPC_PROCEED_NULL);
@@ -289,32 +289,32 @@
/**
Instruct a node to store a value.
- Asynchronous. Copies key and value into MPIRPC.
+ Asynchronous. Copies key and value into MPIRPC.
*/
MPIRPC*
KDA_Store(int rank, char* key, char* value)
{
gossip_do(MASK_KDA, NOTE_F);
- int klength = strlen(key);
- int vlength = strlen(value);
+ int klength = strlen(key);
+ int vlength = strlen(value);
char* args = malloc((klength+vlength+5)*sizeof(char));
- assert(args);
+ assert(args);
strcpy(args, key);
- args[klength] = ' ';
+ args[klength] = ' ';
strcpy(args+klength+1, value);
#ifdef ENABLE_TEST01
- if (cmpi_status == CMPI_STATUS_READY)
+ if (cmpi_status == CMPI_STATUS_READY)
printf("[%i] MPIRPC_Call(store)\n", debug_rank);
#endif
-
+
MPIRPC* rpc = MPIRPC_Call(rank, heap("store"), args,
NULL, MPIRPC_PROCEED_NULL);
- return rpc;
+ return rpc;
}
/**
- Instruct a node to retrieve a value.
+ Instruct a node to retrieve a value.
*/
MPIRPC*
KDA_Retrieve(int rank, char* key)
@@ -322,38 +322,38 @@
MPIRPC* rpc =
MPIRPC_Call(rank, heap("retrieve"), heap(key),
NULL, MPIRPC_PROCEED_NULL);
- return rpc;
+ return rpc;
}
/**
Instruct a node to update a value.
- Asynchronous. Copies key and value into MPIRPC.
+ Asynchronous. Copies key and value into MPIRPC.
*/
MPIRPC*
-KDA_Update(int rank, char* key, char* value, int length)
+KDA_Update(int rank, char* key, char* value, int length, int offset)
{
gossip_do(MASK_KDA, NOTE_F);
- int klength = strlen(key);
- int vlength = strlen(value);
+ int klength = strlen(key);
+ int vlength = strlen(value);
char* args = malloc((klength+vlength+15)*sizeof(char));
- assert(args);
+ assert(args);
sprintf(args, "%s %i %s", key, length, value);
MPIRPC* rpc = MPIRPC_Call(rank, heap("update"), args,
NULL, MPIRPC_PROCEED_NULL);
- return rpc;
+ return rpc;
}
KDA_Query*
query_create(int caller, int unique,
- void (*service)(KDA_Operation* op))
+ void (*service)(KDA_Operation* op))
{
- // NOTE_F;
+ // NOTE_F;
KDA_Query* query = malloc(sizeof(KDA_Query));
query->caller = caller;
query->unique = unique;
- query->service = service;
- return query;
+ query->service = service;
+ return query;
}
/**
@@ -362,27 +362,27 @@
void
handle_info(int caller, int unique, char* args)
{
- gossip_do(MASK_KDA, NOTE_F);
+ gossip_do(MASK_KDA, NOTE_F);
char* result = buckets_tostring();
- MPIRPC_Return(caller, unique, result);
+ MPIRPC_Return(caller, unique, result);
}
void
handle_store(int caller, int unique, char* args)
{
#ifdef ENABLE_TEST03
- SHOW_FS(args);
+ SHOW_FS(args);
#else
char* key = malloc(CMPI_KEY_LENGTH * sizeof(char));
char* value = malloc(CMPI_VALUE_LENGTH * sizeof(char));
- sscanf(args, "%s %s", key, value);
+ sscanf(args, "%s %s", key, value);
gossip_do(MASK_KDA, SHOW_FSS(key, value));
cmpi_cached_store(key, value);
#endif
- MPIRPC_Return(caller, unique, NULL);
+ MPIRPC_Return(caller, unique, NULL);
}
void
@@ -390,26 +390,26 @@
{
gossip_do(MASK_KDA, NOTE_FS(args));
- char* data;
- cmpi_cached_retrieve(args, &data);
- // SHOW_S(data);
- MPIRPC_Return(caller, unique, heap(data));
+ char* data;
+ cmpi_cached_retrieve(args, &data);
+ // SHOW_S(data);
+ MPIRPC_Return(caller, unique, heap(data));
}
void
handle_update(int caller, int unique, char* args)
{
- gossip_do(MASK_KDA, SHOW_FS(args));
+ gossip_do(MASK_KDA, SHOW_FS(args));
char* key = malloc(CMPI_KEY_LENGTH * sizeof(char));
char* value = malloc(CMPI_VALUE_LENGTH * sizeof(char));
- int offset;
+ int offset;
- sscanf(args, "%s %i %s", key, &offset, value);
+ sscanf(args, "%s %i %s", key, &offset, value);
gossip_do(MASK_KDA, SHOW_FSSI(key, value, offset));
cmpi_cached_update(key, value, offset);
- MPIRPC_Return(caller, unique, NULL);
+ MPIRPC_Return(caller, unique, NULL);
}
void
@@ -418,17 +418,17 @@
gossip_do(MASK_KDA, NOTE_F);
/*
- // We can rebroadcast the message.
- int opid;
+ // We can rebroadcast the message.
+ int opid;
sscanf(args, "%i", &opid);
if (opid == last_global_opid)
{
NOTE("Already processed...");
MPIRPC_Return(caller, unique, NULL);
- return;
+ return;
}
- last_global_opid = opid;
+ last_global_opid = opid;
int i;
struct list_item* item;
@@ -443,57 +443,57 @@
}
}
*/
-
+
dump_buckets();
- MPIRPC_Return(caller, unique, NULL);
+ MPIRPC_Return(caller, unique, NULL);
}
void
handle_bucket_usage(int caller, int unique, char* args)
{
gossip_do(MASK_KDA, NOTE_F);
- dumps++;
+ dumps++;
char* msg = bucket_usage();
- printf(msg);
+ printf(msg);
free(msg);
- MPIRPC_Return(caller, unique, NULL);
+ MPIRPC_Return(caller, unique, NULL);
}
void
handle_quit(int caller, int unique, char* args)
{
- int i;
-
+ int i;
+
gossip_do(MASK_KDA, NOTE_F);
KDA_Neighbor* n_caller = KDA_Local_neighbor(caller);
if (n_caller != NULL)
- n_caller->quitting = true;
-
+ n_caller->quitting = true;
+
if (cmpi_quitting)
{
NOTE("Already quitting...");
- MPIRPC_Return(caller, unique, NULL);
- return;
+ MPIRPC_Return(caller, unique, NULL);
+ return;
}
cmpi_quitting = true;
-
+
MPIRPC_Return(caller, unique, NULL);
// NOTE_I("cmpi_cache: ", cmpi_cache->size);
itable_destroy(operations);
- // NOTE("DESTROY_BUCKETS");
+ // NOTE("DESTROY_BUCKETS");
for (i = 0; i < KDA_SPACE_SIZE; i++)
{
- // SHOW_I(i);
+ // SHOW_I(i);
// list_destroy(k_bucket[i]);
}
}
void
-rpc_find_node(int other_rank, KDA_Operation* op)
+rpc_find_node(int other_rank, KDA_Operation* op)
{
#ifdef ENABLE_TEST01
if (cmpi_status == CMPI_STATUS_READY)
@@ -502,8 +502,8 @@
fflush(stdout);
}
#endif
-
- MPIRPC_Call(other_rank, heap("find_node"), xheap(op->object_id),
+
+ MPIRPC_Call(other_rank, heap("find_node"), xheap(op->object_id),
op, proceed_find);
}
@@ -513,17 +513,17 @@
int object_id;
struct ilist_item* item;
int count = sscanf(args, "%X", &object_id);
- assert(count == 1);
+ assert(count == 1);
struct ilist* known = KDA_Closest(object_id);
char* result = malloc(1000*sizeof(char));
char* s = result;
- s += sprintf(s, "%i ", known->size);
+ s += sprintf(s, "%i ", known->size);
for (item = known->head;
item; item = item->next)
s += sprintf(s, "%s", neighbor_name(item->data));
- ilist_destroy(known);
- MPIRPC_Return(caller, unique, result);
+ ilist_destroy(known);
+ MPIRPC_Return(caller, unique, result);
}
void
@@ -534,18 +534,18 @@
query_id(caller, unique, object_id);
}
-void
+void
query_id(int caller, int unique, KDA_ID object_id)
{
- KDA_Operate(object_id, NULL, NULL, NULL,
- query_create(caller, unique, return_query_id));
+ KDA_Operate(object_id, NULL, NULL, NULL,
+ query_create(caller, unique, return_query_id));
}
void
return_query_id(KDA_Operation* op)
{
KDA_Neighbor* n = (KDA_Neighbor*) ilist_poll(op->k_closest);
- char* result = iheap(n->rank);
+ char* result = iheap(n->rank);
MPIRPC_Return(op->query->caller, op->query->unique, result);
// free op
}
@@ -558,33 +558,33 @@
query_id_k(caller, unique, object_id);
}
-void
+void
query_id_k(int caller, int unique, KDA_ID object_id)
{
- KDA_Operate(object_id, NULL, NULL, NULL,
- query_create(caller, unique, return_query_id_k));
+ KDA_Operate(object_id, NULL, NULL, NULL,
+ query_create(caller, unique, return_query_id_k));
}
void
return_query_id_k(KDA_Operation* op)
{
char* result = malloc(2048*sizeof(char));
- char* p = result;
+ char* p = result;
while (op->k_closest->size > 0)
{
KDA_Neighbor* neighbor =
(KDA_Neighbor*) ilist_poll(op->k_closest);
p += neighbor_name_sprint(p, neighbor);
- free(neighbor);
+ free(neighbor);
}
MPIRPC_Return(op->query->caller, op->query->unique, result);
- KDA_Operation_free(op);
+ KDA_Operation_free(op);
}
/**
The KDA_Operation has completed its k_closest list.
Now issue the RPC on the target nodes or
- respond to the client query.
+ respond to the client query.
*/
void
KDA_Map(KDA_Operation* op)
@@ -600,11 +600,11 @@
MPIRPC_Call(n->rank, heap(op->name), heap(op->args),
op, op->proceed);
}
- op->returned = inlist_create();
+ op->returned = inlist_create();
}
else if (op->query)
{
- op->returned = NULL;
+ op->returned = NULL;
op->query->service(op);
}
}
@@ -612,44 +612,44 @@
void
proceed_find(MPIRPC* rpc)
{
- gossip_do(MASK_KDA, NOTE_F);
-
- KDA_Operation* op = (KDA_Operation*) rpc->extras;
+ gossip_do(MASK_KDA, NOTE_F);
- // NOTE(neighbor_ranks_tostring(op->k_closest));
-
+ KDA_Operation* op = (KDA_Operation*) rpc->extras;
+
+ // NOTE(neighbor_ranks_tostring(op->k_closest));
+
if (rpc->status == MPIRPC_STATUS_FAULT)
{
- NOTE_FS("FAULT!");
+ NOTE_FS("FAULT!");
}
else
{
int size;
- char* p = rpc->result;
+ char* p = rpc->result;
sscanf(rpc->result, "%i ", &size);
p = index(rpc->result, ' ')+1;
int i;
for (i = 0; i < size; i++)
{
- int other_id;
+ int other_id;
int other_rank;
int count = sscanf(p, "[%X]@%i ", &other_id, &other_rank);
- assert(count == 2);
+ assert(count == 2);
p = index(p, ' ')+1;
KDA_Neighbor neighbor;
neighbor_make(other_id, other_rank, &neighbor);
- add_neighbor(&neighbor);
+ add_neighbor(&neighbor);
int d = XOR(other_id, op->object_id);
if (d < op->k_closest->tail->key ||
op->k_closest->size < k)
{
// NOTE_S("data: ", neighbor_name(&neighbor));
// DEBUG(ilist_fdump(debug_file, neighbor_name, op->k_closest));
- // DEBUG(char* s = ilist_serialize_ptrs(op->k_closest);NOTE(s);free(s););
+ // DEBUG(char* s = ilist_serialize_ptrs(op->k_closest);NOTE(s);free(s););
if (! ilist_matches(op->k_closest, neighbor_cmp, &neighbor))
{
KDA_Neighbor* clone = neighbor_clone(&neighbor);
- ilist_ordered_insert(op->k_closest, d, clone);
+ ilist_ordered_insert(op->k_closest, d, clone);
op->improved = true;
if (op->k_closest->size > k)
{
@@ -659,29 +659,29 @@
}
}
}
-
+
// Check for convergence:
inlist_remove(op->outstanding, rpc->target);
-
+
if (rpc->result != NULL &&
rpc->result != MPIRPC_RESULT_FAULT)
{
- // NOTE("Free result.");
+ // NOTE("Free result.");
// free(rpc->result);
}
- MPIRPC_Free(rpc);
-
+ MPIRPC_Free(rpc);
+
if (op->outstanding->size == 0)
{
if (op->improved)
{
- op->improved = false;
+ op->improved = false;
if (! KDA_Find_node_again(op))
- KDA_Map(op);
+ KDA_Map(op);
}
else
{
- KDA_Map(op);
+ KDA_Map(op);
}
}
}
@@ -694,8 +694,8 @@
{
if (op->k_closest == NULL)
op->k_closest = KDA_Closest(op->object_id);
- // printf("local closest: \n");
- // ilist_output(neighbor_tostring, op->k_closest);
+ // printf("local closest: \n");
+ // ilist_output(neighbor_tostring, op->k_closest);
return KDA_Find_node_again(op);
}
@@ -703,69 +703,69 @@
bool
KDA_Find_node_again(KDA_Operation* op)
{
- // NOTE_S("ranks: ", neighbor_ranks_tostring(op->k_closest));
-
+ // NOTE_S("ranks: ", neighbor_ranks_tostring(op->k_closest));
+
struct ilist_item* item;
- // True iff we actually issued another RPC:
- bool contacted = false;
+ // True iff we actually issued another RPC:
+ bool contacted = false;
- int i = 0;
+ int i = 0;
for (item = op->k_closest->head;
i < alpha && item; item = item->next)
{
KDA_Neighbor* n = (KDA_Neighbor*) item->data;
if (n->rank == mpi_rank)
- continue;
+ continue;
if (! inlist_contains(op->contacted, n->rank))
{
inlist_add(op->contacted, n->rank);
inlist_add(op->outstanding, n->rank);
rpc_find_node(n->rank, op);
- contacted = true;
- i++;
+ contacted = true;
+ i++;
}
}
- return contacted;
+ return contacted;
}
/**
Join DHT using rank other_rank as known member.
*/
-bool
+bool
KDA_Join(int other_rank)
{
// join(other_rank, other_id);
-
- return true;
+
+ return true;
}
/**
Locate the object_id and call its host method and args.
- Copies name but not args onto the heap.
+ Copies name but not args onto the heap.
*/
-KDA_Operation*
+KDA_Operation*
KDA_Operate(KDA_ID object_id, char* name, void* args,
void (*proceed)(MPIRPC* rpc), KDA_Query* query)
{
- gossip_do(MASK_KDA, NOTE_F);
+ gossip_do(MASK_KDA, NOTE_F);
KDA_Operation* op = malloc(sizeof(KDA_Operation));
op->object_id = object_id;
- op->unique = ++unique;
+ op->unique = ++unique;
op->name = heap(name);
op->args = args;
- op->k_closest = NULL;
+ op->k_closest = NULL;
op->contacted = inlist_create();
- op->outstanding = inlist_create();
- op->improved = false;
- op->status = KDA_STATUS_SEARCHING;
- op->proceed = proceed;
- op->query = query;
-
+ op->outstanding = inlist_create();
+ op->improved = false;
+ op->status = KDA_STATUS_SEARCHING;
+ op->proceed = proceed;
+ op->query = query;
+
itable_add(operations, op->unique, op);
-
+
KDA_Find_node(op);
- return op;
+ return op;
}
void
@@ -775,23 +775,23 @@
// struct ilist_item* item;
// for (item = op->k_closest->head; item; item = item->next)
- // NOTE(neighbor_tostring(item->data));
-
- ilist_destroy(op->k_closest);
+ // NOTE(neighbor_tostring(item->data));
+ ilist_destroy(op->k_closest);
+
inlist_free(op->contacted);
- // NOTE("freeing op->outstanding");
+ // NOTE("freeing op->outstanding");
inlist_free(op->outstanding);
- // NOTE("freeing op->returned");
- if (op->returned)
+ // NOTE("freeing op->returned");
+ if (op->returned)
inlist_free(op->returned);
// NOTE("freeing op->query");
if (op->query)
- free(op->query);
-
+ free(op->query);
+
free(op->name);
- // if (op->args)
+ // if (op->args)
// free(op->args);
itable_remove(operations, op->unique);
@@ -802,13 +802,13 @@
void
KDA_Ping(KDA_ID other_id)
{
- KDA_Operate(other_id, heap("ping"), NULL, proceed_ping, NULL);
+ KDA_Operate(other_id, heap("ping"), NULL, proceed_ping, NULL);
}
void
proceed_ping(MPIRPC* rpc)
{
- KDA_Operation* op = (KDA_Operation*) rpc->extras;
+ KDA_Operation* op = (KDA_Operation*) rpc->extras;
inlist_add(op->returned, rpc->target);
// TODO: Deallocate op and rpc
@@ -821,7 +821,7 @@
{
cmpi_status = CMPI_STATUS_READY;
printf("[%i] %s\n", debug_rank, timestring("READY", NULL));
- fflush(stdout);
+ fflush(stdout);
KDA_Announce();
if (mpi_rank < kda_nodes-1)
rpc_bootjoin(mpi_rank+1);
@@ -831,20 +831,20 @@
{
// Notification hack...
printf("[%i] NOTIFICATION\n", debug_rank);
- fflush(stdout);
- int msg = -2;
+ fflush(stdout);
+ int msg = -2;
MPI_Send(&msg, 1, MPI_INT,
kda_nodes, 0, MPI_COMM_WORLD);
}
else
{
printf("[%i] NO NOTIFICATION\n", debug_rank);
- fflush(stdout);
+ fflush(stdout);
}
}
}
}
- MPIRPC_Free(rpc);
+ MPIRPC_Free(rpc);
}
void
@@ -852,8 +852,8 @@
{
gossip_do(MASK_KDA, NOTE_F);
- // NOTE_S("", buckets_tostring());
-
+ // NOTE_S("", buckets_tostring());
+
int i;
struct list_item* item;
for (i = 0; i < KDA_SPACE_SIZE; i++)
@@ -861,8 +861,8 @@
for (item = k_bucket[i]->head;
item; item = item->next)
{
- KDA_Neighbor* n = (KDA_Neighbor*) item->data;
- MPIRPC_Block(n->rank, heap("announce"), xheap(id));
+ KDA_Neighbor* n = (KDA_Neighbor*) item->data;
+ MPIRPC_Block(n->rank, heap("announce"), xheap(id));
}
}
}
@@ -871,31 +871,31 @@
is_neighbor(KDA_Neighbor* n)
{
// NOTE_F;
- int d = XOR(n->id, id);
+ int d = XOR(n->id, id);
int i = ilog2(d);
- // SHOW_I(i);
- return list_contains(k_bucket[i], neighbor_cmp, n);
+ // SHOW_I(i);
+ return list_contains(k_bucket[i], neighbor_cmp, n);
}
/**
- Allow a caller to insert self in local neighbor table.
+ Allow a caller to insert self in local neighbor table.
*/
void
handle_announce(int caller, int unique, char* args)
{
- gossip_do(MASK_KDA, NOTE_F);
+ gossip_do(MASK_KDA, NOTE_F);
int other_id;
sscanf(args, "%X", &other_id );
KDA_Neighbor neighbor;
- neighbor_make(other_id, caller, &neighbor);
+ neighbor_make(other_id, caller, &neighbor);
add_neighbor(&neighbor);
// DEBUG(char* table = buckets_tostring();NOTE(table);free(table););
-
- MPIRPC_Return(caller, unique, NULL);
+
+ MPIRPC_Return(caller, unique, NULL);
}
-
+
/**
Manufacture a random ID.
*/
@@ -903,17 +903,17 @@
make_id()
{
srand(time(NULL)+mpi_rank*30);
- // srand(mpi_rank*30);
- return rand();
+ // srand(mpi_rank*30);
+ return rand();
}
bool
k_bucket_insert(int i, KDA_Neighbor* neighbor)
{
- bool result = false;
+ bool result = false;
- // list_dump("%i", k_bucket[i]);
-
+ // list_dump("%i", k_bucket[i]);
+
if (! list_contains(k_bucket[i], neighbor_cmp, neighbor))
{
if (k_bucket[i]->size >= k)
@@ -923,46 +923,46 @@
list_remove(k_bucket[i], victim);
free(victim);
}
- list_ordered_insert(k_bucket[i],
+ list_ordered_insert(k_bucket[i],
neighbor_time_cmp, (void*) neighbor);
- result = true;
+ result = true;
}
else
{
// NOTE_I("Already have neighbor! in: ", i);
}
- // list_dump("%i", k_bucket[i]);
- return result;
+ // list_dump("%i", k_bucket[i]);
+ return result;
}
/**
- @return True iff we cloned and added the neighbor to a k_bucket.
+ @return True iff we cloned and added the neighbor to a k_bucket.
*/
-bool
-add_neighbor(KDA_Neighbor* neighbor)
+bool
+add_neighbor(KDA_Neighbor* neighbor)
{
- bool result = false;
- // SHOW_FXI(neighbor->id, neighbor->rank);
+ bool result = false;
+ // SHOW_FXI(neighbor->id, neighbor->rank);
int d = XOR(id, neighbor->id);
int i = ilog2(d);
- // SHOW_I(i);
+ // SHOW_I(i);
if (neighbor->rank == mpi_rank)
{
- // NOTE("Will not add self!");
+ // NOTE("Will not add self!");
}
else if (is_neighbor(neighbor))
{
- // NOTE_I("Already have neighbor: ", neighbor->rank);
+ // NOTE_I("Already have neighbor: ", neighbor->rank);
}
else
{
- KDA_Neighbor* entry = neighbor_clone(neighbor);
+ KDA_Neighbor* entry = neighbor_clone(neighbor);
k_bucket_insert(i, entry);
- result = true;
+ result = true;
}
- // SHOW_FXI_B(neighbor->id, neighbor->rank, result);
- return neighbor;
+ // SHOW_FXI_B(neighbor->id, neighbor->rank, result);
+ return neighbor;
}
void
@@ -974,26 +974,26 @@
void
rpc_bootjoin(int other_rank)
{
- MPIRPC_Call(other_rank, heap("bootjoin"), id_tostring(),
+ MPIRPC_Call(other_rank, heap("bootjoin"), id_tostring(),
NULL, proceed_bootjoin);
}
/**
We may now join the network.
- Return our id to the caller.
+ Return our id to the caller.
*/
void
handle_bootjoin(int caller, int unique, char* args)
{
- // NOTE_FS(args);
+ // NOTE_FS(args);
int other_id;
sscanf(args, "%X", &other_id );
-
+
char* result = malloc(50*sizeof(char));
sprintf(result, "%X", id);
MPIRPC_Return(caller, unique, result);
- MPIRPC_Flush_returns();
+ MPIRPC_Flush_returns();
bootjoin(other_id, caller);
}
@@ -1005,15 +1005,15 @@
{
KDA_Neighbor neighbor;
neighbor_make(other_id, other_rank, &neighbor);
- add_neighbor(&neighbor);
+ add_neighbor(&neighbor);
if (mpi_rank == 0)
{
- bootping();
+ bootping();
}
else if (mpi_rank < kda_nodes-1)
{
- KDA_Ping(id);
+ KDA_Ping(id);
}
else if (mpi_rank == kda_nodes-1)
{
@@ -1028,8 +1028,8 @@
sscanf(rpc->result, "%X", &other_id);
KDA_Neighbor neighbor;
neighbor_make(other_id, rpc->target, &neighbor);
- add_neighbor(&neighbor);
- MPIRPC_Free(rpc);
+ add_neighbor(&neighbor);
+ MPIRPC_Free(rpc);
}
void
@@ -1040,15 +1040,15 @@
}
/**
- MPIRPC registered function.
- We may now ping ourselves.
+ MPIRPC registered function.
+ We may now ping ourselves.
*/
void
handle_bootping(int caller, int unique, char* args)
{
// NOTE_F;
bootping();
- MPIRPC_Return(caller, unique, NULL);
+ MPIRPC_Return(caller, unique, NULL);
}
void
@@ -1056,8 +1056,8 @@
{
if (mpi_rank < kda_nodes-1)
rpc_bootping(mpi_rank+1);
-
- KDA_Ping(id);
+
+ KDA_Ping(id);
}
/**
@@ -1066,7 +1066,7 @@
void
handle_ping(int caller, int unique, char* args)
{
- MPIRPC_Return(caller, unique, NULL);
+ MPIRPC_Return(caller, unique, NULL);
}
void
@@ -1090,15 +1090,15 @@
char*
id_tostring()
{
- return KDA_id_tostring(id);
+ return KDA_id_tostring(id);
}
char*
KDA_id_tostring(int other_id)
{
char* s_id = malloc(10*sizeof(char));
- sprintf(s_id, "%X", other_id);
- return s_id;
+ sprintf(s_id, "%X", other_id);
+ return s_id;
}
/**
@@ -1107,71 +1107,71 @@
char*
KDA_Description()
{
- char* result = malloc(sizeof(char) * 30);
+ char* result = malloc(sizeof(char) * 30);
sprintf(result, "[%X]@%i", id, mpi_rank);
- return result;
+ return result;
}
int
table_size(void)
{
int result = 0;
-
+
int i;
struct list_item* item;
for (i = 0; i < KDA_SPACE_SIZE; i++)
for (item = k_bucket[i]->head;
item; item = item->next)
result++;
-
- return result;
+
+ return result;
}
-
+
void
dump_buckets()
{
- NOTE_F;
-
- char path[100];
- sprintf(path, "table_%i_%i_%i.out",
- debug_rank, kda_nodes, dumps++);
- FILE* file = fopen(path, "w");
+ NOTE_F;
+
+ char path[100];
+ sprintf(path, "table_%i_%i_%i.out",
+ debug_rank, kda_nodes, dumps++);
+ FILE* file = fopen(path, "w");
char* result = buckets_tostring();
fprintf(file, result);
free(result);
fclose(file);
- NOTE_S("writing: ", path);
- NOTE_I("TABLE_SIZE: ", table_size());
+ NOTE_S("writing: ", path);
+ NOTE_I("TABLE_SIZE: ", table_size());
}
-char*
+char*
buckets_tostring(void)
{
int i;
struct list_item* item;
- // NOTE_F;
+ // NOTE_F;
char* result = malloc(sizeof(char)*KDA_SPACE_SIZE*k*200);
char* p = result;
char* description = KDA_Description();
- char* neighbor = NULL;
- bool bucket_named;
+ char* neighbor = NULL;
+ bool bucket_named;
int count = 0;
- int bytes = 0;
-
+ int bytes = 0;
+
p += sprintf(p, "Neighbor table for: %s \n", description);
free(description);
for (i = 0; i < KDA_SPACE_SIZE; i++)
{
- bytes += sizeof(K_BUCKET);
+ bytes += sizeof(K_BUCKET);
if (k_bucket[i]->size > 0)
{
- bucket_named = false;
+ bucket_named = false;
for (item = k_bucket[i]->head;
item; item = item->next)
{
count++;
- bytes += sizeof(KDA_Neighbor);
+ bytes += sizeof(KDA_Neighbor);
if (! bucket_named)
{
bucket_named = true;
@@ -1185,32 +1185,32 @@
}
}
p += sprintf(p, "TABLE_NEIGHBORS: %i\n", count);
- sprintf(p, "TABLE_BYTES: %i\n", bytes);
- return result;
+ sprintf(p, "TABLE_BYTES: %i\n", bytes);
+ return result;
}
-char*
+char*
bucket_usage(void)
{
int i;
struct list_item* item;
- // NOTE_F;
- char* result = malloc(200 * sizeof(char));
+ // NOTE_F;
+ char* result = malloc(200 * sizeof(char));
char* p = result;
int count = 0;
- int bytes = 0;
-
+ int bytes = 0;
+
for (i = 0; i < KDA_SPACE_SIZE; i++)
{
- bytes += sizeof(K_BUCKET);
+ bytes += sizeof(K_BUCKET);
for (item = k_bucket[i]->head;
item; item = item->next)
{
count++;
- bytes += sizeof(KDA_Neighbor);
+ bytes += sizeof(KDA_Neighbor);
}
}
p += sprintf(p, "TABLE_COUNT: %i %i %i\n", mpi_rank, dumps, count);
- p += sprintf(p, "TABLE_USAGE: %i %i %i\n", mpi_rank, dumps, bytes);
- return result;
+ p += sprintf(p, "TABLE_USAGE: %i %i %i\n", mpi_rank, dumps, bytes);
+ return result;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|