Skip to content

Inspect

cifflow.inspect._lexer

inspect_lexer — pretty-print the lexer token stream for a CIF source.

inspect_lexer(source, *, version=None, file=sys.stdout)

Print the full token stream for source to file.

Parameters:

Name Type Description Default
source _Source

CIF source: a raw string, a pathlib.Path, or an open text file object.

required
version Optional[CifVersion]

If None (default), auto-detected from the magic line.

None
file TextIO

Output stream (default sys.stdout).

stdout
Source code in src/cifflow/inspect/_lexer.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def inspect_lexer(
    source: _Source,
    *,
    version: Optional[CifVersion] = None,
    file: TextIO = sys.stdout,
) -> None:
    """Write a formatted dump of every lexer token in *source* to *file*.

    The output consists of a banner naming the CIF version, a column header,
    a rule, and then one row per token: line, column, token type, value type
    and the ``repr`` of the token value (shortened when it is longer than 50
    characters).  Errors attached to a token are listed directly beneath its
    row, and a blank line closes the listing.

    Parameters
    ----------
    source:
        CIF source: a raw string, a ``pathlib.Path``, or an open text file object.
    version:
        CIF version to lex with.  If None (default), auto-detected from the
        magic line; any version errors reported by the detection are printed
        before the token table.
    file:
        Output stream (default ``sys.stdout``).
    """
    def emit(text: str, *styles) -> None:
        # Colourise *text* for the target stream and write it as one line.
        print(c(text, *styles, file=file), file=file)

    text = resolve_source(source)

    if version is not None:
        # Caller fixed the version: lex the whole text with no line offset.
        body, first_line = text, 0
    else:
        version, body, first_line, version_errors = detect_version(text)
        for problem in version_errors or ():
            emit(
                f'[VERSION ERROR] line {problem.line}: {problem.message}',
                RED, BOLD,
            )

    emit(f'-- token stream  (CIF {version.value}) --', BOLD, DIM)
    emit(
        f"{'line':>5} {'col':>4}  {'token_type':<10}  {'value_type':<22}  value",
        DIM,
    )
    emit('-' * 72, DIM)

    for token in Lexer(body, version, first_line).tokens():
        if token.value_type:
            value_kind = token.value_type.value
        else:
            value_kind = ''

        # Keep long values on one row: first 47 chars, an ellipsis, and the
        # final character of the repr (normally the closing quote).
        shown = repr(token.value)
        if len(shown) > 50:
            shown = f'{shown[:47]}…{shown[-1]}'

        position_cell = c(f'{token.line:>5} {token.column:>4}', DIM, file=file)
        type_cell = c(f'{token.token_type.value:<10}', CYAN, file=file)
        kind_cell = c(f'{value_kind:<22}', BLUE, file=file)
        if token.token_type.value == 'value':
            value_colour = GREEN
        else:
            value_colour = YELLOW
        value_cell = c(shown, value_colour, file=file)

        print(f'  {position_cell}  {type_cell}  {kind_cell}  {value_cell}', file=file)

        for issue in token.errors:
            emit(f'         ^ LEX ERROR  col {issue.column}: {issue.message}', RED)

    print(file=file)

cifflow.inspect._parser

inspect_parse + ParseHandler — pretty-print parser events for a CIF source.

ParseHandler

A CifParserEvents implementation that prints every event and error.

Pass an optional inner handler to forward all events after printing.

Parameters:

Name Type Description Default
inner Optional[CifParserEvents]

Optional downstream handler. All events are forwarded to it after being printed.

None
file TextIO

Output stream (default sys.stdout).

stdout
show_values bool

If False, add_value calls are not printed (they are still forwarded to the inner handler). Useful for large loop tables. Default True.

