Cython has moved to github.
cython
view tests/run/bufaccess.pyx @ 1586:4a96e1aff2d4
Fix error in buffer typestring checking
| author | Dag Sverre Seljebotn <dagss@student.matnat.uio.no> |
|---|---|
| date | Tue Dec 16 10:02:51 2008 +0100 (3 years ago) |
| parents | d0c4b0a150ca |
| children | 136bd9fa09a7 |
line source
1 # Tests the buffer access syntax functionality by constructing
2 # mock buffer objects.
3 #
4 # Note that the buffers are mock objects created for testing
5 # the buffer access behaviour -- for instance there is no flag
6 # checking in the buffer objects (why test our test case?), rather
7 # what we want to test is what is passed into the flags argument.
8 #
10 from __future__ import unicode_literals
12 cimport stdlib
13 cimport python_buffer
14 cimport stdio
15 cimport cython
17 from python_ref cimport PyObject
# Registry read by doctest: maps each registered function name to its docstring.
__test__ = {}

import re
# Predicates called with a function name; a truthy result keeps that function
# out of __test__ (for example re.compile('object').search).
exclude = []

def testcase(func):
    # Decorator: register func's docstring as a doctest unless one of the
    # exclusion predicates matches its name. Always returns func unchanged.
    name = func.__name__
    for is_excluded in exclude:
        if is_excluded(name):
            return func
    __test__[name] = func.__doc__
    return func
# No-op stub; nothing in the visible part of this file references it.
# NOTE(review): the name looks like a truncated "testcase" -- confirm whether
# it is still needed.
def testcas(a):
    pass
35 #
36 # Buffer acquire and release tests
37 #
# Not registered with @testcase: only has to compile.
def nousage():
    """
    The challenge here is just compilation.
    """
    # Buffer variable that is declared but never assigned or read.
    cdef object[int, ndim=2] buf
# Not registered with @testcase: only has to compile.
def printbuf():
    """
    Just compilation.
    """
    cdef object[int, ndim=2] buf
    # Uses the buffer variable as a plain object, without indexing it.
    print buf
# Re-assigning a buffer variable: the doctest expects the first object to be
# released before the second is acquired, and None to cause neither event.
@testcase
def acquire_release(o1, o2):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
    >>> acquire_release(A, B)
    acquired A
    released A
    acquired B
    released B
    >>> acquire_release(None, None)
    >>> acquire_release(None, B)
    acquired B
    released B
    """
    cdef object[int] buf
    buf = o1
    buf = o2
# The buffer must still be released when the function exits by raising; the
# doctest checks this through the mock's log instead of printed output.
@testcase
def acquire_raise(o):
    """
    Apparently, doctest won't handle mixed exceptions and print
    stats, so need to circumvent this.

    >>> A = IntMockBuffer("A", range(6))
    >>> A.resetlog()
    >>> acquire_raise(A)
    Traceback (most recent call last):
        ...
    Exception: on purpose
    >>> A.printlog()
    acquired A
    released A

    """
    cdef object[int] buf
    buf = o
    raise Exception("on purpose")
# A failing assignment must leave the previously acquired buffer usable.
# NOTE(review): ErrorBuffer.__init__ takes a required `label`, so the call
# ErrorBuffer() below appears to raise TypeError in the constructor before any
# buffer acquisition is attempted -- confirm this is the intended failure.
@testcase
def acquire_failure1():
    """
    >>> acquire_failure1()
    acquired working
    0 3
    0 3
    released working
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        buf = ErrorBuffer()
        assert False
    except Exception:
        print buf[0], buf[3]
# Same scenario as acquire_failure1, but the working buffer is acquired in the
# cdef declaration itself rather than by a separate assignment.
# NOTE(review): ErrorBuffer.__init__ takes a required `label`, so ErrorBuffer()
# appears to fail in the constructor, not during acquisition -- confirm.
@testcase
def acquire_failure2():
    """
    >>> acquire_failure2()
    acquired working
    0 3
    0 3
    released working
    """
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        buf = ErrorBuffer()
        assert False
    except Exception:
        print buf[0], buf[3]
# Assigning a non-buffer (an int): the doctest expects the old buffer to be
# released, then acquired again after the failure, and to remain readable.
@testcase
def acquire_failure3():
    """
    >>> acquire_failure3()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        buf = 3
        assert False
    except Exception:
        print buf[0], buf[3]
# Same as acquire_failure3, but the working buffer is acquired in the cdef
# declaration itself.
@testcase
def acquire_failure4():
    """
    >>> acquire_failure4()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        buf = 2
        assert False
    except Exception:
        print buf[0], buf[3]
# Double failure: the new value is not a buffer, and the old mock has been told
# to fail (fail = True), so re-acquiring it fails as well.
@testcase
def acquire_failure5():
    """
    >>> acquire_failure5()
    Traceback (most recent call last):
       ...
    ValueError: Buffer acquisition failed on assignment; and then reacquiring the old buffer failed too!
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    # Makes every later __getbuffer__ call on this mock raise.
    buf.fail = True
    buf = 3
# Objects without the buffer interface must be rejected with TypeError, both
# on the first assignment and on a later re-assignment.
@testcase
def acquire_nonbuffer1(first, second=None):
    """
    >>> acquire_nonbuffer1(3)
    Traceback (most recent call last):
      ...
    TypeError: 'int' does not have the buffer interface
    >>> acquire_nonbuffer1(type)
    Traceback (most recent call last):
      ...
    TypeError: 'type' does not have the buffer interface
    >>> acquire_nonbuffer1(None, 2)
    Traceback (most recent call last):
      ...
    TypeError: 'int' does not have the buffer interface
    """
    cdef object[int] buf
    buf = first
    buf = second
# Assigns the ErrorBuffer class object itself (not an instance); the doctest
# expects the old buffer to be released and acquired again.
@testcase
def acquire_nonbuffer2():
    """
    >>> acquire_nonbuffer2()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        buf = ErrorBuffer
        assert False
    except Exception:
        print buf[0], buf[3]
