Mastering the Graph: How Bob Built a Smarter RAG for Unstructured Data
🚀 The Vision: Engineering the Ultimate Knowledge Engine (The Idea Behind the Tool)
This project represents one of my most ambitious "public" collaborations with Bob, engineered by fusing "best-of-breed" technologies into a single, cohesive powerhouse. To appreciate the sheer capability of this system, one must look at the two essential "brains" driving the operation: OpenSearch and Neo4j.
While both are titans of data management, they approach information from fundamentally different angles. When synchronized, they solve the most persistent "headaches" in modern AI: context, accuracy, and scale. Finally, to feed these brains, we've integrated Docling. When it comes to transforming messy, unstructured data into machine-readable intelligence, Docling is in a league of its own, with a precision in high-fidelity document processing that few tools can match.
🧠 The Dual-Brain Architecture
OpenSearch: The High-Speed Librarian
OpenSearch is an open-source search and analytics suite. At its core, it is a distributed search engine designed to handle massive amounts of data with lightning-fast speed.
- Usage in RAG: In a standard Retrieval-Augmented Generation (RAG) setup, OpenSearch acts as the "Vector Database." It takes the text chunks Bob extracted with Docling and converts them into "embeddings" (mathematical vectors). When you ask a question, OpenSearch performs a Similarity Search, finding the text that "feels" most similar to your query, even if the exact words don't match.
- Market Dominance: It is heavily used because it offers Hybrid Search. It can combine classic keyword matching (finding the exact word "iPhone") with semantic vector search (understanding you mean "smartphone"), providing a level of precision that pure vector databases often miss.
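To make Hybrid Search concrete, here is a sketch of a query body that combines both signals. The field names (`text`, `embedding`) and index layout are illustrative assumptions, not this project's actual schema:

```python
def build_hybrid_query(query_text, query_vector, k=5):
    """Build a query body combining BM25 keyword matching with k-NN vector search.

    The field names ("text", "embedding") are illustrative, not this
    project's actual index schema.
    """
    return {
        "size": k,
        "query": {
            "bool": {
                "should": [
                    # keyword signal: scores exact-term matches like "iPhone"
                    {"match": {"text": query_text}},
                    # semantic signal: nearest neighbours in embedding space
                    {"knn": {"embedding": {"vector": query_vector, "k": k}}},
                ]
            }
        },
    }

# body = build_hybrid_query("iPhone battery life", query_embedding, k=5)
# hits = opensearch_client.search(index="documents", body=body)
```

OpenSearch also offers a dedicated `hybrid` query type with search pipelines for score normalization; the `bool`/`should` combination above is simply the easiest pattern to read.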
Neo4j: The Master of Connections
Neo4j is the world's leading Graph Database. Unlike traditional databases that store data in rows and columns, Neo4j stores data as Nodes (entities like "Bob" or "OpenSearch") and Relationships (the lines connecting them, like "Created By").
- Usage in GraphRAG: While OpenSearch finds similar text, Neo4j finds related facts. In a GraphRAG implementation, Neo4j allows the AI to perform "multi-hop reasoning." For example, if you ask, "What other tools did the creator of OpenSearch-Docling-GraphRAG build?" the system can hop from the Tool to Bob, and then to all of Bob's other Projects.
- Market Dominance: Neo4j is the "gold standard" because it uses Index-Free Adjacency. This is a technical way of saying it doesn't have to search through a giant list to find a connection; it simply follows the physical pointers from one piece of data to the next, making it incredibly fast for complex, interconnected queries.
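To make "multi-hop reasoning" concrete, the example question above translates into a two-hop Cypher pattern. The labels and relationship types below (`Tool`, `Person`, `CREATED_BY`) are hypothetical, invented for this illustration:

```python
# Hypothetical labels and relationship types, invented for this example:
# (:Tool)-[:CREATED_BY]->(:Person)<-[:CREATED_BY]-(:Tool)
MULTI_HOP_CYPHER = """
MATCH (t:Tool {name: $tool})-[:CREATED_BY]->(p:Person)
MATCH (p)<-[:CREATED_BY]-(other:Tool)
WHERE other <> t
RETURN p.name AS creator, collect(other.name) AS other_tools
"""

# With the neo4j Python driver (as used later in this post) it would run as:
# with driver.session() as session:
#     record = session.run(MULTI_HOP_CYPHER, tool="OpenSearch-Docling-GraphRAG").single()
```

Because the graph stores the `CREATED_BY` edges directly, each hop is a pointer traversal rather than an index lookup.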
Why the Market is Obsessed with this Duo
The tech world is moving away from "Basic RAG" and toward "GraphRAG" for three main reasons:
- Eliminating Hallucinations: Traditional RAG can sometimes grab two unrelated text chunks that just happen to sound similar. GraphRAG ensures the data is factually linked in a graph, providing a "ground truth" that prevents the AI from making up connections.
- Contextual Depth: By using Docling to parse documents into a graph structure, Bob's tool can understand the hierarchy of a PDF (e.g., this sub-header belongs to this section), which a flat search engine might lose.
- Explainability: If the AI gives a weird answer, you can look at the Neo4j graph and see exactly which "path" it took to get there. It turns the "Black Box" of AI into a transparent map.
Let's jump into the practical implementation of this project 💪
🛠️ The Implementation: How Bob Connects the Dots
Under the hood, the system is orchestrated via a Streamlit UI, utilizing Ollama for local LLM inference and embedding generation to ensure data privacy.
This synergy allows for a Hybrid RAG approach: the system first retrieves relevant context from OpenSearch, then enriches it with relationship data from Neo4j, resulting in answers that are both contextually deep and factually grounded.
| Feature | Technology | Usage in this Tool |
| ----------- | -------------- | ----------------------------------------------------- |
| **Parsing** | **Docling** | Processes PDF, DOCX, and PPTX into structured data. |
| **Search** | **OpenSearch** | Powers semantic retrieval via vector embeddings. |
| **Graph** | **Neo4j** | Stores entity relationships for GraphRAG queries. |
| **Model** | **Ollama** | Runs local models like `ibm/granite4` for generation. |
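Stitched together, the components in the table amount to a four-step loop. Here is a minimal sketch of that flow; every name in it is a caller-supplied stand-in, not the project's real API:

```python
def hybrid_rag_answer(query, embed, vector_search, graph_expand, generate):
    """Retrieve from OpenSearch, enrich via Neo4j, then generate with the LLM.

    Every argument except `query` is a caller-supplied callable, so this is
    the shape of the pipeline rather than the project's real interface.
    """
    query_vector = embed(query)            # 1. Ollama embeds the question
    chunks = vector_search(query_vector)   # 2. OpenSearch finds similar chunks
    facts = graph_expand(chunks)           # 3. Neo4j adds relationship context
    context = "\n".join(chunks + facts)
    return generate(query, context)        # 4. the local LLM answers, grounded

# Toy run with stand-ins for the real services:
answer = hybrid_rag_answer(
    "Who built this tool?",
    embed=lambda q: [0.0],
    vector_search=lambda vec: ["Bob built the tool."],
    graph_expand=lambda chunks: ["(Tool)-[:CREATED_BY]->(Bob)"],
    generate=lambda q, ctx: f"Grounded answer from: {ctx}",
)
```

The design point is the ordering: vector search narrows the haystack first, and the graph expansion then pins the surviving chunks to verified relationships before anything reaches the LLM.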
Features Implemented so far…
🕸️ Neo4j (Knowledge Graph)
- Knowledge Graph Construction: Automatically builds a relational map of entities (88+ currently identified) and their connections from processed documents.
- Interactive Visualization: Features a "Graph Explorer" using PyVis that allows users to click, drag, and zoom through three view types: Entity Graph, Document Structure, and Full Graph.
- Relationship Mapping: Implements a fixed MENTIONS relationship logic that correctly links document chunks to entities (like People, Organizations, and Locations) using both unique IDs and names.
- Multi-Hop Reasoning: Enables the AI to follow "paths" between entities, allowing it to answer complex questions that require connecting facts across different documents.
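As a sketch of what that MENTIONS wiring could look like in Cypher (consistent with the `Chunk` and `Entity` properties used later in this post, but not a verbatim excerpt from the project):

```python
# A sketch of the MENTIONS logic: match the chunk by id + document_id,
# match the entity by name, then MERGE so reprocessing a document
# does not duplicate the edge.
MENTIONS_CYPHER = """
MATCH (c:Chunk {id: $chunk_id, document_id: $document_id})
MATCH (e:Entity {name: $entity_name})
MERGE (c)-[:MENTIONS]->(e)
"""
```

Matching chunks by ID and entities by name mirrors the dual-key linking described above.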
🔍 OpenSearch (Vector Search)
- Semantic Retrieval: Functions as the system's high-speed vector store, performing similarity searches on document embeddings to find contextually relevant information.
- High-Volume Indexing: Capable of efficiently managing and searching hundreds of document chunks (696+ in the current build).
- Metadata Integration: Stores document text alongside metadata, ensuring that search results are tied back to their original sources for transparency.
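A minimal sketch of what one indexed chunk might look like, assuming illustrative field names; the point is that the source metadata travels with the vector:

```python
def build_chunk_document(document_id, file_name, chunk_id, text, embedding):
    """Illustrative shape of one indexed chunk (field names are assumptions).

    Keeping file_name and document_id next to the vector is what lets a
    search hit be traced back to its original source.
    """
    return {
        "document_id": document_id,
        "file_name": file_name,     # shown in the UI as the source
        "chunk_id": chunk_id,
        "text": text,               # the retrievable passage
        "embedding": embedding,     # the vector used for k-NN search
    }
```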
🎮 GraphQL API
- Flexible Data Retrieval: Provides a dedicated playground at http://localhost:8000/api/graphql for running complex, nested queries.
- Comprehensive Schema: Supports queries for health checks, semantic searches, RAG queries, and detailed graph statistics.
- Entity Exploration: Allows developers to list entities by type (e.g., "Person") and see their document counts and connection distances.
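A quick way to exercise the playground from Python's standard library; the query fields below (`entities`, `documentCount`) are guesses for illustration, so check the playground's schema browser for the real names:

```python
import json
from urllib.request import Request, urlopen

# The fields below (entities, documentCount) are guesses for illustration;
# the playground's schema browser shows the real names.
PERSON_QUERY = """
{
  entities(type: "Person") {
    name
    documentCount
  }
}
"""

def run_graphql(query, url="http://localhost:8000/api/graphql"):
    """POST a GraphQL query using only the standard library."""
    payload = json.dumps({"query": query}).encode("utf-8")
    request = Request(url, data=payload,
                      headers={"Content-Type": "application/json"})
    with urlopen(request) as response:
        return json.loads(response.read())

# result = run_graphql(PERSON_QUERY)
```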
🌐 REST API
- Standardized Endpoints: Offers traditional HTTP endpoints for all core functions, including document uploading, batch processing, and RAG-based question answering.
- Background Job Management: Includes endpoints to trigger batch jobs and monitor their progress (e.g., % completed) via unique job IDs.
- System Monitoring: Provides health check and configuration endpoints that report the status of connected services like Neo4j, OpenSearch, and Ollama.
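The job-polling loop described above can be sketched like this; the `/api/jobs/{job_id}` path and the status values are assumptions for illustration, not the project's documented contract:

```python
import json
import time
from urllib.request import urlopen

def job_finished(job):
    """A job is done once it reaches a terminal status."""
    return job.get("status") in ("completed", "failed")

def wait_for_job(job_id, base_url="http://localhost:8000", poll_seconds=2.0):
    """Poll a batch job until it finishes.

    The /api/jobs/{job_id} path and the status values are assumptions
    for illustration; consult the API's own docs for the real contract.
    """
    while True:
        with urlopen(f"{base_url}/api/jobs/{job_id}") as response:
            job = json.loads(response.read())
        if job_finished(job):
            return job
        time.sleep(poll_seconds)
```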
🚀 GPU Acceleration
- Performance Optimization: When enabled, it provides a 2–3x boost in document processing speed and a 3–5x increase in embedding generation speed.
- Configurable Resources: Users can manage GPU usage via environment variables, specifically setting GPU_DEVICE_ID and GPU_MEMORY_FRACTION to optimize hardware allocation.
- NVIDIA CUDA Support: Designed to leverage NVIDIA hardware to handle heavy batch processing tasks more efficiently than a standard CPU.
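For example, a `.env` entry (values illustrative; variable names as listed above) might look like:

```
# .env - illustrative values; variable names as listed above
GPU_DEVICE_ID=0          # which CUDA device to use
GPU_MEMORY_FRACTION=0.8  # cap usage at 80% of GPU memory
```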
🚀 Getting Started: Deploying the Powerhouse in 5 Minutes
Bob designed this system to be "plug-and-play" for developers. Whether you are running it locally or in a containerized environment, you can go from zero to a fully functional GraphRAG system in minutes.
Prerequisites
Before diving in, ensure your machine meets these basic requirements:
- Python: Version 3.11, 3.12, or 3.13.
- Docker & Docker Compose: For orchestrating OpenSearch and Neo4j.
- Ollama: To run local LLMs like `ibm/granite4:latest` and `granite-embedding:278m`.
- Hardware: At least 8GB RAM and 10GB disk space.
The Setup Process
To get the system up and running, follow Bob's streamlined installation path:
1. Clone & Configure: Clone the repository and set up your environment variables by copying the `.env.example` file to `.env`.
2. Pull AI Models: Use Ollama to download the necessary intelligence models: `ollama pull ibm/granite4:latest` and `ollama pull granite-embedding:278m`.
3. Launch: Execute the startup script: `./start.sh`. This script automatically builds the Python environment, installs dependencies, and starts the Docker services for OpenSearch and Neo4j.
"""
Main Streamlit application for OpenSearch-Docling-GraphRAG.
"""
import streamlit as st
from streamlit_option_menu import option_menu
import os
from pathlib import Path
from datetime import datetime
import json
from loguru import logger
import sys
# Configure logger
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/app_{time}.log", rotation="1 day", retention="7 days", level="DEBUG")
# Import application modules
from src.processors import DoclingProcessor
from src.rag import OpenSearchClient, OllamaClient
from src.graphrag import Neo4jClient, GraphBuilder, GraphVisualizer
from config.settings import settings
# Page configuration
st.set_page_config(
page_title="OpenSearch-Docling-GraphRAG",
page_icon="📚",
layout="wide",
initial_sidebar_state="expanded"
)
# Initialize session state
if 'processor' not in st.session_state:
st.session_state.processor = None
if 'opensearch_client' not in st.session_state:
st.session_state.opensearch_client = None
if 'ollama_client' not in st.session_state:
st.session_state.ollama_client = None
if 'neo4j_client' not in st.session_state:
st.session_state.neo4j_client = None
if 'graph_builder' not in st.session_state:
st.session_state.graph_builder = None
if 'graph_visualizer' not in st.session_state:
st.session_state.graph_visualizer = None
if 'initialized' not in st.session_state:
st.session_state.initialized = False
def initialize_clients():
"""Initialize all clients."""
try:
with st.spinner("Initializing clients..."):
if not st.session_state.initialized:
st.session_state.processor = DoclingProcessor()
st.session_state.opensearch_client = OpenSearchClient()
st.session_state.ollama_client = OllamaClient()
st.session_state.neo4j_client = Neo4jClient()
st.session_state.graph_builder = GraphBuilder(st.session_state.neo4j_client)
st.session_state.graph_visualizer = GraphVisualizer(st.session_state.neo4j_client)
st.session_state.initialized = True
st.success("✅ All clients initialized successfully!")
logger.info("All clients initialized")
except Exception as e:
st.error(f"❌ Error initializing clients: {str(e)}")
logger.error(f"Initialization error: {str(e)}")
def process_single_file(uploaded_file):
"""Process a single uploaded file."""
try:
# Save uploaded file temporarily
temp_path = Path(settings.input_dir) / uploaded_file.name
with open(temp_path, 'wb') as f:
f.write(uploaded_file.getbuffer())
# Process document
with st.spinner(f"Processing {uploaded_file.name}..."):
doc_data = st.session_state.processor.process_document(str(temp_path))
# Generate embeddings
texts = [chunk['text'] for chunk in doc_data['chunks']]
embeddings = st.session_state.ollama_client.generate_embeddings_batch(texts)
# Index in OpenSearch
document_id = f"{Path(uploaded_file.name).stem}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
st.session_state.opensearch_client.index_document(
document_id=document_id,
file_name=uploaded_file.name,
file_path=str(temp_path),
chunks=doc_data['chunks'],
embeddings=embeddings,
metadata=doc_data['metadata']
)
# Build knowledge graph
st.session_state.graph_builder.build_document_graph(
document_id=document_id,
file_name=uploaded_file.name,
file_path=str(temp_path),
chunks=doc_data['chunks'],
metadata=doc_data['metadata']
)
# Save output
output_file = st.session_state.processor.save_output(doc_data, settings.output_dir)
return {
'success': True,
'document_id': document_id,
'output_file': output_file,
'chunks': len(doc_data['chunks'])
}
except Exception as e:
logger.error(f"Error processing file: {str(e)}")
return {'success': False, 'error': str(e)}
def process_batch_files():
"""Process all files in the input directory."""
input_dir = Path(settings.input_dir)
files = list(input_dir.glob('*'))
files = [f for f in files if f.is_file() and not f.name.startswith('.')]
if not files:
st.warning("No files found in input directory")
return
progress_bar = st.progress(0)
status_text = st.empty()
results = []
for i, file_path in enumerate(files):
status_text.text(f"Processing {file_path.name} ({i+1}/{len(files)})")
try:
# Process document
doc_data = st.session_state.processor.process_document(str(file_path))
# Generate embeddings
texts = [chunk['text'] for chunk in doc_data['chunks']]
embeddings = st.session_state.ollama_client.generate_embeddings_batch(texts)
# Index in OpenSearch
document_id = f"{file_path.stem}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
st.session_state.opensearch_client.index_document(
document_id=document_id,
file_name=file_path.name,
file_path=str(file_path),
chunks=doc_data['chunks'],
embeddings=embeddings,
metadata=doc_data['metadata']
)
# Build knowledge graph
st.session_state.graph_builder.build_document_graph(
document_id=document_id,
file_name=file_path.name,
file_path=str(file_path),
chunks=doc_data['chunks'],
metadata=doc_data['metadata']
)
# Save output
output_file = st.session_state.processor.save_output(doc_data, settings.output_dir)
results.append({
'file': file_path.name,
'status': 'success',
'document_id': document_id,
'chunks': len(doc_data['chunks'])
})
except Exception as e:
logger.error(f"Error processing {file_path.name}: {str(e)}")
results.append({
'file': file_path.name,
'status': 'failed',
'error': str(e)
})
progress_bar.progress((i + 1) / len(files))
status_text.text("Batch processing complete!")
return results
def main():
"""Main application."""
# Sidebar
with st.sidebar:
st.title("📚 Document RAG System")
selected = option_menu(
menu_title=None,
options=["Home", "Upload", "Batch Process", "Search", "Graph Explorer", "Settings"],
icons=["house", "cloud-upload", "files", "search", "diagram-3", "gear"],
default_index=0,
)
st.divider()
# System status
st.subheader("System Status")
if st.session_state.initialized:
st.success("✅ System Ready")
# Show statistics
try:
doc_count = st.session_state.opensearch_client.get_document_count()
st.metric("Indexed Chunks", doc_count)
graph_stats = st.session_state.graph_builder.get_graph_summary()
st.metric("Documents in Graph", graph_stats.get('total_documents', 0))
st.metric("Entities", graph_stats.get('total_entities', 0))
except Exception:
pass
else:
st.warning("⚠️ System Not Initialized")
if st.button("Initialize System"):
initialize_clients()
# Main content
if selected == "Home":
st.title("📚 Welcome to OpenSearch-Docling-GraphRAG")
st.markdown("""
### A Comprehensive Document Processing and RAG System
This application combines:
- **Docling**: Advanced document processing
- **OpenSearch**: Vector search and retrieval
- **Neo4j**: Knowledge graph construction
- **Ollama**: Local LLM for embeddings and generation
#### Features:
- 📄 Process various document formats (PDF, DOCX, PPTX, etc.)
- 🔍 Semantic search with vector embeddings
- 🕸️ Knowledge graph construction and exploration
- 💬 RAG-based question answering
- 📊 Batch processing capabilities
#### Getting Started:
1. Initialize the system using the sidebar button
2. Upload documents or process batch files
3. Search and query your documents
4. Explore the knowledge graph
""")
if not st.session_state.initialized:
st.info("👈 Please initialize the system from the sidebar to get started")
elif selected == "Upload":
st.title("📤 Upload Documents")
if not st.session_state.initialized:
st.warning("Please initialize the system first")
return
uploaded_file = st.file_uploader(
"Choose a document",
type=['pdf', 'docx', 'pptx', 'txt', 'md', 'html']
)
if uploaded_file:
st.info(f"File: {uploaded_file.name} ({uploaded_file.size} bytes)")
if st.button("Process Document"):
result = process_single_file(uploaded_file)
if result['success']:
st.success("✅ Document processed successfully!")
st.json({
'Document ID': result['document_id'],
'Chunks Created': result['chunks'],
'Output File': result['output_file']
})
else:
st.error(f"❌ Error: {result['error']}")
elif selected == "Batch Process":
st.title("📁 Batch Process Documents")
if not st.session_state.initialized:
st.warning("Please initialize the system first")
return
st.info(f"Input directory: {settings.input_dir}")
input_dir = Path(settings.input_dir)
files = list(input_dir.glob('*'))
files = [f for f in files if f.is_file() and not f.name.startswith('.')]
st.write(f"Found {len(files)} files:")
for f in files:
st.text(f" • {f.name}")
if st.button("Process All Files"):
if files:
results = process_batch_files()
# Display results
if results:
st.subheader("Processing Results")
success_count = sum(1 for r in results if r['status'] == 'success')
st.metric("Successfully Processed", f"{success_count}/{len(results)}")
# Show details
for result in results:
if result['status'] == 'success':
st.success(f"✅ {result['file']} - {result['chunks']} chunks")
else:
st.error(f"❌ {result['file']} - {result['error']}")
else:
st.warning("No files to process")
elif selected == "Search":
st.title("🔍 Search Documents")
if not st.session_state.initialized:
st.warning("Please initialize the system first")
return
query = st.text_input("Enter your question:")
k = st.slider("Number of results", 1, 10, 5)
if query and st.button("Search"):
with st.spinner("Searching..."):
# Generate query embedding
query_embedding = st.session_state.ollama_client.generate_embedding(query)
# Search
results = st.session_state.opensearch_client.search(
query_embedding=query_embedding,
k=k
)
# Generate RAG response
rag_response = st.session_state.ollama_client.generate_rag_response(
query=query,
retrieved_docs=results
)
# Display answer
st.subheader("Answer")
st.write(rag_response['answer'])
# Display sources
st.subheader("Sources")
for i, source in enumerate(rag_response['sources'], 1):
with st.expander(f"Source {i}: {source['file_name']} (Score: {source['score']:.4f})"):
st.write(f"Chunk ID: {source['chunk_id']}")
elif selected == "Graph Explorer":
st.title("🕸️ Knowledge Graph Explorer")
if not st.session_state.initialized:
st.warning("Please initialize the system first")
return
# Tabs for different views
viz_tab, search_tab, stats_tab = st.tabs(["🎨 Visualize", "🔍 Search", "📊 Statistics"])
with viz_tab:
st.subheader("Interactive Graph Visualization")
# Visualization options
viz_type = st.radio(
"Select visualization type:",
["Entity Graph", "Document Structure", "Full Graph"],
horizontal=True
)
if viz_type == "Entity Graph":
st.markdown("**Visualize an entity and its connections**")
col1, col2 = st.columns([3, 1])
with col1:
entity_name = st.text_input(
"Enter entity name:",
placeholder="e.g., Bob, Python, AI",
key="entity_viz"
)
with col2:
max_depth = st.number_input("Max depth:", min_value=1, max_value=5, value=2)
if st.button("🎨 Visualize Entity", type="primary"):
if entity_name:
try:
with st.spinner(f"Generating graph for '{entity_name}'..."):
html = st.session_state.graph_visualizer.visualize_entity_graph(
entity_name=entity_name,
max_depth=max_depth,
max_nodes=50
)
st.session_state.graph_visualizer.render_graph(html)
st.success(f"✅ Graph generated for '{entity_name}'")
st.info("💡 **Tip:** Click and drag nodes, scroll to zoom, hover for details")
except Exception as e:
st.error(f"❌ Error: {str(e)}")
st.info("💡 Try searching for a different entity or check if documents have been processed")
else:
st.warning("⚠️ Please enter an entity name")
elif viz_type == "Document Structure":
st.markdown("**Visualize document structure with chunks and entities**")
# Get list of documents
try:
with st.session_state.neo4j_client.driver.session() as session:
result = session.run("MATCH (d:Document) RETURN d.id as id, d.file_name as name")
documents = [(r['id'], r['name']) for r in result]
if documents:
doc_options = ["All Documents"] + [f"{name} ({id[:8]}...)" for id, name in documents]
selected_doc = st.selectbox("Select document:", doc_options)
if st.button("🎨 Visualize Document", type="primary"):
try:
with st.spinner("Generating document graph..."):
if selected_doc == "All Documents":
html = st.session_state.graph_visualizer.visualize_document_graph(
document_id=None,
max_nodes=100
)
else:
# Extract document ID from selection
doc_id = [id for id, name in documents if name in selected_doc][0]
html = st.session_state.graph_visualizer.visualize_document_graph(
document_id=doc_id,
max_nodes=100
)
st.session_state.graph_visualizer.render_graph(html)
st.success("✅ Document graph generated")
# Legend
st.markdown("---")
st.markdown("**Legend:**")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("🔵 **Blue Box** = Document")
with col2:
st.markdown("🟢 **Green Dot** = Chunk")
with col3:
st.markdown("🔴 **Red Star** = Entity")
except Exception as e:
st.error(f"❌ Error: {str(e)}")
else:
st.info("📭 No documents found. Upload and process documents first.")
except Exception as e:
st.error(f"❌ Error fetching documents: {str(e)}")
else: # Full Graph
st.markdown("**Visualize the entire knowledge graph**")
st.warning("⚠️ This may be slow for large graphs")
max_nodes = st.slider("Maximum nodes to display:", 10, 200, 50)
if st.button("🎨 Visualize Full Graph", type="primary"):
try:
with st.spinner("Generating full graph..."):
html = st.session_state.graph_visualizer.visualize_document_graph(
document_id=None,
max_nodes=max_nodes
)
st.session_state.graph_visualizer.render_graph(html)
st.success("✅ Full graph generated")
except Exception as e:
st.error(f"❌ Error: {str(e)}")
with search_tab:
st.subheader("🔍 Search Entities")
entity_name = st.text_input("Search for entity:", key="search_entity")
if st.button("Search", key="search_btn"):
if entity_name:
try:
# Search for entity
with st.session_state.neo4j_client.driver.session() as session:
result = session.run("""
MATCH (e:Entity)
WHERE toLower(e.name) CONTAINS toLower($name)
RETURN e.name as name, id(e) as id
LIMIT 20
""", name=entity_name)
entities = list(result)
if entities:
st.success(f"Found {len(entities)} entities:")
for entity in entities:
with st.expander(f"🔴 {entity['name']}"):
# Get connections
conn_result = session.run("""
MATCH (e:Entity)-[r]-(n)
WHERE id(e) = $id
RETURN type(r) as rel_type, labels(n) as labels, count(*) as count
""", id=entity['id'])
connections = list(conn_result)
if connections:
st.markdown("**Connections:**")
for conn in connections:
st.text(f" • {conn['rel_type']} → {conn['labels'][0]}: {conn['count']}")
# Visualize button
if st.button(f"Visualize {entity['name']}", key=f"viz_{entity['id']}"):
html = st.session_state.graph_visualizer.visualize_entity_graph(
entity_name=entity['name'],
max_depth=2
)
st.session_state.graph_visualizer.render_graph(html)
else:
st.info(f"No entities found matching '{entity_name}'")
except Exception as e:
st.error(f"❌ Error: {str(e)}")
else:
st.warning("Please enter an entity name")
with stats_tab:
st.subheader("📊 Graph Statistics")
try:
with st.session_state.neo4j_client.driver.session() as session:
# Get counts
stats = {}
# Document count
result = session.run("MATCH (d:Document) RETURN count(d) as count")
stats['documents'] = result.single()['count']
# Chunk count
result = session.run("MATCH (c:Chunk) RETURN count(c) as count")
stats['chunks'] = result.single()['count']
# Entity count
result = session.run("MATCH (e:Entity) RETURN count(e) as count")
stats['entities'] = result.single()['count']
# Relationship count
result = session.run("MATCH ()-[r]->() RETURN count(r) as count")
stats['relationships'] = result.single()['count']
# Display stats
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Documents", stats['documents'])
with col2:
st.metric("Chunks", stats['chunks'])
with col3:
st.metric("Entities", stats['entities'])
with col4:
st.metric("Relationships", stats['relationships'])
st.markdown("---")
# Top entities
st.markdown("**Top 10 Most Connected Entities:**")
result = session.run("""
MATCH (e:Entity)-[r]-()
RETURN e.name as name, count(r) as connections
ORDER BY connections DESC
LIMIT 10
""")
top_entities = list(result)
if top_entities:
for i, entity in enumerate(top_entities, 1):
st.text(f"{i}. {entity['name']}: {entity['connections']} connections")
else:
st.info("No entities found")
except Exception as e:
st.error(f"❌ Error fetching statistics: {str(e)}")
elif selected == "Settings":
st.title("⚙️ Settings")
st.subheader("Configuration")
st.json({
'Ollama Model': settings.ollama_model,
'Embedding Model': settings.ollama_embedding_model,
'OpenSearch Host': f"{settings.opensearch_host}:{settings.opensearch_port}",
'Neo4j URI': settings.neo4j_uri,
'Input Directory': settings.input_dir,
'Output Directory': settings.output_dir,
'Chunk Size': settings.chunk_size,
'Chunk Overlap': settings.chunk_overlap
})
if __name__ == "__main__":
main()
# Made with Bob
The First Workflow

Once the application is live at http://localhost:8501, you can begin processing data immediately:
- Initialize: Click the "Initialize System" button in the sidebar to connect all "brains".
- Upload: Use the Upload tab to process a single document, or drop files into the `/input` folder for Batch Processing.
- Query: Navigate to the Search tab to ask questions. The system will retrieve the best text chunks from OpenSearch and use the Neo4j graph to provide a deep, contextual answer.
A Focus on Knowledge Graph Implementation
Bob's system elevates standard data retrieval by implementing a sophisticated Knowledge Graph via Neo4j, transforming flat document text into an interconnected web of intelligence. Unlike traditional databases, this implementation maps complex relationships between 88+ identified entities, such as people, organizations, and locations, using a specialized MENTIONS relationship logic that links specific document chunks to their real-world subjects. By leveraging the Graph Explorer, users can perform multi-hop reasoning to uncover hidden connections across disparate documents, effectively turning a static library into a dynamic, navigable map of organizational knowledge.
- Relational Mapping: Automatically extracts and connects entities from processed documents.
- Interactive Visualization: Features a PyVis-powered explorer with three specialized views: Entity Graph, Document Structure, and Full Graph.
- Visual Discovery: Utilizes a color-coded legend where Blue Boxes represent documents, Green Circles signify chunks, and Red Stars indicate key entities.
- Deep Exploration: Allows for adjustable depth searches (1–5 levels) to visualize how a single entity like "Bob" connects to the broader database.
- Local Neo4j portal: The Neo4j server is also accessible locally for direct inspection of the graph.
"""
Neo4j client for graph database operations.
"""
from typing import List, Dict, Any, Optional
from neo4j import GraphDatabase
from loguru import logger
from config.settings import settings
class Neo4jClient:
"""Client for interacting with Neo4j graph database."""
def __init__(self):
"""Initialize Neo4j client."""
self.driver = GraphDatabase.driver(
settings.neo4j_uri,
auth=(settings.neo4j_user, settings.neo4j_password)
)
self._verify_connectivity()
logger.info("Neo4j client initialized")
def _verify_connectivity(self):
"""Verify connection to Neo4j."""
try:
with self.driver.session() as session:
session.run("RETURN 1")
logger.info("Neo4j connection verified")
except Exception as e:
logger.error(f"Neo4j connection failed: {str(e)}")
raise
def close(self):
"""Close the driver connection."""
self.driver.close()
logger.info("Neo4j connection closed")
def _flatten_metadata(self, metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""
Flatten nested metadata dictionaries for Neo4j compatibility.
Neo4j only accepts primitive types and arrays as property values.
Args:
metadata: Metadata dictionary (possibly nested)
Returns:
Flattened metadata dictionary
"""
if not metadata:
return {}
flattened = {}
for key, value in metadata.items():
if isinstance(value, dict):
# Flatten nested dict with dot notation
for nested_key, nested_value in value.items():
flat_key = f"{key}_{nested_key}"
if isinstance(nested_value, (str, int, float, bool)):
flattened[flat_key] = nested_value
else:
flattened[flat_key] = str(nested_value)
elif isinstance(value, (str, int, float, bool)):
flattened[key] = value
elif isinstance(value, list):
# Convert list elements to strings if not primitive
flattened[key] = [str(v) if not isinstance(v, (str, int, float, bool)) else v for v in value]
else:
flattened[key] = str(value)
return flattened
def create_document_node(
self,
document_id: str,
file_name: str,
file_path: str,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Create a document node in the graph.
Args:
document_id: Unique document identifier
file_name: Name of the file
file_path: Path to the file
metadata: Additional metadata
Returns:
Created node information
"""
# Flatten metadata to avoid nested dict issues
flat_metadata = self._flatten_metadata(metadata)
with self.driver.session() as session:
query = """
MERGE (d:Document {id: $document_id})
SET d.file_name = $file_name,
d.file_path = $file_path,
d += $metadata,
d.created_at = datetime()
RETURN d
"""
result = session.run(
query,
document_id=document_id,
file_name=file_name,
file_path=file_path,
metadata=flat_metadata
)
record = result.single()
logger.info(f"Created document node: {document_id}")
return dict(record["d"])
def create_chunk_node(
self,
document_id: str,
chunk_id: int,
text: str,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Create a chunk node and link it to its document.
Args:
document_id: Parent document identifier
chunk_id: Chunk identifier
text: Chunk text
metadata: Additional metadata
Returns:
Created node information
"""
# Flatten metadata to avoid nested dict issues
flat_metadata = self._flatten_metadata(metadata)
with self.driver.session() as session:
query = """
MATCH (d:Document {id: $document_id})
CREATE (c:Chunk {id: $chunk_id, document_id: $document_id})
SET c.text = $text,
c += $metadata,
c.created_at = datetime()
CREATE (d)-[:HAS_CHUNK]->(c)
RETURN c
"""
result = session.run(
query,
document_id=document_id,
chunk_id=chunk_id,
text=text,
metadata=flat_metadata
)
record = result.single()
return dict(record["c"])
def create_entity_node(
self,
entity_name: str,
entity_type: str,
properties: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Create an entity node.
Args:
entity_name: Name of the entity
entity_type: Type of entity (Person, Organization, Location, etc.)
properties: Additional properties
Returns:
Created node information
"""
with self.driver.session() as session:
query = f"""
MERGE (e:Entity:{entity_type} {{name: $entity_name}})
SET e += $properties,
e.created_at = coalesce(e.created_at, datetime())
RETURN e
"""
result = session.run(
query,
entity_name=entity_name,
properties=properties or {}
)
record = result.single()
return dict(record["e"])
    def create_relationship(
        self,
        from_node_id: str,
        to_node_id: str,
        relationship_type: str,
        properties: Optional[Dict[str, Any]] = None
    ):
        """
        Create a relationship between two nodes.

        Args:
            from_node_id: Source node ID (or name for entities)
            to_node_id: Target node ID (or name for entities)
            relationship_type: Type of relationship
            properties: Relationship properties
        """
        with self.driver.session() as session:
            # Convert IDs to appropriate types:
            # try int for chunk IDs, keep strings for entity names.
            def convert_id(id_val):
                try:
                    return int(id_val)
                except (ValueError, TypeError):
                    return id_val

            from_id_int = convert_id(from_node_id)
            to_id_int = convert_id(to_node_id)

            # Try to match by id (as int or string) or by name (for entities)
            query = f"""
            MATCH (a)
            WHERE a.id = $from_id OR a.id = $from_id_int OR a.name = $from_id
            MATCH (b)
            WHERE b.id = $to_id OR b.id = $to_id_int OR b.name = $to_id
            MERGE (a)-[r:{relationship_type}]->(b)
            SET r += $properties
            RETURN r
            """
            result = session.run(
                query,
                from_id=from_node_id,
                from_id_int=from_id_int,
                to_id=to_node_id,
                to_id_int=to_id_int,
                properties=properties or {}
            )
            # Check if the relationship was created
            record = result.single()
            if not record:
                logger.warning(
                    f"Could not create relationship {relationship_type} "
                    f"from {from_node_id} to {to_node_id}"
                )
    def find_related_documents(
        self,
        entity_name: str,
        max_depth: int = 2
    ) -> List[Dict[str, Any]]:
        """
        Find documents related to an entity.

        Args:
            entity_name: Name of the entity
            max_depth: Maximum relationship depth

        Returns:
            List of related documents
        """
        with self.driver.session() as session:
            query = """
            MATCH (e:Entity {name: $entity_name})
            MATCH path = (e)-[*1..%d]-(d:Document)
            RETURN DISTINCT d.id as document_id,
                   d.file_name as file_name,
                   length(path) as distance
            ORDER BY distance
            """ % max_depth
            result = session.run(query, entity_name=entity_name)
            return [dict(record) for record in result]
    def get_document_graph(self, document_id: str) -> Dict[str, Any]:
        """
        Get the graph structure for a document.

        Args:
            document_id: Document identifier

        Returns:
            Graph structure with nodes and relationships
        """
        with self.driver.session() as session:
            query = """
            MATCH (d:Document {id: $document_id})
            OPTIONAL MATCH (d)-[r1:HAS_CHUNK]->(c:Chunk)
            OPTIONAL MATCH (c)-[r2]-(e:Entity)
            RETURN d, collect(DISTINCT c) as chunks,
                   collect(DISTINCT e) as entities,
                   collect(DISTINCT r1) as chunk_rels,
                   collect(DISTINCT r2) as entity_rels
            """
            result = session.run(query, document_id=document_id)
            record = result.single()
            if not record:
                return {}
            return {
                "document": dict(record["d"]),
                "chunks": [dict(c) for c in record["chunks"] if c],
                "entities": [dict(e) for e in record["entities"] if e],
                "relationships": {
                    "chunks": len([r for r in record["chunk_rels"] if r]),
                    "entities": len([r for r in record["entity_rels"] if r])
                }
            }
    def delete_document(self, document_id: str):
        """
        Delete a document and all its related nodes.

        Args:
            document_id: Document identifier
        """
        with self.driver.session() as session:
            query = """
            MATCH (d:Document {id: $document_id})
            OPTIONAL MATCH (d)-[:HAS_CHUNK]->(c:Chunk)
            DETACH DELETE d, c
            """
            session.run(query, document_id=document_id)
            logger.info(f"Deleted document from graph: {document_id}")
    def get_statistics(self) -> Dict[str, int]:
        """
        Get database statistics.

        Returns:
            Statistics dictionary
        """
        with self.driver.session() as session:
            # OPTIONAL MATCH keeps each stage returning a row (count 0)
            # even when a label has no nodes yet; a plain MATCH would make
            # the whole query return nothing on an empty database.
            query = """
            OPTIONAL MATCH (d:Document) WITH count(d) as docs
            OPTIONAL MATCH (c:Chunk) WITH docs, count(c) as chunks
            OPTIONAL MATCH (e:Entity) WITH docs, chunks, count(e) as entities
            OPTIONAL MATCH ()-[r]->() WITH docs, chunks, entities, count(r) as relationships
            RETURN docs, chunks, entities, relationships
            """
            result = session.run(query)
            record = result.single()
            return {
                "documents": record["docs"],
                "chunks": record["chunks"],
                "entities": record["entities"],
                "relationships": record["relationships"]
            }
# Made with Bob
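To see how these methods fit together in an ingestion pipeline, here is a minimal usage sketch. The `GraphClientStub`, the `MENTIONS` relationship type, the chunk id, and the entity tuples are all illustrative assumptions for this post, not part of the repository; the stub simply records the calls a real Neo4j-backed client would execute.

```python
# Hypothetical sketch: driving the graph client after entity extraction.
# GraphClientStub stands in for the real Neo4j-backed client above.

class GraphClientStub:
    """Records the calls a real client would translate into Cypher."""

    def __init__(self):
        self.calls = []

    def create_entity_node(self, entity_name, entity_type, properties=None):
        self.calls.append(("entity", entity_name, entity_type))
        return {"name": entity_name}

    def create_relationship(self, from_node_id, to_node_id,
                            relationship_type, properties=None):
        self.calls.append(("rel", from_node_id, relationship_type, to_node_id))


def link_entities_to_chunk(client, chunk_id, entities):
    """MERGE each extracted entity, then link it to the chunk it came from."""
    for name, etype in entities:
        client.create_entity_node(name, etype)
        client.create_relationship(chunk_id, name, "MENTIONS")


client = GraphClientStub()
link_entities_to_chunk(client, "42", [("OpenSearch", "Organization"),
                                      ("Neo4j", "Organization")])
print(len(client.calls))  # 4: two entity merges, two MENTIONS relationships
```

Because `create_relationship` matches either an `id` or a `name` property, the same call shape works whether the endpoint is a numeric chunk id or an entity name, which is why the stub above can pass `"42"` and `"OpenSearch"` through the same parameter.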
A word on deployments…
The OpenSearch-Docling-GraphRAG application is fully “Kubernetes-ready,” providing all the essential architectural “bricks” needed to deploy a scalable, enterprise-grade document intelligence platform. The repository includes a comprehensive set of k8s manifests, covering everything from core orchestration with namespace, configmap, and secrets definitions to dedicated deployment files for OpenSearch, Neo4j, and the main application itself. This containerized approach, combined with detailed deployment documentation and automated scripts, ensures that the transition from a local Docker environment to a production-ready Kubernetes cluster is seamless and highly configurable.
Conclusion
In conclusion, the synergy of these technologies creates a document intelligence system that is far greater than the sum of its parts. By combining the high-fidelity parsing of Docling with the lightning-fast retrieval of OpenSearch, Bob has engineered a pipeline that transforms raw, unstructured data into a structured, searchable, and deeply interconnected knowledge base.
While Docling ensures no detail is lost during ingestion and OpenSearch provides the semantic foundation, the integration of Neo4j adds a layer of relational intelligence that traditional RAG systems simply cannot match. Orchestrated through a modern Streamlit interface and powered by local Ollama LLMs, this “Bob-built” architecture offers a secure, scalable, and transparent solution for modern AI challenges. Whether deployed via Docker or Kubernetes and enhanced by GPU acceleration, the combined power of these tools provides a robust blueprint for the future of GraphRAG and intelligent data exploration.
>>> Thanks for reading <<<
Links
- Code repository of this implementation: https://github.com/aairom/opensearch-docling-graphrag/tree/main
- OpenSearch: https://opensearch.org/
- Neo4j developer blog: https://neo4j.com/blog/developer/
- Docling Project: https://github.com/docling-project/docling
- IBM Bob: https://www.ibm.com/products/bob