sources for collect.py [rev. unknown]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
"""
Collect test items at filesystem and python module levels. 
Collectors and test items form a tree.  The difference
between a collector and a test item as seen from the session 
is smalll.  Collectors usually return a list of child 
collectors/items whereas items usually return None 
indicating a successful test run.  
The is a schematic example of a tree of collectors and test items:: 
    Directory
        Module 
            Class 
                Instance   
                    Function  
                    Generator 
                        ... 
            Function 
            Generator 
                Function 
        Directory       
            ... 
""" 
from __future__ import generators 
import py
from py.__.test.outcome import Skipped
def configproperty(name):
    def fget(self):
        #print "retrieving %r property from %s" %(name, self.fspath)
        return self._config.getvalue(name, self.fspath) 
    return property(fget)
class Collector(object): 
    """ Collector instances are iteratively generated
        (through their run() and join() methods)
        and form a tree.  attributes::
        parent: attribute pointing to the parent collector
                (or None if it is the root collector)
        name:   basename of this collector object
    """
    def __init__(self, name, parent=None):
        self.name = name 
        self.parent = parent
        self._config = getattr(parent, '_config', py.test.config)
        self.fspath = getattr(parent, 'fspath', None) 
    Module = configproperty('Module')
    DoctestFile = configproperty('DoctestFile')
    Directory = configproperty('Directory')
    Class = configproperty('Class')
    Instance = configproperty('Instance')
    Function = configproperty('Function')
    Generator = configproperty('Generator')
    _stickyfailure = None
    def __repr__(self): 
        return "<%s %r>" %(self.__class__.__name__, self.name) 
    def __eq__(self, other): 
        # XXX a rather strict check for now to not confuse
        #     the SetupState.prepare() logic
        return self is other
    
    def __hash__(self):
        return hash((self.name, self.parent))
    
    def __ne__(self, other):
        return not self == other
    def __cmp__(self, other): 
        s1 = self._getsortvalue()
        s2 = other._getsortvalue()
        #print "cmp", s1, s2
        return cmp(s1, s2) 
   
    def run(self):
        """ returns a list of names available from this collector.
            You can return an empty list.  Callers of this method
            must take care to catch exceptions properly.  The session
            object guards its calls to ``colitem.run()`` in its
            ``session.runtraced(colitem)`` method, including
            catching of stdout.
        """
        raise NotImplementedError("abstract")
    def join(self, name):
        """  return a child item for the given name.  Usually the
             session feeds the join method with each name obtained
             from ``colitem.run()``.  If the return value is None
             it means the ``colitem`` was not able to resolve
             with the given name.
        """
    def obj(): 
        def fget(self):
            try: 
                return self._obj   
            except AttributeError: 
                self._obj = obj = self._getobj() 
                return obj 
        def fset(self, value): 
            self._obj = value 
        return property(fget, fset, None, "underlying object") 
    obj = obj()
    def _getobj(self):
        return getattr(self.parent.obj, self.name)
    def multijoin(self, namelist): 
        """ return a list of colitems for the given namelist. """ 
        return [self.join(name) for name in namelist]
    def _getpathlineno(self): 
        return self.fspath, py.std.sys.maxint 
    def setup(self): 
        pass 
    def teardown(self): 
        pass 
    def listchain(self): 
        """ return list of all parent collectors up to ourself. """ 
        l = [self]
        while 1: 
            x = l[-1]
            if x.parent is not None: 
                l.append(x.parent) 
            else: 
                l.reverse() 
                return l 
    def listnames(self): 
        return [x.name for x in self.listchain()]
    def _getitembynames(self, namelist):
        if isinstance(namelist, str):
            namelist = namelist.split("/")
        cur = self
        for name in namelist:
            if name:
                next = cur.join(name)
                assert next is not None, (cur, name, namelist)
                cur = next
        return cur
    def _haskeyword(self, keyword): 
        return keyword in self.name
    def _getmodpath(self):
        """ return dotted module path (relative to the containing). """ 
        inmodule = False 
        newl = []
        for x in self.listchain(): 
            if not inmodule and not isinstance(x, Module): 
                continue
            if not inmodule:  
                inmodule = True
                continue
            if newl and x.name[:1] in '([': 
                newl[-1] += x.name 
            else: 
                newl.append(x.name) 
        return ".".join(newl) 
    def _skipbykeyword(self, keyword): 
        """ raise Skipped() exception if the given keyword 
            matches for this collector. 
        """
        if not keyword:
            return
        chain = self.listchain()
        for key in filter(None, keyword.split()): 
            eor = key[:1] == '-'
            if eor:
                key = key[1:]
            if not (eor ^ self._matchonekeyword(key, chain)):
                py.test.skip("test not selected by keyword %r" %(keyword,))
    def _matchonekeyword(self, key, chain): 
        for subitem in chain:
            if subitem._haskeyword(key): 
                return True 
        return False
    def _tryiter(self, yieldtype=None, reporterror=None, keyword=None):
        """ yield stop item instances from flattening the collector. 
            XXX deprecated: this way of iteration is not safe in all
            cases. 
        """ 
        if yieldtype is None: 
            yieldtype = py.test.collect.Item 
        if isinstance(self, yieldtype):
            try:
                self._skipbykeyword(keyword)
                yield self
            except Skipped:
                if reporterror is not None:
                    excinfo = py.code.ExceptionInfo()
                    reporterror((excinfo, self))
        else:
            if not isinstance(self, py.test.collect.Item):
                try:
                    if reporterror is not None:
                        reporterror((None, self))
                    for x in self.run(): 
                        for y in self.join(x)._tryiter(yieldtype, 
                                            reporterror, keyword): 
                            yield y
                except KeyboardInterrupt:
                    raise
                except: 
                    if reporterror is not None: 
                        excinfo = py.code.ExceptionInfo()
                        reporterror((excinfo, self)) 
    def _getsortvalue(self): 
        return self.name 
    _captured_out = _captured_err = None
    def startcapture(self): 
        return None # by default collectors don't capture output
    def finishcapture(self): 
        return None # by default collectors don't capture output
    def _getouterr(self): 
        return self._captured_out, self._captured_err
    def _get_collector_trail(self):
        """ Shortcut
        """
        return self._config.get_collector_trail(self)