True
Source code in src/cifflow/inspect/_parser.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class ParseHandler:
    """Event sink that echoes each ``CifParserEvents`` callback to a stream.

    Every callback is printed as one indented, colourised line and then
    relayed, with the same arguments, to the optional *inner* handler.
    Constructing the handler immediately prints a ``-- parser events --``
    banner to *file*.

    Parameters
    ----------
    inner:
        Optional downstream handler.  Each event is passed on to it after
        the handler has dealt with the event itself.
    file:
        Output stream (default ``sys.stdout``).
    show_values:
        If False, ``add_value`` calls produce no output at all; the values
        are still forwarded to *inner*.  Useful for large loop tables.
        Default True.
    """

    def __init__(
        self,
        inner: Optional[CifParserEvents] = None,
        *,
        file: TextIO = sys.stdout,
        show_values: bool = True,
    ) -> None:
        self._depth       = 0
        self._show_values = show_values
        self._file        = file
        self._inner       = inner

        banner = c('-- parser events --', BOLD, DIM, file=file)
        print(banner, file=file)

    # -- helpers ---------------------------------------------------------------

    def _indent(self) -> str:
        """Return the leading whitespace for the current nesting depth."""
        return self._depth * '  '

    def _print(self, text: str, colour: str = '') -> None:
        """Write *text* at the current indent, colourised when *colour* is set."""
        out  = self._file
        lead = c(self._indent(), DIM, file=out)
        if colour:
            text = c(text, colour, file=out)
        print(lead + text, file=out)

    def _fwd(self, name: str, *args, **kwargs) -> None:
        """Call method *name* on the inner handler, if one was supplied."""
        target = self._inner
        if target is None:
            return
        getattr(target, name)(*args, **kwargs)

    def _push(self, text: str, colour: str) -> None:
        """Print an opening event, then indent everything that follows."""
        self._print(text, colour)
        self._depth += 1

    def _pop(self, floor: int, text: str, colour: str) -> None:
        """Dedent (never below *floor*), then print a closing event."""
        self._depth = max(floor, self._depth - 1)
        self._print(text, colour)

    # -- CifParserEvents -------------------------------------------------------

    def on_data_block(self, name: str) -> None:
        # A data block header is always printed flush left; its contents
        # start one level in, regardless of the depth left by earlier events.
        self._depth = 0
        self._print(f'on_data_block({name!r})', BOLD)
        self._depth = 1
        self._fwd('on_data_block', name)

    def on_save_frame_start(self, name: str) -> None:
        self._push(f'on_save_frame_start({name!r})', CYAN)
        self._fwd('on_save_frame_start', name)

    def on_save_frame_end(self) -> None:
        self._pop(1, 'on_save_frame_end()', CYAN)
        self._fwd('on_save_frame_end')

    def add_tag(self, tag_name: str) -> None:
        self._print(f'add_tag({tag_name!r})', YELLOW)
        self._fwd('add_tag', tag_name)

    def add_value(self, value: str, value_type: ValueType) -> None:
        if self._show_values:
            # Shorten long reprs to 57 chars + ellipsis + the final character.
            shown = repr(value)
            if len(shown) > 60:
                shown = f'{shown[:57]}…{shown[-1]}'
            self._print(f'add_value({shown}, {value_type.value})', GREEN)
        self._fwd('add_value', value, value_type)

    def on_list_start(self) -> None:
        self._push('on_list_start()', MAGENTA)
        self._fwd('on_list_start')

    def on_list_end(self) -> None:
        self._pop(0, 'on_list_end()', MAGENTA)
        self._fwd('on_list_end')

    def on_table_start(self) -> None:
        self._push('on_table_start()', MAGENTA)
        self._fwd('on_table_start')

    def on_table_end(self) -> None:
        self._pop(0, 'on_table_end()', MAGENTA)
        self._fwd('on_table_end')

    def on_table_key(self, key: str, value_type: ValueType) -> None:
        self._print(f'on_table_key({key!r}, {value_type.value})', BLUE)
        self._fwd('on_table_key', key, value_type)

    def on_loop_start(self, tags: List[str]) -> None:
        self._push(f'on_loop_start({tags!r})', CYAN)
        self._fwd('on_loop_start', tags)

    def on_loop_end(self) -> None:
        self._pop(1, 'on_loop_end()', CYAN)
        self._fwd('on_loop_end')

    def on_error(self, error: ParseError) -> None:
        # Assemble "[TYPE] line L col C: message" plus the optional
        # context and recovery-action suffixes.
        pieces = [
            f'[{error.error_type.upper()}] ',
            f'line {error.line} col {error.column}: ',
            f'{error.message}',
        ]
        if error.context:
            pieces.append(f'  (context: {error.context!r})')
        if error.recovery_action:
            pieces.append(f'  -> {error.recovery_action}')
        self._print(''.join(pieces), RED)
        self._fwd('on_error', error)

