lxml.tests.test_incremental_xmlfile
1
2
3 """
4 Tests for the incremental XML serialisation API.
5 """
6
7 from __future__ import absolute_import
8
9 import io
10 import os
11 import sys
12 import unittest
13 import textwrap
14 import tempfile
15
16 from lxml . etree import LxmlSyntaxError
17
# Put the directory that holds this test module at the front of sys.path,
# unless it is already listed there (presumably so that modules living next
# to this file stay importable however the tests are launched).
this_dir = os.path.dirname(__file__)
if sys.path.count(this_dir) == 0:
    sys.path.insert(0, this_dir)
21
22 from . common_imports import etree , BytesIO , HelperTestCase , skipIf , _str
26 _file = None
27
33
35 with etree . xmlfile ( self . _file ) as xf :
36 with xf . element ( 'test' ) :
37 xf . write ( 'toast' )
38 self . assertXml ( '<test>toast</test>' )
39
48
56
58 with etree . xmlfile ( self . _file ) as xf :
59 with xf . element ( 'test' ) :
60 xf . write ( 'con' )
61 with xf . element ( 'toast' ) :
62 xf . write ( 'tent' )
63 with xf . element ( 'taste' ) :
64 xf . write ( 'inside' )
65 xf . write ( 'tnet' )
66 xf . write ( 'noc' )
67 self . assertXml ( '<test>con<toast>tent<taste>inside</taste>'
68 'tnet</toast>noc</test>' )
69
74
86
92
99
105
112
118
125
127 with etree . xmlfile ( self . _file ) as xf :
128 with xf . element ( '{nsURI}test' , nsmap = { None : 'nsURI' , 'p' : 'ns2' } ) :
129 with xf . element ( '{nsURI}toast' ) :
130 pass
131 with xf . element ( '{ns2}toast' ) :
132 pass
133 self . assertXml (
134 '<test xmlns="nsURI" xmlns:p="ns2"><toast></toast><p:toast></p:toast></test>' )
135
142
149
155
161
167
169 with etree . xmlfile ( self . _file ) as xf :
170 with xf . element ( 'test' ) :
171 xf . write ( 'Comments: <!-- text -->\n' )
172 xf . write ( 'Entities: &' )
173 self . assertXml (
174 '<test>Comments: <!-- text -->\nEntities: &amp;</test>' )
175
181
183 with etree . xmlfile ( self . _file , buffered = False ) as xf :
184 with xf . element ( 'test' ) :
185 self . assertXml ( "<test>" )
186 xf . write ( 'toast' )
187 self . assertXml ( "<test>toast" )
188 with xf . element ( 'taste' ) :
189 self . assertXml ( "<test>toast<taste>" )
190 xf . write ( 'some' , etree . Element ( "more" ) , "toast" )
191 self . assertXml ( "<test>toast<taste>some<more/>toast" )
192 self . assertXml ( "<test>toast<taste>some<more/>toast</taste>" )
193 xf . write ( 'end' )
194 self . assertXml ( "<test>toast<taste>some<more/>toast</taste>end" )
195 self . assertXml ( "<test>toast<taste>some<more/>toast</taste>end</test>" )
196 self . assertXml ( "<test>toast<taste>some<more/>toast</taste>end</test>" )
197
211
213 try :
214 with etree . xmlfile ( self . _file ) as xf :
215 with xf . element ( 'root' ) :
216 with xf . element ( 'test' ) :
217 xf . write ( "BEFORE" )
218 raise TypeError ( "FAIL!" )
219 xf . write ( "AFTER" )
220 except TypeError as exc :
221 self . assertTrue ( "FAIL" in str ( exc ) , exc )
222 else :
223 self . assertTrue ( False , "exception not propagated" )
224 self . assertXml ( "<root><test>BEFORE</test></root>" )
225
234
235 g = gen ( )
236 next ( g )
237 g . send ( 'A' )
238 g . send ( 'B' )
239 g . send ( 'C' )
240 g . close ( )
241 self . assertXml ( "<root><entry>A</entry><entry>B</entry><entry>C</entry></root>" )
242
# Writing text while no element is open is expected to be rejected with
# LxmlSyntaxError.  assertRaises reports a meaningful failure message if
# the error is not raised, unlike a bare assertTrue(False).
with self.assertRaises(etree.LxmlSyntaxError):
    with etree.xmlfile(self._file) as xf:
        xf.write('toast')
251
with etree.xmlfile(self._file) as xf:
    with xf.element('test'):
        pass
    # The single root element has been closed above; a further write is
    # expected to be rejected with LxmlSyntaxError.  assertRaises gives a
    # descriptive failure instead of a bare assertTrue(False).
    self.assertRaises(etree.LxmlSyntaxError, xf.write, 'toast')
