|
From: <mm...@us...> - 2012-11-21 10:48:39
|
Revision: 3293
http://dmcs.svn.sourceforge.net/dmcs/?rev=3293&view=rev
Author: mmsc
Date: 2012-11-21 10:48:28 +0000 (Wed, 21 Nov 2012)
Log Message:
-----------
Fix bugs in the computation loop of NormalContext.
Modified Paths:
--------------
dmcs/branches/dmcs1.5/include/mcs/NormalContext.h
dmcs/branches/dmcs1.5/src/mcs/NewContext.cpp
dmcs/branches/dmcs1.5/src/mcs/NormalContext.cpp
dmcs/branches/dmcs1.5/src/mcs/StreamingJoiner.cpp
dmcs/branches/dmcs1.5/src/parser/DLVResultParser.cpp
Modified: dmcs/branches/dmcs1.5/include/mcs/NormalContext.h
===================================================================
--- dmcs/branches/dmcs1.5/include/mcs/NormalContext.h 2012-11-20 22:27:55 UTC (rev 3292)
+++ dmcs/branches/dmcs1.5/include/mcs/NormalContext.h 2012-11-21 10:48:28 UTC (rev 3293)
@@ -79,6 +79,14 @@
void
init();
+ bool
+ process_input(NewBeliefState* input,
+ std::size_t parent_qid,
+ EvaluatorPtr eval,
+ NewConcurrentMessageDispatcherPtr md,
+ std::size_t& k1,
+ std::size_t& k2);
+
void
process_request(std::size_t parent_qid,
const NewHistory& history,
Modified: dmcs/branches/dmcs1.5/src/mcs/NewContext.cpp
===================================================================
--- dmcs/branches/dmcs1.5/src/mcs/NewContext.cpp 2012-11-20 22:27:55 UTC (rev 3292)
+++ dmcs/branches/dmcs1.5/src/mcs/NewContext.cpp 2012-11-21 10:48:28 UTC (rev 3293)
@@ -265,6 +265,7 @@
}
(*belief_state) = (*belief_state) | (*input_bs);
+ DBGLOG(DBG, "NewContext::send_out_result(): combined bs = " << *belief_state);
}
// otherwise, this context broke a cycle and we don't have input_bs
Modified: dmcs/branches/dmcs1.5/src/mcs/NormalContext.cpp
===================================================================
--- dmcs/branches/dmcs1.5/src/mcs/NormalContext.cpp 2012-11-20 22:27:55 UTC (rev 3292)
+++ dmcs/branches/dmcs1.5/src/mcs/NormalContext.cpp 2012-11-21 10:48:28 UTC (rev 3293)
@@ -212,79 +212,82 @@
}
-void
-NormalContext::process_request(std::size_t parent_qid,
- const NewHistory& history,
- EvaluatorPtr eval,
- NewConcurrentMessageDispatcherPtr md,
- NewJoinerDispatcherPtr jd,
- std::size_t k1,
- std::size_t k2)
+
+bool
+NormalContext::process_input(NewBeliefState* input,
+ std::size_t parent_qid,
+ EvaluatorPtr eval,
+ NewConcurrentMessageDispatcherPtr md,
+ std::size_t& k1,
+ std::size_t& k2)
{
- assert ((k1 == 0 && k2 == 0) || (0 < k1 && k1 < k2+1));
- NewBeliefState* input;
-
- while (1)
+ if (total_guessing_input != NULL)
{
- // prepare the fixed part of the input
- if (!is_leaf)
+ NewBeliefState* current_guess = new NewBeliefState(BeliefStateOffset::instance()->NO_BLOCKS(),
+ BeliefStateOffset::instance()->SIZE_BS());
+
+ (*current_guess) = (*starting_guess);
+
+ // iterate over all possible guesses or until k1 --> k2 models were computed
+ DBGLOG(DBG, "NormalContext::process_request(). Guessing input = " << *total_guessing_input);
+ bool computed_k1_k2 = false;
+ do
{
- std::size_t this_qid = query_id(ctx_id, ++query_counter);
- DBGLOG(DBG, "NormalContext[" << ctx_id << "]::process_request: trigger join with query_id = " << this_qid << " " << detailprint(this_qid));
- ReturnedBeliefState* rbs = joiner->first_join(this_qid, history, md, jd);
- if (rbs->belief_state == NULL)
+ NewBeliefState* combined_input =
+ new NewBeliefState(BeliefStateOffset::instance()->NO_BLOCKS(),
+ BeliefStateOffset::instance()->SIZE_BS());
+ (*combined_input ) = (*input) | (*current_guess);
+
+ if (compute(combined_input, k1, k2, parent_qid, eval, md))
{
+ computed_k1_k2 = true;
break;
}
- input = rbs->belief_state;
+ current_guess = next_guess(current_guess, total_guessing_input);
}
- else
- input = new NewBeliefState(BeliefStateOffset::instance()->NO_BLOCKS(),
- BeliefStateOffset::instance()->SIZE_BS());
-
- if (total_guessing_input != NULL)
+ while (current_guess);
+
+ // carefully clean up
+ if (current_guess)
{
- NewBeliefState* current_guess = new NewBeliefState(BeliefStateOffset::instance()->NO_BLOCKS(),
- BeliefStateOffset::instance()->SIZE_BS());
-
- (*current_guess) = (*starting_guess);
+ delete current_guess;
+ current_guess = NULL;
+ }
- // make guess
- DBGLOG(DBG, "NormalContext::process_request(). Guessing input = " << *total_guessing_input);
- bool computed_k1_k2 = false;
- do
- {
- NewBeliefState* combined_input =
- new NewBeliefState(BeliefStateOffset::instance()->NO_BLOCKS(),
- BeliefStateOffset::instance()->SIZE_BS());
- (*combined_input ) = (*input) | (*current_guess);
+ return computed_k1_k2;
+ }
+ else
+ {
+ return compute(input, k1, k2, parent_qid, eval, md);
+ }
+}
- if (compute(combined_input, k1, k2, parent_qid, eval, md))
- {
- computed_k1_k2 = true;
- break;
- }
- current_guess = next_guess(current_guess, total_guessing_input);
- }
- while (current_guess);
- if (computed_k1_k2 || is_leaf)
- {
- if (current_guess)
- {
- delete current_guess;
- current_guess = NULL;
- }
+void
+NormalContext::process_request(std::size_t parent_qid,
+ const NewHistory& history,
+ EvaluatorPtr eval,
+ NewConcurrentMessageDispatcherPtr md,
+ NewJoinerDispatcherPtr jd,
+ std::size_t k1,
+ std::size_t k2)
+{
+ assert ((k1 == 0 && k2 == 0) || (0 < k1 && k1 < k2+1));
- break;
- }
- }
- else
- {
- if (compute(input, k1, k2, parent_qid, eval, md) || is_leaf) break;
- }
+ std::size_t this_qid = query_id(ctx_id, ++query_counter);
+ ReturnedBeliefState* rbs = joiner->first_join(this_qid, history, md, jd);
+ NewBeliefState* input = rbs->belief_state;
+
+ while (input != NULL)
+ {
+ DBGLOG(DBG, "NormalContext::process_request: input = " << *input);
+ if (process_input(input, parent_qid, eval, md, k1, k2)) break;
+
+ this_qid = query_id(ctx_id, ++query_counter);
+ rbs = joiner->next_join(this_qid, history, md, jd);
+ input = rbs->belief_state;
}
}
Modified: dmcs/branches/dmcs1.5/src/mcs/StreamingJoiner.cpp
===================================================================
--- dmcs/branches/dmcs1.5/src/mcs/StreamingJoiner.cpp 2012-11-20 22:27:55 UTC (rev 3292)
+++ dmcs/branches/dmcs1.5/src/mcs/StreamingJoiner.cpp 2012-11-21 10:48:28 UTC (rev 3293)
@@ -135,6 +135,7 @@
if (!ask_first_packs(query_id, history, 0, neighbors->size()-1, md, jd))
{
+ DBGLOG(DBG, "StreamingJoiner::first_join: One neighbor is inconsistent. Bailing out!");
// A neighbor is inconsistent. Reset and return NULL
reset();
ReturnedBeliefState* end_rbs = new ReturnedBeliefState(NULL, query_id);
@@ -160,6 +161,7 @@
if (pack_size > 0)
{
+ DBGLOG(DBG, "StreamingJoiner::first_join: first join failed. Call next join.");
return next_join(query_id, history, md, jd);
}
@@ -175,6 +177,18 @@
NewConcurrentMessageDispatcherPtr md,
NewJoinerDispatcherPtr jd)
{
+ if (!joined_results.empty())
+ {
+ ReturnedBeliefState* rbs = joined_results.front();
+ joined_results.pop_front();
+ return rbs;
+ }
+ else if (pack_size == 0)
+ {
+ ReturnedBeliefState* end_rbs = new ReturnedBeliefState(NULL, query_id);
+ return end_rbs;
+ }
+
// now really going to the loop of asking next =============================================
while (1)
{
@@ -191,6 +205,8 @@
pc++;
std::size_t k_one = pc * pack_size + 1;
std::size_t k_two = (pc+1) * pack_size;
+
+ DBGLOG(DBG, "StreamingJoiner::next_join: k1 = " << k_one << ", k2 = " << k_two);
// Ask next at neighbor_offset
if (ask_neighbor_and_receive(next_neighbor, query_id, history, k_one, k_two, md, jd))
@@ -200,6 +216,7 @@
assert (next_neighbor > 0);
// Ask first packs before neighbor_offset
+ DBGLOG(DBG, "StreamingJoiner::next_join: Going to ask_first_pack. next_neighbor = " << next_neighbor);
bool ret = ask_first_packs(query_id, history, 0, next_neighbor - 1, md, jd);
assert (ret == true);
@@ -223,6 +240,7 @@
{
next_neighbor++;
asking_next = true;
+ DBGLOG(DBG, "StreamingJoiner::next_join: move to next neighbor = " << next_neighbor);
}
}
}
@@ -256,6 +274,7 @@
if (bs)
{
+ DBGLOG(DBG, "StreamingJoiner::ask_neighbor_and_receive: bs = " << *bs);
std::size_t offset = neighbor_offset_from_qid(qid);
std::size_t noff = ((*neighbors)[neighbor_index])->neighbor_offset;
assert (noff == offset);
@@ -401,7 +420,7 @@
// be careful that we are blocked here. Use timeout sending instead?
ReturnedBeliefState* rbs = new ReturnedBeliefState(result, query_id);
- DBGLOG(DBG, "Succeeded: push back " << *result);
+ //DBGLOG(DBG, "StreamingJoiner::join: Succeeded: push back " << *result);
joined_results.push_back(rbs);
return true;
@@ -420,7 +439,7 @@
{
assert (from_neighbor <= to_neighbor && to_neighbor < neighbors->size());
- DBGLOG(DBG, "StreamingJoiner::ask_first_packs: forward_qid = " << forward_qid);
+ DBGLOG(DBG, "StreamingJoiner::ask_first_packs: forward_qid = " << forward_qid << ", from_neighbor = " << from_neighbor << ", to_neighbor = " << to_neighbor);
for (std::size_t i = from_neighbor; i <= to_neighbor; ++i)
{
@@ -433,6 +452,7 @@
k2 = pack_size;
}
+ DBGLOG(DBG, "StreamingJoiner::ask_first_packs i = " << i << ", k1 = " << k1 << ", k2 = " << k2);
// in this method, we register to joiner_dispatcher
ask_neighbor(i, forward_qid, history, k1, k2, md, jd);
}
@@ -487,6 +507,7 @@
jd->unregisterIdOffset(qid, joiner_offset);
if (count_models_read[neighbor_index] == 0)
{
+ DBGLOG(DBG, "StreamingJoiner::ask_first_packs: encounter an inconsistent neighbor with noff = " << noff);
// this neighbor is inconsistent
return false;
}
Modified: dmcs/branches/dmcs1.5/src/parser/DLVResultParser.cpp
===================================================================
--- dmcs/branches/dmcs1.5/src/parser/DLVResultParser.cpp 2012-11-20 22:27:55 UTC (rev 3292)
+++ dmcs/branches/dmcs1.5/src/parser/DLVResultParser.cpp 2012-11-21 10:48:28 UTC (rev 3293)
@@ -55,7 +55,7 @@
{
std::string input;
std::getline(is, input);
- DBGLOG(DBG, "DLVResultParser::parse(): Input to parser = " << input);
+ //DBGLOG(DBG, "DLVResultParser::parse(): Input to parser = " << input);
if (input.empty() || is.bad())
{
/*DBGLOG(DBG, "leaving loop because got input size " << input.size()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|