# Buffer declared directly as a typed function argument; prints the first n
# items on one line followed by END.
@testcase
def as_argument(object[int] bufarg, int n):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> as_argument(A, 6)
    acquired A
    0 1 2 3 4 5 END
    released A
    """
    cdef int i
    for i in range(n):
        print bufarg[i],
    print 'END'
# Buffer argument with a default value: the doctest expects the default mock
# to be acquired and released when no argument is passed.
@testcase
def as_argument_defval(object[int] bufarg=IntMockBuffer('default', range(6)), int n=6):
    """
    >>> as_argument_defval()
    acquired default
    0 1 2 3 4 5 END
    released default
    >>> A = IntMockBuffer("A", range(6))
    >>> as_argument_defval(A, 6)
    acquired A
    0 1 2 3 4 5 END
    released A
    """
    cdef int i
    for i in range(n):
        print bufarg[i],
    print 'END'
# Buffer acquired by an initialising assignment inside the cdef declaration.
@testcase
def cdef_assignment(obj, n):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> cdef_assignment(A, 6)
    acquired A
    0 1 2 3 4 5 END
    released A

    """
    cdef object[int] buf = obj
    cdef int i
    for i in range(n):
        print buf[i],
    print 'END'
# Buffer variable used as the target of a for-in loop: the doctest expects one
# acquire/release pair per iteration.
@testcase
def forin_assignment(objs, int pick):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
    >>> forin_assignment([A, B, A, A], 2)
    acquired A
    2
    released A
    acquired B
    2
    released B
    acquired A
    2
    released A
    acquired A
    2
    released A
    """
    cdef object[int] buf
    for buf in objs:
        print buf[pick]
# Cascaded assignment to two buffer variables: the doctest expects the same
# object to be acquired twice and released twice.
@testcase
def cascaded_buffer_assignment(obj):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> cascaded_buffer_assignment(A)
    acquired A
    acquired A
    released A
    released A
    """
    cdef object[int] a, b
    a = b = obj