inspect_parse(source, *, inner=None, file=sys.stdout, show_values=True, show_tokens=True)

Run the full pipeline and print token stream then parser events.

Parameters:

Name Type Description Default
source _Source

CIF source: a raw string, a pathlib.Path, or an open text file object.

required
inner Optional[CifParserEvents]

Optional downstream handler to receive all events.

None
file TextIO

Output stream (default sys.stdout).

stdout
show_values bool

Forward to ParseHandler; set False to suppress add_value lines for large files.

True
show_tokens bool

If True (default), also print the lexer token stream before events.

True
Source code in src/cifflow/inspect/_parser.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def inspect_parse(
    source: _Source,
    *,
    inner: Optional[CifParserEvents] = None,
    file: TextIO = sys.stdout,
    show_values: bool = True,
    show_tokens: bool = True,
) -> None:
    """Print the lexer token stream (optionally) and then the parser events.

    The source is resolved once; the resolved text is handed to
    ``inspect_lexer`` when *show_tokens* is set and then parsed by a
    ``CifParser`` driving a ``ParseHandler``.  A blank line is written after
    the parse.

    Parameters
    ----------
    source:
        CIF source: a raw string, a ``pathlib.Path``, or an open text file object.
    inner:
        Optional downstream handler to receive all events.
    file:
        Output stream (default ``sys.stdout``).
    show_values:
        Forward to ``ParseHandler``; set False to suppress ``add_value`` lines
        for large files.
    show_tokens:
        If True (default), also print the lexer token stream before events.
    """
    text = resolve_source(source)

    if show_tokens:
        inspect_lexer(text, file=file)

    printer = ParseHandler(inner, file=file, show_values=show_values)
    parser = CifParser(printer)
    parser.parse(text)

    print(file=file)

cifflow.inspect._model

inspect_model — pretty-print a CifFile or CIF source string.

inspect_model(source, *, mode='pad', file=sys.stdout, show_values=True, show_tokens=True)

Run the full pipeline through the CIF model and print a summary.

Prints (in order): token stream, parser events, CifFile summary, errors.

Parameters:

Name Type Description Default
source _Source

CIF source: a raw string, a pathlib.Path, or an open text file object.

required
mode str

Loop row-count mismatch mode passed to CifBuilder: 'pad' (default) or 'strict'.

'pad'
file TextIO

Output stream (default sys.stdout).

stdout
show_values bool

Forward to ParseHandler; set False to suppress add_value lines.

True
show_tokens bool

If True (default), also print the lexer token stream before events.

