Cython has moved to github.
cython-devel
view tests/run/bufaccess.pyx @ 1622:4f0327bdebc9
test for temp allocation bug in call
| author | Robert Bradshaw <robertwb@math.washington.edu> |
|---|---|
| date | Sat Jan 17 01:25:34 2009 -0800 (3 years ago) |
| parents | 1b08009689f0 4a96e1aff2d4 |
| children | bd501ebaffa2 162a2b63c0da |
line source
1 # Tests the buffer access syntax functionality by constructing
2 # mock buffer objects.
3 #
4 # Note that the buffers are mock objects created for testing
5 # the buffer access behaviour -- for instance there is no flag
6 # checking in the buffer objects (why test our test case?), rather
7 # what we want to test is what is passed into the flags argument.
8 #
10 from __future__ import unicode_literals
12 cimport stdlib
13 cimport python_buffer
14 cimport stdio
15 cimport cython
17 from python_ref cimport PyObject
19 __test__ = {}
21 import re
22 exclude = []#re.compile('object').search]
def testcase(func):
    """
    Decorator that registers the docstring of ``func`` as a doctest.

    The docstring is stored in the module-level ``__test__`` dict under the
    function's name, unless one of the predicates in ``exclude`` returns a
    true value for that name.  ``func`` is returned unchanged either way.
    """
    name = func.__name__
    skip = False
    for matches in exclude:
        if matches(name):
            skip = True
            break
    if not skip:
        __test__[name] = func.__doc__
    return func
def testcas(a):
    # No-op: ignores its argument and returns None.
    # NOTE(review): nothing in the visible part of the file calls this;
    # presumably it exists so a test can be switched off by changing its
    # decorator from @testcase to @testcas -- confirm before removing.
    pass
35 #
36 # Buffer acquire and release tests
37 #
def nousage():
    """
    The challenge here is just compilation.
    """
    # The buffer variable is declared but never assigned or read, and the
    # function is not registered with @testcase.
    cdef object[int, ndim=2] buf
def printbuf():
    """
    Just compilation.
    """
    # The buffer variable is printed without ever having been assigned;
    # the function is not registered with @testcase.
    cdef object[int, ndim=2] buf
    print buf
@testcase
def acquire_release(o1, o2):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
    >>> acquire_release(A, B)
    acquired A
    released A
    acquired B
    released B
    >>> acquire_release(None, None)
    >>> acquire_release(None, B)
    acquired B
    released B
    """
    cdef object[int] buf
    # Per the expected output above: rebinding the buffer variable releases
    # the buffer held so far before the new one is acquired, and assigning
    # None produces no acquire/release messages.
    buf = o1
    buf = o2
@testcase
def acquire_raise(o):
    """
    Apparently, doctest won't handle mixed exceptions and print
    stats, so need to circumvent this.

    >>> A = IntMockBuffer("A", range(6))
    >>> A.resetlog()
    >>> acquire_raise(A)
    Traceback (most recent call last):
        ...
    Exception: on purpose
    >>> A.printlog()
    acquired A
    released A

    """
    cdef object[int] buf
    buf = o
    # The exception leaves the function while the buffer is held; the log
    # checked in the doctest shows that it is still released.
    raise Exception("on purpose")
@testcase
def acquire_failure1():
    """
    >>> acquire_failure1()
    acquired working
    0 3
    0 3
    released working
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        # NOTE(review): ErrorBuffer.__init__ in this file takes a required
        # ``label`` argument, so this call presumably raises TypeError in
        # the constructor, before any buffer acquisition is attempted --
        # confirm that this is the intended failure.
        buf = ErrorBuffer()
        assert False
    except Exception:
        # The buffer acquired earlier must still be readable here.
        print buf[0], buf[3]
@testcase
def acquire_failure2():
    """
    >>> acquire_failure2()
    acquired working
    0 3
    0 3
    released working
    """
    # Same scenario as acquire_failure1, but the first buffer is bound in
    # the cdef declaration itself instead of a separate assignment.
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        # NOTE(review): ErrorBuffer.__init__ requires ``label``; this call
        # presumably fails in the constructor with TypeError -- confirm.
        buf = ErrorBuffer()
        assert False
    except Exception:
        print buf[0], buf[3]
@testcase
def acquire_failure3():
    """
    >>> acquire_failure3()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        # Assigning a plain int fails.  The expected output shows the old
        # buffer being released and then acquired again, so it remains
        # usable in the except branch.
        buf = 3
        assert False
    except Exception:
        print buf[0], buf[3]
147 @testcase
148 def acquire_failure4():
149 """
150 >>> acquire_failure4()
151 acquired working
152 0 3
153 released working
154 acquired working
155 0 3
156 released working
157 """
158 cdef object[int] buf = IntMockBuffer("working", range(4))
159 print buf[0], buf[3]
160 try:
161 buf = 2
162 assert False
163 except Exception:
164 print buf[0], buf[3]
@testcase
def acquire_failure5():
    """
    >>> acquire_failure5()
    Traceback (most recent call last):
       ...
    ValueError: Buffer acquisition failed on assignment; and then reacquiring the old buffer failed too!
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    # MockBuffer.__getbuffer__ raises ValueError while ``fail`` is true, so
    # once ``buf = 3`` has failed the old buffer cannot be taken back either.
    buf.fail = True
    buf = 3