# Parallel (tuple) assignment from two separate values into buffer variables.
@testcase
def tuple_buffer_assignment1(a, b):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
    >>> tuple_buffer_assignment1(A, B)
    acquired A
    acquired B
    released A
    released B
    """
    cdef object[int] x, y
    x, y = a, b
# Tuple unpacking from a single tuple object into buffer variables.
@testcase
def tuple_buffer_assignment2(tup):
    """
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
    >>> tuple_buffer_assignment2((A, B))
    acquired A
    acquired B
    released A
    released B
    """
    cdef object[int] x, y
    x, y = tup
# Assigning None releases the buffer immediately: the doctest expects
# "released A" to appear before "After release".
@testcase
def explicitly_release_buffer():
    """
    >>> explicitly_release_buffer()
    acquired A
    released A
    After release
    """
    cdef object[int] x = IntMockBuffer("A", range(10))
    x = None
    print "After release"
344 #
345 # Format strings
346 #
# Format strings with a byte-order/alignment prefix: the doctest accepts "@"
# and expects ">", "<", "=" and "!" to be rejected with ValueError.
@testcase
def alignment_string(object[int] buf):
    """
    >>> alignment_string(IntMockBuffer(None, [1,2], format="@i"))
    2
    >>> alignment_string(IntMockBuffer(None, [1,2], format="@i@@"))
    2
    >>> alignment_string(IntMockBuffer(None, [1,2], format=">i"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
    >>> alignment_string(IntMockBuffer(None, [1,2], format="<i"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
    >>> alignment_string(IntMockBuffer(None, [1,2], format="=i"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
    >>> alignment_string(IntMockBuffer(None, [1,2], format="!i"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer acquisition error: Only native byte order, size and alignment supported.
    """
    print buf[1]
# Format strings that do not describe a single int: one with a trailing extra
# item ("if") and one that cannot be parsed ("$$").
@testcase
def wrong_string(object[int] buf):
    """
    >>> wrong_string(IntMockBuffer(None, [1,2], format="if"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected end, got float)
    >>> wrong_string(IntMockBuffer(None, [1,2], format="$$"))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected int, got unparseable format string)
    """
    print buf[1]
# Where int and long have the same size, an int buffer must accept format 'l'
# and a long buffer must accept the mock's default int format. Nothing is
# exercised on platforms where the sizes differ.
@testcase
def int_and_long_are_same():
    """
    >>> int_and_long_are_same()
    """
    cdef object[int] intarr
    cdef object[long] longarr
    if sizeof(int) == sizeof(long):
        intarr = IntMockBuffer(None, [1,2], format='l')
        longarr = IntMockBuffer(None, [1,2])
398 #
399 # Getting items and index bounds checking
400 #
# Reads one item of a 2-D int buffer; the doctest covers positive and negative
# indices and the IndexError raised per axis when out of bounds.
@testcase
def get_int_2d(object[int, ndim=2] buf, int i, int j):
    """
    >>> C = IntMockBuffer("C", range(6), (2,3))
    >>> get_int_2d(C, 1, 1)
    acquired C
    released C
    4

    Check negative indexing:
    >>> get_int_2d(C, -1, 0)
    acquired C
    released C
    3
    >>> get_int_2d(C, -1, -2)
    acquired C
    released C
    4
    >>> get_int_2d(C, -2, -3)
    acquired C
    released C
    0

    Out-of-bounds errors:
    >>> get_int_2d(C, 2, 0)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    >>> get_int_2d(C, 0, -4)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 1)
    """
    return buf[i, j]
# Same read as get_int_2d, but with unsigned index variables.
@testcase
def get_int_2d_uintindex(object[int, ndim=2] buf, unsigned int i, unsigned int j):
    """
    Unsigned indexing:
    >>> C = IntMockBuffer("C", range(6), (2,3))
    >>> get_int_2d_uintindex(C, 0, 0)
    acquired C
    released C
    0
    >>> get_int_2d_uintindex(C, 1, 2)
    acquired C
    released C
    5
    """
    # This is most interesting with regards to the C code
    # generated.
    return buf[i, j]
# Writes one item of a 2-D int buffer; the doctest reads each value back with
# get_int_2d and checks the same bounds errors as for reads.
@testcase
def set_int_2d(object[int, ndim=2] buf, int i, int j, int value):
    """
    Uses get_int_2d to read back the value afterwards. For pure
    unit test, one should support reading in MockBuffer instead.

    >>> C = IntMockBuffer("C", range(6), (2,3))
    >>> set_int_2d(C, 1, 1, 10)
    acquired C
    released C
    >>> get_int_2d(C, 1, 1)
    acquired C
    released C
    10

    Check negative indexing:
    >>> set_int_2d(C, -1, 0, 3)
    acquired C
    released C
    >>> get_int_2d(C, -1, 0)
    acquired C
    released C
    3

    >>> set_int_2d(C, -1, -2, 8)
    acquired C
    released C
    >>> get_int_2d(C, -1, -2)
    acquired C
    released C
    8

    >>> set_int_2d(C, -2, -3, 9)
    acquired C
    released C
    >>> get_int_2d(C, -2, -3)
    acquired C
    released C
    9

    Out-of-bounds errors:
    >>> set_int_2d(C, 2, 0, 19)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    >>> set_int_2d(C, 0, -4, 19)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 1)

    """
    buf[i, j] = value
# Buffer indexing inside a list comprehension, with a cdef int loop variable.
# The `len` parameter shadows the builtin inside this function.
@testcase
def list_comprehension(object[int] buf, len):
    """
    >>> list_comprehension(IntMockBuffer(None, [1,2,3]), 3)
    1|2|3
    """
    cdef int i
    print u"|".join([unicode(buf[i]) for i in range(len)])
516 #
517 # The negative_indices buffer option
518 #
# With negative_indices=False a negative index is not wrapped around; the
# doctest expects an out-of-bounds IndexError for -1.
@testcase
def no_negative_indices(object[int, negative_indices=False] buf, int idx):
    """
    The most interesting thing here is to inspect the C source and
    make sure optimal code is produced.

    >>> A = IntMockBuffer(None, range(6))
    >>> no_negative_indices(A, 3)
    3
    >>> no_negative_indices(A, -1)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    return buf[idx]
535 #
# Buffer type mismatch examples. The type and the access method are
# varied simultaneously, since the odds of an interaction between the
# two are virtually zero.
539 #
# dtype mismatch detected on a cdef initialising assignment (int data assigned
# to a float buffer).
@testcase
def fmtst1(buf):
    """
    >>> fmtst1(IntMockBuffer("A", range(3)))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected float, got int)
    """
    cdef object[float] a = buf
# dtype mismatch detected on argument unpacking (float data passed to an int
# buffer argument). The function body is just the docstring.
@testcase
def fmtst2(object[int] buf):
    """
    >>> fmtst2(FloatMockBuffer("A", range(3)))
    Traceback (most recent call last):
        ...
    ValueError: Buffer dtype mismatch (expected int, got float)
    """
# Dimension-count mismatch detected on argument unpacking (1-D data passed to
# an ndim=2 argument). The function body is just the docstring.
@testcase
def ndim1(object[int, ndim=2] buf):
    """
    >>> ndim1(IntMockBuffer("A", range(3)))
    Traceback (most recent call last):
        ...
    ValueError: Buffer has wrong number of dimensions (expected 2, got 1)
    """
568 #
569 # Test which flags are passed.
570 #
# A buffer that is only read: the doctest expects the flags recorded by the
# mock not to include WRITABLE.
@testcase
def readonly(obj):
    """
    >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
    >>> readonly(R)
    acquired R
    25
    released R
    >>> [str(x) for x in R.recieved_flags] # Works in both py2 and py3
    ['FORMAT', 'INDIRECT', 'ND', 'STRIDES']
    """
    cdef object[unsigned short int, ndim=3] buf = obj
    print buf[2, 2, 1]
# A buffer that is written to: the doctest expects WRITABLE among the flags
# recorded by the mock.
@testcase
def writable(obj):
    """
    >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
    >>> writable(R)
    acquired R
    released R
    >>> [str(x) for x in R.recieved_flags] # Py2/3
    ['FORMAT', 'INDIRECT', 'ND', 'STRIDES', 'WRITABLE']
    """
    cdef object[unsigned short int, ndim=3] buf = obj
    buf[2, 2, 1] = 23
# mode='strided': the doctest expects no INDIRECT flag, and that the mock saw
# its own suboffsets pointer again at release time (release_ok stays True).
@testcase
def strided(object[int, ndim=1, mode='strided'] buf):
    """
    >>> A = IntMockBuffer("A", range(4))
    >>> strided(A)
    acquired A
    released A
    2
    >>> [str(x) for x in A.recieved_flags] # Py2/3
    ['FORMAT', 'ND', 'STRIDES']

    Check that the suboffsets were patched back prior to release.
    >>> A.release_ok
    True
    """
    return buf[2]
# mode='c' on a 1-D buffer: the doctest expects C_CONTIGUOUS to be requested.
@testcase
def c_contig(object[int, ndim=1, mode='c'] buf):
    """
    >>> A = IntMockBuffer(None, range(4))
    >>> c_contig(A)
    2
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'C_CONTIGUOUS']
    """
    return buf[2]
# mode='c' on a 2-D buffer; with shape (3,4) item [1, 3] is element 7.
@testcase
def c_contig_2d(object[int, ndim=2, mode='c'] buf):
    """
    Multi-dim has seperate implementation

    >>> A = IntMockBuffer(None, range(12), shape=(3,4))
    >>> c_contig_2d(A)
    7
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'C_CONTIGUOUS']
    """
    return buf[1, 3]
# mode='fortran' on a 1-D buffer: the doctest expects F_CONTIGUOUS to be
# requested.
@testcase
def f_contig(object[int, ndim=1, mode='fortran'] buf):
    """
    >>> A = IntMockBuffer(None, range(4))
    >>> f_contig(A)
    2
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'F_CONTIGUOUS']
    """
    return buf[2]
# mode='fortran' on a 2-D buffer; with item strides (1, 4) item [3, 1] is
# element 3*1 + 1*4 = 7.
@testcase
def f_contig_2d(object[int, ndim=2, mode='fortran'] buf):
    """
    Must set up strides manually to ensure Fortran ordering.

    >>> A = IntMockBuffer(None, range(12), shape=(4,3), strides=(1, 4))
    >>> f_contig_2d(A)
    7
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'F_CONTIGUOUS']
    """
    return buf[3, 1]
663 #
# Test compiler options for bounds checking. We create an array with a
# safe "boundary" (memory allocated outside of what the buffer publishes)
# and then check whether we get back what we stored in that memory or
# an error.
#
# Bounds-checked read from a mock that publishes 3 items starting at offset 5
# of a 10-item allocation.
@testcase
def safe_get(object[int] buf, int idx):
    """
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)

    Validate our testing buffer...
    >>> safe_get(A, 0)
    5
    >>> safe_get(A, 2)
    7
    >>> safe_get(A, -3)
    5

    Access outside it. This is already done above for bounds check
    testing but we include it to tell the story right.

    >>> safe_get(A, -4)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    >>> safe_get(A, 3)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    return buf[idx]
# Two stacked boundscheck decorators: the doctest expects the outer (False) one
# to win, so reads outside the published area return the neighbouring memory.
@testcase
@cython.boundscheck(False) # outer decorators should take precedence
@cython.boundscheck(True)
def unsafe_get(object[int] buf, int idx):
    """
    Access outside of the area the buffer publishes.
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> unsafe_get(A, -4)
    4
    >>> unsafe_get(A, -5)
    3
    >>> unsafe_get(A, 3)
    8
    """
    return buf[idx]
# boundscheck off combined with negative_indices=False: index -2 is used as-is,
# so the doctest reads the item two places before the published start.
@testcase
@cython.boundscheck(False)
def unsafe_get_nonegative(object[int, negative_indices=False] buf, int idx):
    """
    Also inspect the C source to see that it is optimal...

    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> unsafe_get_nonegative(A, -2)
    3
    """
    return buf[idx]
# boundscheck toggled per statement with `with` blocks: the first read is
# unchecked, the second is checked.
@testcase
def mixed_get(object[int] buf, int unsafe_idx, int safe_idx):
    """
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> mixed_get(A, -4, 0)
    (4, 5)
    >>> mixed_get(A, 0, -4)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    with cython.boundscheck(False):
        one = buf[unsafe_idx]
    with cython.boundscheck(True):
        two = buf[safe_idx]
    return (one, two)
741 #
742 # Coercions
743 #
# Compile-only coverage of value coercions on buffer reads and writes; the
# docstring has no doctest examples, so doctest never calls this function.
# NOTE(review): `objbuf` is indexed below without ever being assigned, so
# actually calling this function looks unsafe -- confirm before adding examples.
@testcase
def coercions(object[unsigned char] uc):
    """
    TODO
    """
    print type(uc[0])
    uc[0] = -1
    print uc[0]
    uc[0] = <int>3.14
    print uc[0]

    cdef char* ch = "asfd"
    cdef object[object] objbuf
    objbuf[3] = ch
760 #
761 # Testing that accessing data using various types of buffer access
762 # all works.
763 #
# Prints the first shape[0] items of a 1-D int buffer on one line, then END.
# Not registered with @testcase; used as a helper by the inplace_operators doctest.
def printbuf_int(object[int] buf, shape):
    # Utility func
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'
# Prints a 2-D int buffer one row per line; the doctest covers default strides,
# explicit strides and nested-list (indirect) data.
@testcase
def printbuf_int_2d(o, shape):
    """
    Strided:

    >>> printbuf_int_2d(IntMockBuffer("A", range(6), (2,3)), (2,3))
    acquired A
    0 1 2 END
    3 4 5 END
    released A
    >>> printbuf_int_2d(IntMockBuffer("A", range(100), (3,3), strides=(20,5)), (3,3))
    acquired A
    0 5 10 END
    20 25 30 END
    40 45 50 END
    released A

    Indirect:
    >>> printbuf_int_2d(IntMockBuffer("A", [[1,2],[3,4]]), (2,2))
    acquired A
    1 2 END
    3 4 END
    released A
    """
    # should make shape builtin
    cdef object[int, ndim=2] buf
    buf = o
    cdef int i, j
    for i in range(shape[0]):
        for j in range(shape[1]):
            print buf[i, j],
        print 'END'
# Prints the first shape[0] items of a 1-D float buffer on one line, then END.
# The buffer is acquired by a plain assignment after the cdef declaration.
@testcase
def printbuf_float(o, shape):
    """
    >>> printbuf_float(FloatMockBuffer("F", [1.0, 1.25, 0.75, 1.0]), (4,))
    acquired F
    1.0 1.25 0.75 1.0 END
    released F
    """

    # should make shape builtin
    cdef object[float] buf
    buf = o
    # Only one loop index is needed for a 1-D buffer.
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print "END"
824 #
825 # Test assignments
826 #
# In-place operators on buffer items, with both literal and variable indices.
# Starting from [2, 2]: item 1 becomes 3; item 0 becomes 2*2 - 4 = 0.
@testcase
def inplace_operators(object[int] buf):
    """
    >>> buf = IntMockBuffer(None, [2, 2])
    >>> inplace_operators(buf)
    >>> printbuf_int(buf, (2,))
    0 3 END
    """
    cdef int j = 0
    buf[1] += 1
    buf[j] *= 2
    buf[0] -= 4
842 #
843 # Typedefs
844 #
# Test three layers of typedefs going through a .h file for plain int, and
# simply a header-file typedef for floats and unsigned.
# Typedef chain used by the printbuf_td_* tests. The types declared to Cython
# inside the extern block deliberately differ from the real C types named in
# the trailing comments.
ctypedef int td_cy_int
cdef extern from "bufaccess.h":
    ctypedef td_cy_int td_h_short # Defined as short, but Cython doesn't know this!
    ctypedef float td_h_double # Defined as double
    ctypedef unsigned int td_h_ushort # Defined as unsigned short
ctypedef td_h_short td_h_cy_short
# Pure Cython typedef of int: accepts int data, rejects short data.
@testcase
def printbuf_td_cy_int(object[td_cy_int] buf, shape):
    """
    >>> printbuf_td_cy_int(IntMockBuffer(None, range(3)), (3,))
    0 1 2 END
    >>> printbuf_td_cy_int(ShortMockBuffer(None, range(3)), (3,))
    Traceback (most recent call last):
       ...
    ValueError: Buffer dtype mismatch (expected bufaccess.td_cy_int, got short)

    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'
# Header typedef that is really a short: accepts short data, rejects int data.
@testcase
def printbuf_td_h_short(object[td_h_short] buf, shape):
    """
    >>> printbuf_td_h_short(ShortMockBuffer(None, range(3)), (3,))
    0 1 2 END
    >>> printbuf_td_h_short(IntMockBuffer(None, range(3)), (3,))
    Traceback (most recent call last):
       ...
    ValueError: Buffer dtype mismatch (expected bufaccess.td_h_short, got int)
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'
# Cython typedef layered on the header typedef: still behaves as a short.
@testcase
def printbuf_td_h_cy_short(object[td_h_cy_short] buf, shape):
    """
    >>> printbuf_td_h_cy_short(ShortMockBuffer(None, range(3)), (3,))
    0 1 2 END
    >>> printbuf_td_h_cy_short(IntMockBuffer(None, range(3)), (3,))
    Traceback (most recent call last):
       ...
    ValueError: Buffer dtype mismatch (expected bufaccess.td_h_cy_short, got int)
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'
# Header typedef that is really an unsigned short: accepts unsigned short
# data, rejects signed short data.
@testcase
def printbuf_td_h_ushort(object[td_h_ushort] buf, shape):
    """
    >>> printbuf_td_h_ushort(UnsignedShortMockBuffer(None, range(3)), (3,))
    0 1 2 END
    >>> printbuf_td_h_ushort(ShortMockBuffer(None, range(3)), (3,))
    Traceback (most recent call last):
       ...
    ValueError: Buffer dtype mismatch (expected bufaccess.td_h_ushort, got short)
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'
# Header typedef that is really a double: accepts double data, rejects float
# data.
@testcase
def printbuf_td_h_double(object[td_h_double] buf, shape):
    """
    >>> printbuf_td_h_double(DoubleMockBuffer(None, [0.25, 1, 3.125]), (3,))
    0.25 1.0 3.125 END
    >>> printbuf_td_h_double(FloatMockBuffer(None, [0.25, 1, 3.125]), (3,))
    Traceback (most recent call last):
       ...
    ValueError: Buffer dtype mismatch (expected bufaccess.td_h_double, got float)
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'
932 #
933 # Object access
934 #
935 from python_ref cimport Py_INCREF, Py_DECREF
# Doctest helper: Py_INCREF every argument once.
def addref(*args):
    for item in args: Py_INCREF(item)
# Doctest helper: Py_DECREF every argument once.
def decref(*args):
    for item in args: Py_DECREF(item)
# Doctest helper: return the ob_refcnt field of x as seen while this call is
# in progress.
def get_refcount(x):
    return (<PyObject*>x).ob_refcnt
# Reads Python objects out of an object buffer and prints each item's repr and
# refcount; the doctest expects the counts to be the same as before the call.
@testcase
def printbuf_object(object[object] buf, shape):
    """
    Only play with unique objects, interned numbers etc. will have
    unpredictable refcounts.

    ObjectMockBuffer doesn't do anything about increfing/decrefing,
    we to the "buffer implementor" refcounting directly in the
    testcase.

    >>> a, b, c = "globally_unique_string_23234123", {4:23}, [34,3]
    >>> get_refcount(a), get_refcount(b), get_refcount(c)
    (2, 2, 2)
    >>> A = ObjectMockBuffer(None, [a, b, c])
    >>> printbuf_object(A, (3,))
    'globally_unique_string_23234123' 2
    {4: 23} 2
    [34, 3] 2
    """
    cdef int i
    for i in range(shape[0]):
        print repr(buf[i]), (<PyObject*>buf[i]).ob_refcnt
# Stores an object into an object buffer; the doctest expects the replaced
# item to lose one reference and the stored item to gain one.
@testcase
def assign_to_object(object[object] buf, int idx, obj):
    """
    See comments on printbuf_object above.

    >>> a, b = [1, 2, 3], [4, 5, 6]
    >>> get_refcount(a), get_refcount(b)
    (2, 2)
    >>> addref(a)
    >>> A = ObjectMockBuffer(None, [1, a]) # 1, ...,otherwise it thinks nested lists...
    >>> get_refcount(a), get_refcount(b)
    (3, 2)
    >>> assign_to_object(A, 1, b)
    >>> get_refcount(a), get_refcount(b)
    (2, 3)
    >>> decref(b)
    """
    buf[idx] = obj
986 #
987 # cast option
988 #
# cast=True lets an unsigned int buffer variable view signed int data; the
# value is read as unsigned and cast back to int on return.
@testcase
def buffer_cast(object[unsigned int, cast=True] buf, int idx):
    """
    Round-trip a signed int through unsigned int buffer access.

    >>> A = IntMockBuffer(None, [-100])
    >>> buffer_cast(A, 0)
    -100
    """
    cdef unsigned int data = buf[idx]
    return <int>data
# cast=True still requires equal item sizes: int data cannot be viewed as char.
@testcase
def buffer_cast_fails(object[char, cast=True] buf):
    """
    Cannot cast between datatype of different sizes.

    >>> buffer_cast_fails(IntMockBuffer(None, [0]))
    Traceback (most recent call last):
        ...
    ValueError: Attempted cast of buffer to datatype of different size.
    """
    return buf[0]
1014 #
1015 # Testcase support code (more tests below!, because of scope rules)
1016 #
# (name, PyBUF_* value) pairs, in the order in which MockBuffer.__getbuffer__
# tests them; this order is also the order of names in `recieved_flags`.
available_flags = (
    ('FORMAT', python_buffer.PyBUF_FORMAT),
    ('INDIRECT', python_buffer.PyBUF_INDIRECT),
    ('ND', python_buffer.PyBUF_ND),
    ('STRIDES', python_buffer.PyBUF_STRIDES),
    ('C_CONTIGUOUS', python_buffer.PyBUF_C_CONTIGUOUS),
    ('F_CONTIGUOUS', python_buffer.PyBUF_F_CONTIGUOUS),
    ('WRITABLE', python_buffer.PyBUF_WRITABLE)
)
cdef void _free_indirect(void* buf, Py_ssize_t* shape, int ndim, int dim):
    # Free the pointer tree built by MockBuffer.create_indirect_buffer.
    # Levels 0 .. ndim-2 are arrays of shape[dim] pointers; the last level is a
    # plain data array. Kept at module level so __dealloc__ does not have to
    # call a method on a possibly half-destroyed object.
    cdef Py_ssize_t i
    if buf == NULL:
        return
    if dim < ndim - 1:
        for i in range(shape[dim]):
            _free_indirect((<void**>buf)[i], shape, ndim, dim + 1)
    stdlib.free(buf)

cdef class MockBuffer:
    # Base class of the mock buffer exporters used by the doctests in this
    # file. Subclasses supply write(), get_itemsize() and get_default_format().
    # Acquisitions and releases are printed and logged when a label is set,
    # and the flags passed to __getbuffer__ are recorded by name.
    cdef object format, offset
    cdef void* buffer
    cdef int len, itemsize, ndim
    cdef Py_ssize_t* strides
    cdef Py_ssize_t* shape
    cdef Py_ssize_t* suboffsets
    cdef object label, log

    # recieved_flags: names of the flags seen by the last __getbuffer__ call.
    # release_ok: set to False if a release sees a different suboffsets pointer.
    cdef readonly object recieved_flags, release_ok
    # fail: when true, __getbuffer__ raises ValueError.
    cdef public object fail

    def __init__(self, label, data, shape=None, strides=None, format=None, offset=0):
        # label:   name printed on acquire/release; a false value silences it.
        # data:    flat sequence of items, or nested lists for indirect access.
        # shape:   defaults to (len(data),).
        # strides: in items (multiplied by the item size below); defaults to
        #          the row-major strides for `shape`.
        # format:  defaults to the subclass's get_default_format().
        # offset:  number of items to skip before the published start.
        #
        # It is important not to store references to data after the constructor
        # as refcounting is checked on object buffers.
        self.label = label
        self.release_ok = True
        self.log = ""
        self.offset = offset
        self.itemsize = self.get_itemsize()
        if format is None: format = self.get_default_format()
        if shape is None: shape = (len(data),)
        if strides is None:
            strides = []
            cumprod = 1
            rshape = list(shape)
            rshape.reverse()
            for s in rshape:
                strides.append(cumprod)
                cumprod *= s
            strides.reverse()
        strides = [x * self.itemsize for x in strides]
        suboffsets = [-1] * len(shape)
        # Follow the first element of each level to find how deeply `data` is
        # nested in lists.
        datashape = [len(data)]
        p = data
        while True:
            p = p[0]
            if isinstance(p, list): datashape.append(len(p))
            else: break
        if len(datashape) > 1:
            # indirect access
            self.ndim = len(datashape)
            shape = datashape
            self.buffer = self.create_indirect_buffer(data, shape)
            suboffsets = [0] * (self.ndim-1) + [-1]
            strides = [sizeof(void*)] * (self.ndim-1) + [self.itemsize]
            self.suboffsets = self.list_to_sizebuf(suboffsets)
        else:
            # strided and/or simple access
            self.buffer = self.create_buffer(data)
            self.ndim = len(shape)
            self.suboffsets = NULL

        self.format = format
        self.len = len(data) * self.itemsize

        self.strides = self.list_to_sizebuf(strides)
        self.shape = self.list_to_sizebuf(shape)

    def __dealloc__(self):
        # suboffsets is only allocated for indirect buffers, so it doubles as
        # the "indirect" marker here.
        if self.suboffsets != NULL:
            # Free every level of the pointer tree. shape is still needed for
            # this, so it is freed last; it can be NULL if __init__ failed
            # before reaching it.
            if self.shape != NULL:
                _free_indirect(self.buffer, self.shape, self.ndim, 0)
            stdlib.free(self.suboffsets)
        else:
            stdlib.free(self.buffer)
        stdlib.free(self.strides)
        stdlib.free(self.shape)

    cdef void* create_buffer(self, data):
        # Allocate a flat array and let the subclass's write() store each item.
        cdef char* buf = <char*>stdlib.malloc(len(data) * self.itemsize)
        cdef char* it = buf
        for value in data:
            self.write(it, value)
            it += self.itemsize
        return buf

    cdef void* create_indirect_buffer(self, data, shape):
        # Build one pointer array per nesting level; the innermost level is a
        # flat array made by create_buffer().
        cdef void** buf
        assert shape[0] == len(data)
        if len(shape) == 1:
            return self.create_buffer(data)
        else:
            shape = shape[1:]
            buf = <void**>stdlib.malloc(len(data) * sizeof(void*))
            for idx, subdata in enumerate(data):
                buf[idx] = self.create_indirect_buffer(subdata, shape)
            return buf

    cdef Py_ssize_t* list_to_sizebuf(self, l):
        # Copy a Python sequence of integers into a malloc'ed Py_ssize_t array.
        cdef Py_ssize_t* buf = <Py_ssize_t*>stdlib.malloc(len(l) * sizeof(Py_ssize_t))
        for i, x in enumerate(l):
            buf[i] = x
        return buf

    def __getbuffer__(MockBuffer self, Py_buffer* buffer, int flags):
        if self.fail:
            raise ValueError("Failing on purpose")

        # Record the name of every known flag whose bits are all set in `flags`.
        self.recieved_flags = []
        cdef int value
        for name, value in available_flags:
            if (value & flags) == value:
                self.recieved_flags.append(name)

        # The published start is `offset` items into the allocation.
        buffer.buf = <void*>(<char*>self.buffer + (<int>self.offset * self.itemsize))
        buffer.obj = self
        buffer.len = self.len
        buffer.readonly = 0
        buffer.format = <char*>self.format
        buffer.ndim = self.ndim
        buffer.shape = self.shape
        buffer.strides = self.strides
        buffer.suboffsets = self.suboffsets
        buffer.itemsize = self.itemsize
        buffer.internal = NULL
        if self.label:
            msg = "acquired %s" % self.label
            print msg
            self.log += msg + "\n"

    def __releasebuffer__(MockBuffer self, Py_buffer* buffer):
        # The consumer must hand back the suboffsets pointer it was given.
        if buffer.suboffsets != self.suboffsets:
            self.release_ok = False
        if self.label:
            msg = "released %s" % self.label
            print msg
            self.log += msg + "\n"

    def printlog(self):
        # Print the log without its final newline.
        print self.log[:-1]

    def resetlog(self):
        self.log = ""

    # Hooks that every concrete subclass overrides.
    cdef int write(self, char* buf, object value) except -1: raise Exception()
    cdef get_itemsize(self):
        print "ERROR, not subclassed", self.__class__
    cdef get_default_format(self):
        print "ERROR, not subclassed", self.__class__
# Mock buffer with char-sized items; default format "@b".
cdef class CharMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<char*>buf)[0] = <int>value
        return 0
    cdef get_itemsize(self): return sizeof(char)
    cdef get_default_format(self): return b"@b"
# Mock buffer with int items; default format "@i".
cdef class IntMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<int*>buf)[0] = <int>value
        return 0
    cdef get_itemsize(self): return sizeof(int)
    cdef get_default_format(self): return b"@i"
# Mock buffer with unsigned int items; default format "@I".
cdef class UnsignedIntMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<unsigned int*>buf)[0] = <unsigned int>value
        return 0
    cdef get_itemsize(self): return sizeof(unsigned int)
    cdef get_default_format(self): return b"@I"
# Mock buffer with short items; default format "h".
cdef class ShortMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<short*>buf)[0] = <short>value
        return 0
    cdef get_itemsize(self): return sizeof(short)
    cdef get_default_format(self): return b"h" # Try without endian specifier
# Mock buffer with unsigned short items; default format "@1H".
cdef class UnsignedShortMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<unsigned short*>buf)[0] = <unsigned short>value
        return 0
    cdef get_itemsize(self): return sizeof(unsigned short)
    cdef get_default_format(self): return b"@1H" # Try with repeat count
# Mock buffer with float items; default format "f".
cdef class FloatMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<float*>buf)[0] = <float>value
        return 0
    cdef get_itemsize(self): return sizeof(float)
    cdef get_default_format(self): return b"f"
# Mock buffer with double items; default format "d".
cdef class DoubleMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<double*>buf)[0] = <double>value
        return 0
    cdef get_itemsize(self): return sizeof(double)
    cdef get_default_format(self): return b"d"
# Declares the C cast expression "(void*)" as if it were a function taking an
# object, so the generated C is a plain pointer cast of the argument.
cdef extern from *:
    void* addr_of_pyobject "(void*)"(object)
# Mock buffer whose items are object pointers; default format "@O".
# write() stores the bare pointer and does not touch the reference count.
cdef class ObjectMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        (<void**>buf)[0] = addr_of_pyobject(value)
        return 0

    cdef get_itemsize(self): return sizeof(void*)
    cdef get_default_format(self): return b"@O"
# IntMockBuffer variant that declares "strided" as its default buffer mode
# through __cythonbufferdefaults__; used by the bufdefaults1 test.
cdef class IntStridedMockBuffer(IntMockBuffer):
    cdef __cythonbufferdefaults__ = {"mode" : "strided"}
# Buffer exporter whose acquire and release hooks always raise Exception.
# NOTE(review): `label` is a required constructor argument, but the
# acquire_failure1/2 tests call ErrorBuffer() with no arguments, which would
# raise TypeError before __getbuffer__ is ever reached -- confirm intent.
cdef class ErrorBuffer:
    cdef object label

    def __init__(self, label):
        self.label = label

    def __getbuffer__(ErrorBuffer self, Py_buffer* buffer, int flags):
        raise Exception("acquiring %s" % self.label)

    def __releasebuffer__(ErrorBuffer self, Py_buffer* buffer):
        raise Exception("releasing %s" % self.label)
1245 #
1246 # Typed buffers
1247 #
# Buffer variable typed with an extension class instead of `object`: None is
# accepted, and other types fail the class conversion with TypeError.
@testcase
def typedbuffer1(obj):
    """
    >>> typedbuffer1(IntMockBuffer("A", range(10)))
    acquired A
    released A
    >>> typedbuffer1(None)
    >>> typedbuffer1(4)
    Traceback (most recent call last):
       ...
    TypeError: Cannot convert int to bufaccess.IntMockBuffer
    """
    cdef IntMockBuffer[int, ndim=1] buf = obj
# Doctest: same check as for a typed local, but with the extension-type
# buffer declared on the function argument.  The type error is therefore
# reported as an argument error naming 'obj'.
@testcase
def typedbuffer2(IntMockBuffer[int, ndim=1] obj):
    """
    >>> typedbuffer2(IntMockBuffer("A", range(10)))
    acquired A
    released A
    >>> typedbuffer2(None)
    >>> typedbuffer2(4)
    Traceback (most recent call last):
    ...
    TypeError: Argument 'obj' has incorrect type (expected bufaccess.IntMockBuffer, got int)
    """
    # Nothing to do: acquiring and releasing the argument buffer is the test.
    pass
1276 #
1277 # Test __cythonbufferdefaults__
1278 #
# Doctest: the argument declaration gives no mode, so the mode must come from
# IntStridedMockBuffer.__cythonbufferdefaults__.  The doctest verifies this
# through the flags the mock recorded when the buffer was acquired.
@testcase
def bufdefaults1(IntStridedMockBuffer[int, ndim=1] buf):
    """
    For IntStridedMockBuffer, mode should be
    "strided" by defaults which should show
    up in the flags.
    >>> A = IntStridedMockBuffer("A", range(10))
    >>> bufdefaults1(A)
    acquired A
    released A
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES']
    """
    # Nothing to do: acquiring the argument buffer is the test.
    pass
1296 #
1297 # Structs
1298 #
# Struct dtype for the struct buffer tests.  The field order and types are
# what MyStructMockBuffer's default format "2bq2i" describes (two chars, a
# long long, two ints), so they must not be reordered.
cdef struct MyStruct:
    char a
    char b
    long long int c
    int d
    int e
# Two-int struct used as the member type of NestedStruct.
cdef struct SmallStruct:
    int a
    int b
# Struct dtype with struct-typed members: two SmallStruct fields followed by
# an int.  This matches NestedStructMockBuffer's default format "2T{ii}i".
cdef struct NestedStruct:
    SmallStruct x
    SmallStruct y
    int z
# Mock buffer whose items are MyStruct records.
cdef class MyStructMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        # ``value`` is unpacked into the five fields (a, b, c, d, e) of the
        # record at ``buf``.  Returns 0; -1 is the declared error return.
        cdef MyStruct* record = <MyStruct*>buf
        record.a, record.b, record.c, record.d, record.e = value
        return 0

    cdef get_itemsize(self):
        # Size in bytes of one record.
        return sizeof(MyStruct)

    cdef get_default_format(self):
        # Two chars, one long long, two ints.
        return b"2bq2i"
# Mock buffer whose items are NestedStruct records.
cdef class NestedStructMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        # ``value`` is unpacked, in order, into x.a, x.b, y.a, y.b and z of
        # the record at ``buf``.  Returns 0; -1 is the declared error return.
        cdef NestedStruct* record = <NestedStruct*>buf
        record.x.a, record.x.b, record.y.a, record.y.b, record.z = value
        return 0

    cdef get_itemsize(self):
        # Size in bytes of one record.
        return sizeof(NestedStruct)

    cdef get_default_format(self):
        # Two two-int sub-structs followed by an int.
        return b"2T{ii}i"
# Doctest: a buffer with a struct dtype.  The mock's default format and the
# spelled-out "bbqii" are both accepted; "23bqii" and "i" must be rejected
# with a dtype-mismatch ValueError naming the expected and received types.
@testcase
def basic_struct(object[MyStruct] buf):
    """
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)]))
    1 2 3 4 5
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="bbqii"))
    1 2 3 4 5
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="23bqii"))
    Traceback (most recent call last):
    ...
    ValueError: Buffer dtype mismatch (expected long long, got char)
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="i"))
    Traceback (most recent call last):
    ...
    ValueError: Buffer dtype mismatch (expected char, got int)
    """
    # Read every field of the first record through buffer indexing.
    print buf[0].a, buf[0].b, buf[0].c, buf[0].d, buf[0].e
# Doctest: a buffer whose dtype is a struct containing structs.  Equivalent
# nested formats are accepted.  A flat "iiiii" and a first sub-struct with
# one int too many must each raise a dtype-mismatch ValueError.
@testcase
def nested_struct(object[NestedStruct] buf):
    """
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)]))
    1 2 3 4 5
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="T{ii}T{2i}i"))
    1 2 3 4 5
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="iiiii"))
    Traceback (most recent call last):
    ...
    ValueError: Buffer dtype mismatch (expected SmallStruct, got int)
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="T{iii}T{ii}i"))
    Traceback (most recent call last):
    ...
    ValueError: Buffer dtype mismatch (expected end of SmallStruct struct, got int)
    """
    # Read every leaf field of the first record through buffer indexing.
    print buf[0].x.a, buf[0].x.b, buf[0].y.a, buf[0].y.b, buf[0].z
# Struct with two long double members, used as the dtype for buffers whose
# format string is the complex code "Zg" (see LongComplexMockBuffer).
cdef struct LongComplex:
    long double real
    long double imag
# Deliberately mismatched pair: the two members differ in type (long double
# and float), so a complex item cannot be stored in this struct.  Used by
# mixed_complex_struct_dtype to trigger the corresponding error.
cdef struct MixedComplex:
    long double real
    float imag
# Mock buffer whose items are LongComplex records and whose reported format
# is the complex code "Zg".
cdef class LongComplexMockBuffer(MockBuffer):
    cdef int write(self, char* buf, object value) except -1:
        # ``value`` is unpacked into the (real, imag) members of the record
        # at ``buf``.  Returns 0; -1 is the declared error return.
        cdef LongComplex* record = <LongComplex*>buf
        record.real, record.imag = value
        return 0

    cdef get_itemsize(self):
        # Size in bytes of one record.
        return sizeof(LongComplex)

    cdef get_default_format(self):
        # Complex format code, not the two-member spelling "2g".
        return b"Zg"
# Doctest: a buffer that reports the complex format "Zg" can be read through
# a two-member struct dtype.
@testcase
def complex_struct_dtype(object[LongComplex] buf):
    """
    Note that the format string is "Zg" rather than "2g"...
    >>> complex_struct_dtype(LongComplexMockBuffer(None, [(0, -1)]))
    0.0 -1.0
    """
    # Read both members of the first item through buffer indexing.
    print buf[0].real, buf[0].imag
# Doctest: acquiring a complex ("Zg") buffer as MixedComplex, whose members
# differ in size, must fail with the ValueError shown in the docstring.
@testcase
def mixed_complex_struct_dtype(object[MixedComplex] buf):
    """
    Triggering a specific execution path for this case.
    >>> mixed_complex_struct_dtype(LongComplexMockBuffer(None, [(0, -1)]))
    Traceback (most recent call last):
    ...
    ValueError: Cannot store complex number in 'MixedComplex' as 'long double' differs from 'float' in size.
    """
    # Not reached in the doctest: buffer acquisition raises first.
    print buf[0].real, buf[0].imag
# Doctest: in-place operators on struct members reached through buffer
# indexing.  Starting from (0, -1), the expected result is "1.0 1.0".
@testcase
def complex_struct_inplace(object[LongComplex] buf):
    """
    >>> complex_struct_inplace(LongComplexMockBuffer(None, [(0, -1)]))
    1.0 1.0
    """
    # The augmented assignments are the construct under test; do not expand
    # them into plain assignments.
    buf[0].real += 1
    buf[0].imag += 2
    print buf[0].real, buf[0].imag