True
Source code in src/cifflow/inspect/_model.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def inspect_model(
    source: _Source,
    *,
    mode: str = 'pad',
    file: TextIO = sys.stdout,
    show_values: bool = True,
    show_tokens: bool = True,
) -> None:
    """Parse *source* into the CIF model and print each stage of the pipeline.

    Output order: token stream (when *show_tokens*), parser events, the
    ``CifFile`` summary, and finally an ``-- errors --`` section listing the
    errors reported to the builder's ``on_error`` callback, if there were any.

    Parameters
    ----------
    source:
        CIF source: a raw string, a ``pathlib.Path``, or an open text file object.
    mode:
        Loop row-count mismatch mode passed to ``CifBuilder``: ``'pad'``
        (default) or ``'strict'``.
    file:
        Output stream (default ``sys.stdout``).
    show_values:
        Forward to ``ParseHandler``; set False to suppress ``add_value`` lines.
    show_tokens:
        If True (default), also print the lexer token stream before events.
    """
    from cifflow.cifmodel.builder import CifBuilder

    text = resolve_source(source)

    if show_tokens:
        inspect_lexer(text, file=file)

    # The builder sits behind the printing handler, so every event is echoed
    # and then applied to the model; builder errors are collected here.
    collected: list[ParseError] = []
    builder = CifBuilder(on_error=collected.append, mode=mode)
    printer = ParseHandler(builder, file=file, show_values=show_values)
    CifParser(printer).parse(text)
    print(file=file)

    _print_model(builder.result, file=file)

    if not collected:
        return

    print(c('-- errors --', BOLD, DIM, file=file), file=file)
    for problem in collected:
        where = c(f'line {problem.line} col {problem.column}', DIM, file=file)
        label = c(f'[{problem.error_type.upper()}]', RED, BOLD, file=file)
        print(f'  {label}  {where}  {problem.message}', file=file)
        if problem.recovery_action:
            arrow = c("->", DIM, file=file)
            print(f'    {arrow} {problem.recovery_action}', file=file)
    print(file=file)

cifflow.inspect._schema

inspect_schema — pretty-print a SchemaSpec derived from a DDLm dictionary.

inspect_schema(source, *, show_ddl=False, file=sys.stdout)

Print a structured summary of a SchemaSpec to file.

source may be:

  • A SchemaSpec (cifflow.dictionary.schema.SchemaSpec) — used directly.
  • A DdlmDictionary (cifflow.dictionary.loader.DdlmDictionary) — schema generated from it.
  • A pathlib.Path to a DDLm dictionary file — loaded via DictionaryLoader (cifflow.dictionary.loader.DictionaryLoader) with directory_resolver(path.parent) so _import.get directives resolve from the same directory.
  • A raw CIF source string — parsed with no resolver (imports that require external files are silently skipped).

Parameters:

Name Type Description Default
source Union[str, Path, SchemaSpec, DdlmDictionary]

Dictionary source, a pre-built DdlmDictionary, or a SchemaSpec.

required
show_ddl bool

If True, append the raw CREATE TABLE DDL under each table. Default False.

False
file TextIO

Output stream. Default sys.stdout.

