[7172b2]: src / plugins / MHiveStorage / services / StorageService.cpp  Maximize  Restore  History

Download this file

79 lines (68 with data), 3.6 kB

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include "engines/redundant_engine/RedundantEngine.h"
#include "StorageService.h"
#include "engines/Simple.h"
#include "org/esb/core/PluginContext.h"
#include "org/esb/signal/Messenger.h"
#include "org/esb/hive/job/ProcessUnit.h"
#include "org/esb/hive/Environment.h"
using org::esb::hive::Environment;
namespace mhivestorage{
StorageService::StorageService()
{
}
void StorageService::onMessage(org::esb::signal::Message &msg)
{
if(msg.containsProperty("processunit_get")){
LOGDEBUG("get received message");
boost::shared_ptr<org::esb::hive::job::ProcessUnit>unit=_storageEngine->get();
msg.setProperty("processunit_get", unit);
}else if(msg.containsProperty("processunit_put")){
LOGDEBUG("put received message");
boost::shared_ptr<org::esb::hive::job::ProcessUnit>unit = msg.getProperty<boost::shared_ptr<org::esb::hive::job::ProcessUnit> >("processunit_put");
_storageEngine->put(unit);
}else if(msg.containsProperty("processunit_enque")){
LOGDEBUG("enque received message");
boost::shared_ptr<org::esb::hive::job::ProcessUnit>unit = msg.getProperty<boost::shared_ptr<org::esb::hive::job::ProcessUnit> >("processunit_enque");
_storageEngine->enque(unit);
}else if(msg.containsProperty("processunit_deque")){
LOGDEBUG("deque received message");
boost::shared_ptr<org::esb::hive::job::ProcessUnit>unit=_storageEngine->deque();
msg.setProperty("processunit_deque", unit);
}else if(msg.containsProperty("processunit_rollback")){
_storageEngine->rollback(msg.getProperty<std::string>("processunit_rollback"));
}
}
void StorageService::startService()
{
std::string engine_name=getContext()->getEnvironment<string>("mhivestorage.engine");
if (engine_name=="simple") {
_storageEngine=new engines::Simple(getContext()->database, Environment::get("hive.data_path"));
}else if(engine_name=="redundant"){
std::vector<std::string> hosts=getContext()->getProperty< std::vector<std::string> >("mhivestorage.hosts");
//int port=getContext()->getProperty<int>("mhivestorage.port");
_storageEngine=new engines::RedundantEngine(getContext()->database, Environment::get("hive.data_path"), hosts, 0);
}else {
LOGWARN("no storage engine\""<<engine_name<<"\" found, using simple storage!");
_storageEngine=new engines::Simple(getContext()->database, Environment::get("hive.data_path"));
}
_storageEngine->init();
org::esb::signal::Messenger::getInstance().addMessageListener(*this);
}
void StorageService::stopService()
{
org::esb::signal::Messenger::getInstance().removeMessageListener(*this);
_storageEngine.reset();
}
org::esb::core::OptionsDescription StorageService::getOptionsDescription()
{
org::esb::core::OptionsDescription result("mhivestorage");
result.add_options()
("mhivestorage.engine", boost::program_options::value<string >()->default_value("simple"), "which storage engine to use(simple,redundant)")
("mhivestorage.hosts", boost::program_options::value< std::vector<string> >()->multitoken(), "ip:port tuples for the other redundant storage")
("mhivestorage.port", boost::program_options::value< int >()->default_value(20202), "port number for own redundant storage")
//("mhivestorage.redundancy_level", boost::program_options::value<int >()->default_value(2), "which redundancy level should the storage engine \"redundant\" use")
;
return result;
}
REGISTER_SERVICE("storageservice", StorageService)
}

Get latest updates about Open Source Projects, Conferences and News.

Sign up for the SourceForge newsletter:

JavaScript is required for this form.





No, thanks