180 @testcase
181 def acquire_nonbuffer1(first, second=None):
182 """
183 >>> acquire_nonbuffer1(3)
184 Traceback (most recent call last):
185 ...
186 TypeError: 'int' does not have the buffer interface
187 >>> acquire_nonbuffer1(type)
188 Traceback (most recent call last):
189 ...
190 TypeError: 'type' does not have the buffer interface
191 >>> acquire_nonbuffer1(None, 2)
192 Traceback (most recent call last):
193 ...
194 TypeError: 'int' does not have the buffer interface
195 """
196 cdef object[int] buf
197 buf = first
198 buf = second
@testcase
def acquire_nonbuffer2():
    """
    >>> acquire_nonbuffer2()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        # Note: the ErrorBuffer *class object* is assigned here, not an
        # instance of it.
        buf = ErrorBuffer
        assert False
    except Exception:
        print buf[0], buf[3]
220 @testcase
221 def as_argument(object[int] bufarg, int n):
222 """
223 >>> A = IntMockBuffer("A", range(6))
224 >>> as_argument(A, 6)
225 acquired A
226 0 1 2 3 4 5 END
227 released A
228 """
229 cdef int i
230 for i in range(n):
231 print bufarg[i],
232 print 'END'
@testcase
def as_argument_defval(object[int] bufarg=IntMockBuffer('default', range(6)), int n=6):
    """
    >>> as_argument_defval()
    acquired default
    0 1 2 3 4 5 END
    released default
    >>> A = IntMockBuffer("A", range(6))
    >>> as_argument_defval(A, 6)
    acquired A
    0 1 2 3 4 5 END
    released A
    """
    # Exercises a buffer-typed argument that has a default value; the first
    # doctest call relies on that default being acquired and released.
    cdef int i
    for i in range(n):
        print bufarg[i],
    print 'END'
252 @testcase
253 def cdef_assignment(obj, n):
254 """
255 >>> A = IntMockBuffer("A", range(6))
256 >>> cdef_assignment(A, 6)
257 acquired A
258 0 1 2 3 4 5 END
259 released A
261 """
262 cdef object[int] buf = obj
263 cdef int i
264 for i in range(n):
265 print buf[i],
266 print 'END'
268 @testcase
269 def forin_assignment(objs, int pick):
270 """
271 >>> A = IntMockBuffer("A", range(6))
272 >>> B = IntMockBuffer("B", range(6))
273 >>> forin_assignment([A, B, A, A], 2)
274 acquired A
275 2
276 released A
277 acquired B
278 2
279 released B
280 acquired A
281 2
282 released A
283 acquired A
284 2
285 released A
286 """
287 cdef object[int] buf
288 for buf in objs:
289 print buf[pick]
@testcase
def cascaded_buffer_assignment(obj):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> cascaded_buffer_assignment(A)
    acquired A
    acquired A
    released A
    released A
    """
    cdef object[int] a, b
    # One chained assignment binds the same object to two buffer variables;
    # the expected output shows one acquire/release pair per variable.
    a = b = obj
304 @testcase
305 def tuple_buffer_assignment1(a, b):
306 """
307 >>> A = IntMockBuffer("A", range(6))
308 >>> B = IntMockBuffer("B", range(6))
309 >>> tuple_buffer_assignment1(A, B)
310 acquired A
311 acquired B
312 released A
313 released B
314 """
315 cdef object[int] x, y
316 x, y = a, b
318 @testcase
319 def tuple_buffer_assignment2(tup):
320 """
321 >>> A = IntMockBuffer("A", range(6))
322 >>> B = IntMockBuffer("B", range(6))
323 >>> tuple_buffer_assignment2((A, B))
324 acquired A
325 acquired B
326 released A
327 released B
328 """
329 cdef object[int] x, y
330 x, y = tup
@testcase
def explicitly_release_buffer():
    """
    >>> explicitly_release_buffer()
    acquired A
    released A
    After release
    """
    cdef object[int] x = IntMockBuffer("A", range(10))
    # Rebinding to None releases the buffer immediately: "released A" is
    # expected before the message printed on the next line.
    x = None
    print "After release"
344 #
345 # Format strings
346 #
347 @testcase
348 def alignment_string(object[int] buf):
349 """
350 >>> alignment_string(IntMockBuffer(None, [1,2], format="@i"))
351 2
352 >>> alignment_string(IntMockBuffer(None, [1,2], format="@i@@"))
353 2
354 >>> alignment_string(IntMockBuffer(None, [1,2], format=">i"))
355 Traceback (most recent call last):
356 ...
357 ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
358 >>> alignment_string(IntMockBuffer(None, [1,2], format="<i"))
359 Traceback (most recent call last):
360 ...
361 ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
362 >>> alignment_string(IntMockBuffer(None, [1,2], format="=i"))
363 Traceback (most recent call last):
364 ...
365 ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
366 >>> alignment_string(IntMockBuffer(None, [1,2], format="!i"))
367 Traceback (most recent call last):
368 ...
369 ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
370 """
371 print buf[1]
373 @testcase
374 def wrong_string(object[int] buf):
375 """
376 >>> wrong_string(IntMockBuffer(None, [1,2], format="if"))
377 Traceback (most recent call last):
378 ...
379 ValueError: Buffer dtype mismatch (expected end, got float)
380 >>> wrong_string(IntMockBuffer(None, [1,2], format="$$"))
381 Traceback (most recent call last):
382 ...
383 ValueError: Buffer dtype mismatch (expected int, got unparseable format string)
384 """
385 print buf[1]
@testcase
def int_and_long_are_same():
    """
    >>> int_and_long_are_same()
    """
    cdef object[int] intarr
    cdef object[long] longarr
    # Only meaningful where both C types have the same size: an int buffer
    # is then bound to data exported with format 'l', and a long buffer to
    # data exported with IntMockBuffer's default format.  The doctest
    # expects neither assignment to raise.
    if sizeof(int) == sizeof(long):
        intarr = IntMockBuffer(None, [1,2], format='l')
        longarr = IntMockBuffer(None, [1,2])
398 #
399 # Getting items and index bounds checking
400 #
401 @testcase
402 def get_int_2d(object[int, ndim=2] buf, int i, int j):
403 """
404 >>> C = IntMockBuffer("C", range(6), (2,3))
405 >>> get_int_2d(C, 1, 1)
406 acquired C
407 released C
408 4
410 Check negative indexing:
411 >>> get_int_2d(C, -1, 0)
412 acquired C
413 released C
414 3
415 >>> get_int_2d(C, -1, -2)
416 acquired C
417 released C
418 4
419 >>> get_int_2d(C, -2, -3)
420 acquired C
421 released C
422 0
424 Out-of-bounds errors:
425 >>> get_int_2d(C, 2, 0)
426 Traceback (most recent call last):
427 ...
428 IndexError: Out of bounds on buffer access (axis 0)
429 >>> get_int_2d(C, 0, -4)
430 Traceback (most recent call last):
431 ...
432 IndexError: Out of bounds on buffer access (axis 1)
433 """
434 return buf[i, j]
436 @testcase
437 def get_int_2d_uintindex(object[int, ndim=2] buf, unsigned int i, unsigned int j):
438 """
439 Unsigned indexing:
440 >>> C = IntMockBuffer("C", range(6), (2,3))
441 >>> get_int_2d_uintindex(C, 0, 0)
442 acquired C
443 released C
444 0
445 >>> get_int_2d_uintindex(C, 1, 2)
446 acquired C
447 released C
448 5
449 """
450 # This is most interesting with regards to the C code
451 # generated.
452 return buf[i, j]
454 @testcase
455 def set_int_2d(object[int, ndim=2] buf, int i, int j, int value):
456 """
457 Uses get_int_2d to read back the value afterwards. For pure
458 unit test, one should support reading in MockBuffer instead.
460 >>> C = IntMockBuffer("C", range(6), (2,3))
461 >>> set_int_2d(C, 1, 1, 10)
462 acquired C
463 released C
464 >>> get_int_2d(C, 1, 1)
465 acquired C
466 released C
467 10
469 Check negative indexing:
470 >>> set_int_2d(C, -1, 0, 3)
471 acquired C
472 released C
473 >>> get_int_2d(C, -1, 0)
474 acquired C
475 released C
476 3
478 >>> set_int_2d(C, -1, -2, 8)
479 acquired C
480 released C
481 >>> get_int_2d(C, -1, -2)
482 acquired C
483 released C
484 8
486 >>> set_int_2d(C, -2, -3, 9)
487 acquired C
488 released C
489 >>> get_int_2d(C, -2, -3)
490 acquired C
491 released C
492 9
494 Out-of-bounds errors:
495 >>> set_int_2d(C, 2, 0, 19)
496 Traceback (most recent call last):
497 ...
498 IndexError: Out of bounds on buffer access (axis 0)
499 >>> set_int_2d(C, 0, -4, 19)
500 Traceback (most recent call last):
501 ...
502 IndexError: Out of bounds on buffer access (axis 1)
504 """
505 buf[i, j] = value
@testcase
def list_comprehension(object[int] buf, len):
    """
    >>> list_comprehension(IntMockBuffer(None, [1,2,3]), 3)
    1|2|3
    """
    # The parameter ``len`` shadows the builtin inside this function; it is
    # the number of items to read.  The C-typed ``i`` is used as the loop
    # variable of the list comprehension that indexes the buffer.
    cdef int i
    print u"|".join([unicode(buf[i]) for i in range(len)])
