# -*- coding: utf-8 -*- """ Tests the 'read_fwf' function in parsers.py. This test suite is independent of the others because the engine is set to 'python-fwf' internally. """ from datetime import datetime import pytest import numpy as np import pandas as pd import pandas.util.testing as tm from pandas import DataFrame from pandas import compat from pandas.compat import StringIO, BytesIO from pandas.io.parsers import read_csv, read_fwf, EmptyDataError class TestFwfParsing(object): def test_fwf(self): data_expected = """\ 2011,58,360.242940,149.910199,11950.7 2011,59,444.953632,166.985655,11788.4 2011,60,364.136849,183.628767,11806.2 2011,61,413.836124,184.375703,11916.8 2011,62,502.953953,173.237159,12468.3 """ expected = read_csv(StringIO(data_expected), engine='python', header=None) data1 = """\ 201158 360.242940 149.910199 11950.7 201159 444.953632 166.985655 11788.4 201160 364.136849 183.628767 11806.2 201161 413.836124 184.375703 11916.8 201162 502.953953 173.237159 12468.3 """ colspecs = [(0, 4), (4, 8), (8, 20), (21, 33), (34, 43)] df = read_fwf(StringIO(data1), colspecs=colspecs, header=None) tm.assert_frame_equal(df, expected) data2 = """\ 2011 58 360.242940 149.910199 11950.7 2011 59 444.953632 166.985655 11788.4 2011 60 364.136849 183.628767 11806.2 2011 61 413.836124 184.375703 11916.8 2011 62 502.953953 173.237159 12468.3 """ df = read_fwf(StringIO(data2), widths=[5, 5, 13, 13, 7], header=None) tm.assert_frame_equal(df, expected) # From Thomas Kluyver: apparently some non-space filler characters can # be seen, this is supported by specifying the 'delimiter' character: # http://publib.boulder.ibm.com/infocenter/dmndhelp/v6r1mx/index.jsp?topic=/com.ibm.wbit.612.help.config.doc/topics/rfixwidth.html data3 = """\ 201158~~~~360.242940~~~149.910199~~~11950.7 201159~~~~444.953632~~~166.985655~~~11788.4 201160~~~~364.136849~~~183.628767~~~11806.2 201161~~~~413.836124~~~184.375703~~~11916.8 201162~~~~502.953953~~~173.237159~~~12468.3 """ df = read_fwf( StringIO(data3), colspecs=colspecs, delimiter='~', header=None) tm.assert_frame_equal(df, expected) with tm.assert_raises_regex(ValueError, "must specify only one of"): read_fwf(StringIO(data3), colspecs=colspecs, widths=[6, 10, 10, 7]) with tm.assert_raises_regex(ValueError, "Must specify either"): read_fwf(StringIO(data3), colspecs=None, widths=None) def test_BytesIO_input(self): if not compat.PY3: pytest.skip( "Bytes-related test - only needs to work on Python 3") result = read_fwf(BytesIO("שלום\nשלום".encode('utf8')), widths=[ 2, 2], encoding='utf8') expected = DataFrame([["של", "ום"]], columns=["של", "ום"]) tm.assert_frame_equal(result, expected) def test_fwf_colspecs_is_list_or_tuple(self): data = """index,A,B,C,D foo,2,3,4,5 bar,7,8,9,10 baz,12,13,14,15 qux,12,13,14,15 foo2,12,13,14,15 bar2,12,13,14,15 """ with tm.assert_raises_regex(TypeError, 'column specifications must ' 'be a list or tuple.+'): pd.io.parsers.FixedWidthReader(StringIO(data), {'a': 1}, ',', '#') def test_fwf_colspecs_is_list_or_tuple_of_two_element_tuples(self): data = """index,A,B,C,D foo,2,3,4,5 bar,7,8,9,10 baz,12,13,14,15 qux,12,13,14,15 foo2,12,13,14,15 bar2,12,13,14,15 """ with tm.assert_raises_regex(TypeError, 'Each column specification ' 'must be.+'): read_fwf(StringIO(data), [('a', 1)]) def test_fwf_colspecs_None(self): # GH 7079 data = """\ 123456 456789 """ colspecs = [(0, 3), (3, None)] result = read_fwf(StringIO(data), colspecs=colspecs, header=None) expected = DataFrame([[123, 456], [456, 789]]) tm.assert_frame_equal(result, expected) colspecs = [(None, 3), (3, 6)] result = read_fwf(StringIO(data), colspecs=colspecs, header=None) expected = DataFrame([[123, 456], [456, 789]]) tm.assert_frame_equal(result, expected) colspecs = [(0, None), (3, None)] result = read_fwf(StringIO(data), colspecs=colspecs, header=None) expected = DataFrame([[123456, 456], [456789, 789]]) tm.assert_frame_equal(result, expected) colspecs = [(None, None), (3, 6)] result = read_fwf(StringIO(data), colspecs=colspecs, header=None) expected = DataFrame([[123456, 456], [456789, 789]]) tm.assert_frame_equal(result, expected) def test_fwf_regression(self): # GH 3594 # turns out 'T060' is parsable as a datetime slice! tzlist = [1, 10, 20, 30, 60, 80, 100] ntz = len(tzlist) tcolspecs = [16] + [8] * ntz tcolnames = ['SST'] + ["T%03d" % z for z in tzlist[1:]] data = """ 2009164202000 9.5403 9.4105 8.6571 7.8372 6.0612 5.8843 5.5192 2009164203000 9.5435 9.2010 8.6167 7.8176 6.0804 5.8728 5.4869 2009164204000 9.5873 9.1326 8.4694 7.5889 6.0422 5.8526 5.4657 2009164205000 9.5810 9.0896 8.4009 7.4652 6.0322 5.8189 5.4379 2009164210000 9.6034 9.0897 8.3822 7.4905 6.0908 5.7904 5.4039 """ df = read_fwf(StringIO(data), index_col=0, header=None, names=tcolnames, widths=tcolspecs, parse_dates=True, date_parser=lambda s: datetime.strptime(s, '%Y%j%H%M%S')) for c in df.columns: res = df.loc[:, c] assert len(res) def test_fwf_for_uint8(self): data = """1421302965.213420 PRI=3 PGN=0xef00 DST=0x17 SRC=0x28 04 154 00 00 00 00 00 127 1421302964.226776 PRI=6 PGN=0xf002 SRC=0x47 243 00 00 255 247 00 00 71""" # noqa df = read_fwf(StringIO(data), colspecs=[(0, 17), (25, 26), (33, 37), (49, 51), (58, 62), (63, 1000)], names=['time', 'pri', 'pgn', 'dst', 'src', 'data'], converters={ 'pgn': lambda x: int(x, 16), 'src': lambda x: int(x, 16), 'dst': lambda x: int(x, 16), 'data': lambda x: len(x.split(' '))}) expected = DataFrame([[1421302965.213420, 3, 61184, 23, 40, 8], [1421302964.226776, 6, 61442, None, 71, 8]], columns=["time", "pri", "pgn", "dst", "src", "data"]) expected["dst"] = expected["dst"].astype(object) tm.assert_frame_equal(df, expected) def test_fwf_compression(self): try: import gzip import bz2 except ImportError: pytest.skip("Need gzip and bz2 to run this test") data = """1111111111 2222222222 3333333333""".strip() widths = [5, 5] names = ['one', 'two'] expected = read_fwf(StringIO(data), widths=widths, names=names) if compat.PY3: data = bytes(data, encoding='utf-8') comps = [('gzip', gzip.GzipFile), ('bz2', bz2.BZ2File)] for comp_name, compresser in comps: with tm.ensure_clean() as path: tmp = compresser(path, mode='wb') tmp.write(data) tmp.close() result = read_fwf(path, widths=widths, names=names, compression=comp_name) tm.assert_frame_equal(result, expected) def test_comment_fwf(self): data = """ 1 2. 4 #hello world 5 NaN 10.0 """ expected = np.array([[1, 2., 4], [5, np.nan, 10.]]) df = read_fwf(StringIO(data), colspecs=[(0, 3), (4, 9), (9, 25)], comment='#') tm.assert_almost_equal(df.values, expected) def test_1000_fwf(self): data = """ 1 2,334.0 5 10 13 10. """ expected = np.array([[1, 2334., 5], [10, 13, 10]]) df = read_fwf(StringIO(data), colspecs=[(0, 3), (3, 11), (12, 16)], thousands=',') tm.assert_almost_equal(df.values, expected) def test_bool_header_arg(self): # see gh-6114 data = """\ MyColumn a b a b""" for arg in [True, False]: with pytest.raises(TypeError): read_fwf(StringIO(data), header=arg) def test_full_file(self): # File with all values test = """index A B C 2000-01-03T00:00:00 0.980268513777 3 foo 2000-01-04T00:00:00 1.04791624281 -4 bar 2000-01-05T00:00:00 0.498580885705 73 baz 2000-01-06T00:00:00 1.12020151869 1 foo 2000-01-07T00:00:00 0.487094399463 0 bar 2000-01-10T00:00:00 0.836648671666 2 baz 2000-01-11T00:00:00 0.157160753327 34 foo""" colspecs = ((0, 19), (21, 35), (38, 40), (42, 45)) expected = read_fwf(StringIO(test), colspecs=colspecs) tm.assert_frame_equal(expected, read_fwf(StringIO(test))) def test_full_file_with_missing(self): # File with missing values test = """index A B C 2000-01-03T00:00:00 0.980268513777 3 foo 2000-01-04T00:00:00 1.04791624281 -4 bar 0.498580885705 73 baz 2000-01-06T00:00:00 1.12020151869 1 foo 2000-01-07T00:00:00 0 bar 2000-01-10T00:00:00 0.836648671666 2 baz 34""" colspecs = ((0, 19), (21, 35), (38, 40), (42, 45)) expected = read_fwf(StringIO(test), colspecs=colspecs) tm.assert_frame_equal(expected, read_fwf(StringIO(test))) def test_full_file_with_spaces(self): # File with spaces in columns test = """ Account Name Balance CreditLimit AccountCreated 101 Keanu Reeves 9315.45 10000.00 1/17/1998 312 Gerard Butler 90.00 1000.00 8/6/2003 868 Jennifer Love Hewitt 0 17000.00 5/25/1985 761 Jada Pinkett-Smith 49654.87 100000.00 12/5/2006 317 Bill Murray 789.65 5000.00 2/5/2007 """.strip('\r\n') colspecs = ((0, 7), (8, 28), (30, 38), (42, 53), (56, 70)) expected = read_fwf(StringIO(test), colspecs=colspecs) tm.assert_frame_equal(expected, read_fwf(StringIO(test))) def test_full_file_with_spaces_and_missing(self): # File with spaces and missing values in columns test = """ Account Name Balance CreditLimit AccountCreated 101 10000.00 1/17/1998 312 Gerard Butler 90.00 1000.00 8/6/2003 868 5/25/1985 761 Jada Pinkett-Smith 49654.87 100000.00 12/5/2006 317 Bill Murray 789.65 """.strip('\r\n') colspecs = ((0, 7), (8, 28), (30, 38), (42, 53), (56, 70)) expected = read_fwf(StringIO(test), colspecs=colspecs) tm.assert_frame_equal(expected, read_fwf(StringIO(test))) def test_messed_up_data(self): # Completely messed up file test = """ Account Name Balance Credit Limit Account Created 101 10000.00 1/17/1998 312 Gerard Butler 90.00 1000.00 761 Jada Pinkett-Smith 49654.87 100000.00 12/5/2006 317 Bill Murray 789.65 """.strip('\r\n') colspecs = ((2, 10), (15, 33), (37, 45), (49, 61), (64, 79)) expected = read_fwf(StringIO(test), colspecs=colspecs) tm.assert_frame_equal(expected, read_fwf(StringIO(test))) def test_multiple_delimiters(self): test = r""" col1~~~~~col2 col3++++++++++++++++++col4 ~~22.....11.0+++foo~~~~~~~~~~Keanu Reeves 33+++122.33\\\bar.........Gerard Butler ++44~~~~12.01 baz~~Jennifer Love Hewitt ~~55 11+++foo++++Jada Pinkett-Smith ..66++++++.03~~~bar Bill Murray """.strip('\r\n') colspecs = ((0, 4), (7, 13), (15, 19), (21, 41)) expected = read_fwf(StringIO(test), colspecs=colspecs, delimiter=' +~.\\') tm.assert_frame_equal(expected, read_fwf(StringIO(test), delimiter=' +~.\\')) def test_variable_width_unicode(self): if not compat.PY3: pytest.skip( 'Bytes-related test - only needs to work on Python 3') test = """ שלום שלום ום שלל של ום """.strip('\r\n') expected = read_fwf(BytesIO(test.encode('utf8')), colspecs=[(0, 4), (5, 9)], header=None, encoding='utf8') tm.assert_frame_equal(expected, read_fwf( BytesIO(test.encode('utf8')), header=None, encoding='utf8')) def test_dtype(self): data = """ a b c 1 2 3.2 3 4 5.2 """ colspecs = [(0, 5), (5, 10), (10, None)] result = pd.read_fwf(StringIO(data), colspecs=colspecs) expected = pd.DataFrame({ 'a': [1, 3], 'b': [2, 4], 'c': [3.2, 5.2]}, columns=['a', 'b', 'c']) tm.assert_frame_equal(result, expected) expected['a'] = expected['a'].astype('float64') expected['b'] = expected['b'].astype(str) expected['c'] = expected['c'].astype('int32') result = pd.read_fwf(StringIO(data), colspecs=colspecs, dtype={'a': 'float64', 'b': str, 'c': 'int32'}) tm.assert_frame_equal(result, expected) def test_skiprows_inference(self): # GH11256 test = """ Text contained in the file header DataCol1 DataCol2 0.0 1.0 101.6 956.1 """.strip() expected = read_csv(StringIO(test), skiprows=2, delim_whitespace=True) tm.assert_frame_equal(expected, read_fwf( StringIO(test), skiprows=2)) def test_skiprows_by_index_inference(self): test = """ To be skipped Not To Be Skipped Once more to be skipped 123 34 8 123 456 78 9 456 """.strip() expected = read_csv(StringIO(test), skiprows=[0, 2], delim_whitespace=True) tm.assert_frame_equal(expected, read_fwf( StringIO(test), skiprows=[0, 2])) def test_skiprows_inference_empty(self): test = """ AA BBB C 12 345 6 78 901 2 """.strip() with pytest.raises(EmptyDataError): read_fwf(StringIO(test), skiprows=3) def test_whitespace_preservation(self): # Addresses Issue #16772 data_expected = """ a ,bbb cc,dd """ expected = read_csv(StringIO(data_expected), header=None) test_data = """ a bbb ccdd """ result = read_fwf(StringIO(test_data), widths=[3, 3], header=None, skiprows=[0], delimiter="\n\t") tm.assert_frame_equal(result, expected) def test_default_delimiter(self): data_expected = """ a,bbb cc,dd""" expected = read_csv(StringIO(data_expected), header=None) test_data = """ a \tbbb cc\tdd """ result = read_fwf(StringIO(test_data), widths=[3, 3], header=None, skiprows=[0]) tm.assert_frame_equal(result, expected)