Revision: 10149
http://xbmc.svn.sourceforge.net/xbmc/?rev=10149&view=rev
Author: elupus
Date: 2007-09-02 21:15:11 -0700 (Sun, 02 Sep 2007)
Log Message:
-----------
changed: improved the UPnP eventing code to retry if the server closed the connection, and to discard events if the server stops accepting them
Modified Paths:
--------------
trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.cpp
trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.h
trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltService.cpp
trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/Core/NptQueue.h
trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/System/Win32/NptWin32Queue.cpp
Modified: trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.cpp
===================================================================
--- trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.cpp 2007-09-03 04:13:41 UTC (rev 10148)
+++ trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.cpp 2007-09-03 04:15:11 UTC (rev 10149)
@@ -196,7 +196,11 @@
xml = "<?xml version=\"1.0\" encoding=\"utf-8\"?>" + xml;
PLT_HttpHelper::SetBody(request, xml);
- return m_SubscriberTask->AddRequest(request);;
+ if (NPT_FAILED(m_SubscriberTask->AddRequest(request))) {
+ delete request;
+ return NPT_FAILURE;
+ }
+ return NPT_SUCCESS;
}
/*----------------------------------------------------------------------
@@ -212,7 +216,7 @@
| PLT_EventSubscriberTask::PLT_EventSubscriberTask()
+---------------------------------------------------------------------*/
PLT_EventSubscriberTask::PLT_EventSubscriberTask()
-// : m_Requests(10)
+ : m_Requests(10)
{
}
@@ -239,7 +243,7 @@
NPT_OutputStreamReference output_stream;
NPT_InputStreamReference input_stream;
- NPT_HttpRequest* request;
+ NPT_HttpRequest* request;
NPT_String host;
NPT_UInt16 port = 0;
@@ -250,7 +254,10 @@
NPT_HttpResponse* response;
NPT_Reference<NPT_HttpResponse> response_holder;
NPT_Reference<NPT_HttpRequest> request_holder(request);
+ NPT_Result res = NPT_SUCCESS;
+ NPT_Integer count = 0;
+retry:
// check if we should abort
if(IsAborting(0)) return;
@@ -259,11 +266,11 @@
|| request->GetUrl().GetPort() != port
|| request->GetUrl().GetHost() != host ) {
m_Socket = new NPT_TcpClientSocket();
- m_Socket->SetReadTimeout(30000);
- m_Socket->SetWriteTimeout(30000);
- NPT_CHECK_LABEL(client.Connect(m_Socket.AsPointer(), *request), failed);
- NPT_CHECK_LABEL(m_Socket->GetOutputStream(output_stream), failed);
- NPT_CHECK_LABEL(m_Socket->GetInputStream(input_stream), failed);
+ m_Socket->SetReadTimeout(5000);
+ m_Socket->SetWriteTimeout(5000);
+ NPT_CHECK_LABEL_SEVERE(res = client.Connect(m_Socket.AsPointer(), *request), failed);
+ NPT_CHECK_LABEL_SEVERE(res = m_Socket->GetOutputStream(output_stream), failed);
+ NPT_CHECK_LABEL_SEVERE(res = m_Socket->GetInputStream(input_stream), failed);
port = request->GetUrl().GetPort();
host = request->GetUrl().GetHost();
@@ -272,14 +279,14 @@
NPT_LOG_FINER("PLT_EventSubscriberTask sending:");
PLT_LOG_HTTP_MESSAGE(NPT_LOG_LEVEL_FINER, request);
- NPT_CHECK_LABEL(client.SendRequest(output_stream, *request), failed);
- NPT_CHECK_LABEL(client.WaitForResponse(input_stream, *request, info, response), failed);
+ NPT_CHECK_LABEL_SEVERE(res = client.SendRequest(output_stream, *request), failed);
+ NPT_CHECK_LABEL_SEVERE(res = client.WaitForResponse(input_stream, *request, info, response), failed);
response_holder = response;
NPT_LOG_FINE("PLT_EventSubscriberTask receiving:");
PLT_LOG_HTTP_MESSAGE(NPT_LOG_LEVEL_FINE, response);
- NPT_CHECK_LABEL(ProcessResponse(request, info, response), failed);
+ NPT_CHECK_LABEL_SEVERE(ProcessResponse(request, info, response), failed);
// if it's no keep alive, we reopen on next attempt
if (!PLT_HttpHelper::IsConnectionKeepAlive(response)) {
@@ -288,7 +295,14 @@
continue;
failed:
+ output_stream = NULL;
+ input_stream = NULL;
m_Socket = NULL;
+ // server may have closed socket for us
+ if((res == NPT_ERROR_EOS || res == NPT_ERROR_TIMEOUT) && count < 2) {
+ count++;
+ goto retry;
+ }
}
}
Modified: trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.h
===================================================================
--- trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.h 2007-09-03 04:13:41 UTC (rev 10148)
+++ trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltEvent.h 2007-09-03 04:15:11 UTC (rev 10149)
@@ -121,7 +121,7 @@
PLT_EventSubscriberTask();
virtual ~PLT_EventSubscriberTask();
- NPT_Result AddRequest(NPT_HttpRequest* request) { return m_Requests.Push(request); }
+ NPT_Result AddRequest(NPT_HttpRequest* request) { return m_Requests.Push(request, false); }
protected:
// PLT_ThreadTask methods
virtual void DoAbort();
Modified: trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltService.cpp
===================================================================
--- trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltService.cpp 2007-09-03 04:13:41 UTC (rev 10148)
+++ trunk/XBMC/xbmc/lib/libUPnP/Platinum/Source/Core/PltService.cpp 2007-09-03 04:15:11 UTC (rev 10149)
@@ -80,7 +80,8 @@
+---------------------------------------------------------------------*/
NPT_Result
PLT_LastChangeXMLIterator::operator()(PLT_StateVariable* const &var) const
-{
+{
+ if(var->IsSendingEvents()) return NPT_SUCCESS;
NPT_XmlElementNode* variable = new NPT_XmlElementNode((const char*)var->GetName());
NPT_CHECK_SEVERE(m_Node->AddChild(variable));
NPT_CHECK_SEVERE(variable->SetAttribute("val", var->GetValue()));
@@ -491,12 +492,18 @@
PLT_UPnPMessageHelper::GenerateUUID(19, sid);
subscriber->SetSID("uuid:" + sid);
- m_Subscribers.Add(subscriber);
-
PLT_UPnPMessageHelper::SetSID(&response, subscriber->GetSID());
PLT_UPnPMessageHelper::SetTimeOut(&response, timeout);
+ // we must set LastChanged state to all non evented
+ // variables, by doing this first, we will get all data
+ // in one event to the new subscriber
+ m_StateChanged = m_StateVariables;
+ NotifyChanged();
+
+ m_Subscribers.Add(subscriber);
subscriber->Notify(m_StateVariables);
+
return NPT_SUCCESS;
}
@@ -586,7 +593,6 @@
expiration = sub->GetExpirationTime();
sub->Notify(*vars);
- m_Subscribers.Add(sub);
// for now subscribers never expire
// re-enabled to keep down overhead -- elupus
Modified: trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/Core/NptQueue.h
===================================================================
--- trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/Core/NptQueue.h 2007-09-03 04:13:41 UTC (rev 10148)
+++ trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/Core/NptQueue.h 2007-09-03 04:15:11 UTC (rev 10149)
@@ -32,7 +32,8 @@
// methods
virtual ~NPT_GenericQueue() {}
- virtual NPT_Result Push(NPT_QueueItem* item) = 0;
+ virtual NPT_Result Push(NPT_QueueItem* item,
+ bool blocking = true) = 0;
virtual NPT_Result Pop(NPT_QueueItem*& item,
bool blocking = true) = 0;
@@ -52,8 +53,9 @@
NPT_Queue(NPT_Cardinal max_items = 0) :
m_Delegate(NPT_GenericQueue::CreateInstance(max_items)) {}
virtual ~NPT_Queue<T>() { delete m_Delegate; }
- virtual NPT_Result Push(T* item) {
- return m_Delegate->Push(reinterpret_cast<NPT_QueueItem*>(item));
+ virtual NPT_Result Push(T* item, bool blocking = true) {
+ return m_Delegate->Push(reinterpret_cast<NPT_QueueItem*>(item),
+ blocking);
}
virtual NPT_Result Pop(T*& item, bool blocking = true) {
return m_Delegate->Pop(reinterpret_cast<NPT_QueueItem*&>(item),
Modified: trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/System/Win32/NptWin32Queue.cpp
===================================================================
--- trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/System/Win32/NptWin32Queue.cpp 2007-09-03 04:13:41 UTC (rev 10148)
+++ trunk/XBMC/xbmc/lib/libUPnP/Platinum/ThirdParty/Neptune/Source/System/Win32/NptWin32Queue.cpp 2007-09-03 04:15:11 UTC (rev 10149)
@@ -33,7 +33,7 @@
// methods
NPT_Win32Queue(NPT_Cardinal max_items);
~NPT_Win32Queue();
- NPT_Result Push(NPT_QueueItem* item);
+ NPT_Result Push(NPT_QueueItem* item, bool blocking);
NPT_Result Pop(NPT_QueueItem*& item, bool blocking);
@@ -68,7 +68,7 @@
| NPT_Win32Queue::Push
+---------------------------------------------------------------------*/
NPT_Result
-NPT_Win32Queue::Push(NPT_QueueItem* item)
+NPT_Win32Queue::Push(NPT_QueueItem* item, bool blocking)
{
// lock the mutex that protects the list
NPT_CHECK(m_Mutex.Lock());
@@ -76,6 +76,11 @@
// check that we have not exceeded the max
if (m_MaxItems) {
while (m_Items.GetItemCount() >= m_MaxItems) {
+ if (!blocking) {
+ m_Mutex.Unlock();
+ return NPT_FAILURE;
+ }
+
// we must wait until some items have been removed
// unlock the mutex so that another thread can pop
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|