[22d026]: Allura / allura / lib / async.py  Maximize  Restore  History

Download this file

176 lines (150 with data), 5.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import re
import logging
from collections import defaultdict
from contextlib import contextmanager
import mock
import kombu
import pkg_resources
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class Connection(object):
    """Wrapper around a kombu broker connection with pooled channels.

    Keeps a kombu connection pool, one acquired connection, a channel pool
    created from that connection, and ``kombu.Exchange`` objects for the
    'audit' and 'react' exchanges.
    """

    # Exchange names that clear_exchanges()/declare_exchanges() operate on,
    # in the order the operations are issued.
    _EXCHANGE_NAMES = ('audit', 'react')

    def __init__(self, hostname, port, userid, password, vhost):
        """Build the connection prototype, its pool and the exchange table.

        All arguments are handed to ``kombu.BrokerConnection``; ``vhost`` is
        passed as ``virtual_host``.  Finishes by calling :meth:`reset` so a
        connection and channel pool are ready for use.
        """
        self._conn_proto = kombu.BrokerConnection(
            hostname=hostname,
            port=port,
            userid=userid,
            password=password,
            virtual_host=vhost)
        self._connection_pool = self._conn_proto.Pool(preload=1, limit=None)
        self._exchanges = dict(
            audit=kombu.Exchange('audit'),
            react=kombu.Exchange('react'))
        self.reset()

    def reset(self):
        """Acquire a connection from the pool and create a new channel pool."""
        self._conn = self._connection_pool.acquire()
        self._channel_pool = self._conn.ChannelPool(preload=2, limit=10)

    @contextmanager
    def channel(self):
        """Context manager yielding a channel acquired from the channel pool.

        If the pool raises ``ChannelLimitExceeded``, :meth:`reset` is called
        and the acquire is attempted once more on the new channel pool.  When
        the ``with`` body exits, the channel is released only if it is still
        open; otherwise it is simply not returned to the pool.
        """
        try:
            ch = self._channel_pool.acquire()
        except kombu.exceptions.ChannelLimitExceeded:
            log.info('Channel pool exhausted, opening a new connection')
            self.reset()
            ch = self._channel_pool.acquire()
        try:
            yield ch
        finally:
            if ch.is_open:
                ch.release()
            else:
                log.info('ch is not open, not returning to pool')

    @contextmanager
    def producer(self, xn):
        """Context manager yielding a ``kombu.Producer`` for exchange ``xn``.

        ``xn`` must be a key of the exchange table built in ``__init__``;
        an unknown name raises ``KeyError``.  The producer is created with
        ``auto_declare=False``.
        """
        with self.channel() as ch:
            yield kombu.Producer(ch, self._exchanges[xn], auto_declare=False)

    def clear_exchanges(self):
        """Delete the 'audit' and 'react' exchanges, ignoring failures.

        Each deletion is best-effort: an error for one exchange is logged at
        debug level and does not prevent the attempt on the other.
        """
        for name in self._EXCHANGE_NAMES:
            try:
                with self.channel() as ch:
                    ch.exchange_delete(name)
            except Exception:
                # Deliberately best-effort, but no longer swallows
                # KeyboardInterrupt/SystemExit the way a bare except did.
                log.debug('Could not delete exchange %s', name, exc_info=True)

    def declare_exchanges(self):
        """Drop and re-declare both exchanges as durable topic exchanges."""
        self.clear_exchanges()
        for name in self._EXCHANGE_NAMES:
            # A separate channel per declaration, as in the delete path.
            with self.channel() as ch:
                ch.exchange_declare(
                    name, 'topic', durable=True, auto_delete=False)

    def declare_queue(self, xn, qn, keys):
        """(Re)create queue ``qn`` and bind it to exchange ``xn``.

        xn: exchange name
        qn: queue name
        keys: iterable of routing keys; one binding is made per key

        Any existing queue of that name is deleted first (best-effort), then
        the queue is declared durable and non-auto-delete.
        """
        try:
            with self.channel() as ch:
                ch.queue_delete(qn)
        except Exception:
            # Best-effort removal; failures are logged and otherwise ignored.
            log.debug('Could not delete queue %s', qn, exc_info=True)
        with self.channel() as ch:
            ch.queue_declare(qn, durable=True, auto_delete=False)
        for k in keys:
            with self.channel() as ch:
                ch.queue_bind(qn, xn, k)

    def publish(self, xn, key_msgs):
        '''Publish a series of messages to an exchange using a single producer

        xn: exchange name
        key_msgs: sequence of (routing_key, message, kwargs) triples to be
            published; ``kwargs`` is a dict of extra keyword arguments for the
            producer's ``publish`` call.  'serializer' defaults to 'pickle'
            when not supplied.  The caller's dicts are not modified.
        '''
        with self.producer(xn) as p:
            for routing_key, body, kwargs in key_msgs:
                # Work on a copy so the default does not leak into the
                # caller's dict (which may be shared between messages).
                options = dict(kwargs)
                options.setdefault('serializer', 'pickle')
                p.publish(body, routing_key=routing_key, **options)