stdout
Source code in src/cifflow/inspect/_schema.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def inspect_schema(
    source: 'Union[str, pathlib.Path, SchemaSpec, DdlmDictionary]',
    *,
    show_ddl: bool = False,
    file: TextIO = sys.stdout,
) -> None:
    """Print a structured summary of a ``SchemaSpec`` to *file*.

    *source* may be:

    - A :class:`~cifflow.dictionary.schema.SchemaSpec` — used directly.
    - A :class:`~cifflow.dictionary.loader.DdlmDictionary` — schema generated from it.
    - A ``pathlib.Path`` to a DDLm dictionary file — loaded via
      :class:`~cifflow.dictionary.loader.DictionaryLoader` with
      ``directory_resolver(path.parent)`` so ``_import.get`` directives
      resolve from the same directory.
    - A raw CIF source string — parsed with no resolver (imports that require
      external files are silently skipped).

    A ``str`` is treated as a file path (not as CIF text) when, ignoring
    surrounding whitespace, it is a single line that does not start with
    ``#``; any other string is parsed as CIF source.

    The output is, in order: a ``-- schema --`` header with table / FK /
    warning counts; one section per table (sorted by name) listing its
    primary key, columns, foreign keys and, with *show_ddl*, its DDL; a list
    of Loop tables that have no Set-derived category key; and the schema
    warnings.  The last two sections are omitted when empty.

    Parameters
    ----------
    source:
        Dictionary source, a pre-built ``DdlmDictionary``, or a ``SchemaSpec``.
    show_ddl:
        If ``True``, append the raw ``CREATE TABLE`` DDL under each table.
        Default ``False``.
    file:
        Output stream.  Default ``sys.stdout``.
    """
    # Function-scope imports of the dictionary machinery.
    from cifflow.dictionary.loader import DictionaryLoader, directory_resolver
    from cifflow.dictionary.schema import SchemaSpec, generate_schema, emit_create_statements

    # DdlmDictionary is optional here: when the name cannot be imported the
    # corresponding isinstance branch below is simply skipped.
    try:
        from cifflow.dictionary.loader import DdlmDictionary
    except ImportError:
        DdlmDictionary = None

    # -- resolve *source* to a SchemaSpec --------------------------------------
    if isinstance(source, SchemaSpec):
        schema = source
    elif DdlmDictionary is not None and isinstance(source, DdlmDictionary):
        schema = generate_schema(source)
    elif isinstance(source, pathlib.Path) or (
        # Heuristic for "this string is a path": one line, not a '#' comment.
        # NOTE(review): a one-line CIF string that does not start with '#'
        # would also take this branch and be read as a file — confirm intended.
        isinstance(source, str) and not source.lstrip().startswith('#')
        and '\n' not in source.strip()
    ):
        path = pathlib.Path(source)
        raw = path.read_text(encoding='utf-8')
        # Resolve imports relative to the dictionary file's own directory.
        loader = DictionaryLoader(resolver=directory_resolver(path.parent))
        dictionary = loader.load(raw)
        schema = generate_schema(dictionary)
    else:
        # Anything else is handed to the loader as CIF text, with no resolver.
        loader = DictionaryLoader(resolver=None)
        dictionary = loader.load(source)
        schema = generate_schema(dictionary)

    # -- header and one-line summary -------------------------------------------
    n_tables = len(schema.tables)
    n_set    = sum(1 for t in schema.tables.values() if t.category_class == 'Set')
    n_loop   = sum(1 for t in schema.tables.values() if t.category_class == 'Loop')
    n_fk     = sum(len(t.foreign_keys) for t in schema.tables.values())
    n_warn   = len(schema.warnings)

    summary = (
        f'{n_tables} table{"s" if n_tables != 1 else ""}'
        f'  ({n_set} Set, {n_loop} Loop)'
        f'  {n_fk} FK{"s" if n_fk != 1 else ""}'
        f'  {n_warn} warning{"s" if n_warn != 1 else ""}'
    )
    print(c('-- schema --', BOLD, DIM, file=file), file=file)
    print(c(summary, DIM, file=file), file=file)
    print(file=file)

    # DDL statements are only generated on request.  They are matched to
    # tables positionally.
    # NOTE(review): assumes emit_create_statements() yields exactly one
    # statement per table, in schema.tables iteration order — confirm.
    ddl_stmts = emit_create_statements(schema) if show_ddl else []
    ddl_by_table: dict[str, str] = {}
    if show_ddl:
        for stmt, table in zip(ddl_stmts, schema.tables.values()):
            ddl_by_table[table.name] = stmt

    def _depr_suffix(definition_id: str) -> str:
        # Returns '' unless the id is listed in schema.deprecated_ids;
        # otherwise a red DEPRECATED marker, naming any non-empty replacements.
        if definition_id not in schema.deprecated_ids:
            return ''
        replacements = [r for r in schema.deprecated_replacements.get(definition_id, []) if r]
        if replacements:
            return '  ' + c('DEPRECATED -> ' + ', '.join(replacements), RED, file=file)
        return '  ' + c('DEPRECATED', RED, file=file)

    # -- per-table sections, in name order -------------------------------------
    for table in sorted(schema.tables.values(), key=lambda t: t.name):
        cls_colour = CYAN if table.category_class == 'Loop' else BLUE
        header = (
            c(table.name, BOLD, file=file)
            + '  '
            + c(f'[{table.category_class}]', cls_colour, file=file)
            + _depr_suffix(table.definition_id)
        )
        print(header, file=file)

        pk_str = ', '.join(c(k, YELLOW, file=file) for k in table.primary_keys)
        print(f'  PK  {pk_str}', file=file)

        def _col_display_type(col) -> str:
            # The '_cifflow_row_id' column is shown as INTEGER; other columns
            # show their type_contents, falling back to TEXT when it is empty.
            if col.name == '_cifflow_row_id':
                return 'INTEGER'
            return col.type_contents or 'TEXT'

        # Column widths for aligned output (defaults apply to column-less tables).
        col_name_w = max((len(col.name) for col in table.columns), default=8)
        type_w     = max((len(_col_display_type(col)) for col in table.columns), default=4)

        print(f'  {c("columns", DIM, file=file)}', file=file)
        for col in table.columns:
            # Pad before colourising so escape codes do not disturb alignment.
            name_part = c(col.name.ljust(col_name_w), YELLOW, file=file)
            type_part = c(_col_display_type(col).ljust(type_w), GREEN, file=file)

            flags: list[str] = []
            if not col.nullable:
                flags.append(c('NOT NULL', DIM, file=file))
            if col.is_synthetic and col.name == '_cifflow_row_id':
                flags.append(c('UNIQUE', DIM, file=file))
            if col.is_primary_key:
                flags.append(c('PK', YELLOW, file=file))
            if col.is_synthetic:
                flags.append(c('synthetic', DIM, file=file))

            # Trailing annotations: the defining tag (non-synthetic columns
            # only), a link marker for non-PK linked columns, and any
            # deprecation note.
            tag_part = ''
            if not col.is_synthetic:
                tag_part = '  ' + c(col.definition_id, DIM, file=file)
            if col.linked_item_id and not col.is_primary_key:
                tag_part += '  ' + c(f'->su {col.linked_item_id}', MAGENTA, file=file)
            tag_part += _depr_suffix(col.definition_id)

            flag_str = '  '.join(flags)
            print(f'    {name_part}  {type_part}  {flag_str}{tag_part}', file=file)

        if table.foreign_keys:
            print(f'  {c("foreign keys", DIM, file=file)}', file=file)
            for fk in table.foreign_keys:
                # Single-column keys print bare; composite keys are parenthesised.
                if len(fk.source_columns) == 1:
                    src = c(fk.source_columns[0], YELLOW, file=file)
                    tgt = c(
                        f'{fk.target_table}.{fk.target_columns[0]}',
                        CYAN, file=file,
                    )
                else:
                    src = c(
                        '(' + ', '.join(fk.source_columns) + ')',
                        YELLOW, file=file,
                    )
                    tgt = c(
                        f'{fk.target_table}.(' + ', '.join(fk.target_columns) + ')',
                        CYAN, file=file,
                    )
                print(f'    {src} -> {tgt}  DEFERRABLE', file=file)

        if show_ddl and table.name in ddl_by_table:
            print(f'  {c("ddl", DIM, file=file)}', file=file)
            for ddl_line in ddl_by_table[table.name].splitlines():
                print(f'    {c(ddl_line, DIM, file=file)}', file=file)

        print(file=file)

    # -- Loop tables whose key is not tied to any Set category -----------------
    set_tables = {name for name, t in schema.tables.items() if t.category_class == 'Set'}

    # Reverse map: definition_id → (table_name, col_name), for transitive chain-following.
    tag_to_table_col: dict[str, tuple[str, str]] = {
        defn_id: (tbl, col_name)
        for (tbl, col_name), defn_id in schema.column_to_tag.items()
    }
    # Column objects keyed by (table key in schema.tables, column name).
    col_by_key: dict[tuple[str, str], object] = {
        (tbl, col.name): col
        for tbl, tbl_def in schema.tables.items()
        for col in tbl_def.columns
    }

    def _resolves_to_set(linked_item_id: str, visited: set) -> bool:
        """Return True if linked_item_id transitively reaches a Set category."""
        # *visited* guards against cycles in the chain of linked items.
        if not linked_item_id or linked_item_id in visited:
            return False
        visited.add(linked_item_id)
        # Aliases are mapped to their canonical definition id before lookup.
        canonical = schema.alias_to_definition_id.get(linked_item_id, linked_item_id)
        cls = schema.tag_to_category_class.get(canonical)
        if cls == 'Set':
            return True
        if cls != 'Loop':
            return False
        # A Loop item: follow the target column's own link, if it has one.
        entry = tag_to_table_col.get(canonical)
        if entry is None:
            return False
        target_col = col_by_key.get(entry)
        if target_col is not None and target_col.linked_item_id:
            return _resolves_to_set(target_col.linked_item_id, visited)
        return False

    # Bridge columns grouped by the table they belong to.
    bridge_by_table: dict[str, list] = {}
    for bc in schema.bridge_columns:
        bridge_by_table.setdefault(bc.table_name, []).append(bc)

    floating_loops = []
    for table in schema.tables.values():
        if table.category_class != 'Loop':
            continue
        pk_set = set(table.primary_keys)

        # (a) a non-synthetic primary-key column links, possibly through
        #     other Loop items, to a Set category.
        has_set_link = any(
            _resolves_to_set(col.linked_item_id, set())
            for col in table.columns
            if col.is_primary_key and not col.is_synthetic and col.linked_item_id
        )

        # (b) a primary-key bridge column whose final hop lands on a Set table.
        # NOTE(review): presumably hops[-1][1] is the final hop's target table
        # name and hops is never empty — confirm against the schema module.
        has_set_bridge = any(
            bc.column_name in pk_set and bc.hops[-1][1] in set_tables
            for bc in bridge_by_table.get(table.name, [])
        )

        if not has_set_link and not has_set_bridge:
            floating_loops.append(table)

    if floating_loops:
        print(c('-- loop tables without Set-derived category key --', BOLD, DIM, file=file), file=file)
        for table in sorted(floating_loops, key=lambda t: t.name):
            pk_str = ', '.join(c(k, YELLOW, file=file) for k in table.primary_keys)
            print(f'  {c(table.name, BOLD, file=file)}  PK: {pk_str}', file=file)
        print(file=file)

    # -- warnings collected while the schema was generated ---------------------
    if schema.warnings:
        print(c('-- schema warnings --', BOLD, DIM, file=file), file=file)
        for w in schema.warnings:
            print(f'  {c("!", YELLOW, file=file)}  {w}', file=file)
        print(file=file)

