From: <ro...@us...> - 2012-08-28 11:30:58
|
Revision: 2801 http://nscldaq.svn.sourceforge.net/nscldaq/?rev=2801&view=rev Author: ron-fox Date: 2012-08-28 11:30:47 +0000 (Tue, 28 Aug 2012) Log Message: ----------- Provides stalled data source detection/support/status. Modified Paths: -------------- branches/nscldaq-10.2-development/daq/eventbuilder/ConnectionManager.tcl branches/nscldaq-10.2-development/daq/eventbuilder/connectionList.tcl Added Paths: ----------- branches/nscldaq-10.2-development/daq/eventbuilder/observer.tcl Modified: branches/nscldaq-10.2-development/daq/eventbuilder/ConnectionManager.tcl =================================================================== --- branches/nscldaq-10.2-development/daq/eventbuilder/ConnectionManager.tcl 2012-08-27 12:01:38 UTC (rev 2800) +++ branches/nscldaq-10.2-development/daq/eventbuilder/ConnectionManager.tcl 2012-08-28 11:30:47 UTC (rev 2801) @@ -41,6 +41,7 @@ package require snit package require EVB::CallbackManager package require EvbOrderer; # C/C++ orderer. +package require Observer; # Observer pattern component package provide EVB::ConnectionManager 1.0 @@ -51,10 +52,11 @@ # # OPTIONS # -state - Current socket state (see above). This is readonly. -# -clientaddr - Client TCP/IP address (this is readonly). -# -description - Client description as provided in the the connection negotiation. -# -socket - Socket connected to client. +# -clientaddr - Client TCP/IP address (this is readonly). +# -description - Client description as provided in the the connection negotiation. +# -socket - Socket connected to client. # -disconnectcommand - Script to call on disconnect. +# -fragmentcommand - Script to call on a fragment # # METHODS # Only construction and destrution are are public. @@ -65,23 +67,46 @@ option -socket; # The socket connected to client. 
option -clientaddr -default "not-set" - option -disconnectcommand [list] + option -disconnectcommand -default [list] -configuremethod _SetCallback + option -fragmentcommand -default [list] -configuremethod _SetCallback + variable callbacks + + + constructor args { $self configurelist $args fconfigure $options(-socket) -blocking 1 -buffering none -translation {binary lf} + set callbacks [EVB::CallbackManager %AUTO] + $callbacks define -disconnectcommand + $callbacks define -fragmentcommand + $self _Expecting _Connect FORMING; # We are now expecting a CONNECT command. } destructor { if {$options(-socket) != -1} { catch {$self _Close 'CLOSED'} } + $callbacks destroy + } + #---------------------------------------------------------------------------- + # Configuration + # + ## + # _SetCallback - register a callback handler for an option. + # + # @param option - name of the option. + # @param script - New script to register. + # + method _SetCallback {option script} { + $callbacks register $option $script + set options($option) $script } - + #------------------------------------------------------------------------------ # Private methods: @@ -181,9 +206,8 @@ set options(-socket) -1 - if {$options(-disconnectcommand) ne [list]} { - uplevel #0 $options(-disconnectcommand) - } + $callbacks invoke -disconnectcommand [list] [list] + } @@ -259,6 +283,10 @@ # TODO: Handle errors as a close EVB::handleFragment $socket + + $callbacks -fragmentcommand [list] [list] + + puts $socket "OK" } else { @@ -286,20 +314,33 @@ # %O - Connection object created to manage this connection. # %H - Host from which the client came. # %D - Connection descdription. +# -sourcetimeout - Number of seconds of not getting a fragment +# from a data source after which it is considered timed out. +# # # METHODS: # getConnections - List the connections and their states. # snit::type EVB::ConnectionManager { + component TimeoutObservers + option -port; # Port on which we listen for connections. 
option -connectcommand -default [list] -configuremethod _SetCallback option -disconnectcommand -default [list] -configuremethod _SetCallback + option -sourcetimeout -default 10 - variable serverSocket; # Socket run by us. - variable connections [list]; # List of Connection objects. - variable callbacks; # Callback manager. + variable serverSocket; # Socket run by us. + variable connections -array {}; # Key is connection, value last received timestamp. + variable lastFragment [clock seconds]; # When the last fragment arrived. + variable callbacks; # Callback manager. + variable timedoutSources [list]; # list of connections that are timed out. + + delegate addObserver to TimeoutObservers as addTimeoutObserver + delegate removeObserver to TimeoutObservers as removeTimeoutObserver + + constructor args { $self configurelist $args; # To get the port. @@ -310,37 +351,49 @@ $callbacks define -connectcommand $callbacks define -disconnectcommand + set serverSocket [socket -server [mymethod _NewConnection] $options(-port)] + # watch timeouts at 1/2 the timeout interval: - set serverSocket [socket -server [mymethod _NewConnection] $options(-port)] + after [$self _TimeoutCheckInterval] [mymethod _CheckSourceTimeouts] + + install TimeoutObservers using using Observer %AUTO% -partof $self } destructor { - foreach object $connections { + foreach object [array names connections] { $object destroy + unset connections($object) } close $serverSocket + $callbacks destroy + $TimeoutObservers destroy } #----------------------------------------------------------------- # Public methods: # ## - # Get the list of connections and their properties/status: + # Get the list of connections # # @return list of three element sublists (possibly empty). # @retval Each list element contains in order: # - Client's host. # - description of client. # - Connection state string. - # + # - "yes" if the connection is in the timedout list + # "" if not. 
+
     method getConnections {} {
         set result [list]; # possibly empty return list.
-        foreach connection $connections {
+        foreach connection [array names connections] {
             set host [$connection cget -clientaddr]
             set desc [$connection cget -description]
             set state [$connection cget -state]
+            set timedout ""
+            if {[lsearch -exact $timedoutSources $connection] != -1} {
+                set timedout "yes"
+            }
-            lappend result [list $host $desc $state]
+            lappend result [list $host $desc $state $timedout]
         }
         return $result
     }
@@ -358,15 +411,15 @@
     # is not in our list, we toss an error if we are handed one.
     #
     method _DisconnectClient object {
-        set objectIndex [lsearch -exact $connections $object]
-        if {$objectIndex != -1} {
+        if {[info exists connections($object)]} {
+            # connections is an array keyed by connection object.
             $callbacks invoke -disconnectcommand [list %O %H %D] \
                 [list $object [$object cget -clientaddr] [$object cget -description]]
-            set connections [lreplace $connections $objectIndex $objectIndex]
+            unset connections($object)
             $object destroy
         } else {
-            error "BUG - $object not in ConnectionManager connection list (Disconnect)"
+            error "BUG - $object not in ConnectionManager known connections"
         }
     }
@@ -387,8 +440,9 @@
     method _NewConnection {sock client cport} {
         set connection [EVB::Connection %AUTO% -socket $sock -clientaddr $client]
-        lappend connections $connection
+        set connections($connection) [clock seconds]; # connection's last frag time to be not-timedout.
         $connection configure -disconnectcommand [mymethod _DisconnectClient $connection]
+        $connection configure -fragmentcommand [mymethod _RecordFragment $connection]
         $callbacks invoke -connectcommand [list %H %O] [list $client $connection]
     }
     ##
@@ -403,5 +457,69 @@
         $callbacks register $option $value
         set options($option) $value
     }
+    ##
+    # _RecordFragment
+    #
+    #   Record the time at which the most recent fragment arrived and which
+    #   connection received it.
+    #
+    # @param connection - the connection that just got the fragment.
+    #
+    method _RecordFragment connection {
+        set lastFragment [clock seconds]
+        set connections($connection) $lastFragment
+    }
+    ##
+    # Check for lack of data on data sources;
+    # - If we've not had any data in a timeout period, nobody's timed out.
+    # - If we have had data within a timeout but a source has not contributed,
+    #   that source is timed out.
+    #
+    method _CheckSourceTimeouts {} {
+
+        set previouslyTimedOut $timedoutSources
+        set timedoutSources [list]; # Assume it's all just peachy.
+
+        # Only care if we're getting fragments:
+
+        set now [clock seconds]
+        if {($now - $lastFragment) < $options(-sourcetimeout)} {
+            foreach connection [array names connections] {
+                if {($now - $connections($connection)) > $options(-sourcetimeout)} {
+                    lappend timedoutSources $connection
+                }
+            }
+        }
+
+        # Invoke observers for each newly timed out source
+        # Invocation is of the form:
+        #     observer STALLED connection
+
+        foreach source $timedoutSources {
+            if {$source ni $previouslyTimedOut} {
+                $TimeoutObservers invoke STALLED $source
+            }
+        }
+        # Invoke observers for sources that are no longer stalled:
+        # Invocation is of the form:
+        #     observer UNSTALLED connection
+        #
+        foreach source $previouslyTimedOut {
+            if {$source ni $timedoutSources} {
+                $TimeoutObservers invoke UNSTALLED $source
+            }
+        }
+        # Reschedule:
+
+        after [$self _TimeoutCheckInterval] [mymethod _CheckSourceTimeouts]
+    }
+    ##
+    # compute the timeout check interval:
+    #
+    # @return ms to next timeout check.
+    #
+    method _TimeoutCheckInterval {} {
+        return [expr {int(1000*$options(-sourcetimeout)/2.0)}]
+    }
 }
Modified: branches/nscldaq-10.2-development/daq/eventbuilder/connectionList.tcl
===================================================================
--- branches/nscldaq-10.2-development/daq/eventbuilder/connectionList.tcl 2012-08-27 12:01:38 UTC (rev 2800)
+++ branches/nscldaq-10.2-development/daq/eventbuilder/connectionList.tcl 2012-08-28 11:30:47 UTC (rev 2801)
@@ -39,6 +39,7 @@
     $win.table heading #1 -text Host
     $win.table heading #2 -text Description
     $win.table heading #3 -text State
+    $win.table heading #4 -text Stalled
     grid $win.table -sticky nsew
Added: branches/nscldaq-10.2-development/daq/eventbuilder/observer.tcl
===================================================================
--- branches/nscldaq-10.2-development/daq/eventbuilder/observer.tcl (rev 0)
+++ branches/nscldaq-10.2-development/daq/eventbuilder/observer.tcl 2012-08-28 11:30:47 UTC (rev 2801)
@@ -0,0 +1,104 @@
+#    This software is Copyright by the Board of Trustees of Michigan
+#    State University (c) Copyright 2005.
+#
+#    You may use this software under the terms of the GNU public license
+#    (GPL).  The terms of this license are described at:
+#
+#     http://www.gnu.org/licenses/gpl.txt
+#
+#    Author:
+#             Ron Fox
+#	     NSCL
+#	     Michigan State University
+#
+
+##
+# @file observer.tcl
+# @brief snit observer pattern component.
+#
+
+package require snit
+package provide Observer 1.0
+
+##
+#  This snit object provides generic support for the observer pattern
+#  within snit objects.  There are two use cases:
+#  - One observer type per object.
+#  - Multiple observer types per object.
+#
+# Regardless, the first thing a client must do is install
+# one or more instances of this type as component(s).
+#
+# If you are only installing a single observer, you can
+# normally
+# \verbatim
+#    delegate method addObserver    to <observer-component>
+#    delegate method removeObserver to <observer-component>
+# \endverbatim
+#
+# Then clients of _your_ type can just use addObserver and removeObserver directly.
+# If you require multiple observer types you can again install each as a component
+# and:
+#
+# \verbatim
+#    delegate method add<someTypeObserver>    to <observer-component1> as addObserver
+#    delegate method remove<someTypeObserver> to <observer-component1> as removeObserver
+#
+#    delegate method add<someOtherTypeObserver>    to <observer-component2> as addObserver
+#    delegate method remove<someOtherTypeObserver> to <observer-component2> as removeObserver
+#
+#    ...
+#
+# \endverbatim
+#
+# METHODS:
+#    addObserver    - adds an observer to the ordered list of observers.
+#    removeObserver - Removes an observer from the ordered list of observers.
+#    invoke         - Invokes the observers in registration order.
+#
+snit::type Observer {
+    variable observerList [list]; # List of observers.
+
+    ##
+    # addObserver
+    #
+    #   Add an observer to the end of the list.
+    #
+    # @param observer - Command prefix of script to invoke.
+    #
+    method addObserver observer {
+        lappend observerList $observer
+    }
+    ##
+    # removeObserver
+    #
+    #   Remove an observer from the list.  It is an error to remove
+    #   a nonexistent observer.
+    #
+    # @param observer - command prefix of script to remove.
+    #
+    method removeObserver observer {
+        set index [lsearch -exact $observerList $observer]
+
+        if {$index == -1} {
+            error "$observer is not in the observer list"
+        } else {
+            set observerList [lreplace $observerList $index $index]
+        }
+    }
+    ##
+    # invoke
+    #
+    #   Invoke the observers in the list, in registration order, at
+    #   global scope.
+    #
+    # @param args - The args are appended as command words for the
+    #               command.
+    #
+    method invoke args {
+        foreach observer $observerList {
+            # Build a list so each argument stays a single command word.
+            uplevel #0 [list {*}$observer {*}$args]
+        }
+    }
+}
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|