516 #
517 # The negative_indices buffer option
518 #
519 @testcase
520 def no_negative_indices(object[int, negative_indices=False] buf, int idx):
521 """
522 The most interesting thing here is to inspect the C source and
523 make sure optimal code is produced.
525 >>> A = IntMockBuffer(None, range(6))
526 >>> no_negative_indices(A, 3)
527 3
528 >>> no_negative_indices(A, -1)
529 Traceback (most recent call last):
530 ...
531 IndexError: Out of bounds on buffer access (axis 0)
532 """
533 return buf[idx]
535 #
536 # Buffer type mismatch examples. Varying the type and access
537 # method simultaneously, the odds of an interaction is virtually
538 # zero.
539 #
540 @testcase
541 def fmtst1(buf):
542 """
543 >>> fmtst1(IntMockBuffer("A", range(3)))
544 Traceback (most recent call last):
545 ...
546 ValueError: Buffer dtype mismatch (expected float, got int)
547 """
548 cdef object[float] a = buf
550 @testcase
551 def fmtst2(object[int] buf):
552 """
553 >>> fmtst2(FloatMockBuffer("A", range(3)))
554 Traceback (most recent call last):
555 ...
556 ValueError: Buffer dtype mismatch (expected int, got float)
557 """
@testcase
def ndim1(object[int, ndim=2] buf):
    """
    >>> ndim1(IntMockBuffer("A", range(3)))
    Traceback (most recent call last):
       ...
    ValueError: Buffer has wrong number of dimensions (expected 2, got 1)
    """
    # Intentionally empty body: the ValueError in the doctest must come
    # from binding the argument to the ndim=2 buffer type in the signature.
568 #
569 # Test which flags are passed.
570 #
571 @testcase
572 def readonly(obj):
573 """
574 >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
575 >>> readonly(R)
576 acquired R
577 25
578 released R
579 >>> [str(x) for x in R.recieved_flags] # Works in both py2 and py3
580 ['FORMAT', 'INDIRECT', 'ND', 'STRIDES']
581 """
582 cdef object[unsigned short int, ndim=3] buf = obj
583 print buf[2, 2, 1]
585 @testcase
586 def writable(obj):
587 """
588 >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
589 >>> writable(R)
590 acquired R
591 released R
592 >>> [str(x) for x in R.recieved_flags] # Py2/3
593 ['FORMAT', 'INDIRECT', 'ND', 'STRIDES', 'WRITABLE']
594 """
595 cdef object[unsigned short int, ndim=3] buf = obj
596 buf[2, 2, 1] = 23
598 @testcase
599 def strided(object[int, ndim=1, mode='strided'] buf):
600 """
601 >>> A = IntMockBuffer("A", range(4))
602 >>> strided(A)
603 acquired A
604 released A
605 2
606 >>> [str(x) for x in A.recieved_flags] # Py2/3
607 ['FORMAT', 'ND', 'STRIDES']
609 Check that the suboffsets were patched back prior to release.
610 >>> A.release_ok
611 True
612 """
613 return buf[2]
615 @testcase
616 def c_contig(object[int, ndim=1, mode='c'] buf):
617 """
618 >>> A = IntMockBuffer(None, range(4))
619 >>> c_contig(A)
620 2
621 >>> [str(x) for x in A.recieved_flags]
622 ['FORMAT', 'ND', 'STRIDES', 'C_CONTIGUOUS']
623 """
624 return buf[2]
626 @testcase
627 def c_contig_2d(object[int, ndim=2, mode='c'] buf):
628 """
629 Multi-dim has seperate implementation
631 >>> A = IntMockBuffer(None, range(12), shape=(3,4))
632 >>> c_contig_2d(A)
633 7
634 >>> [str(x) for x in A.recieved_flags]
635 ['FORMAT', 'ND', 'STRIDES', 'C_CONTIGUOUS']
636 """
637 return buf[1, 3]
639 @testcase
640 def f_contig(object[int, ndim=1, mode='fortran'] buf):
641 """
642 >>> A = IntMockBuffer(None, range(4))
643 >>> f_contig(A)
644 2
645 >>> [str(x) for x in A.recieved_flags]
646 ['FORMAT', 'ND', 'STRIDES', 'F_CONTIGUOUS']
647 """
648 return buf[2]
650 @testcase
651 def f_contig_2d(object[int, ndim=2, mode='fortran'] buf):
652 """
653 Must set up strides manually to ensure Fortran ordering.
655 >>> A = IntMockBuffer(None, range(12), shape=(4,3), strides=(1, 4))
656 >>> f_contig_2d(A)
657 7
658 >>> [str(x) for x in A.recieved_flags]
659 ['FORMAT', 'ND', 'STRIDES', 'F_CONTIGUOUS']
660 """
661 return buf[3, 1]
#
# Test compiler options for bounds checking. We create an array with a
# safe "boundary" (memory allocated outside of what it publishes) and then
# check whether we get back what we stored in that memory or an error.
#
669 @testcase
670 def safe_get(object[int] buf, int idx):
671 """
672 >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
674 Validate our testing buffer...
675 >>> safe_get(A, 0)
676 5
677 >>> safe_get(A, 2)
678 7
679 >>> safe_get(A, -3)
680 5
682 Access outside it. This is already done above for bounds check
683 testing but we include it to tell the story right.
685 >>> safe_get(A, -4)
686 Traceback (most recent call last):
687 ...
688 IndexError: Out of bounds on buffer access (axis 0)
689 >>> safe_get(A, 3)
690 Traceback (most recent call last):
691 ...
692 IndexError: Out of bounds on buffer access (axis 0)
693 """
694 return buf[idx]
@testcase
@cython.boundscheck(False) # outer decorators should take precedence
@cython.boundscheck(True)
def unsafe_get(object[int] buf, int idx):
    """
    Access outside of the area the buffer publishes.
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> unsafe_get(A, -4)
    4
    >>> unsafe_get(A, -5)
    3
    >>> unsafe_get(A, 3)
    8
    """
    # With bounds checking off, indices outside the published shape (3,)
    # read the surrounding memory of the 10-item allocation; the doctest
    # values above are the neighbours of the published window.
    return buf[idx]