cifflow.inspect._ingest

inspect_ingest — trace what happens during CIF ingestion.

TraceEvent dataclass

One event captured during inspect_ingest.

Attributes:

Name Type Description
kind str

Category of event. One of:

  • 'warning' — non-fatal semantic issue (e.g. unrecognised tag)
  • 'error' — fatal semantic error
detail str

Human-readable description of the event.

block_id Optional[str]

CIF data-block name where the event occurred, if known.

table Optional[str]

DuckDB table name involved, if applicable.

tag Optional[str]

CIF tag involved, if applicable.

Source code in src/cifflow/inspect/_ingest.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@dataclass
class TraceEvent:
    """A single diagnostic record produced by :func:`inspect_ingest`.

    Only ``kind`` and ``detail`` are required; the remaining fields are
    optional context and default to ``None``.

    Attributes
    ----------
    kind:
        Event category, one of:

        - ``'warning'`` — non-fatal semantic issue (e.g. unrecognised tag)
        - ``'error'``   — fatal semantic error
    detail:
        Human-readable description of what happened.
    block_id:
        Name of the CIF data block the event belongs to, when known.
    table:
        Name of the DuckDB table involved, when applicable.
    tag:
        CIF tag involved, when applicable.
    """

    kind: str
    detail: str
    block_id: Optional[str] = None
    table: Optional[str] = None
    tag: Optional[str] = None

