Extract, Cognify, Load
The five-task pipeline that turns text into a knowledge graph — and the two-schema problem that has to be solved in between.
# From cognee/api/v1/cognify/cognify.py, lines 290-353
default_tasks = [
    Task(classify_documents),
    Task(extract_chunks_from_documents,
         max_chunk_size=chunk_size or get_max_chunk_tokens(),
         chunker=chunker),
    Task(extract_graph_and_summarize,
         graph_model, config, custom_prompt,
         task_config={"batch_size": chunks_per_batch}),
    Task(add_data_points,
         embed_triplets=embed_triplets,
         task_config={"batch_size": chunks_per_batch}),
    Task(extract_dlt_fk_edges),
]
The simplest API in cognee hides the deepest pipeline. cognee.remember("the user prefers detailed explanations") reads like a single operation. Behind it is a five-task pipeline that classifies documents, chunks them, runs an LLM extraction per chunk, writes the result to a graph and a vector store, and extracts foreign-key edges from any structured data sources — all strictly sequential, all stamped with provenance, and all protected by a per-dataset lock to prevent concurrent runs against the same data.
I initially read cognify() as "run the LLM once and store the result." The five tasks and the integrate_chunk_graphs function show this is wrong. The LLM is called *per chunk*, and there is a translation step between the LLM's output and the runtime storage that is one of the more interesting pieces of engineering in the codebase.
Key Takeaways
- cognify() is a strictly sequential pipeline of five Task objects, not a "run the LLM and store" one-shot
- The KnowledgeGraph ↔ DataPoint translation in integrate_chunk_graphs is the architecture's hinge
- DLT (data load tool) chunks bypass the LLM entirely; they get deterministic FK-based graph construction
- Per-chunk parallel LLM calls (asyncio.gather) inside extract_graph_and_summarize are bounded by an asyncio.Semaphore(data_per_batch=20); this is how cognee extracts 100+ entities per second per chunk
- Every task stamps provenance onto the DataPoints it creates: source_pipeline, source_task, source_user, source_content_hash, topological_rank
The pipeline at a glance
The five tasks, in order, are:
1. classify_documents: turn raw Data items into typed Document objects (PDF, audio, image, text, etc.)
2. extract_chunks_from_documents: split the documents into semantic text chunks
3. extract_graph_and_summarize: the LLM call. For each chunk, extract entities and relationships AND generate a per-chunk summary, batched under an asyncio.Semaphore(data_per_batch=20)
4. add_data_points: persist the resulting nodes, edges, and embeddings to the graph database and the vector database, dual-write
5. extract_dlt_fk_edges: for structured data loaded via DLT, extract foreign-key edges deterministically (no LLM call)
Imagine you pass the string "Cognee turns documents into AI memory" to cognee.add() and then call cognee.cognify(). By the time cognify() returns, the five tasks have run, the LLM has been called against the chunk containing that string, and the resulting entities ("Cognee," "documents," "AI memory") and the relationship between them have been written to a graph that the next cognee.recall() can query.
flowchart LR
A["add()<br/>raw data"] --> B["classify_documents<br/>raw Data → typed Document"]
B --> C["extract_chunks_from_documents<br/>Document → DocumentChunk[]"]
C --> D["extract_graph_and_summarize<br/>LLM per chunk<br/>→ KnowledgeGraph"]
D --> E["add_data_points<br/>graph + vector write<br/>(dual or atomic)"]
E --> F["extract_dlt_fk_edges<br/>DLT chunks only<br/>deterministic FKs"]
F --> G["searchable graph<br/>+ vector store"]
The pipeline is *strictly sequential* at the task level. The orchestrator in cognee/modules/pipelines/operations/run_tasks_base.py:262-283 takes tasks[0] as the running task and recursively chains the remainder, leftover_tasks = tasks[1:]; there is no automatic parallelization between tasks. Data items within a single task, however, are processed in parallel via asyncio.gather bounded by an asyncio.Semaphore(data_per_batch), which defaults to 20. This is the right level of concurrency: enough to saturate an LLM rate limiter, not enough to overwhelm a graph database.
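The shape described above, tasks chained one after another with bounded fan-out over the items inside each task, can be sketched in a few lines. This is a minimal illustration and not cognee's orchestrator: run_tasks, run_task_over_items, and the two toy tasks are hypothetical names, and only the tasks[0] / tasks[1:] split and the semaphore-bounded asyncio.gather come from the text.

```python
import asyncio


async def run_task_over_items(task, items, data_per_batch=20):
    """Run one task over all items, with at most data_per_batch in flight."""
    semaphore = asyncio.Semaphore(data_per_batch)

    async def bounded(item):
        async with semaphore:
            return await task(item)

    # asyncio.gather returns results in the same order as its inputs.
    return await asyncio.gather(*(bounded(item) for item in items))


async def run_tasks(tasks, items, data_per_batch=20):
    """Chain tasks strictly in order: tasks[0] runs, then the rest recurse."""
    if not tasks:
        return items
    running_task, leftover_tasks = tasks[0], tasks[1:]
    results = await run_task_over_items(running_task, items, data_per_batch)
    return await run_tasks(leftover_tasks, results, data_per_batch)


# Hypothetical stand-in tasks, for illustration only.
async def strip_text(text):
    return text.strip()


async def count_words(text):
    await asyncio.sleep(0)  # yield control, as an I/O-bound call would
    return len(text.split())


word_counts = asyncio.run(run_tasks([strip_text, count_words], ["  a b ", "c d e"]))
```

No item reaches count_words until every item has finished strip_text, which is what "sequential at the task level, parallel within a task" means in practice.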
The two-schema problem
The interesting part is what happens inside extract_graph_and_summarize. The function in cognee/tasks/graph/extract_graph_from_data.py calls extract_content_graph (in cognee/infrastructure/llm/extraction/knowledge_graph/) for each chunk. The LLM call is wrapped by Instructor, which forces the response into a Pydantic model — by default, KnowledgeGraph from cognee/shared/data_models.py.
# From cognee/shared/data_models.py
from pydantic import BaseModel, Field

class Node(BaseModel):
    id: str
    name: str = ""  # defaults to id in __init__
    type: str
    description: str

class Edge(BaseModel):
    source_node_id: str
    target_node_id: str
    relationship_name: str
    description: str | None

class KnowledgeGraph(BaseModel):
    nodes: list[Node] = Field(default_factory=list)
    edges: list[Edge] = Field(default_factory=list)
Notice the id: str. The LLM is not generating UUIDs. It is generating human-readable identifiers: names, slugs, anything the LLM thinks identifies the entity. The runtime storage, in contrast, is the DataPoint base class in cognee/infrastructure/engine/models/DataPoint.py:27-367, where every node has an id: UUID = Field(default_factory=uuid4) and a *deterministic* identity hash:
# From DataPoint.py
@classmethod
def id_for(cls, *values):
    # normalized_values is derived from values; that step is elided in this excerpt
    return uuid5(NAMESPACE_OID, f"{cls.__name__}:{'|'.join(normalized_values)}")
The two schemas are not the same. The LLM-facing KnowledgeGraph uses strings, the runtime DataPoint uses UUIDs derived from the class name and the identity fields. The translation is the bridge function integrate_chunk_graphs (in cognee/tasks/graph/extract_graph_from_data.py:56-125), which:
1. Validates each entity against the ontology resolver (default: rdflib + fuzzy match at 80% similarity)
2. Calls expand_with_nodes_and_edges (line 110) to convert LLM Node/Edge strings into DataPoint subclasses (Entity, etc.) with UUIDs and embeddings
3. Stamps provenance on all reachable DataPoints: source_pipeline, source_task, source_user, source_node_set, source_content_hash, topological_rank (this happens in cognee/modules/pipelines/operations/run_tasks_base.py:33-117)
This is the architecture's hinge. Every other piece — search, multi-tenant isolation, the session→graph sync — assumes the runtime DataPoint representation. The LLM is a structured-output machine that returns a stringly-typed KnowledgeGraph, and the integration function is the only place where the two worlds meet.
The dual representation is a deliberate cost. The LLM cannot generate UUIDs reliably. The runtime cannot store nodes by human-readable names — it needs deterministic identity for upserts and de-duplication. The two schemas are necessary, and the translation is necessary, and the cost is one extra function call per chunk.
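To make the translation concrete, here is a small sketch of turning a string-keyed graph into UUID-keyed records. It is not cognee's integrate_chunk_graphs or expand_with_nodes_and_edges: LLMNode, LLMEdge, EntityPoint, entity_id, and translate are hypothetical stand-ins, and the trim-and-lowercase normalization is an assumption. Only the uuid5-over-class-name-and-identity-fields idea is taken from the text.

```python
from dataclasses import dataclass
from uuid import NAMESPACE_OID, UUID, uuid5


@dataclass(frozen=True)
class LLMNode:  # stand-in for the string-keyed Node the LLM returns
    id: str
    name: str
    type: str


@dataclass(frozen=True)
class LLMEdge:  # stand-in for the string-keyed Edge
    source_node_id: str
    target_node_id: str
    relationship_name: str


@dataclass(frozen=True)
class EntityPoint:  # stand-in for a DataPoint subclass
    id: UUID
    name: str
    type: str


def entity_id(name: str) -> UUID:
    # Assumed normalization: trim and lowercase before hashing.
    normalized = name.strip().lower()
    return uuid5(NAMESPACE_OID, f"EntityPoint:{normalized}")


def translate(nodes, edges):
    """Map LLM string ids to deterministic UUIDs and rewrite edges to match."""
    by_llm_id = {
        n.id: EntityPoint(entity_id(n.name or n.id), n.name or n.id, n.type)
        for n in nodes
    }
    # Two LLM nodes naming the same entity collapse onto one UUID.
    points = {p.id: p for p in by_llm_id.values()}
    triples = [
        (by_llm_id[e.source_node_id].id, e.relationship_name, by_llm_id[e.target_node_id].id)
        for e in edges
        # Drop edges that point at ids the LLM never declared as nodes.
        if e.source_node_id in by_llm_id and e.target_node_id in by_llm_id
    ]
    return points, triples


nodes = [
    LLMNode("cognee", "Cognee", "Tool"),
    LLMNode("COGNEE ", "cognee", "Tool"),
    LLMNode("mem", "AI memory", "Concept"),
]
edges = [LLMEdge("cognee", "mem", "produces"), LLMEdge("cognee", "ghost", "mentions")]
points, triples = translate(nodes, edges)
```

Because the UUID depends only on the type and the normalized name, re-running extraction over the same text yields the same ids, which is what makes upserts and de-duplication possible.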
Why DLT chunks skip the LLM
The fifth task, extract_dlt_fk_edges, is for structured data loaded via the DLT (data load tool) library. If a chunk originated from a DLT source (a database table, a CSV with a known schema, a Notion page), the foreign-key relationships between rows are known at ingestion time. There is no need to call the LLM to discover them. The check at extract_graph_from_data.py:153-159 detects DLT chunks and skips the LLM extraction entirely:
# From extract_graph_from_data.py
if chunk.contains.dlt_source:
    # DLT chunks get deterministic FK-based graph construction
    # No LLM call: relationships are known from the schema
    ...
This is a quiet but important optimization. A 100,000-row DLT load would otherwise require 100,000 LLM calls to extract relationships — which would be both expensive and slow. Cognee recognizes the structure of the input and chooses a different code path. The architectural discipline of "don't call the LLM when you don't have to" is built into the pipeline at the task level, not bolted on.
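A rough sketch of what "deterministic FK-based graph construction" can look like follows. It is not the body of extract_dlt_fk_edges: the fk_edges function, the (child_table, fk_column, parent_table, parent_key) tuple layout, and the assumption that every row has an "id" primary key are all invented for illustration.

```python
def fk_edges(rows_by_table, foreign_keys):
    """Build (source, relationship, target) edges from declared foreign keys.

    rows_by_table: {table_name: [row_dict, ...]}, each row assumed to have an "id".
    foreign_keys:  [(child_table, fk_column, parent_table, parent_key), ...]
    """
    edges = []
    for child_table, fk_column, parent_table, parent_key in foreign_keys:
        parent_keys = {row[parent_key] for row in rows_by_table.get(parent_table, [])}
        for row in rows_by_table.get(child_table, []):
            value = row.get(fk_column)
            # Skip NULLs and references to parents that were not loaded.
            if value in parent_keys:
                edges.append(
                    (f"{child_table}:{row['id']}", fk_column, f"{parent_table}:{value}")
                )
    return edges


rows = {
    "users": [{"id": 1, "name": "Ada"}],
    "orders": [
        {"id": 10, "user_id": 1},
        {"id": 11, "user_id": 99},    # dangling reference
        {"id": 12, "user_id": None},  # NULL foreign key
    ],
}
edges = fk_edges(rows, [("orders", "user_id", "users", "id")])
```

The work is a set lookup per row, with no model call and no sampling, so the same load always produces the same edges.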
How the chunks become a graph
Inside extract_graph_and_summarize (the third task), the per-chunk LLM calls run in parallel via asyncio.gather (line 166 of extract_graph_from_data.py). Each chunk gets its own LLM call, and each call returns a KnowledgeGraph. After all chunks are processed, integrate_chunk_graphs walks the union of all chunk graphs, validates entities against the ontology resolver, translates the result into DataPoint nodes and edges, and produces the *integrated* graph for the document.
The ontology resolver is itself a small extension point. The default is rdflib with fuzzy matching at 80% similarity, but cognee/modules/ontology/ is the public surface — supply your own OWL file via ONTOLOGY_FILE_PATH=/path/to/your.owl and the resolver will ground extracted entities in standardized semantic frameworks. This is the line that turns cognee from "LLM extracts whatever it wants" into "LLM extracts what fits your domain."
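The grounding step can be pictured as "snap an extracted name to the closest ontology term, or leave it ungrounded." The sketch below uses the standard library's difflib as a stand-in for whatever matcher the resolver actually uses; ground_entity and TERMS are hypothetical, and only the 0.8 threshold comes from the text.

```python
from difflib import get_close_matches

# Hypothetical ontology vocabulary; a real one would come from the OWL file.
TERMS = ["Machine Learning", "Knowledge Graph", "Vector Database"]


def ground_entity(name, ontology_terms, cutoff=0.8):
    """Return the canonical ontology term closest to name, or None."""
    lowered = {term.lower(): term for term in ontology_terms}
    matches = get_close_matches(name.strip().lower(), list(lowered), n=1, cutoff=cutoff)
    return lowered[matches[0]] if matches else None


grounded = ground_entity("knowlege graph", TERMS)  # misspelled on purpose
ungrounded = ground_entity("banana", TERMS)
```

A threshold near 0.8 tolerates typos and small spelling variants while rejecting names that merely share a few letters with an ontology term.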
The write path: dual or atomic
The fourth task, add_data_points (in cognee/tasks/storage/add_data_points.py:31-167), is where the graph and the vector store get written. The function has two paths:
- Dual write (the common case): asyncio.gather(graph_engine.add_nodes, index_data_points), where the graph database and the vector index are written in parallel
- Atomic write (for hybrid backends like Postgres + pgvector): gated on unified.has_capability(EngineCapability.HYBRID_WRITE), a single atomic write that includes both
The dual-write path is faster but has a failure mode: if the graph write succeeds and the vector write fails, the graph has nodes that the vector index doesn't know about. The atomic path closes this gap at the cost of throughput. The choice is exposed as unified.has_capability, and the codebase explicitly favors atomic when the backend supports it. This is the kind of decision that is invisible to the user but is load-bearing for production.
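The failure mode is easy to reproduce with a toy backend. FakeStore below is a hypothetical in-memory stand-in, not cognee's engine interface; its hybrid_write flag plays the role of the capability check, and write_both models atomicity simply by validating before it mutates anything.

```python
import asyncio


class FakeStore:
    """Hypothetical in-memory backend, for illustration only."""

    def __init__(self, hybrid_write, vector_fails=False):
        self.hybrid_write = hybrid_write
        self.vector_fails = vector_fails
        self.graph, self.vectors = set(), set()

    async def add_nodes(self, ids):
        self.graph.update(ids)

    async def index(self, ids):
        if self.vector_fails:
            raise RuntimeError("vector index unavailable")
        self.vectors.update(ids)

    async def write_both(self, ids):
        # All-or-nothing: fail before touching either side.
        if self.vector_fails:
            raise RuntimeError("vector index unavailable")
        self.graph.update(ids)
        self.vectors.update(ids)


async def persist(store, ids):
    if store.hybrid_write:
        await store.write_both(ids)
    else:
        # Dual write: both sides run concurrently and independently.
        await asyncio.gather(store.add_nodes(ids), store.index(ids))


async def try_persist(store, ids):
    try:
        await persist(store, ids)
    except RuntimeError:
        pass  # swallow the error so the resulting state can be inspected
    return store.graph, store.vectors


dual_graph, dual_vectors = asyncio.run(
    try_persist(FakeStore(hybrid_write=False, vector_fails=True), {"n1"})
)
atomic_graph, atomic_vectors = asyncio.run(
    try_persist(FakeStore(hybrid_write=True, vector_fails=True), {"n1"})
)
```

After the simulated vector failure, the dual-write store holds a graph node with no matching vector entry, while the atomic store holds nothing on either side.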
The vector indexing is itself non-trivial. index_data_points (cognee/tasks/storage/index_data_points.py:10-70) groups data points by (DataPoint subclass, field_name), where each field_name comes from metadata["index_fields"], the list of fields on the DataPoint that should be embedded. Collection names are constructed as {TypeName}_{field_name}: Entity_name, DocumentChunk_text, EdgeType_relationship_name. The function uses an asyncio.Semaphore(4) to bound embedding concurrency and creates tasks incrementally so a slow embedding doesn't block the next batch. This is the kind of concurrency discipline that distinguishes a research prototype from a production library.
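The grouping rule can be sketched without any embedding machinery. The Entity and DocumentChunk dataclasses below are simplified stand-ins for the real DataPoint subclasses, and group_for_indexing is a hypothetical helper; the {TypeName}_{field_name} naming and the metadata["index_fields"] lookup are the only parts taken from the text.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Entity:  # simplified stand-in, not cognee's Entity
    name: str
    description: str = ""
    metadata: dict = field(default_factory=lambda: {"index_fields": ["name"]})


@dataclass
class DocumentChunk:  # simplified stand-in, not cognee's DocumentChunk
    text: str
    metadata: dict = field(default_factory=lambda: {"index_fields": ["text"]})


def group_for_indexing(data_points):
    """Bucket the values to embed by collection name: {TypeName}_{field_name}."""
    groups = defaultdict(list)
    for point in data_points:
        for field_name in point.metadata.get("index_fields", []):
            value = getattr(point, field_name, None)
            if value:  # nothing to embed for empty or missing fields
                groups[f"{type(point).__name__}_{field_name}"].append(value)
    return dict(groups)


collections = group_for_indexing(
    [Entity("Cognee"), DocumentChunk("some text"), Entity("AI memory")]
)
```

Each resulting bucket maps to one vector collection, so values of the same type and field can be embedded and written together.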
Provenance is a first-class concern
Every DataPoint carries provenance metadata. The fields — source_pipeline, source_task, source_user, source_node_set, source_content_hash, topological_rank — are stamped automatically in cognee/modules/pipelines/operations/run_tasks_base.py:33-117. This is not an audit log. It is a property of the runtime objects themselves. When a DataPoint is serialized, its provenance is serialized with it. When it is queried, the provenance is in the result. When a node is removed, the provenance can be used to find the upstream pipeline run that created it.
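Stamping "all reachable DataPoints" amounts to a graph walk that tolerates cycles. The sketch below is an illustration under stated assumptions rather than the code in run_tasks_base.py: Point and stamp_provenance are hypothetical, only three of the provenance fields are modeled, and the rule that existing provenance is never overwritten is an assumption made for the example.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Point:  # hypothetical stand-in for a DataPoint with nested references
    name: str
    children: list = field(default_factory=list)
    source_pipeline: Optional[str] = None
    source_task: Optional[str] = None
    source_user: Optional[str] = None


def stamp_provenance(roots, pipeline, task, user):
    """Stamp every point reachable from roots; return how many were stamped."""
    seen, stack, stamped = set(), list(roots), 0
    while stack:
        point = stack.pop()
        if id(point) in seen:  # guards against cycles and shared children
            continue
        seen.add(id(point))
        # Assumption: a point that already has provenance keeps it.
        if point.source_pipeline is None:
            point.source_pipeline = pipeline
            point.source_task = task
            point.source_user = user
            stamped += 1
        stack.extend(point.children)
    return stamped


a = Point("a")
b = Point("b", children=[a])
a.children.append(b)  # a <-> b form a cycle
c = Point("c", source_pipeline="older_run")
b.children.append(c)

stamped = stamp_provenance([a], "cognify_pipeline", "add_data_points", "user-1")
```

Because the fields live on the objects themselves, anything that later serializes or queries a point sees where it came from without consulting a separate log.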
This matters for the multi-tenant story. If tenant A's data accidentally leaks into tenant B's search result, the provenance field will say which user and which pipeline run created the leaked node. The next chapter shows the second layer of defense — per-dataset database isolation. Provenance and per-dataset isolation are belt and suspenders: provenance for forensic reconstruction, isolation for prevention.
What the pipeline does not do
The pipeline is the workhorse, but it is not everything. The cognify() function does not decide *which* dataset to run against — that decision happens earlier, in setup() and the user/dataset resolution logic in cognee/api/v1/cognify/cognify.py:222-226. The pipeline does not handle the LLM rate limit — that is the llm_rate_limiter singleton's job (covered in chapter three). The pipeline does not decide whether to write atomically or via dual write — the backend's unified.has_capability decides that. The pipeline is the assembly line; the rest of the system is the factory around it.
To recap: the pipeline is a strictly sequential chain of five tasks, with a per-chunk LLM call at its heart, translated by a single bridge function into a runtime representation that the search layer and the multi-tenant substrate can use. The next chapter takes that runtime representation and asks: how does a query get answered?