class FSCollector(Collector): 
    def __init__(self, fspath, parent=None): 
        fspath = py.path.local(fspath) 
        super(FSCollector, self).__init__(fspath.basename, parent) 
        self.fspath = fspath 
class Directory(FSCollector): 
    def filefilter(self, path): 
        if path.check(file=1):
            b = path.purebasename 
            ext = path.ext
            return (b.startswith('test_') or 
                    b.endswith('_test')) and ext in ('.txt', '.py')
    
    def recfilter(self, path): 
        if path.check(dir=1, dotfile=0):
            return path.basename not in ('CVS', '_darcs', '{arch}')
    def run(self):
        files = []
        dirs = []
        for p in self.fspath.listdir():
            if self.filefilter(p):
                files.append(p.basename)
            elif self.recfilter(p):
                dirs.append(p.basename) 
        files.sort()
        dirs.sort()
        return files + dirs
    def join(self, name):
        name2items = self.__dict__.setdefault('_name2items', {})
        try:
            res = name2items[name]
        except KeyError:
            p = self.fspath.join(name)
            res = None
            if p.check(file=1): 
                if p.ext == '.py':
                    res = self.Module(p, parent=self) 
                elif p.ext == '.txt':
                    res = self.DoctestFile(p, parent=self)
            elif p.check(dir=1): 
                Directory = py.test.config.getvalue('Directory', p) 
                res = Directory(p, parent=self) 
            name2items[name] = res 
        return res
class PyCollectorMixin(Collector): 
    def funcnamefilter(self, name): 
        return name.startswith('test') 
    def classnamefilter(self, name): 
        return name.startswith('Test')
    def _buildname2items(self): 
        # NB. we avoid random getattrs and peek in the __dict__ instead
        d = {}
        dicts = [getattr(self.obj, '__dict__', {})]
        for basecls in py.std.inspect.getmro(self.obj.__class__):
            dicts.append(basecls.__dict__)
        seen = {}
        for dic in dicts:
            for name, obj in dic.items():
                if name in seen:
                    continue
                seen[name] = True
                res = self.makeitem(name, obj)
                if res is not None:
                    d[name] = res 
        return d
    def makeitem(self, name, obj, usefilters=True):
        if (not usefilters or self.classnamefilter(name)) and \
            py.std.inspect.isclass(obj):
            return self.Class(name, parent=self)
        elif (not usefilters or self.funcnamefilter(name)) and callable(obj): 
            if obj.func_code.co_flags & 32: # generator function 
                return self.Generator(name, parent=self)
            else: 
                return self.Function(name, parent=self)
    def _prepare(self): 
        if not hasattr(self, '_name2items'): 
            ex = getattr(self, '_name2items_exception', None)
            if ex is not None: 
                raise ex[0], ex[1], ex[2]
            try: 
                self._name2items = self._buildname2items()
            except (SystemExit, KeyboardInterrupt): 
                raise 
            except:
                self._name2items_exception = py.std.sys.exc_info()
                raise
    def run(self): 
        self._prepare()
        itemlist = self._name2items.values()
        itemlist.sort()
        return [x.name for x in itemlist]
    def join(self, name): 
        self._prepare()
        return self._name2items.get(name, None) 
