This is an English translation of the following article:
https://dev.classmethod.jp/articles/snowflake-dbt-incremental-model-for-cortex-search-service/
To build RAG using Cortex Search Service, you need to prepare a table where documents such as PDFs have been parsed using functions like AI_PARSE_DOCUMENT.
I explored how to build this table using dbt, and in this article I'll share what I came up with.
The dbt Model Implementation
Let's jump right in with the dbt model I implemented. It reads the directory table of a stage that points at an S3 bucket holding Snowflake customer case study PDFs, then parses and chunks each document.
The key points of this dbt model are:
- Configured as an Incremental model with unique_key='relative_path' + incremental_strategy='delete+insert'. Data with the same file path is deleted and re-inserted.
  - If a record with the same path and the same or newer modification timestamp already exists in the target table, it is skipped. This means only unprocessed or updated files are processed, reducing the cost of AI_PARSE_DOCUMENT parsing.
- Chunking uses the SNOWFLAKE.CORTEX.SPLIT_TEXT_MARKDOWN_HEADER function.
  - I adopted this approach after reading a blog post by Takada-san from Snowflake.
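To make the skip rule in the first bullet concrete, here is a minimal Python sketch of the same decision. It is only a model of the logic, not part of the dbt project: the function name, the file names, and the timestamps are all hypothetical, and the target table is simplified to one timestamp per path (in the real table every chunk of a file carries the same last_modified).

```python
from datetime import datetime


def files_to_process(stage_files, target_rows):
    """Return the paths that the incremental filter would let through.

    stage_files: dict of relative_path -> last_modified from the directory table
    target_rows: dict of relative_path -> last_modified already in the target table
    """
    selected = []
    for path, modified in stage_files.items():
        existing = target_rows.get(path)
        # Skip only when the target already holds the same or a newer version
        # of this path; everything else is parsed (again).
        if existing is not None and existing >= modified:
            continue
        selected.append(path)
    return selected


# Hypothetical file names and timestamps, for illustration only.
stage = {
    "snowflake-case-studies/a.pdf": datetime(2025, 1, 1),  # unchanged
    "snowflake-case-studies/b.pdf": datetime(2025, 2, 1),  # re-uploaded
    "snowflake-case-studies/c.pdf": datetime(2025, 2, 1),  # new file
}
target = {
    "snowflake-case-studies/a.pdf": datetime(2025, 1, 1),
    "snowflake-case-studies/b.pdf": datetime(2025, 1, 1),
}

print(files_to_process(stage, target))
```

Only the re-uploaded file and the new file pass the filter, so only those two would be sent to the parsing function.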
{{
    config(
        materialized='incremental',
        unique_key='relative_path',
        incremental_strategy='delete+insert'
    )
}}

with directory_files as (
    select
        relative_path,
        last_modified,
        TO_FILE('@RAW_DB.SAMPLE_SOURCE_SNOWPIPE.STAGE_CORPUS_SAMPLE', relative_path) as doc_file
    from
        DIRECTORY(@RAW_DB.SAMPLE_SOURCE_SNOWPIPE.STAGE_CORPUS_SAMPLE) as d
    where
        relative_path like 'snowflake-case-studies/%.pdf'
        {% if is_incremental() %}
        and not exists (
            select 1
            from {{ this }} as t
            where t.relative_path = d.relative_path
              and t.last_modified >= d.last_modified
        )
        {% endif %}
),

parsed as (
    select
        relative_path,
        last_modified,
        SNOWFLAKE.CORTEX.AI_PARSE_DOCUMENT(
            doc_file,
            {'mode': 'LAYOUT', 'page_split': TRUE}
        ) as parsed_result
    from
        directory_files
),

concatenated as (
    select
        p.relative_path,
        p.last_modified,
        LISTAGG(f.value:content::varchar, '\n\n') within group (order by f.index) as full_content
    from
        parsed as p,
        lateral flatten(input => p.parsed_result:pages) as f
    group by
        p.relative_path, p.last_modified
),

chunked as (
    select
        c.relative_path,
        c.last_modified,
        ch.value:headers:header_1::varchar as header_1,
        ch.value:headers:header_2::varchar as header_2,
        ch.value:headers:header_3::varchar as header_3,
        ch.value:chunk::varchar as chunk,
        ch.index as chunk_index
    from
        concatenated as c,
        lateral flatten(input =>
            SNOWFLAKE.CORTEX.SPLIT_TEXT_MARKDOWN_HEADER(
                c.full_content,
                OBJECT_CONSTRUCT('#', 'header_1', '##', 'header_2', '###', 'header_3'),
                10000
            )
        ) as ch
)

select
    SHA2(relative_path || ':' || chunk_index, 256) as doc_chunk_id,
    relative_path,
    SPLIT_PART(relative_path, '/', -1) as file_name,
    REPLACE(REPLACE(SPLIT_PART(relative_path, '/', -1), 'Case_Study_', ''), '.pdf', '') as case_study_name,
    header_1,
    header_2,
    header_3,
    chunk_index,
    chunk,
    last_modified,
    CURRENT_TIMESTAMP() as _parsed_at
from
    chunked
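The derived columns in the final select (doc_chunk_id, file_name, case_study_name) can be sketched in plain Python as follows. This is an illustration under assumptions: the function name and the sample path are hypothetical, and I have not checked that the hex digest matches Snowflake's SHA2 output byte for byte. It only shows that the ID is a deterministic hash of the path plus the chunk index.

```python
import hashlib


def derive_columns(relative_path, chunk_index):
    # Equivalent of SPLIT_PART(relative_path, '/', -1): the last path segment.
    file_name = relative_path.split("/")[-1]
    # Equivalent of the two nested REPLACE calls: strip the prefix and the extension.
    case_study_name = file_name.replace("Case_Study_", "").replace(".pdf", "")
    # Equivalent of SHA2(relative_path || ':' || chunk_index, 256):
    # hex-encoded SHA-256 of "<path>:<index>".
    key = f"{relative_path}:{chunk_index}"
    doc_chunk_id = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return {
        "doc_chunk_id": doc_chunk_id,
        "file_name": file_name,
        "case_study_name": case_study_name,
    }


# Hypothetical path, for illustration only.
row = derive_columns("snowflake-case-studies/Case_Study_Bourbon.pdf", 0)
print(row["file_name"], row["case_study_name"], row["doc_chunk_id"][:12])
```

Because the ID depends only on the path and the chunk index, re-parsing an unchanged file yields the same IDs, while a file whose chunk boundaries shift will reuse the same IDs for different text.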
Additionally, in dbt_project.yml, I configured pre_hook and post_hook as shown below. The assumption is that a corpus folder is created within the models folder, and models for parsing and chunking for Cortex Search Service are defined inside it.
- pre_hook: Refreshes the external stage pointing to the S3 bucket containing the PDFs, so the directory table queried by the dbt model is updated before the model is built.
- post_hook: Since tables for Cortex Search Service require CHANGE_TRACKING to be enabled, a query to enable it is executed after the model is built.
name: 'sample_dbt_project'
version: '1.9.0'
config-version: 2
profile: 'sample_project'

models:
  sample_dbt_project:
    corpus:
      sample:
        +schema: corpus_sample
        +pre-hook:
          - "ALTER STAGE RAW_DB.SAMPLE_SOURCE_SNOWPIPE.STAGE_CORPUS_SAMPLE REFRESH"
        +post-hook:
          - "ALTER TABLE {{ this }} SET CHANGE_TRACKING = TRUE"
First dbt Model Run & Cortex Search Service Setup
I initially placed only two case study PDFs in S3: one for Bourbon and one for Aisan Takaoka. (I'll omit the stage creation queries here.)
Running the dbt model above with dbt build creates a table in Snowflake with the parsed and chunked case study PDFs, as shown below.
I then created a Cortex Search Service and a Cortex Agent on top of the table built by this dbt model, using the following queries.
-- Create Cortex Search Service
CREATE OR REPLACE CORTEX SEARCH SERVICE PROD_DB.AIML_SAMPLE.CSS_SNOWFLAKE_CASE_STUDIES
    ON chunk
    ATTRIBUTES file_name, case_study_name, header_1, header_2, header_3, chunk_index, last_modified
    WAREHOUSE = CORTEX_SEARCH_WH
    TARGET_LAG = '365 day'
    EMBEDDING_MODEL = 'voyage-multilingual-2'
    INITIALIZE = ON_CREATE
    COMMENT = 'Cortex Search Service for chunked Snowflake customer case study PDFs'
AS
    SELECT
        file_name,
        case_study_name,
        header_1,
        header_2,
        header_3,
        chunk_index,
        chunk,
        last_modified
    FROM PROD_DB.CORPUS_SAMPLE.COR_SNOWFLAKE_CASE_STUDIES_PARSED;
-- Create Agent
CREATE OR REPLACE AGENT PROD_DB.AIML_SAMPLE.AGENT_SNOWFLAKE_CASE_STUDIES
COMMENT = 'Cortex Agent for searching and answering questions about Snowflake customer case study PDFs'
PROFILE = '{"display_name": "Snowflake Case Study Search", "avatar": "search", "color": "#29B5E8"}'
FROM SPECIFICATION
$$
instructions:
  system: |
    You are an assistant that answers questions about the content of Snowflake customer case study PDFs.
    Use the Cortex Search Service to search for relevant case study information
    and provide answers based on accurate information.
    If the information is not found in the search results, do not speculate; inform the user accordingly.
  response: |
    Please respond concisely.
    Include the referenced case study name (case_study_name) in your answer.
tools:
  - tool_spec:
      type: "cortex_search"
      name: "SearchCaseStudies"
      description: "Search Snowflake customer case study PDFs"
tool_resources:
  SearchCaseStudies:
    name: "PROD_DB.AIML_SAMPLE.CSS_SNOWFLAKE_CASE_STUDIES"
    max_results: "5"
$$;
-- Add to Snowflake Intelligence
ALTER SNOWFLAKE INTELLIGENCE SNOWFLAKE_INTELLIGENCE_OBJECT_DEFAULT
ADD AGENT PROD_DB.AIML_SAMPLE.AGENT_SNOWFLAKE_CASE_STUDIES;
After querying through Snowflake Intelligence, I was able to get results as shown below.
Updating S3 Data and Running the dbt Model a Second Time
To verify that incremental updates work correctly, I made the following two changes to the S3 bucket:
- Deleted the 2nd page of the Aisan Takaoka case study PDF and re-uploaded it with the same file name to S3
- Added a new Chiba Bank case study PDF to the S3 bucket
After running dbt build again, you can see that the chunk count for Aisan Takaoka decreased (due to the removed page), and new records for Chiba Bank were added.
Looking at the _PARSED_AT column, which records the parsing timestamp, you can confirm that only the records for the two changed files, Aisan Takaoka and Chiba Bank, had their timestamps updated.
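The reason the chunk count can shrink is that delete+insert removes every existing row whose unique_key value appears in the new batch before inserting, and here the key is the file path rather than a per-row ID. A minimal Python sketch of that behavior follows; it assumes rows are plain dicts, and the function name, paths, and chunk counts are hypothetical.

```python
from collections import Counter


def delete_insert(target, new_rows, unique_key="relative_path"):
    """Model of delete+insert: drop target rows whose key is in the batch, then append the batch."""
    keys = {row[unique_key] for row in new_rows}
    kept = [row for row in target if row[unique_key] not in keys]
    return kept + new_rows


def chunks(path, count):
    # Helper that builds `count` chunk rows for one file.
    return [{"relative_path": path, "chunk_index": i} for i in range(count)]


# Hypothetical paths and chunk counts, for illustration only.
target = chunks("case-studies/bourbon.pdf", 2) + chunks("case-studies/aisan.pdf", 3)
batch = chunks("case-studies/aisan.pdf", 2) + chunks("case-studies/chiba.pdf", 2)

result = delete_insert(target, batch)
print(Counter(row["relative_path"] for row in result))
```

The untouched file keeps its rows, the re-uploaded file ends up with only its new, smaller set of chunks, and the new file is simply added. A row-level key such as doc_chunk_id would instead leave stale chunks behind when a file gets shorter.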
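With the table updated, I refreshed the Cortex Search Service manually by running the following query. Because TARGET_LAG is set to '365 day', the service would otherwise not pick up the changes for a long time.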
ALTER CORTEX SEARCH SERVICE PROD_DB.AIML_SAMPLE.CSS_SNOWFLAKE_CASE_STUDIES REFRESH;
Querying through Snowflake Intelligence again confirmed that the updated data was being used for responses.
Conclusion
I explored building a dbt Incremental model that parses PDFs and chunks them for Cortex Search Service, and shared the implementation in this article.
By using dbt's Incremental model, only files with changes are detected and processed by the AI_PARSE_DOCUMENT function, which helps reduce costs.
I hope you find this useful!






