Alain Airom

Architecting Next-Gen RAG: Integrating OpenSearch, Neo4j, and Docling

Mastering the Graph: How Bob Built a Smarter RAG for Unstructured Data

🚀 The Vision: Engineering the Ultimate Knowledge Engine — The Idea Behind the Tool

This project represents one of my most ambitious “public” collaborations with Bob, engineered by fusing the “best-of-breed” technologies into a single, cohesive powerhouse. To appreciate the sheer capability of this system, one must look at the two essential “brains” driving the operation: OpenSearch and Neo4j.

While both are titans of data management, they approach information from fundamentally different angles. When synchronized, they solve the most persistent “headaches” in modern AI — specifically the issues of context, accuracy, and scale. Finally, to feed these brains, we’ve integrated Docling. When it comes to transforming messy, unstructured data into machine-readable intelligence, Docling stands in a league of its own; no other tool on the market can match its precision in meeting the rigorous requirements of high-fidelity document processing.


🧠 The Dual-Brain Architecture

OpenSearch: The High-Speed Librarian

OpenSearch is an open-source search and analytics suite. At its core, it is a distributed search engine designed to handle massive amounts of data with lightning-fast speed.

  • Usage in RAG: In a standard Retrieval-Augmented Generation (RAG) setup, OpenSearch acts as the “Vector Database.” It takes the text chunks Bob extracted with Docling and converts them into “embeddings” (mathematical vectors). When you ask a question, OpenSearch performs a Similarity Search — finding the text that “feels” most similar to your query, even if the exact words don’t match.
  • Market Dominance: It is heavily used because it offers Hybrid Search. It can combine classic keyword matching (finding the exact word “iPhone”) with semantic vector search (understanding you mean “smartphone”), providing a level of precision that pure vector databases often miss.
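
To make the hybrid idea concrete, here is a minimal sketch of a combined keyword + k-NN query with the opensearch-py client. The index name, field names, and placeholder vector are illustrative rather than taken from this project, and depending on your OpenSearch version you may prefer the dedicated hybrid query type with a search pipeline instead of a bool combination.

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# Placeholder vector; a real query embedding has hundreds of dimensions
query_vector = [0.12, -0.03, 0.44]

body = {
    "size": 5,
    "query": {
        "bool": {
            "should": [
                {"match": {"text": "iPhone"}},                  # lexical signal
                {"knn": {"embedding": {"vector": query_vector,  # semantic signal
                                       "k": 5}}},
            ]
        }
    },
}
results = client.search(index="documents", body=body)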

Neo4j: The Master of Connections

Neo4j is the world’s leading Graph Database. Unlike traditional databases that store data in rows and columns, Neo4j stores data as Nodes (entities like “Bob” or “OpenSearch”) and Relationships (the lines connecting them, like “Created By”).

  • Usage in GraphRAG: While OpenSearch finds similar text, Neo4j finds related facts. In a GraphRAG implementation, Neo4j allows the AI to perform “multi-hop reasoning.” For example, if you ask, “What other tools did the creator of OpenSearch-Docling-GraphRAG build?” the system can hop from the Tool to Bob, and then to all of Bob’s other Projects.
  • Market Dominance: Neo4j is the “gold standard” because it uses Index-Free Adjacency. This is a technical way of saying it doesn’t have to search through a giant list to find a connection; it simply follows the physical pointers from one piece of data to the next, making it incredibly fast for complex, interconnected queries.
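
The multi-hop question above maps naturally onto a Cypher traversal. A rough sketch using the Neo4j Python driver follows; the node labels, relationship type, and credentials are illustrative, not the project's actual schema.

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    # Hop from the tool to its creator, then out to the creator's other projects
    result = session.run("""
        MATCH (t:Tool {name: $tool})-[:CREATED_BY]->(p:Person)<-[:CREATED_BY]-(other:Tool)
        WHERE other <> t
        RETURN p.name AS creator, collect(DISTINCT other.name) AS other_projects
    """, tool="OpenSearch-Docling-GraphRAG")
    for record in result:
        print(record["creator"], record["other_projects"])

driver.close()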

Why the Market is Obsessed with this Duo

The tech world is moving away from “Basic RAG” and toward “GraphRAG” for three main reasons:

  • Eliminating Hallucinations: Traditional RAG can sometimes grab two unrelated text chunks that just happen to sound similar. GraphRAG ensures the data is factually linked in a graph, providing a “ground truth” that prevents the AI from making up connections.
  • Contextual Depth: By using Docling to parse documents into a graph structure, Bob’s tool can understand the hierarchy of a PDF (e.g., this sub-header belongs to this section), which a flat search engine might lose.
  • Explainability: If the AI gives a weird answer, you can look at the Neo4j graph and see exactly which “path” it took to get there. It turns the “Black Box” of AI into a transparent map.

Let’s jump into the practical implementation of this project 🪂

🛠️ The Implementation: How Bob Connects the Dots

Application’s Architecture

Under the hood, the system is orchestrated via a Streamlit UI, utilizing Ollama for local LLM inference and embedding generation to ensure data privacy.

This synergy allows for a Hybrid RAG approach: the system first retrieves relevant context from OpenSearch, then enriches it with relationship data from Neo4j, resulting in answers that are both contextually deep and factually grounded.
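
In pseudocode, that retrieval flow could look roughly like this. It is a sketch built from the client classes shown later in the article; the "entities" key on each result is an assumption for illustration.

from src.rag import OpenSearchClient, OllamaClient
from src.graphrag import Neo4jClient

opensearch_client = OpenSearchClient()
ollama_client = OllamaClient()
neo4j_client = Neo4jClient()

def hybrid_rag_answer(question: str) -> str:
    # 1. Embed the question and fetch the most similar chunks from OpenSearch
    query_embedding = ollama_client.generate_embedding(question)
    chunks = opensearch_client.search(query_embedding=query_embedding, k=5)

    # 2. Enrich with graph context: for entities mentioned in the results,
    #    pull related documents from Neo4j (the "entities" key is illustrative)
    for chunk in chunks:
        for entity in chunk.get("entities", []):
            chunk.setdefault("graph_context", []).extend(
                neo4j_client.find_related_documents(entity_name=entity, max_depth=2)
            )

    # 3. Generate the final answer with the local LLM, grounded in both sources
    response = ollama_client.generate_rag_response(query=question, retrieved_docs=chunks)
    return response["answer"]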

| Feature     | Technology     | Usage in this Tool                                    |
| ----------- | -------------- | ----------------------------------------------------- |
| **Parsing** | **Docling**    | Processes PDF, DOCX, and PPTX into structured data.   |
| **Search**  | **OpenSearch** | Powers semantic retrieval via vector embeddings.      |
| **Graph**   | **Neo4j**      | Stores entity relationships for GraphRAG queries.     |
| **Model**   | **Ollama**     | Runs local models like `ibm/granite4` for generation. |

Components Architecture


Features Implemented so far…

🕸️ Neo4j (Knowledge Graph)

  • Knowledge Graph Construction: Automatically builds a relational map of entities (88+ currently identified) and their connections from processed documents.
  • Interactive Visualization: Features a “Graph Explorer” using PyVis that allows users to click, drag, and zoom through three view types: Entity Graph, Document Structure, and Full Graph.
  • Relationship Mapping: Implements a fixed MENTIONS relationship logic that correctly links document chunks to entities (like People, Organizations, and Locations) using both unique IDs and names.
  • Multi-Hop Reasoning: Enables the AI to follow “paths” between entities, allowing it to answer complex questions that require connecting facts across different documents.
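
As a rough sketch of the MENTIONS logic described above, the link between a chunk and an entity could be created with a Cypher MERGE along these lines (connection details and property values are placeholders):

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    # Match the chunk by its ID within the document, and the entity by name
    session.run("""
        MATCH (c:Chunk {id: $chunk_id, document_id: $document_id})
        MATCH (e:Entity {name: $entity_name})
        MERGE (c)-[:MENTIONS]->(e)
    """, chunk_id=3, document_id="report_20240101_120000", entity_name="Bob")

driver.close()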

🔍 OpenSearch (Vector Search)

  • Semantic Retrieval: Functions as the system’s high-speed vector store, performing similarity searches on document embeddings to find contextually relevant information.
  • High-Volume Indexing: Capable of efficiently managing and searching hundreds of document chunks (696+ in the current build).
  • Metadata Integration: Stores document text alongside metadata, ensuring that search results are tied back to their original sources for transparency.
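
For reference, an OpenSearch index for this kind of workload could be created with a mapping along these lines. The index name, field names, and embedding dimension are assumptions for illustration, not the project's actual configuration.

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

index_body = {
    "settings": {"index": {"knn": True}},          # enable k-NN search on this index
    "mappings": {
        "properties": {
            "text":      {"type": "text"},
            "embedding": {"type": "knn_vector", "dimension": 768},
            "file_name": {"type": "keyword"},
            "chunk_id":  {"type": "integer"},
        }
    },
}
client.indices.create(index="documents", body=index_body)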

🔮 GraphQL API

  • Flexible Data Retrieval: Provides a dedicated playground at http://localhost:8000/api/graphql for running complex, nested queries.
  • Comprehensive Schema: Supports queries for health checks, semantic searches, RAG queries, and detailed graph statistics.
  • Entity Exploration: Allows developers to list entities by type (e.g., “Person”) and see their document counts and connection distances.
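
As an illustration, a query against the playground can also be issued programmatically. The field names below are hypothetical; check the schema in the playground for the real ones.

import requests

query = """
{
  health { status }
  graphStatistics { documents chunks entities relationships }
}
"""
resp = requests.post(
    "http://localhost:8000/api/graphql",
    json={"query": query},
    timeout=30,
)
print(resp.json())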

🌐 REST API

  • Standardized Endpoints: Offers traditional HTTP endpoints for all core functions, including document uploading, batch processing, and RAG-based question answering.
  • Background Job Management: Includes endpoints to trigger batch jobs and monitor their progress (e.g., % completed) via unique job IDs.
  • System Monitoring: Provides health check and configuration endpoints that report the status of connected services like Neo4j, OpenSearch, and Ollama.
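
For example, checking system health or uploading a document from a script might look like this. The exact endpoint paths are assumptions based on the feature list above, not documented routes.

import requests

BASE = "http://localhost:8000/api"

# Health check reporting the status of Neo4j, OpenSearch, and Ollama
print(requests.get(f"{BASE}/health", timeout=10).json())

# Upload a single document for processing (endpoint path is illustrative)
with open("report.pdf", "rb") as f:
    resp = requests.post(f"{BASE}/documents/upload", files={"file": f}, timeout=120)
print(resp.json())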

🚀 GPU Acceleration

  • Performance Optimization: When enabled, it provides a 2–3x boost in document processing speed and a 3–5x increase in embedding generation speed.
  • Configurable Resources: Users can manage GPU usage via environment variables, specifically setting GPU_DEVICE_ID and GPU_MEMORY_FRACTION to optimize hardware allocation.
  • NVIDIA CUDA Support: Designed to leverage NVIDIA hardware to handle heavy batch processing tasks more efficiently than a standard CPU.
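
For instance, the relevant lines in your .env file might look like the following; the values shown are only examples, with GPU_MEMORY_FRACTION capping how much of the card's memory the pipeline may claim.

GPU_DEVICE_ID=0
GPU_MEMORY_FRACTION=0.8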

🛫 Getting Started: Deploying the Powerhouse in 5 Minutes

Bob designed this system to be “plug-and-play” for developers. Whether you are running it locally or in a containerized environment, you can go from zero to a fully functional GraphRAG system in minutes.

Prerequisites

Before diving in, ensure your machine meets these basic requirements:

  • Python: Version 3.11, 3.12, or 3.13.
  • Docker & Docker Compose: For orchestrating OpenSearch and Neo4j.
  • Ollama: To run local models such as ibm/granite4:latest and the granite-embedding:278m embedding model.
  • Hardware: At least 8GB RAM and 10GB disk space.

The Setup Process

To get the system up and running, follow Bob’s streamlined installation path:

  • Clone & Configure: Clone the repository and set up your environment variables by copying the .env.example file to .env.

  • Pull AI Models: Use Ollama to download the necessary intelligence models:

ollama pull ibm/granite4:latest
ollama pull granite-embedding:278m
  • Launch: Execute the startup script:
./start.sh
  • This script automatically builds the Python environment, installs dependencies, and starts the Docker services for OpenSearch and Neo4j.
"""
Main Streamlit application for OpenSearch-Docling-GraphRAG.
"""
import streamlit as st
from streamlit_option_menu import option_menu
import os
from pathlib import Path
from datetime import datetime
import json
from loguru import logger
import sys

# Configure logger
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/app_{time}.log", rotation="1 day", retention="7 days", level="DEBUG")

# Import application modules
from src.processors import DoclingProcessor
from src.rag import OpenSearchClient, OllamaClient
from src.graphrag import Neo4jClient, GraphBuilder, GraphVisualizer
from config.settings import settings

# Page configuration
st.set_page_config(
    page_title="OpenSearch-Docling-GraphRAG",
    page_icon="📚",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Initialize session state
if 'processor' not in st.session_state:
    st.session_state.processor = None
if 'opensearch_client' not in st.session_state:
    st.session_state.opensearch_client = None
if 'ollama_client' not in st.session_state:
    st.session_state.ollama_client = None
if 'neo4j_client' not in st.session_state:
    st.session_state.neo4j_client = None
if 'graph_builder' not in st.session_state:
    st.session_state.graph_builder = None
if 'graph_visualizer' not in st.session_state:
    st.session_state.graph_visualizer = None
if 'initialized' not in st.session_state:
    st.session_state.initialized = False


def initialize_clients():
    """Initialize all clients."""
    try:
        with st.spinner("Initializing clients..."):
            if not st.session_state.initialized:
                st.session_state.processor = DoclingProcessor()
                st.session_state.opensearch_client = OpenSearchClient()
                st.session_state.ollama_client = OllamaClient()
                st.session_state.neo4j_client = Neo4jClient()
                st.session_state.graph_builder = GraphBuilder(st.session_state.neo4j_client)
                st.session_state.graph_visualizer = GraphVisualizer(st.session_state.neo4j_client)
                st.session_state.initialized = True
                st.success("✅ All clients initialized successfully!")
                logger.info("All clients initialized")
    except Exception as e:
        st.error(f"❌ Error initializing clients: {str(e)}")
        logger.error(f"Initialization error: {str(e)}")


def process_single_file(uploaded_file):
    """Process a single uploaded file."""
    try:
        # Save uploaded file temporarily
        temp_path = Path(settings.input_dir) / uploaded_file.name
        with open(temp_path, 'wb') as f:
            f.write(uploaded_file.getbuffer())

        # Process document
        with st.spinner(f"Processing {uploaded_file.name}..."):
            doc_data = st.session_state.processor.process_document(str(temp_path))

            # Generate embeddings
            texts = [chunk['text'] for chunk in doc_data['chunks']]
            embeddings = st.session_state.ollama_client.generate_embeddings_batch(texts)

            # Index in OpenSearch
            document_id = f"{Path(uploaded_file.name).stem}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
            st.session_state.opensearch_client.index_document(
                document_id=document_id,
                file_name=uploaded_file.name,
                file_path=str(temp_path),
                chunks=doc_data['chunks'],
                embeddings=embeddings,
                metadata=doc_data['metadata']
            )

            # Build knowledge graph
            st.session_state.graph_builder.build_document_graph(
                document_id=document_id,
                file_name=uploaded_file.name,
                file_path=str(temp_path),
                chunks=doc_data['chunks'],
                metadata=doc_data['metadata']
            )

            # Save output
            output_file = st.session_state.processor.save_output(doc_data, settings.output_dir)

            return {
                'success': True,
                'document_id': document_id,
                'output_file': output_file,
                'chunks': len(doc_data['chunks'])
            }

    except Exception as e:
        logger.error(f"Error processing file: {str(e)}")
        return {'success': False, 'error': str(e)}


def process_batch_files():
    """Process all files in the input directory."""
    input_dir = Path(settings.input_dir)
    files = list(input_dir.glob('*'))
    files = [f for f in files if f.is_file() and not f.name.startswith('.')]

    if not files:
        st.warning("No files found in input directory")
        return

    progress_bar = st.progress(0)
    status_text = st.empty()
    results = []

    for i, file_path in enumerate(files):
        status_text.text(f"Processing {file_path.name} ({i+1}/{len(files)})")

        try:
            # Process document
            doc_data = st.session_state.processor.process_document(str(file_path))

            # Generate embeddings
            texts = [chunk['text'] for chunk in doc_data['chunks']]
            embeddings = st.session_state.ollama_client.generate_embeddings_batch(texts)

            # Index in OpenSearch
            document_id = f"{file_path.stem}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
            st.session_state.opensearch_client.index_document(
                document_id=document_id,
                file_name=file_path.name,
                file_path=str(file_path),
                chunks=doc_data['chunks'],
                embeddings=embeddings,
                metadata=doc_data['metadata']
            )

            # Build knowledge graph
            st.session_state.graph_builder.build_document_graph(
                document_id=document_id,
                file_name=file_path.name,
                file_path=str(file_path),
                chunks=doc_data['chunks'],
                metadata=doc_data['metadata']
            )

            # Save output
            output_file = st.session_state.processor.save_output(doc_data, settings.output_dir)

            results.append({
                'file': file_path.name,
                'status': 'success',
                'document_id': document_id,
                'chunks': len(doc_data['chunks'])
            })

        except Exception as e:
            logger.error(f"Error processing {file_path.name}: {str(e)}")
            results.append({
                'file': file_path.name,
                'status': 'failed',
                'error': str(e)
            })

        progress_bar.progress((i + 1) / len(files))

    status_text.text("Batch processing complete!")
    return results


def main():
    """Main application."""

    # Sidebar
    with st.sidebar:
        st.title("📚 Document RAG System")

        selected = option_menu(
            menu_title=None,
            options=["Home", "Upload", "Batch Process", "Search", "Graph Explorer", "Settings"],
            icons=["house", "cloud-upload", "files", "search", "diagram-3", "gear"],
            default_index=0,
        )

        st.divider()

        # System status
        st.subheader("System Status")
        if st.session_state.initialized:
            st.success("✅ System Ready")

            # Show statistics
            try:
                doc_count = st.session_state.opensearch_client.get_document_count()
                st.metric("Indexed Chunks", doc_count)

                graph_stats = st.session_state.graph_builder.get_graph_summary()
                st.metric("Documents in Graph", graph_stats.get('total_documents', 0))
                st.metric("Entities", graph_stats.get('total_entities', 0))
            except Exception:
                # Statistics are best-effort; ignore errors while services warm up
                pass
        else:
            st.warning("⚠️ System Not Initialized")
            if st.button("Initialize System"):
                initialize_clients()

    # Main content
    if selected == "Home":
        st.title("🏠 Welcome to OpenSearch-Docling-GraphRAG")
        st.markdown("""
        ### A Comprehensive Document Processing and RAG System

        This application combines:
        - **Docling**: Advanced document processing
        - **OpenSearch**: Vector search and retrieval
        - **Neo4j**: Knowledge graph construction
        - **Ollama**: Local LLM for embeddings and generation

        #### Features:
        - 📄 Process various document formats (PDF, DOCX, PPTX, etc.)
        - 🔍 Semantic search with vector embeddings
        - 🕸️ Knowledge graph construction and exploration
        - 💬 RAG-based question answering
        - 📊 Batch processing capabilities

        #### Getting Started:
        1. Initialize the system using the sidebar button
        2. Upload documents or process batch files
        3. Search and query your documents
        4. Explore the knowledge graph
        """)

        if not st.session_state.initialized:
            st.info("👈 Please initialize the system from the sidebar to get started")

    elif selected == "Upload":
        st.title("📤 Upload Documents")

        if not st.session_state.initialized:
            st.warning("Please initialize the system first")
            return

        uploaded_file = st.file_uploader(
            "Choose a document",
            type=['pdf', 'docx', 'pptx', 'txt', 'md', 'html']
        )

        if uploaded_file:
            st.info(f"File: {uploaded_file.name} ({uploaded_file.size} bytes)")

            if st.button("Process Document"):
                result = process_single_file(uploaded_file)

                if result['success']:
                    st.success(f"✅ Document processed successfully!")
                    st.json({
                        'Document ID': result['document_id'],
                        'Chunks Created': result['chunks'],
                        'Output File': result['output_file']
                    })
                else:
                    st.error(f"❌ Error: {result['error']}")

    elif selected == "Batch Process":
        st.title("📁 Batch Process Documents")

        if not st.session_state.initialized:
            st.warning("Please initialize the system first")
            return

        st.info(f"Input directory: {settings.input_dir}")

        input_dir = Path(settings.input_dir)
        files = list(input_dir.glob('*'))
        files = [f for f in files if f.is_file() and not f.name.startswith('.')]

        st.write(f"Found {len(files)} files:")
        for f in files:
            st.text(f"  • {f.name}")

        if st.button("Process All Files"):
            if files:
                results = process_batch_files()

                # Display results
                if results:
                    st.subheader("Processing Results")
                    success_count = sum(1 for r in results if r['status'] == 'success')
                    st.metric("Successfully Processed", f"{success_count}/{len(results)}")

                    # Show details
                    for result in results:
                        if result['status'] == 'success':
                            st.success(f"✅ {result['file']} - {result['chunks']} chunks")
                        else:
                            st.error(f"❌ {result['file']} - {result['error']}")
            else:
                st.warning("No files to process")

    elif selected == "Search":
        st.title("🔍 Search Documents")

        if not st.session_state.initialized:
            st.warning("Please initialize the system first")
            return

        query = st.text_input("Enter your question:")
        k = st.slider("Number of results", 1, 10, 5)

        if query and st.button("Search"):
            with st.spinner("Searching..."):
                # Generate query embedding
                query_embedding = st.session_state.ollama_client.generate_embedding(query)

                # Search
                results = st.session_state.opensearch_client.search(
                    query_embedding=query_embedding,
                    k=k
                )

                # Generate RAG response
                rag_response = st.session_state.ollama_client.generate_rag_response(
                    query=query,
                    retrieved_docs=results
                )

                # Display answer
                st.subheader("Answer")
                st.write(rag_response['answer'])

                # Display sources
                st.subheader("Sources")
                for i, source in enumerate(rag_response['sources'], 1):
                    with st.expander(f"Source {i}: {source['file_name']} (Score: {source['score']:.4f})"):
                        st.write(f"Chunk ID: {source['chunk_id']}")

    elif selected == "Graph Explorer":
        st.title("🕸️ Knowledge Graph Explorer")

        if not st.session_state.initialized:
            st.warning("Please initialize the system first")
            return

        # Tabs for different views
        viz_tab, search_tab, stats_tab = st.tabs(["📊 Visualize", "🔍 Search", "📈 Statistics"])

        with viz_tab:
            st.subheader("Interactive Graph Visualization")

            # Visualization options
            viz_type = st.radio(
                "Select visualization type:",
                ["Entity Graph", "Document Structure", "Full Graph"],
                horizontal=True
            )

            if viz_type == "Entity Graph":
                st.markdown("**Visualize an entity and its connections**")

                col1, col2 = st.columns([3, 1])
                with col1:
                    entity_name = st.text_input(
                        "Enter entity name:",
                        placeholder="e.g., Bob, Python, AI",
                        key="entity_viz"
                    )
                with col2:
                    max_depth = st.number_input("Max depth:", min_value=1, max_value=5, value=2)

                if st.button("🎨 Visualize Entity", type="primary"):
                    if entity_name:
                        try:
                            with st.spinner(f"Generating graph for '{entity_name}'..."):
                                html = st.session_state.graph_visualizer.visualize_entity_graph(
                                    entity_name=entity_name,
                                    max_depth=max_depth,
                                    max_nodes=50
                                )
                                st.session_state.graph_visualizer.render_graph(html)

                                st.success(f"✅ Graph generated for '{entity_name}'")
                                st.info("💡 **Tip:** Click and drag nodes, scroll to zoom, hover for details")

                        except Exception as e:
                            st.error(f"❌ Error: {str(e)}")
                            st.info("💡 Try searching for a different entity or check if documents have been processed")
                    else:
                        st.warning("⚠️ Please enter an entity name")

            elif viz_type == "Document Structure":
                st.markdown("**Visualize document structure with chunks and entities**")

                # Get list of documents
                try:
                    with st.session_state.neo4j_client.driver.session() as session:
                        result = session.run("MATCH (d:Document) RETURN d.id as id, d.file_name as name")
                        documents = [(r['id'], r['name']) for r in result]

                    if documents:
                        doc_options = ["All Documents"] + [f"{name} ({id[:8]}...)" for id, name in documents]
                        selected_doc = st.selectbox("Select document:", doc_options)

                        if st.button("🎨 Visualize Document", type="primary"):
                            try:
                                with st.spinner("Generating document graph..."):
                                    if selected_doc == "All Documents":
                                        html = st.session_state.graph_visualizer.visualize_document_graph(
                                            document_id=None,
                                            max_nodes=100
                                        )
                                    else:
                                        # Extract document ID from selection
                                        doc_id = [id for id, name in documents if name in selected_doc][0]
                                        html = st.session_state.graph_visualizer.visualize_document_graph(
                                            document_id=doc_id,
                                            max_nodes=100
                                        )

                                    st.session_state.graph_visualizer.render_graph(html)
                                    st.success("✅ Document graph generated")

                                    # Legend
                                    st.markdown("---")
                                    st.markdown("**Legend:**")
                                    col1, col2, col3 = st.columns(3)
                                    with col1:
                                        st.markdown("🔵 **Blue Box** = Document")
                                    with col2:
                                        st.markdown("🟢 **Green Dot** = Chunk")
                                    with col3:
                                        st.markdown("🔴 **Red Star** = Entity")

                            except Exception as e:
                                st.error(f"❌ Error: {str(e)}")
                    else:
                        st.info("📄 No documents found. Upload and process documents first.")

                except Exception as e:
                    st.error(f"❌ Error fetching documents: {str(e)}")

            else:  # Full Graph
                st.markdown("**Visualize the entire knowledge graph**")
                st.warning("⚠️ This may be slow for large graphs")

                max_nodes = st.slider("Maximum nodes to display:", 10, 200, 50)

                if st.button("🎨 Visualize Full Graph", type="primary"):
                    try:
                        with st.spinner("Generating full graph..."):
                            html = st.session_state.graph_visualizer.visualize_document_graph(
                                document_id=None,
                                max_nodes=max_nodes
                            )
                            st.session_state.graph_visualizer.render_graph(html)
                            st.success("✅ Full graph generated")

                    except Exception as e:
                        st.error(f"❌ Error: {str(e)}")

        with search_tab:
            st.subheader("🔍 Search Entities")

            entity_name = st.text_input("Search for entity:", key="search_entity")

            if st.button("Search", key="search_btn"):
                if entity_name:
                    try:
                        # Search for entity
                        with st.session_state.neo4j_client.driver.session() as session:
                            result = session.run("""
                                MATCH (e:Entity)
                                WHERE toLower(e.name) CONTAINS toLower($name)
                                RETURN e.name as name, id(e) as id
                                LIMIT 20
                            """, name=entity_name)

                            entities = list(result)

                            if entities:
                                st.success(f"Found {len(entities)} entities:")

                                for entity in entities:
                                    with st.expander(f"🔴 {entity['name']}"):
                                        # Get connections
                                        conn_result = session.run("""
                                            MATCH (e:Entity)-[r]-(n)
                                            WHERE id(e) = $id
                                            RETURN type(r) as rel_type, labels(n) as labels, count(*) as count
                                        """, id=entity['id'])

                                        connections = list(conn_result)

                                        if connections:
                                            st.markdown("**Connections:**")
                                            for conn in connections:
                                                st.text(f"  • {conn['rel_type']} → {conn['labels'][0]}: {conn['count']}")

                                        # Visualize button
                                        if st.button(f"Visualize {entity['name']}", key=f"viz_{entity['id']}"):
                                            html = st.session_state.graph_visualizer.visualize_entity_graph(
                                                entity_name=entity['name'],
                                                max_depth=2
                                            )
                                            st.session_state.graph_visualizer.render_graph(html)
                            else:
                                st.info(f"No entities found matching '{entity_name}'")

                    except Exception as e:
                        st.error(f"❌ Error: {str(e)}")
                else:
                    st.warning("Please enter an entity name")

        with stats_tab:
            st.subheader("📈 Graph Statistics")

            try:
                with st.session_state.neo4j_client.driver.session() as session:
                    # Get counts
                    stats = {}

                    # Document count
                    result = session.run("MATCH (d:Document) RETURN count(d) as count")
                    stats['documents'] = result.single()['count']

                    # Chunk count
                    result = session.run("MATCH (c:Chunk) RETURN count(c) as count")
                    stats['chunks'] = result.single()['count']

                    # Entity count
                    result = session.run("MATCH (e:Entity) RETURN count(e) as count")
                    stats['entities'] = result.single()['count']

                    # Relationship count
                    result = session.run("MATCH ()-[r]->() RETURN count(r) as count")
                    stats['relationships'] = result.single()['count']

                    # Display stats
                    col1, col2, col3, col4 = st.columns(4)
                    with col1:
                        st.metric("Documents", stats['documents'])
                    with col2:
                        st.metric("Chunks", stats['chunks'])
                    with col3:
                        st.metric("Entities", stats['entities'])
                    with col4:
                        st.metric("Relationships", stats['relationships'])

                    st.markdown("---")

                    # Top entities
                    st.markdown("**Top 10 Most Connected Entities:**")
                    result = session.run("""
                        MATCH (e:Entity)-[r]-()
                        RETURN e.name as name, count(r) as connections
                        ORDER BY connections DESC
                        LIMIT 10
                    """)

                    top_entities = list(result)
                    if top_entities:
                        for i, entity in enumerate(top_entities, 1):
                            st.text(f"{i}. {entity['name']}: {entity['connections']} connections")
                    else:
                        st.info("No entities found")

            except Exception as e:
                st.error(f"❌ Error fetching statistics: {str(e)}")

    elif selected == "Settings":
        st.title("⚙️ Settings")

        st.subheader("Configuration")
        st.json({
            'Ollama Model': settings.ollama_model,
            'Embedding Model': settings.ollama_embedding_model,
            'OpenSearch Host': f"{settings.opensearch_host}:{settings.opensearch_port}",
            'Neo4j URI': settings.neo4j_uri,
            'Input Directory': settings.input_dir,
            'Output Directory': settings.output_dir,
            'Chunk Size': settings.chunk_size,
            'Chunk Overlap': settings.chunk_overlap
        })


if __name__ == "__main__":
    main()

# Made with Bob

The First Workflow


Once the application is live at http://localhost:8501, you can begin processing data immediately:

  • Initialize: Click the “Initialize System” button in the sidebar to connect all “brains”.
  • Upload: Use the Upload tab to process a single document or drop files into the /input folder for Batch Processing.
  • Query: Navigate to the Search tab to ask questions. The system will retrieve the best text chunks from OpenSearch and use the Neo4j graph to provide a deep, contextual answer.

A focus on Knowledge Graph Implementation

Bob’s system elevates standard data retrieval by implementing a sophisticated Knowledge Graph via Neo4j, transforming flat document text into an interconnected web of intelligence. Unlike traditional databases, this implementation maps complex relationships between 88+ identified entities — such as people, organizations, and locations — using a specialized MENTIONS relationship logic that links specific document chunks to their real-world subjects. By leveraging the Graph Explorer, users can perform multi-hop reasoning to uncover hidden connections across disparate documents, effectively turning a static library into a dynamic, navigable map of organizational knowledge.

  • Relational Mapping: Automatically extracts and connects entities from processed documents.
  • Interactive Visualization: Features a PyVis-powered explorer with three specialized views: Entity Graph, Document Structure, and Full Graph.
  • Visual Discovery: Utilizes a color-coded legend where Blue Boxes represent documents, Green Circles signify chunks, and Red Stars indicate key entities.
  • Deep Exploration: Allows for adjustable depth searches (1–5 levels) to visualize how a single entity like “Bob” connects to the broader database.
  • Local Neo4j portal: The Neo4j server’s own local web interface is also accessible for inspecting the graph directly.
"""
Neo4j client for graph database operations.
"""
from typing import List, Dict, Any, Optional
from neo4j import GraphDatabase
from loguru import logger

from config.settings import settings

class Neo4jClient:
    """Client for interacting with Neo4j graph database."""

    def __init__(self):
        """Initialize Neo4j client."""
        self.driver = GraphDatabase.driver(
            settings.neo4j_uri,
            auth=(settings.neo4j_user, settings.neo4j_password)
        )
        self._verify_connectivity()
        logger.info("Neo4j client initialized")

    def _verify_connectivity(self):
        """Verify connection to Neo4j."""
        try:
            with self.driver.session() as session:
                session.run("RETURN 1")
            logger.info("Neo4j connection verified")
        except Exception as e:
            logger.error(f"Neo4j connection failed: {str(e)}")
            raise

    def close(self):
        """Close the driver connection."""
        self.driver.close()
        logger.info("Neo4j connection closed")

    def _flatten_metadata(self, metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Flatten nested metadata dictionaries for Neo4j compatibility.
        Neo4j only accepts primitive types and arrays as property values.

        Args:
            metadata: Metadata dictionary (possibly nested)

        Returns:
            Flattened metadata dictionary
        """
        if not metadata:
            return {}

        flattened = {}
        for key, value in metadata.items():
            if isinstance(value, dict):
                # Flatten nested dict with dot notation
                for nested_key, nested_value in value.items():
                    flat_key = f"{key}_{nested_key}"
                    if isinstance(nested_value, (str, int, float, bool)):
                        flattened[flat_key] = nested_value
                    else:
                        flattened[flat_key] = str(nested_value)
            elif isinstance(value, (str, int, float, bool)):
                flattened[key] = value
            elif isinstance(value, list):
                # Convert list elements to strings if not primitive
                flattened[key] = [str(v) if not isinstance(v, (str, int, float, bool)) else v for v in value]
            else:
                flattened[key] = str(value)

        return flattened

    def create_document_node(
        self,
        document_id: str,
        file_name: str,
        file_path: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Create a document node in the graph.

        Args:
            document_id: Unique document identifier
            file_name: Name of the file
            file_path: Path to the file
            metadata: Additional metadata

        Returns:
            Created node information
        """
        # Flatten metadata to avoid nested dict issues
        flat_metadata = self._flatten_metadata(metadata)

        with self.driver.session() as session:
            query = """
            MERGE (d:Document {id: $document_id})
            SET d.file_name = $file_name,
                d.file_path = $file_path,
                d += $metadata,
                d.created_at = datetime()
            RETURN d
            """
            result = session.run(
                query,
                document_id=document_id,
                file_name=file_name,
                file_path=file_path,
                metadata=flat_metadata
            )
            record = result.single()
            logger.info(f"Created document node: {document_id}")
            return dict(record["d"])

    def create_chunk_node(
        self,
        document_id: str,
        chunk_id: int,
        text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Create a chunk node and link it to its document.

        Args:
            document_id: Parent document identifier
            chunk_id: Chunk identifier
            text: Chunk text
            metadata: Additional metadata

        Returns:
            Created node information
        """
        # Flatten metadata to avoid nested dict issues
        flat_metadata = self._flatten_metadata(metadata)

        with self.driver.session() as session:
            query = """
            MATCH (d:Document {id: $document_id})
            CREATE (c:Chunk {id: $chunk_id, document_id: $document_id})
            SET c.text = $text,
                c += $metadata,
                c.created_at = datetime()
            CREATE (d)-[:HAS_CHUNK]->(c)
            RETURN c
            """
            result = session.run(
                query,
                document_id=document_id,
                chunk_id=chunk_id,
                text=text,
                metadata=flat_metadata
            )
            record = result.single()
            return dict(record["c"])

    def create_entity_node(
        self,
        entity_name: str,
        entity_type: str,
        properties: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Create an entity node.

        Args:
            entity_name: Name of the entity
            entity_type: Type of entity (Person, Organization, Location, etc.)
            properties: Additional properties

        Returns:
            Created node information
        """
        with self.driver.session() as session:
            query = f"""
            MERGE (e:Entity:{entity_type} {{name: $entity_name}})
            SET e += $properties,
                e.created_at = coalesce(e.created_at, datetime())
            RETURN e
            """
            result = session.run(
                query,
                entity_name=entity_name,
                properties=properties or {}
            )
            record = result.single()
            return dict(record["e"])

    def create_relationship(
        self,
        from_node_id: str,
        to_node_id: str,
        relationship_type: str,
        properties: Optional[Dict[str, Any]] = None
    ):
        """
        Create a relationship between two nodes.

        Args:
            from_node_id: Source node ID (or name for entities)
            to_node_id: Target node ID (or name for entities)
            relationship_type: Type of relationship
            properties: Relationship properties
        """
        with self.driver.session() as session:
            # Convert IDs to appropriate types
            # Try to convert to int for chunk IDs, keep as string for entity names
            def convert_id(id_val):
                try:
                    return int(id_val)
                except (ValueError, TypeError):
                    return id_val

            from_id_int = convert_id(from_node_id)
            to_id_int = convert_id(to_node_id)

            # Try to match by id (as int or string) or by name (for entities)
            query = f"""
            MATCH (a)
            WHERE a.id = $from_id OR a.id = $from_id_int OR a.name = $from_id
            MATCH (b)
            WHERE b.id = $to_id OR b.id = $to_id_int OR b.name = $to_id
            MERGE (a)-[r:{relationship_type}]->(b)
            SET r += $properties
            RETURN r
            """
            result = session.run(
                query,
                from_id=from_node_id,
                from_id_int=from_id_int,
                to_id=to_node_id,
                to_id_int=to_id_int,
                properties=properties or {}
            )
            # Check if relationship was created
            record = result.single()
            if not record:
                logger.warning(f"Could not create relationship {relationship_type} from {from_node_id} to {to_node_id}")

    def find_related_documents(
        self,
        entity_name: str,
        max_depth: int = 2
    ) -> List[Dict[str, Any]]:
        """
        Find documents related to an entity.

        Args:
            entity_name: Name of the entity
            max_depth: Maximum relationship depth

        Returns:
            List of related documents
        """
        with self.driver.session() as session:
            query = """
            MATCH (e:Entity {name: $entity_name})
            MATCH path = (e)-[*1..%d]-(d:Document)
            RETURN DISTINCT d.id as document_id, 
                   d.file_name as file_name,
                   length(path) as distance
            ORDER BY distance
            """ % max_depth

            result = session.run(query, entity_name=entity_name)
            return [dict(record) for record in result]

    def get_document_graph(self, document_id: str) -> Dict[str, Any]:
        """
        Get the graph structure for a document.

        Args:
            document_id: Document identifier

        Returns:
            Graph structure with nodes and relationships
        """
        with self.driver.session() as session:
            query = """
            MATCH (d:Document {id: $document_id})
            OPTIONAL MATCH (d)-[r1:HAS_CHUNK]->(c:Chunk)
            OPTIONAL MATCH (c)-[r2]-(e:Entity)
            RETURN d, collect(DISTINCT c) as chunks, 
                   collect(DISTINCT e) as entities,
                   collect(DISTINCT r1) as chunk_rels,
                   collect(DISTINCT r2) as entity_rels
            """
            result = session.run(query, document_id=document_id)
            record = result.single()

            if not record:
                return {}

            return {
                "document": dict(record["d"]),
                "chunks": [dict(c) for c in record["chunks"] if c],
                "entities": [dict(e) for e in record["entities"] if e],
                "relationships": {
                    "chunks": len([r for r in record["chunk_rels"] if r]),
                    "entities": len([r for r in record["entity_rels"] if r])
                }
            }

    def delete_document(self, document_id: str):
        """
        Delete a document and all its related nodes.

        Args:
            document_id: Document identifier
        """
        with self.driver.session() as session:
            query = """
            MATCH (d:Document {id: $document_id})
            OPTIONAL MATCH (d)-[:HAS_CHUNK]->(c:Chunk)
            DETACH DELETE d, c
            """
            session.run(query, document_id=document_id)
            logger.info(f"Deleted document from graph: {document_id}")

    def get_statistics(self) -> Dict[str, int]:
        """
        Get database statistics.

        Returns:
            Statistics dictionary
        """
        with self.driver.session() as session:
            query = """
            MATCH (d:Document) WITH count(d) as docs
            MATCH (c:Chunk) WITH docs, count(c) as chunks
            MATCH (e:Entity) WITH docs, chunks, count(e) as entities
            MATCH ()-[r]->() WITH docs, chunks, entities, count(r) as relationships
            RETURN docs, chunks, entities, relationships
            """
            result = session.run(query)
            record = result.single()

            return {
                "documents": record["docs"],
                "chunks": record["chunks"],
                "entities": record["entities"],
                "relationships": record["relationships"]
            }

# Made with Bob
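
A quick usage sketch of this client, assuming the Docker services started by ./start.sh are running and the class is importable as in the Streamlit app:

from src.graphrag import Neo4jClient

client = Neo4jClient()

# Which documents mention "Bob", and how far away are they in the graph?
for doc in client.find_related_documents("Bob", max_depth=2):
    print(doc["document_id"], doc["file_name"], "distance:", doc["distance"])

print(client.get_statistics())
client.close()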


A word on deployments…

The OpenSearch-Docling-GraphRAG application is fully “Kubernetes-ready,” providing all the essential architectural “bricks” needed to deploy a scalable, enterprise-grade document intelligence platform. The repository includes a comprehensive set of k8s manifests, covering everything from core orchestration with namespace, configmap, and secrets definitions to dedicated deployment files for OpenSearch, Neo4j, and the main application itself. This containerized approach, combined with detailed deployment documentation and automated scripts, ensures that the transition from a local Docker environment to a production-ready Kubernetes cluster is seamless and highly configurable.


Conclusion

In conclusion, the synergy of these technologies creates a document intelligence system that is far greater than the sum of its parts. By combining the high-fidelity parsing of Docling with the lightning-fast retrieval of OpenSearch, Bob has engineered a pipeline that transforms raw, unstructured data into a structured, searchable, and deeply interconnected knowledge base.

While Docling ensures no detail is lost during ingestion and OpenSearch provides the semantic foundation, the integration of Neo4j adds a layer of relational intelligence that traditional RAG systems simply cannot match. Orchestrated through a modern Streamlit interface and powered by local Ollama LLMs, this “Bob-built” architecture offers a secure, scalable, and transparent solution for modern AI challenges. Whether deployed via Docker or Kubernetes and enhanced by GPU acceleration, the combined power of these tools provides a robust blueprint for the future of GraphRAG and intelligent data exploration.

>>> Thanks for reading <<<

Links
