From 35549aa16da8645044bf658ae7c7d107d42dc3d3 Mon Sep 17 00:00:00 2001
From: adityaksolves
Date: Tue, 17 Mar 2026 19:28:51 +0530
Subject: [PATCH] [SPARK-36600][SQL] Avoid unnecessary list conversion in createDataFrame

---
 python/pyspark/sql/session.py | 31 +++++++++++++++++++++++--------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 3b1415210baf3..187c43276f3bf 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -1237,16 +1237,31 @@ def _createFromLocal(
         Create an RDD for DataFrame from a list or pandas.DataFrame, returns the RDD and schema.
         """
 
-        # make sure data could consumed multiple times
-        if not isinstance(data, list):
-            data = list(data)
+        import itertools
 
-        if any(isinstance(d, VariantVal) for d in data):
-            raise PySparkValueError("Rows cannot be of type VariantVal")
+        # Check the first element for VariantVal without exhausting the generator
+        data, peek_data = itertools.tee(data)
+        first_row = next(peek_data, None)
+        if first_row is not None and isinstance(first_row, VariantVal):
+            raise PySparkValueError(
+                "Rows cannot be of type VariantVal"
+            )
 
         tupled_data: Iterable[Tuple]
         if schema is None or isinstance(schema, (list, tuple)):
-            struct = self._inferSchemaFromList(data, names=schema)
+            if not isinstance(data, list):
+                data = list(data)
+
+            if len(data) == 0:
+                if schema is None:
+                    raise PySparkValueError(
+                        errorClass="CANNOT_INFER_EMPTY_SCHEMA",
+                        messageParameters={},
+                    )
+                struct = self._inferSchemaFromList([(None,) * len(schema)], names=schema)
+            else:
+                struct = self._inferSchemaFromList(data, names=schema)
+
             converter = _create_converter(struct)
             tupled_data = map(converter, data)
             if isinstance(schema, (list, tuple)):
@@ -1268,8 +1283,8 @@ def _createFromLocal(
             },
         )
 
-        # convert python objects to sql data
-        internal_data = [struct.toInternal(row) for row in tupled_data]
+        # Use map to keep data lazy and avoid OutOfMemoryError
+        internal_data = map(struct.toInternal, tupled_data)
         return self._sc.parallelize(internal_data), struct
 
     @staticmethod