Revision: 2956
http://archive-access.svn.sourceforge.net/archive-access/?rev=2956&view=rev
Author: binzino
Date: 2010-02-12 20:54:15 +0000 (Fri, 12 Feb 2010)
Log Message:
-----------
Added logic to handle per-collection segments.
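For reference, per-collection lookup is switched on by the nutchwax.FetchedSegments.perCollection
property, which DistributedSegmentBean reads from its Hadoop Configuration. A rough usage sketch
follows; the example class, the servers-config path, and the NutchConfiguration factory are
illustrative assumptions rather than part of this commit:

  import java.util.Arrays;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.nutch.searcher.DistributedSegmentBean;
  import org.apache.nutch.searcher.SegmentBean;
  import org.apache.nutch.util.NutchConfiguration;

  public class PerCollectionSegmentsExample {
    public static void main(String[] args) throws Exception {
      // Switch DistributedSegmentBean from per-segment to per-collection keying.
      Configuration conf = NutchConfiguration.create();
      conf.setBoolean("nutchwax.FetchedSegments.perCollection", true);

      // Hypothetical file listing the remote segment servers.
      Path serversConfig = new Path("conf/segment-servers.txt");

      SegmentBean segments = new DistributedSegmentBean(conf, serversConfig);

      // The segment/collection map is refreshed in the background, so the
      // names may take a moment to appear after startup.
      System.out.println(Arrays.toString(segments.getSegmentNames()));
      segments.close();
    }
  }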
Added Paths:
-----------
trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java
Added: trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java
===================================================================
--- trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java (rev 0)
+++ trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java 2010-02-12 20:54:15 UTC (rev 2956)
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+
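+/**
+ * Dispatches SegmentBean calls over Hadoop RPC to a set of remote segment
+ * servers, routing each hit to the server that holds its segment, or its
+ * collection when nutchwax.FetchedSegments.perCollection is true.
+ */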
+public class DistributedSegmentBean implements SegmentBean {
+
+ private static final ExecutorService executor =
+ Executors.newCachedThreadPool();
+
+ private final ScheduledExecutorService pingService;
+
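+  /** Fetches the summaries for one remote bean's share of the hit details. */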
+  private class DistSummaryTask implements Callable<Summary[]> {
+ private int id;
+
+ private HitDetails[] details;
+ private Query query;
+
+    public DistSummaryTask(int id) {
+ this.id = id;
+ }
+
+ public Summary[] call() throws Exception {
+ if (details == null) {
+ return null;
+ }
+ return beans[id].getSummary(details, query);
+ }
+
+ public void setSummaryArgs(HitDetails[] details, Query query) {
+ this.details = details;
+ this.query = query;
+ }
+
+ }
+
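+  /**
+   * Refreshes the segment/collection-to-bean map for one remote bean; if that
+   * bean cannot be reached, its entries are dropped until a later refresh
+   * succeeds again.
+   */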
+ private class SegmentWorker implements Runnable {
+ private int id;
+
+ public SegmentWorker(int id) {
+ this.id = id;
+ }
+
+ public void run() {
+ try {
+ String[] segments = beans[id].getSegmentNames();
+ for (String segment : segments) {
+ segmentMap.put(segment, id);
+ }
+ } catch (IOException e) {
+ // remove all segments this bean was serving
+ Iterator<Map.Entry<String, Integer>> i =
+ segmentMap.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<String, Integer> entry = i.next();
+ int curId = entry.getValue();
+ if (curId == this.id) {
+ i.remove();
+ }
+ }
+ }
+ }
+ }
+
+ private long timeout;
+
+ private SegmentBean[] beans;
+
+ private boolean perCollection = false;
+
+ private ConcurrentMap<String, Integer> segmentMap;
+
+ private List<Callable<Summary[]>> summaryTasks;
+
+ private List<SegmentWorker> segmentWorkers;
+
+ public DistributedSegmentBean(Configuration conf, Path serversConfig)
+ throws IOException {
+ this.timeout = conf.getLong("ipc.client.timeout", 60000);
+    this.perCollection = conf.getBoolean("nutchwax.FetchedSegments.perCollection", false);
+
+ List<SegmentBean> beanList = new ArrayList<SegmentBean>();
+
+ List<InetSocketAddress> segmentServers =
+ NutchBean.readAddresses(serversConfig, conf);
+
+ for (InetSocketAddress addr : segmentServers) {
+ SegmentBean bean = (RPCSegmentBean) RPC.getProxy(RPCSegmentBean.class,
+ FetchedSegments.VERSION, addr, conf);
+ beanList.add(bean);
+ }
+
+ beans = beanList.toArray(new SegmentBean[beanList.size()]);
+
+ summaryTasks = new ArrayList<Callable<Summary[]>>(beans.length);
+ segmentWorkers = new ArrayList<SegmentWorker>(beans.length);
+
+ for (int i = 0; i < beans.length; i++) {
+      summaryTasks.add(new DistSummaryTask(i));
+ segmentWorkers.add(new SegmentWorker(i));
+ }
+
+ segmentMap = new ConcurrentHashMap<String, Integer>();
+
+ pingService = Executors.newScheduledThreadPool(beans.length);
+ for (SegmentWorker worker : segmentWorkers) {
+ pingService.scheduleAtFixedRate(worker, 0, 30, TimeUnit.SECONDS);
+ }
+ }
+
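+  /** Returns the bean serving the collection or segment named in the hit. */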
+  private SegmentBean getBean(HitDetails details) {
+    String key = details.getValue(perCollection ? "collection" : "segment");
+    return beans[segmentMap.get(key)];
+  }
+
+ public String[] getSegmentNames() {
+ return segmentMap.keySet().toArray(new String[segmentMap.size()]);
+ }
+
+ public byte[] getContent(HitDetails details) throws IOException {
+ return getBean(details).getContent(details);
+ }
+
+ public long getFetchDate(HitDetails details) throws IOException {
+ return getBean(details).getFetchDate(details);
+ }
+
+ public ParseData getParseData(HitDetails details) throws IOException {
+ return getBean(details).getParseData(details);
+ }
+
+ public ParseText getParseText(HitDetails details) throws IOException {
+ return getBean(details).getParseText(details);
+ }
+
+ public void close() throws IOException {
+ executor.shutdown();
+ pingService.shutdown();
+ for (SegmentBean bean : beans) {
+ bean.close();
+ }
+ }
+
+ public Summary getSummary(HitDetails details, Query query)
+ throws IOException {
+ return getBean(details).getSummary(details, query);
+ }
+
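+  /**
+   * Groups the hits by the bean that serves them, fetches each group's
+   * summaries in parallel (bounded by the IPC timeout), and flattens the
+   * results into a single array.
+   */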
+ @SuppressWarnings("unchecked")
+ public Summary[] getSummary(HitDetails[] detailsArr, Query query)
+ throws IOException {
+ List<HitDetails>[] detailsList = new ArrayList[summaryTasks.size()];
+ for (int i = 0; i < detailsList.length; i++) {
+ detailsList[i] = new ArrayList<HitDetails>();
+ }
+ for (HitDetails details : detailsArr) {
+      String key = details.getValue(perCollection ? "collection" : "segment");
+ detailsList[segmentMap.get(key)].add(details);
+ }
+ for (int i = 0; i < summaryTasks.size(); i++) {
+      DistSummaryTask task = (DistSummaryTask) summaryTasks.get(i);
+ if (detailsList[i].size() > 0) {
+ HitDetails[] taskDetails =
+ detailsList[i].toArray(new HitDetails[detailsList[i].size()]);
+ task.setSummaryArgs(taskDetails, query);
+ } else {
+ task.setSummaryArgs(null, null);
+ }
+ }
+
+ List<Future<Summary[]>> summaries;
+ try {
+ summaries =
+ executor.invokeAll(summaryTasks, timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<Summary> summaryList = new ArrayList<Summary>();
+ for (Future<Summary[]> f : summaries) {
+ Summary[] summaryArray;
+ try {
+ summaryArray = f.get();
+ if (summaryArray == null) {
+ continue;
+ }
+ for (Summary summary : summaryArray) {
+ summaryList.add(summary);
+ }
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ return summaryList.toArray(new Summary[summaryList.size()]);
+ }
+
+}