class MockAMQ(object):
    """In-memory message bus that records publishes instead of sending them.

    Published messages are kept per exchange name in ``self.exchanges`` and
    can be dispatched to tool consumers registered by
    :meth:`setup_handlers`.
    """

    def __init__(self, globals):
        """Start with no messages and no bindings.

        globals: object stored as ``self.globals`` and later assigned to the
            reactor command in :meth:`setup_handlers`.
        """
        self.exchanges = defaultdict(list)
        self.queue_bindings = defaultdict(list)
        self.globals = globals

    def clear(self):
        """Empty every exchange's message list in place."""
        # .values() rather than .iteritems(): the key was unused and
        # iteritems() does not exist on Python 3.
        for queued in self.exchanges.values():
            queued[:] = []

    def create_backend(self):
        """Return a fresh ``mock.Mock`` as the backend object."""
        return mock.Mock()

    def publish(self, xn, routing_key, message, **kw):
        """Record a message on exchange ``xn``.

        The message is stored as a dict with keys 'routing_key', 'message'
        and 'kw' (the extra keyword arguments).
        """
        self.exchanges[xn].append(
            dict(routing_key=routing_key, message=message, kw=kw))

    def pop(self, xn):
        """Remove and return the oldest recorded message on exchange ``xn``.

        Raises ``IndexError`` if the exchange has no messages.
        """
        return self.exchanges[xn].pop(0)

    def declare_exchanges(self):
        """No-op."""
        pass

    def declare_queue(self, xn, qn, keys):
        """No-op."""
        pass

    def setup_handlers(self, paste_registry=None):
        """Load tool entry points and rebuild the routing-key bindings.

        Loads every entry point in the 'allura' group (import failures are
        logged and skipped), creates a ``ReactorCommand`` and records, per
        exchange, one binding dict (keys 'key', 'tool_name', 'method') for
        each routing key reported by ``tool_consumers``.

        paste_registry: if truthy, assigned to the reactor's ``registry``.
        """
        # Imported here rather than at module level, as in the original code.
        from allura.command.reactor import tool_consumers, ReactorCommand
        from allura.command import base
        from allura import model as M
        self.queue_bindings = defaultdict(list)
        # The command base module gets its ``log`` and ``M`` attributes set
        # here before the reactor command is created.
        base.log = logging.getLogger('allura.command')
        base.M = M
        self.tools = []
        for ep in pkg_resources.iter_entry_points('allura'):
            try:
                self.tools.append((ep.name, ep.load()))
            except ImportError:
                base.log.warning('Canot load entry point %s', ep)
        self.reactor = ReactorCommand('reactor_setup')
        if paste_registry:
            self.reactor.registry = paste_registry
        self.reactor.globals = self.globals
        self.reactor.parse_args([])
        for name, tool in self.tools:
            for method, xn, qn, keys in tool_consumers(name, tool):
                for k in keys:
                    self.queue_bindings[xn].append(
                        dict(key=k, tool_name=name, method=method))

    def handle(self, xn, msg=None):
        """Dispatch one message to every binding on ``xn`` whose key matches.

        msg: a message dict as stored by :meth:`publish`; when ``None`` the
            oldest message on the exchange is popped and used.
        """
        if msg is None:
            msg = self.pop(xn)
        for handler in self.queue_bindings[xn]:
            if self._route_matches(handler['key'], msg['routing_key']):
                self._route(xn, msg, handler['tool_name'], handler['method'])

    def handle_all(self):
        """Drain every exchange and dispatch the messages found in it.

        Messages published by handlers while this runs are left queued for a
        later call.
        """
        # Snapshot the items: a handler may publish to an exchange that has
        # no entry yet, which grows the dict and would break iteration over
        # a live view.
        for xn, queued in list(self.exchanges.items()):
            pending = list(queued)
            queued[:] = []
            for msg in pending:
                self.handle(xn, msg)

    def _route(self, xn, msg, tool_name, method):
        """Invoke the reactor callback for one (tool, method) binding.

        Uses the reactor's ``route_audit`` for the 'audit' exchange and
        ``route_react`` for any other.  The callback receives the message
        payload and a ``mock.Mock`` carrying ``delivery_info`` and a no-op
        ``ack``; its return value is returned.
        """
        if xn == 'audit':
            callback = self.reactor.route_audit(tool_name, method)
        else:
            callback = self.reactor.route_react(tool_name, method)
        data = msg['message']
        message = mock.Mock()
        message.delivery_info = dict(
            routing_key=msg['routing_key'])
        message.ack = lambda: None
        return callback(data, message)

    def _route_matches(self, pattern, key):
        """Match routing ``key`` against a topic-style binding ``pattern``.

        In ``pattern``, '.' is literal, '*' stands for exactly one ``\\w+``
        word and '#' for one or more dot-separated words.  Other characters
        are passed to the regex engine unescaped.

        Returns the ``re`` match object on success, ``None`` otherwise.
        """
        # Escape dots first so the dots introduced by the '#' expansion are
        # not escaped again.
        re_pattern = (pattern
                      .replace('.', r'\.')
                      .replace('*', r'(?:\w+)')
                      .replace('#', r'(?:\w+)(?:\.\w+)*'))
        return re.match(re_pattern + '$', key)