katta-commits Mailing List for katta
Brought to you by:
joa23
From: Johannes Z. <jzi...@go...> - 2012-01-03 20:20:00
|
Hey Wei,
the DefaultNodeSelectionPolicy manages a list of nodes per shard.
Every time a shard is requested, the first node of this list is retrieved (and put at the end of the list).
In your scenario the lists for each shard are identical, so the round-robin picks exactly the same node for each shard.
The same kind of 'problem' probably also exists if multiple clients are used and they have a similar request interval.
The simplest solution that comes to my mind is to shuffle each list when creating it.
However, I also wonder whether this equal-node-lists-per-shard situation really occurs that often in 'real world' deployments.
Any further thoughts on this?
Johannes
PS: Please use the katta dev mailing list next time. The commits mailing list is just for the commits or, at most, for discussing commits!
On Dec 21, 2011, at 4:31 PM, Lu, Wei wrote:
> Hi,
> I found a problem with DefaultNodeSelectionPolicy.
> Suppose I have 2 shards deployed on 2 nodes with a replication level of 2, so each node holds both shards: {node1:[index#shard1, index#shard2], node2:[index#shard1, index#shard2]}. When I search the index, e.g. "./katta search index content:katta", the RPC calls for both shards go to ONLY one node, according to DefaultNodeSelectionPolicy:
>
> node = nodeList.getNext(); // just choose the first node in the list without balancing...
>
> Actually, I think DefaultNodeSelectionPolicy could balance better if it just did what DefaultDistributionPolicy does. What's your opinion?
>
> Regards,
> Wei
>
> Katta-commits mailing list
> Kat...@li...
> https://lists.sourceforge.net/lists/listinfo/katta-commits
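The shuffle Johannes suggests can be sketched in a few lines; the class and method names below are illustrative, not the actual katta code. Randomizing each per-shard node list once at creation time decorrelates the round-robin cursors, so a single request no longer lands on the same node for every shard:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hedged sketch of the proposed fix: shuffle each per-shard node list when it
// is created, so identical replica sets no longer yield identical round-robin
// orderings. Names here are hypothetical, not the real katta API.
public class ShuffledSelectionSketch {

    public static Map<String, LinkedList<String>> createNodeLists(
            Map<String, List<String>> shard2Nodes, Random random) {
        Map<String, LinkedList<String>> result = new HashMap<String, LinkedList<String>>();
        for (Map.Entry<String, List<String>> e : shard2Nodes.entrySet()) {
            LinkedList<String> copy = new LinkedList<String>(e.getValue());
            Collections.shuffle(copy, random); // break the identical ordering
            result.put(e.getKey(), copy);
        }
        return result;
    }
}
```

With each list shuffled independently, a single request spreads across nodes on average, although, as Johannes notes, it is unclear how often perfectly identical lists occur in real deployments.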
|
|
From: Lu, W. <wl...@mi...> - 2011-12-21 15:31:40
|
Hi,
I found a problem with DefaultNodeSelectionPolicy.
Suppose I have 2 shards deployed on 2 nodes with a replication level of 2, so each node holds both shards: {node1:[index#shard1, index#shard2], node2:[index#shard1, index#shard2]}. When I search the index, e.g. "./katta search index content:katta", the RPC calls for both shards go to ONLY one node, according to DefaultNodeSelectionPolicy:
node = nodeList.getNext(); // just choose the first node in the list without balancing...
Actually, I think DefaultNodeSelectionPolicy could balance better if it just did what DefaultDistributionPolicy does. What's your opinion?
Regards,
Wei
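The behavior Wei reports can be reproduced with a small stand-alone sketch (class names here are hypothetical, not the actual katta API): when two shards keep their own round-robin node lists but both lists start in the same order, one request resolves every shard to the same node.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the round-robin behavior discussed in this thread.
public class RoundRobinDemo {

    public static class NodeList {
        private final LinkedList<String> nodes;

        public NodeList(List<String> initial) {
            nodes = new LinkedList<String>(initial);
        }

        // Take the head and rotate it to the tail, mirroring the
        // getNext() call quoted from DefaultNodeSelectionPolicy.
        public String getNext() {
            String node = nodes.removeFirst();
            nodes.addLast(node);
            return node;
        }
    }

    public static void main(String[] args) {
        List<String> sameOrder = Arrays.asList("node1", "node2");
        Map<String, NodeList> shard2Nodes = new LinkedHashMap<String, NodeList>();
        shard2Nodes.put("index#shard1", new NodeList(sameOrder));
        shard2Nodes.put("index#shard2", new NodeList(sameOrder));

        // One search request touches every shard once; because both lists
        // started identically, both shards resolve to node1.
        for (Map.Entry<String, NodeList> e : shard2Nodes.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().getNext());
        }
    }
}
```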
|
|
From: Johannes Z. <jzi...@go...> - 2010-08-27 17:19:24
|
Hey 张成,
please use the katta developer mailing list for such questions (you're posting to the mailing list where usually only the commit messages go). However, see some comments inline:
On Aug 27, 2010, at 8:22 AM, 张成 wrote:
> hello, everyone:
> I have used katta for several months. It is simple, but very stable. But it also has some problems.
> 1) Sometimes the state of an index changes to ERROR although there is actually no error. I don't know why.
Did you try to get more detailed information with this command?
bin/katta listErrors <index name>
Also have a look at the log files.
> 2) Sometimes there are a lot of operations waiting in the node queue, maybe several hundred. I think the memory may be too low. But I still cannot understand why there are so many operations in the same node queue.
Could you give more information on that? What kind of operations are queued? What is the workload of your cluster, and are you running index additions or removals at that time? Do the nodes recover from that situation? (Now I've asked more questions than you did ;))
> 3) Katta does not support real-time search, which is becoming more and more important. Do you have some ideas about this? In my opinion, we need to add a kind of node which stores a real-time index in memory.
Not sure what you mean by this. Can you express in Lucene terminology what you want to have?
> I hope that my opinion will help you.
Yes, it is very welcome!
regards
Johannes |
|
From: 张成 <zh...@gm...> - 2010-08-27 06:23:01
|
hello, everyone:
I have used katta for several months. It is simple, but very stable. But it also has some problems.
1) Sometimes the state of an index changes to ERROR although there is actually no error. I don't know why.
2) Sometimes there are a lot of operations waiting in the node queue, maybe several hundred. I think the memory may be too low. But I still cannot understand why there are so many operations in the same node queue.
3) Katta does not support real-time search, which is becoming more and more important. Do you have some ideas about this? In my opinion, we need to add a kind of node which stores a real-time index in memory.
I hope that my opinion will help you. |
|
From: <jo...@us...> - 2009-07-01 20:49:44
|
Revision: 478
http://katta.svn.sourceforge.net/katta/?rev=478&view=rev
Author: joa23
Date: 2009-07-01 20:49:13 +0000 (Wed, 01 Jul 2009)
Log Message:
-----------
KATTA-54, makes katta more generic. ATTENTION: this is an experimental commit that makes Katta completely unstable.
The test suite is broken and functionality commented out.
Modified Paths:
--------------
trunk/README.txt
trunk/ivy.xml
trunk/src/main/java/net/sf/katta/index/DeployedShard.java
trunk/src/main/java/net/sf/katta/index/IndexMetaData.java
trunk/src/main/java/net/sf/katta/index/ShardError.java
trunk/src/main/java/net/sf/katta/index/indexer/merge/IndexMergeApplication.java
trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java
trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java
trunk/src/main/java/net/sf/katta/master/DistributeShardsThread.java
trunk/src/main/java/net/sf/katta/master/Master.java
trunk/src/main/java/net/sf/katta/node/Hit.java
trunk/src/main/java/net/sf/katta/node/Hits.java
trunk/src/main/java/net/sf/katta/node/NodeMetaData.java
trunk/src/main/java/net/sf/katta/node/QueryWritable.java
trunk/src/main/java/net/sf/katta/node/TermWritable.java
trunk/src/main/java/net/sf/katta/tool/ZkTool.java
trunk/src/main/java/net/sf/katta/util/CircularList.java
trunk/src/main/java/net/sf/katta/util/IndexConfiguration.java
trunk/src/main/java/net/sf/katta/util/KattaConfiguration.java
trunk/src/main/java/net/sf/katta/util/ZkConfiguration.java
trunk/src/main/java/net/sf/katta/zk/ZKClient.java
trunk/src/test/integration/net/sf/katta/integrationTest/KattaMiniCluster.java
trunk/src/test/integration/net/sf/katta/integrationTest/SearchIntegrationTest.java
trunk/src/test/java/net/sf/katta/AbstractKattaTest.java
trunk/src/test/java/net/sf/katta/KattaTest.java
trunk/src/test/java/net/sf/katta/NodeMasterReconnectTest.java
trunk/src/test/java/net/sf/katta/PerformanceTest.java
trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java
trunk/src/test/java/net/sf/katta/loadtest/LoadTestStarterTest.java
trunk/src/test/java/net/sf/katta/master/FailTest.java
trunk/src/test/java/net/sf/katta/master/MasterTest.java
trunk/src/test/java/net/sf/katta/node/KattaMultiSearcherTest.java
trunk/src/test/java/net/sf/katta/node/NodeTest.java
trunk/src/test/java/net/sf/katta/testutil/TestResources.java
trunk/src/test/java/net/sf/katta/zk/ZKClientTest.java
trunk/src/test/resources/katta.zk.properties
trunk/src/test/resources/log4j.properties
Added Paths:
-----------
trunk/src/main/java/net/sf/katta/node/AbstractServer.java
trunk/src/main/java/net/sf/katta/node/ILuceneServer.java
trunk/src/main/java/net/sf/katta/node/IMapFileServer.java
trunk/src/main/java/net/sf/katta/node/INodeManaged.java
trunk/src/main/java/net/sf/katta/node/LuceneServer.java
trunk/src/main/java/net/sf/katta/node/MapFileServer.java
trunk/src/main/java/net/sf/katta/node/Node.java
trunk/src/main/java/net/sf/katta/node/TextArrayWritable.java
trunk/src/test/java/net/sf/katta/MultiInstanceTest.java
trunk/src/test/java/net/sf/katta/client/AlternateRootCfgClientTest.java
trunk/src/test/java/net/sf/katta/client/ClientResultTest.java
trunk/src/test/java/net/sf/katta/client/LuceneClientFailoverTest.java
trunk/src/test/java/net/sf/katta/client/LuceneClientTest.java
trunk/src/test/java/net/sf/katta/client/MapFileClientTest.java
trunk/src/test/java/net/sf/katta/client/NodeInteractionTest.java
trunk/src/test/java/net/sf/katta/client/ResultCompletePolicyTest.java
trunk/src/test/java/net/sf/katta/client/SleepClientTest.java
trunk/src/test/java/net/sf/katta/client/WorkQueueTest.java
trunk/src/test/java/net/sf/katta/master/AlternateRootCfgMasterTest.java
trunk/src/test/java/net/sf/katta/node/AlternateRootCfgNodeTest.java
trunk/src/test/java/net/sf/katta/node/DocumentFrequencyWritableTest.java
trunk/src/test/java/net/sf/katta/node/MapFileServerTest.java
trunk/src/test/java/net/sf/katta/node/SleepServerTest.java
trunk/src/test/java/net/sf/katta/util/GenerateMapFiles.java
trunk/src/test/java/net/sf/katta/util/ISleepClient.java
trunk/src/test/java/net/sf/katta/util/ISleepServer.java
trunk/src/test/java/net/sf/katta/util/SleepClient.java
trunk/src/test/java/net/sf/katta/util/SleepServer.java
trunk/src/test/java/net/sf/katta/util/ZkConfigurationTest.java
trunk/src/test/java/net/sf/katta/zk/ZkPathsTest.java
trunk/src/test/resources/katta.zk.properties_alt_root
Removed Paths:
-------------
trunk/src/main/java/net/sf/katta/node/BaseNode.java
trunk/src/main/java/net/sf/katta/node/BaseRpcServer.java
trunk/src/main/java/net/sf/katta/node/IRequestHandler.java
trunk/src/main/java/net/sf/katta/node/ISearch.java
trunk/src/main/java/net/sf/katta/node/KattaMultiSearcher.java
trunk/src/main/java/net/sf/katta/node/LuceneNode.java
trunk/src/main/java/net/sf/katta/zk/ZkPathes.java
trunk/src/test/java/net/sf/katta/client/ClientFailoverTest.java
trunk/src/test/java/net/sf/katta/client/ClientTest.java
trunk/src/test/java/net/sf/katta/node/DocumentFrequenceWritableTest.java
trunk/src/test/java/net/sf/katta/zk/ZkPathesTest.java
Modified: trunk/README.txt
===================================================================
--- trunk/README.txt 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/README.txt 2009-07-01 20:49:13 UTC (rev 478)
@@ -1,4 +1,5 @@
-Katta, Lucene in the cloud.
+Katta: Lucene in the cloud. Or MapFiles. Or your own custom server.
+
For the latest information about Katta, please visit our website at:
http://katta.sourceforge.net/
Modified: trunk/ivy.xml
===================================================================
--- trunk/ivy.xml 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/ivy.xml 2009-07-01 20:49:13 UTC (rev 478)
@@ -43,7 +43,7 @@
<dependency org="org.apache.commons" name="commons-httpclient" rev="3.0.1" conf="compile" />
<dependency org="org.apache.commons" name="commons-logging" rev="1.0.4" conf="compile" />
<dependency org="org.apache.commons" name="commons-logging-api" rev="1.0.4" conf="compile" />
- <dependency org="commons-math" name="commons-math" rev="1.2" conf="compile" />
+ <dependency org="commons-math" name="commons-math" rev="1.2" conf="compile" />
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.19.0" conf="compile" />
<dependency org="org.apache.hadoop" name="hadoop-test" rev="0.19.0" conf="compile" />
@@ -61,10 +61,10 @@
<dependency org="xmlenc" name="xmlenc" rev="0.52" conf="compile" />
<dependency org="zookeeper" name="zookeeper" rev="3.1.1" conf="compile" />
- <!-- AWS -->
- <dependency org="jets3t" name="jets3t" rev="0.6.1" conf="compile" />
- <dependency org="jsch" name="jsch" rev="0.1.41" conf="compile" />
- <dependency org="typica" name="typica" rev="1.5.1" conf="compile" />
+ <!-- AWS -->
+ <dependency org="jets3t" name="jets3t" rev="0.6.1" conf="compile" />
+ <dependency org="jsch" name="jsch" rev="0.1.41" conf="compile" />
+ <dependency org="typica" name="typica" rev="1.5.1" conf="compile" />
</dependencies>
Modified: trunk/src/main/java/net/sf/katta/index/DeployedShard.java
===================================================================
--- trunk/src/main/java/net/sf/katta/index/DeployedShard.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/index/DeployedShard.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -26,16 +26,16 @@
public class DeployedShard implements Writable {
- private String _shardName;
- private Map<String, String> _metaData;
+ private String _shardName = "";
+ private Map<String, String> _metaData = new HashMap<String, String>();
public DeployedShard() {
// for serialization
}
public DeployedShard(final String shardName, final Map<String, String> metaData) {
- _shardName = shardName;
- _metaData = metaData;
+ _shardName = shardName != null ? shardName : "";
+ _metaData = metaData != null ? metaData : new HashMap<String, String>();
}
public void readFields(final DataInput in) throws IOException {
Modified: trunk/src/main/java/net/sf/katta/index/IndexMetaData.java
===================================================================
--- trunk/src/main/java/net/sf/katta/index/IndexMetaData.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/index/IndexMetaData.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -62,11 +62,11 @@
}
}
+
public String getPath() {
return _path.toString();
}
-
public IndexState getState() {
return _state;
}
Modified: trunk/src/main/java/net/sf/katta/index/ShardError.java
===================================================================
--- trunk/src/main/java/net/sf/katta/index/ShardError.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/index/ShardError.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -23,7 +23,7 @@
public class ShardError implements Writable {
- private String _errorMsg;
+ private String _errorMsg = "";
private long _timestamp = System.currentTimeMillis();
public ShardError() {
@@ -31,7 +31,7 @@
}
public ShardError(String errorMsg) {
- _errorMsg = errorMsg;
+ _errorMsg = errorMsg != null ? errorMsg : "";
}
public String getErrorMsg() {
Modified: trunk/src/main/java/net/sf/katta/index/indexer/merge/IndexMergeApplication.java
===================================================================
--- trunk/src/main/java/net/sf/katta/index/indexer/merge/IndexMergeApplication.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/index/indexer/merge/IndexMergeApplication.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -36,7 +36,6 @@
import net.sf.katta.util.KattaException;
import net.sf.katta.util.ZkConfiguration;
import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -79,16 +78,16 @@
List<IndexMetaData> deployedIndexes = new ArrayList<IndexMetaData>();
for (String indexName : indexNames) {
IndexMetaData indexMetaData = new IndexMetaData();
- _zkClient.readData(ZkPathes.getIndexPath(indexName), indexMetaData);
+ _zkClient.readData(_zkClient.getConfig().getZKIndexPath(indexName), indexMetaData);
deployedIndexes.add(indexMetaData);
}
- Set<Path> indexPathes = new HashSet<Path>();
+ Set<Path> indexPaths = new HashSet<Path>();
for (IndexMetaData indexMetaData : deployedIndexes) {
Path indexPath = new Path(indexMetaData.getPath());
- indexPathes.add(indexPath);
+ indexPaths.add(indexPath);
}
- LOG.info("found following indexes for potential merge: " + indexPathes);
+ LOG.info("found following indexes for potential merge: " + indexPaths);
IndexConfiguration indexConfiguration = new IndexConfiguration();
indexConfiguration.enrichJobConf(_jobConf, DfsIndexInputFormat.DOCUMENT_INFORMATION);
@@ -116,7 +115,7 @@
FileSystem fileSystem = FileSystem.get(_jobConf);
LOG.debug("using file system: " + fileSystem.getUri());
try {
- indexMergeJob.merge(indexPathes.toArray(new Path[indexPathes.size()]), mergedIndex);
+ indexMergeJob.merge(indexPaths.toArray(new Path[indexPaths.size()]), mergedIndex);
if (!fileSystem.exists(mergedIndex)) {
throw new IllegalStateException("merged index '" + mergedIndex + "' does not exists");
@@ -148,7 +147,7 @@
+ "-originals");
fileSystem.mkdirs(archiveRootPath);
LOG.info("moving old merged indices to archive: " + archiveRootPath);
- for (Path indexPath : indexPathes) {
+ for (Path indexPath : indexPaths) {
Path parentPath = indexPath.getParent();// parent of /indexes
Path indexArchivePath = new Path(archiveRootPath, parentPath.getName());
LOG.debug("moving " + parentPath + " to " + indexArchivePath);
@@ -163,7 +162,7 @@
private int countShards(List<String> indexNames) throws KattaException {
int shardCount = 0;
for (String index : indexNames) {
- shardCount += _zkClient.countChildren(ZkPathes.getIndexPath(index));
+ shardCount += _zkClient.countChildren(_zkClient.getConfig().getZKIndexPath(index));
}
return shardCount;
}
Modified: trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java
===================================================================
--- trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -15,232 +15,209 @@
*/
package net.sf.katta.loadtest;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.Vector;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import net.sf.katta.client.Client;
-import net.sf.katta.client.IClient;
-import net.sf.katta.node.BaseRpcServer;
-import net.sf.katta.node.Query;
-import net.sf.katta.util.KattaException;
-import net.sf.katta.util.LoadTestNodeConfiguration;
-import net.sf.katta.zk.IZkReconnectListener;
-import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
+public class LoadTestNode /*extends BaseRpcServer implements ILoadTestNode */{
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public class LoadTestNode extends BaseRpcServer implements ILoadTestNode {
-
- final static Logger LOG = Logger.getLogger(LoadTestNode.class);
-
- private ZKClient _zkClient;
- ScheduledExecutorService _executorService;
- private List<LoadTestQueryResult> _statistics;
-
- private Lock _shutdownLock = new ReentrantLock(true);
- private volatile boolean _shutdown = false;
- LoadTestNodeConfiguration _configuration;
- LoadTestNodeMetaData _metaData;
- IClient _client = new Client();
- private String _currentNodeName;
- private Random _random = new Random(System.currentTimeMillis());
-
- private int _queryRate;
-
- private int _count;
-
- private String[] _indexNames;
-
- private String[] _queryStrings;
-
- private final class TestSearcherRunnable implements Runnable {
- private int _count;
- private String[] _indexNames;
- private String[] _queryStrings;
- private int _queryIndex;
- private int _testDelay;
-
- TestSearcherRunnable(int testDelay, int count, String[] indexNames, String[] queryStrings) {
- _count = count;
- _indexNames = indexNames;
- _queryStrings = queryStrings;
- _testDelay = testDelay;
- _queryIndex = _random.nextInt(_queryStrings.length);
- LOG.info("Starting dictionary search at index " + _queryIndex);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void run() {
- // TODO PVo search for different terms
- long startTime = System.currentTimeMillis();
- String queryString = _queryStrings[_queryIndex];
- _queryIndex = (_queryIndex + 1) % _queryStrings.length;
- try {
- _client.search(new Query(queryString), _indexNames, _count);
- long endTime = System.currentTimeMillis();
- _statistics.add(new LoadTestQueryResult(startTime, endTime, queryString, getRpcHostName() + ":"
- + getRpcServerPort()));
- } catch (KattaException e) {
- _statistics.add(new LoadTestQueryResult(startTime, -1, queryString, getRpcHostName() + ":"
- + getRpcServerPort()));
- LOG.error("Search failed.", e);
- }
- long endTime = System.currentTimeMillis();
- int testDelay = Math.max(0, (int) (_testDelay - (endTime - startTime)));
- _executorService.schedule(this, _random.nextInt(testDelay * 2), TimeUnit.MILLISECONDS);
- }
- }
-
- class ReconnectListener implements IZkReconnectListener {
-
- @Override
- public void handleNewSession() throws Exception {
- LOG.info("Reconnecting load test node.");
- announceTestSearcher(_metaData);
- }
-
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- // do nothing
- }
- }
-
- public LoadTestNode(final ZKClient zkClient, LoadTestNodeConfiguration configuration) throws KattaException {
- _zkClient = zkClient;
- _configuration = configuration;
- }
-
- public void start() throws KattaException {
- LOG.debug("Starting zk client...");
- _zkClient.getEventLock().lock();
- try {
- if (!_zkClient.isStarted()) {
- _zkClient.start(30000);
- }
- _zkClient.subscribeReconnects(new ReconnectListener());
- } finally {
- _zkClient.getEventLock().unlock();
- }
- startRpcServer(_configuration.getStartPort());
- _metaData = new LoadTestNodeMetaData();
- _metaData.setHost(getRpcHostName());
- _metaData.setPort(getRpcServerPort());
- announceTestSearcher(_metaData);
- }
-
- void announceTestSearcher(LoadTestNodeMetaData metaData) throws KattaException {
- LOG.info("Announcing new node.");
- if (_currentNodeName != null) {
- _zkClient.deleteIfExists(_currentNodeName);
- }
- _currentNodeName = _zkClient.create(ZkPathes.LOADTEST_NODES + "/node-", metaData, CreateMode.EPHEMERAL_SEQUENTIAL);
- }
-
- private void unregisterNode() {
- LOG.info("Unregistering node.");
- if (_currentNodeName != null) {
- try {
- _zkClient.deleteIfExists(_currentNodeName);
- } catch (KattaException e) {
- LOG.error("Couldn't unregister node.", e);
- }
- _currentNodeName = null;
- }
- }
-
- public void shutdown() {
- _shutdownLock.lock();
- try {
- if (_shutdown) {
- return;
- }
- _shutdown = true;
- stopRpcServer();
- if (_executorService != null) {
- _executorService.shutdown();
- try {
- _executorService.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e1) {
- // ignore
- }
- }
-
- _zkClient.close();
- } finally {
- _shutdownLock.unlock();
- }
- }
-
- @Override
- public void initTest(int queryRate, final String[] indexNames, final String[] queryStrings, final int count) {
- _queryRate = queryRate;
- _queryStrings = queryStrings;
- _count = count;
- _indexNames = indexNames;
- unregisterNode();
- }
-
- @Override
- public void startTest() {
- int threads = Math.max(1, (_queryRate - 1) / 3 + 1);
- int testDelay = 1000 * threads / _queryRate;
-
- LOG.info("Requested to run test at " + _queryRate + " queries per second using " + threads
- + " threads and a test delay of " + testDelay + "ms.");
-
- _executorService = Executors.newScheduledThreadPool(threads);
- _statistics = new Vector<LoadTestQueryResult>();
- for (int i = 0; i < threads; i++) {
- _executorService.schedule(new TestSearcherRunnable(testDelay, _count, _indexNames, _queryStrings), _random
- .nextInt(testDelay), TimeUnit.MILLISECONDS);
- }
- }
-
- @Override
- public void stopTest() {
- LOG.info("Requested to stop test.");
- _executorService.shutdown();
- try {
- announceTestSearcher(_metaData);
- } catch (KattaException e) {
- LOG.info("Failed to announce test node.", e);
- }
- }
-
- @Override
- public long getProtocolVersion(String arg0, long arg1) throws IOException {
- return 0;
- }
-
- @Override
- protected void setup() {
- // do nothing
- }
-
- @Override
- public LoadTestQueryResult[] getResults() {
- try {
- _executorService.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- LoadTestQueryResult result[] = new LoadTestQueryResult[_statistics.size()];
- for (int i = 0; i < result.length; i++) {
- result[i] = _statistics.get(i);
- }
-
- return result;
- }
+// final static Logger LOG = Logger.getLogger(LoadTestNode.class);
+//
+// private ZKClient _zkClient;
+// ScheduledExecutorService _executorService;
+// private List<LoadTestQueryResult> _statistics;
+//
+// private Lock _shutdownLock = new ReentrantLock(true);
+// private volatile boolean _shutdown = false;
+// LoadTestNodeConfiguration _configuration;
+// LoadTestNodeMetaData _metaData;
+// IClient _client = new Client();
+// private String _currentNodeName;
+// private Random _random = new Random(System.currentTimeMillis());
+//
+// private int _queryRate;
+//
+// private int _count;
+//
+// private String[] _indexNames;
+//
+// private String[] _queryStrings;
+//
+// private final class TestSearcherRunnable implements Runnable {
+// private int _count;
+// private String[] _indexNames;
+// private String[] _queryStrings;
+// private int _queryIndex;
+// private int _testDelay;
+//
+// TestSearcherRunnable(int testDelay, int count, String[] indexNames, String[] queryStrings) {
+// _count = count;
+// _indexNames = indexNames;
+// _queryStrings = queryStrings;
+// _testDelay = testDelay;
+// _queryIndex = _random.nextInt(_queryStrings.length);
+// LOG.info("Starting dictionary search at index " + _queryIndex);
+// }
+//
+// @SuppressWarnings("deprecation")
+// @Override
+// public void run() {
+// // TODO PVo search for different terms
+// long startTime = System.currentTimeMillis();
+// String queryString = _queryStrings[_queryIndex];
+// _queryIndex = (_queryIndex + 1) % _queryStrings.length;
+// try {
+// _client.search(new Query(queryString), _indexNames, _count);
+// long endTime = System.currentTimeMillis();
+// _statistics.add(new LoadTestQueryResult(startTime, endTime, queryString, getRpcHostName() + ":"
+// + getRpcServerPort()));
+// } catch (KattaException e) {
+// _statistics.add(new LoadTestQueryResult(startTime, -1, queryString, getRpcHostName() + ":"
+// + getRpcServerPort()));
+// LOG.error("Search failed.", e);
+// }
+// long endTime = System.currentTimeMillis();
+// int testDelay = Math.max(0, (int) (_testDelay - (endTime - startTime)));
+// _executorService.schedule(this, _random.nextInt(testDelay * 2), TimeUnit.MILLISECONDS);
+// }
+// }
+//
+// class ReconnectListener implements IZkReconnectListener {
+//
+// @Override
+// public void handleNewSession() throws Exception {
+// LOG.info("Reconnecting load test node.");
+// announceTestSearcher(_metaData);
+// }
+//
+// @Override
+// public void handleStateChanged(KeeperState state) throws Exception {
+// // do nothing
+// }
+// }
+//
+// public LoadTestNode(final ZKClient zkClient, LoadTestNodeConfiguration configuration) throws KattaException {
+// _zkClient = zkClient;
+// _configuration = configuration;
+// }
+//
+// public void start() throws KattaException {
+// LOG.debug("Starting zk client...");
+// _zkClient.getEventLock().lock();
+// try {
+// if (!_zkClient.isStarted()) {
+// _zkClient.start(30000);
+// }
+// _zkClient.subscribeReconnects(new ReconnectListener());
+// } finally {
+// _zkClient.getEventLock().unlock();
+// }
+// startRpcServer(_configuration.getStartPort());
+// _metaData = new LoadTestNodeMetaData();
+// _metaData.setHost(getRpcHostName());
+// _metaData.setPort(getRpcServerPort());
+// announceTestSearcher(_metaData);
+// }
+//
+// void announceTestSearcher(LoadTestNodeMetaData metaData) throws KattaException {
+// LOG.info("Announcing new node.");
+// if (_currentNodeName != null) {
+// _zkClient.deleteIfExists(_currentNodeName);
+// }
+// _currentNodeName = _zkClient.create(ZkPathes.LOADTEST_NODES + "/node-", metaData, CreateMode.EPHEMERAL_SEQUENTIAL);
+// }
+//
+// private void unregisterNode() {
+// LOG.info("Unregistering node.");
+// if (_currentNodeName != null) {
+// try {
+// _zkClient.deleteIfExists(_currentNodeName);
+// } catch (KattaException e) {
+// LOG.error("Couldn't unregister node.", e);
+// }
+// _currentNodeName = null;
+// }
+// }
+//
+// public void shutdown() {
+// _shutdownLock.lock();
+// try {
+// if (_shutdown) {
+// return;
+// }
+// _shutdown = true;
+// stopRpcServer();
+// if (_executorService != null) {
+// _executorService.shutdown();
+// try {
+// _executorService.awaitTermination(10, TimeUnit.SECONDS);
+// } catch (InterruptedException e1) {
+// // ignore
+// }
+// }
+//
+// _zkClient.close();
+// } finally {
+// _shutdownLock.unlock();
+// }
+// }
+//
+// @Override
+// public void initTest(int queryRate, final String[] indexNames, final String[] queryStrings, final int count) {
+// _queryRate = queryRate;
+// _queryStrings = queryStrings;
+// _count = count;
+// _indexNames = indexNames;
+// unregisterNode();
+// }
+//
+// @Override
+// public void startTest() {
+// int threads = Math.max(1, (_queryRate - 1) / 3 + 1);
+// int testDelay = 1000 * threads / _queryRate;
+//
+// LOG.info("Requested to run test at " + _queryRate + " queries per second using " + threads
+// + " threads and a test delay of " + testDelay + "ms.");
+//
+// _executorService = Executors.newScheduledThreadPool(threads);
+// _statistics = new Vector<LoadTestQueryResult>();
+// for (int i = 0; i < threads; i++) {
+// _executorService.schedule(new TestSearcherRunnable(testDelay, _count, _indexNames, _queryStrings), _random
+// .nextInt(testDelay), TimeUnit.MILLISECONDS);
+// }
+// }
+//
+// @Override
+// public void stopTest() {
+// LOG.info("Requested to stop test.");
+// _executorService.shutdown();
+// try {
+// announceTestSearcher(_metaData);
+// } catch (KattaException e) {
+// LOG.info("Failed to announce test node.", e);
+// }
+// }
+//
+// @Override
+// public long getProtocolVersion(String arg0, long arg1) throws IOException {
+// return 0;
+// }
+//
+// @Override
+// protected void setup() {
+// // do nothing
+// }
+//
+// @Override
+// public LoadTestQueryResult[] getResults() {
+// try {
+// _executorService.awaitTermination(10, TimeUnit.SECONDS);
+// } catch (InterruptedException e) {
+// Thread.currentThread().interrupt();
+// }
+//
+// LoadTestQueryResult result[] = new LoadTestQueryResult[_statistics.size()];
+// for (int i = 0; i < result.length; i++) {
+// result[i] = _statistics.get(i);
+// }
+//
+// return result;
+// }
}
Modified: trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java
===================================================================
--- trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -15,251 +15,221 @@
*/
package net.sf.katta.loadtest;
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import net.sf.katta.util.KattaException;
-import net.sf.katta.zk.IZkChildListener;
-import net.sf.katta.zk.IZkReconnectListener;
-import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
-
-import org.apache.commons.math.stat.descriptive.StorelessUnivariateStatistic;
-import org.apache.commons.math.stat.descriptive.moment.Mean;
-import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
public class LoadTestStarter {
- static final Logger LOG = Logger.getLogger(LoadTestStarter.class);
-
- ZKClient _zkClient;
- private Map<String, LoadTestNodeMetaData> _testNodes = new HashMap<String, LoadTestNodeMetaData>();
- private int _numberOfTesterNodes;
- private int _startRate;
- private int _endRate;
- private int _step;
-
- ChildListener _childListener;
-
- private String[] _indexNames;
- private String _queryFile;
- private int _count;
- private int _runTime;
- private Writer _statisticsWriter;
- private Writer _resultWriter;
-
- class ChildListener implements IZkChildListener {
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws KattaException {
- checkNodes(currentChilds);
- }
- }
-
- class ReconnectListener implements IZkReconnectListener {
-
- @Override
- public void handleNewSession() throws Exception {
- // do nothing
- }
-
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- if (state == KeeperState.SyncConnected) {
- LOG.info("Reconnecting test starter.");
- checkNodes(_zkClient.getChildren(ZkPathes.LOADTEST_NODES));
- }
- }
- }
-
- public LoadTestStarter(final ZKClient zkClient, int nodes, int startRate, int endRate, int step, int runTime,
- String[] indexNames, String queryFile, int count) throws KattaException {
- _zkClient = zkClient;
- _numberOfTesterNodes = nodes;
- _startRate = startRate;
- _endRate = endRate;
- _step = step;
- _indexNames = indexNames;
- _queryFile = queryFile;
- _count = count;
- _runTime = runTime;
- try {
- long currentTime = System.currentTimeMillis();
- _statisticsWriter = new OutputStreamWriter(new FileOutputStream("build/load-test-log-" + currentTime + ".log"));
- _resultWriter = new OutputStreamWriter(new FileOutputStream("build/load-test-results-" + currentTime + ".log"));
- } catch (FileNotFoundException e) {
- throw new KattaException("Failed to create statistics file.", e);
- }
- }
-
- public void start() throws KattaException {
- LOG.debug("Starting zk client...");
- _zkClient.getEventLock().lock();
- try {
- if (!_zkClient.isStarted()) {
- _zkClient.start(30000);
- }
- _zkClient.subscribeReconnects(new ReconnectListener());
- _childListener = new ChildListener();
- _zkClient.subscribeChildChanges(ZkPathes.LOADTEST_NODES, _childListener);
- checkNodes(_zkClient.getChildren(ZkPathes.LOADTEST_NODES));
- } finally {
- _zkClient.getEventLock().unlock();
- }
- }
-
- void checkNodes(List<String> children) throws KattaException {
- synchronized (_testNodes) {
- LOG.info("Nodes found: " + children);
-
- Set<String> obsoleteNodes = new HashSet<String>(_testNodes.keySet());
- obsoleteNodes.removeAll(children);
- for (String obsoleteNode : obsoleteNodes) {
- LOG.info("Lost connection to " + obsoleteNode);
- _testNodes.remove(obsoleteNode);
- }
-
- for (String child : children) {
- if (!_testNodes.containsKey(child)) {
- try {
- LoadTestNodeMetaData metaData = new LoadTestNodeMetaData();
- _zkClient.readData(ZkPathes.LOADTEST_NODES + "/" + child, metaData);
- LOG.info("New test node on " + metaData.getHost() + ":" + metaData.getPort());
- _testNodes.put(child, metaData);
- } catch (KattaException e) {
- LOG.info("Could not read meta data of load test node: " + child + ". It probably disappeared.");
- }
- }
- }
- if (_testNodes.size() >= _numberOfTesterNodes) {
- startTest();
- }
- }
- }
-
- private void startTest() throws KattaException {
- List<LoadTestNodeMetaData> testers = new ArrayList<LoadTestNodeMetaData>(_testNodes.values());
- List<ILoadTestNode> testNodes = new ArrayList<ILoadTestNode>();
- for (int i = 0; i < _numberOfTesterNodes; i++) {
- try {
- testNodes.add((ILoadTestNode) RPC.getProxy(ILoadTestNode.class, 0, new InetSocketAddress(testers.get(i)
- .getHost(), testers.get(i).getPort()), new Configuration()));
- } catch (IOException e) {
- throw new KattaException("Failed to start tests.", e);
- }
- }
- _zkClient.unsubscribeAll();
- for (int queryRate = _startRate; queryRate <= _endRate; queryRate += _step) {
- LOG.info("Executing tests at query rate: " + queryRate + " queries per second.");
- int numberOfNodes = Math.min(testNodes.size(), queryRate);
- LOG.info("Using " + numberOfNodes + " load test nodes for this test.");
- List<ILoadTestNode> nodesForTest = testNodes.subList(0, numberOfNodes);
-
- int remainingQueryRate = queryRate;
- int remainingNodes = numberOfNodes;
- String[] queries;
- try {
- queries = readQueries(new FileInputStream(_queryFile));
- } catch (IOException e) {
- throw new KattaException("Failed to read query file " + _queryFile + ".", e);
- }
- for (ILoadTestNode testNode : nodesForTest) {
- int queryRateForNode = remainingQueryRate / remainingNodes;
- LOG.info("Initializing test on node using query rate: " + queryRateForNode + " queries per second.");
- testNode.initTest(queryRateForNode, _indexNames, queries, _count);
- --remainingNodes;
- remainingQueryRate -= queryRateForNode;
- }
- for (ILoadTestNode testNode : nodesForTest) {
- LOG.info("Starting test on node.");
- testNode.startTest();
- }
- try {
- Thread.sleep(_runTime);
- } catch (InterruptedException e) {
- // ignore
- }
- LOG.info("Stopping all tests...");
- for (ILoadTestNode testNode : nodesForTest) {
- testNode.stopTest();
- }
- LOG.info("Collecting results...");
- List<LoadTestQueryResult> results = new ArrayList<LoadTestQueryResult>();
- for (ILoadTestNode testNode : nodesForTest) {
- LoadTestQueryResult[] nodeResults = testNode.getResults();
- for (LoadTestQueryResult result : nodeResults) {
- results.add(result);
- }
- }
- LOG.info("Received " + results.size() + " queries, expected " + queryRate * _runTime / 1000);
- try {
- StorelessUnivariateStatistic timeStandardDeviation = new StandardDeviation();
- StorelessUnivariateStatistic timeMean = new Mean();
- int errors = 0;
-
- for (LoadTestQueryResult result : results) {
- long elapsedTime = result.getEndTime() > 0 ? result.getEndTime() - result.getStartTime() : -1;
- _statisticsWriter.write(queryRate + "\t" + result.getNodeId() + "\t" + result.getStartTime() + "\t"
- + result.getEndTime() + "\t" + elapsedTime + "\t" + result.getQuery() + "\n");
- if (elapsedTime != -1) {
- timeStandardDeviation.increment(elapsedTime);
- timeMean.increment(elapsedTime);
- } else {
- ++errors;
- }
- }
- _resultWriter.write(queryRate + "\t" + ((double) results.size() * 1000 / _runTime) + "\t" + errors + "\t"
- + timeMean.getResult() + "\t" + timeStandardDeviation.getResult() + "\n");
- } catch (IOException e) {
- throw new KattaException("Failed to write statistics data.", e);
- }
- }
- try {
- _statisticsWriter.close();
- _resultWriter.close();
- } catch (IOException e) {
- LOG.warn("Failed to close statistics file.");
- }
- shutdown();
- }
-
- static String[] readQueries(InputStream inputStream) throws IOException {
- BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
- List<String> lines = new ArrayList<String>();
- String line;
- while ((line = reader.readLine()) != null) {
- line = line.trim();
- if (!line.equals("")) {
- lines.add(line);
- }
- }
- return lines.toArray(new String[lines.size()]);
- }
-
- public void shutdown() {
- _zkClient.getEventLock().lock();
- try {
- _zkClient.unsubscribeAll();
- _zkClient.close();
- } finally {
- _zkClient.getEventLock().unlock();
- }
- }
+// static final Logger LOG = Logger.getLogger(LoadTestStarter.class);
+//
+// ZKClient _zkClient;
+// private Map<String, LoadTestNodeMetaData> _testNodes = new HashMap<String, LoadTestNodeMetaData>();
+// private int _numberOfTesterNodes;
+// private int _startRate;
+// private int _endRate;
+// private int _step;
+//
+// ChildListener _childListener;
+//
+// private String[] _indexNames;
+// private String _queryFile;
+// private int _count;
+// private int _runTime;
+// private Writer _statisticsWriter;
+// private Writer _resultWriter;
+//
+// class ChildListener implements IZkChildListener {
+// @Override
+// public void handleChildChange(String parentPath, List<String> currentChilds) throws KattaException {
+// checkNodes(currentChilds);
+// }
+// }
+//
+// class ReconnectListener implements IZkReconnectListener {
+//
+// @Override
+// public void handleNewSession() throws Exception {
+// // do nothing
+// }
+//
+// @Override
+// public void handleStateChanged(KeeperState state) throws Exception {
+// if (state == KeeperState.SyncConnected) {
+// LOG.info("Reconnecting test starter.");
+// checkNodes(_zkClient.getChildren(ZkPathes.LOADTEST_NODES));
+// }
+// }
+// }
+//
+// public LoadTestStarter(final ZKClient zkClient, int nodes, int startRate, int endRate, int step, int runTime,
+// String[] indexNames, String queryFile, int count) throws KattaException {
+// _zkClient = zkClient;
+// _numberOfTesterNodes = nodes;
+// _startRate = startRate;
+// _endRate = endRate;
+// _step = step;
+// _indexNames = indexNames;
+// _queryFile = queryFile;
+// _count = count;
+// _runTime = runTime;
+// try {
+// long currentTime = System.currentTimeMillis();
+// _statisticsWriter = new OutputStreamWriter(new FileOutputStream("build/load-test-log-" + currentTime + ".log"));
+// _resultWriter = new OutputStreamWriter(new FileOutputStream("build/load-test-results-" + currentTime + ".log"));
+// } catch (FileNotFoundException e) {
+// throw new KattaException("Failed to create statistics file.", e);
+// }
+// }
+//
+// public void start() throws KattaException {
+// LOG.debug("Starting zk client...");
+// _zkClient.getEventLock().lock();
+// try {
+// if (!_zkClient.isStarted()) {
+// _zkClient.start(30000);
+// }
+// _zkClient.subscribeReconnects(new ReconnectListener());
+// _childListener = new ChildListener();
+// _zkClient.subscribeChildChanges(ZkPathes.LOADTEST_NODES, _childListener);
+// checkNodes(_zkClient.getChildren(ZkPathes.LOADTEST_NODES));
+// } finally {
+// _zkClient.getEventLock().unlock();
+// }
+// }
+//
+// void checkNodes(List<String> children) throws KattaException {
+// synchronized (_testNodes) {
+// LOG.info("Nodes found: " + children);
+//
+// Set<String> obsoleteNodes = new HashSet<String>(_testNodes.keySet());
+// obsoleteNodes.removeAll(children);
+// for (String obsoleteNode : obsoleteNodes) {
+// LOG.info("Lost connection to " + obsoleteNode);
+// _testNodes.remove(obsoleteNode);
+// }
+//
+// for (String child : children) {
+// if (!_testNodes.containsKey(child)) {
+// try {
+// LoadTestNodeMetaData metaData = new LoadTestNodeMetaData();
+// _zkClient.readData(ZkPathes.LOADTEST_NODES + "/" + child, metaData);
+// LOG.info("New test node on " + metaData.getHost() + ":" + metaData.getPort());
+// _testNodes.put(child, metaData);
+// } catch (KattaException e) {
+// LOG.info("Could not read meta data of load test node: " + child + ". It probably disappeared.");
+// }
+// }
+// }
+// if (_testNodes.size() >= _numberOfTesterNodes) {
+// startTest();
+// }
+// }
+// }
+//
+// private void startTest() throws KattaException {
+// List<LoadTestNodeMetaData> testers = new ArrayList<LoadTestNodeMetaData>(_testNodes.values());
+// List<ILoadTestNode> testNodes = new ArrayList<ILoadTestNode>();
+// for (int i = 0; i < _numberOfTesterNodes; i++) {
+// try {
+// testNodes.add((ILoadTestNode) RPC.getProxy(ILoadTestNode.class, 0, new InetSocketAddress(testers.get(i)
+// .getHost(), testers.get(i).getPort()), new Configuration()));
+// } catch (IOException e) {
+// throw new KattaException("Failed to start tests.", e);
+// }
+// }
+// _zkClient.unsubscribeAll();
+// for (int queryRate = _startRate; queryRate <= _endRate; queryRate += _step) {
+// LOG.info("Executing tests at query rate: " + queryRate + " queries per second.");
+// int numberOfNodes = Math.min(testNodes.size(), queryRate);
+// LOG.info("Using " + numberOfNodes + " load test nodes for this test.");
+// List<ILoadTestNode> nodesForTest = testNodes.subList(0, numberOfNodes);
+//
+// int remainingQueryRate = queryRate;
+// int remainingNodes = numberOfNodes;
+// String[] queries;
+// try {
+// queries = readQueries(new FileInputStream(_queryFile));
+// } catch (IOException e) {
+// throw new KattaException("Failed to read query file " + _queryFile + ".", e);
+// }
+// for (ILoadTestNode testNode : nodesForTest) {
+// int queryRateForNode = remainingQueryRate / remainingNodes;
+// LOG.info("Initializing test on node using query rate: " + queryRateForNode + " queries per second.");
+// testNode.initTest(queryRateForNode, _indexNames, queries, _count);
+// --remainingNodes;
+// remainingQueryRate -= queryRateForNode;
+// }
+// for (ILoadTestNode testNode : nodesForTest) {
+// LOG.info("Starting test on node.");
+// testNode.startTest();
+// }
+// try {
+// Thread.sleep(_runTime);
+// } catch (InterruptedException e) {
+// // ignore
+// }
+// LOG.info("Stopping all tests...");
+// for (ILoadTestNode testNode : nodesForTest) {
+// testNode.stopTest();
+// }
+// LOG.info("Collecting results...");
+// List<LoadTestQueryResult> results = new ArrayList<LoadTestQueryResult>();
+// for (ILoadTestNode testNode : nodesForTest) {
+// LoadTestQueryResult[] nodeResults = testNode.getResults();
+// for (LoadTestQueryResult result : nodeResults) {
+// results.add(result);
+// }
+// }
+// LOG.info("Received " + results.size() + " queries, expected " + queryRate * _runTime / 1000);
+// try {
+// StorelessUnivariateStatistic timeStandardDeviation = new StandardDeviation();
+// StorelessUnivariateStatistic timeMean = new Mean();
+// int errors = 0;
+//
+// for (LoadTestQueryResult result : results) {
+// long elapsedTime = result.getEndTime() > 0 ? result.getEndTime() - result.getStartTime() : -1;
+// _statisticsWriter.write(queryRate + "\t" + result.getNodeId() + "\t" + result.getStartTime() + "\t"
+// + result.getEndTime() + "\t" + elapsedTime + "\t" + result.getQuery() + "\n");
+// if (elapsedTime != -1) {
+// timeStandardDeviation.increment(elapsedTime);
+// timeMean.increment(elapsedTime);
+// } else {
+// ++errors;
+// }
+// }
+// _resultWriter.write(queryRate + "\t" + ((double) results.size() * 1000 / _runTime) + "\t" + errors + "\t"
+// + timeMean.getResult() + "\t" + timeStandardDeviation.getResult() + "\n");
+// } catch (IOException e) {
+// throw new KattaException("Failed to write statistics data.", e);
+// }
+// }
+// try {
+// _statisticsWriter.close();
+// _resultWriter.close();
+// } catch (IOException e) {
+// LOG.warn("Failed to close statistics file.");
+// }
+// shutdown();
+// }
+//
+// static String[] readQueries(InputStream inputStream) throws IOException {
+// BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+// List<String> lines = new ArrayList<String>();
+// String line;
+// while ((line = reader.readLine()) != null) {
+// line = line.trim();
+// if (!line.equals("")) {
+// lines.add(line);
+// }
+// }
+// return lines.toArray(new String[lines.size()]);
+// }
+//
+// public void shutdown() {
+// _zkClient.getEventLock().lock();
+// try {
+// _zkClient.unsubscribeAll();
+// _zkClient.close();
+// } finally {
+// _zkClient.getEventLock().unlock();
+// }
+// }
}
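The (now commented-out) `readQueries` helper in the diff above reads one query per line, trims whitespace, and skips blank lines. A standalone sketch of that behavior, runnable outside Katta (the class name and sample data are illustrative, not from the source):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class ReadQueriesSketch {

  // Mirrors LoadTestStarter.readQueries: one query per line,
  // trimmed, blank lines dropped.
  static String[] readQueries(InputStream in) throws IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    List<String> lines = new ArrayList<String>();
    String line;
    while ((line = reader.readLine()) != null) {
      line = line.trim();
      if (!line.isEmpty()) {
        lines.add(line);
      }
    }
    return lines.toArray(new String[lines.size()]);
  }

  public static void main(String[] args) throws IOException {
    String raw = "content:katta\n\n  title:lucene  \n";
    String[] queries = readQueries(new ByteArrayInputStream(raw.getBytes()));
    System.out.println(queries.length);   // 2
    System.out.println(queries[1]);       // title:lucene
  }
}
```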
Modified: trunk/src/main/java/net/sf/katta/master/DistributeShardsThread.java
===================================================================
--- trunk/src/main/java/net/sf/katta/master/DistributeShardsThread.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/master/DistributeShardsThread.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -36,12 +36,12 @@
import net.sf.katta.index.IndexMetaData;
import net.sf.katta.index.IndexMetaData.IndexState;
import net.sf.katta.node.NodeMetaData;
-import net.sf.katta.node.BaseNode.NodeState;
+import net.sf.katta.node.Node.NodeState;
import net.sf.katta.util.CollectionUtil;
import net.sf.katta.util.KattaException;
+import net.sf.katta.util.ZkConfiguration;
import net.sf.katta.zk.IZkChildListener;
import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -55,6 +55,7 @@
protected final static Logger LOG = Logger.getLogger(DistributeShardsThread.class);
+ private final ZkConfiguration _conf;
private final ZKClient _zkClient;
private final IDeployPolicy _deployPolicy;
private final long _safeModeMaxTime;
@@ -67,10 +68,16 @@
protected final List<IndexStateListener> _indexStateListeners = new CopyOnWriteArrayList<IndexStateListener>();
public DistributeShardsThread(final ZKClient zkClient, final IDeployPolicy deployPolicy, final long safeModeMaxTime) {
+ this(zkClient, deployPolicy, safeModeMaxTime, true);
+ }
+
+ public DistributeShardsThread(final ZKClient zkClient, final IDeployPolicy deployPolicy, final long safeModeMaxTime,
+ boolean isDaemon) {
setDaemon(true);
setName(getClass().getSimpleName());
_deployPolicy = deployPolicy;
_zkClient = zkClient;
+ _conf = zkClient.getConfig();
// try {
// _zkClient.getEventLock().lock();
// _zkClient.subscribeReconnects(this);
@@ -218,7 +225,7 @@
for (String node : nodes) {
NodeMetaData nodeMetaData = new NodeMetaData();
try {
- _zkClient.readData(ZkPathes.getNodePath(node), nodeMetaData);
+ _zkClient.readData(_conf.getZKNodePath(node), nodeMetaData);
if (nodeMetaData.getState() == NodeState.STARTING || nodeMetaData.getState() == NodeState.RECONNECTING) {
return true;
}
@@ -248,9 +255,9 @@
private Set<String> getIndexesInState(IndexState indexState) throws KattaException {
final Set<String> indexes = new HashSet<String>();
- for (final String index : _zkClient.getChildren(ZkPathes.INDEXES)) {
+ for (final String index : _zkClient.getChildren(_conf.getZKIndicesPath())) {
final IndexMetaData indexMetaData = new IndexMetaData();
- _zkClient.readData(ZkPathes.getIndexPath(index), indexMetaData);
+ _zkClient.readData(_conf.getZKIndexPath(index), indexMetaData);
if (indexMetaData.getState() == indexState) {
indexes.add(index);
}
@@ -279,8 +286,8 @@
private Set<String> getUnderreplicatedIndexes(int nodeCount) throws KattaException {
final Set<String> underreplicatedIndexes = new HashSet<String>();
- for (final String index : _zkClient.getChildren(ZkPathes.INDEXES)) {
- final String indexZkPath = ZkPathes.getIndexPath(index);
+ for (final String index : _zkClient.getChildren(_conf.getZKIndicesPath())) {
+ final String indexZkPath = _conf.getZKIndexPath(index);
final IndexMetaData indexMetaData = new IndexMetaData();
_zkClient.readData(indexZkPath, indexMetaData);
if (indexMetaData.getState() != IndexState.ERROR && indexMetaData.getState() != IndexState.DEPLOYING
@@ -302,8 +309,8 @@
private Set<String> getOverreplicatedIndexes() throws KattaException {
final Set<String> overreplicatedIndexes = new HashSet<String>();
- for (final String index : _zkClient.getChildren(ZkPathes.INDEXES)) {
- final String indexZkPath = ZkPathes.getIndexPath(index);
+ for (final String index : _zkClient.getChildren(_conf.getZKIndicesPath())) {
+ final String indexZkPath = _conf.getZKIndexPath(index);
final IndexMetaData indexMetaData = new IndexMetaData();
_zkClient.readData(indexZkPath, indexMetaData);
if (indexMetaData.getState() != IndexState.ERROR && indexMetaData.getState() != IndexState.DEPLOYING
@@ -353,9 +360,9 @@
for (final String indexName : removedIndexes) {
final List<String> nodes = _zkClient.getKnownNodes();
for (final String node : nodes) {
- final List<String> shards = _zkClient.getChildren(ZkPathes.getNode2ShardRootPath(node));
+ final List<String> shards = _zkClient.getChildren(_conf.getZKNodeToShardPath(node));
for (final String shard : shards) {
- final String node2ShardPath = ZkPathes.getNode2ShardPath(node, shard);
+ final String node2ShardPath = _conf.getZKNodeToShardPath(node, shard);
final AssignedShard shardWritable = new AssignedShard();
_zkClient.readData(node2ShardPath, shardWritable);
if (shardWritable.getIndexName().equalsIgnoreCase(indexName)) {
@@ -385,10 +392,10 @@
LOG.info(state.name().toLowerCase() + " following indexes: " + affectedIndexes);
for (final String index : affectedIndexes) {
- final String indexZkPath = ZkPathes.getIndexPath(index);
+ final String indexZkPath = _conf.getZKIndexPath(index);
final IndexMetaData indexMetaData = new IndexMetaData();
try {
- _zkClient.readData(ZkPathes.getIndexPath(index), indexMetaData);
+ _zkClient.readData(_conf.getZKIndexPath(index), indexMetaData);
LOG.info(state.name().toLowerCase() + " shards for index '" + index + "' (" + indexMetaData.getState() + ")");
final Map<String, AssignedShard> shard2AssignedShardMap = readShardsFromFs(index, indexMetaData);
@@ -411,24 +418,24 @@
}
private void distributeIndexShards(final String index, final IndexMetaData indexMD, final Set<String> indexShards,
- final Map<String, AssignedShard> shard2AssignedShardMap, Collection liveNodes) throws KattaException {
+ final Map<String, AssignedShard> shard2AssignedShardMap, Collection<String> liveNodes) throws KattaException {
// cleanup/undeploy failed shards
for (final String shard : indexShards) {
- final String shard2ErrorRootPath = ZkPathes.getShard2ErrorRootPath(shard);
+ final String shard2ErrorRootPath = _conf.getZKShardToErrorPath(shard);
if (_zkClient.exists(shard2ErrorRootPath)) {
final List<String> nodesWithFailedShard = _zkClient.getChildren(shard2ErrorRootPath);
for (final String node : nodesWithFailedShard) {
- _zkClient.delete(ZkPathes.getShard2ErrorPath(shard, node));
- _zkClient.deleteIfExists(ZkPathes.getNode2ShardPath(node, shard));
+ _zkClient.delete(_conf.getZKShardToErrorPath(shard, node));
+ _zkClient.deleteIfExists(_conf.getZKNodeToShardPath(node, shard));
}
}
}
// add shards to zk
for (final String shard : indexShards) {
- final String shardZkPath = ZkPathes.getShardPath(index, shard);
- final String shard2NodeRootPath = ZkPathes.getShard2NodeRootPath(shard);
- final String shard2ErrorRootPath = ZkPathes.getShard2ErrorRootPath(shard);
+ final String shardZkPath = _conf.getZKShardPath(index, shard);
+ final String shard2NodeRootPath = _conf.getZKShardToNodePath(shard);
+ final String shard2ErrorRootPath = _conf.getZKShardToErrorPath(shard);
if (!_zkClient.exists(shardZkPath)) {
_zkClient.create(shardZkPath, shard2AssignedShardMap.get(shard));
}
@@ -456,11 +463,11 @@
final Set<String> indexShards) throws KattaException {
final Map<String, List<String>> shard2NodeNames = new HashMap<String, List<String>>();
for (final String shard : indexShards) {
- final String shard2NodeRootPath = ZkPathes.getShard2NodeRootPath(shard);
+ final String shard2NodeRootPath = zkClient.getConfig().getZKShardToNodePath(shard);
if (zkClient.exists(shard2NodeRootPath)) {
shard2NodeNames.put(shard, zkClient.getChildren(shard2NodeRootPath));
} else {
- shard2NodeNames.put(shard, Collections.EMPTY_LIST);
+ shard2NodeNames.put(shard, Collections.<String>emptyList());
}
}
return shard2NodeNames;
@@ -468,13 +475,13 @@
private Map<String, List<String>> readNode2ShardsMapFromZk(final ZKClient zkClient) throws KattaException {
final Map<String, List<String>> node2ShardNames = new HashMap<String, List<String>>();
- final List<String> nodes = zkClient.getChildren(ZkPathes.NODE_TO_SHARD);
+ final List<String> nodes = zkClient.getChildren(_conf.getZKNodeToShardPath());
for (final String node : nodes) {
- final String node2ShardRootPath = ZkPathes.getNode2ShardRootPath(node);
+ final String node2ShardRootPath = _conf.getZKNodeToShardPath(node);
if (zkClient.exists(node2ShardRootPath)) {
node2ShardNames.put(node, zkClient.getChildren(node2ShardRootPath));
} else {
- node2ShardNames.put(node, Collections.EMPTY_LIST);
+ node2ShardNames.put(node, Collections.<String>emptyList());
}
}
return node2ShardNames;
@@ -484,18 +491,18 @@
final Map<String, AssignedShard> shard2AssignedShardMap) throws KattaException {
final Set<String> nodes = distributionMap.keySet();
for (final String node : nodes) {
- final List<String> existingShards = _zkClient.getChildren(ZkPathes.getNode2ShardRootPath(node));
+ final List<String> existingShards = _zkClient.getChildren(_conf.getZKNodeToShardPath(node));
final List<String> newShards = distributionMap.get(node);
// add new shards
for (final String shard2Deploy : CollectionUtil.getListOfAdded(existingShards, newShards)) {
- final String shard2NodePath = ZkPathes.getNode2ShardPath(node, shard2Deploy);
+ final String shard2NodePath = _conf.getZKNodeToShardPath(node, shard2Deploy);
_zkClient.create(shard2NodePath, shard2AssignedShardMap.get(shard2Deploy));
}
// remove old shards
for (final String shard2Deploy : CollectionUtil.getListOfRemoved(existingShards, newShards)) {
- _zkClient.delete(ZkPathes.getNode2ShardPath(node, shard2Deploy));
+ _zkClient.delete(_conf.getZKNodeToShardPath(node, shard2Deploy));
}
}
}
@@ -611,8 +618,8 @@
LOG.info("start watching index '" + _index + "' (" + _indexMetaData.getState() + ")");
final Set<String> shards = _shards;
for (final String shard : shards) {
- final String shard2NodeRootPath = ZkPathes.getShard2NodeRootPath(shard);
- final String shard2ErrorPath = ZkPathes.getShard2ErrorRootPath(shard);
+ final String shard2NodeRootPath = _conf.getZKShardToNodePath(shard);
+ final String shard2ErrorPath = _conf.getZKShardToErrorPath(shard);
_shardToReplicaCount.put(shard, _zkClient.subscribeChildChanges(shard2NodeRootPath, this).size());
_shardToErrorCount.put(shard, _zkClient.subscribeChildChanges(shard2ErrorPath, this).size());
}
@@ -626,8 +633,8 @@
LOG.info("stop watching index '" + _index + "' (" + _indexMetaData.getState() + ")");
final Set<String> shards = _shards;
for (final String shard : shards) {
- final String shard2NodeRootPath = ZkPathes.getShard2NodeRootPath(shard);
- final String shard2ErrorPath = ZkPathes.getShard2ErrorRootPath(shard);
+ final String shard2NodeRootPath = _conf.getZKShardToNodePath(shard);
+ final String shard2ErrorPath = _conf.getZKShardToErrorPath(shard);
_zkClient.unsubscribeChildChanges(shard2NodeRootPath, this);
_zkClient.unsubscribeChildChanges(shard2ErrorPath, this);
}
@@ -635,10 +642,10 @@
}
public void handleChildChange(final String parentPath, final List<String> currentChilds) throws KattaException {
- final String shard = ZkPathes.getName(parentPath);
- if (parentPath.startsWith(ZkPathes.SHARD_TO_NODE)) {
+ final String shard = _conf.getZKName(parentPath);
+ if (parentPath.startsWith(_conf.getZKShardToNodePath())) {
_shardToReplicaCount.put(shard, currentChilds.size());
- } else if (parentPath.startsWith(ZkPathes.SHARD_TO_ERROR)) {
+ } else if (parentPath.startsWith(_conf.getZKShardToErrorPath())) {
_shardToErrorCount.put(shard, currentChilds.size());
} else {
throw new IllegalStateException("could not associate path " + parentPath);
@@ -718,7 +725,7 @@
LOG
.info("switching index '" + _index + "' from state " + _indexMetaData.getState() + " into state "
+ indexState);
- final String indexZkPath = ZkPathes.getIndexPath(_index);
+ final String indexZkPath = _conf.getZKIndexPath(_index);
_zkClient.readData(indexZkPath, _indexMetaData);
if (indexState == IndexState.ERROR) {
_indexMetaData.setState(indexState, "could not deploy shards properly, please see node logs");
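The recurring pattern in the DistributeShardsThread diff above is replacing the static `ZkPathes` helpers with instance methods on `ZkConfiguration`, so the ZooKeeper root path becomes configurable per deployment instead of being hard-coded. A minimal sketch of that design change (the concrete path segments here are assumptions for illustration, not Katta's actual layout):

```java
// Hypothetical simplified version of the ZkPathes -> ZkConfiguration refactor:
// paths are built from an injected root instead of static constants.
public class ZkConfigSketch {
  static final String DEFAULT_ROOT_PATH = "/katta";

  private final String rootPath;

  ZkConfigSketch(String rootPath) {
    this.rootPath = rootPath;
  }

  // Instance methods replace the old static ZkPathes.getIndexPath(...) style.
  String getZKIndicesPath() {
    return rootPath + "/indexes";
  }

  String getZKIndexPath(String index) {
    return getZKIndicesPath() + "/" + index;
  }

  String getZKNodePath(String node) {
    return rootPath + "/nodes/" + node;
  }

  public static void main(String[] args) {
    // Two deployments can now coexist under different ZK roots.
    ZkConfigSketch conf = new ZkConfigSketch("/katta-test");
    System.out.println(conf.getZKIndexPath("index1")); // /katta-test/indexes/index1
  }
}
```

With static constants, every master, node, and client in one ZooKeeper ensemble shared the same namespace; passing the configuration through the constructor (as `DistributeShardsThread` now does via `zkClient.getConfig()`) lets tests and multiple clusters use isolated roots.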
Modified: trunk/src/main/java/net/sf/katta/master/Master.java
===================================================================
--- trunk/src/main/java/net/sf/katta/master/Master.java 2009-07-01 20:47:50 UTC (rev 477)
+++ trunk/src/main/java/net/sf/katta/master/Master.java 2009-07-01 20:49:13 UTC (rev 478)
@@ -23,11 +23,11 @@
import net.sf.katta.util.KattaException;
import net.sf.katta.util.MasterConfiguration;
import net.sf.katta.util.NetworkUtil;
+import net.sf.katta.util.ZkConfiguration;
import net.sf.katta.zk.IZkChildListener;
import net.sf.katta.zk.IZkDataListener;
import net.sf.katta.zk.IZkReconnectListener;
import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -37,6 +37,7 @@
protected final static Logger LOG = Logger.getLogger(Master.class);
protected DistributeShardsThread _manageShardThread;
+ protected ZkConfiguration _conf;
protected ZKClient _zkClient;
protected List<String> _nodes = new ArrayList<String>();
@@ -52,11 +53,16 @@
private MasterListener _masterLister;
+ @SuppressWarnings("unchecked")
public Master(final ZKClient zkClient) throws KattaException {
_masterName = NetworkUtil.getLocalhostName() + "_" + UUID.randomUUID().toString();
_indexListener = new IndexListener();
- _nodeListener = new NodeListener();
+ _nodeListener = new NodeListener();
_masterLister = new MasterListener();
+ _conf = zkClient.getConfig();
+ if (!_conf.getZKRootPath().equals(ZkConfiguration.DEFAULT_ROOT_PATH)) {
+ LOG.info("Using ZK root path: " + _conf.getZKRootPath());
+ }
_zkClient = zkClient;
try {
_zkClient.getEventLock().lock();
@@ -64,7 +70,6 @@
} finally {
_zkClient.getEventLock().unlock();
}
-
final MasterConfiguration masterConfiguration = new MasterConfiguration();
final String deployPolicyClassName = masterConfiguration.getDeployPolicy();
IDeployPolicy deployPolicy;
@@ -84,7 +89,7 @@
} else {
safeModeMaxTime = masterConfiguration.getInt(MasterConfiguration...
[truncated message content]
From: <jo...@us...> - 2009-07-01 20:47:54
Revision: 477
http://katta.svn.sourceforge.net/katta/?rev=477&view=rev
Author: joa23
Date: 2009-07-01 20:47:50 +0000 (Wed, 01 Jul 2009)
Log Message:
-----------
KATTA-54, makes Katta more generic. ATTENTION: this is an experimental commit that makes Katta completely unstable.
The test suite is broken and functionality commented out.
Modified Paths:
--------------
trunk/src/main/java/net/sf/katta/client/Client.java
trunk/src/main/java/net/sf/katta/client/DefaultNodeSelectionPolicy.java
trunk/src/main/java/net/sf/katta/client/DeployClient.java
trunk/src/main/java/net/sf/katta/client/IDeployClient.java
trunk/src/main/java/net/sf/katta/client/INodeSelectionPolicy.java
trunk/src/main/java/net/sf/katta/client/IndexDeployFuture.java
trunk/src/main/java/net/sf/katta/client/ShardAccessException.java
Added Paths:
-----------
trunk/src/main/java/net/sf/katta/client/ClientResult.java
trunk/src/main/java/net/sf/katta/client/ILuceneClient.java
trunk/src/main/java/net/sf/katta/client/IMapFileClient.java
trunk/src/main/java/net/sf/katta/client/INodeExecutor.java
trunk/src/main/java/net/sf/katta/client/IResultPolicy.java
trunk/src/main/java/net/sf/katta/client/IResultReceiver.java
trunk/src/main/java/net/sf/katta/client/IShardProxyManager.java
trunk/src/main/java/net/sf/katta/client/LuceneClient.java
trunk/src/main/java/net/sf/katta/client/MapFileClient.java
trunk/src/main/java/net/sf/katta/client/NodeInteraction.java
trunk/src/main/java/net/sf/katta/client/ResultCompletePolicy.java
trunk/src/main/java/net/sf/katta/client/WorkQueue.java
Removed Paths:
-------------
trunk/src/main/java/net/sf/katta/client/IClient.java
Modified: trunk/src/main/java/net/sf/katta/client/Client.java
===================================================================
--- trunk/src/main/java/net/sf/katta/client/Client.java 2009-07-01 20:41:07 UTC (rev 476)
+++ trunk/src/main/java/net/sf/katta/client/Client.java 2009-07-01 20:47:50 UTC (rev 477)
@@ -16,101 +16,94 @@
package net.sf.katta.client;
import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import net.sf.katta.index.IndexMetaData;
-import net.sf.katta.node.DocumentFrequencyWritable;
-import net.sf.katta.node.Hit;
-import net.sf.katta.node.Hits;
-import net.sf.katta.node.HitsMapWritable;
-import net.sf.katta.node.IQuery;
-import net.sf.katta.node.ISearch;
-import net.sf.katta.node.QueryWritable;
import net.sf.katta.util.CollectionUtil;
import net.sf.katta.util.KattaException;
import net.sf.katta.util.ZkConfiguration;
import net.sf.katta.zk.IZkChildListener;
import net.sf.katta.zk.IZkDataListener;
import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.KeywordAnalyzer;
-import org.apache.lucene.queryParser.ParseException;
-import org.apache.lucene.queryParser.QueryParser;
-import org.apache.lucene.search.Query;
/**
- * Default implementation of {@link IClient}.
*
*/
-public class Client implements IClient {
+public class Client implements IShardProxyManager {
protected final static Logger LOG = Logger.getLogger(Client.class);
+ private static final String[] ALL_INDICES = new String[] { "*" };
+ protected final ZkConfiguration _zkConfig;
protected final ZKClient _zkClient;
+ protected final Class<? extends VersionedProtocol> _serverClass;
private final IndexStateListener _indexStateListener = new IndexStateListener();
private final IndexPathListener _indexPathChangeListener = new IndexPathListener();
private final ShardNodeListener _shardNodeListener = new ShardNodeListener();
protected final Map<String, List<String>> _indexToShards = new HashMap<String, List<String>>();
- // TODO: jz remove node proxies if not needed anymore
- protected final Map<String, ISearch> _node2SearchProxyMap = new HashMap<String, ISearch>();
+ protected final Map<String, VersionedProtocol> _node2ProxyMap = new HashMap<String, VersionedProtocol>();
protected final INodeSelectionPolicy _selectionPolicy;
private long _queryCount = 0;
- private final long _start;
+ private final long _startupTime;
private Configuration _hadoopConf = new Configuration();
- public Client(final INodeSelectionPolicy nodeSelectionPolicy) throws KattaException {
- this(nodeSelectionPolicy, new ZkConfiguration());
+ public Client(Class<? extends VersionedProtocol> serverClass) throws KattaException {
+ this(serverClass, new DefaultNodeSelectionPolicy(), new ZkConfiguration());
}
- public Client() throws KattaException {
- this(new DefaultNodeSelectionPolicy(), new ZkConfiguration());
+ public Client(Class<? extends VersionedProtocol> serverClass, final ZkConfiguration config) throws KattaException {
+ this(serverClass, new DefaultNodeSelectionPolicy(), config);
}
- public Client(INodeSelectionPolicy nodeSelectionPolicy, ZkConfiguration zkConfiguration) throws KattaException {
- this(nodeSelectionPolicy, zkConfiguration.getZKServers(), zkConfiguration.getZKClientPort(), zkConfiguration.getZKTickTime());
+
+ public Client(Class<? extends VersionedProtocol> serverClass, final INodeSelectionPolicy nodeSelectionPolicy)
+ throws KattaException {
+ this(serverClass, nodeSelectionPolicy, new ZkConfiguration());
}
-
- public Client(final INodeSelectionPolicy policy, String servers, int port, int timeout) throws KattaException {
+
+ public Client(Class<? extends VersionedProtocol> serverClass, final INodeSelectionPolicy policy,
+ final ZkConfiguration config) throws KattaException {
_hadoopConf.set("ipc.client.timeout", "2500");
_hadoopConf.set("ipc.client.connect.max.retries", "2");
// TODO jz: make configurable
+ _serverClass = serverClass;
_selectionPolicy = policy;
- _zkClient = new ZKClient(servers, port, timeout);
+ _zkConfig = config;
+ _zkClient = new ZKClient(config);
try {
_zkClient.getEventLock().lock();
_zkClient.start(30000);
- List<String> indexes = _zkClient.subscribeChildChanges(ZkPathes.INDEXES, _indexPathChangeListener);
+ List<String> indexes = _zkClient.subscribeChildChanges(config.getZKIndicesPath(), _indexPathChangeListener);
addOrWatchNewIndexes(indexes);
} finally {
_zkClient.getEventLock().unlock();
}
- _start = System.currentTimeMillis();
+ _startupTime = System.currentTimeMillis();
}
+ // --------------- Proxy handling ----------------------
-
protected void updateSelectionPolicy(final String shardName, List<String> nodes) {
List<String> connectedNodes = eastablishNodeProxiesIfNecessary(nodes);
_selectionPolicy.update(shardName, connectedNodes);
@@ -119,9 +112,9 @@
private List<String> eastablishNodeProxiesIfNecessary(List<String> nodes) {
List<String> connectedNodes = new ArrayList<String>(nodes);
for (String node : nodes) {
- if (!_node2SearchProxyMap.containsKey(node)) {
+ if (!_node2ProxyMap.containsKey(node)) {
try {
- _node2SearchProxyMap.put(node, createNodeProxy(node));
+ _node2ProxyMap.put(node, createNodeProxy(node));
} catch (Exception e) {
connectedNodes.remove(node);
LOG.warn("could not create proxy for node '" + node + "' - " + e.getClass().getSimpleName());
@@ -131,7 +124,7 @@
return connectedNodes;
}
- protected ISearch createNodeProxy(final String node) throws IOException {
+ protected VersionedProtocol createNodeProxy(final String node) throws IOException {
LOG.debug("creating proxy for node: " + node);
String[] hostName_port = node.split(":");
@@ -142,7 +135,10 @@
final String hostName = hostName_port[0];
final String port = hostName_port[1];
final InetSocketAddress inetSocketAddress = new InetSocketAddress(hostName, Integer.parseInt(port));
- return (ISearch) RPC.getProxy(ISearch.class, 0L, inetSocketAddress, _hadoopConf);
+ VersionedProtocol proxy = RPC.getProxy(_serverClass, 0L, inetSocketAddress, _hadoopConf);
+ LOG.debug(String.format("Created a proxy %s for %s:%s %s", Proxy.getInvocationHandler(proxy), hostName, port,
+ inetSocketAddress));
+ return proxy;
}
protected void removeIndexes(List<String> indexes) {
@@ -156,7 +152,7 @@
protected void addOrWatchNewIndexes(List<String> indexes) throws KattaException {
for (String index : indexes) {
- String indexZkPath = ZkPathes.getIndexPath(index);
+ String indexZkPath = _zkConfig.getZKIndexPath(index);
IndexMetaData indexMetaData = _zkClient.readData(indexZkPath, IndexMetaData.class);
if (isIndexSearchable(indexMetaData)) {
addIndexForSearching(index, indexZkPath);
@@ -174,7 +170,7 @@
final List<String> shards = _zkClient.getChildren(indexZkPath);
_indexToShards.put(indexName, shards);
for (final String shardName : shards) {
- List<String> nodes = _zkClient.subscribeChildChanges(ZkPathes.getShard2NodeRootPath(shardName),
+ List<String> nodes = _zkClient.subscribeChildChanges(_zkConfig.getZKShardToNodePath(shardName),
_shardNodeListener);
updateSelectionPolicy(shardName, nodes);
}
@@ -182,93 +178,183 @@
protected boolean isIndexSearchable(final IndexMetaData indexMetaData) {
return indexMetaData.getState() == IndexMetaData.IndexState.DEPLOYED
- || indexMetaData.getState() == IndexMetaData.IndexState.REPLICATING;
+ || indexMetaData.getState() == IndexMetaData.IndexState.REPLICATING;
}
- @Deprecated
- /*
- * @deprecated Old api uses IQuery what just transport a string, also uses
- * only a KeywordAnalyzer.
+ // --------------- Distributed calls to servers ----------------------
+
+ /**
+ * Broadcast a method call to all indices. Return all the results in a
+ * Collection.
+ *
+ * @param method
+ * The server's method to call.
+ * @param shardArrayParamIndex
+ * Which parameter of the method call, if any, that should be
+ * replaced with the shards to search. This is an array of Strings,
+ * with a different value for each node / server. Pass in -1 to
+ * disable.
+ * @param args
+ * The arguments to pass to the method when run on the server.
+ * @return A ClientResult holding the per-node results and any errors.
+ * @throws KattaException
*/
- public Hits search(final IQuery query, final String[] indexNames) throws KattaException {
- return search(query, indexNames, Integer.MAX_VALUE);
+ public <T> ClientResult<T> broadcastToAll(long timeout, boolean shutdown, Method method, int shardArrayParamIndex, Object... args) throws KattaException {
+ return broadcastToAll(new ResultCompletePolicy<T>(timeout, shutdown), method, shardArrayParamIndex, args);
}
- public Hits search(final Query query, final String[] indexNames) throws KattaException {
- return search(query, indexNames, Integer.MAX_VALUE);
+ public <T> ClientResult<T> broadcastToAll(IResultPolicy<T> resultPolicy, Method method, int shardArrayParamIndex, Object... args) throws KattaException {
+ return broadcastToShards(resultPolicy, method, shardArrayParamIndex, null, args);
}
- @Deprecated
- /*
- * @deprecated Old api uses IQuery what just transport a string, also uses
- * only a KeywordAnalyzer.
- */
- public Hits search(final IQuery query, final String[] indexNames, final int count) throws KattaException {
- try {
- final QueryParser luceneQueryParser = new QueryParser("field", new KeywordAnalyzer());
- Query luceneQuery = luceneQueryParser.parse(query.getQuery());
- return search(luceneQuery, indexNames, count);
- } catch (ParseException e) {
- throw new KattaException("Unable to parse Query: " + query.getQuery(), e);
+
+ public <T> ClientResult<T> broadcastToIndices(long timeout, boolean shutdown, Method method, int shardArrayIndex, String[] indices, Object... args)
+ throws KattaException {
+ return broadcastToIndices(new ResultCompletePolicy<T>(timeout, shutdown), method, shardArrayIndex, indices, args);
+ }
+
+ public <T> ClientResult<T> broadcastToIndices(IResultPolicy<T> resultPolicy, Method method, int shardArrayIndex, String[] indices, Object... args)
+ throws KattaException {
+ if (indices == null) {
+ indices = ALL_INDICES;
}
+ Map<String, List<String>> nodeShardsMap = getNode2ShardsMap(indices);
+ if (nodeShardsMap.values().isEmpty()) {
+ throw new KattaException("No shards for indices: " + (indices != null ? Arrays.asList(indices).toString() : "null"));
+ }
+ return broadcastInternal(resultPolicy, method, shardArrayIndex, nodeShardsMap, args);
}
- public Hits search(final Query query, final String[] indexNames, final int count) throws KattaException {
- final Map<String, List<String>> nodeShardsMap = getNode2ShardsMap(indexNames);
- final Hits result = new Hits();
- final DocumentFrequencyWritable docFreqs = getDocFrequencies(query, nodeShardsMap);
- List<NodeInteraction> nodeInteractions = new ArrayList<NodeInteraction>();
- for (final String node : nodeShardsMap.keySet()) {
- nodeInteractions.add(new SearchInteraction(node, nodeShardsMap, query, docFreqs, result, count));
+ public <T> ClientResult<T> singlecast(long timeout, boolean shutdown, Method method, int shardArrayParamIndex, String shard, Object... args) throws KattaException {
+ return singlecast(new ResultCompletePolicy<T>(timeout, shutdown), method, shardArrayParamIndex, shard, args);
+ }
+
+ public <T> ClientResult<T> singlecast(IResultPolicy<T> resultPolicy, Method method, int shardArrayParamIndex, String shard, Object... args) throws KattaException {
+ List<String> shards = new ArrayList<String>();
+ shards.add(shard);
+ return broadcastToShards(resultPolicy, method, shardArrayParamIndex, shards, args);
+ }
+
+ public <T> ClientResult<T> broadcastToShards(long timeout, boolean shutdown, Method method, int shardArrayParamIndex, List<String> shards,
+ Object... args) throws KattaException {
+ return broadcastToShards(new ResultCompletePolicy<T>(timeout, shutdown), method, shardArrayParamIndex, shards, args);
+ }
+
+ public <T> ClientResult<T> broadcastToShards(IResultPolicy<T> resultPolicy, Method method, int shardArrayParamIndex, List<String> shards,
+ Object... args) throws KattaException {
+ if (shards == null) {
+ // If no shards specified, search all shards.
+ shards = new ArrayList<String>();
+ for (List<String> indexShards : _indexToShards.values()) {
+ shards.addAll(indexShards);
+ }
}
- execute(nodeInteractions);
+ final Map<String, List<String>> nodeShardsMap = _selectionPolicy.createNode2ShardsMap(shards);
+ if (nodeShardsMap.values().isEmpty()) {
+ throw new KattaException("No shards selected: " + shards);
+ }
+ return broadcastInternal(resultPolicy, method, shardArrayParamIndex, nodeShardsMap, args);
+ }
+ private <T> ClientResult<T> broadcastInternal(IResultPolicy<T> resultPolicy, Method method, int shardArrayParamIndex, Map<String, List<String>> nodeShardsMap,
+ Object... args) throws KattaException {
+ _queryCount++;
+ /*
+ * Validate inputs.
+ */
+ if (method == null || args == null) {
+ throw new IllegalArgumentException("Null method or args!");
+ }
+ Class<?>[] types = method.getParameterTypes();
+ if (args.length != types.length) {
+ throw new IllegalArgumentException("Wrong number of args: found " + args.length + ", expected " + types.length
+ + "!");
+ }
+ for (int i = 0; i < args.length; i++) {
+ if (args[i] != null) {
+ Class<?> from = args[i].getClass();
+ Class<?> to = types[i];
+ if (!to.isAssignableFrom(from) && !(from.isPrimitive() || to.isPrimitive())) {
+ // Assume autoboxing will work.
+ throw new IllegalArgumentException("Incorrect argument type for param " + i + ": expected " + types[i] + "!");
+ }
+ }
+ }
+ if (shardArrayParamIndex > 0) {
+ if (shardArrayParamIndex >= types.length) {
+ throw new IllegalArgumentException("shardArrayParamIndex out of range!");
+ }
+ if (!(types[shardArrayParamIndex]).equals(String[].class)) {
+ throw new IllegalArgumentException("shardArrayParamIndex parameter (" + shardArrayParamIndex
+ + ") is not of type String[]!");
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ for (Map.Entry<String, List<String>> e : _indexToShards.entrySet()) {
+ LOG.trace("_indexToShards " + e.getKey() + " --> " + e.getValue().toString());
+ }
+ for (Map.Entry<String, List<String>> e : nodeShardsMap.entrySet()) {
+ LOG.trace("broadcast using " + e.getKey() + " --> " + e.getValue().toString());
+ }
+ LOG.trace("selection policy = " + _selectionPolicy);
+ }
+
+ /*
+ * Make RPC calls to all nodes in parallel.
+ */
long start = 0;
if (LOG.isDebugEnabled()) {
start = System.currentTimeMillis();
}
- result.sort(count);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Time for sorting: " + (System.currentTimeMillis() - start) + " ms");
+ /*
+ * We don't know what _selectionPolicy built, and multiple threads may write
+ * to map if IO errors occur. This map might be shared across multiple calls
+ * also. So make a copy and synchronize it.
+ */
+ Map<String, List<String>> nodeShardMapCopy = new HashMap<String, List<String>>();
+ Set<String> allShards = new HashSet<String>();
+ for (Map.Entry<String, List<String>> e : nodeShardsMap.entrySet()) {
+ nodeShardMapCopy.put(e.getKey(), new ArrayList<String>(e.getValue()));
+ allShards.addAll(e.getValue());
}
- _queryCount++;
- return result;
- }
+ nodeShardsMap = Collections.synchronizedMap(nodeShardMapCopy);
+ nodeShardMapCopy = null;
- @Deprecated
- public int count(final IQuery query, final String[] indexNames) throws KattaException {
- try {
- final QueryParser luceneQueryParser = new QueryParser("field", new KeywordAnalyzer());
- Query luceneQuery = luceneQueryParser.parse(query.getQuery());
- return count(luceneQuery, indexNames);
- } catch (ParseException e) {
- throw new KattaException("Unable to parse Query: " + query.getQuery(), e);
- }
- }
+ WorkQueue<T> workQueue = new WorkQueue<T>(this, allShards, method, shardArrayParamIndex, args);
- public int count(final Query query, final String[] indexNames) throws KattaException {
- final Map<String, List<String>> nodeShardsMap = getNode2ShardsMap(indexNames);
- final List<Integer> result = new ArrayList<Integer>();
- List<NodeInteraction> nodeInteractions = new ArrayList<NodeInteraction>();
- for (final String node : nodeShardsMap.keySet()) {
- nodeInteractions.add(new GetCountInteraction(node, nodeShardsMap, query, result));
+ for (String node : nodeShardsMap.keySet()) {
+ workQueue.execute(node, nodeShardsMap, 1);
}
- execute(nodeInteractions);
- int resultCount = 0;
- for (final Integer count : result) {
- resultCount += count.intValue();
+ ClientResult<T> results = workQueue.getResults(resultPolicy);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("broadcast(%s(%s), %s) took %d msec for %s", method.getName(), args, nodeShardsMap,
+ (System.currentTimeMillis() - start), results != null ? results : "null"));
}
- return resultCount;
+ return results;
}
- public float getQueryPerMinute() {
- long time = (System.currentTimeMillis() - _start) / (60 * 1000);
- time = Math.max(time, 1);
- return (float) _queryCount / time;
+ // --------------------- IShardManager ----------------------------
+
+ // NodeInteractions will use these methods.
+
+ public VersionedProtocol getProxy(String node) {
+ return _node2ProxyMap.get(node);
}
+ public void nodeFailed(String node, Throwable t) {
+ _node2ProxyMap.remove(node);
+ _selectionPolicy.removeNode(node);
+ }
+
+ public Map<String, List<String>> createNode2ShardsMap(Collection<String> shards) throws ShardAccessException {
+ return _selectionPolicy.createNode2ShardsMap(shards);
+ }
+
+ // -------------------- Node management --------------------
+
private Map<String, List<String>> getNode2ShardsMap(final String[] indexNames) throws KattaException {
String[] indexesToSearchIn = indexNames;
for (String indexName : indexNames) {
@@ -285,67 +371,40 @@
return nodeShardsMap;
}
- public void close() {
- if (_zkClient != null) {
- _zkClient.close();
- Collection<ISearch> proxies = _node2SearchProxyMap.values();
- for (ISearch search : proxies) {
- RPC.stopProxy(search);
- }
- }
- }
-
private List<String> getShardsToSearchIn(String[] indexNames) throws KattaException {
List<String> shards = new ArrayList<String>();
for (String index : indexNames) {
- List<String> shardsForIndex = _indexToShards.get(index);
- if (shardsForIndex == null) {
- throw new KattaException("Index '" + index + "' not deployed on any shard.");
+ List<String> shardsForIndex = _indexToShards.get(index);
+ if (shardsForIndex != null) {
+ shards.addAll(shardsForIndex);
+ } else {
+ LOG.warn("No shards found for index " + index);
}
- shards.addAll(shardsForIndex);
}
return shards;
}
- private DocumentFrequencyWritable getDocFrequencies(final Query query, final Map<String, List<String>> node2ShardsMap)
- throws KattaException {
- DocumentFrequencyWritable docFreqs = new DocumentFrequencyWritable();
- List<NodeInteraction> nodeInteractions = new ArrayList<NodeInteraction>();
- for (final String node : node2ShardsMap.keySet()) {
- nodeInteractions.add(new GetDocumentFrequencyInteraction(node, node2ShardsMap, query, docFreqs));
+ public double getQueryPerMinute() {
+ double minutes = (System.currentTimeMillis() - _startupTime) / 60000.0;
+ if (minutes > 0.0F) {
+ return _queryCount / minutes;
+ } else {
+ return 0.0F;
}
-
- execute(nodeInteractions);
- return docFreqs;
}
- private void execute(List<NodeInteraction> nodeInteractions) throws KattaException {
- long start = 0;
- if (LOG.isDebugEnabled()) {
- start = System.currentTimeMillis();
- }
- final List<Thread> interactionThreads = new ArrayList<Thread>(nodeInteractions.size());
- for (NodeInteraction nodeInteraction : nodeInteractions) {
- final Thread interactionThread = new Thread(nodeInteraction);
- interactionThreads.add(interactionThread);
- interactionThread.start();
- // TODO jz: use thread pool / Executor
- }
-
- try {
- for (final Thread thread : interactionThreads) {
- thread.join();
+ public void close() {
+ if (_zkClient != null) {
+ _zkClient.close();
+ Collection<VersionedProtocol> proxies = _node2ProxyMap.values();
+ for (VersionedProtocol search : proxies) {
+ RPC.stopProxy(search);
}
- } catch (final InterruptedException e) {
- LOG.warn("Join for search threads interrupted.", e);
}
- for (NodeInteraction nodeInteraction : nodeInteractions) {
- nodeInteraction.checkSuccess();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(nodeInteractions.get(0).getClass().getSimpleName() + " took " + (System.currentTimeMillis() - start)
- + " ms");
- }
}
protected class IndexStateListener implements IZkDataListener<IndexMetaData> {
@@ -355,7 +414,7 @@
}
public void handleDataChange(String dataPath, IndexMetaData metaData) throws KattaException {
- final String indexName = ZkPathes.getName(dataPath);
+ final String indexName = _zkConfig.getZKName(dataPath);
if (isIndexSearchable(metaData)) {
addIndexForSearching(indexName, dataPath);
_zkClient.unsubscribeDataChanges(dataPath, this);
@@ -381,260 +440,19 @@
List<String> removedIndexes = CollectionUtil.getListOfRemoved(indexes, currentIndexes);
removeIndexes(removedIndexes);
}
- }
- public MapWritable getDetails(final Hit hit) throws KattaException {
- return getDetails(hit, null);
}
- public MapWritable getDetails(final Hit hit, final String[] fields) throws KattaException {
- Map<String, List<String>> node2ShardMap;
- String node = hit.getNode();
- List<String> shards = Arrays.asList(hit.getShard());
- if (_node2SearchProxyMap.containsKey(node)) {
- node2ShardMap = new HashMap<String, List<String>>(1);
- node2ShardMap.put(node, shards);
- } else {
- node2ShardMap = _selectionPolicy.createNode2ShardsMap(shards);
- node = node2ShardMap.keySet().iterator().next();
- }
-
- GetDetailsInteraction getDetailsInteraction = new GetDetailsInteraction(node, node2ShardMap, hit.getDocId(), fields);
- getDetailsInteraction.run();
- getDetailsInteraction.checkSuccess();
- return getDetailsInteraction.getDetails();
- }
-
- public List<MapWritable> getDetails(List<Hit> hits) throws KattaException, InterruptedException {
- return getDetails(hits, null);
- }
-
- public List<MapWritable> getDetails(List<Hit> hits, final String[] fields) throws KattaException, InterruptedException {
- ExecutorService executorService = Executors.newFixedThreadPool(Math.min(10, hits.size() + 1));
- List<MapWritable> results = new ArrayList<MapWritable>();
- List<Future<MapWritable>> futures = new ArrayList<Future<MapWritable>>();
- for (final Hit hit : hits) {
- futures.add(executorService.submit(new Callable<MapWritable>() {
-
- @Override
- public MapWritable call() throws Exception {
- return getDetails(hit, fields);
- }
- }));
- }
-
- for (Future<MapWritable> future : futures) {
- try {
- results.add(future.get());
- } catch (ExecutionException e) {
- throw new KattaException("Could not get hit details.", e.getCause());
- }
- }
-
- executorService.shutdown();
-
- return results;
- }
-
protected class ShardNodeListener implements IZkChildListener {
public void handleChildChange(String parentPath, List<String> currentNodes) throws KattaException {
LOG.info("got shard (" + parentPath + ") event: " + currentNodes);
- final String shardName = ZkPathes.getName(parentPath);
+ final String shardName = _zkConfig.getZKName(parentPath);
// update shard2Nodes mapping
updateSelectionPolicy(shardName, currentNodes);
}
- }
- private class GetDocumentFrequencyInteraction extends NodeInteraction {
-
- private final Query _query;
- private final DocumentFrequencyWritable _docFreqs;
-
- public GetDocumentFrequencyInteraction(String node, Map<String, List<String>> node2ShardsMap, Query query,
- DocumentFrequencyWritable docFreqs) {
- super(node, node2ShardsMap);
- _query = query;
- _docFreqs = docFreqs;
- }
-
- @Override
- protected void doInteraction(ISearch search, String node, List<String> shards) throws IOException {
- final DocumentFrequencyWritable nodeDocFreqs = search.getDocFreqs(new QueryWritable(_query), shards
- .toArray(new String[shards.size()]));
- _docFreqs.addNumDocs(nodeDocFreqs.getNumDocs());
- _docFreqs.putAll(nodeDocFreqs.getAll());
- }
}
- private class GetCountInteraction extends NodeInteraction {
-
- private final Query _query;
- private final List<Integer> _result;
-
- public GetCountInteraction(String node, Map<String, List<String>> node2ShardsMap, Query query, List<Integer> result) {
- super(node, node2ShardsMap);
- _query = query;
- _result = result;
- }
-
- @Override
- protected void doInteraction(ISearch search, String node, List<String> shards) throws IOException {
- final int count = search.getResultCount(new QueryWritable(_query), shards.toArray(new String[shards.size()]));
- _result.add(count);
- }
- }
-
- private class GetDetailsInteraction extends NodeInteraction {
-
- private final int _docId;
- private final String[] _fields;
- private MapWritable _details;
-
- public GetDetailsInteraction(String node, Map<String, List<String>> node2ShardsMap, int docId, String[] fields) {
- super(node, node2ShardsMap);
- _docId = docId;
- _fields = fields;
- }
-
- @Override
- protected void doInteraction(ISearch search, String node, List<String> shards) throws IOException {
- String shard = shards.get(0);
- if (_fields == null) {
- _details = search.getDetails(shard, _docId);
- } else {
- _details = search.getDetails(shard, _docId, _fields);
- }
- }
-
- public MapWritable getDetails() {
- return _details;
- }
- }
-
- private class SearchInteraction extends NodeInteraction {
-
- private final Query _query;
- private final int _count;
- private final DocumentFrequencyWritable _docFreqs;
- private final Hits _result;
-
- public SearchInteraction(String node, Map<String, List<String>> node2ShardsMap, Query query,
- DocumentFrequencyWritable docFreqs, Hits result, int count) {
- super(node, node2ShardsMap);
- _query = query;
- _docFreqs = docFreqs;
- _result = result;
- _count = count;
- }
-
- @Override
- protected void doInteraction(ISearch search, String node, List<String> shards) throws IOException {
- Hits hits;
- final String[] shardsArray = shards.toArray(new String[shards.size()]);
- final HitsMapWritable shardToHits = search.search(new QueryWritable(_query), _docFreqs, shardsArray, _count);
- hits = shardToHits.getHits();
- _result.addHits(hits.getHits());
- _result.addTotalHits(hits.size());
- }
- }
-
- /**
- * This class encapsulates an interaction with one or multiple node's in order
- * to query information for a set of shards.
- *
- * Given the fact that shards a replicated about nodes, this class tries node
- * after node to get the desired information.
- */
- private abstract class NodeInteraction implements Runnable {
-
- private final String _node;
- private final Map<String, List<String>> _node2ShardsMap;
- private final List<String> _triedNodes = new ArrayList<String>(1);
- private int _tries = 0;
- private Exception _exception;
-
- public NodeInteraction(String node, Map<String, List<String>> node2ShardsMap) {
- _node = node;
- _node2ShardsMap = node2ShardsMap;
- }
-
- public final void run() {
- interact(_node, _node2ShardsMap);
- }
-
- protected final void interact(String node, Map<String, List<String>> node2ShardsMap) {
- List<String> shards = node2ShardsMap.get(node);
- try {
- _tries++;
- _triedNodes.add(node);
- ISearch searcher = _node2SearchProxyMap.get(node);
- try {
- if (searcher == null) {
- throw new IOException("node proxy for node " + node + " is not available any more");
- }
- long startTime = 0;
- if (LOG.isDebugEnabled()) {
- startTime = System.currentTimeMillis();
- }
- doInteraction(searcher, node, shards);
- if (LOG.isDebugEnabled()) {
- LOG.debug(getClass().getSimpleName() + " with node " + node + " took "
- + (System.currentTimeMillis() - startTime) + " ms.");
- }
- } catch (IOException e) {
- if (_tries == 3) {
- throw new KattaException(getClass().getSimpleName() + " for shards " + shards + " failed. Tried nodes: "
- + _triedNodes);
- }
- LOG.warn(
- "failed to interact with node " + node + ". Try with other node(s) " + node2ShardsMap.keySet() + ".",
- e);
- Map<String, List<String>> node2ShardsMapForFailedNode = prepareRetry(node, shards);
-
- // execute the action again for every node
- for (String newNode : node2ShardsMapForFailedNode.keySet()) {
- // TODO jz: if more then one node we should spawn new
- // threads
- interact(newNode, node2ShardsMapForFailedNode);
- }
- }
- } catch (Exception e) {
- _exception = e;
- }
- }
-
- public void checkSuccess() throws KattaException {
- if (_exception != null) {
- if (_exception instanceof KattaException) {
- throw (KattaException) _exception;
- }
- throw new KattaException(getClass().getSimpleName() + " for shards " + _node2ShardsMap.get(_node)
- + " failed. Tried nodes: " + _triedNodes, _exception);
- }
- }
-
- private Map<String, List<String>> prepareRetry(String node, List<String> shards) throws ShardAccessException {
- // remove node
- _node2ShardsMap.remove(node);
- _node2SearchProxyMap.remove(node);
- _selectionPolicy.removeNode(node);
-
- // find new node(s) for the shards and add to global node2ShardMap
- Map<String, List<String>> node2ShardsMapForFailedNode = _selectionPolicy.createNode2ShardsMap(shards);
- for (String newNode : node2ShardsMapForFailedNode.keySet()) {
- List<String> newNodeShards = node2ShardsMapForFailedNode.get(newNode);
- if (!_node2ShardsMap.containsKey(newNode)) {
- _node2ShardsMap.put(newNode, newNodeShards);
- } else {
- _node2ShardsMap.get(newNode).addAll(newNodeShards);
- }
- }
- return node2ShardsMapForFailedNode;
- }
-
- protected abstract void doInteraction(ISearch search, String node, List<String> shards) throws IOException;
- }
-
}
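The Client.java diff above replaces the typed ISearch RPC calls with a generic, reflection-driven broadcast: a server method is described by a java.lang.reflect.Method, broadcastInternal validates its parameters, and one parameter slot (shardArrayParamIndex) is swapped per call for the shards assigned to that node. A minimal standalone sketch of that dispatch idea follows; DemoServer and callNode are illustrative names, not katta classes, and the real Client additionally fans the calls out in parallel via WorkQueue/NodeInteraction and collects them in a ClientResult:

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

public class BroadcastSketch {

  // Stand-in "server" interface; katta's real one extends VersionedProtocol.
  public interface DemoServer {
    String search(String query, String[] shards);
  }

  /**
   * Invoke 'method' on one server, substituting that node's shards into the
   * parameter slot at shardArrayParamIndex (pass -1 to disable substitution),
   * mirroring what broadcastInternal does per node.
   */
  public static String callNode(DemoServer server, Method method,
      int shardArrayParamIndex, List<String> nodeShards, Object... args)
      throws Exception {
    Object[] callArgs = args.clone();
    if (shardArrayParamIndex >= 0) {
      // Same validation idea as in the diff: the slot must be a String[].
      if (!method.getParameterTypes()[shardArrayParamIndex].equals(String[].class)) {
        throw new IllegalArgumentException("shard parameter is not String[]");
      }
      callArgs[shardArrayParamIndex] = nodeShards.toArray(new String[0]);
    }
    return (String) method.invoke(server, callArgs);
  }

  public static void main(String[] args) throws Exception {
    DemoServer node = (query, shards) -> query + " on " + Arrays.toString(shards);
    Method m = DemoServer.class.getMethod("search", String.class, String[].class);
    // Slot 1 is the shard array; it is filled in per node.
    String result = callNode(node, m, 1, Arrays.asList("shardA", "shardB"),
        "content:katta", null);
    System.out.println(result); // content:katta on [shardA, shardB]
  }
}
```

The appeal of this design, visible in the diff, is that LuceneClient and MapFileClient can share one transport layer: only the Method object and argument list differ per client type.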
Added: trunk/src/main/java/net/sf/katta/client/ClientResult.java
===================================================================
--- trunk/src/main/java/net/sf/katta/client/ClientResult.java (rev 0)
+++ trunk/src/main/java/net/sf/katta/client/ClientResult.java 2009-07-01 20:47:50 UTC (rev 477)
@@ -0,0 +1,566 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.sf.katta.util.KattaException;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * A multithreaded destination for results and/or errors. Results are produced
+ * by nodes and we pass lists of shards to nodes. But due to replication and
+ * retries, we associate sets of shards with the results, not nodes.
+ *
+ * Multiple NodeInteractions will be writing to this object at the same time. If
+ * not closed, expect the contents to change. For example isComplete() might
+ * return false and then a call to getResults() might return a complete set (in
+ * which case another call to isComplete() would return true). If you need
+ * complex state information, rather than making multiple calls, you should use
+ * the Entry iterator.
+ *
+ * You can get these results from a WorkQueue by polling or blocking. Once you
+ * have a ClientResult instance you may poll it or block on it. Whenever
+ * results or errors are added, notifyAll() is called. The ClientResult can
+ * report on the number or ratio of shards completed. You can stop the search by
+ * calling close(). The ClientResult will no longer change, and any outstanding
+ * threads will be killed (via notification to the provided IClosedListener).
+ */
+public class ClientResult<T> implements IResultReceiver<T>, Iterable<ClientResult<T>.Entry> {
+
+ private static final Logger LOG = Logger.getLogger(ClientResult.class);
+
+ /**
+ * Immutable storage of either a result or an error, which shards produced it,
+ * and its arrival time.
+ */
+ public class Entry {
+
+ public final T result;
+ public final Throwable error;
+ public final Set<String> shards;
+ public final long time;
+
+ @SuppressWarnings("unchecked")
+ private Entry(Object o, Collection<String> shards, boolean isError) {
+ this.result = !isError ? (T) o : null;
+ this.error = isError ? (Throwable) o : null;
+ this.shards = Collections.unmodifiableSet(new HashSet<String>(shards));
+ this.time = System.currentTimeMillis();
+ }
+
+ public String toString() {
+ String resultStr;
+ if (result != null) {
+ try {
+ resultStr = result.toString();
+ } catch (Throwable t) {
+ LOG.trace("Error calling toString() on result", t);
+ resultStr = "(toString() err)";
+ }
+ if (resultStr == null) {
+ resultStr = "(null toString())";
+ }
+ } else {
+ resultStr = error != null ? error.getClass().getSimpleName() : "null";
+ }
+ return String.format("%s from %s at %d", resultStr, shards, time);
+ }
+ }
+
+ /**
+ * Provides a way to notify interested parties when our close() method is
+ * called.
+ */
+ public interface IClosedListener {
+ /**
+ * The ClientResult's close() method was called. The result is closed before
+ * calling this.
+ */
+ public void clientResultClosed();
+ }
+
+ private volatile boolean closed = false;
+ private final Set<String> allShards;
+ private final Set<String> seenShards = new HashSet<String>();
+ private final Set<Entry> entries = new HashSet<Entry>();
+ private final Map<Object, Entry> resultMap = new HashMap<Object, Entry>();
+ private final Collection<T> results = new ArrayList<T>();
+ private final Collection<Throwable> errors = new ArrayList<Throwable>();
+ private final long startTime = System.currentTimeMillis();
+ private final IClosedListener closedListener;
+
+ /**
+ * Construct a non-closed ClientResult, which waits for addResults() or
+ * addError() calls until close() is called. After that point, addResults()
+ * and addError() calls are ignored, and this object becomes immutable.
+ *
+ * @param closedListener
+ * If not null, its clientResultClosed() method is called when our
+ * close() method is called.
+ * @param allShards
+ * The set of all shards to expect results from.
+ */
+ public ClientResult(IClosedListener closedListener, Collection<String> allShards) {
+ if (allShards == null || allShards.isEmpty()) {
+ throw new IllegalArgumentException("No shards specified");
+ }
+ this.allShards = Collections.unmodifiableSet(new HashSet<String>(allShards));
+ this.closedListener = closedListener;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Created ClientResult(%s, %s)", closedListener != null ? closedListener : "null",
+ allShards));
+ }
+ }
+
+ /**
+ * Construct a non-closed ClientResult, which waits for addResults() or
+ * addError() calls until close() is called. After that point, addResults()
+ * and addError() calls are ignored, and this object becomes immutable.
+ *
+ * @param closedListener
+ * If not null, its clientResultClosed() method is called when our
+ * close() method is called.
+ * @param allShards
+ * The set of all shards to expect results from.
+ */
+ public ClientResult(IClosedListener closedListener, String... allShards) {
+ this(closedListener, Arrays.asList(allShards));
+ }
+
+ /**
+ * Add a result. Will be ignored if closed.
+ *
+ * @param result
+ * The result to add.
+ * @param shards
+ * The shards used to compute the result.
+ */
+ public void addResult(T result, Collection<String> shards) {
+ if (closed) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Ignoring results given to closed ClientResult");
+ }
+ return;
+ }
+ if (shards == null) {
+ LOG.warn("Null shards passed to addResult()");
+ return;
+ }
+ Entry entry = new Entry(result, shards, false);
+ if (entry.shards.isEmpty()) {
+ LOG.warn("Empty shards passed to addResult()");
+ return;
+ }
+ synchronized (this) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Adding result %s", entry));
+ }
+ if (LOG.isEnabledFor(Level.WARN)) {
+ for (String shard : entry.shards) {
+ if (seenShards.contains(shard)) {
+ LOG.warn("Duplicate occurrences of shard " + shard);
+ } else if (!allShards.contains(shard)) {
+ LOG.warn("Unknown shard " + shard + " returned results");
+ }
+ }
+ }
+ entries.add(entry);
+ seenShards.addAll(entry.shards);
+ if (result != null) {
+ results.add(result);
+ resultMap.put(result, entry);
+ }
+ notifyAll();
+ }
+ }
+
+ /**
+ * Add a result. Will be ignored if closed.
+ *
+ * @param result
+ * The result to add.
+ * @param shards
+ * The shards used to compute the result.
+ */
+ public void addResult(T result, String... shards) {
+ addResult(result, Arrays.asList(shards));
+ }
+
+ /**
+ * Add an error. Will be ignored if closed.
+ *
+ * @param error
+ * The error to add.
+ * @param shards
+ * The shards used when the error happened.
+ */
+ public void addError(Throwable error, Collection<String> shards) {
+ if (closed) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Ignoring exception given to closed ClientResult");
+ }
+ return;
+ }
+ if (shards == null) {
+ LOG.warn("Null shards passed to addError()");
+ return;
+ }
+ Entry entry = new Entry(error, shards, true);
+ if (entry.shards.isEmpty()) {
+ LOG.warn("Empty shards passed to addError()");
+ return;
+ }
+ synchronized (this) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Adding error %s", entry));
+ }
+ if (LOG.isEnabledFor(Level.WARN)) {
+ for (String shard : entry.shards) {
+ if (seenShards.contains(shard)) {
+ LOG.warn("Duplicate occurrences of shard " + shard);
+ } else if (!allShards.contains(shard)) {
+ LOG.warn("Unknown shard " + shard + " returned results");
+ }
+ }
+ }
+ entries.add(entry);
+ seenShards.addAll(entry.shards);
+ if (error != null) {
+ errors.add(error);
+ resultMap.put(error, entry);
+ }
+ notifyAll();
+ }
+ }
+
+ /**
+ * Add an error. Will be ignored if closed.
+ *
+ * @param error
+ * The error to add.
+ * @param shards
+ * The shards used when the error happened.
+ */
+ public synchronized void addError(Throwable error, String... shards) {
+ addError(error, Arrays.asList(shards));
+ }
+
+ /**
+ * Stop accepting additional results or errors. Become an immutable object.
+ * Also report the closure to the IClosedListener passed to our constructor,
+ * if any. Normally this will tell the WorkQueue to shut down immediately,
+ * killing any still running threads.
+ */
+ public synchronized void close() {
+ LOG.trace("close() called.");
+ if (!closed) {
+ closed = true;
+ if (closedListener != null) {
+ LOG.trace("Notifying closed listener.");
+ closedListener.clientResultClosed();
+ }
+ }
+ notifyAll();
+ }
+
+ /**
+ * Is this result set closed, and therefore not accepting any additional
+ * results or errors. Once closed, this becomes an immutable object.
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * @return the set of all shards we are expecting results from.
+ */
+ public Set<String> getAllShards() {
+ return allShards;
+ }
+
+ /**
+ * @return the set of shards from whom we have seen either results or errors.
+ */
+ public synchronized Set<String> getSeenShards() {
+ return Collections.unmodifiableSet(closed ? seenShards : new HashSet<String>(seenShards));
+ }
+
+ /**
+ * @return the subset of all shards from whom we have not seen either results
+ * or errors.
+ */
+ public synchronized Set<String> getMissingShards() {
+ Set<String> missing = new HashSet<String>(allShards);
+ missing.removeAll(seenShards);
+ return missing;
+ }
+
+ /**
+ * @return all of the results seen so far. Does not include errors.
+ */
+ public synchronized Collection<T> getResults() {
+ return Collections.unmodifiableCollection(closed ? results : new ArrayList<T>(results));
+ }
+
+ /**
+ * Either return results or throw an exception. Allows simple one-line use of
+ * a ClientResult. If no errors occurred, returns the same results as
+ * getResults(). If any errors occurred, one is chosen via getError() and
+ * thrown.
+ *
+ * @return if no errors occurred, results via getResults().
+ * @throws Throwable
+ * if any errors occurred, via getError().
+ */
+ public synchronized Collection<T> getResultsOrThrowException() throws Throwable {
+ if (isError()) {
+ throw getError();
+ } else {
+ return getResults();
+ }
+ }
+
+ /**
+ * Either return results or throw a KattaException. Allows simple one-line use
+ * of a ClientResult. If no errors occurred, returns the same results as
+ * getResults(). If any errors occurred, one is chosen via getKattaException()
+ * and thrown.
+ *
+ * @return if no errors occurred, results via getResults().
+ * @throws KattaException
+ * if any errors occurred, via getKattaException().
+ */
+ public synchronized Collection<T> getResultsOrThrowKattaException() throws KattaException {
+ if (isError()) {
+ throw getKattaException();
+ } else {
+ return getResults();
+ }
+ }
+
+ /**
+ * @return all of the errors seen so far.
+ */
+ public synchronized Collection<Throwable> getErrors() {
+ return Collections.unmodifiableCollection(closed ? errors : new ArrayList<Throwable>(errors));
+ }
+
+ /**
+ * @return an arbitrarily chosen error, or null if none exist.
+ */
+ public synchronized Throwable getError() {
+ for (Entry e : entries) {
+ if (e.error != null) {
+ return e.error;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @return an arbitrarily chosen KattaException if one exists, else a
+ * KattaException wrapped around an arbitrarily chosen error if one
+ * exists, else null.
+ */
+ public synchronized KattaException getKattaException() {
+ Throwable error = null;
+ for (Entry e : this) {
+ if (e.error != null) {
+ if (e.error instanceof KattaException) {
+ return (KattaException) e.error;
+ } else {
+ error = e.error;
+ }
+ }
+ }
+ if (error != null) {
+ return new KattaException("Error", error);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * @param result
+ * The result to look up.
+ * @return What shards produced the result, and when it arrived. Returns null
+ * if result not found.
+ */
+ public synchronized Entry getResultEntry(T result) {
+ return resultMap.get(result);
+ }
+
+ /**
+ * @param error
+ * The error to look up.
+ * @return What shards produced the error, and when it arrived. Returns null
+ * if error not found.
+ */
+ public synchronized Entry getErrorEntry(Throwable error) {
+ return resultMap.get(error);
+ }
+
+ /**
+ * @return true if we have seen either a result or an error for all shards.
+ */
+ public synchronized boolean isComplete() {
+ return seenShards.containsAll(allShards);
+ }
+
+ /**
+ * @return true if any errors were reported.
+ */
+ public synchronized boolean isError() {
+ return !errors.isEmpty();
+ }
+
+ /**
+ * @return true if result is complete (all shards reporting in) and no errors
+ * occurred.
+ */
+ public synchronized boolean isOK() {
+ return isComplete() && !isError();
+ }
+
+ /**
+ * @return the ratio (0.0 .. 1.0) of shards we have seen. 0.0 when no shards,
+ * 1.0 when complete.
+ */
+ public synchronized double getShardCoverage() {
+ int seen = seenShards.size();
+ int all = allShards.size();
+ return all > 0 ? (double) seen / (double) all : 0.0;
+ }
+
+ /**
+ * @return the time when this ClientResult was created.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * @return a snapshot of all the data about the results so far.
+ */
+ public Set<Entry> entrySet() {
+ if (closed) {
+ return Collections.unmodifiableSet(entries);
+ } else {
+ synchronized (this) {
+ // Set will keep changing, make a snapshot.
+ return Collections.unmodifiableSet(new HashSet<Entry>(entries));
+ }
+ }
+ }
+
+ /**
+ * @return an iterator over the Entries seen so far.
+ */
+ public Iterator<Entry> iterator() {
+ return entrySet().iterator();
+ }
+
+ /**
+ * @return a list of our results or errors, in the order they arrived.
+ */
+ public List<Entry> getArrivalTimes() {
+ List<Entry> arrivals;
+ synchronized (this) {
+ arrivals = new ArrayList<Entry>(entries);
+ }
+ Collections.sort(arrivals, new Comparator<Entry>() {
+ public int compare(Entry o1, Entry o2) {
+ if (o1.time != o2.time) {
+ return o1.time < o2.time ? -1 : 1;
+ } else {
+ // Break ties in favor of results.
+ if (o1.result != null && o2.result == null) {
+ return -1;
+ } else if (o2.result != null && o1.result == null) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ }
+ });
+ return arrivals;
+ }
+
+ public void waitFor(IResultPolicy<T> policy) {
+ long waitTime = 0;
+ while (true) {
+ synchronized (this) {
+ // Must hold our own lock from waitTime() through wait(), or we could
+ // miss a notifyAll() fired by addResult() or addError().
+ waitTime = policy.waitTime(this);
+ if (waitTime > 0 && !closed) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Waiting %d ms, results = %s", waitTime, this));
+ }
+ try {
+ this.wait(waitTime);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted", e);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Done waiting, results = %s", this));
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ if (waitTime < 0) {
+ close();
+ }
+ }
+
+ public synchronized String toString() {
+ int numResults = 0;
+ int numErrors = 0;
+ for (Entry e : this) {
+ if (e.result != null) {
+ numResults++;
+ }
+ if (e.error != null) {
+ numErrors++;
+ }
+ }
+ return String.format("ClientResult: %d results, %d errors, %d/%d shards%s%s", numResults, numErrors, seenShards
+ .size(), allShards.size(), closed ? " (closed)" : "", isComplete() ? " (complete)" : "");
+ }
+
+}
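The class javadoc above describes ClientResult's core contract: nodes add results or errors from multiple threads, each add calls notifyAll(), and readers either poll isComplete() or block until enough shards have reported. A minimal, self-contained sketch of that gather-and-notify pattern in plain Java (names are illustrative, not the Katta API):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal sketch of ClientResult's gather-and-notify pattern:
// producers add results under the monitor and notify; consumers
// block in a guarded loop until all shards have reported.
class ResultGatherer<T> {
  private final Set<String> allShards;
  private final Set<String> seenShards = new HashSet<String>();
  private final List<T> results = new ArrayList<T>();

  ResultGatherer(Set<String> allShards) {
    this.allShards = new HashSet<String>(allShards);
  }

  synchronized void addResult(T result, String shard) {
    results.add(result);
    seenShards.add(shard);
    notifyAll(); // wake any thread blocked in awaitComplete()
  }

  synchronized boolean isComplete() {
    return seenShards.containsAll(allShards);
  }

  // Guarded-block wait: re-check the condition after every wakeup.
  synchronized List<T> awaitComplete(long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!isComplete()) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        break; // timed out; return whatever arrived so far
      }
      wait(remaining);
    }
    return new ArrayList<T>(results);
  }
}
```

The guarded `while (!isComplete())` loop is the same discipline ClientResult.waitFor() relies on: the condition is re-evaluated under the monitor after each notification, so spurious wakeups and partial results are handled safely.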
Modified: trunk/src/main/java/net/sf/katta/client/DefaultNodeSelectionPolicy.java
===================================================================
--- trunk/src/main/java/net/sf/katta/client/DefaultNodeSelectionPolicy.java 2009-07-01 20:41:07 UTC (rev 476)
+++ trunk/src/main/java/net/sf/katta/client/DefaultNodeSelectionPolicy.java 2009-07-01 20:47:50 UTC (rev 477)
@@ -15,6 +15,7 @@
*/
package net.sf.katta.client;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -39,14 +40,14 @@
// _shardsToNodeMap = newShard2NodesMap;
// }
- public void update(String shard, List<String> nodes) {
+ public void update(String shard, Collection<String> nodes) {
_shardsToNodeMap.put(shard, new CircularList<String>(nodes));
}
public List<String> remove(String shard) {
CircularList<String> nodes = _shardsToNodeMap.remove(shard);
if (nodes == null) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
return nodes.asList();
}
@@ -59,11 +60,11 @@
}
}
- public Map<String, List<String>> createNode2ShardsMap(List<String> shards) throws ShardAccessException {
+ public Map<String, List<String>> createNode2ShardsMap(Collection<String> shards) throws ShardAccessException {
One2ManyListMap<String, String> node2ShardsMap = new One2ManyListMap<String, String>();
for (String shard : shards) {
CircularList<String> nodeList = _shardsToNodeMap.get(shard);
- if (nodeList.isEmpty()) {
+ if (nodeList == null || nodeList.isEmpty()) {
throw new ShardAccessException(shard);
}
String node;
@@ -74,5 +75,19 @@
}
return node2ShardsMap.asMap();
}
+
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("DefaultNodeSelectionPolicy: ");
+ String sep = "";
+ for (Map.Entry<String, CircularList<String>> e : _shardsToNodeMap.entrySet()) {
+ buf.append(sep);
+ buf.append(e.getKey());
+ buf.append(" --> ");
+ buf.append(e.getValue());
+ sep = " ";
+ }
+ return buf.toString();
+ }
}
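The thread at the top of this archive points out the weakness in this policy: when every shard's replica list is built in the same order, plain round-robin picks the same node for every shard in a request. The suggested fix is to shuffle each list once when it is created. A self-contained sketch of that idea (illustrative names, not the Katta `CircularList`/policy API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of round-robin node selection with a per-shard shuffle.
// Shuffling at update() time decorrelates identical replica lists,
// so different shards start their rotation at different nodes.
class ShuffledSelectionPolicy {
  private final Map<String, List<String>> shardToNodes = new HashMap<String, List<String>>();
  private final Map<String, Integer> nextIndex = new HashMap<String, Integer>();

  void update(String shard, List<String> nodes) {
    List<String> copy = new ArrayList<String>(nodes);
    Collections.shuffle(copy); // break the shared ordering across shards
    shardToNodes.put(shard, copy);
    nextIndex.put(shard, 0);
  }

  // Round-robin over the (shuffled) node list for one shard.
  String selectNode(String shard) {
    List<String> nodes = shardToNodes.get(shard);
    int i = nextIndex.get(shard);
    nextIndex.put(shard, (i + 1) % nodes.size());
    return nodes.get(i);
  }
}
```

Each shard still rotates fairly through all of its replicas; the shuffle only randomizes the starting offset, which is enough to spread a multi-shard request across nodes even when all replica lists contain the same members.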
Modified: trunk/src/main/java/net/sf/katta/client/DeployClient.java
===================================================================
--- trunk/src/main/java/net/sf/katta/client/DeployClient.java 2009-07-01 20:41:07 UTC (rev 476)
+++ trunk/src/main/java/net/sf/katta/client/DeployClient.java 2009-07-01 20:47:50 UTC (rev 477)
@@ -23,10 +23,10 @@
import net.sf.katta.util.KattaException;
import net.sf.katta.util.ZkConfiguration;
import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
public class DeployClient implements IDeployClient {
+ private ZkConfiguration _conf;
private ZKClient _zkClient;
public DeployClient(String servers, int port, int timeout) throws KattaException {
@@ -38,6 +38,7 @@
}
public DeployClient(ZKClient zkClient) throws KattaException {
+ _conf = zkClient.getConfig();
_zkClient = zkClient;
if (!_zkClient.isStarted()) {
_zkClient.start(30000);
@@ -46,13 +47,13 @@
public IIndexDeployFuture addIndex(String name, String path, int replicationLevel)
throws KattaException {
- final String indexPath = ZkPathes.getIndexPath(name);
+ final String indexPath = _conf.getZKIndexPath(name);
validateIndexName(name, indexPath);
final IndexMetaData indexMetaData = new IndexMetaData(path, replicationLevel,
IndexMetaData.IndexState.ANNOUNCED);
_zkClient.create(indexPath, indexMetaData);
- return new IndexDeployFuture(_zkClient, name, indexMetaData);
+ return new IndexDeployFuture(_zkClient, name, indexPath, indexMetaData);
}
private void validateIndexName(String name, String indexPath) throws KattaException {
@@ -66,7 +67,7 @@
}
public void removeIndex(String name) throws KattaException {
- final String indexPath = ZkPathes.getIndexPath(name);
+ final String indexPath = _conf.getZKIndexPath(name);
if (!_zkClient.exists(indexPath)) {
throw new IllegalArgumentException("index not exists: " + name);
}
@@ -74,14 +75,14 @@
}
public boolean existsIndex(String indexName) throws KattaException {
- return _zkClient.exists(ZkPathes.getIndexPath(indexName));
+ return _zkClient.exists(_conf.getZKIndexPath(indexName));
}
public List<IndexMetaData> getIndexes(IndexState indexState) throws KattaException {
- final List<String> indexes = _zkClient.getChildren(ZkPathes.INDEXES);
+ final List<String> indexes = _zkClient.getChildren(_conf.getZKIndicesPath());
final List<IndexMetaData> returnIndexes = new ArrayList<IndexMetaData>();
for (final String index : indexes) {
- final IndexMetaData metaData = _zkClient.readData(ZkPathes.getIndexPath(index), IndexMetaData.class);
+ final IndexMetaData metaData = _zkClient.readData(_conf.getZKIndexPath(index), IndexMetaData.class);
if (metaData.getState() == indexState) {
returnIndexes.add(metaData);
}
@@ -91,10 +92,10 @@
// TODO jz: if IndexMetaData would contain index name, we could avoid that
public List<String> getIndexNames(IndexState indexState) throws KattaException {
- final List<String> indexes = _zkClient.getChildren(ZkPathes.INDEXES);
+ final List<String> indexes = _zkClient.getChildren(_conf.getZKIndicesPath());
final List<String> returnIndexes = new ArrayList<String>();
for (final String index : indexes) {
- final IndexMetaData metaData = _zkClient.readData(ZkPathes.getIndexPath(index), IndexMetaData.class);
+ final IndexMetaData metaData = _zkClient.readData(_conf.getZKIndexPath(index), IndexMetaData.class);
if (metaData.getState() == indexState) {
returnIndexes.add(index);
}
@@ -107,7 +108,7 @@
}
public IndexMetaData getIndexMetaData(String name) throws KattaException {
- return _zkClient.readData(ZkPathes.getIndexPath(name), IndexMetaData.class);
+ return _zkClient.readData(_conf.getZKIndexPath(name), IndexMetaData.class);
}
}
Deleted: trunk/src/main/java/net/sf/katta/client/IClient.java
===================================================================
--- trunk/src/main/java/net/sf/katta/client/IClient.java 2009-07-01 20:41:07 UTC (rev 476)
+++ trunk/src/main/java/net/sf/katta/client/IClient.java 2009-07-01 20:47:50 UTC (rev 477)
@@ -1,170 +0,0 @@
-/**
- * Copyright 2008 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package net.sf.katta.client;
-
-import java.util.List;
-
-import net.sf.katta.node.Hit;
-import net.sf.katta.node.Hits;
-import net.sf.katta.node.IQuery;
-import net.sf.katta.util.KattaException;
-
-import org.apache.hadoop.io.MapWritable;
-import org.apache.lucene.search.Query;
-
-/**
- * Client for searching document indices deployed on a katta cluster.
- * <p>
- *
- * You provide a {@link IQuery} and the name of the deployed indices, and get
- * back {@link Hits} which contains multiple {@link Hit} objects as the results.
- * <br>
- * See {@link #search(IQuery, String[], int)}.
- * <p>
- *
- * The details of a hit-document can be retrieved through the
- * {@link #getDetails(Hit, String[])} method.
- *
- * @see Hit
- * @see Hits...
[truncated message content] |
|
From: <jo...@us...> - 2009-07-01 20:41:13
|
Revision: 476
http://katta.svn.sourceforge.net/katta/?rev=476&view=rev
Author: joa23
Date: 2009-07-01 20:41:07 +0000 (Wed, 01 Jul 2009)
Log Message:
-----------
KATTA-54, makes katta more generic. ATTENTION: this is an experimental commit that makes Katta completely unstable.
The test suite is broken and functionality commented out.
Modified Paths:
--------------
trunk/extras/indexing/src/main/java/net/sf/katta/indexing/IndexerJob.java
trunk/src/build/ant/build.properties
trunk/src/build/ant/common-build.xml
trunk/src/build/checkstyle/checkstyle.xml
trunk/src/main/java/net/sf/katta/Katta.java
Modified: trunk/extras/indexing/src/main/java/net/sf/katta/indexing/IndexerJob.java
===================================================================
--- trunk/extras/indexing/src/main/java/net/sf/katta/indexing/IndexerJob.java 2009-07-01 20:39:30 UTC (rev 475)
+++ trunk/extras/indexing/src/main/java/net/sf/katta/indexing/IndexerJob.java 2009-07-01 20:41:07 UTC (rev 476)
@@ -36,6 +36,7 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
Modified: trunk/src/build/ant/build.properties
===================================================================
--- trunk/src/build/ant/build.properties 2009-07-01 20:39:30 UTC (rev 475)
+++ trunk/src/build/ant/build.properties 2009-07-01 20:41:07 UTC (rev 476)
@@ -48,4 +48,4 @@
javac.version=1.6
javac.args=
javac.args.warnings=-Xlint:none
-build.encoding=ISO-8859-1
\ No newline at end of file
+build.encoding=ISO-8859-1
Modified: trunk/src/build/ant/common-build.xml
===================================================================
--- trunk/src/build/ant/common-build.xml 2009-07-01 20:39:30 UTC (rev 475)
+++ trunk/src/build/ant/common-build.xml 2009-07-01 20:41:07 UTC (rev 476)
@@ -94,7 +94,7 @@
<ivy:cachepath pathid="test.path.id" conf="test" />
<ivy:cachepath pathid="instrument.path.id" conf="instrument" />
- <junit showoutput="false" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="256m" dir="${basedir}" errorProperty="tests.failed" failureProperty="tests.failed">
+ <junit showoutput="false" printsummary="yes" haltonfailure="no" forkmode="perTest" maxmemory="256m" dir="${basedir}" errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="net.sourceforge.cobertura.datafile" file="${reports.dir}/katta_coverage.ser" />
<classpath>
<pathelement location="${instrumented.dir}" />
Modified: trunk/src/build/checkstyle/checkstyle.xml
===================================================================
--- trunk/src/build/checkstyle/checkstyle.xml 2009-07-01 20:39:30 UTC (rev 475)
+++ trunk/src/build/checkstyle/checkstyle.xml 2009-07-01 20:41:07 UTC (rev 476)
@@ -29,6 +29,7 @@
<property name="headerFile" value="src/build/checkstyle/java.header"/>
<property name="fileExtensions" value="java"/>
<property name="id" value="header"/>
+ <property name="ignoreLines" value="2"/>
</module>
</module>
Modified: trunk/src/main/java/net/sf/katta/Katta.java
===================================================================
--- trunk/src/main/java/net/sf/katta/Katta.java 2009-07-01 20:39:30 UTC (rev 475)
+++ trunk/src/main/java/net/sf/katta/Katta.java 2009-07-01 20:41:07 UTC (rev 476)
@@ -19,43 +19,40 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import net.sf.katta.client.Client;
import net.sf.katta.client.DeployClient;
-import net.sf.katta.client.IClient;
import net.sf.katta.client.IDeployClient;
import net.sf.katta.client.IIndexDeployFuture;
+import net.sf.katta.client.ILuceneClient;
+import net.sf.katta.client.LuceneClient;
import net.sf.katta.index.DeployedShard;
import net.sf.katta.index.IndexMetaData;
import net.sf.katta.index.ShardError;
import net.sf.katta.index.IndexMetaData.IndexState;
import net.sf.katta.index.indexer.SampleIndexGenerator;
import net.sf.katta.index.indexer.merge.IndexMergeApplication;
-import net.sf.katta.loadtest.LoadTestNode;
import net.sf.katta.loadtest.LoadTestStarter;
import net.sf.katta.master.Master;
-import net.sf.katta.node.BaseNode;
import net.sf.katta.node.Hit;
import net.sf.katta.node.Hits;
+import net.sf.katta.node.INodeManaged;
import net.sf.katta.node.IQuery;
-import net.sf.katta.node.LuceneNode;
+import net.sf.katta.node.Node;
import net.sf.katta.node.NodeMetaData;
import net.sf.katta.node.Query;
-import net.sf.katta.node.BaseNode.NodeState;
+import net.sf.katta.node.Node.NodeState;
import net.sf.katta.tool.ZkTool;
import net.sf.katta.util.KattaException;
-import net.sf.katta.util.LoadTestNodeConfiguration;
-import net.sf.katta.util.NodeConfiguration;
import net.sf.katta.util.SymlinkResourceLoader;
import net.sf.katta.util.VersionInfo;
import net.sf.katta.util.ZkConfiguration;
import net.sf.katta.zk.ZKClient;
-import net.sf.katta.zk.ZkPathes;
import net.sf.katta.zk.ZkServer;
import org.apache.hadoop.conf.Configuration;
@@ -65,15 +62,22 @@
/**
* Provides command line access to a Katta cluster.
*/
+@SuppressWarnings("deprecation")
public class Katta {
- public static ZkServer _zkServer; // TODO sg we have that just to access it in
- // tests, any suggestion how to solve that
- // better is welcome.
+ // TODO sg we have that just to access it in
+ // tests, any suggestion how to solve that better is welcome.
+ public static ZkServer _zkServer;
+
private final ZKClient _zkClient;
-
+ private ZkConfiguration _conf;
+
public Katta() throws KattaException {
- final ZkConfiguration configuration = new ZkConfiguration();
+ this(new ZkConfiguration());
+ }
+
+ public Katta(final ZkConfiguration configuration) throws KattaException {
+ _conf = configuration;
_zkClient = new ZKClient(configuration);
_zkClient.start(10000);
}
@@ -86,7 +90,7 @@
final ZkConfiguration conf = new ZkConfiguration();
// static methods first
if (command.endsWith("startNode")) {
- startNode(conf);
+ startNode(args.length > 1 ? args[1] : null, conf);
} else if (command.endsWith("startMaster")) {
startMaster(conf);
} else if (command.endsWith("startLoadTestNode")) {
@@ -109,7 +113,7 @@
} else if (command.endsWith("index")) {
generateIndex(args[1], args[2], Integer.parseInt(args[3]), Integer.parseInt(args[4]));
}
-
+
else {
// non static methods
Katta katta = null;
@@ -132,13 +136,19 @@
}
katta = new Katta();
katta.addIndex(args[1], args[2], replication);
+ } else if (command.endsWith("setState")) {
+ if (args.length < 3) {
+ printUsageAndExit();
+ }
+ katta = new Katta();
+ katta.setState(args[1], args[2]);
} else if (command.endsWith("removeIndex")) {
katta = new Katta();
katta.removeIndex(args[1]);
- } else if (command.endsWith("mergeIndexes")) {
+ } else if (command.endsWith("mergeIndexes") || command.endsWith("mergeIndices")) {
katta = new Katta();
katta.mergeIndexes(args);
- } else if (command.endsWith("listIndexes")) {
+ } else if (command.endsWith("listIndexes") || command.endsWith("listIndices")) {
boolean detailedView = false;
for (String arg : args) {
if (arg.equals("-d")) {
@@ -152,7 +162,7 @@
katta.listNodes();
} else if (command.endsWith("showStructure")) {
katta = new Katta();
- katta.showStructure();
+ katta.showStructure(args.length > 1 ? args[1] : null);
} else if (command.endsWith("check")) {
katta = new Katta();
katta.check();
@@ -186,44 +196,47 @@
public static void startIntegrationTest(int nodes, int startRate, int endRate, int step, int runTime, String[] indexNames,
String queryFile, int count, ZkConfiguration conf) throws KattaException {
- final ZKClient client = new ZKClient(conf);
- final LoadTestStarter integrationTester = new LoadTestStarter(client, nodes, startRate, endRate, step, runTime, indexNames, queryFile, count);
- integrationTester.start();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- integrationTester.shutdown();
- }
- });
- try {
- while (client.isStarted()) {
- Thread.sleep(100);
- }
- } catch (InterruptedException e) {
- // terminate
- }
+// TODO: port Load Test over to new client/server setup.
+// final ZKClient client = new ZKClient(conf);
+// final LoadTestStarter integrationTester = new LoadTestStarter(client, nodes, startRate, endRate, step, runTime, indexNames, queryFile, count);
+// integrationTester.start();
+// Runtime.getRuntime().addShutdownHook(new Thread() {
+// @Override
+// public void run() {
+// integrationTester.shutdown();
+// }
+// });
+// try {
+// while (client.isStarted()) {
+// Thread.sleep(100);
+// }
+// } catch (InterruptedException e) {
+// // terminate
+// }
}
public static void startLoadTestNode(ZkConfiguration conf) throws KattaException, InterruptedException {
- final ZKClient client = new ZKClient(conf);
- final LoadTestNode testSearcher = new LoadTestNode(client, new LoadTestNodeConfiguration());
- testSearcher.start();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- testSearcher.shutdown();
- }
- });
- testSearcher.join();
+// TODO: port load test to new client/server model.
+//
+// final ZKClient client = new ZKClient(conf);
+// final LoadTestNode testSearcher = new LoadTestNode(client, new LoadTestNodeConfiguration());
+// testSearcher.start();
+// Runtime.getRuntime().addShutdownHook(new Thread() {
+// @Override
+// public void run() {
+// testSearcher.shutdown();
+// }
+// });
+// testSearcher.join();
}
- private static void generateIndex(String input, String output, int wordsPerDoc, int indexSize) {
+ private static void generateIndex(String input, String output, int wordsPerDoc, int indexSize){
SampleIndexGenerator sampleIndexGenerator = new SampleIndexGenerator();
sampleIndexGenerator.createIndex(input, output, wordsPerDoc, indexSize);
}
-
+
private void redeployIndex(final String indexName) throws KattaException {
- String indexPath = ZkPathes.getIndexPath(indexName);
+ String indexPath = _conf.getZKIndexPath(indexName);
if (!_zkClient.exists(indexPath)) {
printError("index '" + indexName + "' does not exist");
return;
@@ -234,7 +247,8 @@
try {
removeIndex(indexName);
Thread.sleep(5000);
- addIndex(indexName, indexMetaData.getPath(), indexMetaData.getReplicationLevel());
+ addIndex(indexName, indexMetaData.getPath(), indexMetaData
+ .getReplicationLevel());
} catch (InterruptedException e) {
printError("Redeployment of index '" + indexName + "' interrupted.");
}
@@ -242,7 +256,7 @@
}
private void showErrors(final String indexName) throws KattaException {
- String indexZkPath = ZkPathes.getIndexPath(indexName);
+ String indexZkPath = _conf.getZKIndexPath(indexName);
if (!_zkClient.exists(indexZkPath)) {
printError("index '" + indexName + "' does not exist");
return;
@@ -255,12 +269,12 @@
List<String> shards = _zkClient.getChildren(indexZkPath);
for (String shardName : shards) {
System.out.println("Shard: " + shardName);
- String shard2ErrorRootPath = ZkPathes.getShard2ErrorRootPath(shardName);
+ String shard2ErrorRootPath = _conf.getZKShardToErrorPath(shardName);
if (_zkClient.exists(shard2ErrorRootPath)) {
List<String> errors = _zkClient.getChildren(shard2ErrorRootPath);
for (String nodeName : errors) {
System.out.print("\tNode: " + nodeName);
- String shardToNodePath = ZkPathes.getShard2ErrorPath(shardName, nodeName);
+ String shardToNodePath = _conf.getZKShardToErrorPath(shardName, nodeName);
ShardError shardError = new ShardError();
_zkClient.readData(shardToNodePath, shardError);
System.out.println("\tError: " + shardError.getErrorMsg());
@@ -275,7 +289,7 @@
System.out.println("Compiled by '" + VersionInfo.COMPILED_BY + "' on '" + VersionInfo.COMPILE_TIME + "'");
}
- public static void startMaster(ZkConfiguration conf) throws KattaException {
+ public static void startMaster(final ZkConfiguration conf) throws KattaException {
// ZkServer zkServer = null;
if (conf.isEmbedded()) {
_zkServer = new ZkServer(conf);
@@ -284,7 +298,6 @@
final Master master = new Master(client);
master.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
public void run() {
master.shutdown();
}
@@ -292,18 +305,41 @@
if (_zkServer != null) {
_zkServer.join();
} else {
- // just wait until the JVM terminates
+ // Just wait until the JVM terminates.
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
- // terminate
+ // Terminate.
}
}
}
- public static void startNode(ZkConfiguration conf) throws KattaException, InterruptedException {
+ public static void startNode(String serverClassName, final ZkConfiguration conf) throws KattaException, InterruptedException {
+ INodeManaged server = null;
+ try {
+ if (serverClassName == null) {
+ serverClassName = "net.sf.katta.node.LuceneServer";
+ }
+ Class<?> serverClass = Katta.class.getClassLoader().loadClass(serverClassName);
+ if (!INodeManaged.class.isAssignableFrom(serverClass)) {
+ System.err.println("Class " + serverClassName + " does not implement INodeManaged!");
+ System.exit(1);
+ }
+ server = (INodeManaged) serverClass.newInstance();
+ } catch (ClassNotFoundException e) {
+ System.err.println("Can not find class " + serverClassName + "!");
+ System.exit(1);
+ } catch (InstantiationException e) {
+ System.err.println("Could not create instance of class " + serverClassName + "!");
+ System.exit(1);
+ } catch (IllegalAccessException e) {
+ System.err.println("Unable to access class " + serverClassName + "!");
+ System.exit(1);
+ } catch (Throwable t) {
+ throw new RuntimeException("Error getting server instance for " + serverClassName, t);
+ }
final ZKClient client = new ZKClient(conf);
- final BaseNode node = new LuceneNode(client, new NodeConfiguration());
+ final Node node = new Node(client, server);
node.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
@@ -323,22 +359,22 @@
deployClient.removeIndex(indexName);
}
- public void showStructure() throws KattaException {
- _zkClient.showFolders(System.out);
+ public void showStructure(String arg) throws KattaException {
+ _zkClient.showFolders(arg != null && arg.startsWith("-a"), System.out);
}
private void check() throws KattaException {
- // System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
- System.out.println("Index Analysis");
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
- List<String> indexes = _zkClient.getChildren(ZkPathes.INDEXES);
+ System.out.println(" Index Analysis");
+ System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
+ List<String> indexes = _zkClient.getChildren(_conf.getZKIndicesPath());
IndexMetaData indexMetaData = new IndexMetaData();
CounterMap<IndexState> indexStateCounterMap = new CounterMap<IndexState>();
for (String index : indexes) {
- _zkClient.readData(ZkPathes.getIndexPath(index), indexMetaData);
+ _zkClient.readData(_conf.getZKIndexPath(index), indexMetaData);
indexStateCounterMap.increment(indexMetaData.getState());
}
- Table tableIndexStates = new Table(new String[] { "index state", "count" });
+ Table tableIndexStates = new Table("Index State", "Count");
Set<IndexState> keySet = indexStateCounterMap.keySet();
for (IndexState indexState : keySet) {
tableIndexStates.addRow(indexState, indexStateCounterMap.getCount(indexState));
@@ -347,52 +383,85 @@
System.out.println(indexes.size() + " indexes announced");
System.out.println("\n");
- System.out.println("Shard Analysis");
+ System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
+ System.out.println(" Shard Analysis");
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n");
for (String index : indexes) {
System.out.println("checking " + index + " ...");
- _zkClient.readData(ZkPathes.getIndexPath(index), indexMetaData);
- List<String> shards = _zkClient.getChildren(ZkPathes.getIndexPath(index));
+ _zkClient.readData(_conf.getZKIndexPath(index), indexMetaData);
+ List<String> shards = _zkClient.getChildren(_conf.getZKIndexPath(index));
for (String shard : shards) {
- int shardReplication = _zkClient.countChildren(ZkPathes.getShard2NodeRootPath(shard));
+ int shardReplication = _zkClient.countChildren(_conf.getZKShardToNodePath(shard));
if (shardReplication < indexMetaData.getReplicationLevel()) {
System.out.println("\tshard " + shard + " is under-replicated (" + shardReplication + "/"
- + indexMetaData.getReplicationLevel() + ")");
+ + indexMetaData.getReplicationLevel() + ")");
} else if (shardReplication > indexMetaData.getReplicationLevel()) {
System.out.println("\tshard " + shard + " is over-replicated (" + shardReplication + "/"
- + indexMetaData.getReplicationLevel() + ")");
+ + indexMetaData.getReplicationLevel() + ")");
}
}
}
System.out.println("\n");
- System.out.println("Node Analysis");
- System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n");
- Table tableNodeLoad = new Table("node", "connected", "deployed shards");
- List<String> nodes = _zkClient.getChildren(ZkPathes.NODE_TO_SHARD);
+ System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
+ System.out.println(" Node Analysis");
+ System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
+ Table tableNodeLoad = new Table("Node", "Connected", "Shard Status");
+ int totalShards = 0;
+ int totalAnnouncedShards = 0;
+ long startTime = Long.MAX_VALUE;
+ List<String> nodes = _zkClient.getChildren(_conf.getZKNodeToShardPath());
for (String node : nodes) {
- boolean isConnected = _zkClient.exists(ZkPathes.getNodePath(node));
- int shardCount = _zkClient.countChildren(ZkPathes.getNode2ShardRootPath(node));
+ boolean isConnected = _zkClient.exists(_conf.getZKNodePath(node));
+ int shardCount = 0;
+ int announcedShardCount = 0;
+ for (String shard : _zkClient.getChildren(_conf.getZKNodeToShardPath(node))) {
+ shardCount++;
+ long ctime = _zkClient.getCreateTime(_conf.getZKShardToNodePath(shard, node));
+ if (ctime > 0) {
+ announcedShardCount++;
+ if (ctime < startTime) {
+ startTime = ctime;
+ }
+ }
+ }
+ totalShards += shardCount;
+ totalAnnouncedShards += announcedShardCount;
StringBuilder builder = new StringBuilder();
- builder.append(" ");
+ builder.append(String.format(" %9s ", String.format("%d/%d", announcedShardCount, shardCount)));
for (int i = 0; i < shardCount; i++) {
- builder.append("|");
+ builder.append(i < announcedShardCount ? "#" : "-");
}
- builder.append(" ");
- builder.append(shardCount);
- tableNodeLoad.addRow(node, "" + isConnected, builder.toString());
+ tableNodeLoad.addRow(node, Boolean.toString(isConnected), builder);
}
System.out.println(tableNodeLoad);
+ double progress = totalShards == 0 ? 0.0 : (double) totalAnnouncedShards / (double) totalShards;
+ System.out.printf("%d out of %d shards deployed (%.2f%%)\n",
+ totalAnnouncedShards, totalShards, 100 * progress);
+ if (startTime < Long.MAX_VALUE && totalShards > 0 && totalAnnouncedShards > 0 && totalAnnouncedShards < totalShards) {
+ long elapsed = System.currentTimeMillis() - startTime;
+ double timePerShard = (double) elapsed / (double) totalAnnouncedShards;
+ long remaining = Math.round(timePerShard * (totalShards - totalAnnouncedShards));
+ Date finished = new Date(System.currentTimeMillis() + remaining);
+ remaining /= 1000;
+ long secs = remaining % 60;
+ remaining /= 60;
+ long min = remaining % 60;
+ remaining /= 60;
+ System.out.printf("Estimated completion: %s (%dh %dm %ds)", finished, remaining, min, secs);
+ }
}
public void listNodes() throws KattaException {
final List<String> nodes = _zkClient.getKnownNodes();
int inServiceNodeCount = 0;
final Table table = new Table();
+ int numNodes = 0;
for (final String node : nodes) {
- final String nodePath = ZkPathes.getNodePath(node);
+ final String nodePath = _conf.getZKNodePath(node);
final NodeMetaData nodeMetaData = new NodeMetaData();
if (_zkClient.exists(nodePath)) {
+ numNodes++;
_zkClient.readData(nodePath, nodeMetaData);
NodeState nodeState = nodeMetaData.getState();
if (nodeState == NodeState.IN_SERVICE) {
@@ -403,43 +472,44 @@
// known but outdated node (master cleans this up)
}
}
- table.setHeader("Name (" + inServiceNodeCount + "/" + table.rowSize() + " nodes connected)", "Start time", "State");
+ table.setHeader("Name (" + inServiceNodeCount + "/" + numNodes + " nodes connected)", "Start time", "State");
System.out.println(table.toString());
}
public void listIndex(boolean detailedView) throws KattaException, IOException {
final Table table;
if (!detailedView) {
- table = new Table(new String[] { "Name", "Status", "Path", "Shards", "Documents", "Size" });
+ table = new Table(new String[] { "Name", "Status", "Path", "Shards", "Size", "Disk Usage" });
} else {
- table = new Table(new String[] { "Name", "Status", "Path", "Shards", "Documents", "Size", "Replication" });
+ table = new Table(new String[] { "Name", "Status", "Path", "Shards", "Size", "Disk Usage",
+ "Replication" });
}
- final List<String> indexes = _zkClient.getChildren(ZkPathes.INDEXES);
+ final List<String> indexes = _zkClient.getChildren(_conf.getZKIndicesPath());
for (final String index : indexes) {
- String indexZkPath = ZkPathes.getIndexPath(index);
+ String indexZkPath = _conf.getZKIndexPath(index);
final IndexMetaData metaData = new IndexMetaData();
_zkClient.readData(indexZkPath, metaData);
String state = metaData.getState().toString();
List<String> shards = _zkClient.getChildren(indexZkPath);
- int docCount = calculateDocCount(shards);
- long indexSize = calculateIndexSize(metaData.getPath());
+ int size = calculateIndexSize(shards);
+ long indexBytes = calculateIndexDiskUsage(metaData.getPath());
if (!detailedView) {
- table.addRow(index, state, metaData.getPath(), shards.size(), docCount, indexSize);
+ table.addRow(index, state, metaData.getPath(), shards.size(), size, indexBytes);
} else {
- table.addRow(index, state, metaData.getPath(), shards.size(), docCount, indexSize, metaData
- .getReplicationLevel());
+ table.addRow(index, state, metaData.getPath(), shards.size(), size, indexBytes,
+ metaData.getReplicationLevel());
}
}
- if (table.rowSize() > 0) {
+ if (!indexes.isEmpty()) {
System.out.println(table.toString());
}
System.out.println(indexes.size() + " registered indexes");
System.out.println();
}
- private long calculateIndexSize(String index) throws IOException {
+ private long calculateIndexDiskUsage(String index) throws IOException {
Path indexPath = new Path(index);
URI indexUri = indexPath.toUri();
FileSystem fileSystem = FileSystem.get(indexUri, new Configuration());
@@ -449,21 +519,29 @@
return fileSystem.getContentSummary(indexPath).getLength();
}
- private int calculateDocCount(List<String> shards) throws KattaException {
+ private int calculateIndexSize(List<String> shards) throws KattaException {
int docCount = 0;
for (String shard : shards) {
- List<String> deployedShards = _zkClient.getChildren(ZkPathes.getShard2NodeRootPath(shard));
+ List<String> deployedShards = _zkClient.getChildren(_conf.getZKShardToNodePath(shard));
if (!deployedShards.isEmpty()) {
DeployedShard deployedShard = new DeployedShard();
- _zkClient.readData(ZkPathes.getShard2NodePath(shard, deployedShards.get(0)), deployedShard);
- docCount += Integer.parseInt(deployedShard.getMetaData().get(LuceneNode.NUM_OF_DOCS));
+ _zkClient.readData(_conf.getZKShardToNodePath(shard, deployedShards.get(0)), deployedShard);
+ int count = 0;
+ if (deployedShard.getMetaData() != null) {
+ try {
+ count = Integer.parseInt(deployedShard.getMetaData().get(INodeManaged.SHARD_SIZE_KEY));
+          } catch (NumberFormatException e) {
+            // Ignore a malformed shard size; the count stays 0.
+          }
+ }
+ docCount += count;
}
}
return docCount;
}
- public void addIndex(final String name, final String path, final int replicationLevel) throws KattaException {
- final String indexZkPath = ZkPathes.getIndexPath(name);
+ public void addIndex(final String name, final String path, final int replicationLevel)
+ throws KattaException {
+ final String indexZkPath = _conf.getZKIndexPath(name);
if (name.trim().equals("*")) {
printError("Index with name " + name + " isn't allowed.");
return;
@@ -505,11 +583,11 @@
if (hadoopSiteXml != null) {
if (!hadoopSiteXml.exists()) {
throw new IllegalArgumentException("given hadoop-site.xml '" + hadoopSiteXml.getAbsolutePath()
- + "' does not exists");
+ + "' does not exists");
}
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
SymlinkResourceLoader classLoader = new SymlinkResourceLoader(contextClassLoader, "hadoop-site.xml",
- hadoopSiteXml);
+ hadoopSiteXml);
Thread.currentThread().setContextClassLoader(classLoader);
}
IndexMergeApplication indexMergeApplication = new IndexMergeApplication(_zkClient);
@@ -521,7 +599,7 @@
}
public static void search(final String[] indexNames, final String queryString, final int count) throws KattaException {
- final IClient client = new Client();
+ final ILuceneClient client = new LuceneClient();
final IQuery query = new Query(queryString);
final long start = System.currentTimeMillis();
final Hits hits = client.search(query, indexNames, count);
@@ -537,7 +615,7 @@
}
public static void search(final String[] indexNames, final String queryString) throws KattaException {
- final IClient client = new Client();
+ final ILuceneClient client = new LuceneClient();
final IQuery query = new Query(queryString);
final long start = System.currentTimeMillis();
final int hitsSize = client.count(query, indexNames);
@@ -560,42 +638,43 @@
System.err.println("\tlistIndexes [-d]\tLists all indexes. -d for detailed view.");
System.err.println("\tlistNodes\t\tLists all nodes.");
System.err.println("\tstartMaster\t\tStarts a local master.");
- System.err.println("\tstartNode\t\tStarts a local node.");
+ System.err.println("\tstartNode [server classname]\t\tStarts a local node.");
System.err.println("\tstartLoadTestNode\tStarts a load test node.");
System.err.println("\tstartLoadTest <nodes> <start-query-rate> <end-query-rate> <step> <test-duration-ms> <index-name> <query-file> <max hits>");
System.err.println("\t\t\t\tStarts a load test. The query rate is in queries per second.");
- System.err.println("\tshowStructure\t\tShows the structure of a Katta installation.");
+ System.err.println("\tshowStructure [-all]\t\tShows the structure of a Katta installation.");
System.err.println("\tcheck\t\t\tAnalyze index/shard/node status.");
System.err.println("\tversion\t\t\tPrint the version.");
- System.err.println("\taddIndex <index name> <path to index> [<replication level>]");
- System.err.println("\t\t\t\tAdd an index to a Katta installation.");
- System.err.println("\tremoveIndex <index name>");
- System.err.println("\t\t\t\tRemove an index from a Katta installation.");
- System.err.println("\tredeployIndex <index name>");
- System.err.println("\t\t\t\tUndeploys and deploys an index.");
- System.err.println("\tmergeIndexes [-indexes <index1,index2>] [-hadoopSiteXml <siteXmlPath>]");
- System.err.println("\t\t\t\tMergers all or the specified indexes.");
- System.err.println("\tlistErrors <index name>\tLists all deploy errors for a specified index.");
- System.err.println("\tsearch <index name>[,<index name>,...] \"<query>\" [count]");
System.err
- .println("\t\t\t\tSearch in supplied indexes. The query should be in \". If you supply a result count hit details will be printed. To search in all indices write \"*\"");
- System.err.println("\tindex <inputTextFile> <outputPath> <numOfWordsPerDoc> <numOfDocuments>");
- System.err.println("\t\t\t\tGenerates a sample index. The inputTextFile is used as dictionary.");
-
+ .println("\taddIndex <index name> <path to index> [<replication level>]\tAdd an index to a Katta installation.");
+ System.err.println("\tremoveIndex <index name>\tRemove an index from a Katta installation.");
+ System.err.println("\tsetState <index name> <state>\tOverwrite the state of an index.");
+ System.err.println("\tredeployIndex <index name>\tUndeploys and deploys an index.");
+ System.err
+ .println("\tmergeIndexes [-indexes <index1,index2>] [-hadoopSiteXml <siteXmlPath>]\tmerges all or the specified indexes.");
+ System.err.println("\tlistErrors <index name>\t\tLists all deploy errors for a specified index.");
+ System.err
+ .println("\tsearch <index name>[,<index name>,...] \"<query>\" [count]\tSearch in supplied indexes. " +
+ "The query should be in \". If you supply a result count hit details will be printed. " +
+ "To search in all indices write \"*\". This uses the client type LuceneClient.");
+ System.err
+ .println("\tindex <inputTextFile> <outputPath> <numOfWordsPerDoc> <numOfDocuments> \tGenerates a sample index. " +
+ "The inputTextFile is used as dictionary.");
+
System.err.println();
System.exit(1);
}
private static class Table {
private String[] _header;
- private final List<Object[]> _rows = new ArrayList<Object[]>();
+ private final List<String[]> _rows = new ArrayList<String[]>();
public Table(final String... header) {
_header = header;
}
+ /** Set the header later by calling setHeader() */
public Table() {
- // setting header later
}
public void setHeader(String... header) {
@@ -603,38 +682,44 @@
}
public void addRow(final Object... row) {
- _rows.add(row);
+ String[] strs = new String[row.length];
+ for (int i=0; i<row.length; i++) {
+ strs[i] = row[i] != null ? row[i].toString() : "";
+ }
+ _rows.add(strs);
}
- public int rowSize() {
- return _rows.size();
- }
-
- @Override
public String toString() {
final StringBuilder builder = new StringBuilder();
- builder.append("\n");
- final int[] columnSizes = getColumnSizes(_header, _rows);
+ final int[] columnSizes = getColumnSizes();
int rowWidth = 0;
for (final int columnSize : columnSizes) {
- rowWidth += columnSize + 2;
+ rowWidth += columnSize;
}
- // header
+ rowWidth += 2 + (Math.max(0, columnSizes.length - 1) * 3) + 2;
+ builder.append("\n" + getChar(rowWidth, "-") + "\n");
+ // Header.
builder.append("| ");
+ String leftPad = "";
for (int i = 0; i < _header.length; i++) {
final String column = _header[i];
- builder.append(column + getChar(columnSizes[i] - column.length(), " ") + " | ");
+ builder.append(leftPad);
+ builder.append(column + getChar(columnSizes[i] - column.length(), " "));
+ leftPad = " | ";
}
- builder.append("\n=");
- builder.append(getChar(rowWidth + columnSizes.length, "=") + "\n");
-
+ builder.append(" |\n");
+ builder.append(getChar(rowWidth, "=") + "\n");
+ // Rows.
for (final Object[] row : _rows) {
builder.append("| ");
+ leftPad = "";
for (int i = 0; i < row.length; i++) {
- builder.append(row[i] + getChar(columnSizes[i] - row[i].toString().length(), " ") + " | ");
+ builder.append(leftPad);
+ builder.append(row[i]);
+ builder.append(getChar(columnSizes[i] - row[i].toString().length(), " "));
+ leftPad = " | ";
}
- builder.append("\n-");
- builder.append(getChar(rowWidth + columnSizes.length, "-") + "\n");
+ builder.append(" |\n" + getChar(rowWidth, "-") + "\n");
}
return builder.toString();
@@ -648,12 +733,12 @@
return spaces;
}
- private int[] getColumnSizes(final String[] header, final List<Object[]> rows) {
- final int[] sizes = new int[header.length];
+ private int[] getColumnSizes() {
+ final int[] sizes = new int[_header.length];
for (int i = 0; i < sizes.length; i++) {
- int min = header[i].length();
- for (final Object[] row : rows) {
- int rowLength = row[i].toString().length();
+ int min = _header[i].length();
+ for (final String[] row : _rows) {
+ int rowLength = row[i].length();
if (rowLength > min) {
min = rowLength;
}
@@ -665,6 +750,32 @@
}
}
+ private void setState(String index, String stateName) throws KattaException {
+ IndexState state = null;
+ for (IndexState s : IndexState.values()) {
+ if (s.name().toLowerCase().equals(stateName.toLowerCase())) {
+ state = s;
+ break;
+ }
+ }
+ if (state == null) {
+ String err = "Index state " + stateName + " unknown. Valid values are: ";
+ String sep = "";
+ for (IndexState s : IndexState.values()) {
+ err += sep;
+ err += s.name();
+ sep = ", ";
+ }
+ System.err.println(err);
+ return;
+ }
+ IndexMetaData indexMetaData = new IndexMetaData();
+ _zkClient.readData(_conf.getZKIndexPath(index), indexMetaData);
+ indexMetaData.setState(state, "");
+ _zkClient.writeData(_conf.getZKIndexPath(index), indexMetaData);
+    System.out.println("Updated state of index " + index + " to " + state.name());
+ }
+
private static class CounterMap<K> {
private Map<K, AtomicInteger> _counterMap = new HashMap<K, AtomicInteger>();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
From: <jo...@us...> - 2009-07-01 20:39:36
Revision: 475
http://katta.svn.sourceforge.net/katta/?rev=475&view=rev
Author: joa23
Date: 2009-07-01 20:39:30 +0000 (Wed, 01 Jul 2009)
Log Message:
-----------
KATTA-54, makes Katta more generic. ATTENTION: this is an experimental commit that makes Katta completely unstable.
The test suite is broken and functionality commented out.
Modified Paths:
--------------
trunk/bin/katta-config.sh
trunk/conf/katta.ec2.properties.template
trunk/conf/katta.node.properties
trunk/conf/katta.zk.properties
trunk/extras/benchmark/bin/launch-cluster
Added Paths:
-----------
trunk/doc/
trunk/doc/custom_server.txt
trunk/extras/benchmark/bin/image/
trunk/extras/benchmark/bin/image/create-java-image-remote
trunk/extras/benchmark/bin/image/ec2-run-user-data
trunk/extras/benchmark/src/
trunk/extras/benchmark/src/main/
trunk/extras/benchmark/src/main/scripts/
trunk/extras/benchmark/src/main/scripts/grinder.properties
trunk/extras/benchmark/src/main/scripts/grinderStart.jy
Modified: trunk/bin/katta-config.sh
===================================================================
--- trunk/bin/katta-config.sh 2009-07-01 19:51:50 UTC (rev 474)
+++ trunk/bin/katta-config.sh 2009-07-01 20:39:30 UTC (rev 475)
@@ -39,7 +39,7 @@
# the root of the Katta installation
export KATTA_HOME=`dirname "$this"`/..
-#check to see if the conf dir is given as an optional argument
+# check to see if the conf dir is given as an optional argument
if [ $# -gt 1 ]
then
if [ "--config" = "$1" ]
@@ -54,7 +54,7 @@
# Allow alternate conf dir location.
KATTA_CONF_DIR="${KATTA_CONF_DIR:-$KATTA_HOME/conf}"
-# check to see it is specified whether to use the nodes or the
+#check to see it is specified whether to use the nodes or the
# masters file
if [ $# -gt 1 ]
then
Modified: trunk/conf/katta.ec2.properties.template
===================================================================
--- trunk/conf/katta.ec2.properties.template 2009-07-01 19:51:50 UTC (rev 474)
+++ trunk/conf/katta.ec2.properties.template 2009-07-01 20:39:30 UTC (rev 475)
@@ -11,4 +11,4 @@
#path to the secret key for the configured keyPairName
aws.keyPath=
# base image, an ec2 image id that at least has java installed.
-aws.aim=ami-45e7002c
\ No newline at end of file
+aws.aim=ami-45e7002c
Modified: trunk/conf/katta.node.properties
===================================================================
--- trunk/conf/katta.node.properties 2009-07-01 19:51:50 UTC (rev 474)
+++ trunk/conf/katta.node.properties 2009-07-01 20:39:30 UTC (rev 475)
@@ -1,4 +1,4 @@
# the start port to try
node.server.port.start = 20000
# local folder on node where shards will be stored during serving
-node.shard.folder=/tmp/katta-shards
\ No newline at end of file
+node.shard.folder=/tmp/katta-shards
Modified: trunk/conf/katta.zk.properties
===================================================================
--- trunk/conf/katta.zk.properties 2009-07-01 19:51:50 UTC (rev 474)
+++ trunk/conf/katta.zk.properties 2009-07-01 20:39:30 UTC (rev 475)
@@ -1,5 +1,6 @@
# Starts zookeeper embedded in the master jvm.
-# We suggest to run zookeeper standalone in large installations, see http://hadoop.apache.org/zookeeper/docs/r3.1.1/zookeeperAdmin.html
+# We suggest you run zookeeper standalone in large installations
+# See http://hadoop.apache.org/zookeeper/docs/r3.1.1/zookeeperAdmin.html
zookeeper.embedded=true
# Comma separated list of host:port of zookeeper servers used by the zookeeper clients.
@@ -8,6 +9,10 @@
# If found an embedded zookeeper server is started within the master or secondary master jvm.
zookeeper.servers=localhost:2181
+#
+# The root zk node. This changes with every new cloud.
+#
+zookeeper.root-path = /katta
#zookeeper client timeout in milliseconds
zookeeper.timeout=5000
#zookeeper tick time
@@ -21,4 +26,5 @@
# zookeeper folder where log data are stored
zookeeper.log-data-dir=./zookeeper-log-data
# zookeeper client port
-zookeeper.clientPort=2182
\ No newline at end of file
+zookeeper.clientPort=2182
+
Added: trunk/doc/custom_server.txt
===================================================================
--- trunk/doc/custom_server.txt (rev 0)
+++ trunk/doc/custom_server.txt 2009-07-01 20:39:30 UTC (rev 475)
@@ -0,0 +1,215 @@
+HOW TO CREATE A CUSTOM SERVER USING KATTA
+=========================================
+
+Introduction
+------------
+
+ This guide explains how to write a new kind of client/server on top of Katta.
+Katta will manage loading of shards, pools of nodes, maintaining replication levels, and so on.
+The domain-specific work is done by custom client and server classes.
+ Katta ships with Lucene and MapFile clients and servers. If these do not meet
+your needs then you will need to write your own client and server. This guide describes one
+sequence of steps to achieve this, but you do not have to do it in this order.
+
+The Problem
+-----------
+
+ For this example our problem is grep-ing a large file for regular expressions.
+We started off just using grep, but the file size has grown to the point where latency is too
+large. So we need to cut the file up into pieces and distribute the work across a pool of
+nodes. Katta to the rescue!
+ Specifically, our data set is an ordered list of newline terminated strings. Our search
+key is a regular expression, and the result is a list of strings that match the regular
+expression, in the order they occur in the original list.
+
+Katta Infrastructure
+--------------------
+
+ A Katta index is a directory containing nothing but "shards" (directories). The name is chosen at
+addIndex time (below). We will use "grep" as our index name. The name of the index is used as part of the shard names.
+
+ You can have multiple shards per node, and this allows for a more even distribution as
+the number of nodes changes. We decide on 10 nodes and 100 shards.
+
+ So we create a directory "grep_v1", and subdirectories "shard00" .."shard99". The final shard
+names will then be "grep_shard00" .. "grep_shard99". We will be sorting based on shard name
+in the client, so we zero-pad the numbers.
+
+ The contents of the shard directories are totally up to you. Katta does not care; it just
+copies the directories to the server nodes and notifies them where the directories are. So we cut
+the input file into 100 equal-sized pieces and place one in each shard directory as the file "data.txt".
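
 The split into per-shard "data.txt" files can be sketched as a small standalone Java program.
ShardSplitter is a hypothetical helper name, not part of Katta; it simply distributes lines
across zero-padded shard directories as described above:

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Katta) that cuts an input file into
 * zero-padded shard directories, each holding its slice as "data.txt".
 */
public class ShardSplitter {

  public static void split(File input, File indexDir, int numShards) throws IOException {
    // Read all lines up front; a streaming two-pass approach would suit very large files better.
    List<String> lines = new ArrayList<String>();
    BufferedReader reader = new BufferedReader(new FileReader(input));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        lines.add(line);
      }
    } finally {
      reader.close();
    }
    int perShard = (lines.size() + numShards - 1) / numShards; // Ceiling division.
    for (int i = 0; i < numShards; i++) {
      // Zero-pad so that alphabetical order equals numeric order in the client.
      File shardDir = new File(indexDir, String.format("shard%02d", i));
      shardDir.mkdirs();
      PrintWriter writer = new PrintWriter(new FileWriter(new File(shardDir, "data.txt")));
      try {
        for (int j = i * perShard; j < Math.min((i + 1) * perShard, lines.size()); j++) {
          writer.println(lines.get(j));
        }
      } finally {
        writer.close();
      }
    }
  }
}
```

For a real deployment you would run this once against the full input file before uploading
the resulting index directory.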
+
+ See the Katta documentation for deployment options to make the index available to the nodes.
+It is common to upload to a Hadoop file system, but this is not required.
+
+ Also see Katta documentation for setting up Zookeeper. At this point we will assume we have in
+place one or more Zookeeper nodes, 10 server machines, one or more client machines, and one machine
+to add the index (you can do this on any of the other machines, or use a separate machine). Now we
+need the custom software.
+
+Custom Server
+-------------
+
+ We start by defining an interface for our server. These methods will be called by our custom
+client classes, not external users, so no one but us will see them. Hadoop RPC is used, so
+all the data must be Writable. We only need 1 method, which takes a String as input. But to be
+Writable, we use Text. In order to enable sorting in the client, we need to return the shard names,
+and the list of strings for each shard. So we return a map of Text to list of Text:
+
+ interface IGrepServer extends VersionedProtocol {
+
+ public Map<Text, List<Text>> grep(Text regex, String[] shards) throws Exception;
+
+ }
+
+ The VersionedProtocol is required because in the client Hadoop RPC returns proxy objects of that type.
+
+ Due to replication and retries, the list of shards may be a subset of the shards on each node.
+In the simple no replication case, each node will get exactly 1 call with its full set of shards
+listed. No retries will occur.
+
+ In the client, we will use this interface to look up the grep() method. All of our RPC
+calls from the client to the server nodes are listed in this interface.
+
+ Now we are ready to write our actual server class. This class must implement our IGrepServer
+interface and also the INodeManaged interface. INodeManaged lists all the calls our server will
+get from the Katta Node class (start using shard, stop using shard, shutdown). To keep things
+simple we extend AbstractServer, which implements this interface and maintains the list of
+shards (directories) for us. It also takes care of VersionedProtocol (always returns version 0).
+
+ public class GrepServer extends AbstractServer implements IGrepServer {
+
+ public Map<Text, List<Text>> grep(Text regex, String[] shards) throws Exception {
+ Map<Text, List<Text>> result = new HashMap<Text, List<Text>>();
+ for (String shard : shards) {
+ result.put(new Text(shard), grepShard(regex.toString(), _shards.get(shard)));
+ }
+ return result;
+ }
+
+ private List<Text> grepShard(String regex, File shard) {
+ File dataFile = new File(shard, "data.txt");
+ // Grep file using Scanner class.
+ }
+
+ public Map<String, String> getShardMetaData(String shardName) throws Exception {
+ return null;
+ }
+
+ public void shutdown() throws IOException {
+ }
+
+ }
+
+
+ You could search the N shards in parallel if you wish.
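
 The grepShard() body left as a comment above can be filled in with java.util.Scanner and
java.util.regex. The sketch below uses String in place of Hadoop's Text (an assumption made
so the snippet compiles without Hadoop on the classpath):

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.regex.Pattern;

/** Sketch of the grepShard() body; String stands in for Hadoop's Text. */
public class GrepShard {

  public static List<String> grepShard(String regex, File shardDir) throws FileNotFoundException {
    Pattern pattern = Pattern.compile(regex);
    List<String> matches = new ArrayList<String>();
    Scanner scanner = new Scanner(new File(shardDir, "data.txt"));
    try {
      // Scan line by line, keeping matching lines in their original file order.
      while (scanner.hasNextLine()) {
        String line = scanner.nextLine();
        if (pattern.matcher(line).find()) {
          matches.add(line);
        }
      }
    } finally {
      scanner.close();
    }
    return matches;
  }
}
```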
+
+
+Custom Client
+-------------
+
+ To outside users, we want to hide the fact that we use Katta. So we define a client interface. This is
+ not used or required by Katta but is good practice.
+
+ public interface IGrepClient {
+
+ public List<String> grep(String regex) throws Exception;
+
+ }
+
+ The actual client class creates an instance of Client, uses its broadcastToAll() method to get a
+result set, then throws an exception if any occurred. If not, it appends all the per-shard lists
+in alphabetical shard order. This guarantees that the final list is in the right order. You could
+pass line numbers explicitly in your data, but we wanted to keep it simple for this example.
+
+ public class GrepClient implements IGrepClient {
+
+ private Client client;
+ private Method method;
+
+ public GrepClient() throws KattaException {
+ client = new Client(IGrepServer.class);
+ try {
+ method = IGrepServer.class.getMethod("grep", Text.class, String[].class);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("Missing method");
+ }
+ }
+
+ public List<String> grep(String regex) throws KattaException {
+ // Call Katta.
+ ClientResult<Map<Text, List<Text>>> clientResult = client.broadcastToAll(3000, true, method, 1, new Text(regex), null);
+ // If there were any errors, pick one and throw it.
+ if (clientResult.isError()) {
+ throw clientResult.getKattaException();
+ }
+ // Combine into 1 big map of shard --> strings, sorted by shard name.
+ Map<Text, List<Text>> combined = new TreeMap<Text, List<Text>>();
+ for (Map<Text, List<Text>> result : clientResult.getResults()) {
+ combined.putAll(result);
+ }
+ // Build results.
+ List<String> strings = new ArrayList<String>();
+ for (List<Text> texts : combined.values()) {
+ for (Text text : texts) {
+ strings.add(text.toString());
+ }
+ }
+ return strings;
+ }
+
+ }
+
+
+Things To Notice
+----------------
+
+ The only server class used by the client is IGrepServer, which contains only the RPC
+methods used by the client.
+ The second argument to grep() is the array of shard names. The last 2 arguments to broadcastToAll() are
+the args to pass into the method. The second argument will be overwritten by Katta, so we pass in null.
+The fourth argument to broadcastToAll() tells Katta which argument (if any) to overwrite (1 is second argument).
+If this is < 0 no overwriting is done.
+ The first two arguments to broadcastToAll() say that we are willing to wait up to 3 seconds for complete results
+(data or error for all shards - due to retries this might not be all nodes), and then terminate the call.
+ The ClientResult object keeps track of results on a per-set-of-shards basis. Because we need to know the
+exact shard for each string (for sorting), we pass that inside our result.
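To make the argument-overwrite convention concrete, here is a minimal, self-contained sketch — OverwriteDemo and callForNode() are hypothetical stand-ins, not the real Katta API — showing how the argument at the given index is replaced with a node's shard names before the RPC method is invoked via reflection:

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class OverwriteDemo {

    // A stand-in for the server-side RPC method.
    public static String grep(String regex, String[] shards) {
        return regex + " -> " + Arrays.toString(shards);
    }

    // Sketch of the overwrite convention: before invoking the method for a
    // node, the argument at shardArgIndex is replaced with that node's shards.
    // A negative shardArgIndex means no overwriting is done.
    public static Object callForNode(Method method, int shardArgIndex, Object[] args,
                                     String[] nodeShards) throws Exception {
        Object[] perNodeArgs = args.clone();
        if (shardArgIndex >= 0) {
            perNodeArgs[shardArgIndex] = nodeShards;
        }
        return method.invoke(null, perNodeArgs);
    }

    public static void main(String[] args) throws Exception {
        Method m = OverwriteDemo.class.getMethod("grep", String.class, String[].class);
        // The caller passes null for the shard argument; it is overwritten per node.
        Object result = callForNode(m, 1, new Object[] { "[Kk]atta", null },
                new String[] { "grep#aShard" });
        System.out.println(result); // prints: [Kk]atta -> [grep#aShard]
    }
}
```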
+
+
+Starting The Nodes
+------------------
+
+ Make sure your classes are available on the classpath. One way is to drop your jar into Katta's lib dir.
+Then on all 10 server nodes tell katta to start a node using your server class:
+
+ bin/katta startNode com.foo.GrepServer
+
+
+Deploy The Index
+----------------
+
+ bin/katta addIndex grep <index-location> <replication level>
+
+ Add an index with the name "grep". In this example the location would end in "/grep_v1", but the deployed
+index name is "grep". The replication level should be 1 for no replication.
+
+Using The Client
+----------------
+
+ Now you are ready to use your client:
+
+ System.out.println("Katta references: " + new GrepClient().grep("[Kk]atta"));
+
+Conclusion
+----------
+
+ I hope this has made it clear what you need to do to create a custom client/server using Katta, and
+how easy it is. Please see the Katta documentation and discussion groups for more information.
+
+ http://katta.sourceforge.net/
+
+
+
+
+
+
Added: trunk/extras/benchmark/bin/image/create-java-image-remote
===================================================================
--- trunk/extras/benchmark/bin/image/create-java-image-remote (rev 0)
+++ trunk/extras/benchmark/bin/image/create-java-image-remote 2009-07-01 20:39:30 UTC (rev 475)
@@ -0,0 +1,64 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Creates a simple image with java installed.
+
+# Import variables
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/benchmark-ec2-env.sh
+
+# Remove environment script since it contains sensitive information
+rm -f "$bin"/benchmark-ec2-env.sh
+
+# Install Java
+echo "Downloading and installing java binary."
+cd /usr/local
+wget -nv -O java.bin $JAVA_BINARY_URL
+sh java.bin
+rm -f java.bin
+
+# Install tools
+echo "Installing rpms."
+yum -y install rsync lynx screen ganglia-gmetad ganglia-gmond ganglia-web dejavu-fonts httpd php
+yum -y clean all
+
+# Run user data as script on instance startup
+chmod +x /etc/init.d/ec2-run-user-data
+echo "/etc/init.d/ec2-run-user-data" >> /etc/rc.d/rc.local
+
+# Setup root user bash environment
+echo "export JAVA_HOME=/usr/local/jdk${JAVA_VERSION}" >> /root/.bash_profile
+echo 'export PATH=$JAVA_HOME/bin:$PATH' >> /root/.bash_profile
+
+# Configure networking.
+# Delete SSH authorized_keys since it includes the key it was launched with. (Note that it is re-populated when an instance starts.)
+rm -f /root/.ssh/authorized_keys
+# Ensure logging in to new hosts is seamless.
+echo ' StrictHostKeyChecking no' >> /etc/ssh/ssh_config
+
+# Bundle and upload image
+cd ~root
+# Don't need to delete .bash_history since it isn't written until exit.
+df -h
+ec2-bundle-vol -d /mnt -k /mnt/pk*.pem -c /mnt/cert*.pem -u $AWS_ACCOUNT_ID -s 3072 -p jdk-$JAVA_VERSION-$ARCH -r $ARCH
+
+ec2-upload-bundle -b $S3_BUCKET -m /mnt/jdk-$JAVA_VERSION-$ARCH.manifest.xml -a $AWS_ACCESS_KEY_ID -s $AWS_SECRET_ACCESS_KEY
+
+# End
+echo Done
Added: trunk/extras/benchmark/bin/image/ec2-run-user-data
===================================================================
--- trunk/extras/benchmark/bin/image/ec2-run-user-data (rev 0)
+++ trunk/extras/benchmark/bin/image/ec2-run-user-data 2009-07-01 20:39:30 UTC (rev 475)
@@ -0,0 +1,51 @@
+#!/bin/bash
+#
+# ec2-run-user-data - Run instance user-data if it looks like a script.
+#
+# Only retrieves and runs the user-data script once per instance. If
+# you want the user-data script to run again (e.g., on the next boot)
+# then add this command in the user-data script:
+# rm -f /var/ec2/ec2-run-user-data.*
+#
+# History:
+# 2008-05-16 Eric Hammond <eha...@th...>
+# - Initial version including code from Kim Scheibel, Jorge Oliveira
+# 2008-08-06 Tom White
+# - Updated to use mktemp on fedora
+#
+
+prog=$(basename $0)
+logger="logger -t $prog"
+curl="curl --retry 3 --silent --show-error --fail"
+instance_data_url=http://169.254.169.254/2008-02-01
+
+# Wait until networking is up on the EC2 instance.
+perl -MIO::Socket::INET -e '
+ until(new IO::Socket::INET("169.254.169.254:80")){print"Waiting for network...\n";sleep 1}
+' | $logger
+
+# Exit if we have already run on this instance (e.g., previous boot).
+ami_id=$($curl $instance_data_url/meta-data/ami-id)
+been_run_file=/var/ec2/$prog.$ami_id
+mkdir -p $(dirname $been_run_file)
+if [ -f $been_run_file ]; then
+ $logger < $been_run_file
+ exit
+fi
+
+# Retrieve the instance user-data and run it if it looks like a script
+user_data_file=`mktemp -t ec2-user-data.XXXXXXXXXX`
+chmod 700 $user_data_file
+$logger "Retrieving user-data"
+$curl -o $user_data_file $instance_data_url/user-data 2>&1 | $logger
+if [ ! -s $user_data_file ]; then
+ $logger "No user-data available"
+elif head -1 $user_data_file | egrep -v '^#!'; then
+ $logger "Skipping user-data as it does not begin with #!"
+else
+ $logger "Running user-data"
+ echo "user-data has already been run on this instance" > $been_run_file
+ $user_data_file 2>&1 | logger -t "user-data"
+ $logger "user-data exit code: $?"
+fi
+rm -f $user_data_file
Modified: trunk/extras/benchmark/bin/launch-cluster
===================================================================
--- trunk/extras/benchmark/bin/launch-cluster 2009-07-01 19:51:50 UTC (rev 474)
+++ trunk/extras/benchmark/bin/launch-cluster 2009-07-01 20:39:30 UTC (rev 475)
@@ -146,4 +146,4 @@
done
echo ""
-echo "Cluster started."
\ No newline at end of file
+echo "Cluster started."
Added: trunk/extras/benchmark/src/main/scripts/grinder.properties
===================================================================
--- trunk/extras/benchmark/src/main/scripts/grinder.properties (rev 0)
+++ trunk/extras/benchmark/src/main/scripts/grinder.properties 2009-07-01 20:39:30 UTC (rev 475)
@@ -0,0 +1,189 @@
+#
+# Sample grinder.properties
+#
+#
+# The properties can be specified in three ways.
+# - In the console. A properties file in the distribution directory
+# can be selected in the console.
+# - As a Java system property in the command line used to start the
+# agent. (e.g. java -Dgrinder.threads=20 net.grinder.Grinder).
+# - In an agent properties file. A local properties file named
+# "grinder.properties" can be placed in the working directory of
+# each agent, or a different file name passed as an agent command
+# line argument.
+#
+# Properties present in a console selected file take precedence over
+# agent command line properties, which in turn override those in
+# an agent properties file.
+#
+# Any line which starts with a ; (semi-colon) or a # (hash) is a
+# comment and is ignored. In this example we will use a # for
+# commentary and a ; for parts of the config file that you may wish to
+# enable
+#
+# Please refer to
+# http://net.grinder.sourceforge.net/g3/properties.html for further
+# documentation.
+
+
+#
+# Commonly used properties
+#
+
+# The file name of the script to run.
+#
+# Relative paths are evaluated from the directory containing the
+# properties file. The default is "grinder.py".
+grinder.script = helloworld.py
+
+# The number of worker processes each agent should start. The default
+# is 1.
+grinder.processes = 1
+
+# The number of worker threads each worker process should start. The
+# default is 1.
+grinder.threads = 1
+
+# The number of runs each worker process will perform. When using the
+# console this is usually set to 0, meaning "run until the console
+# sends a stop or reset signal". The default is 1.
+grinder.runs = 0
+
+# The IP address or host name that the agent and worker processes use
+# to contact the console. The default is all the network interfaces
+# of the local machine.
+; grinder.consoleHost = consolehost
+
+# The IP port that the agent and worker processes use to contact the
+# console. Defaults to 6372.
+; grinder.consolePort
+
+
+
+#
+# Less frequently used properties
+#
+
+
+### Logging ###
+
+# The directory in which worker process logs should be created. If not
+# specified, the agent's working directory is used.
+grinder.logDirectory = log
+
+# The number of archived logs from previous runs that should be kept.
+# The default is 1.
+grinder.numberOfOldLogs = 2
+
+# Overrides the "host" string used in log filenames and logs. The
+# default is the host name of the machine running the agent.
+; grinder.hostID = myagent
+
+# Set to false to disable the logging of output and error streams for
+# worker processes. You might want to use this to reduce the overhead
+# of running a client thread. The default is true.
+; grinder.logProcessStreams = false
+
+
+### Script sleep time ####
+
+# The maximum time in milliseconds that each thread waits before
+# starting. Unlike the sleep times specified in scripts, this is
+# varied according to a flat random distribution. The actual sleep
+# time will be a random value between 0 and the specified value.
+# Affected by grinder.sleepTimeFactor, but not
+# grinder.sleepTimeVariation. The default is 0ms.
+; grinder.initialSleepTime=500
+
+# Apply a factor to all the sleep times you've specified, either
+# through a property or in a script. Setting this to 0.1 would run the
+# script ten times as fast. The default is 1.
+; grinder.sleepTimeFactor=0.01
+
+# The Grinder varies the sleep times specified in scripts according to
+# a Normal distribution. This property specifies a fractional range
+# within which nearly all (99.75%) of the times will lie. E.g., if the
+# sleep time is specified as 1000 and the sleepTimeVariation is set to
+# 0.1, then 99.75% of the actual sleep times will be between 900 and
+# 1100 milliseconds. The default is 0.2.
+; grinder.sleepTimeVariation=0.005
+
+
+### Worker process control ###
+
+# If set, the agent will ramp up the number of worker processes,
+# starting the number specified every
+# grinder.processesIncrementInterval milliseconds. The upper limit is
+# set by grinder.processes. The default is to start all worker
+# processes together.
+; grinder.processIncrement = 1
+
+# Used in conjunction with grinder.processIncrement, this property
+# sets the interval in milliseconds at which the agent starts new
+# worker processes. The value is in milliseconds. The default is 60000
+# ms.
+; grinder.processIncrementInterval = 10000
+
+# Used in conjunction with grinder.processIncrement, this property
+# sets the initial number of worker processes to start. The default is
+# the value of grinder.processIncrement.
+; process.initialProcesses = 1
+
+# The maximum length of time in milliseconds that each worker process
+# should run for. grinder.duration can be specified in conjunction
+# with grinder.runs, in which case the worker processes will terminate
+# if either the duration time or the number of runs is exceeded. The
+# default is to run forever.
+; grinder.duration = 60000
+
+# If set to true, the agent process spawns engines in threads rather
+# than processes, using special class loaders to isolate the engines.
+# This allows the engine to be easily run in a debugger. This is
+# primarily a tool for debugging The Grinder engine, but it might also
+# be useful to advanced users. The default is false.
+; grinder.debug.singleprocess = true
+
+
+### Java ###
+
+# Use an alternate JVM for worker processes. The default is "java" so
+# you do not need to specify this if java is in your PATH.
+; grinder.jvm = /opt/jrockit/jrockit-R27.5.0-jdk1.5.0_14/bin/java
+
+# Use to adjust the classpath used for the worker process JVMs.
+# Anything specified here will be prepended to the classpath used to
+# start the Grinder processes.
+; grinder.jvm.classpath = /tmp/myjar.jar
+
+# Additional arguments to worker process JVMs.
+; grinder.jvm.arguments = -Dpython.cachedir=/tmp
+
+
+### Console communications ###
+
+# (See above for console address properties).
+
+# If you are not using the console, and don't want the agent to try to
+# contact it, set grinder.useConsole = false. The default is true.
+; grinder.useConsole = false
+
+# The period at which each process sends updates to the console. This
+# also controls the frequency at which the data files are flushed.
+# The default is 500 ms.
+; grinder.reportToConsole.interval = 100
+
+
+### Statistics ###
+
+# Set to false to disable reporting of timing information to the
+# console; other statistics are still reported. See
+# http://grinder.sourceforge.net/faq.html#timing for why you might
+# want to do this. The default is true.
+; grinder.reportTimesToConsole = false
+
+# If set to true, System.nanoTime() is used for measuring time instead
+# of System.currentTimeMillis(). The Grinder will still report times in
+# milliseconds. The precision of these methods depends on the JVM
+# implementation and the operating system. Setting to true requires
+# J2SE 5 or later. The default is false.
+; grinder.useNanoTime = true
Added: trunk/extras/benchmark/src/main/scripts/grinderStart.jy
===================================================================
--- trunk/extras/benchmark/src/main/scripts/grinderStart.jy (rev 0)
+++ trunk/extras/benchmark/src/main/scripts/grinderStart.jy 2009-07-01 20:39:30 UTC (rev 475)
@@ -0,0 +1,34 @@
+from net.grinder.common import GrinderProperties
+from net.grinder.console.client import ConsoleConnectionFactory
+from net.grinder.console.client import ConsoleConnectionException
+from java.lang import Thread
+
+
+print "Hello, World!"
+# Connect to the console
+connection = ConsoleConnectionFactory().connect("localhost", 6372)
+
+gp = GrinderProperties()
+gp.setInt("grinder.runs", 0)
+gp.setInt("grinder.threads", 3)
+gp.setInt("grinder.processes", 3)
+gp.setProperty("grinder.script", "/opt/grinder-3.1/scripts/katta-stress-test.jy")
+gp.setProperty("grinder.consoleHost", "domU-12-31-39-03-7C-92.compute-1.internal");
+# Do initialisation here.
+
+connection.startWorkerProcesses(gp)
+
+# Unfortunately, the console API doesn't provide a way to determine
+# how many worker processes are running. Instead, we run for a minute,
+# stop recording, then tidy up.
+
+# Run for a minute
+Thread.sleep(60000)
+
+connection.stopRecording()
+
+connection.resetWorkerProcesses()
+
+# Do tidy up here.
+
+# start java -cp ../lib/jython.jar:../lib/grinder.jar org.python.util.jython grinderStart.jy
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jo...@us...> - 2009-07-01 19:52:13
|
Revision: 474
http://katta.svn.sourceforge.net/katta/?rev=474&view=rev
Author: joa23
Date: 2009-07-01 19:51:50 +0000 (Wed, 01 Jul 2009)
Log Message:
-----------
copy trunk into a branch before we apply KATTA-54 patch.
Added Paths:
-----------
branches/r468-before-KATTA-54/
|
|
From: <jo...@us...> - 2009-06-27 05:45:54
|
Revision: 473
http://katta.svn.sourceforge.net/katta/?rev=473&view=rev
Author: joa23
Date: 2009-06-27 05:45:41 +0000 (Sat, 27 Jun 2009)
Log Message:
-----------
also adding objenesis jar as part of mockito dependencies.
Modified Paths:
--------------
trunk/ivy.xml
Added Paths:
-----------
trunk/lib/objenesis-1.0.jar
Modified: trunk/ivy.xml
===================================================================
--- trunk/ivy.xml 2009-06-27 05:18:41 UTC (rev 472)
+++ trunk/ivy.xml 2009-06-27 05:45:41 UTC (rev 473)
@@ -29,7 +29,9 @@
<dependency org="org.jmock" name="jmock" rev="2.4.0" conf="test" />
<dependency org="org.hamcrest" name="hamcrest-library" rev="1.1" conf="test" />
<dependency org="org.hamcrest" name="hamcrest-core" rev="1.1" conf="test" />
+ <dependency org="org.objenesis" name="objenesis" rev="1.0" conf="test" />
+
<dependency org="net.sourceforge.cobertura" name="cobertura" rev="1.9.1" conf="instrument" />
<!-- checkstyle -->
Added: trunk/lib/objenesis-1.0.jar
===================================================================
(Binary files differ)
Property changes on: trunk/lib/objenesis-1.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
|
|
From: <jo...@us...> - 2009-06-27 05:18:42
|
Revision: 472
http://katta.svn.sourceforge.net/katta/?rev=472&view=rev
Author: joa23
Date: 2009-06-27 05:18:41 +0000 (Sat, 27 Jun 2009)
Log Message:
-----------
adding hamcrest-core; it used to be pulled in as a dependency from the mockito maven repo.
Modified Paths:
--------------
trunk/ivy.xml
Added Paths:
-----------
trunk/lib/hamcrest-core-1.1.jar
Modified: trunk/ivy.xml
===================================================================
--- trunk/ivy.xml 2009-06-27 05:15:07 UTC (rev 471)
+++ trunk/ivy.xml 2009-06-27 05:18:41 UTC (rev 472)
@@ -28,6 +28,7 @@
<dependency org="org.mockito" name="mockito-core" rev="1.7" conf="test" />
<dependency org="org.jmock" name="jmock" rev="2.4.0" conf="test" />
<dependency org="org.hamcrest" name="hamcrest-library" rev="1.1" conf="test" />
+ <dependency org="org.hamcrest" name="hamcrest-core" rev="1.1" conf="test" />
<dependency org="net.sourceforge.cobertura" name="cobertura" rev="1.9.1" conf="instrument" />
Added: trunk/lib/hamcrest-core-1.1.jar
===================================================================
(Binary files differ)
Property changes on: trunk/lib/hamcrest-core-1.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
|
|
From: <jo...@us...> - 2009-06-27 05:15:12
|
Revision: 471
http://katta.svn.sourceforge.net/katta/?rev=471&view=rev
Author: joa23
Date: 2009-06-27 05:15:07 +0000 (Sat, 27 Jun 2009)
Log Message:
-----------
remove maven repos from ivy configuration and add last missing libraries into lib folder
Modified Paths:
--------------
trunk/CHANGES.txt
trunk/ivy/ivysettings.xml
trunk/ivy.xml
Added Paths:
-----------
trunk/lib/cobertura-1.9.1.jar
trunk/lib/mockito-core-1.7.jar
Modified: trunk/CHANGES.txt
===================================================================
--- trunk/CHANGES.txt 2009-06-16 07:22:33 UTC (rev 470)
+++ trunk/CHANGES.txt 2009-06-27 05:15:07 UTC (rev 471)
@@ -1,6 +1,7 @@
Katta Change Log
katta 0.6-dev
+ remove maven repos from ivy configuration and add last missing libraries into lib folder
fix KATTA-72, Give better error message when searching an index that doesn't exist.
fix KATTA-67, Default namespace is not created when using external Katta
fix KATTA-66, Update jets3t jar from version 0.5.0 to 0.6.1
Modified: trunk/ivy/ivysettings.xml
===================================================================
--- trunk/ivy/ivysettings.xml 2009-06-16 07:22:33 UTC (rev 470)
+++ trunk/ivy/ivysettings.xml 2009-06-27 05:15:07 UTC (rev 471)
@@ -5,10 +5,6 @@
<filesystem name="libraries">
<artifact pattern="${basedir}/lib/[artifact]-[revision].[type]" />
</filesystem>
- <ibiblio name="ibiblio" m2compatible="true" />
- <url name="test" m2compatible="true">
- <artifact pattern="http://repo1.maven.org/maven2/[organisation]/[module]/[revision]/[artifact]-[revision].[ext]"/>
- </url>
</chain>
</resolvers>
</ivysettings>
\ No newline at end of file
Modified: trunk/ivy.xml
===================================================================
--- trunk/ivy.xml 2009-06-16 07:22:33 UTC (rev 470)
+++ trunk/ivy.xml 2009-06-27 05:15:07 UTC (rev 471)
@@ -29,7 +29,7 @@
<dependency org="org.jmock" name="jmock" rev="2.4.0" conf="test" />
<dependency org="org.hamcrest" name="hamcrest-library" rev="1.1" conf="test" />
- <dependency org="net.sourceforge.cobertura" name="cobertura" rev="1.9" conf="instrument" />
+ <dependency org="net.sourceforge.cobertura" name="cobertura" rev="1.9.1" conf="instrument" />
<!-- checkstyle -->
<dependency org="net.sf.checkstyle" name="checkstyle-all" rev="5.0-beta2" conf="checkstyle" />
Added: trunk/lib/cobertura-1.9.1.jar
===================================================================
(Binary files differ)
Property changes on: trunk/lib/cobertura-1.9.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Added: trunk/lib/mockito-core-1.7.jar
===================================================================
(Binary files differ)
Property changes on: trunk/lib/mockito-core-1.7.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
|
|
From: <pi...@us...> - 2009-06-16 07:22:35
|
Revision: 470
http://katta.svn.sourceforge.net/katta/?rev=470&view=rev
Author: pipenb
Date: 2009-06-16 07:22:33 +0000 (Tue, 16 Jun 2009)
Log Message:
-----------
KATTA-72, Give better error message when searching an index that wasn't deployed properly or doesn't exist at all.
Modified Paths:
--------------
trunk/CHANGES.txt
trunk/src/main/java/net/sf/katta/client/Client.java
trunk/src/test/java/net/sf/katta/client/ClientTest.java
Modified: trunk/CHANGES.txt
===================================================================
--- trunk/CHANGES.txt 2009-06-12 18:09:54 UTC (rev 469)
+++ trunk/CHANGES.txt 2009-06-16 07:22:33 UTC (rev 470)
@@ -1,6 +1,7 @@
Katta Change Log
katta 0.6-dev
+ fix KATTA-72, Give better error message when searching an index that doesn't exist.
fix KATTA-67, Default namespace is not created when using external Katta
fix KATTA-66, Update jets3t jar from version 0.5.0 to 0.6.1
fix KATTA-71, Katta hangs when deploying index from s3 since HADOOP-4422
Modified: trunk/src/main/java/net/sf/katta/client/Client.java
===================================================================
--- trunk/src/main/java/net/sf/katta/client/Client.java 2009-06-12 18:09:54 UTC (rev 469)
+++ trunk/src/main/java/net/sf/katta/client/Client.java 2009-06-16 07:22:33 UTC (rev 470)
@@ -15,8 +15,29 @@
*/
package net.sf.katta.client;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import net.sf.katta.index.IndexMetaData;
-import net.sf.katta.node.*;
+import net.sf.katta.node.DocumentFrequencyWritable;
+import net.sf.katta.node.Hit;
+import net.sf.katta.node.Hits;
+import net.sf.katta.node.HitsMapWritable;
+import net.sf.katta.node.IQuery;
+import net.sf.katta.node.ISearch;
+import net.sf.katta.node.QueryWritable;
import net.sf.katta.util.CollectionUtil;
import net.sf.katta.util.KattaException;
import net.sf.katta.util.ZkConfiguration;
@@ -24,6 +45,7 @@
import net.sf.katta.zk.IZkDataListener;
import net.sf.katta.zk.ZKClient;
import net.sf.katta.zk.ZkPathes;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.ipc.RPC;
@@ -33,11 +55,6 @@
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.Query;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.*;
-
/**
* Default implementation of {@link IClient}.
*
@@ -252,7 +269,7 @@
return (float) _queryCount / time;
}
- private Map<String, List<String>> getNode2ShardsMap(final String[] indexNames) throws ShardAccessException {
+ private Map<String, List<String>> getNode2ShardsMap(final String[] indexNames) throws KattaException {
String[] indexesToSearchIn = indexNames;
for (String indexName : indexNames) {
if ("*".equals(indexName)) {
@@ -278,10 +295,14 @@
}
}
- private List<String> getShardsToSearchIn(String[] indexNames) {
+ private List<String> getShardsToSearchIn(String[] indexNames) throws KattaException {
List<String> shards = new ArrayList<String>();
for (String index : indexNames) {
- shards.addAll(_indexToShards.get(index));
+ List<String> shardsForIndex = _indexToShards.get(index);
+ if (shardsForIndex == null) {
+ throw new KattaException("Index '" + index + "' not deployed on any shard.");
+ }
+ shards.addAll(shardsForIndex);
}
return shards;
}
Modified: trunk/src/test/java/net/sf/katta/client/ClientTest.java
===================================================================
--- trunk/src/test/java/net/sf/katta/client/ClientTest.java 2009-06-12 18:09:54 UTC (rev 469)
+++ trunk/src/test/java/net/sf/katta/client/ClientTest.java 2009-06-16 07:22:33 UTC (rev 470)
@@ -20,9 +20,9 @@
import net.sf.katta.AbstractKattaTest;
import net.sf.katta.master.Master;
+import net.sf.katta.node.BaseNode;
import net.sf.katta.node.Hit;
import net.sf.katta.node.Hits;
-import net.sf.katta.node.BaseNode;
import net.sf.katta.testutil.TestResources;
import net.sf.katta.util.KattaException;
@@ -31,7 +31,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.KeywordAnalyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.Query;
@@ -158,6 +157,16 @@
}
}
+ public void testSearchIndexThatDoesntExist() throws ParseException {
+ final Query query = new QueryParser("", new KeywordAnalyzer()).parse("foo: bar");
+ try {
+ _client.search(query, new String[] { "doesNotExist" }, 1);
+ fail("expected exception");
+ } catch (KattaException e) {
+ assertEquals("Index 'doesNotExist' not deployed on any shard.", e.getMessage());
+ }
+ }
+
public void testKatta20SearchLimitMaxNumberOfHits() throws KattaException, ParseException {
final Query query = new QueryParser("", new KeywordAnalyzer()).parse("foo: bar");
final Hits expectedHits = _client.search(query, new String[] { INDEX1 }, 4);
|
|
From: Peter V. <in...@pe...> - 2009-06-16 05:57:16
|
Hi Alexandre,

does the following command give more details?

  katta listErrors testIndex

You could also try to redeploy the index:

  katta redeployIndex testIndex

Or even remove it entirely and deploy it again:

  katta removeIndex testIndex
  katta addIndex ...

--Peter

On 15.06.2009, at 21:13, Alexandre Jaquet wrote:
> Hi Peter,
>
> I type the following command line
>
> training@training-vm:~/katta$ sh bin/katta listIndexes
>
> | Name      | Status | Path                 | Shards | Documents | Size    |
> ============================================================================
> | testIndex | ERROR  | src/test/testIndexA/ | 0      | 0         | 1144893 |
> ----------------------------------------------------------------------------
>
> 1 registered indexes
>
> The status doesn't look like a cheerleader and don't know how to fix that.
>
> Thx
>
> 2009/6/15 Peter Voss <in...@pe...>
> Hi,
>
> did you deploy the testIndex? To verify you could run:
> katta listIndexes
>
> You have to deploy that index first.
>
> I have opened an issue regarding the NPE:
> http://oss.101tec.com/jira/browse/KATTA-72
>
> --Peter
>
> On 15.06.2009, at 20:51, Alexandre Jaquet wrote:
> > Hi,
> >
> > I didn't build it from the source I simply get the katta-0.5.1 binary
> >
> > 2009/6/14 Stefan Groschupf <sg...@10...>
> > Hi,
> > is that trunk or 0.5.1?
> > Stefan
> >
> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> > Hadoop training and consulting
> > http://www.scaleunlimited.com
> > http://www.101tec.com
> >
> > On Jun 14, 2009, at 2:16 AM, Alexandre Jaquet wrote:
> > > Hi,
> > >
> > > I follow the steps described in the main page but once I run
> > > training@training-vm:~/katta$ sh bin/katta search testIndex foo:bar 4
> > >
> > > I got the following exception:
> > >
> > > Exception in thread "main" java.lang.NullPointerException
> > >     at java.util.ArrayList.addAll(ArrayList.java:472)
> > >     at net.sf.katta.client.Client.getShardsToSearchIn(Client.java:296)
> > >     at net.sf.katta.client.Client.getNode2ShardsMap(Client.java:278)
> > >     at net.sf.katta.client.Client.search(Client.java:212)
> > >     at net.sf.katta.client.Client.search(Client.java:205)
> > >     at net.sf.katta.Katta.search(Katta.java:466)
> > >     at net.sf.katta.Katta.main(Katta.java:100)
> > >
> > > Any idea ?
> > >
> > > Thanks in advance.
> > >
> > > Alexandre Jaquet
|
|
From: Alexandre J. <ale...@gm...> - 2009-06-15 19:21:15
|
Hi Peter, I typed the following command line: training@training-vm:~/katta$ sh bin/katta listIndexes | Name | Status | Path | Shards | Documents | Size | ============================================================================ | testIndex | ERROR | src/test/testIndexA/ | 0 | 0 | 1144893 | ---------------------------------------------------------------------------- 1 registered indexes The status doesn't look healthy and I don't know how to fix it. Thx 2009/6/15 Peter Voss <in...@pe...> > Hi, > > did you deploy the testIndex? To verify you could run: > katta listIndexes > > You have to deploy that index first. > > I have opened an issue regarding the NPE: > http://oss.101tec.com/jira/browse/KATTA-72 > > --Peter > [...] |
|
From: Peter V. <in...@pe...> - 2009-06-15 19:07:41
|
Hi, did you deploy the testIndex? To verify you could run: katta listIndexes You have to deploy that index first. I have opened an issue regarding the NPE: http://oss.101tec.com/jira/browse/KATTA-72 --Peter On 15.06.2009, at 20:51, Alexandre Jaquet wrote: > Hi, > > I didn't build it from the source, I simply got the katta-0.5.1 binary > [...] |
|
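[Editorial note] The NullPointerException in this thread comes from searching an index that was never deployed: the client looks up the shard list for the index name, finds nothing, and a null collection reaches ArrayList.addAll (Client.java:296 in the trace). A minimal sketch of the fail-fast lookup that Peter's KATTA-72 report calls for — the class, map, and method names here are illustrative stand-ins, not Katta's actual internals:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardLookupSketch {

    // Hypothetical registry mapping index name -> deployed shard names.
    private final Map<String, List<String>> indexToShards = new HashMap<String, List<String>>();

    public void registerIndex(String indexName, List<String> shards) {
        indexToShards.put(indexName, new ArrayList<String>(shards));
    }

    // Instead of letting a null shard list reach ArrayList.addAll (the NPE
    // in the trace), fail fast with a message that names the missing index.
    public List<String> getShardsToSearchIn(String indexName) {
        List<String> shards = indexToShards.get(indexName);
        if (shards == null || shards.isEmpty()) {
            throw new IllegalStateException("Index '" + indexName
                    + "' is not deployed - run 'katta listIndexes' to check");
        }
        return new ArrayList<String>(shards);
    }
}
```

The point of the guard is the error message: "index not deployed" is actionable, while the raw NPE forced the user back to the mailing list.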
From: Alexandre J. <ale...@gm...> - 2009-06-15 18:51:08
|
Hi, I didn't build it from the source, I simply got the katta-0.5.1 binary <http://sourceforge.net/project/showfiles.php?group_id=225750&package_id=273111&release_id=677307> 2009/6/14 Stefan Groschupf <sg...@10...> > Hi, > is that trunk or 0.5.1? > Stefan > > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ > Hadoop training and consulting > http://www.scaleunlimited.com > http://www.101tec.com > > [...] |
|
From: Stefan G. <sg...@10...> - 2009-06-14 21:13:29
|
Hi, is that trunk or 0.5.1? Stefan ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Hadoop training and consulting http://www.scaleunlimited.com http://www.101tec.com On Jun 14, 2009, at 2:16 AM, Alexandre Jaquet wrote: > [...] |
|
From: Alexandre J. <ale...@gm...> - 2009-06-14 09:16:46
|
Hi,
I followed the steps described in the main page, but once I run
training@training-vm:~/katta$ sh bin/katta search testIndex foo:bar 4
I got the following exception:
Exception in thread "main" java.lang.NullPointerException
at java.util.ArrayList.addAll(ArrayList.java:472)
at net.sf.katta.client.Client.getShardsToSearchIn(Client.java:296)
at net.sf.katta.client.Client.getNode2ShardsMap(Client.java:278)
at net.sf.katta.client.Client.search(Client.java:212)
at net.sf.katta.client.Client.search(Client.java:205)
at net.sf.katta.Katta.search(Katta.java:466)
at net.sf.katta.Katta.main(Katta.java:100)
Any idea?
Thanks in advance.
Alexandre Jaquet
|
|
From: <jo...@us...> - 2009-06-12 18:10:16
|
Revision: 469
http://katta.svn.sourceforge.net/katta/?rev=469&view=rev
Author: joa23
Date: 2009-06-12 18:09:54 +0000 (Fri, 12 Jun 2009)
Log Message:
-----------
fix KATTA-67, Default namespace is not created when using external Katta
Modified Paths:
--------------
trunk/CHANGES.txt
trunk/src/main/java/net/sf/katta/master/Master.java
trunk/src/main/java/net/sf/katta/zk/ZkServer.java
Modified: trunk/CHANGES.txt
===================================================================
--- trunk/CHANGES.txt 2009-06-10 06:26:03 UTC (rev 468)
+++ trunk/CHANGES.txt 2009-06-12 18:09:54 UTC (rev 469)
@@ -1,6 +1,7 @@
Katta Change Log
katta 0.6-dev
+ fix KATTA-67, Default namespace is not created when using external Katta
fix KATTA-66, Update jets3t jar from version 0.5.0 to 0.6.1
fix KATTA-71, Katta hangs when deploying index from s3 since HADOOP-4422
fix KATTA-70, When using an external Zookeeper master and secondary master don't terminate straight away anymore.
Modified: trunk/src/main/java/net/sf/katta/master/Master.java
===================================================================
--- trunk/src/main/java/net/sf/katta/master/Master.java 2009-06-10 06:26:03 UTC (rev 468)
+++ trunk/src/main/java/net/sf/katta/master/Master.java 2009-06-12 18:09:54 UTC (rev 469)
@@ -60,7 +60,7 @@
_zkClient = zkClient;
try {
_zkClient.getEventLock().lock();
- zkClient.subscribeReconnects(this);
+ _zkClient.subscribeReconnects(this);
} finally {
_zkClient.getEventLock().unlock();
}
@@ -93,6 +93,8 @@
if (!_zkClient.isStarted()) {
LOG.info("connecting with zookeeper");
_zkClient.start(300000);
+ // now we need to create the default name space
+ _zkClient.createDefaultNameSpace();
}
becomeMasterOrSecondaryMaster();
if (_isMaster) {
Modified: trunk/src/main/java/net/sf/katta/zk/ZkServer.java
===================================================================
--- trunk/src/main/java/net/sf/katta/zk/ZkServer.java 2009-06-10 06:26:03 UTC (rev 468)
+++ trunk/src/main/java/net/sf/katta/zk/ZkServer.java 2009-06-12 18:09:54 UTC (rev 469)
@@ -90,14 +90,6 @@
}
LOG.info("data dir: " + dataDir.getAbsolutePath());
LOG.info("data log dir: " + dataLogDir.getAbsolutePath());
-
- // now if required we initialize our namespace
- final ZKClient client = new ZKClient(conf);
- client.start(300000);
- client.createDefaultNameSpace();
- client.close();
- // TODO jz: do we initialize the client only for creating the namespaces
- // ??
} else {
// TODO jz: shoudn't we better throw an exception
LOG.error("Zookeeper port " + port + " was already in use. Running in single machine mode?");
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <pi...@us...> - 2009-06-10 06:26:42
|
Revision: 468
http://katta.svn.sourceforge.net/katta/?rev=468&view=rev
Author: pipenb
Date: 2009-06-10 06:26:03 +0000 (Wed, 10 Jun 2009)
Log Message:
-----------
Support dictionary searches during load tests.
Modified Paths:
--------------
trunk/src/main/java/net/sf/katta/Katta.java
trunk/src/main/java/net/sf/katta/loadtest/ILoadTestNode.java
trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java
trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java
trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java
Added Paths:
-----------
trunk/src/test/java/net/sf/katta/loadtest/LoadTestStarterTest.java
Modified: trunk/src/main/java/net/sf/katta/Katta.java
===================================================================
--- trunk/src/main/java/net/sf/katta/Katta.java 2009-06-08 14:41:38 UTC (rev 467)
+++ trunk/src/main/java/net/sf/katta/Katta.java 2009-06-10 06:26:03 UTC (rev 468)
@@ -98,9 +98,9 @@
int step = Integer.parseInt(args[4]);
final int runTime = Integer.parseInt(args[5]);
final String[] indexNames = args[6].split(",");
- final String query = args[7];
+ final String queryFile = args[7];
final int count = Integer.parseInt(args[8]);
- startIntegrationTest(nodes, startRate, endRate, step, runTime, indexNames, query, count, conf);
+ startIntegrationTest(nodes, startRate, endRate, step, runTime, indexNames, queryFile, count, conf);
} else if (command.endsWith("version")) {
showVersion();
} else if (command.endsWith("zk")) {
@@ -185,9 +185,9 @@
}
public static void startIntegrationTest(int nodes, int startRate, int endRate, int step, int runTime, String[] indexNames,
- String queryString, int count, ZkConfiguration conf) throws KattaException {
+ String queryFile, int count, ZkConfiguration conf) throws KattaException {
final ZKClient client = new ZKClient(conf);
- final LoadTestStarter integrationTester = new LoadTestStarter(client, nodes, startRate, endRate, step, runTime, indexNames, queryString, count);
+ final LoadTestStarter integrationTester = new LoadTestStarter(client, nodes, startRate, endRate, step, runTime, indexNames, queryFile, count);
integrationTester.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
@@ -562,7 +562,7 @@
System.err.println("\tstartMaster\t\tStarts a local master.");
System.err.println("\tstartNode\t\tStarts a local node.");
System.err.println("\tstartLoadTestNode\tStarts a load test node.");
- System.err.println("\tstartLoadTest <nodes> <start-query-rate> <end-query-rate> <step> <test-duration-ms> <index-name> <query> <max hits>");
+ System.err.println("\tstartLoadTest <nodes> <start-query-rate> <end-query-rate> <step> <test-duration-ms> <index-name> <query-file> <max hits>");
System.err.println("\t\t\t\tStarts a load test. The query rate is in queries per second.");
System.err.println("\tshowStructure\t\tShows the structure of a Katta installation.");
System.err.println("\tcheck\t\t\tAnalyze index/shard/node status.");
Modified: trunk/src/main/java/net/sf/katta/loadtest/ILoadTestNode.java
===================================================================
--- trunk/src/main/java/net/sf/katta/loadtest/ILoadTestNode.java 2009-06-08 14:41:38 UTC (rev 467)
+++ trunk/src/main/java/net/sf/katta/loadtest/ILoadTestNode.java 2009-06-10 06:26:03 UTC (rev 468)
@@ -19,8 +19,10 @@
public interface ILoadTestNode extends VersionedProtocol {
- public void startTest(int queryRate, String[] indexNames, String queryString, int count);
+ public void initTest(int queryRate, String[] indexNames, String[] queries, int count);
+ public void startTest();
+
public void stopTest();
public LoadTestQueryResult[] getResults();
Modified: trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java
===================================================================
--- trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java 2009-06-08 14:41:38 UTC (rev 467)
+++ trunk/src/main/java/net/sf/katta/loadtest/LoadTestNode.java 2009-06-10 06:26:03 UTC (rev 468)
@@ -55,17 +55,28 @@
private String _currentNodeName;
private Random _random = new Random(System.currentTimeMillis());
+ private int _queryRate;
+
+ private int _count;
+
+ private String[] _indexNames;
+
+ private String[] _queryStrings;
+
private final class TestSearcherRunnable implements Runnable {
private int _count;
private String[] _indexNames;
- private String _queryString;
+ private String[] _queryStrings;
+ private int _queryIndex;
private int _testDelay;
-
- TestSearcherRunnable(int testDelay, int count, String[] indexNames, String queryString) {
+
+ TestSearcherRunnable(int testDelay, int count, String[] indexNames, String[] queryStrings) {
_count = count;
_indexNames = indexNames;
- _queryString = queryString;
+ _queryStrings = queryStrings;
_testDelay = testDelay;
+ _queryIndex = _random.nextInt(_queryStrings.length);
+ LOG.info("Starting dictionary search at index " + _queryIndex);
}
@SuppressWarnings("deprecation")
@@ -73,13 +84,15 @@
public void run() {
// TODO PVo search for different terms
long startTime = System.currentTimeMillis();
+ String queryString = _queryStrings[_queryIndex];
+ _queryIndex = (_queryIndex + 1) % _queryStrings.length;
try {
- _client.search(new Query(_queryString), _indexNames, _count);
+ _client.search(new Query(queryString), _indexNames, _count);
long endTime = System.currentTimeMillis();
- _statistics.add(new LoadTestQueryResult(startTime, endTime, _queryString, getRpcHostName() + ":"
+ _statistics.add(new LoadTestQueryResult(startTime, endTime, queryString, getRpcHostName() + ":"
+ getRpcServerPort()));
} catch (KattaException e) {
- _statistics.add(new LoadTestQueryResult(startTime, -1, _queryString, getRpcHostName() + ":"
+ _statistics.add(new LoadTestQueryResult(startTime, -1, queryString, getRpcHostName() + ":"
+ getRpcServerPort()));
LOG.error("Search failed.", e);
}
@@ -170,18 +183,26 @@
}
@Override
- public void startTest(int queryRate, final String[] indexNames, final String queryString, final int count) {
- int threads = Math.max(1, (queryRate - 1) / 3 + 1);
- int testDelay = 1000 * threads / queryRate;
+ public void initTest(int queryRate, final String[] indexNames, final String[] queryStrings, final int count) {
+ _queryRate = queryRate;
+ _queryStrings = queryStrings;
+ _count = count;
+ _indexNames = indexNames;
+ unregisterNode();
+ }
- LOG.info("Requested to run test at " + queryRate + " queries per second using " + threads
+ @Override
+ public void startTest() {
+ int threads = Math.max(1, (_queryRate - 1) / 3 + 1);
+ int testDelay = 1000 * threads / _queryRate;
+
+ LOG.info("Requested to run test at " + _queryRate + " queries per second using " + threads
+ " threads and a test delay of " + testDelay + "ms.");
_executorService = Executors.newScheduledThreadPool(threads);
_statistics = new Vector<LoadTestQueryResult>();
- unregisterNode();
for (int i = 0; i < threads; i++) {
- _executorService.schedule(new TestSearcherRunnable(testDelay, count, indexNames, queryString), _random
+ _executorService.schedule(new TestSearcherRunnable(testDelay, _count, _indexNames, _queryStrings), _random
.nextInt(testDelay), TimeUnit.MILLISECONDS);
}
}
Modified: trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java
===================================================================
--- trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java 2009-06-08 14:41:38 UTC (rev 467)
+++ trunk/src/main/java/net/sf/katta/loadtest/LoadTestStarter.java 2009-06-10 06:26:03 UTC (rev 468)
@@ -15,9 +15,13 @@
*/
package net.sf.katta.loadtest;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
@@ -56,7 +60,7 @@
ChildListener _childListener;
private String[] _indexNames;
- private String _queryString;
+ private String _queryFile;
private int _count;
private int _runTime;
private Writer _statisticsWriter;
@@ -86,14 +90,14 @@
}
public LoadTestStarter(final ZKClient zkClient, int nodes, int startRate, int endRate, int step, int runTime,
- String[] indexNames, String queryString, int count) throws KattaException {
+ String[] indexNames, String queryFile, int count) throws KattaException {
_zkClient = zkClient;
_numberOfTesterNodes = nodes;
_startRate = startRate;
_endRate = endRate;
_step = step;
_indexNames = indexNames;
- _queryString = queryString;
+ _queryFile = queryFile;
_count = count;
_runTime = runTime;
try {
@@ -170,13 +174,23 @@
int remainingQueryRate = queryRate;
int remainingNodes = numberOfNodes;
+ String[] queries;
+ try {
+ queries = readQueries(new FileInputStream(_queryFile));
+ } catch (IOException e) {
+ throw new KattaException("Failed to read query file " + _queryFile + ".", e);
+ }
for (ILoadTestNode testNode : nodesForTest) {
int queryRateForNode = remainingQueryRate / remainingNodes;
- LOG.info("Starting test on node using query rate: " + queryRateForNode + " queries per second.");
- testNode.startTest(queryRateForNode, _indexNames, _queryString, _count);
+ LOG.info("Initializing test on node using query rate: " + queryRateForNode + " queries per second.");
+ testNode.initTest(queryRateForNode, _indexNames, queries, _count);
--remainingNodes;
remainingQueryRate -= queryRateForNode;
}
+ for (ILoadTestNode testNode : nodesForTest) {
+ LOG.info("Starting test on node.");
+ testNode.startTest();
+ }
try {
Thread.sleep(_runTime);
} catch (InterruptedException e) {
@@ -226,6 +240,19 @@
shutdown();
}
+ static String[] readQueries(InputStream inputStream) throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+ List<String> lines = new ArrayList<String>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ line = line.trim();
+ if (!line.equals("")) {
+ lines.add(line);
+ }
+ }
+ return lines.toArray(new String[lines.size()]);
+ }
+
public void shutdown() {
_zkClient.getEventLock().lock();
try {
Modified: trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java
===================================================================
--- trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java 2009-06-08 14:41:38 UTC (rev 467)
+++ trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java 2009-06-10 06:26:03 UTC (rev 468)
@@ -37,7 +37,8 @@
deployClient.addIndex(INDEX1, TestResources.INDEX1.getAbsolutePath(), 1).joinDeployment();
LoadTestNode node = startLoadTestNode();
- node.startTest(10, new String[] { INDEX1 }, "test", 10);
+ node.initTest(10, new String[] { INDEX1 }, new String[] {"test"}, 10);
+ node.startTest();
Thread.sleep(5000);
node.stopTest();
Added: trunk/src/test/java/net/sf/katta/loadtest/LoadTestStarterTest.java
===================================================================
--- trunk/src/test/java/net/sf/katta/loadtest/LoadTestStarterTest.java (rev 0)
+++ trunk/src/test/java/net/sf/katta/loadtest/LoadTestStarterTest.java 2009-06-10 06:26:03 UTC (rev 468)
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.loadtest;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+public class LoadTestStarterTest extends TestCase {
+
+ public void testReadQueries() throws IOException {
+ String[] queries = LoadTestStarter.readQueries(new ByteArrayInputStream("a\n\nb c\nd".getBytes()));
+ assertEquals(3, queries.length);
+ assertEquals("a", queries[0]);
+ assertEquals("b c", queries[1]);
+ assertEquals("d", queries[2]);
+ }
+}
Property changes on: trunk/src/test/java/net/sf/katta/loadtest/LoadTestStarterTest.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:eol-style
+ native
|
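[Editorial note] The query-file support added in r468 above is simple line-based parsing plus round-robin rotation: one query per line, trimmed, blank lines skipped, and each searcher thread cycling through the list. A standalone sketch of the same behavior (class and method names are illustrative, mirroring but not reusing Katta's LoadTestStarter.readQueries):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class QueryFileSketch {

    // Mirrors r468's readQueries: one query per line, trimmed,
    // blank lines skipped.
    static String[] readQueries(InputStream in) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        List<String> lines = new ArrayList<String>();
        String line;
        while ((line = reader.readLine()) != null) {
            line = line.trim();
            if (!line.equals("")) {
                lines.add(line);
            }
        }
        return lines.toArray(new String[lines.size()]);
    }

    // Mirrors the searcher runnable's rotation: each search takes the next
    // query, wrapping around at the end of the list.
    static String nextQuery(String[] queries, int index) {
        return queries[index % queries.length];
    }
}
```

Usage matches the new LoadTestStarterTest: feeding "a\n\nb c\nd" yields three queries, the empty line being dropped, and rotation wraps from the last query back to the first.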
|
From: <pi...@us...> - 2009-06-08 14:41:39
|
Revision: 467
http://katta.svn.sourceforge.net/katta/?rev=467&view=rev
Author: pipenb
Date: 2009-06-08 14:41:38 +0000 (Mon, 08 Jun 2009)
Log Message:
-----------
Fixed failing test.
Modified Paths:
--------------
trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java
Modified: trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java
===================================================================
--- trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java 2009-06-05 19:30:13 UTC (rev 466)
+++ trunk/src/test/java/net/sf/katta/loadtest/LoadTestNodeTest.java 2009-06-08 14:41:38 UTC (rev 467)
@@ -47,8 +47,8 @@
}
// we should have executed 50 queries in 5s
- assertTrue(results.length >= 45);
- assertTrue("Queries per 500ms: " + results.length, results.length <= 55);
+ assertTrue(results.length >= 40);
+ assertTrue("Queries per 500ms: " + results.length, results.length <= 60);
node.shutdown();
}
|
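[Editorial note] The widened bounds in r467 (40-60 instead of 45-55 queries in 5 s at rate 10) make sense given how LoadTestNode schedules its searchers: threads = max(1, (rate - 1) / 3 + 1) and per-thread delay = 1000 * threads / rate ms, so the achieved rate only approximates the requested one and is subject to scheduler jitter. A quick worked check of that formula, using the same integer arithmetic as the Java source (helper names here are illustrative):

```java
public class RateFormulaSketch {

    // Thread count as computed in LoadTestNode.startTest: roughly one
    // thread per three queries per second, never fewer than one.
    static int threads(int queryRate) {
        return Math.max(1, (queryRate - 1) / 3 + 1);
    }

    // Per-thread delay between queries, in milliseconds.
    static int testDelayMs(int queryRate) {
        return 1000 * threads(queryRate) / queryRate;
    }

    // Expected number of queries over a window, ignoring jitter: each of
    // the threads fires once per testDelayMs.
    static long expectedQueries(int queryRate, long windowMs) {
        return (long) threads(queryRate) * windowMs / testDelayMs(queryRate);
    }
}
```

At rate 10 this gives 4 threads firing every 400 ms, i.e. 50 expected queries over the 5 s test window; integer truncation in testDelayMs plus thread-scheduling jitter is what pushes real runs outside the original 45-55 band.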
|
From: <jo...@us...> - 2009-06-05 19:30:55
|
Revision: 466
http://katta.svn.sourceforge.net/katta/?rev=466&view=rev
Author: joa23
Date: 2009-06-05 19:30:13 +0000 (Fri, 05 Jun 2009)
Log Message:
-----------
fix KATTA-66, Update jets3t jar from version 0.5.0 to 0.6.1
Modified Paths:
--------------
trunk/CHANGES.txt
trunk/ivy.xml
Removed Paths:
-------------
trunk/lib/jets3t-0.5.0.jar
Modified: trunk/CHANGES.txt
===================================================================
--- trunk/CHANGES.txt 2009-06-05 18:27:11 UTC (rev 465)
+++ trunk/CHANGES.txt 2009-06-05 19:30:13 UTC (rev 466)
@@ -1,6 +1,7 @@
Katta Change Log
katta 0.6-dev
+ fix KATTA-66, Update jets3t jar from version 0.5.0 to 0.6.1
fix KATTA-71, Katta hangs when deploying index from s3 since HADOOP-4422
fix KATTA-70, When using an external Zookeeper master and secondary master don't terminate straight away anymore.
fix KATTA-63, Use java found on the path, if JAVA_HOME is not set
Modified: trunk/ivy.xml
===================================================================
--- trunk/ivy.xml 2009-06-05 18:27:11 UTC (rev 465)
+++ trunk/ivy.xml 2009-06-05 19:30:13 UTC (rev 466)
@@ -47,7 +47,7 @@
<dependency org="jasper" name="jasper-compiler" rev="x" conf="compile" />
<dependency org="jasper" name="jasper-runtime" rev="x" conf="compile" />
- <dependency org="jets3t" name="jets3t" rev="0.5.0" conf="compile" />
+ <dependency org="jets3t" name="jets3t" rev="0.6.1" conf="compile" />
<dependency org="jetty" name="jetty" rev="5.1.4" conf="compile" />
<dependency org="jsp-api" name="jsp-api" rev="2.0" conf="compile" />
<dependency org="servlet-api" name="servlet-api" rev="2.4" conf="compile" />
Deleted: trunk/lib/jets3t-0.5.0.jar
===================================================================
(Binary files differ)
|
|
From: <jo...@us...> - 2009-06-05 18:27:19
|
Revision: 465
http://katta.svn.sourceforge.net/katta/?rev=465&view=rev
Author: joa23
Date: 2009-06-05 18:27:11 +0000 (Fri, 05 Jun 2009)
Log Message:
-----------
fix KATTA-71, Katta hangs when deploying index from s3 since HADOOP-4422
Modified Paths:
--------------
trunk/CHANGES.txt
trunk/src/main/java/net/sf/katta/node/BaseNode.java
Modified: trunk/CHANGES.txt
===================================================================
--- trunk/CHANGES.txt 2009-06-05 17:00:26 UTC (rev 464)
+++ trunk/CHANGES.txt 2009-06-05 18:27:11 UTC (rev 465)
@@ -1,6 +1,7 @@
Katta Change Log
katta 0.6-dev
+ fix KATTA-71, Katta hangs when deploying index from s3 since HADOOP-4422
fix KATTA-70, When using an external Zookeeper master and secondary master don't terminate straight away anymore.
fix KATTA-63, Use java found on the path, if JAVA_HOME is not set
fixing a missleading path in the katta-evn.sh
Modified: trunk/src/main/java/net/sf/katta/node/BaseNode.java
===================================================================
--- trunk/src/main/java/net/sf/katta/node/BaseNode.java 2009-06-05 17:00:26 UTC (rev 464)
+++ trunk/src/main/java/net/sf/katta/node/BaseNode.java 2009-06-05 18:27:11 UTC (rev 465)
@@ -262,43 +262,54 @@
}
/**
- * Loads a shard from the given URI. The uri is handled bye the hadoop file
- * system. So all hadoop support file systems can be used, like local hdfs s3
+ * Loads a shard from the given URI. The uri is handled by the hadoop file
+ * system. So all hadoop support file systems can be used, like local, hdfs, s3
* etc. In case the shard is compressed we also unzip the content.
*/
private void download(AssignedShard shard, File localShardFolder) throws KattaException {
final String shardPath = shard.getShardPath();
String shardName = shard.getShardName();
LOG.info("Downloading shard '" + shardName + "' from " + shardPath);
- URI uri;
- try {
- uri = new URI(shardPath);
- final FileSystem fileSystem = FileSystem.get(uri, new Configuration());
- final Path path = new Path(shardPath);
- boolean isZip = fileSystem.isFile(path) && shardPath.endsWith(".zip");
+ // TODO sg: to fix HADOOP-4422 we try to download the shard 5 times
+ int maxTries = 5;
+ for (int i = 0; i < maxTries; i++) {
+ URI uri;
+ try {
+ uri = new URI(shardPath);
+ final FileSystem fileSystem = FileSystem.get(uri, new Configuration());
+ final Path path = new Path(shardPath);
+ boolean isZip = fileSystem.isFile(path) && shardPath.endsWith(".zip");
- File shardTmpFolder = new File(localShardFolder.getAbsolutePath() + "_tmp");
- // we download extract first to tmp dir in case something went wrong
- FileUtil.deleteFolder(localShardFolder);
- FileUtil.deleteFolder(shardTmpFolder);
+ File shardTmpFolder = new File(localShardFolder.getAbsolutePath() + "_tmp");
+ // we download and extract to a tmp dir first in case something goes wrong
+ FileUtil.deleteFolder(localShardFolder);
+ FileUtil.deleteFolder(shardTmpFolder);
- if (isZip) {
- final File shardZipLocal = new File(_shardsFolder, shardName + ".zip");
- if (shardZipLocal.exists()) {
- // make sure we overwrite cleanly
+ if (isZip) {
+ final File shardZipLocal = new File(_shardsFolder, shardName + ".zip");
+ if (shardZipLocal.exists()) {
+ // make sure we overwrite cleanly
+ shardZipLocal.delete();
+ }
+ fileSystem.copyToLocalFile(path, new Path(shardZipLocal.getAbsolutePath()));
+ FileUtil.unzip(shardZipLocal, shardTmpFolder);
shardZipLocal.delete();
+ } else {
+ fileSystem.copyToLocalFile(path, new Path(shardTmpFolder.getAbsolutePath()));
}
- fileSystem.copyToLocalFile(path, new Path(shardZipLocal.getAbsolutePath()));
- FileUtil.unzip(shardZipLocal, shardTmpFolder);
- shardZipLocal.delete();
- } else {
- fileSystem.copyToLocalFile(path, new Path(shardTmpFolder.getAbsolutePath()));
+ shardTmpFolder.renameTo(localShardFolder);
+
+ // looks like we are successful.
+ return;
+ } catch (final URISyntaxException e) {
+ throw new KattaException("Can not parse uri for path: " + shardPath, e);
+ } catch (final Exception e) {
+ if (i == maxTries - 1) {
+ throw new KattaException("Can not load shard: " + shardPath, e);
+ } else {
+ LOG.error("Can not load shard: " + shardPath, e);
+ }
}
- shardTmpFolder.renameTo(localShardFolder);
- } catch (final URISyntaxException e) {
- throw new KattaException("Can not parse uri for path: " + shardPath, e);
- } catch (final IOException e) {
- throw new KattaException("Can not load shard: " + shardPath, e);
}
}
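The patch above boils down to one pattern: attempt the download up to maxTries times, log intermediate failures, and rethrow only on the final attempt. A minimal, self-contained sketch of that pattern (the FlakyDownload interface and class name here are hypothetical, not from the Katta code base):

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

    /** Hypothetical stand-in for the body of BaseNode.download(). */
    interface FlakyDownload {
        void run() throws IOException;
    }

    /**
     * Retry loop mirroring the KATTA-71 workaround: return on the first
     * success, log failures on intermediate attempts, and propagate the
     * exception only when the last attempt fails.
     */
    static void downloadWithRetry(FlakyDownload download, int maxTries) throws IOException {
        for (int i = 0; i < maxTries; i++) {
            try {
                download.run();
                return; // success, stop retrying
            } catch (IOException e) {
                if (i == maxTries - 1) {
                    throw e; // last attempt: propagate to the caller
                }
                System.err.println("attempt " + (i + 1) + " failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) throws IOException {
        AtomicInteger calls = new AtomicInteger();
        // simulate a transient failure: fails twice, succeeds on the third try
        downloadWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IOException("transient failure");
            }
        }, 5);
        System.out.println("succeeded after " + calls.get() + " attempts");
    }
}
```

Note that the real patch still throws immediately on URISyntaxException, since a malformed shard path cannot be fixed by retrying; only the I/O failure path is retried.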
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
From: <oss...@10...> - 2009-06-05 17:12:47
Build Katta::Trunk Test #3 successful (tests passed: 89)
Agent: Default agent
Build results: http://oss.101tec.com/teamcity/viewLog.html?buildId=18&buildTypeId=bt2
Changes included (1 change)
====================================================
Change 464 by pipenb (2 files): KATTA-70, When using an external Zookeeper master and secondary master don't terminate straight away anymore.
See more information about changed files: http://oss.101tec.com/teamcity/viewLog.html?tab=buildChangesDiv&buildId=18&buildTypeId=bt2
============================================================================
Configure email notifications: http://oss.101tec.com/teamcity/profile.html?init=1#notifications