262
273
275 cm_exit = None
276 try :
277 with etree . xmlfile ( self . _file ) as xf :
278 x = xf . element ( 'test' )
279 cm_exit = x . __exit__
280 x . __enter__ ( )
281 raise ValueError ( '123' )
282 except ValueError :
283 self . assertTrue ( cm_exit )
284 try :
285 cm_exit ( ValueError , ValueError ( "huhu" ) , None )
286 except etree . LxmlSyntaxError :
287 self . assertTrue ( True )
288 else :
289 self . assertTrue ( False )
290 else :
291 self . assertTrue ( False )
292
294 pos = self . _file . tell ( )
295 self . _file . seek ( 0 )
296 try :
297 return self . _file . read ( )
298 finally :
299 self . _file . seek ( pos )
300
308
312
313 - def assertXml ( self , expected , encoding = 'utf8' ) :
315
319 self . _file = BytesIO ( )
320
326
330 self . _file = tempfile . TemporaryFile ( )
331
332
333 @ skipIf ( sys . platform . startswith ( "win" ) , "Can't reopen temporary files on Windows" )
334 - class TempPathXmlFileTestCase ( _XmlFileTestCaseBase ) :
336 self . _tmpfile = tempfile . NamedTemporaryFile ( )
337 self . _file = self . _tmpfile . name
338
340 try :
341 self . _tmpfile . close ( )
342 finally :
343 if os . path . exists ( self . _tmpfile . name ) :
344 os . unlink ( self . _tmpfile . name )
345
347 self . _tmpfile . seek ( 0 )
348 return self . _tmpfile . read ( )
349
351 self . _tmpfile . seek ( 0 )
352 return etree . parse ( self . _tmpfile )
353
354 @ skipIf ( True , "temp file behaviour is too platform specific here" )
357
358 @ skipIf ( True , "temp file behaviour is too platform specific here" )
361
371
373 assert not self . closed
374 self . closed = True
375 self . _target . close ( )
376
380
382 return self . _target . getvalue ( )
383
385 pos = self . _file . tell ( )
386 self . _target . seek ( 0 )
387 try :
388 return etree . parse ( self . _target )
389 finally :
390 self . _target . seek ( pos )
391
397
404
class WriteError(Exception):
    """Raised by ``Writer.write()`` when the trigger bytes appear in the data."""


class Writer(object):
    """Write-only target that fails the first time it sees a trigger sequence."""

    def __init__(self, trigger):
        # ``trigger`` is the byte string whose presence makes write() fail.
        self._failed = False
        self._trigger = trigger

    def write(self, data):
        """Accept *data*, raising ``WriteError`` if it contains the trigger."""
        # Once a write has failed, no further write is expected on this target.
        assert not self._failed, "write() called again after failure"
        if self._trigger not in data:
            return
        self._failed = True
        raise WriteError("FAILED: " + self._trigger.decode('utf8'))
419
420 for trigger in [ 'text' , 'root' , 'tag' , 'noflush' ] :
421 try :
422 with etree . xmlfile ( Writer ( trigger . encode ( 'utf8' ) ) , encoding = 'utf8' ) as xf :
423 with xf . element ( 'root' ) :
424 xf . flush ( )
425 with xf . element ( 'tag' ) :
426 xf . write ( 'text' )
427 xf . flush ( )
428 xf . write ( 'noflush' )
429 xf . flush ( )
430 xf . flush ( )
431 except WriteError as exc :
432 self . assertTrue ( 'FAILED: ' + trigger in str ( exc ) )
433 else :
434 self . assertTrue ( False , "exception not raised for '%s'" % trigger )
435
439 self . _file = BytesIO ( )
440
442
443 void_elements = set ( [
444 "area" , "base" , "br" , "col" , "embed" , "hr" , "img" ,
445 "input" , "keygen" , "link" , "meta" , "param" ,
446 "source" , "track" , "wbr"
447 ] )
448
449
450 void_elements . difference_update ( [
451 'area' , 'embed' , 'keygen' , 'source' , 'track' , 'wbr'
452 ] )
453
454 for tag in sorted ( void_elements ) :
455 with etree . htmlfile ( self . _file ) as xf :
456 xf . write ( etree . Element ( tag ) )
457 self . assertXml ( '<%s>' % tag )
458 self . _file = BytesIO ( )
459
461 with etree . htmlfile ( self . _file ) as xf :
462 with xf . element ( 'foo' ) :
463 cm = xf . method ( 'xml' )
464 cm . __enter__ ( )
465
466 self . assertRaises ( LxmlSyntaxError , cm . __enter__ )
467
468 cm2 = xf . method ( 'xml' )
469 cm2 . __enter__ ( )
470 cm2 . __exit__ ( None , None , None )
471
472 self . assertRaises ( LxmlSyntaxError , cm2 . __exit__ , None , None , None )
473
474 cm3 = xf . method ( 'xml' )
475 cm3 . __enter__ ( )
476 with xf . method ( 'html' ) :
477 self . assertRaises ( LxmlSyntaxError , cm3 . __exit__ , None , None , None )
478
512
514
515
516
517 with etree . htmlfile ( self . _file ) as xf :
518 with xf . element ( "root" ) :
519 with xf . element ( 'foo' , attrib = { 'selected' : 'bar' } ) :
520 pass
521
522 self . assertXml (
523 '<root>'
524
525
526 '<foo selected="bar"></foo>'
527 '</root>' )
528 self . _file = BytesIO ( )
529
536
# The attribute value contains double quotes and non-ASCII characters.
# In the serialised output the quotes must appear as "&quot;" (otherwise
# the attribute would be malformed) and the non-ASCII characters as
# numeric character references.
# NOTE(review): assumes _str() turns the \u / \U escapes into the real
# characters and that htmlfile() serialises as ASCII by default -- confirm
# against common_imports and the lxml documentation.
with etree.htmlfile(self._file) as xf:
    with xf.element("tagname", attrib={"attr": _str('"misquöted\\u3344\\U00013344"')}):
        xf.write("foo")