class Module(FSCollector, PyCollectorMixin):
    def run(self):
        if getattr(self.obj, 'disabled', 0):
            return []
        return super(Module, self).run()
    def join(self, name):
        res = super(Module, self).join(name)
        if res is None:
            attr = getattr(self.obj, name, None)
            if attr is not None:
                res = self.makeitem(name, attr, usefilters=False)
        return res
    
    def startcapture(self): 
        self._config._startcapture(self, path=self.fspath)
    def finishcapture(self): 
        self._config._finishcapture(self)
    def __repr__(self): 
        return "<%s %r>" % (self.__class__.__name__, self.name)
    def obj(self): 
        try: 
            return self._obj    
        except AttributeError:
            failure = getattr(self, '_stickyfailure', None)
            if failure is not None: 
                raise failure[0], failure[1], failure[2]
            try: 
                self._obj = obj = self.fspath.pyimport() 
            except KeyboardInterrupt: 
                raise
            except: 
                self._stickyfailure = py.std.sys.exc_info()
                raise 
            return obj 
    obj = property(obj, None, None, "module object")
    def setup(self): 
        if hasattr(self.obj, 'setup_module'): 
            self.obj.setup_module(self.obj) 
    def teardown(self): 
        if hasattr(self.obj, 'teardown_module'): 
            self.obj.teardown_module(self.obj) 
class Class(PyCollectorMixin, Collector): 
    def run(self): 
        if getattr(self.obj, 'disabled', 0):
            return []
        return ["()"]
    def join(self, name):
        assert name == '()'
        return self.Instance(name, self)
    def setup(self): 
        setup_class = getattr(self.obj, 'setup_class', None)
        if setup_class is not None: 
            setup_class = getattr(setup_class, 'im_func', setup_class) 
            setup_class(self.obj) 
    def teardown(self): 
        teardown_class = getattr(self.obj, 'teardown_class', None) 
        if teardown_class is not None: 
            teardown_class = getattr(teardown_class, 'im_func', teardown_class) 
            teardown_class(self.obj) 
    def _getsortvalue(self):
        # try to locate the class in the source - not nice, but probably
        # the most useful "solution" that we have
        try:
            file = (py.std.inspect.getsourcefile(self.obj) or
                    py.std.inspect.getfile(self.obj))
            if not file:
                raise IOError
            lines, lineno = py.std.inspect.findsource(self.obj)
            return py.path.local(file), lineno
        except IOError:
            pass
        # fall back...
        for x in self._tryiter((py.test.collect.Generator, py.test.collect.Item)):
            return x._getsortvalue()
class Instance(PyCollectorMixin, Collector): 
    def _getobj(self): 
        return self.parent.obj()  
    def Function(self): 
        return getattr(self.obj, 'Function', 
                       Collector.Function.__get__(self)) # XXX for python 2.2 
    Function = property(Function)
class FunctionMixin(object):
    """ mixin for the code common to Function and Generator.
    """
    def _getpathlineno(self):
        code = py.code.Code(self.obj) 
        return code.path, code.firstlineno 
    def _getsortvalue(self):  
        return self._getpathlineno() 
    def setup(self): 
        """ perform setup for this test function. """
        if getattr(self.obj, 'im_self', None): 
            name = 'setup_method' 
        else: 
            name = 'setup_function' 
        obj = self.parent.obj 
        meth = getattr(obj, name, None)
        if meth is not None: 
            return meth(self.obj) 
    def teardown(self): 
        """ perform teardown for this test function. """
        if getattr(self.obj, 'im_self', None): 
            name = 'teardown_method' 
        else: 
            name = 'teardown_function' 
        obj = self.parent.obj 
        meth = getattr(obj, name, None)
        if meth is not None: 
            return meth(self.obj) 
class Generator(FunctionMixin, PyCollectorMixin, Collector): 
    def run(self): 
        self._prepare()
        itemlist = self._name2items
        return [itemlist["[%d]" % num].name for num in xrange(len(itemlist))]
    
    def _buildname2items(self): 
        d = {} 
        # slightly hackish to invoke setup-states on
        # collection ...
        self.Function._state.prepare(self)
        for i, x in py.builtin.enumerate(self.obj()): 
            call, args = self.getcallargs(x)
            if not callable(call): 
                raise TypeError("yielded test %r not callable" %(call,))
            name = "[%d]" % i
            d[name] = self.Function(name, self, args, obj=call, sort_value = i)
        return d
        
    def getcallargs(self, obj):
        if isinstance(obj, (tuple, list)):
            call, args = obj[0], obj[1:]
        else:
            call, args = obj, ()
        return call, args 
class DoctestFile(PyCollectorMixin, FSCollector): 
    def run(self):
        return [self.fspath.basename]
    def join(self, name):
        from py.__.test.doctest import DoctestText
        if name == self.fspath.basename: 
            item = DoctestText(self.fspath.basename, parent=self)
            item._content = self.fspath.read()
            return item