From 1b5a585a3cfe1891d4ccb26eca5376f0f7774778 Mon Sep 17 00:00:00 2001
From: Stan Kirdey
Date: Mon, 20 Jan 2025 20:11:10 -0800
Subject: [PATCH] Add CSV parsing options (#813)

* Update dc.py

Adding support for CSV files where values can span several lines; the
pyarrow parser already supports this.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update dc.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* adding csv parse options config

* naming of parse_options_config to parse_options

* typo

* fix tests, address PR review

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Ivan Shcheklein
---
 src/datachain/lib/dc.py          | 16 +++++++++++++---
 tests/unit/lib/test_datachain.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/src/datachain/lib/dc.py b/src/datachain/lib/dc.py
index 92ac525b0..945ba3c1f 100644
--- a/src/datachain/lib/dc.py
+++ b/src/datachain/lib/dc.py
@@ -1942,7 +1942,7 @@ def parse_tabular(
     def from_csv(
         cls,
         path,
-        delimiter: str = ",",
+        delimiter: Optional[str] = None,
         header: bool = True,
         output: OutputType = None,
         object_name: str = "",
@@ -1952,6 +1952,7 @@ def from_csv(
         session: Optional[Session] = None,
         settings: Optional[dict] = None,
         column_types: Optional[dict[str, "Union[str, ArrowDataType]"]] = None,
+        parse_options: Optional[dict[str, "Union[str, Union[bool, Callable]]"]] = None,
         **kwargs,
     ) -> "DataChain":
         """Generate chain from csv files.
@@ -1959,7 +1960,8 @@ def from_csv(
         Parameters:
             path : Storage URI with directory. URI must start with storage prefix such
                 as `s3://`, `gs://`, `az://` or "file:///".
-            delimiter : Character for delimiting columns.
+            delimiter : Character for delimiting columns. Takes precedence if also
+                specified in `parse_options`. Defaults to ",".
             header : Whether the files include a header row.
             output : Dictionary or feature class defining column names and their
                 corresponding types. List of column names is also accepted, in which
@@ -1973,6 +1975,8 @@ def from_csv(
             column_types : Dictionary of column names and their corresponding types.
                 It is passed to CSV reader and for each column specified type auto
                 inference is disabled.
+            parse_options: Tells the parser how to process lines.
+                See https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html
 
         Example:
             Reading a csv file:
@@ -1990,6 +1994,12 @@ def from_csv(
         from pyarrow.dataset import CsvFileFormat
         from pyarrow.lib import type_for_alias
 
+        parse_options = parse_options or {}
+        if "delimiter" not in parse_options:
+            parse_options["delimiter"] = ","
+        if delimiter:
+            parse_options["delimiter"] = delimiter
+
         if column_types:
             column_types = {
                 name: type_for_alias(typ) if isinstance(typ, str) else typ
@@ -2017,7 +2027,7 @@ def from_csv(
             msg = f"error parsing csv - incompatible output type {type(output)}"
             raise DatasetPrepareError(chain.name, msg)
 
-        parse_options = ParseOptions(delimiter=delimiter)
+        parse_options = ParseOptions(**parse_options)
         read_options = ReadOptions(column_names=column_names)
         convert_options = ConvertOptions(
             strings_can_be_null=True,
diff --git a/tests/unit/lib/test_datachain.py b/tests/unit/lib/test_datachain.py
index 26590dcd3..89608bf4d 100644
--- a/tests/unit/lib/test_datachain.py
+++ b/tests/unit/lib/test_datachain.py
@@ -1332,6 +1332,34 @@ def test_from_csv_column_types(tmp_dir, test_session):
     assert df1["age"].dtype == pd.StringDtype
 
 
+def test_from_csv_parse_options(tmp_dir, test_session):
+    def skip_comment(row):
+        if row.text.startswith("# "):
+            return "skip"
+        return "error"
+
+    s = (
+        "animals;n_legs;entry\n"
+        "Flamingo;2;2022-03-01\n"
+        "# Comment here:\n"
+        "Horse;4;2022-03-02\n"
+        "Brittle stars;5;2022-03-03\n"
+        "Centipede;100;2022-03-04"
+    )
+
+    path = tmp_dir / "test.csv"
+    path.write_text(s)
+
+    dc = DataChain.from_csv(
+        path.as_uri(),
+        session=test_session,
+        parse_options={"invalid_row_handler": skip_comment, "delimiter": ";"},
+    )
+
+    df = dc.select("animals", "n_legs", "entry").to_pandas()
+    assert set(df["animals"]) == {"Horse", "Centipede", "Brittle stars", "Flamingo"}
+
+
 def test_to_csv_features(tmp_dir, test_session):
     dc_to = DataChain.from_values(
         f1=features, num=range(len(features)), session=test_session
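A minimal usage sketch of the new `parse_options` argument, assuming a hypothetical semicolon-delimited file at `file:///tmp/events.csv` that contains comment rows. The dictionary keys are forwarded to `pyarrow.csv.ParseOptions`; per the patch, an explicit `delimiter=` argument takes precedence over a `delimiter` key inside `parse_options`, and "," is used when neither is given.

```python
from datachain.lib.dc import DataChain


def skip_comments(row):
    # pyarrow calls this with an InvalidRow; returning "skip" drops the row,
    # returning "error" raises instead.
    return "skip" if row.text.startswith("# ") else "error"


chain = DataChain.from_csv(
    "file:///tmp/events.csv",  # hypothetical path
    parse_options={
        "delimiter": ";",                   # used because delimiter= is not passed
        "newlines_in_values": True,         # values may span several lines
        "invalid_row_handler": skip_comments,
    },
)

# An explicit delimiter= argument would override the key in parse_options:
# DataChain.from_csv("file:///tmp/events.csv", delimiter="|", parse_options={"delimiter": ";"})
```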