From 6d9d656fa1e60ced370bbec92584b194dac68902 Mon Sep 17 00:00:00 2001 From: Christian Loitsch Date: Tue, 17 Mar 2026 20:24:41 +0100 Subject: [PATCH] feat: unnest transform --- CHANGELOG.md | 17 +++ README.md | 218 +++++++++++++++++++++----- benchmark/benchmark_csv.dart | 52 +++----- example/csv_fuse_example.dart | 12 +- lib/csv.dart | 8 +- lib/src/csv_codec.dart | 142 +++++++++++++++++++-- lib/src/csv_decoder.dart | 106 ++++++++++++---- lib/src/csv_encoder.dart | 153 +++++++++++----------- pubspec.yaml | 5 +- test/csv_test.dart | 36 ++---- test/multi_char_delim_test.dart | 31 ++--- test/parse_headers_test.dart | 2 +- test/small_chunk_test.dart | 4 +- test/split_crlf_test.dart | 29 ++--- test/split_escape_test.dart | 21 +-- 15 files changed, 534 insertions(+), 302 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a35efe7..eeea90c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,20 @@ +# 8.0.0 +Fix stream nesting issue (#77): `stream.transform(csv.decoder).toList()` now +correctly returns `List<List<dynamic>>` instead of `List<List<List<dynamic>>>`. + +**Breaking changes:** +- `CsvCodec` has been renamed to `Csv`. A deprecated `CsvCodec` typedef is + provided for migration. +- `Csv` does not extend `dart:convert`'s `Codec`. Use `asCodec()` if you + need a `Codec` (e.g., for `.fuse()`). +- `CsvDecoder` is now a `StreamTransformerBase<String, List<dynamic>>` instead of + a `Converter<String, List<List<dynamic>>>`. Each stream event is a single row. +- `CsvEncoder` is now a `StreamTransformerBase<List<dynamic>, String>` instead of + a `Converter<List<List<dynamic>>, String>`. Each stream event is a single row. +- `csv.decoder.fuse(...)` → use `csv.asCodec().decoder.fuse(...)` instead. + +See the "The Codec Problem" section in the README for a detailed explanation. + # 7.2.0 Document CsvRow map-like access and necessary casts. Add decodeWithHeaders() helper function. 
diff --git a/README.md b/README.md index 119d974..0a994be 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,20 @@ A high-quality, best-practice CSV library for Dart, inspired by PapaParse but built with Dart idioms in mind. -## Upgrading from Version 6 -Version 7 is a complete rewrite and introduces breaking changes. -If you rely on the specific flexibility of version 6 (e.g., complex eol handling not supported here), -please consult [doc/README-v6.md](doc/README-v6.md) and continue using version 6. +## Upgrading from Version 7 +Version 8 fixes an issue where using `csv.decoder` as a stream transformer +produced an extra layer of nesting ([#77](https://github.com/close2/csv/issues/77)). + +- `CsvCodec` has been renamed to `Csv`. A deprecated `CsvCodec` typedef is + available for migration. +- `Csv` no longer extends `dart:convert`'s `Codec` class. If you need a `Codec` +(e.g., for `.fuse()`), use [`asCodec()`](#the-codec-problem--ascodec). + +If you rely on the version 6 API, please consult [doc/README-v6.md](doc/README-v6.md). ## Features -- **Darty API**: Fully implements `Codec` and `Converter` interfaces from `dart:convert`. +- **Stream-friendly**: `decoder` and `encoder` are proper `StreamTransformer`s — one row per stream event. - **Easy Excel Compatibility**: Built-in support for Excel-compatible CSVs (UTF-8 BOM, `;` separator, `\r\n` line endings). - **Auto-detection**: Smartly detects delimiters and line endings. - **Robust Parsing**: Handles quoted fields, escaped quotes, and even malformed CSVs graciously (similar to PapaParse). @@ -19,7 +25,7 @@ please consult [doc/README-v6.md](doc/README-v6.md) and continue using version 6 ### Delimiters -The `CvCodec` and `CsvDecoder` support: +The `Csv` class and `CsvDecoder` support: * **Field Delimiters**: Can be single or multi-character strings (e.g., `,`, `::`, `|`). * **Quote Character**: Must be a **single character**. Defaults to `"`. * **Escape Character**: Must be a **single character** (if provided). 
Defaults to the quote character. @@ -35,7 +41,7 @@ If you write the CSV string directly to a file using certain methods (like `File To avoid this, either: 1. **Use `\n` as the line delimiter:** ```dart - final codec = CsvCodec(lineDelimiter: '\n'); + final codec = Csv(lineDelimiter: '\n'); final csvString = codec.encode(data); ``` 2. **Use a binary writer** (e.g., `File.openWrite()`) which writes bytes exactly as they are. @@ -64,6 +70,54 @@ void main() { } ``` +### Stream Transformation (e.g., reading a file) + +The decoder and encoder are `StreamTransformer`s that emit **one row per event**, +so `stream.transform(csv.decoder).toList()` gives you a flat `List>`. + +```dart +import 'dart:convert'; +import 'dart:io'; +import 'package:csv/csv.dart'; + +void main() async { + final file = File('data.csv'); + + // Each event in the stream is a single row (List). + final List> rows = await file + .openRead() + .transform(utf8.decoder) + .transform(csv.decoder) + .toList(); + + print(rows.first); // e.g. ['Name', 'Age', 'City'] +} +``` + +### Stream Read-Modify-Write Pipeline + +```dart +import 'dart:convert'; +import 'dart:io'; +import 'package:csv/csv.dart'; + +void main() async { + final input = File('input.csv'); + final output = File('output.csv'); + + await input.openRead() + .transform(utf8.decoder) + .transform(csv.decoder) + .map((row) { + row.add('Processed'); + return row; + }) + .transform(csv.encoder) + .transform(utf8.encoder) + .pipe(output.openWrite()); +} +``` + ### Excel Compatible CSV Excel often requires a UTF-8 BOM and `;` as a separator to open files correctly in certain locales. 
@@ -90,7 +144,7 @@ void main() { import 'package:csv/csv.dart'; void main() { - final myCodec = CsvCodec( + final myCodec = Csv( fieldDelimiter: '\t', lineDelimiter: '\n', quoteMode: QuoteMode.strings, // Only quote strings, not numbers @@ -98,7 +152,7 @@ void main() { ); final encoded = myCodec.encode([['a', 1, true], ['b', 2.5, false]]); - // Output: "a",1,true\n"b",2.5,false + // Output: "a"\t1\ttrue\n"b"\t2.5\tfalse } ``` @@ -116,7 +170,7 @@ void main() { final input = 'Name,Age,Active\nAlice,30,true'; // With dynamic typing enabled - final codec = CsvCodec(dynamicTyping: true); + final codec = Csv(dynamicTyping: true); final rows = codec.decode(input); print(rows[0][1].runtimeType); // int (30) @@ -132,7 +186,7 @@ You can use the `encoderTransform` and `decoderTransform` hooks to process field import 'package:csv/csv.dart'; void main() { - final customCodec = CsvCodec( + final customCodec = Csv( fieldDelimiter: ';', parseHeaders: true, // Required if you want 'header' name in the transform decoderTransform: (value, index, header) { @@ -184,9 +238,9 @@ void main() { print(rowsWithHeaders[0][1]); // Alice - // Method 2: Using CsvCodec(parseHeaders: true) + // Method 2: Using Csv(parseHeaders: true) // This requires casting the returned rows to CsvRow manually. - final codec = CsvCodec(parseHeaders: true); + final codec = Csv(parseHeaders: true); final dynamicRows = codec.decode(fileContents); @@ -198,55 +252,9 @@ void main() { } ``` -### Stream Transformation (Read-Modify-Write) - -You can use `fuse` to combine the encoder and decoder, or simply chain transformations to process large files efficiently. 
- -```dart -import 'dart:convert'; -import 'dart:io'; -import 'package:csv/csv.dart'; - -void main() async { - final input = File('input.csv'); - final output = File('output.csv'); - - await input.openRead() - .transform(utf8.decoder) - .transform(csv.decoder) - .map((row) { - // Modify the row - row.add('Processed'); - return row; - }) - .transform(csv.encoder) - .transform(utf8.encoder) - .pipe(output.openWrite()); -} -``` - -### Fusing Codecs - -You can also fuse the `csv.encoder` and `csv.decoder` (or any other compatible codecs) to create a new codec. - -```dart -import 'dart:convert'; -import 'package:csv/csv.dart'; - -void main() { - // Create a codec that converts List -> String -> List - // Ideally this is an identity transformation (Round Trip). - final fused = csv.encoder.fuse(csv.decoder); - - final data = [['a', 'b'], ['c', 'd']]; - final result = fused.convert(data); - print(result); // [['a', 'b'], ['c', 'd']] -} -``` - -### Advanced Fusing: Processing Pipeline +### Using `asCodec()` for Fusing -You can create a `Codec` that reads a CSV string, processes the data, and outputs a new CSV string by fusing the decoder, a custom processor, and the encoder. +If you need a `dart:convert` `Codec` — for example, to use `.fuse()` — call `asCodec()`: ```dart import 'dart:convert'; @@ -263,14 +271,9 @@ class AddColumnConverter extends Converter>, List List -> Modified List -> CSV String - - // Let's create a "Processing Codec" that takes String and returns String (CSV -> CSV) - // We start with the decoder (String -> List) - // Fuse with processor (List -> List) - // Fuse with encoder (List -> String) - - final sanitizingCodec = csv.decoder.fuse(processor).fuse(csv.encoder); + // Use asCodec() to get a dart:convert Codec, then fuse. 
+ final codec = csv.asCodec(); + final sanitizingCodec = codec.decoder.fuse(processor).fuse(codec.encoder); final inputCsv = 'Name,Age\nAlice,30'; final outputCsv = sanitizingCodec.convert(inputCsv); @@ -282,7 +285,6 @@ void main() { } ``` - ## PapaParse Features This library incorporates many good ideas from PapaParse, such as: @@ -298,5 +300,79 @@ Add this to your `pubspec.yaml`: ```yaml dependencies: - csv: ^7.0.0 + csv: ^8.0.0 ``` + +## The Codec Problem & `asCodec()` + +This section explains why `Csv` does **not** extend `dart:convert`'s `Codec`, +and what `asCodec()` provides as an escape hatch. + +### Background + +Dart's `Codec` provides two converters: + +| Component | Type | +|-----------|-------------------| +| `decoder` | `Converter<T, S>` | +| `encoder` | `Converter<S, T>` | + +A `Converter` has two roles: + +1. **Batch**: `T convert(S input)` — convert an entire input at once. +2. **Stream**: `Stream<T> bind(Stream<S>)` — transform a stream, where each event is of type `T`. + +The type parameter `T` must serve **both** roles. For most codecs this is fine. +For example, `utf8` is a `Codec<String, List<int>>`: batch conversion produces a +`List<int>`, and each stream event is also a `List<int>` (a chunk of bytes). +Concatenating chunks of bytes is natural. + +### Why CSV breaks this + +For a CSV decoder, the two roles need **different types**: + +| Role | Needs | +|---|---| +| Batch (`convert()`) | Returns `List<List<dynamic>>` — all rows at once | +| Stream (each event) | Should be `List<dynamic>` — one row at a time | + +There is no single type `T` that works for both. If we use +`Converter<String, List<List<dynamic>>>` (as version 7 did), then: + +- `convert("a,b\nc,d")` → `[['a','b'], ['c','d']]` ✓ correct +- `stream.transform(decoder).toList()` → `List<List<List<dynamic>>>` ✗ **extra nesting!** + +Each stream event is a *batch* of rows, and `.toList()` collects those batches +into yet another list — the "extra `[]`" reported in [#77](https://github.com/close2/csv/issues/77). 
This is a fundamental limitation of `dart:convert`'s type system, not a bug in +this library. Versions 3.2 through 6 worked around it by not being a `Codec` at +all (see [commit 235d898](https://github.com/close2/csv/commit/235d89854b86ef9f2e2e864d138ce96ab38b4a0d)). +Version 7 reintroduced `Codec` and the problem came back. + +### The solution in version 8 + +Version 8 takes a hybrid approach: + +- **By default**, `CsvDecoder` and `CsvEncoder` are `StreamTransformerBase` + subclasses. Stream usage works correctly — one row per event. +- **`asCodec()`** returns a real `Codec<List<List<dynamic>>, String>` adapter for + when you need `.fuse()` or other `Codec`-specific APIs. The returned codec + wraps the same parsing/encoding logic, but follows Dart's `Converter` type + contract (with the inherent stream nesting trade-off). + +```dart +// Default: stream works correctly +final rows = await file.openRead() + .transform(utf8.decoder) + .transform(csv.decoder) + .toList(); +// rows is List<List<dynamic>> ✓ + +// asCodec(): for fuse() and other Codec APIs +final fused = csv.asCodec().decoder.fuse(someConverter); +``` + +If you use `asCodec().decoder` as a stream transformer, you will get the extra +nesting — that is inherent to Dart's `Converter` type contract and cannot be +avoided. Use `csv.decoder` directly for streams. 
diff --git a/benchmark/benchmark_csv.dart b/benchmark/benchmark_csv.dart index 4907957..a009071 100644 --- a/benchmark/benchmark_csv.dart +++ b/benchmark/benchmark_csv.dart @@ -5,20 +5,20 @@ void main() async { print('--- CSV Benchmark ---'); await runBenchmark('Default CSV', csv); - await runBenchmark('Dynamic Typing CSV', CsvCodec(dynamicTyping: true)); + await runBenchmark('Dynamic Typing CSV', Csv(dynamicTyping: true)); await runBenchmark('Excel CSV', excel); - await runBenchmark('Tab CSV', CsvCodec(fieldDelimiter: '\t')); + await runBenchmark('Tab CSV', Csv(fieldDelimiter: '\t')); await runFuseBenchmark('Fused Codec (Round Trip)', csv); } -Future runBenchmark(String name, CsvCodec codec) async { +Future runBenchmark(String name, Csv codec) async { print('\n--- $name ---'); const targetSizeBytes = 100 * 1024 * 1024; // 100 MB const chunkSize = 1000; - final sampleRow = [ + final sampleRow = [ 'field1', 12345, 12.345, 'This is a slightly longer field.', 'Field with "quotes" and , commas', @@ -26,30 +26,28 @@ Future runBenchmark(String name, CsvCodec codec) async { ]; final estimatedRowSize = codec.encode([sampleRow]).length; - final totalRows = (targetSizeBytes / estimatedRowSize).floor(); + final totalRows = (targetSizeBytes ~/ estimatedRowSize); - // Encoding + // Encoding — stream of individual rows final encodeStopwatch = Stopwatch()..start(); var encodedBytes = 0; - final encodeController = StreamController>>(); + final encodeController = StreamController>(); final encodingStream = encodeController.stream.transform(codec.encoder); final encodingFuture = encodingStream.listen((data) => encodedBytes += data.length).asFuture(); - for (var i = 0; i < totalRows; i += chunkSize) { - final nextChunkRows = (totalRows - i) > chunkSize ? 
chunkSize : (totalRows - i); - final chunk = List.generate(nextChunkRows, (_) => sampleRow); - encodeController.add(chunk); + for (var i = 0; i < totalRows; i++) { + encodeController.add(sampleRow); } await encodeController.close(); await encodingFuture; encodeStopwatch.stop(); - // Decoding + // Decoding — stream emits individual rows final decodeStopwatch = Stopwatch()..start(); var decodedRows = 0; final decodeController = StreamController(); final decodingStream = decodeController.stream.transform(codec.decoder); - final decodingFuture = decodingStream.listen((chunk) => decodedRows += chunk.length).asFuture(); + final decodingFuture = decodingStream.listen((_) => decodedRows++).asFuture(); for (var i = 0; i < totalRows; i += chunkSize) { final nextChunkRows = (totalRows - i) > chunkSize ? chunkSize : (totalRows - i); @@ -69,21 +67,17 @@ Future runBenchmark(String name, CsvCodec codec) async { print(' - Dec: ${(mb / (decTime / 1000)).toStringAsFixed(2)} MB/s ($decTime ms)'); } -Future runFuseBenchmark(String name, CsvCodec codec) async { +Future runFuseBenchmark(String name, Csv codec) async { print('\n--- $name ---'); - // Fuse encoder and decoder: List -> String -> List - // Note: CsvCodec is Codec, String>. - // codec.encoder is Converter, String>. - // codec.decoder is Converter>. - // fused = codec.encoder.fuse(codec.decoder); // Converter, List> - - final fused = codec.encoder.fuse(codec.decoder); + // Use asCodec() to get a dart:convert Codec for fusing. 
+ final dartCodec = codec.asCodec(); + final fused = dartCodec.encoder.fuse(dartCodec.decoder); const targetSizeBytes = 50 * 1024 * 1024; // 50 MB (smaller for round-trip) const chunkSize = 1000; - final sampleRow = [ + final sampleRow = [ 'field1', 12345, 12.345, 'This is a slightly longer field.', 'Field with "quotes" and , commas', @@ -91,24 +85,18 @@ Future runFuseBenchmark(String name, CsvCodec codec) async { ]; final estimatedRowSize = codec.encode([sampleRow]).length; - final totalRows = (targetSizeBytes / estimatedRowSize).floor(); + final totalRows = (targetSizeBytes ~/ estimatedRowSize); final stopwatch = Stopwatch()..start(); var processedRows = 0; - final controller = StreamController>>(); - final stream = controller.stream.transform(fused); - final future = stream.listen((chunk) { - processedRows += chunk.length; - }).asFuture(); - + // fused.convert() works batch-style on List> for (var i = 0; i < totalRows; i += chunkSize) { final nextChunkRows = (totalRows - i) > chunkSize ? chunkSize : (totalRows - i); final chunk = List.generate(nextChunkRows, (_) => sampleRow); - controller.add(chunk); + final result = fused.convert(chunk); + processedRows += result.length; } - await controller.close(); - await future; stopwatch.stop(); if (processedRows != totalRows) { diff --git a/example/csv_fuse_example.dart b/example/csv_fuse_example.dart index 68486a2..ff828e2 100644 --- a/example/csv_fuse_example.dart +++ b/example/csv_fuse_example.dart @@ -13,13 +13,9 @@ void main() { final processor = AddColumnConverter(); // Create a pipeline: CSV String -> List -> Modified List -> CSV String - - // Let's create a "Processing Codec" that takes String and returns String (CSV -> CSV) - // We start with the decoder (String -> List) - // Fuse with processor (List -> List) - // Fuse with encoder (List -> String) - - final sanitizingCodec = csv.decoder.fuse(processor).fuse(csv.encoder); + // Use asCodec() to get a dart:convert Codec for fusing. 
+ final codec = csv.asCodec(); + final sanitizingCodec = codec.decoder.fuse(processor).fuse(codec.encoder); final inputCsv = 'Name,Age\nAlice,30'; final outputCsv = sanitizingCodec.convert(inputCsv); @@ -28,4 +24,4 @@ void main() { // Output: // Name,Age,Processed // Alice,30,Processed -} \ No newline at end of file +} diff --git a/lib/csv.dart b/lib/csv.dart index 4367453..cfcd9bd 100644 --- a/lib/csv.dart +++ b/lib/csv.dart @@ -9,8 +9,8 @@ export 'src/csv_row.dart'; import 'src/csv_codec.dart'; -/// A default CSV codec. -final CsvCodec csv = CsvCodec(); +/// A default CSV instance with standard settings. +final Csv csv = Csv(); -/// A CSV codec configured for Excel. -final CsvCodec excel = CsvCodec.excel(); +/// A CSV instance configured for Excel. +final Csv excel = Csv.excel(); diff --git a/lib/src/csv_codec.dart b/lib/src/csv_codec.dart index 629ba7f..f006896 100644 --- a/lib/src/csv_codec.dart +++ b/lib/src/csv_codec.dart @@ -4,12 +4,31 @@ import 'csv_decoder.dart'; import 'quote_mode.dart'; import 'csv_row.dart'; -/// A [Codec] for CSV data. -class CsvCodec extends Codec>, String> { +/// A CSV codec that provides encoding and decoding of CSV data. +/// +/// This class does **not** extend `dart:convert`'s [Codec]. +/// Dart's [Codec]/[Converter] type system requires the same type `T` for +/// both batch conversion and stream events, which is incompatible with CSV +/// where batch conversion returns `List>` but stream events +/// should be individual rows (`List`). +/// +/// For stream usage, use [decoder] and [encoder] directly with +/// `Stream.transform()`: +/// ```dart +/// final rows = await file.openRead() +/// .transform(utf8.decoder) +/// .transform(csv.decoder) +/// .toList(); // List> — each element is one row +/// ``` +/// +/// If you need a `dart:convert` [Codec] (e.g., for [Codec.fuse]), +/// use [asCodec]. Be aware that the codec's stream behavior wraps rows +/// in an extra list layer (see [asCodec] documentation). 
+class Csv { final CsvEncoder _encoder; final CsvDecoder _decoder; - /// Creates a [CsvCodec] with the given parameters. + /// Creates a [Csv] instance with the given parameters. /// /// [fieldDelimiter]: The separator between fields (default: ','). /// [lineDelimiter]: The separator between lines (default: '\r\n'). @@ -23,7 +42,7 @@ class CsvCodec extends Codec>, String> { /// [encoderTransform]: A function to transform fields before encoding. /// [decoderTransform]: A function to transform fields after decoding. /// [dynamicTyping]: Whether to automatically parse numbers and booleans (default: false). - CsvCodec({ + Csv({ String fieldDelimiter = ',', String lineDelimiter = '\r\n', String quoteCharacter = '"', @@ -55,17 +74,30 @@ class CsvCodec extends Codec>, String> { dynamicTyping: dynamicTyping, ); - /// Creates a [CsvCodec] configured for Excel. + /// Creates a [Csv] instance configured for Excel. /// /// This uses ';' as a field delimiter and adds a UTF-8 BOM. - CsvCodec.excel() : this(fieldDelimiter: ';', addBom: true, autoDetect: false); + Csv.excel() : this(fieldDelimiter: ';', addBom: true, autoDetect: false); - @override + /// The CSV encoder. + /// + /// Can be used as a [StreamTransformer] where each input event is a + /// single row (`List`) and each output event is a CSV string + /// fragment. CsvEncoder get encoder => _encoder; - @override + /// The CSV decoder. + /// + /// Can be used as a [StreamTransformer] where each input event is a + /// string chunk and each output event is a single row (`List`). CsvDecoder get decoder => _decoder; + /// Encodes [rows] into a CSV string. + String encode(List> rows) => _encoder.convert(rows); + + /// Decodes a CSV [input] string into a list of rows. + List> decode(String input) => _decoder.convert(input); + /// Decodes the given [encoded] CSV string into a list of [CsvRow]s. 
/// /// This automatically uses header parsing, returning a properly typed @@ -86,4 +118,98 @@ class CsvCodec extends Codec>, String> { ); return decoder.convert(encoded).cast(); } + + /// Returns a `dart:convert` [Codec] adapter for this CSV codec. + /// + /// This is useful when you need to use APIs that require a [Codec], + /// such as [Codec.fuse]. + /// + /// **Important**: The returned codec's [Converter] types follow Dart's + /// `Converter>>` contract. When used as a + /// [StreamTransformer], each stream event will be a `List>` + /// (a batch of rows), **not** a single row. This means + /// `stream.transform(codec.decoder).toList()` produces + /// `List>>` — an extra layer of nesting. + /// + /// For stream usage, prefer using [decoder] and [encoder] directly. + /// + /// Example: + /// ```dart + /// // Fuse with another codec + /// final fused = csv.asCodec().decoder.fuse(someConverter); + /// ``` + Codec>, String> asCodec() => _CsvCodecAdapter(this); +} + +/// Deprecated: Use [Csv] instead. +@Deprecated('Renamed to Csv. Will be removed in a future version.') +typedef CsvCodec = Csv; + +/// A `dart:convert` [Codec] adapter that wraps a [Csv]. +/// +/// This provides the standard [Codec] interface with proper types, +/// at the cost of the stream nesting issue inherent in Dart's type system. +class _CsvCodecAdapter extends Codec>, String> { + final Csv _csv; + + _CsvCodecAdapter(this._csv); + + @override + Converter>> get decoder => + _CodecDecoderAdapter(_csv._decoder); + + @override + Converter>, String> get encoder => + _CodecEncoderAdapter(_csv._encoder); +} + +/// Wraps a [CsvDecoder] as a `Converter>>`. +class _CodecDecoderAdapter extends Converter>> { + final CsvDecoder _decoder; + + _CodecDecoderAdapter(this._decoder); + + @override + List> convert(String input) => _decoder.convert(input); + + @override + Sink startChunkedConversion(Sink>> sink) { + // Wrap the batch sink so individual rows are collected into batches. 
+ return _decoder.startChunkedConversion(_BatchingSink(sink)); + } +} + +/// Wraps a [CsvEncoder] as a `Converter>, String>`. +class _CodecEncoderAdapter extends Converter>, String> { + final CsvEncoder _encoder; + + _CodecEncoderAdapter(this._encoder); + + @override + String convert(List> input) => _encoder.convert(input); +} + +/// A sink adapter that collects individual rows into batches before +/// forwarding them to a `Sink>>`. +/// +/// This bridges the gap between the row-by-row output of [CsvDecoder] +/// and the batch-oriented `Converter` sink interface. +class _BatchingSink implements Sink> { + final Sink>> _target; + final _batch = >[]; + + _BatchingSink(this._target); + + @override + void add(List row) { + _batch.add(row); + } + + @override + void close() { + if (_batch.isNotEmpty) { + _target.add(_batch.toList()); + } + _target.close(); + } } diff --git a/lib/src/csv_decoder.dart b/lib/src/csv_decoder.dart index 3d88547..7171519 100644 --- a/lib/src/csv_decoder.dart +++ b/lib/src/csv_decoder.dart @@ -1,8 +1,16 @@ +import 'dart:async'; import 'dart:convert'; import 'csv_row.dart'; -/// A converter that converts a CSV string into a [List>]. -class CsvDecoder extends Converter>> { +/// A stream transformer and batch converter that decodes CSV strings +/// into rows of `List`. +/// +/// When used as a [StreamTransformer] (e.g., with `Stream.transform()`), +/// each stream event is a single row (`List`). +/// +/// The [convert] method accepts a full CSV string and returns all rows +/// as a `List>`. +class CsvDecoder extends StreamTransformerBase> { /// The separator between fields. If null, it will be auto-detected. final String? fieldDelimiter; @@ -49,26 +57,31 @@ class CsvDecoder extends Converter>> { 'escapeCharacter must be a single character', ); - @override + /// Converts a CSV [input] string into a list of rows. + /// + /// Each row is a `List`. If [parseHeaders] is true, + /// each row (except the header) is a [CsvRow]. 
List> convert(String input) { if (input.isEmpty) return []; final output = >[]; - final outSink = ChunkedConversionSink>>.withCallback(( - result, - ) { - for (var chunk in result) { - output.addAll(chunk); - } - }); - final sink = startChunkedConversion(outSink); + final outSink = _CollectorSink>(output); + final sink = _createSink(outSink); sink.add(input); sink.close(); return output; } - @override - StringConversionSink startChunkedConversion(Sink>> sink) { + /// Creates a chunked conversion sink that writes decoded rows + /// into [sink]. + /// + /// Each row added to [sink] is a single `List`. + /// This is used internally by [bind] for stream transformation. + StringConversionSink startChunkedConversion(Sink> sink) { + return _createSink(sink); + } + + StringConversionSink _createSink(Sink> sink) { return _CsvDecoderSink( sink, fieldDelimiter, @@ -80,10 +93,56 @@ class CsvDecoder extends Converter>> { dynamicTyping, ); } + + @override + Stream> bind(Stream stream) { + return Stream>.eventTransformed( + stream, + (EventSink> sink) => _DecoderEventSink(this, sink), + ); + } +} + +/// An [EventSink] adapter that bridges [CsvDecoder]'s chunked conversion +/// to the stream event model, emitting individual rows. +class _DecoderEventSink implements EventSink { + final EventSink> _eventSink; + final StringConversionSink _chunkedSink; + + _DecoderEventSink(CsvDecoder decoder, EventSink> sink) + : _eventSink = sink, + _chunkedSink = decoder.startChunkedConversion(sink); + + @override + void add(String event) { + _chunkedSink.add(event); + } + + @override + void addError(Object error, [StackTrace? stackTrace]) { + _eventSink.addError(error, stackTrace); + } + + @override + void close() { + _chunkedSink.close(); + } +} + +/// A simple sink that collects items into a list. 
+class _CollectorSink implements Sink { + final List _target; + _CollectorSink(this._target); + + @override + void add(T data) => _target.add(data); + + @override + void close() {} } class _CsvDecoderSink extends StringConversionSink { - final Sink>> _outSink; + final Sink> _outSink; final String? _presetDelimiter; final String _quoteCharacter; final String? _escapeCharacter; @@ -256,7 +315,6 @@ class _CsvDecoderSink extends StringConversionSink { if (end <= start) return; - final results = >[]; final actualEscapeChar = _escapeCharacter ?? _quoteCharacter; final delim = _delimiter!; @@ -365,7 +423,7 @@ class _CsvDecoderSink extends StringConversionSink { _currentRow.add(_transform(_buffer.toString())); _fieldIndex++; _buffer.clear(); - _finalizeRow(results); + _finalizeRow(); if (i + 1 < end && charCode == crCode && chunk.codeUnitAt(i + 1) == nlCode) { @@ -382,10 +440,6 @@ class _CsvDecoderSink extends StringConversionSink { _buffer.write(chunk.substring(anchor, end)); } - if (results.isNotEmpty) { - _outSink.add(results); - } - if (isLast) close(); } @@ -421,9 +475,9 @@ class _CsvDecoderSink extends StringConversionSink { return offset; } - /// Finalizes the current row, adds it to [results], and clears `_currentRow`. + /// Finalizes the current row, emits it to the output sink, and resets state. /// Handles header parsing and [CsvRow] conversion if enabled. - void _finalizeRow(List> results) { + void _finalizeRow() { if (!_skipEmptyLines || _currentRow.any((e) => e != '')) { if (_parseHeaders && _headers == null) { // First row is the header row. @@ -436,7 +490,7 @@ class _CsvDecoderSink extends StringConversionSink { final rowToAdd = _headers != null ? CsvRow(_currentRow, _headers!) 
: _currentRow; - results.add(rowToAdd); + _outSink.add(rowToAdd); } } _currentRow = []; @@ -494,11 +548,7 @@ class _CsvDecoderSink extends StringConversionSink { if (_currentRow.isNotEmpty || _buffer.isNotEmpty) { _currentRow.add(_transform(_buffer.toString())); _fieldIndex++; - final results = >[]; - _finalizeRow(results); - if (results.isNotEmpty) { - _outSink.add(results); - } + _finalizeRow(); _buffer.clear(); } } diff --git a/lib/src/csv_encoder.dart b/lib/src/csv_encoder.dart index ff2cf70..8c83c0a 100644 --- a/lib/src/csv_encoder.dart +++ b/lib/src/csv_encoder.dart @@ -1,9 +1,16 @@ -import 'dart:convert'; +import 'dart:async'; import 'quote_mode.dart'; import 'csv_row.dart'; -/// A converter that converts a [List>] into a CSV string. -class CsvEncoder extends Converter>, String> { +/// A stream transformer and batch converter that encodes rows into CSV strings. +/// +/// When used as a [StreamTransformer] (e.g., with `Stream.transform()`), +/// each incoming stream event should be a single row (`List`), +/// and each outgoing event is a CSV string fragment. +/// +/// The [convert] method accepts all rows at once as a `List>` +/// and returns the full CSV string. +class CsvEncoder extends StreamTransformerBase, String> { /// The separator between fields. final String fieldDelimiter; @@ -43,33 +50,52 @@ class CsvEncoder extends Converter>, String> { this.fieldTransform, }) : escapeCharacter = escapeCharacter ?? quoteCharacter; - @override - String convert(List> input) { - if (input.isEmpty) return addBom ? '\ufeff' : ''; + /// Converts all [rows] into a CSV string. + String convert(List> rows) { + if (rows.isEmpty) return addBom ? 
'\ufeff' : ''; - final output = []; - final outSink = ChunkedConversionSink.withCallback( - (result) => output.addAll(result), - ); - final sink = startChunkedConversion(outSink); - sink.add(input); - sink.close(); - return output.join(); + final buffer = StringBuffer(); + if (addBom) { + buffer.write('\ufeff'); + } + + for (var i = 0; i < rows.length; i++) { + final row = rows[i]; + _writeRow(buffer, row); + if (i < rows.length - 1) { + buffer.write(lineDelimiter); + } + } + return buffer.toString(); + } + + void _writeRow(StringBuffer buffer, List row) { + final isCsvRow = row is CsvRow; + for (var j = 0; j < row.length; j++) { + if (j > 0) { + buffer.write(fieldDelimiter); + } + final String? header = isCsvRow ? row.getHeaderName(j) : null; + buffer.write( + encodeField( + row[j], + fieldDelimiter, + quoteCharacter, + escapeCharacter, + quoteMode, + fieldTransform, + j, + header, + ), + ); + } } @override - ChunkedConversionSink>> startChunkedConversion( - Sink sink, - ) { - return _CsvEncoderSink( - sink, - fieldDelimiter, - lineDelimiter, - quoteCharacter, - escapeCharacter, - quoteMode, - addBom, - fieldTransform, + Stream bind(Stream> stream) { + return Stream.eventTransformed( + stream, + (EventSink sink) => _EncoderEventSink(this, sink), ); } @@ -130,72 +156,37 @@ class CsvEncoder extends Converter>, String> { } } -class _CsvEncoderSink implements ChunkedConversionSink>> { - final Sink _outSink; - final String _fieldDelimiter; - final String _lineDelimiter; - final String _quoteCharacter; - final String _escapeCharacter; - final QuoteMode _quoteMode; - final bool _addBom; - final dynamic Function(dynamic field, int index, String? header)? 
_fieldTransform; - bool _isFirstChunk = true; - - _CsvEncoderSink( - this._outSink, - this._fieldDelimiter, - this._lineDelimiter, - this._quoteCharacter, - this._escapeCharacter, - this._quoteMode, - this._addBom, - this._fieldTransform, - ); +/// An [EventSink] adapter for the encoder that accepts individual rows +/// and outputs CSV string fragments. +class _EncoderEventSink implements EventSink> { + final CsvEncoder _encoder; + final EventSink _eventSink; + bool _isFirstRow = true; - @override - void add(List> chunk) { - if (chunk.isEmpty) return; + _EncoderEventSink(this._encoder, this._eventSink); + @override + void add(List row) { final buffer = StringBuffer(); - if (_isFirstChunk) { - if (_addBom) { + if (_isFirstRow) { + if (_encoder.addBom) { buffer.write('\ufeff'); } - _isFirstChunk = false; + _isFirstRow = false; } else { - buffer.write(_lineDelimiter); + buffer.write(_encoder.lineDelimiter); } + _encoder._writeRow(buffer, row); + _eventSink.add(buffer.toString()); + } - for (var i = 0; i < chunk.length; i++) { - final row = chunk[i]; - final isCsvRow = row is CsvRow; - for (var j = 0; j < row.length; j++) { - if (j > 0) { - buffer.write(_fieldDelimiter); - } - final String? header = isCsvRow ? row.getHeaderName(j) : null; - buffer.write( - CsvEncoder.encodeField( - row[j], - _fieldDelimiter, - _quoteCharacter, - _escapeCharacter, - _quoteMode, - _fieldTransform, - j, - header, - ), - ); - } - if (i < chunk.length - 1) { - buffer.write(_lineDelimiter); - } - } - _outSink.add(buffer.toString()); + @override + void addError(Object error, [StackTrace? stackTrace]) { + _eventSink.addError(error, stackTrace); } @override void close() { - _outSink.close(); + _eventSink.close(); } } diff --git a/pubspec.yaml b/pubspec.yaml index 2f1100e..d812063 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,8 @@ name: csv -version: 7.2.0 +version: 8.0.0 description: |- - A codec to transform between a string and a list of values. + A high-quality CSV library. 
Encode and decode CSV strings, with + stream support (one row per event), auto-detection, and Excel compatibility. The string must be comma (configurable) separated values. homepage: https://github.com/close2/csv diff --git a/test/csv_test.dart b/test/csv_test.dart index d78c975..9ffdc71 100644 --- a/test/csv_test.dart +++ b/test/csv_test.dart @@ -198,29 +198,23 @@ void main() { group('Chunked Conversion', () { test('Chunked decoding: split row', () async { final input = Stream.fromIterable(['A,B,C\nd', ',e,f']); - final result = await input.transform(CsvCodec(fieldDelimiter: ',').decoder).toList(); + final result = await input.transform(Csv(fieldDelimiter: ',').decoder).toList(); expect( result, equals([ - [ - ['A', 'B', 'C'], - ], - [ - ['d', 'e', 'f'], - ], + ['A', 'B', 'C'], + ['d', 'e', 'f'], ]), ); }); test('Chunked decoding: split quoted field', () async { final input = Stream.fromIterable(['A,"B\n', 'B",C']); - final result = await input.transform(CsvCodec(fieldDelimiter: ',').decoder).toList(); + final result = await input.transform(Csv(fieldDelimiter: ',').decoder).toList(); expect( result, equals([ - [ - ['A', 'B\nB', 'C'], - ], + ['A', 'B\nB', 'C'], ]), ); }); @@ -228,13 +222,9 @@ void main() { test('Chunked encoding', () async { final input = Stream.fromIterable( [ - [ - ['A', 'B'], - ], - [ - ['C', 'D'], - ], - ].cast>>(), + ['A', 'B'], + ['C', 'D'], + ].cast>(), ); final result = await input.transform(csv.encoder).join(); expect(result, equals('A,B\r\nC,D')); @@ -243,7 +233,7 @@ void main() { group('Advanced Features', () { test('QuoteMode.strings', () { - final codec = CsvCodec(quoteMode: QuoteMode.strings); + final codec = Csv(quoteMode: QuoteMode.strings); final input = [ [1, "1", true, "true"], ]; @@ -251,7 +241,7 @@ void main() { }); test('QuoteMode.always', () { - final codec = CsvCodec(quoteMode: QuoteMode.always); + final codec = Csv(quoteMode: QuoteMode.always); final input = [ [1, "A"], ]; @@ -279,7 +269,7 @@ void main() { }); 
test('skipEmptyLines', () { - final codec = CsvCodec(skipEmptyLines: true); + final codec = Csv(skipEmptyLines: true); final input = 'A,B\n\nC,D\n\n'; expect( codec.decode(input), @@ -289,7 +279,7 @@ void main() { ]), ); - final codecNoSkip = CsvCodec(skipEmptyLines: false); + final codecNoSkip = Csv(skipEmptyLines: false); expect(codecNoSkip.decode(input).length, equals(4)); }); @@ -337,7 +327,7 @@ void main() { test('CsvRow and parseHeaders', () { final input = 'id,name\n1,Alice\n2,Bob'; - final codec = CsvCodec(parseHeaders: true); + final codec = Csv(parseHeaders: true); final result = codec.decode(input); expect(result.length, equals(2)); diff --git a/test/multi_char_delim_test.dart b/test/multi_char_delim_test.dart index 7027ef6..bd987e7 100644 --- a/test/multi_char_delim_test.dart +++ b/test/multi_char_delim_test.dart @@ -1,5 +1,3 @@ - -import 'dart:convert'; import 'package:csv/csv.dart'; import 'package:test/test.dart'; @@ -12,13 +10,7 @@ void main() { test('Split multi-char delimiter', () { final output = >[]; - final outSink = ChunkedConversionSink>>.withCallback(( - accumulated, - ) { - for (final rows in accumulated) { - output.addAll(rows); - } - }); + final outSink = _CollectorSink(output); // Delimiter is '::' final decoderSink = CsvDecoder(fieldDelimiter: '::') @@ -33,14 +25,8 @@ void main() { }); test('Split multi-char delimiter after quote', () { - final output = >[]; - final outSink = ChunkedConversionSink>>.withCallback(( - accumulated, - ) { - for (final rows in accumulated) { - output.addAll(rows); - } - }); + final output = >[]; + final outSink = _CollectorSink(output); // Delimiter is '::' final decoderSink = CsvDecoder(fieldDelimiter: '::') @@ -56,3 +42,14 @@ void main() { }); }); } + +class _CollectorSink implements Sink> { + final List> _target; + _CollectorSink(this._target); + + @override + void add(List data) => _target.add(data); + + @override + void close() {} +} diff --git a/test/parse_headers_test.dart 
b/test/parse_headers_test.dart index e5ed5d3..fb6d388 100644 --- a/test/parse_headers_test.dart +++ b/test/parse_headers_test.dart @@ -4,7 +4,7 @@ import 'package:csv/csv.dart'; void main() { test('decode parseHeaders to CsvRow map-like access', () { final fileContents = 'id,name\n1,Alice\n2,Bob'; - final codec = CsvCodec(parseHeaders: true); + final codec = Csv(parseHeaders: true); final rows = codec.decode(fileContents); expect(rows.length, 2); diff --git a/test/small_chunk_test.dart b/test/small_chunk_test.dart index af6417b..4dfcf2c 100644 --- a/test/small_chunk_test.dart +++ b/test/small_chunk_test.dart @@ -20,7 +20,7 @@ void main() { } await controller.close(); - final result = (await resultFuture).expand((i) => i).toList(); + final result = await resultFuture; expect(result, [['a', 'b', 'c']]); }); @@ -39,7 +39,7 @@ void main() { } await controller.close(); - final result = (await resultFuture).expand((i) => i).toList(); + final result = await resultFuture; expect(result, [['a', 'b', 'c'], ['1', '2', '3']]); }); } diff --git a/test/split_crlf_test.dart b/test/split_crlf_test.dart index bd6abd1..1a55967 100644 --- a/test/split_crlf_test.dart +++ b/test/split_crlf_test.dart @@ -1,5 +1,3 @@ - -import 'dart:convert'; import 'package:csv/csv.dart'; import 'package:test/test.dart'; @@ -7,13 +5,7 @@ void main() { group('Split CRLF', () { test('Split CRLF', () { final output = >[]; - final outSink = ChunkedConversionSink>>.withCallback(( - accumulated, - ) { - for (final rows in accumulated) { - output.addAll(rows); - } - }); + final outSink = _CollectorSink(output); final decoderSink = CsvDecoder(fieldDelimiter: ',').startChunkedConversion(outSink); @@ -27,13 +19,7 @@ void main() { test('Split CRLF with skipEmptyLines: false', () { final output = >[]; - final outSink = ChunkedConversionSink>>.withCallback(( - accumulated, - ) { - for (final rows in accumulated) { - output.addAll(rows); - } - }); + final outSink = _CollectorSink(output); final decoderSink = 
CsvDecoder(fieldDelimiter: ',', skipEmptyLines: false).startChunkedConversion(outSink); @@ -46,3 +32,14 @@ void main() { }); }); } + +class _CollectorSink implements Sink> { + final List> _target; + _CollectorSink(this._target); + + @override + void add(List data) => _target.add(data); + + @override + void close() {} +} diff --git a/test/split_escape_test.dart b/test/split_escape_test.dart index d68a127..6bb1b6e 100644 --- a/test/split_escape_test.dart +++ b/test/split_escape_test.dart @@ -1,5 +1,3 @@ - -import 'dart:convert'; import 'package:csv/csv.dart'; import 'package:test/test.dart'; @@ -31,13 +29,7 @@ void _verifySplit(String input, List> expected, final chunk2 = input.substring(i); final output = >[]; - final outSink = ChunkedConversionSink>>.withCallback(( - accumulated, - ) { - for (final rows in accumulated) { - output.addAll(rows); - } - }); + final outSink = _CollectorSink(output); // IMPORTANT: Set fieldDelimiter to prevent auto-detection buffering final decoderSink = CsvDecoder( @@ -58,3 +50,14 @@ void _verifySplit(String input, List> expected, ); } } + +class _CollectorSink implements Sink> { + final List> _target; + _CollectorSink(this._target); + + @override + void add(List data) => _target.add(data); + + @override + void close() {} +}