self.assertXml(
    '<tagname attr="&quot;misqu&#246;ted&#13124;&#78660;&quot;">foo</tagname>')
543
550
557
567
572
578
582 out = io . BytesIO ( )
583 xf = etree . xmlfile ( out )
584 scm = xf . __enter__ ( )
585 acm = xf . __aenter__ ( )
586 list ( acm . __await__ ( ) )
587
def api_of(obj):
    """Return the sorted names of all non-dunder attributes of *obj*."""
    # Inspect the argument itself, not a variable captured from the
    # enclosing scope; otherwise every call returns the same list and
    # comparing two objects' APIs can never fail.
    return sorted(name for name in dir(obj) if not name.startswith('__'))
590
591 a_api = api_of ( acm )
592
593 self . assertEqual ( api_of ( scm ) , api_of ( acm ) )
594 self . assertTrue ( 'write' in a_api )
595 self . assertTrue ( 'element' in a_api )
596 self . assertTrue ( 'method' in a_api )
597 self . assertTrue ( len ( a_api ) > 5 )
598
600 while True :
601 try :
602 coro . send ( None )
603 except StopIteration as ex :
604 return ex . value
605
606 @ skipIf ( sys . version_info < ( 3 , 5 ) , "requires support for async-def (Py3.5+)" )
608 code = textwrap . dedent ( """\
609 async def test_async_xmlfile(close=True, buffered=True):
610 class Writer(object):
611 def __init__(self):
612 self._data = []
613 self._all_data = None
614 self._calls = 0
615
616 async def write(self, data):
617 self._calls += 1
618 self._data.append(data)
619
620 async def close(self):
621 assert self._all_data is None
622 assert self._data is not None
623 self._all_data = b''.join(self._data)
624 self._data = None # make writing fail afterwards
625
626 async def generate(out, close=True, buffered=True):
627 async with etree.xmlfile(out, close=close, buffered=buffered) as xf:
628 async with xf.element('root'):
629 await xf.write('root-text')
630 async with xf.method('html'):
631 await xf.write(etree.Element('img', src='http://huhu.org/'))
632 await xf.flush()
633 for i in range(3):
634 async with xf.element('el'):
635 await xf.write('text-%d' % i)
636
637 out = Writer()
638 await generate(out, close=close, buffered=buffered)
639 if not close:
640 await out.close()
641 assert out._data is None, out._data
642 return out._all_data, out._calls
643 """ )
644 lns = { }
645 exec ( code , globals ( ) , lns )
646 test_async_xmlfile = lns [ 'test_async_xmlfile' ]
647
648 expected = (
649 b'<root>root-text<img src="http://huhu.org/">'
650 b'<el>text-0</el><el>text-1</el><el>text-2</el></root>'
651 )
652
653 data , calls = self . _run_async ( test_async_xmlfile ( close = True ) )
654 self . assertEqual ( expected , data )
655 self . assertEqual ( 2 , calls )
656
657 data , calls = self . _run_async ( test_async_xmlfile ( close = False ) )
658 self . assertEqual ( expected , data )
659 self . assertEqual ( 2 , calls )
660
661 data , unbuffered_calls = self . _run_async ( test_async_xmlfile ( buffered = False ) )
662 self . assertEqual ( expected , data )
663 self . assertTrue ( unbuffered_calls > calls , unbuffered_calls )
664
677
678
# Running this file directly executes no tests; it only prints a hint
# pointing at the test.py runner.
if __name__ == '__main__':
    usage_hint = 'to test use test.py %s' % __file__
    print(usage_hint)
681