This page is part of the documentation for the training session to take place in Barcelona on the 22nd of May of 2013.
In this part we show the basics of creating Sardana controllers for a Motor, a Counter and a ZeroD channel.
Duration: 1h10'
In the link below you will find a description of what is a controller and the types of controllers that Sardana has.
Sardana Controller Overview link:
http://www.tango-controls.org/static/sardana/latest/doc/html/development/devel/overview/overview_controller.html#sardana-controller-overview
There are more details in the chapter 'Writing controllers':
http://www.tango-controls.org/static/sardana/latest/doc/html/development/devel/howto_controllers/index.html#sardana-controller-howto
Synchronized START
/FOR/ Each controller(s) implied in the motion
- Call PreStartAll()
/END FOR/
/FOR/ Each motor(s) implied in the motion
- ret = PreStartOne(motor to move, new position)
- /IF/ ret is not true
/RAISE/ Cannot start. Motor PreStartOne returns False
- /END IF/
- Call StartOne(motor to move, new position)
/END FOR/
/FOR/ Each controller(s) implied in the motion
- Call StartAll()
/END FOR/
Synchronized READ
/FOR/ Each controller(s) implied in the reading
- Call PreReadAll()
/END FOR/
/FOR/ Each motor(s) implied in the reading
- PreReadOne(motor to read)
/END FOR/
/FOR/ Each controller(s) implied in the reading
- Call ReadAll()
/END FOR/
/FOR/ Each motor(s) implied in the reading
- Call ReadOne(motor to read)
/END FOR/
State evaluation of all pool elements is implemented in a similar algorithm
Latest Sardana documentation is available at:
or you could build it yourself from the code executing:
python setup.py build
file:///home/sardana/Sardana/build/sphinx/html/index.html
MOTOR HAS ATTRIBUTES: Position, State, Status, Limit_Switches
+) Position attribute details (user units)
pos = sign * dial + offset
dial = hw_pos / step_per_unit
backlash
+) Common attributes Velocity, acceleration, decelaration
In this section we will see how we can interface a real motor controller (IcePAP) with a very simplified library and implementing the minimum required for the Motor API.
sardana@sardanademo:~$ mkdir poolcontrollers
sardana@sardanademo:~$ cd poolcontrollers/
sardana@sardanademo:~/poolcontrollers$ wget https://sourceforge.net/p/sardana/wiki/Howto-CreateControllers/attachment/mymotorctrl.py
sardana@sardanademo:~/poolcontrollers$ gedit mymotorctrl.py &
sardana@sardanademo:~$ jive &
(and create PoolPath property with value: /home/sardana/poolcontrollers, and restart Pool, MacroServer and Spock)
Door_demo1_1 [1]: %lsctrllib
Door_demo1_1 [2]: %defctrl?
Door_demo1_1 [3]: %defctrl MiniIcepapMotorController miniipapctrl Host icepap10 Port 5000
Door_demo1_1 [4]: %defelem?
Door_demo1_1 [5]: %defelem ipap8 miniipapctrl 8
Door_demo1_1 [6]: wm ipap8
Door_demo1_1 [7]: mv ipap8 10000
from sardana import State
from sardana.pool.controller import MotorController
from sardana.pool.controller import Type
from sardana.pool.controller import Description
### Needed for the Icepap communication
import socket
import struct
class MiniIcepapMotorController(MotorController):
""" This controller provides basic interface for Icepap motors."""
## The properties used to connect to the ICEPAP motor controller
ctrl_properties = {'host': {Type: str, Description: 'The host name'},
'port': {Type: int, Description: 'The port number'},
}
MaxDevice = 1
def __init__(self, inst, props, *args, **kwargs):
MotorController.__init__(self, inst, props, *args, **kwargs)
self.ipap = IcepapControl(self.host, self.port)
def AddDevice(self, axis):
pass
def DeleteDevice(self, axis):
pass
def StateOne(self, axis):
isMoving = self.ipap.isMoving(axis)
in_pos_lim = self.ipap.inLimitPositive(axis)
in_neg_lim = self.ipap.inLimitNegative(axis)
home = self.ipap.inHome(axis)
state = State.On
if isMoving:
state = State.Moving
status_string = 'My custom status info'
# LIM- (bit 2), LIM+ (bit 1), HOME (bit 0)
switch_state = in_neg_lim << 2 | in_pos_lim << 1 | home
return state, status_string, switch_state
def ReadOne(self, axis):
return self.ipap.getPos(axis)
def StartAll(self):
pass
def StartOne(self, axis, pos):
self.ipap.move(axis, pos)
def AbortOne(self, axis):
self.ipap.abort(axis)
class IcepapControl():
def __init__(self, host, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
NOLINGER = struct.pack('ii', 1, 0)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
self.sock.connect((host, port))
self.DEBUG = False
def getPos(self, axis):
cmd = '%d:?POS' % axis
ans = self.ask(cmd)
pos = int(ans.split()[1])
return pos
def move(self, axis, abs_pos):
cmd = '%d:MOVE %d' % (axis, abs_pos)
self.send(cmd)
def abort(self, axis):
cmd = '%d:ABORT' % axis
self.send(cmd)
def isMoving(self, axis):
status_reg = self.getStatus(axis)
isMoving = (status_reg >> 10) & 1
return isMoving
def inLimitPositive(self, axis):
status_reg = self.getStatus(axis)
return (status_reg >> 18) & 1
def inLimitNegative(self, axis):
status_reg = self.getStatus(axis)
return (status_reg >> 19) & 1
def inHome(self, axis):
status_reg = self.getStatus(axis)
return (status_reg >> 20) & 1
def getStatus(self, axis):
cmd = '?STATUS %d' % axis
ans = self.ask(cmd)
status = int(ans.split()[1], 16)
return status
def ask(self, cmd):
self.sock.send(cmd + '\n')
size = 8192
data = self.sock.recv(size)
if data.count("$") > 0:
while data.count('$') < 2:
data = data + self.sock.recv(size)
if self.DEBUG:
print cmd, '->', data
return data
def send(self, cmd):
ans = self.ask('#'+cmd)
if ans.find('OK') == -1:
raise Exception('Icepap Exceptioncmd %s: %s' % (cmd, ans))
Controller Parameters
- timer
- monitor
- trigger_type
COUNTER TIMER HAS ATTRIBUTE: Value
To illustrate a counter/timer device, we can imagine that we want to count the recieved bytes in the network eth0 interface. Below there is the code that implements a counter timer controller without any hardware dependency.
sardana@sardanademo:~/poolcontrollers$ wget https://sourceforge.net/p/sardana/wiki/Howto-CreateControllers/attachment/myctctrl.py
Door_demo1_1 [7]: %defctrl MiniNetworkPackagesCounterTimerController mininetctctrl
Door_demo1_1 [7]: %defelem netcounter mininetctctrl 1
Door_demo1_1 [7]: %defmeas mg1 ct01 netcounter
Door_demo1_1 [7]: ascan mot01 0 10 10 5
from sardana import State
from sardana.pool.controller import CounterTimerController
class MiniNetworkPackagesCounterTimerController(CounterTimerController):
""" This controller provides interface for network packages counting."""
MaxDevice = 1
def __init__(self, inst, props, *args, **kwargs):
CounterTimerController.__init__(self,inst,props, *args, **kwargs)
self.acq_time = 1.
self.acq_end_time = time.time()
self.start_counts = 0
def AddDevice(self, axis):
pass
def DeleteDevice(self, axis):
pass
def LoadOne(self, axis, value):
self.acq_time = value
def StateOne(self, axis):
state = State.On
if time.time() < self.acq_end_time:
state = State.Moving
status_string = 'My custom status info'
return state, status_string
def StartOneCT(self, axis):
self.acq_end_time = time.time() + self.acq_time
self.start_counts = self.read_network_counts()
def ReadOne(self, axis):
counts = self.read_network_counts()
return counts - self.start_counts
def StartAllCT(self):
pass
def AbortOne(self, axis):
self.acq_end_time = time.time()
def read_network_counts(self):
with os.popen('cat /proc/net/dev |grep eth0') as fd:
output = fd.read()
recv_bytes_start = output.find(':') + 2
recv_bytes_end = output.find(' ', recv_bytes_start)
return int(output[recv_bytes_start:recv_bytes_end])
ZEROD HAS ATTRIBUTES:
Value
CumulationType
CurrentValue
TimeBuffer
Value
ValueBuffer
In order to get in touch with the 0D devices, from the controller point of view, it needs just to provide values to the Pool, which will manage the buffer of values and then compute some kind of averaging calculation. In this case, the controller relies on the '/proc/loadavg' hardware and provides three devices, loadavg of last minute, last 5 minutes, and last 15 minutes.
sardana@sardanademo:~/poolcontrollers$ wget https://sourceforge.net/p/sardana/wiki/Howto-CreateControllers/attachment/myzerodctrl.py
Door_demo1_1 [10]: %lsctrllib
Door_demo1_1 [11]: %defctrl MiniAvgLoadZeroDController miniloadavg0dctrl
Door_demo1_1 [12]: %defelem loadavg1m miniloadavg0dctrl 1
Door_demo1_1 [13]: %defelem loadavg5m miniloadavg0dctrl 2
Door_demo1_1 [14]: %defelem loadavg15m miniloadavg0dctrl 3
Door_demo1_1 [15]: %defmeas mg2 ct01 netcounter loadavg1m loadavg5m loadavg15m
Door_demo1_1 [16]: senv ActiveMntGrp mg2
Door_demo1_1 [17]: ascan ipap8 0 10000 10 1
import os
### Needed for the MotorController interface
from sardana import State
from sardana.pool.controller import ZeroDController
class MiniAvgLoadZeroDController(ZeroDController):
""" This controller provides interface for CPU avgload."""
MaxDevice = 3
def __init__(self, inst, props, *args, **kwargs):
ZeroDController.__init__(self,inst,props, *args, **kwargs)
def AddDevice(self, axis):
pass
def DeleteDevice(self, axis):
pass
def StateOne(self, axis):
state = State.On
status_string = 'My custom status info'
return state, status_string
def ReadOne(self, axis):
loadavg = self.read_loadavg()
return loadavg[axis - 1]
def read_loadavg(self):
with os.popen('cat /proc/loadavg') as fd:
output = fd.read()
return map(float, output.split()[:3])