# -*- coding: utf-8 -*-
########################################################################
#
# License: BSD
# Created: 2005-05-24
# Author: Ivan Vilata i Balaguer - ivan@selidor.net
#
# $Id$
#
########################################################################
"""Utilities for PyTables' test suites."""
from __future__ import print_function
import os
import re
import sys
import time
import locale
import platform
import tempfile
import warnings
try:
import unittest2 as unittest
except ImportError:
if sys.version_info < (2, 7):
raise
else:
import unittest
import numpy
import numexpr
import tables
from tables.utils import detect_number_of_cores
verbose = False
"""Show detailed output of the testing process."""
heavy = False
"""Run all tests even when they take long to complete."""
show_memory = False
"""Show the progress of memory consumption."""
def parse_argv(argv):
global verbose, heavy
if 'verbose' in argv:
verbose = True
argv.remove('verbose')
if 'silent' in argv: # take care of old flag, just in case
verbose = False
argv.remove('silent')
if '--heavy' in argv:
heavy = True
argv.remove('--heavy')
return argv
zlib_avail = tables.which_lib_version("zlib") is not None
lzo_avail = tables.which_lib_version("lzo") is not None
bzip2_avail = tables.which_lib_version("bzip2") is not None
blosc_avail = tables.which_lib_version("blosc") is not None
def print_heavy(heavy):
if heavy:
print("""Performing the complete test suite!""")
else:
print("""\
Performing only a light (yet comprehensive) subset of the test suite.
If you want a more complete test, try passing the --heavy flag to this script
(or set the 'heavy' parameter in case you are using tables.test() call).
The whole suite will take more than 4 hours to complete on a relatively
modern CPU and around 512 MB of main memory.""")
print('-=' * 38)
[docs]def print_versions():
"""Print all the versions of software that PyTables relies on."""
print('-=' * 38)
print("PyTables version: %s" % tables.__version__)
print("HDF5 version: %s" % tables.which_lib_version("hdf5")[1])
print("NumPy version: %s" % numpy.__version__)
tinfo = tables.which_lib_version("zlib")
if numexpr.use_vml:
# Get only the main version number and strip out all the rest
vml_version = numexpr.get_vml_version()
vml_version = re.findall("[0-9.]+", vml_version)[0]
vml_avail = "using VML/MKL %s" % vml_version
else:
vml_avail = "not using Intel's VML/MKL"
print("Numexpr version: %s (%s)" % (numexpr.__version__, vml_avail))
if tinfo is not None:
print("Zlib version: %s (%s)" % (tinfo[1],
"in Python interpreter"))
tinfo = tables.which_lib_version("lzo")
if tinfo is not None:
print("LZO version: %s (%s)" % (tinfo[1], tinfo[2]))
tinfo = tables.which_lib_version("bzip2")
if tinfo is not None:
print("BZIP2 version: %s (%s)" % (tinfo[1], tinfo[2]))
tinfo = tables.which_lib_version("blosc")
if tinfo is not None:
blosc_date = tinfo[2].split()[1]
print("Blosc version: %s (%s)" % (tinfo[1], blosc_date))
blosc_cinfo = tables.blosc_get_complib_info()
blosc_cinfo = [
"%s (%s)" % (k, v[1]) for k, v in sorted(blosc_cinfo.items())
]
print("Blosc compressors: %s" % ', '.join(blosc_cinfo))
try:
from Cython import __version__ as cython_version
print('Cython version: %s' % cython_version)
except:
pass
print('Python version: %s' % sys.version)
print('Platform: %s' % platform.platform())
#if os.name == 'posix':
# (sysname, nodename, release, version, machine) = os.uname()
# print('Platform: %s-%s' % (sys.platform, machine))
print('Byte-ordering: %s' % sys.byteorder)
print('Detected cores: %s' % detect_number_of_cores())
print('Default encoding: %s' % sys.getdefaultencoding())
print('Default FS encoding: %s' % sys.getfilesystemencoding())
print('Default locale: (%s, %s)' % locale.getdefaultlocale())
print('-=' * 38)
# This should improve readability whan tests are run by CI tools
sys.stdout.flush()
def verbosePrint(string, nonl=False):
"""Print out the `string` if verbose output is enabled."""
if not verbose:
return
if nonl:
print(string, end=' ')
else:
print(string)
def allequal(a, b, flavor="numpy"):
"""Checks if two numerical objects are equal."""
# print("a-->", repr(a))
# print("b-->", repr(b))
if not hasattr(b, "shape"):
# Scalar case
return a == b
if ((not hasattr(a, "shape") or a.shape == ()) and
(not hasattr(b, "shape") or b.shape == ())):
return a == b
if a.shape != b.shape:
if verbose:
print("Shape is not equal:", a.shape, "!=", b.shape)
return 0
# Way to check the type equality without byteorder considerations
if hasattr(b, "dtype") and a.dtype.str[1:] != b.dtype.str[1:]:
if verbose:
print("dtype is not equal:", a.dtype, "!=", b.dtype)
return 0
# Rank-0 case
if len(a.shape) == 0:
if a[()] == b[()]:
return 1
else:
if verbose:
print("Shape is not equal:", a.shape, "!=", b.shape)
return 0
# null arrays
if a.size == 0: # len(a) is not correct for generic shapes
if b.size == 0:
return 1
else:
if verbose:
print("length is not equal")
print("len(a.data) ==>", len(a.data))
print("len(b.data) ==>", len(b.data))
return 0
# Multidimensional case
result = (a == b)
result = numpy.all(result)
if not result and verbose:
print("Some of the elements in arrays are not equal")
return result
def areArraysEqual(arr1, arr2):
"""Are both `arr1` and `arr2` equal arrays?
Arguments can be regular NumPy arrays, chararray arrays or
structured arrays (including structured record arrays). They are
checked for type and value equality.
"""
t1 = type(arr1)
t2 = type(arr2)
if not ((hasattr(arr1, 'dtype') and arr1.dtype == arr2.dtype) or
issubclass(t1, t2) or issubclass(t2, t1)):
return False
return numpy.all(arr1 == arr2)
# COMPATIBILITY: assertWarns is new in Python 3.2
# Code copied from the standard unittest.case module (Python 3.4)
if not hasattr(unittest.TestCase, 'assertWarns'):
class _BaseTestCaseContext:
def __init__(self, test_case):
self.test_case = test_case
def _raiseFailure(self, standardMsg):
msg = self.test_case._formatMessage(self.msg, standardMsg)
raise self.test_case.failureException(msg)
class _AssertRaisesBaseContext(_BaseTestCaseContext):
def __init__(self, expected, test_case, callable_obj=None,
expected_regex=None):
_BaseTestCaseContext.__init__(self, test_case)
self.expected = expected
self.test_case = test_case
if callable_obj is not None:
try:
self.obj_name = callable_obj.__name__
except AttributeError:
self.obj_name = str(callable_obj)
else:
self.obj_name = None
if expected_regex is not None:
expected_regex = re.compile(expected_regex)
self.expected_regex = expected_regex
self.msg = None
def handle(self, name, callable_obj, args, kwargs):
"""
If callable_obj is None, assertRaises/Warns is being used as a
context manager, so check for a 'msg' kwarg and return self.
If callable_obj is not None, call it passing args and kwargs.
"""
if callable_obj is None:
self.msg = kwargs.pop('msg', None)
return self
with self:
callable_obj(*args, **kwargs)
class _AssertWarnsContext(_AssertRaisesBaseContext):
def __enter__(self):
for v in sys.modules.values():
if getattr(v, '__warningregistry__', None):
v.__warningregistry__ = {}
self.warnings_manager = warnings.catch_warnings(record=True)
self.warnings = self.warnings_manager.__enter__()
warnings.simplefilter("always", self.expected)
return self
def __exit__(self, exc_type, exc_value, tb):
self.warnings_manager.__exit__(exc_type, exc_value, tb)
if exc_type is not None:
# let unexpected exceptions pass through
return
try:
exc_name = self.expected.__name__
except AttributeError:
exc_name = str(self.expected)
first_matching = None
for m in self.warnings:
w = m.message
if not isinstance(w, self.expected):
continue
if first_matching is None:
first_matching = w
if (self.expected_regex is not None and
not self.expected_regex.search(str(w))):
continue
# store warning for later retrieval
self.warning = w
self.filename = m.filename
self.lineno = m.lineno
return
# Now we simply try to choose a helpful failure message
if first_matching is not None:
self._raiseFailure(
'"{0}" does not match "{1}"'.format(
self.expected_regex.pattern, str(first_matching)))
if self.obj_name:
self._raiseFailure("{0} not triggered by {1}".format(
exc_name, self.obj_name))
else:
self._raiseFailure("{0} not triggered".format(exc_name))
class PyTablesTestCase(unittest.TestCase):
def tearDown(self):
super(PyTablesTestCase, self).tearDown()
for key in self.__dict__:
if self.__dict__[key].__class__.__name__ not in ('instancemethod'):
self.__dict__[key] = None
def _getName(self):
"""Get the name of this test case."""
return self.id().split('.')[-2]
def _getMethodName(self):
"""Get the name of the method currently running in the test case."""
return self.id().split('.')[-1]
def _verboseHeader(self):
"""Print a nice header for the current test method if verbose."""
if verbose:
name = self._getName()
methodName = self._getMethodName()
title = "Running %s.%s" % (name, methodName)
print('%s\n%s' % (title, '-' * len(title)))
@classmethod
def _testFilename(class_, filename):
"""Returns an absolute version of the `filename`, taking care of the
location of the calling test case class."""
modname = class_.__module__
# When the definitive switch to ``setuptools`` is made,
# this should definitely use the ``pkg_resouces`` API::
#
# return pkg_resources.resource_filename(modname, filename)
#
modfile = sys.modules[modname].__file__
dirname = os.path.dirname(modfile)
return os.path.join(dirname, filename)
# COMPATIBILITY: assertWarns is new in Python 3.2
if not hasattr(unittest.TestCase, 'assertWarns'):
def assertWarns(self, expected_warning, callable_obj=None,
*args, **kwargs):
context = _AssertWarnsContext(expected_warning, self, callable_obj)
return context.handle('assertWarns', callable_obj, args, kwargs)
def _checkEqualityGroup(self, node1, node2, hardlink=False):
if verbose:
print("Group 1:", node1)
print("Group 2:", node2)
if hardlink:
self.assertTrue(
node1._v_pathname != node2._v_pathname,
"node1 and node2 have the same pathnames.")
else:
self.assertTrue(
node1._v_pathname == node2._v_pathname,
"node1 and node2 does not have the same pathnames.")
self.assertTrue(
node1._v_children == node2._v_children,
"node1 and node2 does not have the same children.")
def _checkEqualityLeaf(self, node1, node2, hardlink=False):
if verbose:
print("Leaf 1:", node1)
print("Leaf 2:", node2)
if hardlink:
self.assertTrue(
node1._v_pathname != node2._v_pathname,
"node1 and node2 have the same pathnames.")
else:
self.assertTrue(
node1._v_pathname == node2._v_pathname,
"node1 and node2 does not have the same pathnames.")
self.assertTrue(
areArraysEqual(node1[:], node2[:]),
"node1 and node2 does not have the same values.")
class TestFileMixin(object):
h5fname = None
open_kwargs = {}
def setUp(self):
super(TestFileMixin, self).setUp()
#self.h5fname = self._testFilename(self.testfname)
self.h5file = tables.open_file(
self.h5fname, title=self._getName(), **self.open_kwargs)
def tearDown(self):
"""Close ``h5file``."""
self.h5file.close()
super(TestFileMixin, self).tearDown()
class TempFileMixin(object):
open_mode = 'w'
open_kwargs = {}
def _getTempFileName(self):
return tempfile.mktemp(prefix=self._getName(), suffix='.h5')
def setUp(self):
"""Set ``h5file`` and ``h5fname`` instance attributes.
* ``h5fname``: the name of the temporary HDF5 file.
* ``h5file``: the writable, empty, temporary HDF5 file.
"""
super(TempFileMixin, self).setUp()
self.h5fname = self._getTempFileName()
self.h5file = tables.open_file(
self.h5fname, self.open_mode, title=self._getName(),
**self.open_kwargs)
def tearDown(self):
"""Close ``h5file`` and remove ``h5fname``."""
self.h5file.close()
self.h5file = None
os.remove(self.h5fname) # comment this for debugging purposes only
super(TempFileMixin, self).tearDown()
def _reopen(self, mode='r', **kwargs):
"""Reopen ``h5file`` in the specified ``mode``.
Returns a true or false value depending on whether the file was
reopenend or not. If not, nothing is changed.
"""
self.h5file.close()
self.h5file = tables.open_file(self.h5fname, mode, **kwargs)
return True
class ShowMemTime(PyTablesTestCase):
tref = time.time()
"""Test for showing memory and time consumption."""
def test00(self):
"""Showing memory and time consumption."""
# Obtain memory info (only for Linux 2.6.x)
for line in open("/proc/self/status"):
if line.startswith("VmSize:"):
vmsize = int(line.split()[1])
elif line.startswith("VmRSS:"):
vmrss = int(line.split()[1])
elif line.startswith("VmData:"):
vmdata = int(line.split()[1])
elif line.startswith("VmStk:"):
vmstk = int(line.split()[1])
elif line.startswith("VmExe:"):
vmexe = int(line.split()[1])
elif line.startswith("VmLib:"):
vmlib = int(line.split()[1])
print("\nWallClock time:", time.time() - self.tref)
print("Memory usage: ******* %s *******" % self._getName())
print("VmSize: %7s kB\tVmRSS: %7s kB" % (vmsize, vmrss))
print("VmData: %7s kB\tVmStk: %7s kB" % (vmdata, vmstk))
print("VmExe: %7s kB\tVmLib: %7s kB" % (vmexe, vmlib))
## Local Variables:
## mode: python
## py-indent-offset: 4
## tab-width: 4
## fill-column: 72
## End: