
Database

cifflow.database.compact

convert_database — one-way export that casts DuckDB columns to typed storage.

convert_database(src, dst, schema, on_coercion_failure='null')

Copy src into dst, casting columns to typed DuckDB storage.

All columns in the source database are stored as VARCHAR (the ingest layer never writes typed values). This function creates the destination tables with proper INTEGER / DOUBLE / VARCHAR types and casts each value on the way across.
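As a rough illustration of the destination DDL, the pure-Python sketch below builds a CREATE TABLE statement from a mapping of column names to DDLm _type.contents values. The mapping (Integer to INTEGER, Real to DOUBLE, everything else to VARCHAR) and the helper name create_table_sql are assumptions made for illustration; cifflow's private _sql_type_for helper decides the real types and may apply different rules.

```python
# Hypothetical mapping from DDLm _type.contents values to DuckDB column types.
# Illustration only: cifflow's _sql_type_for may use other rules.
SQL_TYPE_FOR_CONTENTS = {
    "Integer": "INTEGER",
    "Real": "DOUBLE",
}


def create_table_sql(table: str, columns: dict[str, str]) -> str:
    """Build a CREATE TABLE statement from {column_name: ddlm_contents}.

    Unmapped content types fall back to VARCHAR, and no NOT NULL or
    PRIMARY KEY constraints are emitted.
    """
    col_defs = [
        f'    "{name}"  {SQL_TYPE_FOR_CONTENTS.get(contents, "VARCHAR")}'
        for name, contents in columns.items()
    ]
    return f'CREATE TABLE "{table}" (\n' + ",\n".join(col_defs) + "\n)"
```

Calling create_table_sql("atom_site", {"label": "Code", "fract_x": "Real"}) yields a table with label as VARCHAR and fract_x as DOUBLE, with no NOT NULL or PRIMARY KEY clauses.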

Casting is performed entirely inside DuckDB's SQL engine via TRY_CAST, regexp_replace, and from_json / to_json for JSON container columns. Destination tables are created without NOT NULL or PRIMARY KEY constraints; all SQL joins and queries work normally.
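The effect of the cast on a single value can be modelled in plain Python. This sketch assumes that a trailing CIF standard uncertainty, such as the (5) in 1.234(5), is stripped with a regular expression before the cast, and it imitates TRY_CAST by returning None rather than raising. The pattern and the name try_cast_double are illustrative; they are not the SQL expression cifflow actually generates.

```python
from __future__ import annotations

import re

# Assumed pattern for a trailing standard uncertainty, e.g. "1.234(5)".
SU_SUFFIX = re.compile(r"\(\d+\)$")


def try_cast_double(raw: str | None) -> float | None:
    """Mimic TRY_CAST(... AS DOUBLE) after stripping an SU suffix.

    Returns None when the value is missing or cannot be parsed,
    instead of raising an exception.
    """
    if raw is None:
        return None
    stripped = SU_SUFFIX.sub("", raw.strip())
    try:
        return float(stripped)
    except ValueError:
        return None
```

Under this model "1.234(5)" becomes 1.234, while a CIF placeholder such as "?" becomes None, the NULL that a typed DOUBLE column would receive.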

Parameters:

- src (DuckDBPyConnection, required): Source DuckDB connection populated by ingest().
- dst (DuckDBPyConnection, required): Destination DuckDB connection (must be empty).
- schema (SchemaSpec, required): SchemaSpec used when src was populated.
- on_coercion_failure (Literal['null', 'keep', 'error'], default 'null'): How to handle values that cannot be cast. 'null' (default): a failed cast becomes NULL via TRY_CAST. 'keep': same as 'null', because typed columns cannot store non-castable strings, so they are stored as NULL. 'error': raise ValueError on the first failure.
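The three on_coercion_failure policies reduce to a single branch after the failure scan. The sketch below mirrors that branch in isolation; apply_policy is a hypothetical name, and the failure messages are assumed to come from an earlier scan (the private _scan_for_cast_failures helper in the source).

```python
from typing import Literal

Policy = Literal["null", "keep", "error"]


def apply_policy(fail_msgs: list[str], policy: Policy, messages: list[str]) -> None:
    """Raise on the first failure under 'error'; otherwise record warnings.

    Under 'null' and 'keep' the rows are still copied, and TRY_CAST turns
    every non-castable value into NULL.
    """
    if not fail_msgs:
        return
    if policy == "error":
        raise ValueError(fail_msgs[0])
    messages.extend(fail_msgs)
```

Because a typed INTEGER or DOUBLE column has nowhere to keep a non-numeric string, 'keep' ends up behaving exactly like 'null'.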

Returns:

- list[str]: Warning messages describing values whose standard-uncertainty (SU) suffix was dropped, plus values that failed to cast. Coercion failures are returned only under the 'null' and 'keep' policies; the 'error' policy raises instead.

Raises:

- ValueError: When on_coercion_failure='error' and a value cannot be cast to the target type.
- Exception: DDL or data-transfer failures propagate directly after the in-progress destination transaction is rolled back.
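Because the data phase commits each populated table in its own transaction, a failure rolls back only the table being copied; tables committed earlier, and the empty tables created during the DDL phase, remain in dst. The sketch below demonstrates that granularity using the standard-library sqlite3 module as a stand-in for DuckDB (an assumption made only so the example runs without extra dependencies); copy_tables is a hypothetical helper.

```python
import sqlite3


def copy_tables(dst: sqlite3.Connection, batches: list[tuple[str, list[tuple]]]) -> None:
    """Insert each (table, rows) batch in its own explicit transaction.

    dst should be opened with isolation_level=None so these explicit
    BEGIN/COMMIT/ROLLBACK statements are not mixed with sqlite3's
    implicit transactions. A failing batch is rolled back and the error
    re-raised; batches already committed are left in place.
    """
    for table, rows in batches:
        dst.execute("BEGIN")
        try:
            dst.executemany(f'INSERT INTO "{table}" VALUES (?)', rows)
            dst.execute("COMMIT")
        except Exception:
            dst.execute("ROLLBACK")
            raise
```

If the second batch violates a constraint, the first table keeps its rows and the second ends up empty, just as an earlier table stays populated when convert_database fails partway through.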

Source code in src/cifflow/database/compact.py
def convert_database(
    src: duckdb.DuckDBPyConnection,
    dst: duckdb.DuckDBPyConnection,
    schema: 'SchemaSpec',
    on_coercion_failure: Literal['null', 'keep', 'error'] = 'null',
) -> list[str]:
    """Copy *src* into *dst*, casting columns to typed DuckDB storage.

    All columns in the source database are stored as ``VARCHAR`` (the ingest
    layer never writes typed values).  This function creates the destination
    tables with proper ``INTEGER`` / ``DOUBLE`` / ``VARCHAR`` types and casts
    each value on the way across.

    Casting is performed entirely inside DuckDB's SQL engine via ``TRY_CAST``,
    ``regexp_replace``, and ``from_json`` / ``to_json`` for JSON container
    columns.  Destination tables are created without ``NOT NULL`` or
    ``PRIMARY KEY`` constraints; all SQL joins and queries work normally.

    Parameters
    ----------
    src:
        Source DuckDB connection populated by ``ingest()``.
    dst:
        Destination DuckDB connection (must be empty).
    schema:
        ``SchemaSpec`` used when *src* was populated.
    on_coercion_failure:
        ``'null'`` (default) — failed cast → NULL via ``TRY_CAST``.
        ``'keep'``           — same as ``'null'`` (typed columns cannot store
                              non-castable strings; stored as NULL).
        ``'error'``          — raise ``ValueError`` on first failure.

    Returns
    -------
    list[str]
        Warning messages: SU-dropped values and coercion failures (null/keep
        policy only — error policy raises instead of returning).

    Raises
    ------
    ValueError
        When ``on_coercion_failure='error'`` and a value cannot be cast to
        the target type.
    Exception
        DDL or data-transfer failures propagate directly after rolling back
        the destination transaction.
    """
    messages: list[str] = []

    # Topological sort: FK parents before children.
    all_tables = set(schema.tables)

    def _topo_order(names: set[str]) -> list[str]:
        deps: dict[str, set[str]] = {t: set() for t in names}
        for t in names:
            for fk in schema.tables[t].foreign_keys:
                if fk.target_table in names:
                    deps[t].add(fk.target_table)
        order: list[str] = []
        seen: set[str] = set()

        def _visit(name: str) -> None:
            if name in seen:
                return
            seen.add(name)
            for parent in sorted(deps[name]):
                _visit(parent)
            order.append(name)

        for name in sorted(names):
            _visit(name)
        return order

    ordered_tables = _topo_order(all_tables)

    # Pre-compute column metadata for every table once.
    table_meta: dict[str, tuple] = {}
    for tbl_name in ordered_tables:
        table = schema.tables[tbl_name]
        cols = [c for c in table.columns if not c.is_synthetic or c.name in _INFRA]
        sql_types  = {c.name: _sql_type_for(c)  for c in cols}
        leaf_types = {c.name: _leaf_sql_type(c) for c in cols}
        table_meta[tbl_name] = (cols, sql_types, leaf_types)

    # Phase 1 — DDL: create all destination tables in one transaction.
    dst.begin()
    try:
        for tbl_name in ordered_tables:
            cols, sql_types, _ = table_meta[tbl_name]
            col_defs = [f'    "{c.name}"  {sql_types[c.name]}' for c in cols]
            dst.execute(
                f'CREATE TABLE "{tbl_name}" (\n'
                + ',\n'.join(col_defs)
                + '\n)'
            )

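        # Fallback-tier DDL: recreate each fallback table found in src with
        # all-VARCHAR columns, preserving NOT NULL. Best-effort: a failed
        # lookup or CREATE is skipped.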
        for tbl in _FALLBACK_TABLES:
            try:
                info_rows = src.execute(
                    "SELECT column_name, is_nullable "
                    "FROM information_schema.columns "
                    "WHERE table_name = ? ORDER BY ordinal_position",
                    [tbl],
                ).fetchall()
            except Exception:
                continue
            if not info_rows:
                continue
            col_ddl = ', '.join(
                f'"{r[0]}" VARCHAR' + ('' if r[1] == 'YES' else ' NOT NULL')
                for r in info_rows
            )
            try:
                dst.execute(f'CREATE TABLE IF NOT EXISTS "{tbl}" ({col_ddl})')
            except Exception:
                pass

        dst.commit()
    except Exception:
        dst.rollback()
        raise

    # Phase 2 — Data: one transaction per populated table, skipping empty ones.
    for tbl_name in ordered_tables:
        if src.execute(f'SELECT 1 FROM "{tbl_name}" LIMIT 1').fetchone() is None:
            continue

        cols, sql_types, leaf_types = table_meta[tbl_name]

        # Detect SU stripping and coercion failures before the transfer.
        messages.extend(_scan_for_su(src, tbl_name, cols, sql_types, leaf_types))
        fail_msgs = _scan_for_cast_failures(src, tbl_name, cols, sql_types)
        if fail_msgs:
            if on_coercion_failure == 'error':
                raise ValueError(fail_msgs[0])
            messages.extend(fail_msgs)

        cast_exprs = [
            _sql_cast_expr(c.name, sql_types[c.name], leaf_types[c.name], 'TRY_CAST')
            for c in cols
        ]
        select_sql = f'SELECT {", ".join(cast_exprs)} FROM "{tbl_name}"'

        dst.begin()
        try:
            _transfer_arrow(src, dst, select_sql, tbl_name)
            dst.commit()
        except Exception:
            dst.rollback()
            raise

    # Fallback-tier data: one transaction per non-empty table. Best-effort:
    # a failed transfer is rolled back and skipped rather than re-raised.
    for tbl in _FALLBACK_TABLES:
        try:
            cols_fb = [d[0] for d in
                       src.execute(f'SELECT * FROM "{tbl}" LIMIT 0').description]
        except Exception:
            continue
        if src.execute(f'SELECT 1 FROM "{tbl}" LIMIT 1').fetchone() is None:
            continue
        q_cols = ', '.join(f'"{c}"' for c in cols_fb)
        dst.begin()
        try:
            _transfer_arrow(src, dst, f'SELECT {q_cols} FROM "{tbl}"', tbl)
            dst.commit()
        except Exception:
            dst.rollback()

    return messages
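The FK ordering at the top of convert_database is a depth-first topological sort. Extracted into a standalone function over a plain dependency map, it can be exercised directly; the deps argument is a hypothetical stand-in for the foreign keys read from SchemaSpec.

```python
def topo_order(deps: dict[str, set[str]]) -> list[str]:
    """Return table names with every FK parent before its children.

    deps maps each table to the tables it references. Names are visited in
    sorted order, so the result is deterministic. Parents outside deps are
    ignored, and already-seen tables are skipped, which also stops infinite
    recursion on FK cycles.
    """
    order: list[str] = []
    seen: set[str] = set()

    def visit(name: str) -> None:
        if name in seen:
            return
        seen.add(name)
        for parent in sorted(deps[name]):
            if parent in deps:
                visit(parent)
        order.append(name)

    for name in sorted(deps):
        visit(name)
    return order
```

For example, topo_order({"atom_site": {"cell"}, "cell": set(), "atom_type": set(), "atom_site_aniso": {"atom_site", "atom_type"}}) returns ["cell", "atom_site", "atom_type", "atom_site_aniso"].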

cifflow.database.defaults

generate_defaults — fill NULL columns with DDLm-defined defaults.

generate_defaults(connection, schema, max_iterations=32)

Fill NULL columns in an ingested database with DDLm-defined defaults.

Runs a fixed-point loop: each pass fills NULLs using scalar defaults (_enumeration.default) and keyed defaults (_enumeration_defaults lookup tables). The loop repeats until no more rows change or max_iterations is reached, so that a filled index tag can unlock keyed defaults that were previously unresolvable.
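The fixed-point loop can be modelled in a few lines of plain Python. In this sketch each fill operation is a callable that returns the number of cells it filled, matching how the source sums op(connection). The toy rows, the fill_scalar and fill_keyed operations, and the SCALE_BY_TYPE lookup table are hypothetical. The keyed default can only resolve once the scalar default has filled its index column, which is why more than one pass is needed.

```python
from typing import Callable

Row = dict[str, object]
FillOp = Callable[[list[Row]], int]


def fixed_point_fill(
    rows: list[Row],
    all_ops: list[FillOp],
    keyed_ops: list[FillOp],
    max_iterations: int = 32,
) -> int:
    """Apply fill ops until a pass fills nothing or the pass limit is hit."""
    total = 0
    for i in range(max_iterations):
        # Pass 0 runs every op; later passes re-run only the keyed ops.
        ops = all_ops if i == 0 else keyed_ops
        filled = sum(op(rows) for op in ops)
        total += filled
        if filled == 0:
            break  # fixed point reached
    return total


def fill_scalar(rows: list[Row]) -> int:
    """Hypothetical scalar default: a NULL 'type' becomes 'Cu'."""
    filled = 0
    for row in rows:
        if row["type"] is None:
            row["type"] = "Cu"
            filled += 1
    return filled


# Hypothetical keyed-default lookup table: 'scale' keyed by 'type'.
SCALE_BY_TYPE = {"Cu": 1.0}


def fill_keyed(rows: list[Row]) -> int:
    """Hypothetical keyed default: fill a NULL 'scale' from its row's 'type'."""
    filled = 0
    for row in rows:
        if row["scale"] is None and row["type"] in SCALE_BY_TYPE:
            row["scale"] = SCALE_BY_TYPE[row["type"]]
            filled += 1
    return filled
```

With fill_keyed listed before fill_scalar in all_ops, the first pass fills only the index column, the keyed default resolves on the second pass, and the third pass fills nothing, ending the loop.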

This function operates in place on connection and adds no tracking columns. Call it only after cifflow.ingestion.ingest() and before any round-trip or fidelity work.

Parameters:

- connection (DuckDBPyConnection, required): Open DuckDB connection containing the ingested schema tables.
- schema (SchemaSpec, required): Schema descriptor produced by cifflow.dictionary.schema.generate_schema().
- max_iterations (int, default 32): Maximum number of fill passes before giving up.

Returns:

- int: Total number of cells filled across all passes.

Raises:

- Exception: If the transaction cannot be started, or if any SQL statement fails (for example, a schema table is absent from connection). Once the transaction has begun, any failure rolls it back before the exception is re-raised.

Source code in src/cifflow/database/defaults.py
def generate_defaults(
    connection: duckdb.DuckDBPyConnection,
    schema: SchemaSpec,
    max_iterations: int = 32,
) -> int:
    """Fill NULL columns in an ingested database with DDLm-defined defaults.

    Runs a fixed-point loop: each pass fills NULLs using scalar defaults
    (``_enumeration.default``) and keyed defaults (``_enumeration_defaults``
    lookup tables).  The loop repeats until no more rows change or
    *max_iterations* is reached, so that a filled index tag can unlock
    keyed defaults that were previously unresolvable.

    This function operates in-place on *connection* and adds no tracking
    columns.  Call it only after :func:`~cifflow.ingestion.ingest` and before
    any round-trip or fidelity work.

    Parameters
    ----------
    connection:
        Open DuckDB connection containing the ingested schema tables.
    schema:
        Schema descriptor produced by
        :func:`~cifflow.dictionary.schema.generate_schema`.
    max_iterations:
        Maximum number of fill passes before giving up.

    Returns
    -------
    int
        Total number of cells filled across all passes.

    Raises
    ------
    Exception
        If the transaction cannot be started, or if any SQL statement fails
        (e.g. a schema table is absent from *connection*).  The transaction
        is rolled back before re-raising.
    """
    tag_to_col: dict[str, tuple[str, str]] = {
        v: k for k, v in schema.column_to_tag.items()
    }

    all_ops, keyed_ops = _build_fill_ops(schema, tag_to_col, connection)

    if not all_ops:
        return 0

    # Wrap in one transaction to avoid per-statement WAL flushes on file DBs.
    connection.execute('BEGIN TRANSACTION')
    total_filled = 0
    ok = False
    try:
        for i in range(max_iterations):
            # Scalar defaults are monotone — once applied they never change.
            # After the first pass, only keyed ops need re-running.
            ops = all_ops if i == 0 else keyed_ops
            pass_filled = sum(op(connection) for op in ops)
            total_filled += pass_filled
            if pass_filled == 0:
                break
        ok = True
    finally:
        if ok:
            connection.execute('COMMIT')
        else:
            try:
                connection.execute('ROLLBACK')
            except duckdb.Error:
                pass

    return total_filled