[379103]: src / org / esb / hive / HiveClient.cpp  Maximize  Restore  History

Download this file

156 lines (140 with data), 5.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//#include "org/esb/net/TcpSocket.h"
//#include "org/esb/io/ObjectOutputStream.h"
//#include "org/esb/io/ObjectInputStream.h"
#include "org/esb/net/TcpSocket.h"
#include "org/esb/hive/job/ProcessUnit.h"
#include "org/esb/lang/Thread.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/shared_ptr.hpp>
#include "org/esb/av/FormatBaseStream.h"
//#include "org/esb/util/Log.h"
#include "HiveClient.h"
//#include "Version.h"
namespace org {
namespace esb {
namespace hive {
/**
 * Creates a client for the hive server reachable at host:port.
 *
 * Stores the endpoint, resets the object-stream pointers and the control
 * flags, allocates the TcpSocket for the endpoint and calls
 * FormatBaseStream::initialize(). The socket is only constructed here;
 * connect() is the method that calls TcpSocket::connect().
 *
 * @param host  server host name or address
 * @param port  server TCP port
 */
HiveClient::HiveClient(std::string host, int port) {
    // Endpoint first: the socket below is built from the stored members.
    _host = host;
    _port = port;

    // Worker control flags: no halt requested, no worker running yet.
    _toHalt = false;
    _running = false;

    // Object streams are created later, in connect().
    _ois = NULL;
    _oos = NULL;

    _sock = new org::esb::net::TcpSocket(const_cast<char*> (_host.c_str()), _port);
#ifdef USE_SAFMQ
    _qis=new QueueInputStream(_host, _port,"punitout");
    _qos=new QueueOutputStream(_host, _port,"punitin");
#endif

    // The explicit avcodec_register_all() / av_register_all() / avcodec_init()
    // calls are disabled in this file; only this initializer is invoked.
    org::esb::av::FormatBaseStream::initialize();
}
/**
 * Closes the socket if one was allocated.
 *
 * The socket object itself is not deleted: the delete statement is
 * disabled in this file.
 */
HiveClient::~HiveClient() {
    if (_sock != NULL) {
        _sock->close();
    }
    // delete _sock;   (intentionally left disabled)
}
/**
 * Reacts to the "hiveclient" property of a control message.
 *
 *  - "start": runs start() on a new boost::thread and marks the client as
 *             running. The thread object goes out of scope right away; the
 *             worker is not joined here.
 *  - "stop" : requests a halt. If the client is marked running, the caller
 *             blocks until process() signals ctrlCHit.
 *
 * Any other property value is ignored.
 *
 * @param msg  message carrying the "hiveclient" property
 */
void HiveClient::onMessage(org::esb::signal::Message & msg) {
    if (msg.getProperty("hiveclient") == "start") {
        boost::thread t(boost::bind(&HiveClient::start, this));
        _running = true;
    } else if (msg.getProperty("hiveclient") == "stop") {
        if (!_running) {
            // No worker to wait for; just record the halt request.
            _toHalt = true;
            return;
        }
        LOGDEBUG("StopSignal Received, waiting for all work done!");
        // Take the termination lock BEFORE raising the halt flag. process()
        // must acquire this same mutex before it calls notify_all(), so the
        // notification cannot be sent until wait() below has released the
        // lock. Raising the flag first left a window in which the worker
        // could notify before this thread was waiting, blocking it forever.
        boost::mutex::scoped_lock terminationLock(terminationMutex);
        _toHalt = true;
        // NOTE(review): wait() has no predicate, so a spurious wakeup would
        // return before the worker has finished - confirm whether a
        // completion flag should be checked here.
        ctrlCHit.wait(terminationLock);
        LOGDEBUG("stopping done!")
    }
}
void HiveClient::start() {
_toHalt = false;
connect();
process();
}
/**
 * Currently a no-op: the statement that would raise the halt flag is
 * disabled. In this file the halt flag is raised by the "stop" branch of
 * onMessage().
 */
void HiveClient::stop() {
    // _toHalt = true;   (disabled)
}
/**
 * Allocates a new TcpSocket for _host:_port, connects it and creates the
 * object input/output streams used by process().
 *
 * With USE_SAFMQ defined the object streams wrap the queue streams;
 * otherwise they wrap the socket's own input and output streams.
 *
 * An `exception` thrown along the way is caught, logged and printed to
 * stdout; it is not rethrown, so callers detect failure through
 * _sock->isConnected().
 *
 * NOTE(review): the previous _sock / _ois / _oos are overwritten without
 * being deleted (the delete is disabled), so each call appears to leak the
 * old objects - confirm ownership before re-enabling the delete.
 */
void HiveClient::connect() {
    try {
        // delete _sock;   (disabled)
        _sock = new org::esb::net::TcpSocket((char*) _host.c_str(), _port);
        _sock->connect();

#ifdef USE_SAFMQ
        _qis=new QueueInputStream(_host, _port,"punitout");
        _qos=new QueueOutputStream(_host, _port,"punitin");
        _ois=new org::esb::io::ObjectInputStream(_qis.get());
        _oos = new org::esb::io::ObjectOutputStream(_qos.get());
#else
        _ois = new org::esb::io::ObjectInputStream(_sock->getInputStream());
        _oos = new org::esb::io::ObjectOutputStream(_sock->getOutputStream());
#endif

        // Report success to the log and to the console.
        LOGINFO("Server " << _host << " connected!!!");
        std::cout << "Video Processor connected to Server " << _host << ":" << _port << std::endl;
    } catch (exception & ex) {
        // Report the failure to the log and to the console; the 5 second
        // delay mentioned in the message is the sleep in process().
        LOGERROR("cant connect to \"" << _host << ":" << _port << "\"!!!" << ex.what());
        std::cout << "cant connect to \"" << _host << ":" << _port << "\"!!!" << ex.what() << std::endl;
        std::cout << "retry it in 5 seconds." << std::endl;
    }
}
void HiveClient::process() {
//int pCount = 0;
while (!_toHalt) {
if (!_sock->isConnected()) {
connect();
} else {
while (!_toHalt) {
char * text = const_cast<char*> ("get process_unit");
org::esb::hive::job::ProcessUnit * unit = new org::esb::hive::job::ProcessUnit();
try {
_sock->getOutputStream()->write(text, strlen(text));
_ois->readObject(*unit);
} catch (exception & ex) {
LOGERROR("Connection to Server lost!!!" << ex.what());
_sock->close();
}
if (unit->_input_packets.size() == 0) {
delete unit;
break;
}
unit->process();
/**
* clear the input packets, they are no more nedded
* they only consumes Network bandwidth and cpu on the server
*/
//unit->_input_packets.clear();
char * text_out = const_cast<char*> ("put process_unit");
try {
_sock->getOutputStream()->write(text_out, strlen(text_out));
_oos->writeObject(*unit);
} catch (exception & ex) {
LOGERROR("Connection to Server lost!!!" << ex.what());
_sock->close();
}
/*
delete unit->_decoder;
unit->_decoder = NULL;
delete unit->_encoder;
unit->_encoder = NULL;*/
delete unit->_converter;
unit->_converter = NULL;
delete unit;
//_toHalt=true;
}
}
org::esb::lang::Thread::sleep2(5000);
}
boost::mutex::scoped_lock terminationLock(terminationMutex);
ctrlCHit.notify_all();
}
}
}
}

Get latest updates about Open Source Projects, Conferences and News.

Sign up for the SourceForge newsletter:

JavaScript is required for this form.





No, thanks