712 @testcase
713 @cython.boundscheck(False)
714 def unsafe_get_nonegative(object[int, negative_indices=False] buf, int idx):
715 """
716 Also inspect the C source to see that it is optimal...
718 >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
719 >>> unsafe_get_nonegative(A, -2)
720 3
721 """
722 return buf[idx]
@testcase
def mixed_get(object[int] buf, int unsafe_idx, int safe_idx):
    """
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> mixed_get(A, -4, 0)
    (4, 5)
    >>> mixed_get(A, 0, -4)
    Traceback (most recent call last):
       ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    # The directive is toggled per block: the first read is unchecked, the
    # second must raise IndexError for an out-of-range index.
    with cython.boundscheck(False):
        one = buf[unsafe_idx]
    with cython.boundscheck(True):
        two = buf[safe_idx]
    return (one, two)
741 #
742 # Coercions
743 #
@testcase
def coercions(object[unsigned char] uc):
    """
    TODO
    """
    # NOTE(review): the docstring holds no ``>>>`` examples, so although the
    # function is registered with @testcase, the doctests never call it.
    print type(uc[0])
    uc[0] = -1
    print uc[0]
    uc[0] = <int>3.14
    print uc[0]

    cdef char* ch = "asfd"
    cdef object[object] objbuf
    # NOTE(review): objbuf is never bound to a buffer before this store.
    objbuf[3] = ch
760 #
761 # Testing that accessing data using various types of buffer access
762 # all works.
763 #
def printbuf_int(object[int] buf, shape):
    # Utility func
    # Prints the first shape[0] items of a 1-d int buffer on one line,
    # followed by 'END'.  Not registered as a test itself.
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'
773 @testcase
774 def printbuf_int_2d(o, shape):
775 """
776 Strided:
778 >>> printbuf_int_2d(IntMockBuffer("A", range(6), (2,3)), (2,3))
779 acquired A
780 0 1 2 END
781 3 4 5 END
782 released A
783 >>> printbuf_int_2d(IntMockBuffer("A", range(100), (3,3), strides=(20,5)), (3,3))
784 acquired A
785 0 5 10 END
786 20 25 30 END
787 40 45 50 END
788 released A
790 Indirect:
791 >>> printbuf_int_2d(IntMockBuffer("A", [[1,2],[3,4]]), (2,2))
792 acquired A
793 1 2 END
794 3 4 END
795 released A
796 """
797 # should make shape builtin
798 cdef object[int, ndim=2] buf
799 buf = o
800 cdef int i, j
801 for i in range(shape[0]):
802 for j in range(shape[1]):
803 print buf[i, j],
804 print 'END'
@testcase
def printbuf_float(o, shape):
    """
    >>> printbuf_float(FloatMockBuffer("F", [1.0, 1.25, 0.75, 1.0]), (4,))
    acquired F
    1.0 1.25 0.75 1.0 END
    released F
    """

    # should make shape builtin
    cdef object[float] buf
    buf = o
    # ``j`` is declared but not used in this function.
    cdef int i, j
    for i in range(shape[0]):
        print buf[i],
    print "END"
824 #
825 # Test assignments
826 #
@testcase
def inplace_operators(object[int] buf):
    """
    >>> buf = IntMockBuffer(None, [2, 2])
    >>> inplace_operators(buf)
    >>> printbuf_int(buf, (2,))
    0 3 END
    """
    # Starting from [2, 2]: item 1 becomes 3; item 0 is doubled to 4 (via a
    # variable index) and then reduced by 4 to 0 (via a constant index).
    cdef int j = 0
    buf[1] += 1
    buf[j] *= 2
    buf[0] -= 4
842 #
843 # Typedefs
844 #
845 # Test three layers of typedefs going through a h file for plain int, and
846 # simply a header file typedef for floats and unsigned.
# Layer 1: a typedef declared in Cython.
ctypedef int td_cy_int
# Layer 2: typedefs taken from the header.  The types written here are
# deliberately not the real ones (see the trailing comments), so the buffer
# dtype checks cannot rely on what Cython was told.
cdef extern from "bufaccess.h":
    ctypedef td_cy_int td_h_short # Defined as short, but Cython doesn't know this!
    ctypedef float td_h_double # Defined as double
    ctypedef unsigned int td_h_ushort # Defined as unsigned short
# Layer 3: a Cython typedef on top of the header typedef.
ctypedef td_h_short td_h_cy_short
855 @testcase
856 def printbuf_td_cy_int(object[td_cy_int] buf, shape):
857 """
858 >>> printbuf_td_cy_int(IntMockBuffer(None, range(3)), (3,))
859 0 1 2 END
860 >>> printbuf_td_cy_int(ShortMockBuffer(None, range(3)), (3,))
861 Traceback (most recent call last):
862 ...
863 ValueError: Buffer dtype mismatch (expected bufaccess.td_cy_int, got short)
865 """
866 cdef int i
867 for i in range(shape[0]):
868 print buf[i],
869 print 'END'
871 @testcase
872 def printbuf_td_h_short(object[td_h_short] buf, shape):
873 """
874 >>> printbuf_td_h_short(ShortMockBuffer(None, range(3)), (3,))
875 0 1 2 END
876 >>> printbuf_td_h_short(IntMockBuffer(None, range(3)), (3,))
877 Traceback (most recent call last):
878 ...
879 ValueError: Buffer dtype mismatch (expected bufaccess.td_h_short, got int)
880 """
881 cdef int i
882 for i in range(shape[0]):
883 print buf[i],
884 print 'END'
886 @testcase
887 def printbuf_td_h_cy_short(object[td_h_cy_short] buf, shape):
888 """
889 >>> printbuf_td_h_cy_short(ShortMockBuffer(None, range(3)), (3,))
890 0 1 2 END
891 >>> printbuf_td_h_cy_short(IntMockBuffer(None, range(3)), (3,))
892 Traceback (most recent call last):
893 ...
894 ValueError: Buffer dtype mismatch (expected bufaccess.td_h_cy_short, got int)
895 """
896 cdef int i
897 for i in range(shape[0]):
898 print buf[i],
899 print 'END'
901 @testcase
902 def printbuf_td_h_ushort(object[td_h_ushort] buf, shape):
903 """
904 >>> printbuf_td_h_ushort(UnsignedShortMockBuffer(None, range(3)), (3,))
905 0 1 2 END
906 >>> printbuf_td_h_ushort(ShortMockBuffer(None, range(3)), (3,))
907 Traceback (most recent call last):
908 ...
909 ValueError: Buffer dtype mismatch (expected bufaccess.td_h_ushort, got short)
910 """
911 cdef int i
912 for i in range(shape[0]):
913 print buf[i],
914 print 'END'
916 @testcase
917 def printbuf_td_h_double(object[td_h_double] buf, shape):
918 """
919 >>> printbuf_td_h_double(DoubleMockBuffer(None, [0.25, 1, 3.125]), (3,))
920 0.25 1.0 3.125 END
921 >>> printbuf_td_h_double(FloatMockBuffer(None, [0.25, 1, 3.125]), (3,))
922 Traceback (most recent call last):
923 ...
924 ValueError: Buffer dtype mismatch (expected bufaccess.td_h_double, got float)
925 """
926 cdef int i
927 for i in range(shape[0]):
928 print buf[i],
929 print 'END'
932 #
933 # Object access
934 #
935 from python_ref cimport Py_INCREF, Py_DECREF
def addref(*args):
    # Add one reference to every object passed in.
    for obj in args:
        Py_INCREF(obj)
def decref(*args):
    # Drop one reference from every object passed in.
    for obj in args:
        Py_DECREF(obj)
def get_refcount(x):
    # Reads the reference count field straight out of the object header.
    return (<PyObject*>x).ob_refcnt
944 @testcase
945 def printbuf_object(object[object] buf, shape):
946 """
947 Only play with unique objects, interned numbers etc. will have
948 unpredictable refcounts.
950 ObjectMockBuffer doesn't do anything about increfing/decrefing,
951 we to the "buffer implementor" refcounting directly in the
952 testcase.
954 >>> a, b, c = "globally_unique_string_23234123", {4:23}, [34,3]
955 >>> get_refcount(a), get_refcount(b), get_refcount(c)
956 (2, 2, 2)
957 >>> A = ObjectMockBuffer(None, [a, b, c])
958 >>> printbuf_object(A, (3,))
959 'globally_unique_string_23234123' 2
960 {4: 23} 2
961 [34, 3] 2
962 """
963 cdef int i
964 for i in range(shape[0]):
965 print repr(buf[i]), (<PyObject*>buf[i]).ob_refcnt
967 @testcase
968 def assign_to_object(object[object] buf, int idx, obj):
969 """
970 See comments on printbuf_object above.
972 >>> a, b = [1, 2, 3], [4, 5, 6]
973 >>> get_refcount(a), get_refcount(b)
974 (2, 2)
975 >>> addref(a)
976 >>> A = ObjectMockBuffer(None, [1, a]) # 1, ...,otherwise it thinks nested lists...
977 >>> get_refcount(a), get_refcount(b)
978 (3, 2)
979 >>> assign_to_object(A, 1, b)
980 >>> get_refcount(a), get_refcount(b)
981 (2, 3)
982 >>> decref(b)
983 """
984 buf[idx] = obj
@testcase
def assign_temporary_to_object(object[object] buf):
    """
    See comments on printbuf_object above.

    >>> a, b = [1, 2, 3], {4:23}
    >>> get_refcount(a)
    2
    >>> addref(a)
    >>> A = ObjectMockBuffer(None, [b, a])
    >>> get_refcount(a)
    3
    >>> assign_temporary_to_object(A)
    >>> get_refcount(a)
    2

    >>> printbuf_object(A, (2,))
    {4: 23} 2
    {1: 8} 2

    To avoid leaking a reference in our testcase we need to
    replace the temporary with something we can manually decref :-)
    >>> assign_to_object(A, 1, a)
    >>> decref(a)
    """
    # The stored value is a freshly built temporary; the expressions
    # evaluate to the dict {1: 8} shown in the doctest.
    buf[1] = {3-2: 2+(2*4)-2}
1013 #
1014 # cast option
1015 #
@testcase
def buffer_cast(object[unsigned int, cast=True] buf, int idx):
    """
    Round-trip a signed int through unsigned int buffer access.

    >>> A = IntMockBuffer(None, [-100])
    >>> buffer_cast(A, 0)
    -100
    """
    # The item is read as unsigned int (allowed by cast=True although the
    # exporter holds ints) and cast back to int before being returned.
    cdef unsigned int data = buf[idx]
    return <int>data
1028 @testcase
1029 def buffer_cast_fails(object[char, cast=True] buf):
1030 """
1031 Cannot cast between datatype of different sizes.
1033 >>> buffer_cast_fails(IntMockBuffer(None, [0]))
1034 Traceback (most recent call last):
1035 ...
1036 ValueError: Attempted cast of buffer to datatype of different size.
1037 """
1038 return buf[0]
1041 #
1042 # Testcase support code (more tests below!, because of scope rules)
1043 #
# (name, PyBUF_* value) pairs.  MockBuffer.__getbuffer__ walks this tuple in
# order and records the name of every entry whose bits are all present in
# the flags of a buffer request.
available_flags = (
    ('FORMAT', python_buffer.PyBUF_FORMAT),
    ('INDIRECT', python_buffer.PyBUF_INDIRECT),
    ('ND', python_buffer.PyBUF_ND),
    ('STRIDES', python_buffer.PyBUF_STRIDES),
    ('C_CONTIGUOUS', python_buffer.PyBUF_C_CONTIGUOUS),
    ('F_CONTIGUOUS', python_buffer.PyBUF_F_CONTIGUOUS),
    ('WRITABLE', python_buffer.PyBUF_WRITABLE)
)
cdef class MockBuffer:
    # Base class of the mock buffer exporters used by the tests in this file.
    #
    # An instance copies ``data`` into malloc'ed memory and hands it out
    # through __getbuffer__, logging every acquire/release (when it has a
    # label) and recording which PyBUF flags the consumer asked for.
    # Subclasses supply the item type through write(), get_itemsize() and
    # get_default_format().
    cdef object format, offset
    cdef void* buffer
    cdef int len, itemsize, ndim
    cdef Py_ssize_t* strides
    cdef Py_ssize_t* shape
    cdef Py_ssize_t* suboffsets
    cdef object label, log

    # recieved_flags: names from available_flags seen in the last request.
    # release_ok: set to False if a release passes back other suboffsets.
    cdef readonly object recieved_flags, release_ok
    # fail: while true, __getbuffer__ raises ValueError.
    cdef public object fail

    def __init__(self, label, data, shape=None, strides=None, format=None, offset=0):
        # label   -- name used in the acquire/release messages; a false
        #            value (e.g. None) disables printing and logging.
        # data    -- flat sequence of items, or nested lists, which selects
        #            the indirect (pointer-to-pointer) layout.
        # shape   -- defaults to (len(data),).
        # strides -- given in items, not bytes; defaults to C order.
        # format  -- struct-style format; defaults to the subclass default.
        # offset  -- number of items by which the published pointer is
        #            shifted from the start of the allocation.
        #
        # It is important not to store references to data after the constructor
        # as refcounting is checked on object buffers.
        self.label = label
        self.release_ok = True
        self.log = ""
        self.offset = offset
        self.itemsize = self.get_itemsize()
        if format is None: format = self.get_default_format()
        if shape is None: shape = (len(data),)
        if strides is None:
            # C-contiguous strides in items: running product of the shape,
            # built from the last axis backwards.
            strides = []
            cumprod = 1
            rshape = list(shape)
            rshape.reverse()
            for s in rshape:
                strides.append(cumprod)
                cumprod *= s
            strides.reverse()
        # Convert item strides to byte strides.
        strides = [x * self.itemsize for x in strides]
        suboffsets = [-1] * len(shape)
        # Follow the first element of each level to find out how deeply
        # ``data`` is nested in lists.
        datashape = [len(data)]
        p = data
        while True:
            p = p[0]
            if isinstance(p, list): datashape.append(len(p))
            else: break
        if len(datashape) > 1:
            # indirect access
            self.ndim = len(datashape)
            shape = datashape
            self.buffer = self.create_indirect_buffer(data, shape)
            suboffsets = [0] * (self.ndim-1) + [-1]
            strides = [sizeof(void*)] * (self.ndim-1) + [self.itemsize]
            self.suboffsets = self.list_to_sizebuf(suboffsets)
        else:
            # strided and/or simple access
            self.buffer = self.create_buffer(data)
            self.ndim = len(shape)
            self.suboffsets = NULL

        # Text formats are stored as ASCII bytes; objects without .encode()
        # are kept unchanged.
        try:
            format = format.encode('ASCII')
        except AttributeError:
            pass
        self.format = format
        self.len = len(data) * self.itemsize

        self.strides = self.list_to_sizebuf(strides)
        self.shape = self.list_to_sizebuf(shape)

    def __dealloc__(self):
        if self.suboffsets != NULL:
            # Indirect layout: self.buffer is a tree of pointer blocks.  The
            # original code only freed the suboffsets array here and leaked
            # the whole tree.  Walking the tree needs the shape array, so it
            # must happen before self.shape is freed below; the NULL checks
            # cover an instance whose __init__ did not run to completion.
            if self.buffer != NULL and self.shape != NULL:
                self.free_indirect_buffer(self.buffer, 0)
            stdlib.free(self.suboffsets)
        else:
            stdlib.free(self.buffer)
        stdlib.free(self.strides)
        stdlib.free(self.shape)

    cdef void free_indirect_buffer(self, void* buf, int dim):
        # Release the block ``buf`` built by create_indirect_buffer for axis
        # ``dim``: for every axis but the last the block holds shape[dim]
        # child pointers, which are released first.
        cdef void** children
        cdef Py_ssize_t idx
        if buf == NULL:
            return
        if dim < self.ndim - 1:
            children = <void**>buf
            for idx in range(self.shape[dim]):
                self.free_indirect_buffer(children[idx], dim + 1)
        stdlib.free(buf)

    cdef void* create_buffer(self, data):
        # Allocate one contiguous block and write every item of ``data``
        # into it through the subclass' write().
        cdef char* buf = <char*>stdlib.malloc(len(data) * self.itemsize)
        cdef char* it = buf
        for value in data:
            self.write(it, value)
            it += self.itemsize
        return buf

    cdef void* create_indirect_buffer(self, data, shape):
        # Build the nested layout recursively: the innermost level is a
        # plain item block, every outer level a block of child pointers.
        cdef void** buf
        assert shape[0] == len(data)
        if len(shape) == 1:
            return self.create_buffer(data)
        else:
            shape = shape[1:]
            buf = <void**>stdlib.malloc(len(data) * sizeof(void*))
            for idx, subdata in enumerate(data):
                buf[idx] = self.create_indirect_buffer(subdata, shape)
            return buf

    cdef Py_ssize_t* list_to_sizebuf(self, l):
        # Copy a Python sequence of integers into a malloc'ed Py_ssize_t
        # array; the array is released in __dealloc__.
        cdef Py_ssize_t* buf = <Py_ssize_t*>stdlib.malloc(len(l) * sizeof(Py_ssize_t))
        for i, x in enumerate(l):
            buf[i] = x
        return buf

    def __getbuffer__(MockBuffer self, Py_buffer* buffer, int flags):
        if self.fail:
            raise ValueError("Failing on purpose")

        # Record the name of every known flag fully contained in ``flags``.
        self.recieved_flags = []
        cdef int value
        for name, value in available_flags:
            if (value & flags) == value:
                self.recieved_flags.append(name)

        # The published pointer is shifted by ``offset`` items so that tests
        # can read memory outside the published window.
        buffer.buf = <void*>(<char*>self.buffer + (<int>self.offset * self.itemsize))
        buffer.obj = self
        buffer.len = self.len
        buffer.readonly = 0
        buffer.format = <char*>self.format
        buffer.ndim = self.ndim
        buffer.shape = self.shape
        buffer.strides = self.strides
        buffer.suboffsets = self.suboffsets
        buffer.itemsize = self.itemsize
        buffer.internal = NULL
        if self.label:
            msg = "acquired %s" % self.label
            print msg
            self.log += msg + "\n"

    def __releasebuffer__(MockBuffer self, Py_buffer* buffer):
        # The consumer must hand back the suboffsets pointer it was given.
        if buffer.suboffsets != self.suboffsets:
            self.release_ok = False
        if self.label:
            msg = "released %s" % self.label
            print msg
            self.log += msg + "\n"

    def printlog(self):
        # Print the accumulated log without its trailing newline.
        print self.log[:-1]

    def resetlog(self):
        self.log = ""

    # Hooks that every concrete subclass overrides.
    cdef int write(self, char* buf, object value) except -1: raise Exception()
    cdef get_itemsize(self):
        print "ERROR, not subclassed", self.__class__
    cdef get_default_format(self):
        print "ERROR, not subclassed", self.__class__
cdef class CharMockBuffer(MockBuffer):
    # Mock buffer of C ``char`` items.

    cdef int write(self, char* buf, object value) except -1:
        # Store ``value`` in the item slot that ``buf`` points at.
        cdef char* slot = <char*>buf
        slot[0] = <int>value
        return 0

    cdef get_itemsize(self):
        return sizeof(char)

    cdef get_default_format(self):
        return b"@b"
cdef class IntMockBuffer(MockBuffer):
    # Mock buffer of C ``int`` items.

    cdef int write(self, char* buf, object value) except -1:
        # Store ``value`` in the item slot that ``buf`` points at.
        cdef int* slot = <int*>buf
        slot[0] = <int>value
        return 0

    cdef get_itemsize(self):
        return sizeof(int)

    cdef get_default_format(self):
        return b"@i"
cdef class UnsignedIntMockBuffer(MockBuffer):
    # Mock buffer of C ``unsigned int`` items.

    cdef int write(self, char* buf, object value) except -1:
        # Store ``value`` in the item slot that ``buf`` points at.
        cdef unsigned int* slot = <unsigned int*>buf
        slot[0] = <unsigned int>value
        return 0

    cdef get_itemsize(self):
        return sizeof(unsigned int)

    cdef get_default_format(self):
        return b"@I"
cdef class ShortMockBuffer(MockBuffer):
    # Mock buffer of C ``short`` items.

    cdef int write(self, char* buf, object value) except -1:
        # Store ``value`` in the item slot that ``buf`` points at.
        cdef short* slot = <short*>buf
        slot[0] = <short>value
        return 0

    cdef get_itemsize(self):
        return sizeof(short)

    cdef get_default_format(self):
        # Try without endian specifier
        return b"h"
cdef class UnsignedShortMockBuffer(MockBuffer):
    # Mock buffer of C ``unsigned short`` items.

    cdef int write(self, char* buf, object value) except -1:
        # Store ``value`` in the item slot that ``buf`` points at.
        cdef unsigned short* slot = <unsigned short*>buf
        slot[0] = <unsigned short>value
        return 0

    cdef get_itemsize(self):
        return sizeof(unsigned short)

    cdef get_default_format(self):
        # Try with repeat count
        return b"@1H"
cdef class FloatMockBuffer(MockBuffer):
    # Mock buffer of C ``float`` items.

    cdef int write(self, char* buf, object value) except -1:
        # Store ``value`` in the item slot that ``buf`` points at.
        cdef float* slot = <float*>buf
        slot[0] = <float>value
        return 0

    cdef get_itemsize(self):
        return sizeof(float)

    cdef get_default_format(self):
        return b"f"
cdef class DoubleMockBuffer(MockBuffer):
    # Mock buffer of C ``double`` items.

    cdef int write(self, char* buf, object value) except -1:
        # Store ``value`` in the item slot that ``buf`` points at.
        cdef double* slot = <double*>buf
        slot[0] = <double>value
        return 0

    cdef get_itemsize(self):
        return sizeof(double)

    cdef get_default_format(self):
        return b"d"
cdef extern from *:
    # Declared with the C name "(void*)", so a call to addr_of_pyobject(x)
    # is emitted as a plain ``(void*)`` cast of the object pointer.
    void* addr_of_pyobject "(void*)"(object)
cdef class ObjectMockBuffer(MockBuffer):
    # Mock buffer whose items are raw object pointers.
    cdef int write(self, char* buf, object value) except -1:
        # Only the address is stored; no reference count is touched here,
        # so the tests do the exporter's refcounting by hand.
        (<void**>buf)[0] = addr_of_pyobject(value)
        return 0

    cdef get_itemsize(self): return sizeof(void*)
    cdef get_default_format(self): return b"@O"
cdef class IntStridedMockBuffer(IntMockBuffer):
    # Same as IntMockBuffer, but buffer variables declared with this type
    # default to mode="strided" (checked by the bufdefaults1 test).
    cdef __cythonbufferdefaults__ = {"mode" : "strided"}
# Buffer exporter that always fails: both buffer-protocol hooks raise an
# Exception whose message names the operation and this object's label.
cdef class ErrorBuffer:
    # Name interpolated into the exception messages.
    cdef object label

    def __init__(self, label):
        self.label = label

    def __getbuffer__(ErrorBuffer self, Py_buffer* buffer, int flags):
        # Acquisition never succeeds; ``buffer`` and ``flags`` are ignored.
        raise Exception("acquiring %s" % self.label)

    def __releasebuffer__(ErrorBuffer self, Py_buffer* buffer):
        # Release always raises as well.
        raise Exception("releasing %s" % self.label)
#
# Typed buffers
#
# Assigns an arbitrary object to a local declared with the extension type
# IntMockBuffer as the buffer's object type.  The docstring below is the
# doctest itself (registered through @testcase), so it is kept verbatim:
# it expects acquire/release logging for a real IntMockBuffer, silence for
# None, and a TypeError for an int.
@testcase
def typedbuffer1(obj):
    """
    >>> typedbuffer1(IntMockBuffer("A", range(10)))
    acquired A
    released A
    >>> typedbuffer1(None)
    >>> typedbuffer1(4)
    Traceback (most recent call last):
        ...
    TypeError: Cannot convert int to bufaccess.IntMockBuffer
    """
    cdef IntMockBuffer[int, ndim=1] buf = obj
# Same check as typedbuffer1, but with the typed buffer declared on the
# function argument instead of a local.  The docstring is the doctest
# (registered through @testcase) and is kept verbatim; note the different
# TypeError wording expected for a wrongly typed argument.
@testcase
def typedbuffer2(IntMockBuffer[int, ndim=1] obj):
    """
    >>> typedbuffer2(IntMockBuffer("A", range(10)))
    acquired A
    released A
    >>> typedbuffer2(None)
    >>> typedbuffer2(4)
    Traceback (most recent call last):
        ...
    TypeError: Argument 'obj' has incorrect type (expected bufaccess.IntMockBuffer, got int)
    """
    # Nothing to do: the test only covers the argument handling.
    pass
#
# Test __cythonbufferdefaults__
#
# Checks that the class-level __cythonbufferdefaults__ of
# IntStridedMockBuffer is honoured when no mode is given in the
# declaration.  The docstring is the doctest (registered through
# @testcase) and is kept verbatim, including the attribute spelling
# ``recieved_flags`` used by the mock buffer.
@testcase
def bufdefaults1(IntStridedMockBuffer[int, ndim=1] buf):
    """
    For IntStridedMockBuffer, mode should be
    "strided" by defaults which should show
    up in the flags.

    >>> A = IntStridedMockBuffer("A", range(10))
    >>> bufdefaults1(A)
    acquired A
    released A
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES']
    """
    # Nothing to do: only the flags requested at acquisition matter.
    pass
#
# Structs
#
# Struct dtype with mixed field widths (two chars, a long long, two ints).
# Used as the buffer item type in basic_struct; MyStructMockBuffer
# describes it with the format string "2bq2i".
cdef struct MyStruct:
    char a
    char b
    long long int c
    int d
    int e
# Two-int struct; used as the member type inside NestedStruct.
cdef struct SmallStruct:
    int a
    int b
# Struct containing two SmallStruct members followed by an int.
# Used as the buffer item type in nested_struct; NestedStructMockBuffer
# describes it with the format string "2T{ii}i".
cdef struct NestedStruct:
    SmallStruct x
    SmallStruct y
    int z
# Mock buffer whose items are MyStruct values.
# Supplies the item-type specific pieces on top of MockBuffer (defined
# earlier in this file).
cdef class MyStructMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        # ``value`` must unpack into exactly five items, one per field of
        # MyStruct, in declaration order.
        cdef MyStruct* s
        s = <MyStruct*>buf
        s.a, s.b, s.c, s.d, s.e = value
        # 0 is the normal result; -1 is reserved for exceptions (except -1).
        return 0

    cdef get_itemsize(self): return sizeof(MyStruct)
    # Two signed chars, one long long, two ints.
    cdef get_default_format(self): return b"2bq2i"
# Mock buffer whose items are NestedStruct values.
# Supplies the item-type specific pieces on top of MockBuffer (defined
# earlier in this file).
cdef class NestedStructMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        # ``value`` must unpack into exactly five items: x.a, x.b, y.a,
        # y.b and z, in that order.
        cdef NestedStruct* s
        s = <NestedStruct*>buf
        s.x.a, s.x.b, s.y.a, s.y.b, s.z = value
        # 0 is the normal result; -1 is reserved for exceptions (except -1).
        return 0

    cdef get_itemsize(self): return sizeof(NestedStruct)
    # Two "T{ii}" sub-structs (two ints each) followed by one int.
    cdef get_default_format(self): return b"2T{ii}i"
# Reads every field of the first MyStruct item of the buffer and prints
# them.  The docstring is the doctest (registered through @testcase) and
# is kept verbatim: it covers the default format, an equivalent format
# without repeat counts, and two mismatching formats that must raise
# ValueError.
@testcase
def basic_struct(object[MyStruct] buf):
    """
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)]))
    1 2 3 4 5
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="bbqii"))
    1 2 3 4 5
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="23bqii"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected long long, got char)
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="i"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected char, got int)
    """
    print buf[0].a, buf[0].b, buf[0].c, buf[0].d, buf[0].e
# Reads every leaf field of the first NestedStruct item of the buffer and
# prints them.  The docstring is the doctest (registered through
# @testcase) and is kept verbatim: it covers the default format, an
# equivalent spelling of it, and two formats whose struct nesting does not
# match and must raise ValueError.
@testcase
def nested_struct(object[NestedStruct] buf):
    """
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)]))
    1 2 3 4 5
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="T{ii}T{2i}i"))
    1 2 3 4 5
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="iiiii"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected SmallStruct, got int)
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="T{iii}T{ii}i"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected end of SmallStruct struct, got int)
    """
    print buf[0].x.a, buf[0].x.b, buf[0].y.a, buf[0].y.b, buf[0].z
# Struct of two long doubles standing in for a complex number.
# Used as the buffer item type in complex_struct_dtype and
# complex_struct_inplace, where the exporter reports the format "Zg".
cdef struct LongComplex:
    long double real
    long double imag
# Deliberately mismatched "complex" struct: the two parts have different
# floating-point types.  mixed_complex_struct_dtype expects a ValueError
# when a complex-format buffer is matched against it.
cdef struct MixedComplex:
    long double real
    float imag
# Mock buffer whose items are LongComplex values but whose format string
# advertises a complex number rather than a two-field struct.
# Supplies the item-type specific pieces on top of MockBuffer (defined
# earlier in this file).
cdef class LongComplexMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        # ``value`` must unpack into exactly two items: real, imag.
        cdef LongComplex* s
        s = <LongComplex*>buf
        s.real, s.imag = value
        # 0 is the normal result; -1 is reserved for exceptions (except -1).
        return 0

    cdef get_itemsize(self): return sizeof(LongComplex)
    # "Z" = complex prefix, "g" = long double.
    cdef get_default_format(self): return b"Zg"
# Reads a complex-format ("Zg") buffer through a two-field struct dtype
# and prints both parts.  The docstring is the doctest (registered through
# @testcase) and is kept verbatim.
@testcase
def complex_struct_dtype(object[LongComplex] buf):
    """
    Note that the format string is "Zg" rather than "2g"...
    >>> complex_struct_dtype(LongComplexMockBuffer(None, [(0, -1)]))
    0.0 -1.0
    """
    print buf[0].real, buf[0].imag
# Passes a complex-format ("Zg") buffer to a parameter typed with
# MixedComplex, whose two fields differ in type.  The docstring is the
# doctest (registered through @testcase) and is kept verbatim; it expects
# acquisition to fail with ValueError, so the print below is not expected
# to run in that case.
@testcase
def mixed_complex_struct_dtype(object[MixedComplex] buf):
    """
    Triggering a specific execution path for this case.

    >>> mixed_complex_struct_dtype(LongComplexMockBuffer(None, [(0, -1)]))
    Traceback (most recent call last):
        ...
    ValueError: Cannot store complex number in 'MixedComplex' as 'long double' differs from 'float' in size.
    """
    print buf[0].real, buf[0].imag
# Applies in-place operators to struct fields of a buffer item and prints
# the result.  The docstring is the doctest (registered through @testcase)
# and is kept verbatim: starting from (0, -1) the expected output is
# "1.0 1.0".
@testcase
def complex_struct_inplace(object[LongComplex] buf):
    """
    >>> complex_struct_inplace(LongComplexMockBuffer(None, [(0, -1)]))
    1.0 1.0
    """
    # The "+=" form on buf[0].<field> is the construct under test; do not
    # rewrite it as a read followed by a separate assignment.
    buf[0].real += 1
    buf[0].imag += 2
    print buf[0].real, buf[0].imag