inspect_ingest(cif, db=None, schema=None, *, propagate_fk=False, dataset_id=None, file=None)

Run ingestion, capture events, and pretty-print a diagnostic trace.

Parameters:

Name Type Description Default
cif CifFile

Parsed CifFile from build().

required
db DuckDBPyConnection | None

Open duckdb.DuckDBPyConnection, or None for a fresh in-memory DB.

None
schema SchemaSpec | None

SchemaSpec used to route tags, or None to route all to fallback.

None
propagate_fk bool

Forwarded to ingest().

False
dataset_id str | None

Forwarded to ingest().

None
file Optional[TextIO]

Where to write the trace. Defaults to sys.stdout.

None

Returns:

Type Description
list[TraceEvent]

All captured events in occurrence order.

Source code in src/cifflow/inspect/_ingest.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def inspect_ingest(
    cif: CifFile,
    db: duckdb.DuckDBPyConnection | None = None,
    schema: SchemaSpec | None = None,
    *,
    propagate_fk: bool = False,
    dataset_id: str | None = None,
    file: Optional[TextIO] = None,
) -> list[TraceEvent]:
    """Ingest *cif*, record what went wrong, and print a diagnostic trace.

    Each message returned by ``ingest()`` is recorded as a ``'warning'``
    event.  Any exception raised during ingestion is caught and recorded as
    a single ``'error'`` event instead of propagating to the caller.

    Parameters
    ----------
    cif:
        Parsed ``CifFile`` from ``build()``.
    db:
        Open ``duckdb.DuckDBPyConnection``, or ``None`` for a fresh in-memory DB.
    schema:
        ``SchemaSpec`` used to route tags, or ``None`` to route all to fallback.
    propagate_fk:
        Forwarded to ``ingest()``.
    dataset_id:
        Forwarded to ``ingest()``.
    file:
        Where to write the trace.  Defaults to ``sys.stdout``.

    Returns
    -------
    list[TraceEvent]
        All captured events in occurrence order.
    """
    out = sys.stdout if file is None else file

    from cifflow.ingestion.ingest import ingest

    events: list[TraceEvent] = []

    print(c('-- inspect_ingest --', BOLD, DIM, file=out), file=out)

    try:
        _result, messages = ingest(
            cif,
            db,
            schema=schema,
            propagate_fk=propagate_fk,
            dataset_id=dataset_id,
        )
        for text in messages:
            events.append(TraceEvent(kind='warning', detail=text))
    except Exception as exc:
        # Deliberately broad: this is a diagnostic tool, so every failure
        # (ValueError included) is reported as an event rather than raised.
        events.append(TraceEvent(kind='error', detail=str(exc)))

    # Split the captured events by kind in a single pass.
    warnings_ev: list[TraceEvent] = []
    errors_ev: list[TraceEvent] = []
    for ev in events:
        if ev.kind == 'warning':
            warnings_ev.append(ev)
        elif ev.kind == 'error':
            errors_ev.append(ev)

    if warnings_ev:
        print(c(f'  {len(warnings_ev)} semantic warning(s):', YELLOW, file=out), file=out)
        for ev in warnings_ev:
            marker = c("~", YELLOW, file=out)
            print(f'    {marker}  {ev.detail}', file=out)

    if errors_ev:
        print(c(f'  {len(errors_ev)} error(s):', RED, BOLD, file=out), file=out)
        for ev in errors_ev:
            marker = c("!", RED, file=out)
            print(f'    {marker}  {ev.detail}', file=out)

    if not (warnings_ev or errors_ev):
        print(c('  Ingestion completed with no warnings.', GREEN, file=out), file=out)

    _print_trace_summary(events, out)
    return events