|
From: Nico K. <nkl...@us...> - 2007-06-04 07:39:16
|
Update of /cvsroot/mmapps/mmapps/remotepublishing/src/org/mmbase/remotepublishing/builders In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv10757/remotepublishing/src/org/mmbase/remotepublishing/builders Modified Files: PublishingQueueBuilder.java Log Message: Added support for updates of relations between nodes Index: PublishingQueueBuilder.java =================================================================== RCS file: /cvsroot/mmapps/mmapps/remotepublishing/src/org/mmbase/remotepublishing/builders/PublishingQueueBuilder.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** PublishingQueueBuilder.java 18 Apr 2007 07:50:11 -0000 1.4 --- PublishingQueueBuilder.java 4 Jun 2007 07:39:03 -0000 1.5 *************** *** 35,42 **** /** * Main class that publishes or removes nodes to or from remote clouds. - * @author keesj - * @version PublishingQueueBuilder.java,v 1.2 2003/07/28 09:43:51 nico Exp */ public class PublishingQueueBuilder extends MMObjectBuilder implements Runnable{ private static Logger log = Logging.getLoggerInstance(PublishingQueueBuilder.class.getName()); --- 35,58 ---- /** * Main class that publishes or removes nodes to or from remote clouds. */ public class PublishingQueueBuilder extends MMObjectBuilder implements Runnable{ + public static final String FIELD_TIMESTAMP = "timestamp"; + public static final String FIELD_PUBLISHDATE = "publishdate"; + public static final String FIELD_STATUS = "status"; + public static final String FIELD_ACTION = "action"; + public static final String FIELD_DESTINATIONCLOUD = "destinationcloud"; + public static final String FIELD_SOURCENUMBER = "sourcenumber"; + public static final String FIELD_RELATEDNODES = "relatednodes"; + + public static final String ACTION_UPDATE = "update"; + public static final String ACTION_UPDATE_NODE = "update-node"; + public static final String ACTION_UPDATE_RELATIONS = "update-relations"; + + public static final String ACTION_REMOVE = "remove"; + + public static final String STATUS_DONE = "done"; + public static final String STATUS_FAIL = "fail"; + + private static Logger log = Logging.getLoggerInstance(PublishingQueueBuilder.class.getName()); *************** *** 101,105 **** } ! node.setValue("destinationcloud", remoteCloudNumber); } --- 117,121 ---- } ! node.setValue(FIELD_DESTINATIONCLOUD, remoteCloudNumber); } *************** *** 169,252 **** CloudInfo localCloudInfo = CloudInfo.getDefaultCloudInfo(); NodeManager nodeManager = localCloudInfo.getCloud().getNodeManager("publishqueue"); ! NodeQuery query = nodeManager.createQuery(); ! ! StepField statusField = query.getStepField(nodeManager.getField("status")); ! FieldValueConstraint failStatus = query.createConstraint(statusField, ! FieldCompareConstraint.NOT_EQUAL, "fail"); ! query.setCaseSensitive(failStatus, true); ! FieldValueConstraint doneStatus = query.createConstraint(statusField, ! FieldCompareConstraint.NOT_EQUAL, "done"); ! query.setCaseSensitive(doneStatus, true); ! Constraint statusComposite = query.createConstraint( ! failStatus, CompositeConstraint.LOGICAL_AND, doneStatus); ! ! if (!nodeManager.hasField("publishdate")) { ! query.setConstraint(statusComposite); ! } ! else { ! StepField publishDateField = query.getStepField(nodeManager.getField("publishdate")); ! Constraint publishNull = query.createConstraint(publishDateField); ! Constraint publishNow = query.createConstraint( ! publishDateField, FieldCompareConstraint.LESS_EQUAL, new Date()); ! ! Constraint publishDateComposite = query.createConstraint( ! publishNow, CompositeConstraint.LOGICAL_OR, publishNull); ! Constraint composite = query.createConstraint( ! statusComposite, CompositeConstraint.LOGICAL_AND, publishDateComposite); ! query.setConstraint(composite); ! } ! query.setMaxNumber(10); ! ! query.setCachePolicy(CachePolicy.NEVER); NodeList list = null; while(list == null || !list.isEmpty()) { - //FIXME: add this timestamp as debug information for performance inspector - long timestamp = System.currentTimeMillis(); list = nodeManager.getList(query); for (int x = 0; x < list.size(); x++) { ! Node node = list.getNode(x); ! if(node != null) { try { // check if node is not removed from the cloud in between query and retrieval ! if (localCloudInfo.getCloud().hasNode(node.getNumber())) { ! if (node.getStringValue("action").equalsIgnoreCase("update")) { try { ! if (isAllowedToPublish(node)) { ! boolean finished = false; ! int trytimes = 0; ! while(!finished) { ! try { ! publish(localCloudInfo, node); ! finished=true; ! } catch (BridgeException e) { ! if ( handleRmiException(e) && ++trytimes<=maxtrying) { ! //if it was caused rmi connection exception and still within ! // the maximal tring times limitation, ! // continue to try publishing it again ! } else { ! //otherwise, throw the runtime exception ! throw e; ! } } ! }; ! node.setStringValue("status", "done"); ! node.commit(); } ! } catch (BridgeException e) { ! log.error("Nodenumber : " + node.getNumber() + ", " + e, e); ! publishFailed(localCloudInfo, node, e); } ! } ! else { ! if (node.getStringValue("action").equalsIgnoreCase("remove")) { try { ! removeNode(localCloudInfo, node); } catch (BridgeException e) { ! log.error("Removing published node (" + node.getNumber() + ") failed", e); ! publishFailed(localCloudInfo, node, e); } } --- 185,236 ---- CloudInfo localCloudInfo = CloudInfo.getDefaultCloudInfo(); NodeManager nodeManager = localCloudInfo.getCloud().getNodeManager("publishqueue"); ! NodeQuery query = createQuery(nodeManager); NodeList list = null; while(list == null || !list.isEmpty()) { list = nodeManager.getList(query); for (int x = 0; x < list.size(); x++) { ! Node queueNode = list.getNode(x); ! if(queueNode != null) { try { // check if node is not removed from the cloud in between query and retrieval ! if (localCloudInfo.getCloud().hasNode(queueNode.getNumber())) { ! String action = queueNode.getStringValue(FIELD_ACTION); ! if (isUpdateAction(action)) { try { ! boolean finished = false; ! int trytimes = 0; ! while(!finished) { ! try { ! update(localCloudInfo, queueNode, action); ! finished=true; ! } catch (BridgeException e) { ! if ( handleRmiException(e) && ++trytimes<=maxtrying) { ! //if it was caused rmi connection exception and still within ! // the maximal tring times limitation, ! // continue to try publishing it again ! } else { ! //otherwise, throw the runtime exception ! throw e; } ! } } ! queueNode.setStringValue(FIELD_STATUS, STATUS_DONE); ! queueNode.commit(); ! } catch (BridgeException e) { ! log.error("Nodenumber : " + queueNode.getNumber() + ", " + e, e); ! publishFailed(localCloudInfo, queueNode, e); } ! } ! else { ! if (isRemoveAction(action)) { try { ! removeNode(localCloudInfo, queueNode); } catch (BridgeException e) { ! log.error("Removing published node (" + queueNode.getNumber() + ") failed", e); ! publishFailed(localCloudInfo, queueNode, e); } } *************** *** 255,265 **** } catch (Throwable e) { ! log.error("Throwable error with nodenumber: " + node.getNumber() + ", " + e.getMessage()); log.debug(Logging.stackTrace(e)); } ! } } //end for ! //FIXME: add this log message as debug information for performance inspector ! log.debug("Published total "+list.size()+" nodes in "+(System.currentTimeMillis()-timestamp)+" ms!"); } } --- 239,248 ---- } catch (Throwable e) { ! log.error("Throwable error with nodenumber: " + queueNode.getNumber() + ", " + e.getMessage()); log.debug(Logging.stackTrace(e)); } ! } } //end for ! log.debug("Published total "+list.size()+" nodes"); } } *************** *** 267,270 **** --- 250,298 ---- } + private boolean isRemoveAction(String action) { + return action.equalsIgnoreCase(ACTION_REMOVE); + } + + private boolean isUpdateAction(String action) { + return action.equalsIgnoreCase(ACTION_UPDATE) || action.equalsIgnoreCase(ACTION_UPDATE_NODE) + || action.equalsIgnoreCase(ACTION_UPDATE_RELATIONS); + } + + private NodeQuery createQuery(NodeManager nodeManager) { + NodeQuery query = nodeManager.createQuery(); + + StepField statusField = query.getStepField(nodeManager.getField(FIELD_STATUS)); + FieldValueConstraint failStatus = query.createConstraint(statusField, + FieldCompareConstraint.NOT_EQUAL, STATUS_FAIL); + query.setCaseSensitive(failStatus, true); + FieldValueConstraint doneStatus = query.createConstraint(statusField, + FieldCompareConstraint.NOT_EQUAL, STATUS_DONE); + query.setCaseSensitive(doneStatus, true); + Constraint statusComposite = query.createConstraint( + failStatus, CompositeConstraint.LOGICAL_AND, doneStatus); + + if (!nodeManager.hasField(FIELD_PUBLISHDATE)) { + query.setConstraint(statusComposite); + } + else { + StepField publishDateField = query.getStepField(nodeManager.getField(FIELD_PUBLISHDATE)); + Constraint publishNull = query.createConstraint(publishDateField); + Constraint publishNow = query.createConstraint( + publishDateField, FieldCompareConstraint.LESS_EQUAL, new Date()); + + Constraint publishDateComposite = query.createConstraint( + publishNow, CompositeConstraint.LOGICAL_OR, publishNull); + Constraint composite = query.createConstraint( + statusComposite, CompositeConstraint.LOGICAL_AND, publishDateComposite); + query.setConstraint(composite); + } + query.setMaxNumber(10); + StepField timestampField = query.getStepField(nodeManager.getField(FIELD_TIMESTAMP)); + query.addSortOrder(timestampField, SortOrder.ORDER_ASCENDING); + + query.setCachePolicy(CachePolicy.NEVER); + return query; + } + /** * handle exception, if the root cause is RMI's connection broken, *************** *** 307,314 **** private void publishFailed(CloudInfo localCloudInfo, Node node, BridgeException e) { ! node.setStringValue("status", "fail"); node.commit(); for(PublishListener listener : publishListeners) { ! int number = node.getIntValue("sourcenumber"); if (localCloudInfo.getCloud().hasNode(number)) { StringBuffer message = new StringBuffer(); --- 335,342 ---- private void publishFailed(CloudInfo localCloudInfo, Node node, BridgeException e) { ! node.setStringValue(FIELD_STATUS, STATUS_FAIL); node.commit(); for(PublishListener listener : publishListeners) { ! int number = node.getIntValue(FIELD_SOURCENUMBER); if (localCloudInfo.getCloud().hasNode(number)) { StringBuffer message = new StringBuffer(); *************** *** 326,337 **** } } ! ! protected boolean isAllowedToPublish(Node publishQueueNode) { ! return true; ! } ! ! private void publish(CloudInfo localCloudInfo, Node publishQueueNode) { ! int localNodeNumber = publishQueueNode.getIntValue("sourcenumber"); ! int remoteCloudNumber = publishQueueNode.getIntValue("destinationcloud"); CloudInfo remoteCloudInfo = CloudInfo.getCloudInfo(remoteCloudNumber); Node localNode = localCloudInfo.getCloud().getNode(localNodeNumber); --- 354,361 ---- } } ! ! private void update(CloudInfo localCloudInfo, Node queueNode, String action) { ! int localNodeNumber = queueNode.getIntValue(FIELD_SOURCENUMBER); ! int remoteCloudNumber = queueNode.getIntValue(FIELD_DESTINATIONCLOUD); CloudInfo remoteCloudInfo = CloudInfo.getCloudInfo(remoteCloudNumber); Node localNode = localCloudInfo.getCloud().getNode(localNodeNumber); *************** *** 343,354 **** if (PublishManager.isPublished(localCloudInfo, localNode)) { if (localNode instanceof Relation) { ! log.debug(nodeManagerName + " update relation with number " + ! localNodeNumber); } else { ! log.debug(nodeManagerName + " update node with number " + ! localNodeNumber); } - - PublishManager.updatePublishedNodes(localCloudInfo, localNode); } else { if (localNode instanceof Relation) { --- 367,395 ---- if (PublishManager.isPublished(localCloudInfo, localNode)) { if (localNode instanceof Relation) { ! log.debug(nodeManagerName + " update relation with number " + localNodeNumber); } else { ! log.debug(nodeManagerName + " update node with number " + localNodeNumber); ! } ! if (action.equalsIgnoreCase(ACTION_UPDATE_NODE)) { ! PublishManager.updateNodesAndRelations(localCloudInfo, localNode, true, false); ! } ! if (action.equalsIgnoreCase(ACTION_UPDATE)) { ! PublishManager.updateNodesAndRelations(localCloudInfo, localNode, true, true); ! } ! if (action.equalsIgnoreCase(ACTION_UPDATE_RELATIONS)) { ! String relatedNodes = queueNode.getStringValue(FIELD_RELATEDNODES); ! if (relatedNodes != null && relatedNodes.length() > 0) { ! List<Integer> related = new ArrayList<Integer>(); ! StringTokenizer tokenizer = new StringTokenizer(relatedNodes, ","); ! while(tokenizer.hasMoreTokens()) { ! int relatedNodeNumber = Integer.valueOf(tokenizer.nextToken()); ! related.add(relatedNodeNumber); ! } ! PublishManager.updateNodesAndRelations(localCloudInfo, localNode, false, true, related); ! } ! else { ! PublishManager.updateNodesAndRelations(localCloudInfo, localNode, false, true); ! } } } else { if (localNode instanceof Relation) { *************** *** 360,364 **** } ! PublishManager.publishNode(localCloudInfo, localNode, remoteCloudInfo); } --- 401,410 ---- } ! if (action.equalsIgnoreCase(ACTION_UPDATE_NODE)) { ! PublishManager.createNodeAndRelations(localCloudInfo, localNode, remoteCloudInfo, false); ! } ! if (action.equalsIgnoreCase(ACTION_UPDATE)) { ! PublishManager.createNodeAndRelations(localCloudInfo, localNode, remoteCloudInfo, true); ! } } *************** *** 377,386 **** } ! private void removeNode(CloudInfo localCloudInfo, Node publishQueueNode) { ! int number = publishQueueNode.getIntValue("sourcenumber"); PublishManager.deletePublishedNode(localCloudInfo, number); ! publishQueueNode.setStringValue("status", "done"); ! publishQueueNode.commit(); } --- 423,432 ---- } ! private void removeNode(CloudInfo localCloudInfo, Node queueNode) { ! int number = queueNode.getIntValue(FIELD_SOURCENUMBER); PublishManager.deletePublishedNode(localCloudInfo, number); ! queueNode.setStringValue(FIELD_STATUS, STATUS_DONE); ! queueNode.commit(); } |