Dictionary

cifflow.dictionary.ddlm_item

DDLm item definition — one save frame extracted from a DDLm dictionary.

DdlmItem dataclass

Represents a single definition extracted from a DDLm dictionary save frame.

Each save frame in a DDLm dictionary defines either a data item or a category. After import resolution, all relevant attributes are collected into this dataclass.

Attributes:

Name Type Description
definition_id str

Canonical tag name as it appears in CIF data files, normalised to lowercase. Corresponds to _definition.id.

scope str

"Item", "Category", or "Dictionary". Defaults to "Item" when _definition.scope is absent from the save frame.

definition_class str

DDLm class of this definition: "Datum", "Attribute", "Loop", "Set", "Head", or "Functions". Defaults to "Datum" when _definition.class is absent.

category_id str | None

SQLite table name derived from _name.category_id, lowercased. None for "Dictionary"-scope frames and items missing this tag.

object_id str | None

SQLite column name derived from _name.object_id, lowercased. None for category frames and items missing this tag.

type_purpose str | None

Value of _type.purpose (e.g. "Key", "Link", "SU", "Measurand"). None if absent.

type_source str | None

Value of _type.source (e.g. "Assigned", "Recorded"). None if absent.

type_container str

Value of _type.container (e.g. "Single", "List"). Defaults to "Single" when absent.

type_contents str | None

Value of _type.contents (e.g. "Text", "Integer", "Real"). None if absent.

linked_item_id str | None

For Link and SU items: the _definition.id of the linked item, lowercased. None for all other items.

units_code str | None

Value of _units.code. None if absent.

description str | None

Human-readable description from _description.text. None if absent.

enumeration_states list[str]

Allowed enumeration values from _enumeration_set.state. Empty list when not present. Item-scope frames only.

enumeration_default str | None

Default value from _enumeration.default. None if absent. The CIF inapplicable placeholder '.' is preserved as-is.

category_keys list[str]

Lowercased fully-qualified tag names from _category_key.name. Empty list when not present. Category-scope frames only.

aliases list[str]

Old tag names from _alias.definition_id, each mapping 1:1 to this definition_id. Empty list when none are declared.

replaced_by list[str]

Preferred replacement tag names from _definition_replaced.by, lowercased. An empty string represents a PLACEHOLDER ("."), meaning deprecated with no replacement. Empty list when not present.

is_deprecated bool

True if any _definition_replaced row exists for this item, regardless of the replacement value.

enumeration_range str | None

Value of _enumeration.range. None if absent.

type_dimension str | None

Value of _type.dimension. None if absent.

enumeration_def_index_ids list[str]

Ordered list of canonical tag names from _enumeration.def_index_ids whose values form the lookup key for keyed defaults. Empty when absent.

enumeration_defaults list[tuple[list[str], str]]

Keyed default table from a _enumeration_defaults loop: each entry is (key_components, default_value) where key_components aligns positionally with enumeration_def_index_ids. Empty when absent.
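
How enumeration_def_index_ids and enumeration_defaults relate can be sketched as a small lookup. This is only an illustration on plain lists, not code from cifflow: the helper name `lookup_keyed_default`, the sample tag and values, and the exact-match comparison are all assumptions.

```python
from __future__ import annotations


def lookup_keyed_default(
    index_ids: list[str],
    defaults: list[tuple[list[str], str]],
    row: dict[str, str],
) -> str | None:
    """Return the default whose key components match the row's index values.

    Hypothetical helper: cifflow may resolve keyed defaults differently.
    """
    # Build the lookup key from the row, in the order given by index_ids.
    try:
        key = [row[tag] for tag in index_ids]
    except KeyError:
        return None  # an index item is missing, so no keyed default applies
    # key_components aligns positionally with index_ids, so compare lists.
    for key_components, default_value in defaults:
        if key_components == key:
            return default_value
    return None


# Made-up data shaped like the two attributes described above.
index_ids = ["_atom_type.symbol"]
defaults = [(["C"], "0.68"), (["H"], "0.23")]

print(lookup_keyed_default(index_ids, defaults, {"_atom_type.symbol": "H"}))
```

A linear scan keeps the sketch close to the stored list-of-tuples shape; a caller doing many lookups might prefer to convert the table to a dict keyed by `tuple(key_components)` once.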

Source code in src/cifflow/dictionary/ddlm_item.py
@dataclass
class DdlmItem:
    """
    Represents a single definition extracted from a DDLm dictionary save frame.

    Each save frame in a DDLm dictionary defines either a data item or a
    category.  After import resolution, all relevant attributes are collected
    into this dataclass.

    Attributes
    ----------
    definition_id:
        Canonical tag name as it appears in CIF data files, normalised to
        lowercase.  Corresponds to ``_definition.id``.
    scope:
        ``"Item"``, ``"Category"``, or ``"Dictionary"``.  Defaults to
        ``"Item"`` when ``_definition.scope`` is absent from the save frame.
    definition_class:
        DDLm class of this definition: ``"Datum"``, ``"Attribute"``,
        ``"Loop"``, ``"Set"``, ``"Head"``, or ``"Functions"``.  Defaults to
        ``"Datum"`` when ``_definition.class`` is absent.
    category_id:
        SQLite table name derived from ``_name.category_id``, lowercased.
        ``None`` for ``"Dictionary"``-scope frames and items missing this tag.
    object_id:
        SQLite column name derived from ``_name.object_id``, lowercased.
        ``None`` for category frames and items missing this tag.
    type_purpose:
        Value of ``_type.purpose`` (e.g. ``"Key"``, ``"Link"``, ``"SU"``,
        ``"Measurand"``).  ``None`` if absent.
    type_source:
        Value of ``_type.source`` (e.g. ``"Assigned"``, ``"Recorded"``).
        ``None`` if absent.
    type_container:
        Value of ``_type.container`` (e.g. ``"Single"``, ``"List"``).
        Defaults to ``"Single"`` when absent.
    type_contents:
        Value of ``_type.contents`` (e.g. ``"Text"``, ``"Integer"``,
        ``"Real"``).  ``None`` if absent.
    linked_item_id:
        For ``Link`` and ``SU`` items: the ``_definition.id`` of the linked
        item, lowercased.  ``None`` for all other items.
    units_code:
        Value of ``_units.code``.  ``None`` if absent.
    description:
        Human-readable description from ``_description.text``.  ``None`` if
        absent.
    enumeration_states:
        Allowed enumeration values from ``_enumeration_set.state``.  Empty
        list when not present.  Item-scope frames only.
    enumeration_default:
        Default value from ``_enumeration.default``.  ``None`` if absent.
        The CIF inapplicable placeholder ``'.'`` is preserved as-is.
    category_keys:
        Lowercased fully-qualified tag names from ``_category_key.name``.
        Empty list when not present.  Category-scope frames only.
    aliases:
        Old tag names from ``_alias.definition_id``, each mapping 1:1 to
        this ``definition_id``.  Empty list when none are declared.
    replaced_by:
        Preferred replacement tag names from ``_definition_replaced.by``,
        lowercased.  An empty string represents a ``PLACEHOLDER`` (``"."``),
        meaning deprecated with no replacement.  Empty list when not present.
    is_deprecated:
        ``True`` if any ``_definition_replaced`` row exists for this item,
        regardless of the replacement value.
    enumeration_range:
        Value of ``_enumeration.range``.  ``None`` if absent.
    type_dimension:
        Value of ``_type.dimension``.  ``None`` if absent.
    enumeration_def_index_ids:
        Ordered list of canonical tag names from ``_enumeration.def_index_ids``
        whose values form the lookup key for keyed defaults.  Empty when absent.
    enumeration_defaults:
        Keyed default table from a ``_enumeration_defaults`` loop: each entry is
        ``(key_components, default_value)`` where ``key_components`` aligns
        positionally with ``enumeration_def_index_ids``.  Empty when absent.
    """

    definition_id: str
    scope: str
    definition_class: str
    category_id: str | None
    object_id: str | None
    type_purpose: str | None
    type_source: str | None
    type_container: str
    type_contents: str | None
    linked_item_id: str | None
    units_code: str | None
    description: str | None
    enumeration_states: list[str] = field(default_factory=list)
    enumeration_default: str | None = None
    category_keys: list[str] = field(default_factory=list)
    aliases: list[str] = field(default_factory=list)
    replaced_by: list[str] = field(default_factory=list)
    is_deprecated: bool = False
    enumeration_range: str | None = None
    type_dimension: str | None = None
    enumeration_def_index_ids: list[str] = field(default_factory=list)
    enumeration_defaults: list[tuple[list[str], str]] = field(default_factory=list)

cifflow.dictionary.ddlm_parser

DDLm dictionary data container produced by DictionaryLoader.

DdlmDictionary dataclass

In-memory representation of a loaded DDLm dictionary.

Produced by DictionaryLoader.load(). Contains all category and item definitions extracted from the dictionary's save frames, together with pre-built lookup tables for fast tag resolution.

Attributes:

Name Type Description
name str

The data_ block name from the parsed CIF file (e.g. "CIF_CORE").

title str | None

Value of _dictionary.title, or None if absent.

version str | None

Value of _dictionary.version, or None if absent.

categories dict[str, DdlmItem]

Mapping from lowercased definition_id to DdlmItem for every "Category"-scope frame.

items dict[str, DdlmItem]

Mapping from lowercased definition_id to DdlmItem for every "Item"-scope frame.

tag_to_item dict[str, DdlmItem]

Combined lookup covering every definition_id (both categories and items) plus all declared aliases. Keys are lowercased.

alias_to_definition_id dict[str, str]

Maps each lowercased alias tag name to the current lowercased definition_id.

deprecated_ids set[str]

Set of lowercased definition_id values whose definitions have been replaced (is_deprecated == True).

warnings list[str]

Non-fatal issues encountered during loading, in emission order.

source_files list[str]

Absolute file paths (or URIs) of every dictionary file loaded, in load order. Populated when path_resolver is supplied to DictionaryLoader.

uri str | None

Value of _dictionary.uri from the top-level dictionary source, or None if absent.
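
The lookup tables described above lend themselves to a simple tag-resolution routine. The following sketch works on plain sets and dicts shaped like those attributes rather than on a real DdlmDictionary; the function name `resolve_tag`, the sample tags, and the "definition first, alias second" order are assumptions made for illustration.

```python
from __future__ import annotations


def resolve_tag(
    tag: str,
    known_ids: set[str],
    alias_to_definition_id: dict[str, str],
    deprecated_ids: set[str],
) -> tuple[str | None, bool]:
    """Return (current definition_id or None, is_deprecated) for a tag.

    Hypothetical helper, not part of cifflow.
    """
    key = tag.lower()  # all lookup keys are documented as lowercased
    if key in known_ids:
        current = key
    else:
        current = alias_to_definition_id.get(key)
    if current is None:
        return None, False
    return current, current in deprecated_ids


# Made-up sample data.
known_ids = {"_cell.length_a", "_cell.volume"}
aliases = {"_cell_length_a": "_cell.length_a"}
deprecated: set[str] = set()

print(resolve_tag("_Cell_Length_A", known_ids, aliases, deprecated))
```

With a real DdlmDictionary, tag_to_item already covers both definition ids and aliases, so a single lowercased lookup there would likely replace the two-step check.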

Source code in src/cifflow/dictionary/ddlm_parser.py
@dataclass
class DdlmDictionary:
    """
    In-memory representation of a loaded DDLm dictionary.

    Produced by ``DictionaryLoader.load()``.  Contains all category and item
    definitions extracted from the dictionary's save frames, together with
    pre-built lookup tables for fast tag resolution.

    Attributes
    ----------
    name:
        The ``data_`` block name from the parsed CIF file (e.g. ``"CIF_CORE"``).
    title:
        Value of ``_dictionary.title``, or ``None`` if absent.
    version:
        Value of ``_dictionary.version``, or ``None`` if absent.
    categories:
        Mapping from lowercased ``definition_id`` to ``DdlmItem`` for every
        ``"Category"``-scope frame.
    items:
        Mapping from lowercased ``definition_id`` to ``DdlmItem`` for every
        ``"Item"``-scope frame.
    tag_to_item:
        Combined lookup covering every ``definition_id`` (both categories and
        items) plus all declared aliases.  Keys are lowercased.
    alias_to_definition_id:
        Maps each lowercased alias tag name to the current lowercased
        ``definition_id``.
    deprecated_ids:
        Set of lowercased ``definition_id`` values whose definitions have been
        replaced (``is_deprecated == True``).
    warnings:
        Non-fatal issues encountered during loading, in emission order.
    source_files:
        Absolute file paths (or URIs) of every dictionary file loaded,
        in load order.  Populated when ``path_resolver`` is supplied to
        :class:`~cifflow.dictionary.loader.DictionaryLoader`.
    uri:
        Value of ``_dictionary.uri`` from the top-level dictionary source,
        or ``None`` if absent.
    """

    name: str
    title: str | None
    version: str | None
    categories: dict[str, DdlmItem]
    items: dict[str, DdlmItem]
    tag_to_item: dict[str, DdlmItem]
    alias_to_definition_id: dict[str, str]
    deprecated_ids: set[str]
    warnings: list[str] = field(default_factory=list)
    source_files: list[str] = field(default_factory=list)
    uri: str | None = None

cifflow.dictionary.loader

DDLm dictionary loader — parses a DDLm CIF and resolves _import.get directives.

SourceResolver = Callable[[str], str | None] module-attribute

Callable that maps a URI string to a raw CIF source string, or None.
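
Any callable with this shape can serve as a resolver. As a rough sketch, two ways of building one are shown below; the factory names `make_mapping_resolver` and `make_directory_resolver`, the sample file name, and the basename-only lookup are assumptions for illustration and are not provided by cifflow.

```python
from pathlib import Path
from typing import Callable, Dict, Optional

# Same shape as the SourceResolver alias described above.
SourceResolver = Callable[[str], Optional[str]]


def make_mapping_resolver(sources: Dict[str, str]) -> SourceResolver:
    """Resolver backed by an in-memory mapping of URI to CIF text."""
    # dict.get returns None for unknown URIs, which signals "unavailable".
    return sources.get


def make_directory_resolver(root: Path) -> SourceResolver:
    """Resolver that reads '<root>/<last path component of the URI>'."""

    def resolve(uri: str) -> Optional[str]:
        # Only the final component is used; directory parts are ignored.
        candidate = root / Path(uri).name
        if not candidate.is_file():
            return None
        return candidate.read_text(encoding="utf-8")

    return resolve


resolver = make_mapping_resolver({"templ_attr.cif": "#\\#CIF_2.0\ndata_TEMPL_ATTR\n"})
print(resolver("templ_attr.cif") is not None)
print(resolver("missing.cif"))
```

The in-memory variant is convenient in tests, since the loader never touches the filesystem itself and only sees whatever text the resolver returns.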

DictionaryLoader

Loads a DDLm dictionary from a CIF 2.0 source string.

Resolves _import.get directives using the supplied SourceResolver. Both mode="Contents" (frame-level attribute merge) and mode="Full" (constituent dictionary incorporation) are supported. File access is fully delegated to the resolver; this class never accesses the filesystem or network directly.

Parsed files are cached for the lifetime of the loader instance. To invalidate the cache, create a new instance.
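
The caching behaviour can be illustrated with a standalone sketch modelled on the `_get_source` method in the source listing further down: successful lookups are stored per instance, while a `None` result is not stored and so is retried. The class name `CachingSource` and the counting resolver are hypothetical stand-ins, not cifflow APIs.

```python
from __future__ import annotations

from typing import Callable, Optional


class CachingSource:
    """Memoise a resolver per instance; failed lookups are not cached."""

    def __init__(self, resolver: Optional[Callable[[str], Optional[str]]]) -> None:
        self._resolver = resolver
        self._cache: dict[str, str] = {}

    def get(self, uri: str) -> Optional[str]:
        if uri in self._cache:
            return self._cache[uri]
        if self._resolver is None:
            return None
        src = self._resolver(uri)
        if src is not None:
            self._cache[uri] = src  # only successful lookups are stored
        return src


calls: list[str] = []


def counting_resolver(uri: str) -> Optional[str]:
    """Record every call so cache hits can be observed."""
    calls.append(uri)
    return "data_demo\n" if uri == "demo.dic" else None


source = CachingSource(counting_resolver)
source.get("demo.dic")
source.get("demo.dic")  # served from the cache, resolver not called
source.get("gone.dic")
source.get("gone.dic")  # resolver is asked again
print(calls)
```

Because the cache lives on the instance, creating a fresh object is the only way to force a re-read in this sketch, which matches the invalidation advice above.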

Parameters:

Name Type Description Default
resolver SourceResolver | None

Callable that maps a URI string to a raw CIF source string, or None if the file is unavailable. If None, import directives that require an external file will trigger the if_miss policy.

None
path_resolver Callable[[str], str | None] | None

Optional companion to resolver that maps the same URI to an absolute filesystem path. When provided, the resolved paths are recorded in DdlmDictionary.source_files.

None
on_warning Callable[[str], None] | None

Optional callback for non-fatal warnings. If None, warnings are silently discarded.

None
ignore_head_imports bool

When True, _import.get directives in save frames with _definition.class = Head are silently skipped. Only the save frames physically present in the file being loaded are parsed. Applies to all files loaded by this instance, including constituents loaded via mode="Full" recursion. Defaults to False.

False
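
Before any file is fetched, the `_import.get` directives of a frame are ordered and given defaults. The sketch below restates that step in isolation, following the `_resolve_imports` listing further down (sort by integer `order`, then default `mode` to "Contents" and `dupl`/`miss` to "Exit"). The function names `order_key` and `normalise` and the sample directives are assumptions for illustration.

```python
from __future__ import annotations

from typing import Any


def order_key(directive: Any) -> int:
    """Sort key: integer value of 'order', or 0 when missing or unparsable."""
    if not isinstance(directive, dict):
        return 0
    value = directive.get("order")
    try:
        return int(value) if value is not None else 0
    except (TypeError, ValueError):
        return 0


def normalise(directive: dict[str, Any]) -> dict[str, Any]:
    """Fill in the defaults applied to keys absent from a directive."""
    return {
        "file": directive.get("file", ""),
        "save": directive.get("save", ""),
        "mode": directive.get("mode", "Contents"),
        "dupl": directive.get("dupl", "Exit"),
        "miss": directive.get("miss", "Exit"),
    }


# Made-up directives; file and save names are placeholders.
directives = [
    {"file": "b.dic", "save": "B_HEAD", "mode": "Full", "order": "2"},
    {"file": "templ_attr.cif", "save": "general_su"},
    {"file": "a.dic", "save": "A_HEAD", "mode": "Full", "order": "1", "miss": "Ignore"},
]

# sorted() is stable, so directives with equal keys keep their list order.
ordered = [normalise(d) for d in sorted(directives, key=order_key)]
for d in ordered:
    print(d["file"], d["mode"], d["dupl"], d["miss"])
```

A directive without `order` sorts as 0, so in this sample the template import is handled before the two explicitly ordered ones.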
Source code in src/cifflow/dictionary/loader.py
class DictionaryLoader:
    """
    Loads a DDLm dictionary from a CIF 2.0 source string.

    Resolves ``_import.get`` directives using the supplied ``SourceResolver``.
    Both ``mode="Contents"`` (frame-level attribute merge) and ``mode="Full"``
    (constituent dictionary incorporation) are supported.  File access is fully
    delegated to the resolver; this class never accesses the filesystem or
    network directly.

    Parsed files are cached for the lifetime of the loader instance.  To
    invalidate the cache, create a new instance.

    Parameters
    ----------
    resolver
        Callable that maps a URI string to a raw CIF source string, or ``None``
        if the file is unavailable.  If ``None``, import directives that require
        an external file will trigger the ``if_miss`` policy.
    path_resolver
        Optional companion to *resolver* that maps the same URI to an absolute
        filesystem path.  When provided, the resolved paths are recorded in
        :attr:`~cifflow.dictionary.ddlm_parser.DdlmDictionary.source_files`.
    on_warning
        Optional callback for non-fatal warnings.  If ``None``, warnings are
        silently discarded.
    ignore_head_imports
        When ``True``, ``_import.get`` directives in save frames with
        ``_definition.class = Head`` are silently skipped.  Only the save
        frames physically present in the file being loaded are parsed.
        Applies to all files loaded by this instance, including constituents
        loaded via ``mode="Full"`` recursion.  Defaults to ``False``.
    """

    def __init__(
        self,
        resolver: SourceResolver | None = None,
        *,
        path_resolver: 'Callable[[str], str | None] | None' = None,
        on_warning: Callable[[str], None] | None = None,
        ignore_head_imports: bool = False,
    ) -> None:
        self._resolver = resolver
        self._path_resolver = path_resolver
        self._on_warning = on_warning if on_warning is not None else lambda msg: None
        self._ignore_head_imports = ignore_head_imports
        self._source_cache: dict[str, str] = {}
        self._parse_cache: dict[str, CifFile] = {}

    def load(self, source: str, *, base_uri: str | None = None) -> DdlmDictionary:
        """
        Parse a DDLm dictionary source string and resolve all ``_import.get`` directives.

        Both ``mode="Contents"`` (frame-level attribute merge) and
        ``mode="Full"`` (constituent dictionary incorporation) are supported.
        When a ``mode="Full"`` import targets a Head category, the entire
        constituent dictionary is loaded recursively and its definitions are
        merged into the result, with local definitions taking precedence.

        Circular imports are detected and skipped with a warning.

        Parameters
        ----------
        source:
            Raw CIF 2.0 source string of the dictionary to parse.
        base_uri:
            URI of the dictionary being parsed, used as the base for resolving
            relative import URIs.  If ``None`` and ``_dictionary.uri`` is
            present in the dictionary, that value is used.  If neither is
            available, relative URIs are passed to the resolver as-is.

        Returns
        -------
        DdlmDictionary
            The fully loaded dictionary with all imports resolved.
        """
        collected: list[str] = []
        if base_uri:
            resolved = self._path_resolver(base_uri) if self._path_resolver else None
            collected.append(resolved or base_uri)
        return self._load_recursive(source, base_uri, set(), collected)

    def _load_recursive(
        self,
        source: str,
        base_uri: str | None,
        loading: set[str],
        collected: list[str],
    ) -> DdlmDictionary:
        """Parse and resolve one dictionary, tracking *loading* for cycle detection."""
        warnings: list[str] = []

        def warn(msg: str) -> None:
            warnings.append(msg)
            self._on_warning(msg)

        cif, parse_errors = build(source)
        for e in parse_errors:
            warn(f'parse error in dictionary: {e.message} (line {e.line})')

        if not cif.blocks:
            warn('dictionary CIF contains no data blocks')
            return DdlmDictionary(
                name='', title=None, version=None,
                categories={}, items={}, tag_to_item={},
                alias_to_definition_id={}, deprecated_ids=set(),
                warnings=warnings,
            )

        if len(cif.blocks) > 1:
            warn(f'dictionary CIF has {len(cif.blocks)} data blocks — using first')

        block_name = cif.blocks[0]
        block = cif[block_name]

        # Read the canonical dictionary URI unconditionally.
        uri_vals = block['_dictionary.uri'] if '_dictionary.uri' in block else []
        dict_uri = uri_vals[0] if uri_vals and isinstance(uri_vals[0], str) and uri_vals[0] not in ('.', '?') else None

        # Resolve base_uri for import resolution if not supplied by caller.
        if base_uri is None:
            base_uri = dict_uri

        title = block['_dictionary.title'][0] if '_dictionary.title' in block else None
        if isinstance(title, str) and title in ('.', '?'):
            title = None
        version = block['_dictionary.version'][0] if '_dictionary.version' in block else None
        if isinstance(version, str) and version in ('.', '?'):
            version = None

        # pool accumulates DdlmItems from mode="Full" constituent imports.
        # Primary items (from this file's frames) are appended afterwards so
        # they overwrite constituent definitions with the same definition_id.
        pool: dict[str, DdlmItem] = {}
        primary_items: list[DdlmItem] = []

        for sf_name in block.save_frames:
            sf = block[sf_name]
            frame_data = {tag: sf[tag] for tag in sf.tags if tag in _FRAME_TAGS}
            _apply_tag_aliases(frame_data)

            frame_class = (_scalar(frame_data, '_definition.class') or '').lower()
            is_head = frame_class == 'head'
            if '_import.get' in frame_data and not (self._ignore_head_imports and is_head):
                directives_val = frame_data['_import.get']
                if directives_val and isinstance(directives_val[0], list):
                    directives = directives_val[0]
                    self._resolve_imports(
                        frame_data, directives, base_uri, loading, pool, warn, collected
                    )

            item = _extract_item(frame_data, warn)
            if item is not None:
                primary_items.append(item)

        # Merge: constituents first (pool), then primary overwrites.
        all_items = list(pool.values()) + primary_items

        categories, items, tag_to_item, alias_to_def_id, deprecated_ids = (
            _build_lookup_tables(all_items, warn)
        )

        return DdlmDictionary(
            name=block_name,
            title=title,
            version=version,
            uri=dict_uri,
            categories=categories,
            items=items,
            tag_to_item=tag_to_item,
            alias_to_definition_id=alias_to_def_id,
            deprecated_ids=deprecated_ids,
            warnings=warnings,
            source_files=list(collected),
        )

    def _load_constituent(
        self,
        uri: str,
        loading: set[str],
        warn: Callable[[str], None],
        collected: list[str] | None = None,
    ) -> DdlmDictionary | None:
        """
        Load and return the dictionary at *uri*, or ``None`` on failure.

        Checks *loading* for circular imports before proceeding.  Adds *uri*
        to *loading* for the duration of the recursive call.
        """
        if uri in loading:
            warn(f'circular import detected for {uri!r} — skipped')
            return None
        src = self._get_source(uri)
        if src is None:
            return None
        if collected is not None:
            resolved = self._path_resolver(uri) if self._path_resolver else None
            entry = resolved or uri
            if entry not in collected:
                collected.append(entry)
        loading.add(uri)
        try:
            return self._load_recursive(src, uri, loading, collected if collected is not None else [])
        finally:
            loading.discard(uri)

    def _resolve_imports(
        self,
        frame_data: dict[str, list],
        directives: list[Any],
        base_uri: str | None,
        loading: set[str],
        pool: dict[str, DdlmItem],
        warn: Callable[[str], None],
        collected: list[str] | None = None,
    ) -> None:
        """Apply ``_import.get`` directives to *frame_data* and/or *pool*."""
        # Sort by 'order' if present; fall back to list order.
        def _order_key(d: Any) -> int:
            if not isinstance(d, dict):
                return 0
            v = d.get('order')
            try:
                return int(v) if v is not None else 0
            except (TypeError, ValueError):
                return 0

        sorted_directives = sorted(directives, key=_order_key)

        for directive in sorted_directives:
            if not isinstance(directive, dict):
                warn(f'_import.get directive is not a table — skipped: {directive!r}')
                continue

            file_uri = directive.get('file', '')
            save_id = directive.get('save', '')
            mode = directive.get('mode', 'Contents')
            dupl = directive.get('dupl', 'Exit')
            miss = directive.get('miss', 'Exit')

            if not isinstance(file_uri, str) or not file_uri:
                warn("_import.get directive missing 'file' key — skipped")
                continue
            if not isinstance(save_id, str) or not save_id:
                warn("_import.get directive missing 'save' key — skipped")
                continue

            if mode not in ('Contents', 'Full'):
                warn(
                    f"_import.get mode {mode!r} is not supported "
                    f"(file={file_uri!r}, save={save_id!r}) — skipped"
                )
                continue

            # Resolve the URI relative to base_uri if needed.
            resolved_uri = self._resolve_uri(file_uri, base_uri)

            if mode == 'Full':
                # Look up the named save frame first to determine whether the
                # target is a Head category (dictionary-level import) or an
                # ordinary frame (frame-level attribute merge like Contents).
                source_cif = self._get_parsed(resolved_uri)

                if source_cif is None:
                    msg = (
                        f"_import.get could not load {resolved_uri!r} "
                        f"(save={save_id!r})"
                    )
                    if miss == 'Ignore':
                        warn(msg + ' — ignored')
                        continue
                    else:
                        warn(msg + ' — aborting dictionary load')
                        return

                source_frame_data = self._find_frame_by_definition_id(
                    source_cif, save_id, lambda _: None
                )

                if source_frame_data is None:
                    msg = (
                        f"_import.get save frame {save_id!r} not found "
                        f"in {resolved_uri!r}"
                    )
                    if miss == 'Ignore':
                        warn(msg + ' — ignored')
                        continue
                    else:
                        warn(msg + ' — aborting dictionary load')
                        return

                target_class = (
                    _scalar(source_frame_data, '_definition.class') or ''
                ).lower()

                if target_class == 'head':
                    # Dictionary-level import: load the entire constituent
                    # dictionary and merge all its definitions into pool.
                    constituent = self._load_constituent(resolved_uri, loading, warn, collected)
                    if constituent is None:
                        msg = (
                            f"_import.get could not load constituent "
                            f"{resolved_uri!r} (save={save_id!r})"
                        )
                        if miss == 'Ignore':
                            warn(msg + ' — ignored')
                            continue
                        else:
                            warn(msg + ' — aborting dictionary load')
                            return

                    # Surface constituent warnings prefixed with their source.
                    for w in constituent.warnings:
                        warn(f'[{resolved_uri}] {w}')

                    abort = _merge_constituent(pool, constituent, dupl, warn)
                    if abort:
                        return
                    continue

                # Non-Head target: frame-level attribute merge (same as Contents).
                # Fall through to the shared frame-merge path below.
                # source_cif and source_frame_data are already resolved above.

            else:
                # mode == 'Contents': frame-level attribute merge.
                source_cif = self._get_parsed(resolved_uri)

                if source_cif is None:
                    msg = (
                        f"_import.get could not load {resolved_uri!r} "
                        f"(save={save_id!r})"
                    )
                    if miss == 'Ignore':
                        warn(msg + ' — ignored')
                        continue
                    else:
                        warn(msg + ' — aborting dictionary load')
                        return

                source_frame_data = self._find_frame_by_definition_id(
                    source_cif, save_id, warn
                )

                if source_frame_data is None:
                    msg = (
                        f"_import.get save frame with _definition.id={save_id!r} "
                        f"not found in {resolved_uri!r}"
                    )
                    if miss == 'Ignore':
                        warn(msg + ' — ignored')
                        continue
                    else:
                        warn(msg + ' — aborting dictionary load')
                        return

            # Shared frame-level merge path (mode="Contents" or mode="Full" non-Head).
            # source_cif and source_frame_data are already resolved above.
            abort = self._merge_frame(
                frame_data, source_frame_data, source_cif, dupl, warn
            )
            if abort:
                return

    def _resolve_uri(self, uri: str, base_uri: str | None) -> str:
        """Return the URI to pass to the resolver."""
        # If the URI looks absolute or base_uri is absent, use it as-is.
        return uri

    def _get_source(self, uri: str) -> str | None:
        """Return raw CIF source for *uri*, using cache then resolver."""
        if uri in self._source_cache:
            return self._source_cache[uri]
        if self._resolver is None:
            return None
        src = self._resolver(uri)
        if src is not None:
            self._source_cache[uri] = src
        return src

    def _get_parsed(self, uri: str) -> CifFile | None:
        """Return a parsed CifFile for *uri*, using cache then resolver."""
        if uri in self._parse_cache:
            return self._parse_cache[uri]
        src = self._get_source(uri)
        if src is None:
            return None
        cif, _ = build(src)
        self._parse_cache[uri] = cif
        return cif

    def _find_frame_by_definition_id(
        self,
        cif: CifFile,
        definition_id: str,
        warn: Callable[[str], None],
    ) -> dict[str, list] | None:
        """
        Search all save frames in *cif* for one matching *definition_id*.

        Match strategy (case-insensitive, in priority order):

        1. ``_definition.id`` value — used by full dictionary frames.
        2. Save frame label — used by template files (e.g. ``templ_attr.cif``)
           that declare no ``_definition.id``.

        Returns the frame's working dict filtered to ``_FRAME_TAGS``, or
        ``None`` if no match is found.
        """
        if not cif.blocks:
            return None
        block = cif[cif.blocks[0]]
        target = definition_id.lower()
        for sf_name in block.save_frames:
            sf = block[sf_name]
            if '_definition.id' in sf:
                raw_id = sf['_definition.id'][0]
                if isinstance(raw_id, str) and raw_id.lower() == target:
                    fd = {tag: sf[tag] for tag in sf.tags if tag in _FRAME_TAGS}
                    _apply_tag_aliases(fd)
                    return fd
            elif sf_name.lower() == target:
                # Template files carry no _definition.id; match by frame label.
                fd = {tag: sf[tag] for tag in sf.tags if tag in _FRAME_TAGS}
                _apply_tag_aliases(fd)
                return fd
        return None

    def _merge_frame(
        self,
        frame_data: dict[str, list],
        source_data: dict[str, list],
        source_cif: CifFile,
        dupl: str,
        warn: Callable[[str], None],
    ) -> bool:
        """
        Merge *source_data* tags into *frame_data* according to *dupl* policy.

        Returns ``True`` if the load should be aborted (``dupl == "Exit"``
        and a conflict was found), ``False`` otherwise.
        """
        for tag, values in source_data.items():
            if tag in _IMPORT_IDENTITY_TAGS:
                # Never import frame-identity tags from a source frame.
                continue
            if tag not in frame_data:
                frame_data[tag] = values
            else:
                if dupl == 'Ignore':
                    pass  # Keep existing value.
                elif dupl == 'Replace':
                    # If the tag belongs to a Loop category, remove all tags
                    # from that category in frame_data before inserting.
                    self._replace_loop_category_tags(
                        frame_data, tag, source_cif
                    )
                    frame_data[tag] = values
                else:  # 'Exit' (default)
                    warn(
                        f"_import.get dupl=Exit: conflict on tag {tag!r} — "
                        f"aborting dictionary load"
                    )
                    return True
        return False

    def _replace_loop_category_tags(
        self,
        frame_data: dict[str, list],
        tag: str,
        source_cif: CifFile,
    ) -> None:
        """If *tag* belongs to a Loop category in *source_cif*, remove all tags from that category in *frame_data* before the caller inserts the new value."""
        if not source_cif.blocks:
            return
        block = source_cif[source_cif.blocks[0]]

        # Find the tag's save frame to get its _name.category_id.
        tag_lower = tag.lower()
        category_id: str | None = None
        for sf_name in block.save_frames:
            sf = block[sf_name]
            if '_definition.id' not in sf:
                continue
            raw_id = sf['_definition.id'][0]
            if not isinstance(raw_id, str) or raw_id.lower() != tag_lower:
                continue
            cat_vals = sf['_name.category_id'] if '_name.category_id' in sf else []
            if cat_vals and isinstance(cat_vals[0], str):
                category_id = cat_vals[0].lower()
            break

        if category_id is None:
            return

        # Check if that category is a Loop class.
        for sf_name in block.save_frames:
            sf = block[sf_name]
            if '_definition.id' not in sf:
                continue
            raw_id = sf['_definition.id'][0]
            if not isinstance(raw_id, str) or raw_id.lower() != category_id:
                continue
            class_vals = sf['_definition.class'] if '_definition.class' in sf else []
            if class_vals and isinstance(class_vals[0], str):
                if class_vals[0].lower() == 'loop':
                    # Remove all tags in frame_data that belong to this category.
                    # Look up each tag's category via its save frame in source_cif.
                    self._remove_category_tags(frame_data, category_id, block)
            break

    def _remove_category_tags(
        self,
        frame_data: dict[str, list],
        category_id: str,
        block: Any,
    ) -> None:
        """Remove all tags from *frame_data* whose category is *category_id*."""
        to_remove = []
        for existing_tag in list(frame_data):
            for sf_name in block.save_frames:
                sf = block[sf_name]
                if '_definition.id' not in sf:
                    continue
                raw_id = sf['_definition.id'][0]
                if not isinstance(raw_id, str) or raw_id.lower() != existing_tag.lower():
                    continue
                cat_vals = sf['_name.category_id'] if '_name.category_id' in sf else []
                if cat_vals and isinstance(cat_vals[0], str):
                    if cat_vals[0].lower() == category_id:
                        to_remove.append(existing_tag)
                break
        for t in to_remove:
            del frame_data[t]

load(source, *, base_uri=None)

Parse a DDLm dictionary source string and resolve all _import.get directives.

Both mode="Contents" (frame-level attribute merge) and mode="Full" (constituent dictionary incorporation) are supported. When a mode="Full" import targets a Head category, the entire constituent dictionary is loaded recursively and its definitions are merged into the result, with local definitions taking precedence.

Circular imports are detected and skipped with a warning.

Parameters:

Name Type Description Default
source str

Raw CIF 2.0 source string of the dictionary to parse.

required
base_uri str | None

URI of the dictionary being parsed, used as the base for resolving relative import URIs. If None and _dictionary.uri is present in the dictionary, that value is used. If neither is available, relative URIs are passed to the resolver as-is.

None

Returns:

Type Description
DdlmDictionary

The fully loaded dictionary with all imports resolved.
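
One common way to detect circular imports is to track the set of URIs currently being loaded. The sketch below shows that general technique on a hypothetical in-memory import graph; `load_order` and the graph are assumptions made for illustration, and the library's own `_load_recursive` may work differently.

```python
def load_order(uri, imports, visiting=None, warnings=None):
    """Return URIs in load order, skipping any import that would form a cycle."""
    visiting = set() if visiting is None else visiting
    warnings = [] if warnings is None else warnings
    if uri in visiting:
        # The URI is already being loaded further up the call chain.
        warnings.append(f"circular import of {uri!r} skipped")
        return [], warnings
    visiting.add(uri)
    order = [uri]
    for child in imports.get(uri, []):
        child_order, _ = load_order(child, imports, visiting, warnings)
        order.extend(child_order)
    visiting.discard(uri)
    return order, warnings


# Hypothetical import graph: a.dic imports b.dic, which imports a.dic again.
graph = {"a.dic": ["b.dic"], "b.dic": ["a.dic"]}
order, warnings = load_order("a.dic", graph)
```

Each dictionary is visited once, and the back-reference from `b.dic` to `a.dic` produces a single warning instead of unbounded recursion.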

Source code in src/cifflow/dictionary/loader.py
def load(self, source: str, *, base_uri: str | None = None) -> DdlmDictionary:
    """
    Parse a DDLm dictionary source string and resolve all ``_import.get`` directives.

    Both ``mode="Contents"`` (frame-level attribute merge) and
    ``mode="Full"`` (constituent dictionary incorporation) are supported.
    When a ``mode="Full"`` import targets a Head category, the entire
    constituent dictionary is loaded recursively and its definitions are
    merged into the result, with local definitions taking precedence.

    Circular imports are detected and skipped with a warning.

    Parameters
    ----------
    source:
        Raw CIF 2.0 source string of the dictionary to parse.
    base_uri:
        URI of the dictionary being parsed, used as the base for resolving
        relative import URIs.  If ``None`` and ``_dictionary.uri`` is
        present in the dictionary, that value is used.  If neither is
        available, relative URIs are passed to the resolver as-is.

    Returns
    -------
    DdlmDictionary
        The fully loaded dictionary with all imports resolved.
    """
    collected: list[str] = []
    if base_uri:
        resolved = self._path_resolver(base_uri) if self._path_resolver else None
        collected.append(resolved or base_uri)
    return self._load_recursive(source, base_uri, set(), collected)

directory_resolver(path)

Return a SourceResolver that reads files by filename from a local directory.

The last path component of the URI is used as the filename. Returns None if the file is not found in the directory.

Parameters:

Name Type Description Default
path str | Path

Directory to search for dictionary files.

required

Returns:

Type Description
SourceResolver

A callable mapping URI strings to raw CIF source strings.
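
The filename-only lookup can be tried without installing cifflow. The sketch below uses a local stand-in that mirrors the listed source of `directory_resolver`; the URLs and the file content are made up for the example.

```python
import pathlib
import tempfile


def directory_resolver(path):
    # Local stand-in mirroring the listed behaviour, so the example is self-contained.
    directory = pathlib.Path(path)

    def _resolve(uri):
        candidate = directory / pathlib.PurePosixPath(uri).name
        return candidate.read_text(encoding="utf-8") if candidate.exists() else None

    return _resolve


with tempfile.TemporaryDirectory() as tmp:
    (pathlib.Path(tmp) / "templ_attr.cif").write_text("#\\#CIF_2.0\n", encoding="utf-8")
    resolve = directory_resolver(tmp)
    # Only the final path component matters; host and folders are ignored.
    found = resolve("https://example.org/dictionaries/templ_attr.cif")
    missing = resolve("https://example.org/dictionaries/absent.dic")
```

Because only the last path component is used, two URIs that end in the same filename resolve to the same local file.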

Source code in src/cifflow/dictionary/loader.py
def directory_resolver(path: str | pathlib.Path) -> SourceResolver:
    """
    Return a SourceResolver that reads files by filename from a local directory.

    The last path component of the URI is used as the filename.  Returns
    ``None`` if the file is not found in the directory.

    Parameters
    ----------
    path:
        Directory to search for dictionary files.

    Returns
    -------
    SourceResolver
        A callable mapping URI strings to raw CIF source strings.
    """
    directory = pathlib.Path(path)

    def _resolve(uri: str) -> str | None:
        filename = pathlib.PurePosixPath(uri).name
        candidate = directory / filename
        if candidate.exists():
            return candidate.read_text(encoding='utf-8')
        return None

    return _resolve

directory_path_resolver(path)

Return a path resolver that maps a URI to its absolute file path.

Companion to directory_resolver. Pass to DictionaryLoader(path_resolver=...) so that source_files in the resulting DdlmDictionary contains absolute paths rather than bare URIs.

Parameters:

Name Type Description Default
path str | Path

Directory to search for dictionary files.

required

Returns:

Type Description
Callable[[str], str | None]

Maps URI strings to absolute path strings, or None if not found.

Source code in src/cifflow/dictionary/loader.py
def directory_path_resolver(path: str | pathlib.Path) -> 'Callable[[str], str | None]':
    """
    Return a path resolver that maps a URI to its absolute file path.

    Companion to :func:`directory_resolver`.  Pass to
    ``DictionaryLoader(path_resolver=...)`` so that ``source_files`` in the
    resulting ``DdlmDictionary`` contains absolute paths rather than bare URIs.

    Parameters
    ----------
    path:
        Directory to search for dictionary files.

    Returns
    -------
    Callable[[str], str | None]
        Maps URI strings to absolute path strings, or ``None`` if not found.
    """
    directory = pathlib.Path(path)

    def _resolve_path(uri: str) -> str | None:
        filename = pathlib.PurePosixPath(uri).name
        candidate = directory / filename
        if candidate.exists():
            return str(candidate.resolve())
        return None

    return _resolve_path

cifflow.dictionary.schema

SQLite schema generation from a loaded DDLm dictionary.

BridgeColumnDef dataclass

A column whose value is derived transitively through one or more tables.

When populating table_name, the column column_name has no direct CIF source. Its value is resolved by following a chain of single-column FK lookups described by hops, then reading bridge_value_column from the final table in the chain.

Each hop is a 3-tuple (via_column, bridge_table, bridge_pk_column):

  • via_column: column in the previous table (or in table_name for the first hop) whose value is used as the lookup key.
  • bridge_table: the table to look up in.
  • bridge_pk_column: the PK column of bridge_table matched against via_column.

For a single-hop bridge the chain has length 1 and the semantics are identical to the legacy four-field form.

Attributes:

Name Type Description
table_name str

Table that gains the derived column (e.g. 'geom_angle').

column_name str

Name of the derived column (e.g. 'structure_id').

hops list[tuple[str, str, str]]

Ordered list of (via_column, bridge_table, bridge_pk_column) tuples, one per lookup step. Must contain at least one entry.

bridge_value_column str

Column in the last hop's bridge_table whose value is copied into column_name (e.g. 'structure_id').

fallback_chains list[tuple[list[tuple[str, str, str]], str]]

Alternative resolution chains tried in order when the primary chain yields None for a given row. Each entry is a (hops, bridge_value_column) pair with the same structure as the primary fields.
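
The hop-by-hop lookup and the fallback behaviour described above can be sketched over in-memory rows. Both helper functions and the column names `atom_site_label_1` and `atom_site_label_2` are hypothetical, chosen only to illustrate the `(via_column, bridge_table, bridge_pk_column)` structure; the library's actual resolution code is not shown on this page.

```python
def resolve_bridge(row, tables, hops, bridge_value_column):
    """Follow each (via_column, bridge_table, bridge_pk_column) hop in turn."""
    current = row
    for via_column, bridge_table, bridge_pk_column in hops:
        key = current.get(via_column)
        if key is None:
            return None
        current = next(
            (r for r in tables[bridge_table] if r.get(bridge_pk_column) == key),
            None,
        )
        if current is None:
            return None
    return current.get(bridge_value_column)


def resolve_with_fallbacks(row, tables, hops, bridge_value_column, fallback_chains=()):
    """Try the primary chain first, then each fallback chain in order."""
    for chain_hops, value_column in [(hops, bridge_value_column), *fallback_chains]:
        value = resolve_bridge(row, tables, chain_hops, value_column)
        if value is not None:
            return value
    return None


# Hypothetical data: a geom_angle row points at atom_site rows by label.
tables = {
    "atom_site": [
        {"label": "C1", "structure_id": "S1"},
        {"label": "O1", "structure_id": None},
    ],
}
hops = [("atom_site_label_1", "atom_site", "label")]
fallbacks = [([("atom_site_label_2", "atom_site", "label")], "structure_id")]

angle = {"atom_site_label_1": "O1", "atom_site_label_2": "C1"}
primary_only = resolve_bridge(angle, tables, hops, "structure_id")
with_fallback = resolve_with_fallbacks(angle, tables, hops, "structure_id", fallbacks)
```

Here the primary chain reaches `O1`, whose `structure_id` is missing, so the fallback chain through the second label supplies the value.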

Source code in src/cifflow/dictionary/schema.py
@dataclass
class BridgeColumnDef:
    """
    A column whose value is derived transitively through one or more tables.

    When populating ``table_name``, the column ``column_name`` has no direct
    CIF source.  Its value is resolved by following a chain of single-column
    FK lookups described by ``hops``, then reading ``bridge_value_column``
    from the final table in the chain.

    Each hop is a 3-tuple ``(via_column, bridge_table, bridge_pk_column)``:

    - ``via_column``: column in the *previous* table (or in ``table_name``
      for the first hop) whose value is used as the lookup key.
    - ``bridge_table``: the table to look up in.
    - ``bridge_pk_column``: the PK column of ``bridge_table`` matched
      against ``via_column``.

    For a single-hop bridge the chain has length 1 and the semantics are
    identical to the legacy four-field form.

    Attributes
    ----------
    table_name:
        Table that gains the derived column (e.g. ``'geom_angle'``).
    column_name:
        Name of the derived column (e.g. ``'structure_id'``).
    hops:
        Ordered list of ``(via_column, bridge_table, bridge_pk_column)``
        tuples, one per lookup step.  Must contain at least one entry.
    bridge_value_column:
        Column in the *last* hop's ``bridge_table`` whose value is copied
        into ``column_name`` (e.g. ``'structure_id'``).
    fallback_chains:
        Alternative resolution chains tried in order when the primary chain
        yields ``None`` for a given row.  Each entry is a
        ``(hops, bridge_value_column)`` pair with the same structure as the
        primary fields.
    """

    table_name: str
    column_name: str
    hops: list[tuple[str, str, str]]
    bridge_value_column: str
    fallback_chains: 'list[tuple[list[tuple[str, str, str]], str]]' = field(default_factory=list)

    # ------------------------------------------------------------------
    # Backward-compat properties (single-hop case; also useful for
    # visualisation which only needs the first and last table).
    # ------------------------------------------------------------------

    @property
    def via_column(self) -> str:
        """Via-column of the first hop (column in ``table_name``)."""
        return self.hops[0][0]

    @property
    def bridge_table(self) -> str:
        """Bridge table of the last hop (the table holding the value)."""
        return self.hops[-1][1]

    @property
    def bridge_pk_column(self) -> str:
        """PK column of the last hop's bridge table."""
        return self.hops[-1][2]

via_column property

Via-column of the first hop (column in table_name).

bridge_table property

Bridge table of the last hop (the table holding the value).

bridge_pk_column property

PK column of the last hop's bridge table.

ForeignKeyDef dataclass

A FOREIGN KEY constraint between two tables (single- or multi-column).

Always emitted with DEFERRABLE INITIALLY DEFERRED to handle cyclic category graphs correctly within a transaction.

Attributes:

Name Type Description
source_table str

Name of the table that holds the foreign key column(s).

source_columns list[str]

Ordered list of foreign key column names in source_table.

target_table str

Name of the table being referenced.

target_columns list[str]

Ordered list of column names being referenced in target_table, corresponding positionally to source_columns.
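
Why deferral matters for cyclic references can be seen with Python's standard `sqlite3` module. The two tables `a` and `b` below are hypothetical and reference each other; this is a sketch of the SQLite behaviour, not of the DDL that cifflow generates.

```python
import sqlite3

# isolation_level=None leaves transaction control to the explicit BEGIN/COMMIT below.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("PRAGMA foreign_keys = ON")
conn.execute(
    "CREATE TABLE a (id TEXT PRIMARY KEY, b_id TEXT, "
    "FOREIGN KEY (b_id) REFERENCES b (id) DEFERRABLE INITIALLY DEFERRED)"
)
conn.execute(
    "CREATE TABLE b (id TEXT PRIMARY KEY, a_id TEXT, "
    "FOREIGN KEY (a_id) REFERENCES a (id) DEFERRABLE INITIALLY DEFERRED)"
)

conn.execute("BEGIN")
conn.execute("INSERT INTO a VALUES ('a1', 'b1')")  # b1 does not exist yet
conn.execute("INSERT INTO b VALUES ('b1', 'a1')")  # cycle closed before COMMIT
conn.execute("COMMIT")                             # constraints are checked here

row_count = conn.execute(
    "SELECT count(*) FROM a JOIN b ON a.b_id = b.id"
).fetchone()[0]

# A dangling reference is still rejected, but only at COMMIT time.
conn.execute("BEGIN")
conn.execute("INSERT INTO a VALUES ('a2', 'missing')")
try:
    conn.execute("COMMIT")
    commit_failed = False
except sqlite3.IntegrityError:
    commit_failed = True
    conn.execute("ROLLBACK")
```

With immediate constraints the first insert would fail, since its parent row has not been written yet; deferring the check to `COMMIT` lets both halves of the cycle be inserted in either order.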

Source code in src/cifflow/dictionary/schema.py
@dataclass
class ForeignKeyDef:
    """
    A ``FOREIGN KEY`` constraint between two tables (single- or multi-column).

    Always emitted with ``DEFERRABLE INITIALLY DEFERRED`` to handle cyclic
    category graphs correctly within a transaction.

    Attributes
    ----------
    source_table:
        Name of the table that holds the foreign key column(s).
    source_columns:
        Ordered list of foreign key column names in *source_table*.
    target_table:
        Name of the table being referenced.
    target_columns:
        Ordered list of column names being referenced in *target_table*,
        corresponding positionally to *source_columns*.
    """

    source_table: str
    source_columns: list[str]
    target_table: str
    target_columns: list[str]

ColumnDef dataclass

Definition of a single column in a generated SQLite table.

Attributes:

Name Type Description
name str

SQL column name, equal to the DDLm _name.object_id, lowercased. For synthetic columns the name is _cifflow_block_id, _cifflow_row_id, or _cifflow_id.

definition_id str

The current canonical _definition.id for this column's DDLm item. Empty string for synthetic columns.

type_contents str | None

DDLm _type.contents value (e.g. "Text", "Integer", "Real", "List"); None if absent from the dictionary or for synthetic columns. Informational only: the generated SQL DDL declares every value column as TEXT, and _cifflow_row_id as INTEGER.

nullable bool

False for synthetic and primary-key columns; True for all other domain columns.

is_primary_key bool

True if this column is part of the table's PRIMARY KEY.

is_synthetic bool

True for the _cifflow_block_id, _cifflow_row_id, and _cifflow_id infrastructure columns, which have no corresponding DDLm item definition.

linked_item_id str | None

For SU items only: the _definition.id of the associated measurand item, lowercased. None for all other column types. Does not produce a FOREIGN KEY constraint; used by the ingestion and output layers.

type_container str | None

DDLm _type.container value (e.g. "Single", "List", "Matrix"); None for synthetic columns, "Single" as the DDLm default for domain columns when the attribute is absent. Non-"Single" containers store JSON text in SQLite regardless of type_contents.

enumeration_states list[str]

Allowed enumeration values from _enumeration_set.state. Empty list when not present.

enumeration_range str | None

Value of _enumeration.range. None if absent.

type_dimension str | None

Value of _type.dimension. None if absent.

enumeration_default str | None

Scalar default value from _enumeration.default. None if absent.

enumeration_def_index_ids list[str]

Ordered index tag names from _enumeration.def_index_ids for keyed default lookup. Empty when absent.

enumeration_defaults list[tuple[list[str], str]]

Keyed default table: [(key_components, default_value), ...]. key_components aligns positionally with enumeration_def_index_ids. Empty when absent.
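
One plausible way to consume the keyed default table is sketched below. The function `lookup_default`, the tag name, and the numeric values are all hypothetical; the sketch assumes that a row's values for the index tags are compared positionally with each `key_components` list and that the scalar default is used when no entry matches.

```python
def lookup_default(row, def_index_ids, keyed_defaults, scalar_default=None):
    """Pick a default by matching the row's index values against keyed entries."""
    key = [row.get(tag) for tag in def_index_ids]
    for key_components, default_value in keyed_defaults:
        if key_components == key:
            return default_value
    return scalar_default


# Hypothetical tags and values, for illustration only.
def_index_ids = ["_atom_site.type_symbol"]
keyed_defaults = [(["C"], "0.68"), (["H"], "0.23")]

carbon = lookup_default({"_atom_site.type_symbol": "C"}, def_index_ids, keyed_defaults)
unknown = lookup_default(
    {"_atom_site.type_symbol": "Xx"}, def_index_ids, keyed_defaults, "1.0"
)
```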

Source code in src/cifflow/dictionary/schema.py
@dataclass
class ColumnDef:
    """
    Definition of a single column in a generated SQLite table.

    Attributes
    ----------
    name:
        SQL column name, equal to the DDLm ``_name.object_id``, lowercased.
        For synthetic columns the name is ``_cifflow_block_id``, ``_cifflow_row_id``, or
        ``_cifflow_id``.
    definition_id:
        The current canonical ``_definition.id`` for this column's DDLm item.
        Empty string for synthetic columns.
    type_contents:
        DDLm ``_type.contents`` value (e.g. ``"Text"``, ``"Integer"``,
        ``"Real"``, ``"List"``); ``None`` if absent from the dictionary or for
        synthetic columns.  Informational only -- DDL always emits ``TEXT`` for
        all value columns; ``_cifflow_row_id`` always emits ``INTEGER``.
    nullable:
        ``False`` for synthetic and primary-key columns; ``True`` for all
        other domain columns.
    is_primary_key:
        ``True`` if this column is part of the table's ``PRIMARY KEY``.
    is_synthetic:
        ``True`` for the ``_cifflow_block_id``, ``_cifflow_row_id``, and ``_cifflow_id``
        infrastructure columns, which have no corresponding DDLm item
        definition.
    linked_item_id:
        For ``SU`` items only: the ``_definition.id`` of the associated
        measurand item, lowercased.  ``None`` for all other column types.
        Does not produce a ``FOREIGN KEY`` constraint; used by the ingestion
        and output layers.
    type_container:
        DDLm ``_type.container`` value (e.g. ``"Single"``, ``"List"``,
        ``"Matrix"``); ``None`` for synthetic columns, ``"Single"`` as the
        DDLm default for domain columns when the attribute is absent.
        Non-``"Single"`` containers store JSON text in SQLite regardless of
        ``type_contents``.
    enumeration_states:
        Allowed enumeration values from ``_enumeration_set.state``.  Empty
        list when not present.
    enumeration_range:
        Value of ``_enumeration.range``.  ``None`` if absent.
    type_dimension:
        Value of ``_type.dimension``.  ``None`` if absent.
    enumeration_default:
        Scalar default value from ``_enumeration.default``.  ``None`` if absent.
    enumeration_def_index_ids:
        Ordered index tag names from ``_enumeration.def_index_ids`` for keyed
        default lookup.  Empty when absent.
    enumeration_defaults:
        Keyed default table: ``[(key_components, default_value), ...]``.
        ``key_components`` aligns positionally with ``enumeration_def_index_ids``.
        Empty when absent.
    """

    name: str
    definition_id: str
    type_contents: str | None
    nullable: bool
    is_primary_key: bool
    is_synthetic: bool
    linked_item_id: str | None
    type_container: str | None = 'Single'
    enumeration_states: list[str] = field(default_factory=list)
    enumeration_range: str | None = None
    type_dimension: str | None = None
    enumeration_default: str | None = None
    enumeration_def_index_ids: list[str] = field(default_factory=list)
    enumeration_defaults: list[tuple[list[str], str]] = field(default_factory=list)

TableDef dataclass

Definition of a single SQLite table generated from a DDLm category.

Attributes:

Name Type Description
name str

SQL table name, derived from the category's _definition.id (lowercased, leading _ stripped, . replaced with _).

definition_id str

The _definition.id of the category save frame that produced this table.

category_class str

DDLm class of the source category: "Set" or "Loop".

columns list[ColumnDef]

Ordered list of column definitions. Order follows the column-ordering rule: _cifflow_block_id, _cifflow_id (keyless Set only), _cifflow_row_id, primary-key domain columns, remaining domain columns alphabetically.

primary_keys list[str]

Column names forming the PRIMARY KEY, in declaration order.

foreign_keys list[ForeignKeyDef]

FOREIGN KEY constraints on this table; empty when none exist.
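
The naming and column-ordering rules described above can be expressed as two small functions. This is a sketch under stated assumptions, not the library's implementation: `table_name` and `order_columns` are hypothetical helpers, a single leading underscore is stripped, and primary-key columns are assumed to keep their declaration order. The sample identifiers are made up.

```python
SYNTHETIC_ORDER = ["_cifflow_block_id", "_cifflow_id", "_cifflow_row_id"]


def table_name(definition_id):
    """Lowercase, strip one leading underscore, replace dots with underscores."""
    name = definition_id.lower()
    if name.startswith("_"):
        name = name[1:]
    return name.replace(".", "_")


def order_columns(columns, primary_keys):
    """Synthetic columns first, then primary keys, then the rest alphabetically."""
    def rank(col):
        if col in SYNTHETIC_ORDER:
            return (0, SYNTHETIC_ORDER.index(col), "")
        if col in primary_keys:
            return (1, primary_keys.index(col), "")
        return (2, 0, col)

    return sorted(columns, key=rank)


names = [table_name("ATOM_SITE"), table_name("_Geom.Angle")]
ordered = order_columns(
    ["z_coord", "label", "_cifflow_row_id", "adp_type", "_cifflow_block_id"],
    ["label"],
)
```

Ranking by tuple keeps the rule independent of which synthetic columns a given table actually carries.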

Source code in src/cifflow/dictionary/schema.py
@dataclass
class TableDef:
    """
    Definition of a single SQLite table generated from a DDLm category.

    Attributes
    ----------
    name:
        SQL table name, derived from the category's ``_definition.id``
        (lowercased, leading ``_`` stripped, ``.`` replaced with ``_``).
    definition_id:
        The ``_definition.id`` of the category save frame that produced
        this table.
    category_class:
        DDLm class of the source category: ``"Set"`` or ``"Loop"``.
    columns:
        Ordered list of column definitions.  Order follows the column-ordering
        rule: ``_cifflow_block_id``, ``_cifflow_id`` (keyless Set only),
        ``_cifflow_row_id``, primary-key domain columns, remaining domain columns
        alphabetically.
    primary_keys:
        Column names forming the ``PRIMARY KEY``, in declaration order.
    foreign_keys:
        ``FOREIGN KEY`` constraints on this table; empty when none exist.
    """

    name: str
    definition_id: str
    category_class: str
    columns: list[ColumnDef]
    primary_keys: list[str]
    foreign_keys: list[ForeignKeyDef] = field(default_factory=list)

SchemaSpec dataclass

Complete SQLite schema derived from a DdlmDictionary.

Produced by generate_schema and consumed by emit_create_statements and cifflow.dictionary.schema_apply.apply_schema.

Attributes:

Name Type Description
tables dict[str, TableDef]

Mapping from SQL table name to its TableDef.

column_to_tag dict[tuple[str, str], str]

Reverse mapping from (table_name, column_name) to the canonical _definition.id of the corresponding DDLm item. Synthetic columns (_cifflow_block_id, _cifflow_row_id, _cifflow_id) are excluded.

alias_to_definition_id dict[str, str]

Old tag name → canonical _definition.id. Copied from DdlmDictionary.alias_to_definition_id by generate_schema. Used by ingest() for alias resolution without retaining a dictionary reference.

deprecated_ids set[str]

Set of _definition.id values marked as deprecated. Copied from DdlmDictionary.deprecated_ids by generate_schema. Used by ingest() to emit deprecation warnings.

warnings list[str]

Non-fatal issues encountered during schema generation, in emission order.

bridge_columns list[BridgeColumnDef]

Transitive bridge column definitions -- derived columns whose values are resolved through one or more FK lookup hops.

propagation_links dict[str, list[tuple[str, str, str | None]]]

Mapping from table name to a list of (column_name, target_def_id, default) tuples for PK Link columns whose FK constraint was skipped at schema generation time.

dictionary_name str | None

data_ block name from the source dictionary CIF (e.g. "CIF_CORE"). Copied from cifflow.dictionary.ddlm_parser.DdlmDictionary.name.

dictionary_title str | None

Value of _dictionary.title from the source. None if absent.

dictionary_version str | None

Value of _dictionary.version from the source. None if absent.

dictionary_uri str | None

Value of _dictionary.uri from the source. None if absent.

source_files list[str]

Absolute file paths of every dictionary file loaded. Empty when no path_resolver was supplied to cifflow.dictionary.loader.DictionaryLoader.

category_parent dict[str, str | None]

Mapping from table name to its parent table name (or None for root categories) in the DDLm category-parent hierarchy.

tag_to_category_class dict[str, str]

Mapping from canonical _definition.id to the DDLm class ("Set" or "Loop") of the category that contains it. Covers all dictionary items, including those in categories that do not generate schema tables. Used by inspect_schema to trace Set-derived category keys transitively without requiring a live dictionary reference.

deprecated_replacements dict[str, list[str]]

Mapping from deprecated _definition.id to the list of replacement tag names from _definition_replaced.by. An empty string in the list represents a PLACEHOLDER ("."), meaning deprecated with no named replacement. Covers both deprecated items and deprecated categories.
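
The alias and deprecation mappings could be combined at ingestion time roughly as follows. The function `canonicalise` and all tag names are hypothetical, and the sketch assumes that the mapping keys are lowercase; the real `ingest()` logic is not shown on this page.

```python
def canonicalise(tag, alias_to_definition_id, deprecated_ids, deprecated_replacements):
    """Return (canonical_tag, warnings) for one incoming tag name."""
    lowered = tag.lower()
    canonical = alias_to_definition_id.get(lowered, lowered)
    warnings = []
    if canonical in deprecated_ids:
        # Empty strings stand for the "." placeholder: no named replacement.
        named = [r for r in deprecated_replacements.get(canonical, []) if r]
        hint = f"; use {', '.join(named)}" if named else "; no named replacement"
        warnings.append(f"{canonical} is deprecated{hint}")
    return canonical, warnings


# Hypothetical mappings, for illustration only.
aliases = {"_cell_length_a": "_cell.length_a"}
deprecated = {"_old.item", "_gone.item"}
replacements = {"_old.item": ["_new.item"], "_gone.item": [""]}

aliased = canonicalise("_Cell_Length_A", aliases, deprecated, replacements)
replaced = canonicalise("_old.item", aliases, deprecated, replacements)
dropped = canonicalise("_gone.item", aliases, deprecated, replacements)
```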

Source code in src/cifflow/dictionary/schema.py
@dataclass
class SchemaSpec:
    """
    Complete SQLite schema derived from a ``DdlmDictionary``.

    Produced by :func:`generate_schema` and consumed by
    :func:`emit_create_statements` and
    :func:`~cifflow.dictionary.schema_apply.apply_schema`.

    Attributes
    ----------
    tables:
        Mapping from SQL table name to its :class:`TableDef`.
    column_to_tag:
        Reverse mapping from ``(table_name, column_name)`` to the canonical
        ``_definition.id`` of the corresponding DDLm item.  Synthetic
        columns (``_cifflow_block_id``, ``_cifflow_row_id``, ``_cifflow_id``) are excluded.
    alias_to_definition_id:
        Old tag name → canonical ``_definition.id``.  Copied from
        ``DdlmDictionary.alias_to_definition_id`` by ``generate_schema``.
        Used by ``ingest()`` for alias resolution without retaining a
        dictionary reference.
    deprecated_ids:
        Set of ``_definition.id`` values marked as deprecated.  Copied from
        ``DdlmDictionary.deprecated_ids`` by ``generate_schema``.  Used by
        ``ingest()`` to emit deprecation warnings.
    warnings:
        Non-fatal issues encountered during schema generation, in emission
        order.
    bridge_columns:
        Transitive bridge column definitions -- derived columns whose values
        are resolved through one or more FK lookup hops.
    propagation_links:
        Mapping from table name to a list of
        ``(column_name, target_def_id, default)`` tuples for PK ``Link``
        columns whose FK constraint was skipped at schema generation time.
    dictionary_name:
        ``data_`` block name from the source dictionary CIF (e.g.
        ``"CIF_CORE"``).  Copied from
        :attr:`~cifflow.dictionary.ddlm_parser.DdlmDictionary.name`.
    dictionary_title:
        Value of ``_dictionary.title`` from the source.  ``None`` if absent.
    dictionary_version:
        Value of ``_dictionary.version`` from the source.  ``None`` if absent.
    dictionary_uri:
        Value of ``_dictionary.uri`` from the source.  ``None`` if absent.
    source_files:
        Absolute file paths of every dictionary file loaded.  Empty when no
        ``path_resolver`` was supplied to
        :class:`~cifflow.dictionary.loader.DictionaryLoader`.
    category_parent:
        Mapping from table name to its parent table name (or ``None`` for
        root categories) in the DDLm category-parent hierarchy.
    tag_to_category_class:
        Mapping from canonical ``_definition.id`` to the DDLm class
        (``"Set"`` or ``"Loop"``) of the category that contains it.
        Covers *all* dictionary items, including those in categories that do
        not generate schema tables.  Used by ``inspect_schema`` to trace
        Set-derived category keys transitively without requiring a live
        dictionary reference.
    deprecated_replacements:
        Mapping from deprecated ``_definition.id`` to the list of replacement
        tag names from ``_definition_replaced.by``.  An empty string in the
        list represents a ``PLACEHOLDER`` (``"."``), meaning deprecated with
        no named replacement.  Covers both deprecated items and deprecated
        categories.
    """

    tables: dict[str, TableDef]
    column_to_tag: dict[tuple[str, str], str]
    alias_to_definition_id: dict[str, str] = field(default_factory=dict)
    deprecated_ids: set[str] = field(default_factory=set)
    warnings: list[str] = field(default_factory=list)
    bridge_columns: list[BridgeColumnDef] = field(default_factory=list)
    propagation_links: dict[str, list[tuple[str, str, str | None]]] = field(default_factory=dict)
    dictionary_name: str | None = None
    dictionary_title: str | None = None
    dictionary_version: str | None = None
    dictionary_uri: str | None = None
    source_files: list[str] = field(default_factory=list)
    category_parent: dict[str, str | None] = field(default_factory=dict)
    tag_to_category_class: dict[str, str] = field(default_factory=dict)
    deprecated_replacements: dict[str, list[str]] = field(default_factory=dict)

    def descendants(self, root: str) -> frozenset[str]:
        """Return all table names that are *root* or a descendant of *root* in the ``category_parent`` hierarchy.

        Returns ``frozenset({root})`` if *root* has no children, or
        ``frozenset()`` if *root* is not in the schema at all.
        """
        if root not in self.tables and root not in self.category_parent.values():
            return frozenset()
        result: set[str] = {root}
        for tbl in self.tables:
            p = self.category_parent.get(tbl)
            while p is not None:
                if p == root:
                    result.add(tbl)
                    break
                p = self.category_parent.get(p)
        return frozenset(result)

descendants(root)

Return all table names that are root or a descendant of root in the category_parent hierarchy.

Returns frozenset({root}) if root has no children, or frozenset() if root is not in the schema at all.
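
As a rough illustration of the walk described above, the following standalone sketch applies the same ancestor-climbing logic to plain Python containers rather than a SchemaSpec. The free function `descendants_of` and the table names are hypothetical and exist only for this example.

```python
from __future__ import annotations


def descendants_of(
    root: str,
    tables: set[str],
    category_parent: dict[str, str | None],
) -> frozenset[str]:
    """Standalone mirror of the ancestor walk; not part of the library API."""
    if root not in tables and root not in category_parent.values():
        return frozenset()
    result = {root}
    for tbl in tables:
        parent = category_parent.get(tbl)
        # Climb towards the top of the hierarchy until *root* is met
        # or the parent chain ends.
        while parent is not None:
            if parent == root:
                result.add(tbl)
                break
            parent = category_parent.get(parent)
    return frozenset(result)


# Hypothetical three-level hierarchy, for illustration only.
tables = {"atom", "atom_site", "atom_site_aniso", "cell"}
category_parent = {
    "atom": None,
    "atom_site": "atom",
    "atom_site_aniso": "atom_site",
    "cell": None,
}

subtree = descendants_of("atom", tables, category_parent)
leaf = descendants_of("cell", tables, category_parent)
unknown = descendants_of("no_such_table", tables, category_parent)
```

Here `subtree` contains the root and both of its transitive descendants, `leaf` contains only `"cell"`, and `unknown` is empty.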

Source code in src/cifflow/dictionary/schema.py
def descendants(self, root: str) -> frozenset[str]:
    """Return all table names that are *root* or a descendant of *root* in the ``category_parent`` hierarchy.

    Returns ``frozenset({root})`` if *root* has no children, or
    ``frozenset()`` if *root* is not in the schema at all.
    """
    if root not in self.tables and root not in self.category_parent.values():
        return frozenset()
    result: set[str] = {root}
    for tbl in self.tables:
        p = self.category_parent.get(tbl)
        while p is not None:
            if p == root:
                result.add(tbl)
                break
            p = self.category_parent.get(p)
    return frozenset(result)

generate_schema(dictionary)

Derive a SchemaSpec from a loaded DdlmDictionary.

Iterates over all categories in dictionary, creating one TableDef for each Set or Loop category. Head and Functions categories are silently skipped (they never appear in data instance files); any other unrecognised class emits a warning and is also skipped.

Foreign-key constraints are built in a second pass over all items whose type_purpose is "Link". SU items populate ColumnDef.linked_item_id but do not produce ForeignKeyDef entries.
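
The second pass can be pictured with a deliberately simplified sketch: Link pairs are grouped by (source table, target table), and a constraint is kept only when the linked columns cover the target's whole primary key. This is not the library's implementation. It omits bridge columns, conflict handling and warnings, and the function name `covered_foreign_keys` and all table names are hypothetical.

```python
from __future__ import annotations

from collections import defaultdict

# (source table, source column, target table, target column)
LinkPair = tuple[str, str, str, str]


def covered_foreign_keys(
    links: list[LinkPair],
    primary_keys: dict[str, list[str]],
) -> list[tuple[str, list[str], str, list[str]]]:
    """Return (src_tbl, src_cols, tgt_tbl, tgt_cols) for fully covered PKs."""
    groups: dict[tuple[str, str], dict[str, str]] = defaultdict(dict)
    for src_tbl, src_col, tgt_tbl, tgt_col in links:
        # Links that target a non-PK column never yield a constraint.
        if tgt_col in primary_keys[tgt_tbl]:
            # Simplification: a later link to the same target column
            # overwrites an earlier one (no conflict handling).
            groups[(src_tbl, tgt_tbl)][tgt_col] = src_col

    result = []
    for (src_tbl, tgt_tbl), tgt_to_src in sorted(groups.items()):
        tgt_pks = primary_keys[tgt_tbl]
        if set(tgt_pks) - set(tgt_to_src):
            continue  # partial coverage: skipped in this sketch
        # Order source columns to match the target PK column order.
        src_cols = [tgt_to_src[pk] for pk in tgt_pks]
        result.append((src_tbl, src_cols, tgt_tbl, list(tgt_pks)))
    return result


# Hypothetical schema fragment, for illustration only.
primary_keys = {"atom": ["label"], "refl": ["h", "k"]}
links = [
    ("bond", "atom_label", "atom", "label"),  # sole PK: kept
    ("meas", "refl_k", "refl", "k"),          # together with the next
    ("meas", "refl_h", "refl", "h"),          # link: full composite PK
    ("peak", "h", "refl", "h"),               # one of two PK columns: skipped
]

fks = covered_foreign_keys(links, primary_keys)
```

The single-column link to a sole primary key and the two-column link covering the composite key both survive; the partial reference from `peak` does not.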

alias_to_definition_id and deprecated_ids are copied directly from dictionary so that ingest() can perform alias resolution and deprecation checking without retaining a reference to the dictionary.
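
To show why carrying these two mappings on the schema is sufficient, here is a minimal sketch of alias resolution followed by a deprecation check. The helper `resolve_tag` is hypothetical and is not claimed to match what ingest() does internally; it also assumes that alias keys are stored in lowercase, which the source does not state.

```python
from __future__ import annotations


def resolve_tag(
    tag: str,
    alias_to_definition_id: dict[str, str],
    deprecated_ids: set[str],
) -> tuple[str, bool]:
    """Return (canonical definition id, is_deprecated) for a raw CIF tag."""
    lowered = tag.lower()  # assumption: aliases are keyed in lowercase
    canonical = alias_to_definition_id.get(lowered, lowered)
    return canonical, canonical in deprecated_ids


# Hypothetical metadata, shaped like the two copied mappings.
aliases = {"_cell_length_a": "_cell.length_a"}
deprecated = {"_old.item"}

resolved = resolve_tag("_Cell_Length_A", aliases, deprecated)
flagged = resolve_tag("_old.item", aliases, deprecated)
```

Only the two plain containers are needed at lookup time, so the dictionary object itself can be discarded once the schema has been generated.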

Parameters:

Name Type Description Default
dictionary DdlmDictionary

The loaded dictionary returned by DictionaryLoader.load().

required

Returns:

Type Description
SchemaSpec

The complete schema specification including all tables, column definitions, primary keys, foreign keys, the reverse column_to_tag mapping, and alias/deprecation metadata.

Source code in src/cifflow/dictionary/schema.py
def generate_schema(dictionary: DdlmDictionary) -> SchemaSpec:
    """
    Derive a :class:`SchemaSpec` from a loaded ``DdlmDictionary``.

    Iterates over all categories in *dictionary*, creating one
    :class:`TableDef` for each ``Set`` or ``Loop`` category.  ``Head`` and
    ``Functions`` categories are silently skipped (they never appear in data
    instance files); any other unrecognised class emits a warning and is also
    skipped.

    Foreign-key constraints are built in a second pass over all items whose
    ``type_purpose`` is ``"Link"``.  ``SU`` items populate
    :attr:`ColumnDef.linked_item_id` but do not produce
    :class:`ForeignKeyDef` entries.

    ``alias_to_definition_id`` and ``deprecated_ids`` are copied directly from
    *dictionary* so that ``ingest()`` can perform alias resolution and
    deprecation checking without retaining a reference to the dictionary.

    Parameters
    ----------
    dictionary:
        The loaded dictionary returned by
        :meth:`~cifflow.dictionary.loader.DictionaryLoader.load`.

    Returns
    -------
    SchemaSpec
        The complete schema specification including all tables, column
        definitions, primary keys, foreign keys, the reverse
        ``column_to_tag`` mapping, and alias/deprecation metadata.
    """
    warnings: list[str] = []
    tables: dict[str, TableDef] = {}
    column_to_tag: dict[tuple[str, str], str] = {}

    for cat_id, cat_item in dictionary.categories.items():
        cat_class = cat_item.definition_class
        if cat_class not in ('Set', 'Loop'):
            if cat_class not in ('Head', 'Functions'):
                warnings.append(
                    f"category {cat_id!r} has unsupported class {cat_class!r} -- skipped"
                )
            continue

        # Table name is derived from the category's own definition_id.
        tbl_name = _table_name(cat_item.definition_id)

        # Domain items: those whose _name.category_id points to this category.
        domain_items: dict[str, DdlmItem] = {
            item.object_id: item
            for item in dictionary.items.values()
            if item.category_id == cat_item.definition_id
            and item.object_id is not None
        }

        # --- Determine primary key column names ---
        non_synthetic_pks: list[str] = []
        for key_tag in cat_item.category_keys:
            key_item = dictionary.tag_to_item.get(key_tag)
            if key_item is None:
                warnings.append(
                    f"category {cat_id!r}: category key {key_tag!r} not found "
                    f"in dictionary -- skipped"
                )
                continue
            if key_item.object_id is None:
                warnings.append(
                    f"category {cat_id!r}: category key {key_tag!r} has no "
                    f"object_id -- skipped"
                )
                continue
            non_synthetic_pks.append(key_item.object_id)

        use_fallback_pk = not non_synthetic_pks
        if use_fallback_pk:
            if cat_class == 'Set':
                warnings.append(
                    f"category {cat_id!r} (Set) has no _category_key.name -- "
                    f"using _cifflow_id as primary key"
                )
                primary_keys = ['_cifflow_id']
            else:  # Loop
                warnings.append(
                    f"category {cat_id!r} (Loop) has no _category_key.name -- "
                    f"using _cifflow_block_id + _cifflow_row_id as primary key"
                )
                primary_keys = ['_cifflow_block_id', '_cifflow_row_id']
        else:
            primary_keys = list(non_synthetic_pks)

        # --- Build columns in specified order ---
        columns: list[ColumnDef] = []

        # 1. _cifflow_block_id (always first; informational only for keyed tables)
        block_id_is_pk = '_cifflow_block_id' in primary_keys
        columns.append(ColumnDef(
            name='_cifflow_block_id',
            definition_id='',
            type_contents=None,
            type_container=None,
            nullable=False,
            is_primary_key=block_id_is_pk,
            is_synthetic=True,
            linked_item_id=None,
        ))

        # 2. _cifflow_id (keyless Set tables only)
        if use_fallback_pk and cat_class == 'Set':
            columns.append(ColumnDef(
                name='_cifflow_id',
                definition_id='',
                type_contents=None,
                type_container=None,
                nullable=False,
                is_primary_key=True,
                is_synthetic=True,
                linked_item_id=None,
            ))

        # 3. _cifflow_row_id (all Set and Loop tables)
        row_id_is_pk = '_cifflow_row_id' in primary_keys
        columns.append(ColumnDef(
            name='_cifflow_row_id',
            definition_id='',
            type_contents=None,
            type_container=None,
            nullable=False,
            is_primary_key=row_id_is_pk,
            is_synthetic=True,
            linked_item_id=None,
        ))

        # 4. Non-synthetic primary-key columns (in category_keys order)
        for obj_id in non_synthetic_pks:
            item = domain_items.get(obj_id)
            if item is None:
                warnings.append(
                    f"table {tbl_name!r}: primary key column {obj_id!r} not "
                    f"found in category items -- using TEXT"
                )
                col = ColumnDef(
                    name=obj_id,
                    definition_id='',
                    type_contents=None,
                    type_container=None,
                    nullable=False,
                    is_primary_key=True,
                    is_synthetic=False,
                    linked_item_id=None,
                )
            else:
                col = ColumnDef(
                    name=obj_id,
                    definition_id=item.definition_id,
                    type_contents=item.type_contents or 'Text',
                    type_container=item.type_container or 'Single',
                    nullable=False,
                    is_primary_key=True,
                    is_synthetic=False,
                    linked_item_id=item.linked_item_id,
                    enumeration_states=item.enumeration_states,
                    enumeration_range=item.enumeration_range,
                    type_dimension=item.type_dimension,
                    enumeration_default=item.enumeration_default,
                    enumeration_def_index_ids=item.enumeration_def_index_ids,
                    enumeration_defaults=item.enumeration_defaults,
                )
                column_to_tag[(tbl_name, obj_id)] = item.definition_id
            columns.append(col)

        # 5. Remaining domain columns (alphabetically, excluding PKs)
        pk_set = set(non_synthetic_pks)
        for obj_id, item in sorted(domain_items.items()):
            if obj_id in pk_set:
                continue
            col = ColumnDef(
                name=obj_id,
                definition_id=item.definition_id,
                type_contents=item.type_contents or 'Text',
                type_container=item.type_container or 'Single',
                nullable=True,
                is_primary_key=False,
                is_synthetic=False,
                linked_item_id=(
                    item.linked_item_id if item.type_purpose == 'SU' else None
                ),
                enumeration_states=item.enumeration_states,
                enumeration_range=item.enumeration_range,
                type_dimension=item.type_dimension,
                enumeration_default=item.enumeration_default,
                enumeration_def_index_ids=item.enumeration_def_index_ids,
                enumeration_defaults=item.enumeration_defaults,
            )
            columns.append(col)
            column_to_tag[(tbl_name, obj_id)] = item.definition_id

        tables[tbl_name] = TableDef(
            name=tbl_name,
            definition_id=cat_item.definition_id,
            category_class=cat_class,
            columns=columns,
            primary_keys=primary_keys,
            foreign_keys=[],
        )

    # --- Second pass: foreign-key detection ---
    # Collect all Link items grouped by (src_tbl, tgt_tbl).  When multiple
    # source columns all link to columns that together cover the target table's
    # full composite PK, emit one composite FOREIGN KEY constraint.  Single-
    # column FKs targeting a sole PK are handled as the degenerate case.
    #
    # SQLite requires the FK target to have a UNIQUE index.  For a sole-PK
    # table SQLite creates one automatically; for a composite PK it does NOT
    # create per-column UNIQUE indices.  Therefore a valid FK must reference
    # EITHER the sole PK (single-column FK) OR the full composite PK (multi-
    # column FK).  Partial or non-PK references are warned and skipped.

    bridge_columns: list[BridgeColumnDef] = []

    _link_groups: dict[
        tuple[str, str], list[tuple[str, str, DdlmItem]]
    ] = defaultdict(list)   # (src_tbl, tgt_tbl) → [(src_col, tgt_col, item)]

    for item in dictionary.items.values():
        if item.type_purpose != 'Link' or item.linked_item_id is None:
            continue

        target_item = dictionary.tag_to_item.get(item.linked_item_id)
        if target_item is None:
            warnings.append(
                f"FK: linked_item_id {item.linked_item_id!r} for "
                f"{item.definition_id!r} not found in dictionary -- skipped"
            )
            continue

        if item.category_id is None or item.object_id is None:
            continue
        if target_item.category_id is None or target_item.object_id is None:
            continue

        src_tbl = _table_name(item.category_id)
        tgt_tbl = _table_name(target_item.category_id)

        if src_tbl not in tables:
            continue  # source category not schema-generating (Head etc.)
        if tgt_tbl not in tables:
            warnings.append(
                f"FK: target table {tgt_tbl!r} for {item.definition_id!r} "
                f"not in schema -- skipped"
            )
            continue

        # Warn if linked item is not a category key of the target.
        tgt_cat = dictionary.categories.get(target_item.category_id)
        if tgt_cat and item.linked_item_id not in tgt_cat.category_keys:
            # Column names of the target's declared category keys, for the message.
            tgt_pk_names = sorted(
                dictionary.tag_to_item[k].object_id
                for k in tgt_cat.category_keys
                if k in dictionary.tag_to_item
            )
            warnings.append(
                f"FK: {item.definition_id!r} -> {item.linked_item_id!r}: "
                f"target is not a declared category key of "
                f"{target_item.category_id!r} "
                f"(PKs={tgt_pk_names}) -- attempting FK resolution"
            )

        _link_groups[(src_tbl, tgt_tbl)].append(
            (item.object_id, target_item.object_id, item)
        )

    for (src_tbl, tgt_tbl), pairs in sorted(_link_groups.items()):
        tgt_pks: list[str] = tables[tgt_tbl].primary_keys
        tgt_pks_set = set(tgt_pks)

        # Strip pairs that target non-PK columns and warn about each one.
        # A mixed group must not prevent valid PK-targeting pairs from forming FKs.
        pk_pairs = []
        for src_col, tgt_col, item in pairs:
            if tgt_col not in tgt_pks_set:
                warnings.append(
                    f"FK: {item.definition_id!r} -> {item.linked_item_id!r}: "
                    f"target column '{tgt_col}' is not a PK of "
                    f"'{tgt_tbl}' (PKs={tgt_pks}) -- skipping FK constraint"
                )
            else:
                pk_pairs.append((src_col, tgt_col, item))

        if not pk_pairs:
            continue
        pairs = pk_pairs

        # tgt_col → [src_col, ...]: detect full coverage and duplicate targets
        tgt_to_srcs: dict[str, list[str]] = defaultdict(list)
        for src_col, tgt_col, _ in pairs:
            tgt_to_srcs[tgt_col].append(src_col)

        tgt_cols_covered = set(tgt_to_srcs.keys())
        missing_pk_cols  = tgt_pks_set - tgt_cols_covered
        has_conflicts    = any(len(v) > 1 for v in tgt_to_srcs.values())

        if has_conflicts and not missing_pk_cols:
            # Multiple source columns each independently reference the full PK
            # (e.g. bond.atom_1 and bond.atom_2 both → atom.number).
            # Emit one separate single-column FK per source column.
            for tgt_col, src_list in tgt_to_srcs.items():
                for src_col in src_list:
                    tables[src_tbl].foreign_keys.append(ForeignKeyDef(
                        source_table=src_tbl,
                        source_columns=[src_col],
                        target_table=tgt_tbl,
                        target_columns=[tgt_col],
                    ))
        elif len(missing_pk_cols) == 1:
            # All covered columns are PKs; exactly one PK column is missing.
            # Sub-case A: the missing column already exists in src_tbl (self-ref
            #   or previously bridged) -- use it directly.
            # Sub-case B: try to derive it via a transitive bridge table.
            [missing_pk_col] = missing_pk_cols
            src_col_names = {c.name for c in tables[src_tbl].columns}
            bridge_col_in_src: str | None = (
                missing_pk_col if missing_pk_col in src_col_names else None
            )

            if bridge_col_in_src is None:
                found = _find_transitive_bridge(
                    src_tbl, tgt_tbl, missing_pk_col,
                    tables, dictionary, _link_groups,
                )
                if found is not None:
                    # found is a list of paths; each path is a list of
                    # (via_col, bridge_tbl, bridge_pk, val_col_or_None) tuples.
                    # Intermediate entries carry None; the last entry carries
                    # the real value column.  Use the first path as primary and
                    # carry the rest as fallback chains so ingest can try them
                    # in order when the primary yields None for a given row.
                    primary = found[0]
                    hops = [(vc, bt, bp) for vc, bt, bp, _ in primary]
                    bridge_val_col = primary[-1][3]
                    fallback_chains = [
                        ([(vc, bt, bp) for vc, bt, bp, _ in alt], alt[-1][3])
                        for alt in found[1:]
                    ]
                    # Add derived column once per (src_tbl, col) pair
                    tables[src_tbl].columns.append(ColumnDef(
                        name=missing_pk_col,
                        definition_id='',
                        type_contents=None,
                        type_container=None,
                        nullable=True,
                        is_primary_key=False,
                        is_synthetic=True,  # transitive bridge -- no CIF tag
                        linked_item_id=None,
                    ))
                    bridge_columns.append(BridgeColumnDef(
                        table_name=src_tbl,
                        column_name=missing_pk_col,
                        hops=hops,
                        bridge_value_column=bridge_val_col,
                        fallback_chains=fallback_chains,
                    ))
                    bridge_col_in_src = missing_pk_col

            if bridge_col_in_src is not None:
                # Emit one composite FK per conflicting src column (or one if
                # no conflicts), with tgt_pks ordering throughout.
                if has_conflicts:
                    for tgt_col, src_list in tgt_to_srcs.items():
                        for src_col in src_list:
                            src_ordered = [
                                src_col if pk == tgt_col else bridge_col_in_src
                                for pk in tgt_pks
                            ]
                            tables[src_tbl].foreign_keys.append(ForeignKeyDef(
                                source_table=src_tbl,
                                source_columns=src_ordered,
                                target_table=tgt_tbl,
                                target_columns=list(tgt_pks),
                            ))
                else:
                    src_ordered = [
                        tgt_to_srcs[pk][0] if pk in tgt_to_srcs else bridge_col_in_src
                        for pk in tgt_pks
                    ]
                    tables[src_tbl].foreign_keys.append(ForeignKeyDef(
                        source_table=src_tbl,
                        source_columns=src_ordered,
                        target_table=tgt_tbl,
                        target_columns=list(tgt_pks),
                    ))
            else:
                # No bridge found -- warn per pair
                for src_col, tgt_col, item in pairs:
                    warnings.append(
                        f"FK: {item.definition_id!r} -> {item.linked_item_id!r}: "
                        f"partial FK to '{tgt_tbl}' -- covers "
                        f"{sorted(tgt_cols_covered)} of PKs={tgt_pks}, "
                        f"no transitive bridge found -- skipping FK constraint"
                    )
        elif missing_pk_cols or has_conflicts:
            # Cannot form a complete, unambiguous (composite) FK.
            # Emit one warning per failing pair so each source item is named.
            for src_col, tgt_col, item in pairs:
                if len(tgt_to_srcs.get(tgt_col, [])) > 1:
                    msg = (
                        f"ambiguous composite FK -- multiple source columns "
                        f"link to '{tgt_tbl}'.'{tgt_col}'"
                    )
                elif len(missing_pk_cols) > 1:
                    msg = (
                        f"partial FK to '{tgt_tbl}' -- covers "
                        f"['{tgt_col}'] of PKs={tgt_pks} "
                        f"({len(missing_pk_cols)} missing PKs, bridge search skipped)"
                    )
                else:
                    msg = (
                        f"partial FK to '{tgt_tbl}' -- covers "
                        f"['{tgt_col}'] of PKs={tgt_pks}"
                    )
                warnings.append(
                    f"FK: {item.definition_id!r} -> {item.linked_item_id!r}: "
                    f"{msg} -- skipping FK constraint"
                )
        else:
            # All PKs covered, no non-PK targets, no duplicate targets.
            # Order source columns to match the target PK column order.
            src_ordered = [tgt_to_srcs[tc][0] for tc in tgt_pks]
            tables[src_tbl].foreign_keys.append(ForeignKeyDef(
                source_table=src_tbl,
                source_columns=src_ordered,
                target_table=tgt_tbl,
                target_columns=list(tgt_pks),
            ))

    # --- Third pass: propagation links ---
    # For every PK column that is a Link item, record the target definition_id
    # so that _apply_fk can still fill the column from the fk_accumulator or
    # loop values even when no formal FK constraint was emitted.
    #
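    # Additionally, PK Link columns are made nullable so that NULL can
    # represent an absent/default value, notably when the FK was skipped and
    # the database cannot enforce referential integrity.  Note that the loop
    # below does not check whether an FK was emitted: every PK Link column
    # that reaches the end of the loop body is marked nullable.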
    propagation_links: dict[str, list[tuple[str, str, str | None]]] = {}
    _seen_prop: set[tuple[str, str]] = set()
    for item in dictionary.items.values():
        if item.type_purpose != 'Link' or item.linked_item_id is None:
            continue
        if item.category_id is None or item.object_id is None:
            continue
        src_tbl = _table_name(item.category_id)
        if src_tbl not in tables:
            continue
        src_col_def = next(
            (c for c in tables[src_tbl].columns if c.name == item.object_id),
            None,
        )
        if src_col_def is None:
            continue
        is_pk = src_col_def.is_primary_key
        # Non-PK items: only include when they carry an enumeration_default that
        # should be applied to absent columns.
        if not is_pk and item.enumeration_default is None:
            continue
        key = (src_tbl, item.object_id)
        if key in _seen_prop:
            continue
        _seen_prop.add(key)
        propagation_links.setdefault(src_tbl, []).append(
            (item.object_id, item.linked_item_id, item.enumeration_default)
        )
        if is_pk:
            # Make PK column nullable: FK was skipped, so NULL is valid here.
            src_col_def.nullable = True

    # Build category parent map: table_name → parent table_name (or None).
    # Used by the output layer for wildcard category expansion.
    category_parent: dict[str, str | None] = {}
    for cat_id, cat_item in dictionary.categories.items():
        if cat_item.definition_class not in ('Set', 'Loop'):
            continue
        tbl_name = _table_name(cat_item.definition_id)
        if tbl_name not in tables:
            continue
        parent_id = cat_item.category_id
        if parent_id:
            parent_tbl = _table_name(parent_id)
            # Exclude self-references (top-level categories often have
            # _name.category_id pointing to themselves).
            category_parent[tbl_name] = (
                parent_tbl if parent_tbl in tables and parent_tbl != tbl_name else None
            )
        else:
            category_parent[tbl_name] = None

    tag_to_category_class: dict[str, str] = {}
    deprecated_replacements: dict[str, list[str]] = {}
    for defn_id, item in dictionary.tag_to_item.items():
        if item.category_id:
            cat = dictionary.categories.get(item.category_id)
            if cat and cat.definition_class in ('Set', 'Loop'):
                tag_to_category_class[defn_id] = cat.definition_class
        if item.is_deprecated:
            deprecated_replacements[defn_id] = item.replaced_by

    return SchemaSpec(
        tables=tables,
        column_to_tag=column_to_tag,
        alias_to_definition_id=dict(dictionary.alias_to_definition_id),
        deprecated_ids=set(dictionary.deprecated_ids),
        warnings=warnings,
        bridge_columns=bridge_columns,
        propagation_links=propagation_links,
        dictionary_name=dictionary.name or None,
        dictionary_title=dictionary.title or None,
        dictionary_version=dictionary.version or None,
        dictionary_uri=dictionary.uri or None,
        source_files=list(dictionary.source_files),
        category_parent=category_parent,
        tag_to_category_class=tag_to_category_class,
        deprecated_replacements=deprecated_replacements,
    )

emit_create_statements(schema)

Render each TableDef in schema as a CREATE TABLE statement.

Returns one SQL string per table in topological order (FK parents before children). The statements use CREATE TABLE IF NOT EXISTS and include inline PRIMARY KEY and FOREIGN KEY clauses. All FK constraints carry DEFERRABLE INITIALLY DEFERRED.

All value columns are declared TEXT regardless of ColumnDef.type_contents; _cifflow_row_id is always INTEGER.
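
The sketch below illustrates the two properties described above, parent-first ordering and deferred FK checking, using only the standard library. The DDL strings are hand-written in the described shape for two hypothetical tables; declaring `_cifflow_block_id` as TEXT is an assumption, and `graphlib.TopologicalSorter` stands in for whatever ordering routine the library uses internally.

```python
import sqlite3
from graphlib import TopologicalSorter

# Hypothetical statements: TEXT value columns, INTEGER _cifflow_row_id,
# and a deferred FK on the child table.
ddl = {
    "atom": (
        'CREATE TABLE IF NOT EXISTS "atom" (\n'
        '    "_cifflow_block_id"  TEXT  NOT NULL,\n'
        '    "_cifflow_row_id"  INTEGER  NOT NULL,\n'
        '    "label"  TEXT  NOT NULL,\n'
        '    PRIMARY KEY ("label"),\n'
        '    UNIQUE ("_cifflow_block_id", "_cifflow_row_id")\n'
        ')'
    ),
    "bond": (
        'CREATE TABLE IF NOT EXISTS "bond" (\n'
        '    "_cifflow_block_id"  TEXT  NOT NULL,\n'
        '    "_cifflow_row_id"  INTEGER  NOT NULL,\n'
        '    "id"  TEXT  NOT NULL,\n'
        '    "atom_label"  TEXT,\n'
        '    PRIMARY KEY ("id"),\n'
        '    UNIQUE ("_cifflow_block_id", "_cifflow_row_id"),\n'
        '    FOREIGN KEY ("atom_label")\n'
        '        REFERENCES "atom"("label")\n'
        '        DEFERRABLE INITIALLY DEFERRED\n'
        ')'
    ),
}

# Map each table to the set of FK parents it depends on.
depends_on = {"bond": {"atom"}, "atom": set()}
order = list(TopologicalSorter(depends_on).static_order())

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
for name in order:
    conn.execute(ddl[name])

# With a deferred constraint the child row may be inserted before its
# parent, provided the parent exists when the transaction commits.
with conn:
    conn.execute(
        'INSERT INTO "bond" VALUES (?, ?, ?, ?)', ("b1", 1, "C1-O1", "C1")
    )
    conn.execute('INSERT INTO "atom" VALUES (?, ?, ?)', ("b1", 1, "C1"))

bond_count = conn.execute('SELECT COUNT(*) FROM "bond"').fetchone()[0]
```

Deferring the check to commit time means rows can be written in whatever order they are encountered within a transaction, without sorting them by table dependency first.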

Parameters:

Name Type Description Default
schema SchemaSpec

The schema specification produced by generate_schema().

required

Returns:

Type Description
list[str]

One CREATE TABLE IF NOT EXISTS ... statement per table.

Source code in src/cifflow/dictionary/schema.py
def emit_create_statements(schema: SchemaSpec) -> list[str]:
    """
    Render each :class:`TableDef` in *schema* as a ``CREATE TABLE`` statement.

    Returns one SQL string per table in topological order (FK parents before
    children).  The statements use ``CREATE TABLE IF NOT EXISTS`` and include
    inline ``PRIMARY KEY`` and ``FOREIGN KEY`` clauses.  All FK constraints
    carry ``DEFERRABLE INITIALLY DEFERRED``.

    All value columns are declared ``TEXT`` regardless of
    ``ColumnDef.type_contents``; ``_cifflow_row_id`` is always ``INTEGER``.

    Parameters
    ----------
    schema:
        The schema specification produced by :func:`generate_schema`.

    Returns
    -------
    list[str]
        One ``CREATE TABLE IF NOT EXISTS ...`` statement per table.
    """
    stmts: list[str] = []

    for table in _topo_sort_tables(schema.tables):
        parts: list[str] = []

        row_id_col = next((c for c in table.columns if c.name == '_cifflow_row_id'), None)
        for col in table.columns:
            line = f"    {_qi(col.name)}  {_ddl_type(col)}"
            if not col.nullable:
                line += "  NOT NULL"
            parts.append(line)

        pk_clause = ', '.join(_qi(k) for k in table.primary_keys)
        parts.append(f"    PRIMARY KEY ({pk_clause})")

        # Composite UNIQUE on (_cifflow_block_id, _cifflow_row_id) when _cifflow_row_id is not already
        # part of the PRIMARY KEY.
        if row_id_col is not None and not row_id_col.is_primary_key:
            parts.append(
                f"    UNIQUE ({_qi('_cifflow_block_id')}, {_qi('_cifflow_row_id')})"
            )

        for fk in table.foreign_keys:
            src_cols = ', '.join(_qi(c) for c in fk.source_columns)
            tgt_cols = ', '.join(_qi(c) for c in fk.target_columns)
            parts.append(
                f"    FOREIGN KEY ({src_cols})\n"
                f"        REFERENCES {_qi(fk.target_table)}({tgt_cols})\n"
                f"        DEFERRABLE INITIALLY DEFERRED"
            )

        body = ',\n'.join(parts)
        stmts.append(
            f"CREATE TABLE IF NOT EXISTS {_qi(table.name)} (\n{body}\n)"
        )

    return stmts

emit_fallback_create_statements()

Return the fixed DDL statements for the schema-less fallback tier.

Returns four SQL strings: CREATE TABLE IF NOT EXISTS for _cif_fallback, its lookup index, CREATE TABLE IF NOT EXISTS for _block_dataset_membership, and CREATE TABLE IF NOT EXISTS for _validation_result.
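
As a small, hedged example of how the first two statements might be used, the sketch below creates the `_cif_fallback` table and its index in an in-memory SQLite database and stores one value. The DDL is transcribed from the source listing for this function, assuming that `_qi` produces standard double-quoted identifiers; the inserted row and its `value_type` string are made up for illustration.

```python
import sqlite3

# Transcribed from the listing; double-quote quoting is an assumption.
fallback = (
    'CREATE TABLE IF NOT EXISTS "_cif_fallback" (\n'
    '    "_cifflow_block_id"  TEXT     NOT NULL,\n'
    '    "_cifflow_row_id"    INTEGER  NOT NULL,\n'
    '    "tag"                TEXT     NOT NULL,\n'
    '    "value"              TEXT,\n'
    '    "value_type"         TEXT     NOT NULL,\n'
    '    "loop_id"            INTEGER,\n'
    '    "col_index"          INTEGER,\n'
    '    "ref_table"          TEXT,\n'
    '    PRIMARY KEY ("_cifflow_block_id", "_cifflow_row_id", "tag")\n'
    ')'
)
index = (
    'CREATE INDEX IF NOT EXISTS "idx_cif_fallback_tag_block" '
    'ON "_cif_fallback" ("tag", "_cifflow_block_id")'
)

conn = sqlite3.connect(":memory:")
# IF NOT EXISTS makes the statements safe to run more than once.
for _ in range(2):
    conn.execute(fallback)
    conn.execute(index)

with conn:
    conn.execute(
        'INSERT INTO "_cif_fallback" '
        '("_cifflow_block_id", "_cifflow_row_id", "tag", "value", "value_type") '
        "VALUES (?, ?, ?, ?, ?)",
        ("block1", 0, "_unknown.tag", "42", "number"),  # hypothetical row
    )

# A lookup by tag and block can be served by the index created above.
rows = conn.execute(
    'SELECT "value" FROM "_cif_fallback" '
    'WHERE "tag" = ? AND "_cifflow_block_id" = ?',
    ("_unknown.tag", "block1"),
).fetchall()
```

The value comes back as the string `"42"` because the `value` column is declared TEXT.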

Source code in src/cifflow/dictionary/schema.py
def emit_fallback_create_statements() -> list[str]:
    """
    Return the fixed DDL statements for the schema-less fallback tier.

    Returns six SQL strings: ``CREATE TABLE IF NOT EXISTS`` for
    ``_cif_fallback``, its lookup index, and ``CREATE TABLE IF NOT EXISTS``
    for ``_block_dataset_membership``, ``_validation_result``,
    ``_block_order``, and ``_tag_presence``.
    """
    fallback = (
        f"CREATE TABLE IF NOT EXISTS {_qi('_cif_fallback')} (\n"
        f"    {_qi('_cifflow_block_id')}   TEXT     NOT NULL,\n"
        f"    {_qi('_cifflow_row_id')}     INTEGER  NOT NULL,\n"
        f"    {_qi('tag')}         TEXT     NOT NULL,\n"
        f"    {_qi('value')}       TEXT,\n"
        f"    {_qi('value_type')}  TEXT     NOT NULL,\n"
        f"    {_qi('loop_id')}     INTEGER,\n"
        f"    {_qi('col_index')}   INTEGER,\n"
        f"    {_qi('ref_table')}   TEXT,\n"
        f"    PRIMARY KEY ({_qi('_cifflow_block_id')}, {_qi('_cifflow_row_id')}, {_qi('tag')})\n"
        f")"
    )
    index = (
        f"CREATE INDEX IF NOT EXISTS {_qi('idx_cif_fallback_tag_block')} "
        f"ON {_qi('_cif_fallback')} ({_qi('tag')}, {_qi('_cifflow_block_id')})"
    )
    membership = (
        f"CREATE TABLE IF NOT EXISTS {_qi('_block_dataset_membership')} (\n"
        f"    {_qi('_cifflow_block_id')}            TEXT  NOT NULL,\n"
        f"    {_qi('_audit_dataset_id')}    TEXT  NOT NULL,\n"
        f"    {_qi('id_regime')}            TEXT  NOT NULL,\n"
        f"    PRIMARY KEY ({_qi('_cifflow_block_id')}, {_qi('_audit_dataset_id')})\n"
        f")"
    )
    validation = (
        f"CREATE TABLE IF NOT EXISTS {_qi('_validation_result')} (\n"
        f"    {_qi('check_name')}  TEXT  NOT NULL,\n"
        f"    {_qi('severity')}    TEXT  NOT NULL,\n"
        f"    {_qi('block_id')}    TEXT,\n"
        f"    {_qi('detail')}      TEXT,\n"
        f"    {_qi('id_regime')}   TEXT\n"
        f")"
    )
    block_order = (
        f"CREATE TABLE IF NOT EXISTS {_qi('_block_order')} (\n"
        f"    {_qi('_cifflow_block_id')}   TEXT     PRIMARY KEY,\n"
        f"    {_qi('position')}    INTEGER  NOT NULL\n"
        f")"
    )
    tag_presence = (
        f"CREATE TABLE IF NOT EXISTS {_qi('_tag_presence')} (\n"
        f"    {_qi('_cifflow_block_id')}     TEXT  NOT NULL,\n"
        f"    {_qi('table_name')}    TEXT  NOT NULL,\n"
        f"    {_qi('column_name')}   TEXT  NOT NULL,\n"
        f"    {_qi('pk_json')}       TEXT  NOT NULL,\n"
        f"    PRIMARY KEY ({_qi('_cifflow_block_id')}, {_qi('table_name')}, {_qi('column_name')}, {_qi('pk_json')})\n"
        f")"
    )
    return [fallback, index, membership, validation, block_order, tag_presence]
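
As a rough illustration of how the fallback table might be used, the sketch below recreates the `_cif_fallback` table and its index in an in-memory SQLite database and queries by tag. It assumes `_qi` quotes identifiers with double quotes; the DDL is copied by hand from the listing above, and the tag name and `value_type` value are placeholders.

```python
import sqlite3

FALLBACK_DDL = """
CREATE TABLE IF NOT EXISTS "_cif_fallback" (
    "_cifflow_block_id"  TEXT     NOT NULL,
    "_cifflow_row_id"    INTEGER  NOT NULL,
    "tag"                TEXT     NOT NULL,
    "value"              TEXT,
    "value_type"         TEXT     NOT NULL,
    "loop_id"            INTEGER,
    "col_index"          INTEGER,
    "ref_table"          TEXT,
    PRIMARY KEY ("_cifflow_block_id", "_cifflow_row_id", "tag")
)
"""
INDEX_DDL = (
    'CREATE INDEX IF NOT EXISTS "idx_cif_fallback_tag_block" '
    'ON "_cif_fallback" ("tag", "_cifflow_block_id")'
)

conn = sqlite3.connect(":memory:")
# IF NOT EXISTS makes the statements safe to run more than once.
for stmt in (FALLBACK_DDL, INDEX_DDL, FALLBACK_DDL, INDEX_DDL):
    conn.execute(stmt)

# Placeholder rows: a tag unknown to the dictionary, seen in two blocks.
conn.executemany(
    'INSERT INTO "_cif_fallback" VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
    [
        ("block_a", 0, "_my_local.tag", "1.5", "string", None, None, None),
        ("block_b", 0, "_my_local.tag", "2.5", "string", None, None, None),
    ],
)
conn.commit()

# The (tag, _cifflow_block_id) index supports lookups of this shape.
values = [
    row[0]
    for row in conn.execute(
        'SELECT "value" FROM "_cif_fallback" '
        'WHERE "tag" = ? ORDER BY "_cifflow_block_id"',
        ("_my_local.tag",),
    )
]
```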

cifflow.dictionary.resolver

Tag resolution — maps a CIF tag name to its current DDLm definition.

ResolvedTag dataclass

Result of resolving a tag name against a loaded DDLm dictionary.

Produced by resolve_tag().

Attributes:

Name Type Description
definition_id str

The current canonical tag name (_definition.id), lowercased.

category_id str

The SQL table name for this definition (_name.category_id), lowercased.

object_id str

The SQL column name for this definition (_name.object_id), lowercased.

was_alias bool

True if the input tag was an old alias that maps to definition_id; False if it matched the canonical name directly.

is_deprecated bool

True if this definition has been superseded by one or more replacements (_definition_replaced records exist).

Source code in src/cifflow/dictionary/resolver.py
@dataclass
class ResolvedTag:
    """
    Result of resolving a tag name against a loaded DDLm dictionary.

    Produced by :func:`resolve_tag`.

    Attributes
    ----------
    definition_id:
        The current canonical tag name (``_definition.id``), lowercased.
    category_id:
        The SQL table name for this definition (``_name.category_id``),
        lowercased.
    object_id:
        The SQL column name for this definition (``_name.object_id``),
        lowercased.
    was_alias:
        ``True`` if the input tag was an old alias that maps to
        *definition_id*; ``False`` if it matched the canonical name directly.
    is_deprecated:
        ``True`` if this definition has been superseded by one or more
        replacements (``_definition_replaced`` records exist).
    """

    definition_id: str
    category_id: str
    object_id: str
    was_alias: bool
    is_deprecated: bool

resolve_tag(tag, dictionary)

Resolve a tag name from a CIF data file to its current definition.

Looks up tag (case-insensitive) in dictionary.tag_to_item, following alias chains transparently. Returns None if the tag is not known to this dictionary, or if the matching definition has no category_id or object_id; this is the signal that the tag belongs to the fallback tier, not an error condition.

Does not emit warnings. The caller is responsible for acting on the was_alias and is_deprecated flags of the returned value.

Parameters:

Name Type Description Default
tag str

The tag name to resolve, as it appears in a CIF data file. Lookup is case-insensitive.

required
dictionary DdlmDictionary

The loaded DdlmDictionary to resolve against.

required

Returns:

Type Description
ResolvedTag | None

Resolution result including canonical name, table, column, and alias/deprecation flags; None if the tag is not known to this dictionary.

Source code in src/cifflow/dictionary/resolver.py
def resolve_tag(tag: str, dictionary: DdlmDictionary) -> ResolvedTag | None:
    """
    Resolve a tag name from a CIF data file to its current definition.

    Looks up *tag* (case-insensitive) in ``dictionary.tag_to_item``,
    following alias chains transparently.  Returns ``None`` if the tag is
    not known to this dictionary, or if the matching definition has no
    ``category_id`` or ``object_id``; this is the signal that the tag
    belongs to the fallback tier, not an error condition.

    Does not emit warnings.  The caller is responsible for acting on the
    ``was_alias`` and ``is_deprecated`` flags of the returned value.

    Parameters
    ----------
    tag:
        The tag name to resolve, as it appears in a CIF data file.
        Lookup is case-insensitive.
    dictionary:
        The loaded ``DdlmDictionary`` to resolve against.

    Returns
    -------
    ResolvedTag | None
        Resolution result including canonical name, table, column, and
        alias/deprecation flags; ``None`` if the tag is not known to this
        dictionary.
    """
    item = dictionary.tag_to_item.get(tag.lower())
    if item is None:
        return None

    was_alias = tag.lower() in dictionary.alias_to_definition_id

    if item.category_id is None or item.object_id is None:
        return None

    return ResolvedTag(
        definition_id=item.definition_id,
        category_id=item.category_id,
        object_id=item.object_id,
        was_alias=was_alias,
        is_deprecated=item.is_deprecated,
    )
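
Because resolve_tag itself emits no warnings, the caller decides what to do with the result. The sketch below shows one possible caller-side policy. `Resolved` is a hypothetical stand-in with the same fields as ResolvedTag, and `route` and its return shape are illustrative assumptions, not part of cifflow.

```python
from dataclasses import dataclass


@dataclass
class Resolved:
    """Hypothetical stand-in mirroring the fields of ResolvedTag."""
    definition_id: str
    category_id: str
    object_id: str
    was_alias: bool
    is_deprecated: bool


def route(tag, resolved):
    """Return (table, column, notes) for a tag and its resolution result.

    ``None`` means the tag is unknown to the dictionary, so the value is
    sent to the fallback table under its lowercased tag name.
    """
    if resolved is None:
        return ("_cif_fallback", tag.lower(), [])
    notes = []
    if resolved.was_alias:
        notes.append(f"{tag} is an alias of {resolved.definition_id}")
    if resolved.is_deprecated:
        notes.append(f"{resolved.definition_id} is deprecated")
    return (resolved.category_id, resolved.object_id, notes)


aliased = Resolved("_atom_site.label", "atom_site", "label", True, False)
known = route("_atom_site_label", aliased)
unknown = route("_My_Local.Tag", None)
```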

cifflow.dictionary.cache

JSON serialisation and deserialisation of DdlmDictionary.

Allows a fully resolved dictionary (including metadictionary imports) to be saved to disk and reloaded without re-parsing constituent CIF files.

Cache invalidation is the caller's responsibility. These functions make no attempt to detect whether the source dictionary files have changed.
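
One simple invalidation policy a caller could adopt is to compare modification times: treat the cache as stale when it is missing, or when any source file is missing or newer than the cache. The helper below is a hypothetical sketch of that policy; `cache_is_stale` is not a cifflow function, and modification times are only a heuristic.

```python
from pathlib import Path


def cache_is_stale(cache_path, source_files):
    """Return True if the cache should be rebuilt from its source files."""
    cache = Path(cache_path)
    if not cache.exists():
        return True
    cache_mtime = cache.stat().st_mtime
    for src in source_files:
        src_path = Path(src)
        # A vanished or more recently modified source invalidates the cache.
        if not src_path.exists() or src_path.stat().st_mtime > cache_mtime:
            return True
    return False
```

The `source_files` list stored on a saved dictionary would be a natural input here, assuming it holds paths that are still valid on the current machine.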

save_dictionary(dictionary, path)

Serialise dictionary to a JSON file at path.

The file is written with a plain open + json.dump; the write is not atomic, so a crash mid-write can leave a truncated file. Existing files are overwritten.

tag_to_item is stored as a dict[str, str] mapping (tag name → definition_id) to avoid duplicating DdlmItem objects for every alias. It is reconstructed on load.

Parameters:

Name Type Description Default
dictionary DdlmDictionary

The DdlmDictionary to serialise.

required
path str | Path

Destination file path. Parent directories must already exist.

required
Source code in src/cifflow/dictionary/cache.py
def save_dictionary(
    dictionary: DdlmDictionary,
    path: str | pathlib.Path,
) -> None:
    """
    Serialise *dictionary* to a JSON file at *path*.

    The file is written with a plain ``open`` + ``json.dump``; the write is
    not atomic, so a crash mid-write can leave a truncated file.  Existing
    files are overwritten.

    ``tag_to_item`` is stored as a ``dict[str, str]`` mapping (tag name →
    ``definition_id``) to avoid duplicating ``DdlmItem`` objects for every
    alias.  It is reconstructed on load.

    Parameters
    ----------
    dictionary:
        The ``DdlmDictionary`` to serialise.
    path:
        Destination file path.  Parent directories must already exist.
    """
    data = {
        'name': dictionary.name,
        'title': dictionary.title,
        'version': dictionary.version,
        'uri': dictionary.uri,
        'categories': {
            k: dataclasses.asdict(v)
            for k, v in dictionary.categories.items()
        },
        'items': {
            k: dataclasses.asdict(v)
            for k, v in dictionary.items.items()
        },
        # Store as tag → definition_id to avoid duplicating DdlmItem objects.
        'tag_to_item': {
            tag: item.definition_id
            for tag, item in dictionary.tag_to_item.items()
        },
        'alias_to_definition_id': dictionary.alias_to_definition_id,
        'deprecated_ids': sorted(dictionary.deprecated_ids),
        'warnings': dictionary.warnings,
        'source_files': dictionary.source_files,
    }
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

load_dictionary(path)

Deserialise a DdlmDictionary from a JSON file at path.

Raises ValueError if the file does not exist, contains malformed JSON, is missing required keys, or references an unknown definition_id in tag_to_item. The caller should respond by falling back to DictionaryLoader.load().

Parameters:

Name Type Description Default
path str | Path

Path to a JSON file previously written by save_dictionary().

required

Returns:

Type Description
DdlmDictionary

The deserialised dictionary.

Raises:

Type Description
ValueError

If the file does not exist or its contents are invalid. Other I/O errors (for example PermissionError) propagate unchanged.

Source code in src/cifflow/dictionary/cache.py
def load_dictionary(path: str | pathlib.Path) -> DdlmDictionary:
    """
    Deserialise a ``DdlmDictionary`` from a JSON file at *path*.

    Raises ``ValueError`` if the file does not exist, contains malformed JSON,
    is missing required keys, or references an unknown ``definition_id`` in
    ``tag_to_item``.  The caller should respond by falling back to
    ``DictionaryLoader.load()``.

    Parameters
    ----------
    path:
        Path to a JSON file previously written by :func:`save_dictionary`.

    Returns
    -------
    DdlmDictionary
        The deserialised dictionary.

    Raises
    ------
    ValueError
        If the file does not exist or its contents are invalid.  Other I/O
        errors (for example ``PermissionError``) propagate unchanged.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        raise ValueError(f'dictionary cache file not found: {path}')
    except json.JSONDecodeError as e:
        raise ValueError(f'malformed JSON in dictionary cache {path}: {e}')

    try:
        categories = {
            k: DdlmItem(**v) for k, v in data['categories'].items()
        }
        items = {
            k: DdlmItem(**v) for k, v in data['items'].items()
        }
    except (KeyError, TypeError) as e:
        raise ValueError(f'invalid dictionary cache structure in {path}: {e}')

    # Reconstruct tag_to_item from the stored tag → definition_id mapping.
    all_by_id: dict[str, DdlmItem] = {**categories, **items}
    tag_to_item: dict[str, DdlmItem] = {}
    try:
        for tag, def_id in data['tag_to_item'].items():
            if def_id not in all_by_id:
                raise ValueError(
                    f'tag_to_item entry {tag!r} references unknown '
                    f'definition_id {def_id!r} in {path}'
                )
            tag_to_item[tag] = all_by_id[def_id]
    except (KeyError, TypeError) as e:
        raise ValueError(f'invalid tag_to_item in dictionary cache {path}: {e}')

    try:
        return DdlmDictionary(
            name=data['name'],
            title=data['title'],
            version=data['version'],
            categories=categories,
            items=items,
            tag_to_item=tag_to_item,
            alias_to_definition_id=data['alias_to_definition_id'],
            deprecated_ids=set(data['deprecated_ids']),
            warnings=data['warnings'],
            source_files=data.get('source_files', []),
            uri=data.get('uri'),
        )
    except (KeyError, TypeError) as e:
        raise ValueError(f'invalid dictionary cache structure in {path}: {e}')
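
The recommended response to a ValueError is to rebuild from the source dictionary files. The sketch below shows that pattern with caller-supplied callables: `load`, `rebuild`, and `save` stand in for load_dictionary, a call to DictionaryLoader.load() (whose arguments are not shown in this section), and save_dictionary. `load_or_rebuild` itself is hypothetical.

```python
def load_or_rebuild(cache_path, load, rebuild, save):
    """Return (dictionary, from_cache), rebuilding when the cache is unusable."""
    try:
        return load(cache_path), True
    except ValueError:
        # Missing, malformed, or structurally invalid cache: parse the
        # sources again and refresh the cache for next time.
        fresh = rebuild()
        save(fresh, cache_path)
        return fresh, False
```

Catching only ValueError keeps unrelated failures, such as permission errors, visible to the caller instead of silently triggering a rebuild.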

cifflow.dictionary.visualise

Schema visualisation: DOT and HTML output for SchemaSpec.

Public functions

visualise_schema(schema, ...) -> Graphviz DOT string

visualise_schema_html(schema, ...) -> self-contained HTML string

visualise_schema(schema, *, show_columns='sparse', show_bridge=True, show_parent_edges=True, highlight_orphans=True, highlight_components=False, show_orphans=True, show_legend=True, concentrate=False, hide_deprecated=False, layout='dot', splines='curved', ranksep=1.0, nodesep=0.4)

Return a Graphviz DOT string visualising schema.

Parameters:

Name Type Description Default
schema SchemaSpec

The cifflow.dictionary.schema.SchemaSpec to visualise.

required
show_columns Literal['all', 'sparse', 'none']

'all' — every column; 'sparse' — only PK and key/bridge columns; 'none' — header only.

'sparse'
show_bridge bool

Include bridge column edges. Edges from [BRIDGE ONLY] nodes and edges to ghost nodes are drawn even when this is False.

True
show_parent_edges bool

Include category-parent hierarchy edges. Always True when the target is a ghost node.

True
highlight_orphans bool

Apply [ORPHAN] / [BRIDGE ONLY] badges and border styles.

True
highlight_components bool

Wrap each connected component in a subgraph cluster_ box.

False
show_orphans bool

When False, [ORPHAN] and [BRIDGE ONLY] nodes (and their edges) are omitted entirely.

True
show_legend bool

When True (default), emit a __legend__ node summarising node colours, connectivity badges, edge styles, and column badges. The content of the legend adapts to the active flags.

True
concentrate bool

When True, set concentrate=true in the graph attributes. Graphviz merges parallel edges that share a common endpoint into a shared spine, reducing visual clutter in dense schemas.

False
hide_deprecated bool

When True, deprecated columns (those whose definition_id appears in schema.deprecated_ids) are omitted from column rows. Any table where every non-synthetic column is deprecated is removed from the graph entirely — no node, no ghost, no edges.

False
layout str

Graphviz layout engine written into graph [layout=...]. viz.js reads this attribute automatically.

'dot'
splines str

Graphviz splines attribute controlling edge routing. 'curved' (default) draws smooth distinct arcs and handles edge labels correctly, including edges that run backwards in the layout. 'ortho' routes edges as right-angle lines but has known issues with label placement and backwards edges. Other values: 'polyline', 'spline', 'none'.

'curved'
ranksep float

Minimum separation in inches between ranks (layout rows/columns). Larger values spread the graph out vertically (or horizontally with rankdir=LR) and give edge routing more room. Default 1.0.

1.0
nodesep float

Minimum separation in inches between adjacent nodes in the same rank. Default 0.4.

0.4

Returns:

Type Description
str

A Graphviz DOT source string ready to pass to a Graphviz renderer or embed in an HTML page via viz.js.

Source code in src/cifflow/dictionary/visualise.py
def visualise_schema(
    schema: SchemaSpec,
    *,
    show_columns: Literal['all', 'sparse', 'none'] = 'sparse',
    show_bridge: bool = True,
    show_parent_edges: bool = True,
    highlight_orphans: bool = True,
    highlight_components: bool = False,
    show_orphans: bool = True,
    show_legend: bool = True,
    concentrate: bool = False,
    hide_deprecated: bool = False,
    layout: str = 'dot',
    splines: str = 'curved',
    ranksep: float = 1.0,
    nodesep: float = 0.4,
) -> str:
    """
    Return a Graphviz DOT string visualising *schema*.

    Parameters
    ----------
    schema:
        The :class:`~cifflow.dictionary.schema.SchemaSpec` to visualise.
    show_columns:
        ``'all'`` — every column; ``'sparse'`` — only PK and key/bridge columns;
        ``'none'`` — header only.
    show_bridge:
        Include bridge column edges.  Edges from ``[BRIDGE ONLY]`` nodes and
        edges to ghost nodes are drawn even when this is ``False``.
    show_parent_edges:
        Include category-parent hierarchy edges.  Always ``True`` when the target
        is a ghost node.
    highlight_orphans:
        Apply ``[ORPHAN]`` / ``[BRIDGE ONLY]`` badges and border styles.
    highlight_components:
        Wrap each connected component in a ``subgraph cluster_`` box.
    show_orphans:
        When ``False``, ``[ORPHAN]`` and ``[BRIDGE ONLY]`` nodes (and their edges)
        are omitted entirely.
    show_legend:
        When ``True`` (default), emit a ``__legend__`` node summarising node
        colours, connectivity badges, edge styles, and column badges.  The
        content of the legend adapts to the active flags.
    concentrate:
        When ``True``, set ``concentrate=true`` in the graph attributes.
        Graphviz merges parallel edges that share a common endpoint into a
        shared spine, reducing visual clutter in dense schemas.
    hide_deprecated:
        When ``True``, deprecated columns (those whose ``definition_id``
        appears in ``schema.deprecated_ids``) are omitted from column rows.
        Any table where every non-synthetic column is deprecated is removed
        from the graph entirely — no node, no ghost, no edges.
    layout:
        Graphviz layout engine written into ``graph [layout=...]``.  viz.js
        reads this attribute automatically.
    splines:
        Graphviz ``splines`` attribute controlling edge routing.  ``'curved'``
        (default) draws smooth distinct arcs and handles edge labels correctly,
        including edges that run backwards in the layout.  ``'ortho'`` routes
        edges as right-angle lines but has known issues with label placement
        and backwards edges.  Other values: ``'polyline'``, ``'spline'``,
        ``'none'``.
    ranksep:
        Minimum separation in inches between ranks (layout rows/columns).
        Larger values spread the graph out vertically (or horizontally with
        ``rankdir=LR``) and give edge routing more room.  Default ``1.0``.
    nodesep:
        Minimum separation in inches between adjacent nodes in the same rank.
        Default ``0.4``.

    Returns
    -------
    str
        A Graphviz DOT source string ready to pass to a Graphviz renderer or
        embed in an HTML page via viz.js.
    """
    ghost_tables = _collect_ghost_tables(schema)
    bridge_only, orphans, pass1_components = _classify_tables(schema)

    # Deprecated filtering
    deprecated_ids: frozenset[str] = (
        frozenset(schema.deprecated_ids) if hide_deprecated else frozenset()
    )
    hidden_deprecated: set[str] = (
        _deprecated_table_names(schema) if hide_deprecated else set()
    )

    # Determine which real tables to emit
    if show_orphans:
        real_tables = set(schema.tables) - hidden_deprecated
    else:
        real_tables = set(schema.tables) - bridge_only - orphans - hidden_deprecated

    # Ghost tables must not include tables we deliberately hid as deprecated
    ghost_tables -= hidden_deprecated

    concentrate_attr = ' concentrate=true' if concentrate else ''
    lines: list[str] = [
        'digraph schema {',
        f'    graph [rankdir=LR layout="{_escape(layout)}" splines="{_escape(splines)}"'
        f' ranksep={ranksep} nodesep={nodesep}'
        f' fontname="Helvetica" fontsize=11{concentrate_attr}]',
        '    node  [fontname="Helvetica" fontsize=10]',
        '    edge  [fontname="Helvetica" fontsize=9 decorate=true]',
        '',
    ]

    # --- Connectivity lookup ---
    def _connectivity(name: str) -> str:
        if name in bridge_only:
            return 'bridge_only'
        if name in orphans:
            return 'orphan'
        return 'connected'

    # --- Ghost nodes ---
    if ghost_tables:
        for ghost in sorted(ghost_tables):
            lines += _ghost_node_dot(ghost)
        lines.append('')

    # --- Real table nodes (possibly clustered) ---
    if highlight_components:
        # Sort components by their representative name for stability
        sorted_components = sorted(pass1_components, key=_component_label)

        # Collect component nodes (only real, non-orphan/bridge tables)
        real_structural_components = [
            c for c in sorted_components if len(c) >= 2 and c.issubset(real_tables)
        ]
        # Partial components (some members hidden by show_orphans=False)
        partial_structural_components = [
            c for c in sorted_components
            if len(c) >= 2 and not c.issubset(real_tables) and any(t in real_tables for t in c)
        ]
        # Include partially-visible components too
        all_structural = real_structural_components + partial_structural_components

        for i, component in enumerate(all_structural):
            visible_members = sorted(t for t in component if t in real_tables)
            if not visible_members:
                continue
            rep = _component_label(component)
            lines.append(f'    subgraph cluster_{i} {{')
            lines.append(f'        label="{_escape(rep)}" style=filled fillcolor="#f5f5f5"')
            for tbl_name in visible_members:
                tbl = schema.tables[tbl_name]
                for node_line in _table_node_dot(tbl, _connectivity(tbl_name), highlight_orphans, show_columns, schema, deprecated_ids):
                    lines.append('    ' + node_line)
            lines.append('    }')
            lines.append('')

        # Orphans cluster
        visible_orphans = sorted(orphans & real_tables)
        visible_bridge_only = sorted(bridge_only & real_tables)
        if visible_orphans or visible_bridge_only:
            lines.append('    subgraph cluster_orphans {')
            lines.append('        label="Isolated tables" style=filled fillcolor="#fff8f8"')
            for tbl_name in visible_orphans + visible_bridge_only:
                if tbl_name not in real_tables:
                    continue
                tbl = schema.tables[tbl_name]
                for node_line in _table_node_dot(tbl, _connectivity(tbl_name), highlight_orphans, show_columns, schema, deprecated_ids):
                    lines.append('    ' + node_line)
            lines.append('    }')
            lines.append('')

        # Ghost node cluster
        if ghost_tables:
            lines.append('    subgraph cluster_missing {')
            lines.append('        label="Missing tables" style=filled fillcolor="#ffe8e8"')
            for ghost in sorted(ghost_tables):
                for node_line in _ghost_node_dot(ghost):
                    lines.append('    ' + node_line)
            lines.append('    }')
            lines.append('')

        # Singleton real-table nodes not yet placed
        placed = set()
        for c in all_structural:
            placed.update(c)
        placed.update(orphans)
        placed.update(bridge_only)
        for tbl_name in sorted(real_tables - placed):
            tbl = schema.tables[tbl_name]
            lines += _table_node_dot(tbl, _connectivity(tbl_name), highlight_orphans, show_columns, schema, deprecated_ids)
            lines.append('')
    else:
        for tbl_name in sorted(real_tables):
            tbl = schema.tables[tbl_name]
            lines += _table_node_dot(tbl, _connectivity(tbl_name), highlight_orphans, show_columns, schema, deprecated_ids)
            lines.append('')

    # --- Legend node ---
    if show_legend:
        lines += _legend_dot(highlight_orphans, show_bridge, show_parent_edges, show_columns)
        lines.append('')

    # --- Edges ---
    lines.append('')

    # FK edges
    for tbl_name in sorted(real_tables):
        tbl = schema.tables[tbl_name]
        vis_cols = _visible_columns(tbl, schema, show_columns, deprecated_ids)
        for fk in tbl.foreign_keys:
            target = fk.target_table
            # Skip if target is a real table that's been hidden
            if target not in ghost_tables and target not in real_tables:
                continue
            label = _fk_label(fk, vis_cols, show_columns)
            attr = f' [label="{label}"]' if label else ''
            lines.append(f'    {_dot_id(fk.source_table)} -> {_dot_id(target)}{attr}')

    # Bridge edges
    bridge_col_by_table: dict[str, list[BridgeColumnDef]] = {}
    for bc in schema.bridge_columns:
        bridge_col_by_table.setdefault(bc.table_name, []).append(bc)

    for tbl_name in sorted(real_tables):
        if tbl_name not in bridge_col_by_table:
            continue
        is_bridge_only_node = tbl_name in bridge_only
        for bc in bridge_col_by_table[tbl_name]:
            bridge_target = bc.bridge_table
            target_is_ghost = bridge_target in ghost_tables
            target_in_real = bridge_target in real_tables
            if not target_is_ghost and not target_in_real:
                continue
            # Show bridge edge if: show_bridge is True, OR the node is bridge_only, OR target is ghost
            if show_bridge or is_bridge_only_node or target_is_ghost:
                label = _escape(f'{bc.column_name} via {bc.via_column}')
                lines.append(
                    f'    {_dot_id(tbl_name)} -> {_dot_id(bridge_target)}'
                    f' [label="{label}" style=dashed color="#888888"]'
                )

    # Parent-hierarchy edges
    for child, parent in sorted(schema.category_parent.items()):
        if not parent:
            continue
        child_in_real = child in real_tables
        parent_is_ghost = parent in ghost_tables
        parent_in_real = parent in real_tables
        if not child_in_real:
            continue
        if not parent_is_ghost and not parent_in_real:
            continue
        # Show parent edge if: show_parent_edges is True, OR target is ghost
        if show_parent_edges or parent_is_ghost:
            lines.append(
                f'    {_dot_id(child)} -> {_dot_id(parent)}'
                f' [style=dotted arrowhead=open color="#aaaaaa"]'
            )

    lines.append('}')
    return '\n'.join(lines)

visualise_schema_html(schema, *, title=None, show_columns='sparse', show_bridge=True, show_parent_edges=True, highlight_orphans=True, highlight_components=False, show_orphans=True, show_legend=True, concentrate=False, hide_deprecated=False, layout='dot', splines='curved', ranksep=1.0, nodesep=0.4)

Return a self-contained HTML string that renders schema interactively.

All keyword arguments except title are forwarded to visualise_schema(). The returned HTML embeds viz.js and svg-pan-zoom as inline <script> blocks, so no network access is required.

Parameters:

Name Type Description Default
schema SchemaSpec

The cifflow.dictionary.schema.SchemaSpec to render.

required
title str | None

<title> element text. Defaults to schema.dictionary_name or 'Schema' when not given.

None
show_columns Literal['all', 'sparse', 'none']

'all', 'sparse', or 'none'. Forwarded to visualise_schema().

'sparse'
show_bridge bool

Forwarded to visualise_schema().

True
show_parent_edges bool

Forwarded to visualise_schema().

True
highlight_orphans bool

Forwarded to visualise_schema().

True
highlight_components bool

Forwarded to visualise_schema().

False
show_orphans bool

Forwarded to visualise_schema().

True
show_legend bool

Forwarded to visualise_schema().

True
concentrate bool

Forwarded to visualise_schema().

False
hide_deprecated bool

Forwarded to visualise_schema().

False
layout str

Forwarded to visualise_schema().

'dot'
splines str

Forwarded to visualise_schema().

'curved'
ranksep float

Forwarded to visualise_schema().

1.0
nodesep float

Forwarded to visualise_schema().

0.4

Returns:

Type Description
str

A self-contained HTML document with the schema rendered as an interactive SVG via viz.js and svg-pan-zoom.
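
The DOT source is embedded in a JavaScript template literal, so backslashes, backticks, and `${` sequences have to be escaped first. The sketch below isolates the three replacements used for that in the source listing; `escape_for_template_literal` is a hypothetical helper name introduced for illustration.

```python
def escape_for_template_literal(text: str) -> str:
    """Escape *text* for safe embedding between JavaScript backticks."""
    return (
        text.replace('\\', '\\\\')  # backslashes first, so later escapes survive
            .replace('`', '\\`')    # a bare backtick would end the literal
            .replace('${', '\\${')  # "${" would start an interpolation
    )


sample = escape_for_template_literal('a`b${c}\\d')
```

Order matters here: escaping backslashes last would double the backslashes that the other two replacements had just inserted.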

Source code in src/cifflow/dictionary/visualise.py
def visualise_schema_html(
    schema: SchemaSpec,
    *,
    title: str | None = None,
    show_columns: Literal['all', 'sparse', 'none'] = 'sparse',
    show_bridge: bool = True,
    show_parent_edges: bool = True,
    highlight_orphans: bool = True,
    highlight_components: bool = False,
    show_orphans: bool = True,
    show_legend: bool = True,
    concentrate: bool = False,
    hide_deprecated: bool = False,
    layout: str = 'dot',
    splines: str = 'curved',
    ranksep: float = 1.0,
    nodesep: float = 0.4,
) -> str:
    """
    Return a self-contained HTML string that renders *schema* interactively.

    All keyword arguments except *title* are forwarded to
    :func:`visualise_schema`.  The returned HTML embeds viz.js and svg-pan-zoom
    as inline ``<script>`` blocks — no network access is required.

    Parameters
    ----------
    schema
        The :class:`~cifflow.dictionary.schema.SchemaSpec` to render.
    title
        ``<title>`` element text.  Defaults to ``schema.dictionary_name``
        or ``'Schema'`` when not given.
    show_columns
        ``'all'``, ``'sparse'``, or ``'none'``.  Forwarded to :func:`visualise_schema`.
    show_bridge
        Forwarded to :func:`visualise_schema`.
    show_parent_edges
        Forwarded to :func:`visualise_schema`.
    highlight_orphans
        Forwarded to :func:`visualise_schema`.
    highlight_components
        Forwarded to :func:`visualise_schema`.
    show_orphans
        Forwarded to :func:`visualise_schema`.
    show_legend
        Forwarded to :func:`visualise_schema`.
    concentrate
        Forwarded to :func:`visualise_schema`.
    hide_deprecated
        Forwarded to :func:`visualise_schema`.
    layout
        Forwarded to :func:`visualise_schema`.
    splines
        Forwarded to :func:`visualise_schema`.
    ranksep
        Forwarded to :func:`visualise_schema`.
    nodesep
        Forwarded to :func:`visualise_schema`.

    Returns
    -------
    str
        A self-contained HTML document with the schema rendered as an
        interactive SVG via viz.js and svg-pan-zoom.
    """
    dot_string = visualise_schema(
        schema,
        show_columns=show_columns,
        show_bridge=show_bridge,
        show_parent_edges=show_parent_edges,
        highlight_orphans=highlight_orphans,
        highlight_components=highlight_components,
        show_orphans=show_orphans,
        show_legend=show_legend,
        concentrate=concentrate,
        hide_deprecated=hide_deprecated,
        layout=layout,
        splines=splines,
        ranksep=ranksep,
        nodesep=nodesep,
    )

    page_title = title or schema.dictionary_name or 'Schema'
    page_title_escaped = html.escape(page_title)

    viz_js = _read_js('viz.js')
    full_render_js = _read_js('full.render.js')
    svg_pan_zoom_js = _read_js('svg-pan-zoom.min.js')

    # Escape DOT string for embedding in a JS template literal
    dot_escaped = dot_string.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${')

    return f"""<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>{page_title_escaped}</title>
  <script>{viz_js}</script>
  <script>{full_render_js}</script>
  <script>{svg_pan_zoom_js}</script>
  <style>
    body {{ margin: 0; background: #fafafa; }}
    #graph {{ width: 100vw; height: 100vh; overflow: hidden; }}
    #graph svg {{ width: 100%; height: 100%; }}
  </style>
</head>
<body>
  <div id="graph"></div>
  <script>
    const dot = `{dot_escaped}`;
    new Viz().renderSVGElement(dot).then(svg => {{
      document.getElementById('graph').appendChild(svg);
      svgPanZoom(svg, {{ zoomEnabled: true, controlIconsEnabled: true, fit: true, center: true }});
    }}).catch(err => {{
      document.getElementById('graph').textContent = 'Render error: ' + err;
    }});
  </script>
</body>
</html>"""
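
The escaping applied to the DOT string before it is placed between backticks can be tried in isolation. The sketch below repeats the same three replacements in a standalone function; the name `escape_for_template_literal` and the sample DOT text are illustrative assumptions, not part of cifflow.

```python
def escape_for_template_literal(text: str) -> str:
    """Escape text so it can sit between backticks in a JavaScript template literal."""
    return (
        text.replace('\\', '\\\\')  # backslashes first, so later escapes are not doubled
        .replace('`', '\\`')        # a bare backtick would close the literal
        .replace('${', '\\${')      # '${' would start an interpolation
    )


# Sample input containing all three special sequences (illustrative only).
dot = 'digraph { a [label="x\\ny `z` ${w}"]; }'
escaped = escape_for_template_literal(dot)
print(escaped)
```

The order matters: escaping backslashes last would double the backslashes introduced by the other two replacements. This escaping targets the JavaScript literal only. A literal `</script>` sequence in the input would still end the enclosing HTML script element, so input that may contain it would need separate handling.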