Work at SourceForge, help us to make it a better place! We have an immediate need for a Support Technician in our San Francisco or Denver office.

Close

[r7415]: branches / 1.5 / turbogears / util.py Maximize Restore History

Download this file

util.py    799 lines (638 with data), 25.0 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
"""The TurboGears utility module."""
_all__ = ['Bunch', 'DictObj', 'DictWrapper', 'Enum', 'setlike',
'get_package_name', 'get_model', 'load_project_config',
'ensure_sequence', 'has_arg', 'to_kw', 'from_kw', 'adapt_call',
'call_on_stack', 'remove_keys', 'arg_index',
'inject_arg', 'inject_args', 'add_tg_args', 'bind_args',
'recursive_update', 'combine_contexts',
'request_available', 'flatten_sequence', 'load_class',
'parse_http_accept_header', 'simplify_http_accept_header',
'to_unicode', 'to_utf8', 'quote_cookie', 'unquote_cookie',
'get_template_encoding_default', 'get_mime_type_for_format',
'mime_type_has_charset', 'find_precision', 'copy_if_mutable',
'match_ip', 'deprecated']
import os
import sys
import re
import logging
import warnings
import htmlentitydefs
import socket
import struct
from inspect import getargspec, getargvalues
from itertools import izip, islice, chain
from operator import isSequenceType
from Cookie import _quote as quote_cookie, _unquote as unquote_cookie
import pkg_resources
from cherrypy import request
from turbogears.decorator import decorator
from turbogears import config
def deprecated(message=None):
"""Decorator which can be used to mark functions as deprecated.
It will result in a warning being emitted when the function is used.
Inspired by http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/391367
"""
def decorate(func):
if not decorate.message:
decorate.message = ("Call to deprecated function %s."
% func.__name__)
def new_func(*args, **kwargs):
if not decorate.warned:
warnings.warn(decorate.message, category=DeprecationWarning,
stacklevel=2)
decorate.warned = True
return func(*args, **kwargs)
new_func.__name__ = func.__name__
new_func.__doc__ = func.__doc__
new_func.__dict__.update(func.__dict__)
return new_func
decorate.message = message
decorate.warned = False
return decorate
def missing_dependency_error(name=None):
msg = """\
Before you can run this command, you need to install all the project's
dependencies by running "python setup.py develop" in the project directory, or
you can install the application with "python setup.py install", or build an egg
with "python setup.py bdist_egg" and install it with "easy_install dist/<egg>".
If you are stuck, visit http://docs.turbogears.org/GettingHelp for support."""
if name:
msg = ("This project requires the %s package but it could not be "
"found.\n\n" % name) + msg
return msg
class Bunch(dict):
"""Simple but handy collector of a bunch of named stuff."""
def __repr__(self):
keys = self.keys()
keys.sort()
args = ', '.join(['%s=%r' % (key, self[key]) for key in keys])
return '%s(%s)' % (self.__class__.__name__, args)
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)
__setattr__ = dict.__setitem__
def __delattr__(self, name):
try:
del self[name]
except KeyError:
raise AttributeError(name)
class DictObj(Bunch):
@deprecated("Use Bunch instead of DictObj and DictWrapper.")
def __init__(self, *args, **kw):
super(DictObj, self).__init__(*args, **kw)
DictWrapper = DictObj
def Enum(*names):
"""True immutable symbolic enumeration with qualified value access.
Written by Zoran Isailovski:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/413486
"""
# Uncomment the following check if you don't like empty enums.
# if not names:
# raise ValueError("Empty enums are not supported")
class EnumClass(object):
__slots__ = names
def __iter__(self):
return iter(constants)
def __len__(self):
return len(constants)
def __getitem__(self, i):
return constants[i]
def __repr__(self):
return 'Enum' + str(names)
def __str__(self):
return 'enum ' + str(constants)
enumType = EnumClass()
class EnumValue(object):
__slots__ = ('__value')
def __init__(self, value):
self.__value = value
Value = property(lambda self: self.__value)
EnumType = property(lambda self: enumType)
def __hash__(self):
return hash(self.__value)
def __cmp__(self, other):
# C fans might want to remove the following check
# to make all enums comparable by ordinal value
if not (isinstance(other, EnumValue)
and self.EnumType is other.EnumType):
raise TypeError("Only values from the same enum are comparable")
return cmp(self.__value, other.__value)
def __invert__(self):
return constants[maximum - self.__value]
def __nonzero__(self):
return bool(self.__value)
def __repr__(self):
return str(names[self.__value])
maximum = len(names) - 1
constants = [None] * len(names)
for i, each in enumerate(names):
val = EnumValue(i)
setattr(EnumClass, each, val)
constants[i] = val
constants = tuple(constants)
return enumType
class setlike(list):
"""Set preserving item order."""
def add(self, item):
if item not in self:
self.append(item)
def add_all(self, iterable):
for item in iterable:
self.add(item)
def get_project_meta(name):
"""Get egg-info file with that name in the current project."""
for dirname in os.listdir('./'):
if dirname.lower().endswith('egg-info'):
fname = os.path.join(dirname, name)
return fname
def get_project_config():
"""Try to select appropriate project configuration file."""
return os.path.exists('setup.py') and 'dev.cfg' or 'prod.cfg'
def load_project_config(configfile=None):
"""Try to update the project settings from the config file specified.
If configfile is C{None}, uses L{get_project_config} to locate one.
"""
if configfile is None:
configfile = get_project_config()
if not os.path.isfile(configfile):
print 'Config file %s not found or is not a file.' % (
os.path.abspath(configfile),)
sys.exit()
package = get_package_name()
config.update_config(configfile=configfile, modulename=package + '.config')
def get_package_name():
"""Try to find out the package name of the current directory."""
package = config.get('package')
if package:
return package
if hasattr(sys, 'argv') and "--egg" in sys.argv:
projectname = sys.argv[sys.argv.index("--egg")+1]
egg = pkg_resources.get_distribution(projectname)
top_level = egg._get_metadata("top_level.txt")
else:
fname = get_project_meta('top_level.txt')
top_level = fname and open(fname) or []
for package in top_level:
package = package.rstrip()
if package and package != 'locales':
return package
def get_project_name():
pkg_info = get_project_meta('PKG-INFO')
if pkg_info:
name = list(open(pkg_info))[1][6:-1]
return name.strip()
def get_model():
package_name = get_package_name()
if not package_name:
return None
package = __import__(package_name, {}, {}, ["model"])
if hasattr(package, "model"):
return package.model
def ensure_sequence(obj):
"""Construct a sequence from object."""
if obj is None:
return []
elif isSequenceType(obj):
return obj
else:
return [obj]
def to_kw(func, args, kw, start=0):
"""Convert all applicable arguments to keyword arguments."""
argnames, defaults = getargspec(func)[::3]
defaults = ensure_sequence(defaults)
kv_pairs = izip(
islice(argnames, start, len(argnames) - len(defaults)), args)
for k, v in kv_pairs:
kw[k] = v
return args[len(argnames)-len(defaults)-start:], kw
def from_kw(func, args, kw, start=0):
"""Extract named positional arguments from keyword arguments."""
argnames, defaults = getargspec(func)[::3]
defaults = ensure_sequence(defaults)
newargs = [kw.pop(name) for name in islice(argnames, start,
len(argnames) - len(defaults)) if name in kw]
newargs.extend(args)
return newargs, kw
def adapt_call(func, args, kw, start=0):
"""Remove unsupported func arguments from given args list and kw dict.
@param func: the callable to inspect for supported arguments
@type func: callable
@param args: the names of the positional arguments intended to be passed
to func
@type args: list
@param kw: the keyword arguments intended to be passed to func
@type kw: dict
@keyparam start: the number of items from the start of the argument list of
func to disregard. Set start=1 to use adapt_call on a bound method to
disregard the implicit self argument.
@type start: int
Returns args list and kw dict from which arguments unsupported by func
have been removed. The passed in kw dict is also stripped as a side-effect.
The returned objects can then be used to call the target function.
Example:
def myfunc(arg1, arg2, kwarg1='foo'):
pass
args, kw = adapt_call(myfunc, ['args1, 'bogus1'],
{'kwargs1': 'bar', 'bogus2': 'spamm'})
# --> ['args1'], {'kwargs1': 'bar'}
myfunc(*args, **kw)
"""
argnames, varargs, kwargs = getargspec(func)[:3]
del argnames[:start]
if kwargs in (None, "_decorator__kwargs"):
remove_keys(kw, [key for key in kw if key not in argnames])
if varargs in (None, "_decorator__varargs"):
args = args[:len(argnames)]
for n, key in enumerate(argnames):
if key in kw:
args = args[:n]
break
return args, kw
def call_on_stack(func_name, kw, start=0):
"""Check if a call to function matching pattern is on stack."""
try:
frame = sys._getframe(start+1)
except ValueError:
return False
while frame.f_back:
frame = frame.f_back
if frame.f_code.co_name == func_name:
args = getargvalues(frame)[3]
for key in kw.iterkeys():
try:
if kw[key] != args[key]:
break
except (KeyError, TypeError):
break
else:
return True
return False
def remove_keys(dict_, seq):
"""Gracefully remove keys from dict."""
for key in seq:
dict_.pop(key, None)
return dict_
def has_arg(func, argname):
"""Check whether function has argument."""
return argname in getargspec(func)[0]
def arg_index(func, argname):
"""Find index of argument as declared for given function."""
argnames = getargspec(func)[0]
if has_arg(func, argname):
return argnames.index(argname)
else:
return None
def inject_arg(func, argname, argval, args, kw, start=0):
"""Insert argument into call."""
argnames, defaults = getargspec(func)[::3]
defaults = ensure_sequence(defaults)
pos = arg_index(func, argname)
if pos is None or pos > len(argnames) - len(defaults) - 1:
kw[argname] = argval
else:
pos -= start
args = tuple(chain(islice(args, pos), (argval,),
islice(args, pos, None)))
return args, kw
def inject_args(func, injections, args, kw, start=0):
"""Insert arguments into call."""
for argname, argval in injections.iteritems():
args, kw = inject_arg(func, argname, argval, args, kw, start)
return args, kw
def inject_call(func, injections, *args, **kw):
"""Insert arguments and call."""
args, kw = inject_args(func, injections, args, kw)
return func(*args, **kw)
def add_tg_args(func, args):
"""Add hint for special arguments that shall not be removed."""
try:
tg_args = func._tg_args
except AttributeError:
tg_args = set()
tg_args.update(args)
func._tg_args = tg_args
def bind_args(**add):
"""Call with arguments set to a predefined value."""
def entagle(func):
return lambda func, *args, **kw: inject_call(func, add, *args, **kw)
def make_decorator(func):
argnames, varargs, kwargs, defaults = getargspec(func)
defaults = list(ensure_sequence(defaults))
defaults = [d for d in defaults if
argnames[-len(defaults) + defaults.index(d)] not in add]
argnames = [arg for arg in argnames if arg not in add]
return decorator(entagle, (argnames, varargs, kwargs, defaults))(func)
return make_decorator
def recursive_update(to_dict, from_dict):
"""Recursively update all dicts in to_dict with values from from_dict."""
# probably slow as hell :( should be optimized somehow...
for k, v in from_dict.iteritems():
if isinstance(v, dict) and isinstance(to_dict[k], dict):
recursive_update(to_dict[k], v)
else:
to_dict[k] = v
return to_dict
def combine_contexts(frames=None, depth=None):
"""Combine contexts (globals, locals) of frames."""
locals_ = {}
globals_ = {}
if frames is None:
frames = []
if depth is not None:
frames.extend([sys._getframe(d+1) for d in depth])
for frame in frames:
locals_.update(frame.f_locals)
globals_.update(frame.f_globals)
return locals_, globals_
def request_available():
"""Check if cherrypy.request is available."""
stage = getattr(request, 'stage', None)
return stage is not None
def flatten_sequence(seq):
"""Flatten sequence."""
for item in seq:
if isSequenceType(item) and not isinstance(item, basestring):
for item in flatten_sequence(item):
yield item
else:
yield item
def load_class(dottedpath):
"""Load a class from a module in dotted-path notation.
E.g.: load_class("package.module.class").
Based on recipe 16.3 from Python Cookbook, 2ed., by Alex Martelli,
Anna Martelli Ravenscroft, and David Ascher (O'Reilly Media, 2005)
"""
assert dottedpath is not None, "dottedpath must not be None"
splitted_path = dottedpath.split('.')
modulename = '.'.join(splitted_path[:-1])
classname = splitted_path[-1]
try:
try:
module = __import__(modulename, globals(), locals(), [classname])
except ValueError: # Py < 2.5
if not modulename:
module = __import__(__name__.split('.', 1)[0],
globals(), locals(), [classname])
except ImportError:
# properly log the exception information and return None
# to tell caller we did not succeed
logging.exception('tg.utils: Could not import %s'
' because an exception occurred', dottedpath)
return None
try:
return getattr(module, classname)
except AttributeError:
logging.exception('tg.utils: Could not import %s'
' because the class was not found', dottedpath)
return None
def parse_http_accept_header(accept):
"""Parse an HTTP Accept header (RFC 2616) into a sorted list.
The quality factors in the header determine the sort order.
The values can include possible media-range parameters.
This function can also be used for the Accept-Charset,
Accept-Encoding and Accept-Language headers.
"""
if accept is None:
return []
items = []
for item in accept.split(','):
params = item.split(';')
for i, param in enumerate(params[1:]):
param = param.split('=', 1)
if param[0].strip() == 'q':
try:
q = float(param[1])
if not 0 < q <= 1:
raise ValueError
except (IndexError, ValueError):
q = 0
else:
item = ';'.join(params[:i+1])
break
else:
q = 1
if q:
item = item.strip()
if item:
items.append((item, q))
items.sort(key=lambda item: -item[1])
return [item[0] for item in items]
def simplify_http_accept_header(accept):
"""Parse an HTTP Accept header (RFC 2616) into a preferred value.
The quality factors in the header determine the preference.
Possible media-range parameters are allowed, but will be ignored.
This function can also be used for the Accept-Charset,
Accept-Encoding and Accept-Language headers.
This is similar to parse_http_accept_header(accept)[0], but faster.
"""
if accept is None:
return None
best_item = accept
best_q = 0
for item in accept.split(','):
params = item.split(';')
item = params.pop(0)
for param in params:
param = param.split('=', 1)
if param[0].strip() == 'q':
try:
q = float(param[1])
if not 0 < q <= 1:
raise ValueError
except (IndexError, ValueError):
q = 0
break
else:
q = 1
if q > best_q:
item = item.strip()
if item:
best_item = item
if q == 1:
break
best_q = q
return best_item
def to_unicode(value):
"""Convert encoded string to unicode string.
Uses get_template_encoding_default() to guess source string encoding.
Handles turbogears.i18n.lazystring correctly.
"""
if isinstance(value, str):
# try to make sure we won't get UnicodeDecodeError from the template
# by converting all encoded strings to Unicode strings
try:
value = unicode(value)
except UnicodeDecodeError:
try:
value = unicode(value, get_template_encoding_default())
except UnicodeDecodeError:
# fail early
raise ValueError("Non-unicode string: %r" % value)
return value
def to_utf8(value):
"""Convert a unicode string to utf-8 encoded plain string.
Handles turbogears.i18n.lazystring correctly.
Does nothing to already encoded string.
"""
if isinstance(value, str):
pass
elif hasattr(value, '__unicode__'):
value = unicode(value)
if isinstance(value, unicode):
value = value.encode('utf-8')
return value
def get_template_encoding_default(engine_name=None):
"""Return default encoding for template files (Kid, Genshi, etc.)."""
if engine_name is None:
engine_name = config.get('tg.defaultview', 'genshi')
return config.get('%s.encoding' % engine_name,
config.get('%s.default_encoding' % engine_name, 'utf-8'))
_format_mime_types = dict(
plain='text/plain', text='text/plain',
html='text/html', xhtml = 'text/html', # see note below
xml='text/xml', json='application/json')
def get_mime_type_for_format(format):
"""Return default MIME media type for a template format.
Note: By default we are serving xhtml as "text/html" instead of the more
correct "application/xhtml+xml", since many browsers, particularly MSIE,
do not support this. We are assuming that xhtml means XHTML 1.0 here,
where this approach is possible. It would be possible to use some kind
of content negotiation to deliver a customized content type, but we avoid
this because it causes more harm (e.g. with proxies involved) than good.
If you want to serve the proper content type (e.g. for XHTML 1.1),
set tg.format_mime_types= {'xhtml': 'application/xhtml+xml'}.
You can also set a particular content type per controller using the
content_type parameter of the expose decorator.
For detailed information about this issues, see here:
http://www.smackthemouse.com/xhtmlxml, http://schneegans.de/web/xhtml/.
"""
mime_type = config.get('tg.format_mime_types', {}).get(format)
if not mime_type:
mime_type = _format_mime_types.get(format, 'text/html')
return mime_type
def mime_type_has_charset(mime_type):
"""Return whether the MIME media type supports a charset parameter.
Note: According to RFC4627, we do not output a charset parameter
for "application/json" (this type always uses a UTF encoding).
"""
if not mime_type:
return False
if mime_type.startswith('text/'):
return True
if mime_type.startswith('application/'):
if mime_type.endswith('/xml') or mime_type.endswith('+xml'):
return True
if mime_type.endswith('/javascript'):
return True
return False
def find_precision(value):
"""Find precision of some arbitrary value.
The main intention for this function is to use it together with
turbogears.i18n.format.format_decimal() where one has to inform
the precision wanted. So, use it like this:
format_decimal(some_number, find_precision(some_number))
"""
decimals = ''
try:
decimals = str(value).split('.', 1)[1]
except IndexError:
pass
return len(decimals)
def copy_if_mutable(value, feedback=False):
"""Make a copy of the value if it is mutable.
Returns the value. If feedback is set to true, also returns
whether value was mutable as the second element of a tuple.
"""
if isinstance(value, dict):
mutable = True
value = value.copy()
elif isinstance(value, list):
mutable = True
value = value[:]
else:
mutable = False
if feedback:
return value, mutable
else:
return value
def fixentities(htmltext):
"""Replace HTML character entities with numerical references.
Note: This won't handle CDATA sections properly.
"""
def repl(matchobj):
entity = htmlentitydefs.entitydefs.get(matchobj.group(1).lower())
if not entity:
return matchobj.group(0)
elif len(entity) == 1:
if entity in '&<>\'"':
return matchobj.group(0)
return '&#%d;' % ord(entity)
else:
return entity
return re.sub('&(\w+);?', repl, htmltext)
if hasattr(socket, 'inet_pton') and hasattr(socket, 'AF_INET6'):
def inet6_aton(addr):
"""Convert IP6 standard hex notation to IP6 address."""
return socket.inet_pton(socket.AF_INET6, addr)
else: # Windows etc.
import string
_inet6_chars = string.hexdigits + ':.'
def inet6_aton(addr):
"""Convert IPv6 standard hex notation to IPv6 address.
Inspired by http://twistedmatrix.com/trac/.
"""
faulty = addr.lstrip(_inet6_chars)
if faulty:
raise ValueError("Illegal character '%c' in IPv6 address" % faulty[0])
parts = addr.split(':')
elided = parts.count('')
extenso = '.' in parts[-1] and 7 or 8
if len(parts) > extenso or elided > 3:
raise ValueError("Syntactically invalid IPv6 address")
if elided == 3:
return '\x00' * 16
if elided:
zeros = ['0'] * (extenso - len(parts) + elided)
if addr.startswith('::'):
parts[:2] = zeros
elif addr.endswith('::'):
parts[-2:] = zeros
else:
idx = parts.index('')
parts[idx:idx+1] = zeros
if len(parts) != extenso:
raise ValueError("Syntactically invalid IPv6 address")
if extenso == 7:
ipv4 = parts.pop()
if ipv4.count('.') != 3:
raise ValueError("Syntactically invalid IPv6 address")
parts = [int(x, 16) for x in parts]
return struct.pack('!6H', *parts) + socket.inet_aton(ipv4)
else:
parts = [int(x, 16) for x in parts]
return struct.pack('!8H', *parts)
def inet_aton(addr):
"""Convert IPv4 or IPv6 notation to IPv6 address."""
if ':' in addr:
return inet6_aton(addr)
else:
return struct.pack('!QL', 0, 0xffff) + socket.inet_aton(addr)
def _inet_prefix(addr, masked):
"""Remove the number of masked bits from the IPV6 address."""
hi, lo = struct.unpack("!QQ", addr)
return (hi << 64 | lo) >> masked
def match_ip(cidr, ip):
"""Check whether IP address matches CIDR IP address block."""
if '/' in cidr:
cidr, prefix = cidr.split('/', 1)
masked = (':' in cidr and 128 or 32) - int(prefix)
else:
masked = None
cidr = inet_aton(cidr)
ip = inet_aton(ip)
if masked:
cidr = _inet_prefix(cidr, masked)
ip = _inet_prefix(ip, masked)
return ip == cidr