
Week 9: Service Architecture

Learning Objectives

This final week covers deploying knowledge graph systems in production. You will learn about visualization, building APIs with FastAPI, securing and caching those APIs, and monitoring and deploying the resulting service.


1. Knowledge Graph Architecture

The "Restaurant Service" Analogy

We have built a powerful system, but it's still in the "Kitchen".

  • Information: Ingredients.
  • Ontology: Recipe.
  • Graph DB: The Kitchen.
  • LLM: The Chef.

Service Architecture is about opening the restaurant:

  • API (FastAPI): The Wait Staff. Takes orders (queries) and brings food (results).
  • Visualization: The Plating. Makes the data consumable.
  • Caching: Pre-prepped ingredients. Speeds up service.

2. Knowledge Graph Visualization

Effective visualization helps users explore and understand knowledge graphs.

Visualization Libraries

| Library | Use Case | Features |
|---|---|---|
| Pyvis | Python-based | Interactive HTML export |
| NetworkX + Matplotlib | Static graphs | Integration with analysis |
| D3.js | Web applications | Highly customizable |
| Cytoscape.js | Complex networks | Layouts, styling |
| Neo4j Browser | Neo4j-specific | Built-in, Cypher integration |

Interactive Visualization with Pyvis

from pyvis.network import Network
from rdflib import Graph, URIRef
from rdflib.namespace import RDF
 
def visualize_kg(rdf_graph, output_file="knowledge_graph.html"):
    """Create interactive visualization from RDF graph."""
    net = Network(height="750px", width="100%", directed=True)
 
    # Configure physics
    net.set_options("""
    {
        "physics": {
            "forceAtlas2Based": {
                "gravitationalConstant": -50,
                "springLength": 100
            },
            "solver": "forceAtlas2Based"
        }
    }
    """)
 
    # Extract nodes and edges
    nodes = set()
    for s, p, o in rdf_graph:
        nodes.add(str(s))
        # Skip W3C vocabulary objects (e.g., rdf:type targets like owl:Class)
        if not str(o).startswith("http://www.w3.org"):
            nodes.add(str(o))
 
    # Add nodes with colors based on type
    for node in nodes:
        color = get_node_color(node, rdf_graph)
        label = node.split("/")[-1].split("#")[-1]
        net.add_node(node, label=label, color=color)
 
    # Add edges
    for s, p, o in rdf_graph:
        pred_label = str(p).split("/")[-1].split("#")[-1]
        if str(o) in nodes:
            net.add_edge(str(s), str(o), label=pred_label)
 
    net.save_graph(output_file)
    return output_file
 
def get_node_color(node, graph):
    """Assign color based on node type."""
    type_colors = {
        "Person": "#FF6B6B",
        "Organization": "#4ECDC4",
        "Location": "#45B7D1",
        "Event": "#96CEB4",
        "Concept": "#FFEAA7"
    }
 
    # Query for type
    for s, p, o in graph.triples((URIRef(node), RDF.type, None)):
        type_name = str(o).split("/")[-1].split("#")[-1]
        if type_name in type_colors:
            return type_colors[type_name]
 
    return "#95A5A6"  # Default gray

D3.js Force-Directed Graph

// Frontend JavaScript for D3 visualization
function createKnowledgeGraph(data, container) {
    const width = 960;
    const height = 600;
 
    const svg = d3.select(container)
        .append("svg")
        .attr("width", width)
        .attr("height", height);
 
    const simulation = d3.forceSimulation(data.nodes)
        .force("link", d3.forceLink(data.links).id(d => d.id))
        .force("charge", d3.forceManyBody().strength(-300))
        .force("center", d3.forceCenter(width / 2, height / 2));
 
    // Create links
    const link = svg.append("g")
        .selectAll("line")
        .data(data.links)
        .enter().append("line")
        .attr("stroke", "#999")
        .attr("stroke-width", 2);
 
    // Create link labels
    const linkLabel = svg.append("g")
        .selectAll("text")
        .data(data.links)
        .enter().append("text")
        .text(d => d.label)
        .attr("font-size", "10px");
 
    // Create nodes
    const node = svg.append("g")
        .selectAll("circle")
        .data(data.nodes)
        .enter().append("circle")
        .attr("r", 10)
        .attr("fill", d => d.color)
        .call(drag(simulation));
 
    // Node labels
    const label = svg.append("g")
        .selectAll("text")
        .data(data.nodes)
        .enter().append("text")
        .text(d => d.label)
        .attr("font-size", "12px");
 
    simulation.on("tick", () => {
        link
            .attr("x1", d => d.source.x)
            .attr("y1", d => d.source.y)
            .attr("x2", d => d.target.x)
            .attr("y2", d => d.target.y);
 
        node
            .attr("cx", d => d.x)
            .attr("cy", d => d.y);
 
        label
            .attr("x", d => d.x + 12)
            .attr("y", d => d.y + 4);
    });
}

3. FastAPI for Knowledge Graph Services

Basic API Structure

from fastapi import FastAPI, HTTPException, Query, Body
from pydantic import BaseModel
from typing import List, Optional
from rdflib import Graph, Namespace
 
app = FastAPI(
    title="Knowledge Graph API",
    description="API for querying and manipulating knowledge graphs",
    version="1.0.0"
)
 
# Knowledge Graph instance
kg = Graph()
kg.parse("knowledge_graph.ttl", format="turtle")
 
class Triple(BaseModel):
    subject: str
    predicate: str
    object: str
 
class QueryResult(BaseModel):
    bindings: List[dict]
    count: int
 
@app.get("/")
async def root():
    return {"message": "Knowledge Graph API", "triples": len(kg)}

CRUD Endpoints

@app.get("/entities/{entity_id}")
async def get_entity(entity_id: str):
    """Get all triples for an entity."""
    EX = Namespace("http://example.org/")
    entity_uri = EX[entity_id]
 
    triples = []
    for s, p, o in kg.triples((entity_uri, None, None)):
        triples.append({
            "subject": str(s),
            "predicate": str(p),
            "object": str(o)
        })
 
    if not triples:
        raise HTTPException(status_code=404, detail="Entity not found")
 
    return {"entity": entity_id, "triples": triples}
 
@app.post("/entities")
async def create_entity(triple: Triple):
    """Add a new triple to the knowledge graph."""
    from rdflib import URIRef, Literal
 
    subject = URIRef(triple.subject)
    predicate = URIRef(triple.predicate)
 
    # Determine if object is URI or literal
    if triple.object.startswith("http"):
        obj = URIRef(triple.object)
    else:
        obj = Literal(triple.object)
 
    kg.add((subject, predicate, obj))
 
    return {"message": "Triple added", "triple": triple}
 
@app.delete("/entities/{entity_id}")
async def delete_entity(entity_id: str):
    """Remove all triples for an entity."""
    EX = Namespace("http://example.org/")
    entity_uri = EX[entity_id]
 
    count = 0
    for triple in list(kg.triples((entity_uri, None, None))):
        kg.remove(triple)
        count += 1
 
    return {"message": f"Removed {count} triples"}

SPARQL Endpoint

@app.post("/sparql")
async def execute_sparql(query: str):
    """Execute a SPARQL query."""
    try:
        results = kg.query(query)
 
        bindings = []
        for row in results:
            binding = {}
            for var in results.vars:
                value = getattr(row, str(var), None)
                binding[str(var)] = str(value) if value else None
            bindings.append(binding)
 
        return QueryResult(bindings=bindings, count=len(bindings))
 
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
 
@app.get("/search")
async def search_entities(
    q: str = Query(..., description="Search query"),
    limit: int = Query(10, le=100)
):
    """Full-text search across entity labels."""
    query = f"""
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
 
    SELECT ?entity ?label
    WHERE {{
        ?entity rdfs:label ?label .
        FILTER(CONTAINS(LCASE(?label), LCASE("{q}")))
    }}
    LIMIT {limit}
    """
 
    results = kg.query(query)
    return [{"entity": str(row.entity), "label": str(row.label)}
            for row in results]
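
A safer variant, as a minimal sketch using rdflib's prepared queries: user input is bound as a literal rather than spliced into the query string. SPARQL cannot parameterize LIMIT, so it is applied in Python here.

from rdflib import Literal
from rdflib.plugins.sparql import prepareQuery

SEARCH_QUERY = prepareQuery("""
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    SELECT ?entity ?label
    WHERE {
        ?entity rdfs:label ?label .
        FILTER(CONTAINS(LCASE(?label), LCASE(?q)))
    }
""")

def safe_search(q: str, limit: int = 10):
    """Bind user input as a literal instead of formatting it into the query."""
    results = kg.query(SEARCH_QUERY, initBindings={"q": Literal(q)})
    return [{"entity": str(row.entity), "label": str(row.label)}
            for row in list(results)[:limit]]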

GraphRAG API

from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-4")
 
class QuestionRequest(BaseModel):
    question: str
    max_hops: int = 2
 
class AnswerResponse(BaseModel):
    answer: str
    sources: List[str]
    confidence: float
 
@app.post("/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest):
    """Answer questions using GraphRAG."""
    # Extract entities from question (minimal helper sketches follow this block)
    entities = extract_entities(request.question)
 
    # Get relevant subgraph
    subgraph = get_subgraph(kg, entities, request.max_hops)
    context = subgraph_to_text(subgraph)
 
    # Generate answer
    prompt = f"""
    Context from knowledge graph:
    {context}
 
    Question: {request.question}
 
    Answer based on the context. If the answer is not in the context, say so.
    """
 
    response = llm.invoke(prompt)
 
    return AnswerResponse(
        answer=response.content,
        sources=entities,
        confidence=0.85  # placeholder; a real system would estimate this
    )
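
The endpoint above leans on three helpers built in earlier weeks. Minimal sketches, assuming naive label matching and breadth-first expansion over the rdflib graph:

from rdflib import URIRef
from rdflib.namespace import RDFS

def extract_entities(question: str) -> list:
    """Naively match rdfs:label values that appear in the question."""
    q = question.lower()
    return [str(s) for s, _, label in kg.triples((None, RDFS.label, None))
            if str(label).lower() in q]

def get_subgraph(graph, entities, max_hops=2):
    """Collect triples reachable within max_hops of the seed entities."""
    frontier = {URIRef(e) for e in entities}
    triples = set()
    for _ in range(max_hops):
        next_frontier = set()
        for node in frontier:
            for s, p, o in graph.triples((node, None, None)):
                triples.add((s, p, o))
                next_frontier.add(o)
        frontier = next_frontier
    return triples

def subgraph_to_text(triples) -> str:
    """Render each triple as a short human-readable fact for the prompt."""
    short = lambda t: str(t).split("/")[-1].split("#")[-1]
    return "\n".join(f"{short(s)} {short(p)} {short(o)}" for s, p, o in triples)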

4. Authentication and Authorization

API Security

from fastapi import Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt  # PyJWT
import os

SECRET_KEY = os.environ["JWT_SECRET_KEY"]  # load the signing key from the environment, never hardcode it

security = HTTPBearer()
 
async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify JWT token."""
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
 
@app.post("/entities", dependencies=[Depends(verify_token)])
async def create_entity_secured(triple: Triple):
    """Secured endpoint for adding triples."""
    # Only authenticated users can add data
    pass
 
# Role-based access
async def require_admin(payload: dict = Depends(verify_token)):
    if payload.get("role") != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return payload
 
@app.delete("/entities/{entity_id}", dependencies=[Depends(require_admin)])
async def delete_entity_admin(entity_id: str):
    """Only admins can delete entities."""
    pass
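
The tokens verified above have to come from somewhere. A minimal sketch of a login endpoint that issues them (DEMO_USERS and the 1-hour expiry are assumptions for illustration; store hashed passwords in production):

import time

# Hypothetical in-memory user store, for demonstration only
DEMO_USERS = {"alice": {"password": "change-me", "role": "admin"}}

class LoginRequest(BaseModel):
    username: str
    password: str

@app.post("/login")
async def login(req: LoginRequest):
    user = DEMO_USERS.get(req.username)
    if user is None or user["password"] != req.password:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    payload = {
        "sub": req.username,
        "role": user["role"],
        "exp": int(time.time()) + 3600,  # 1-hour expiry
    }
    return {"token": jwt.encode(payload, SECRET_KEY, algorithm="HS256")}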

5. Performance Optimization

Caching Strategies

from functools import lru_cache
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis

# Initialize Redis cache (RedisBackend expects an async client)
@app.on_event("startup")
async def startup():
    redis_client = aioredis.from_url("redis://localhost:6379")
    FastAPICache.init(RedisBackend(redis_client), prefix="kg-cache")
 
@app.get("/entities/{entity_id}")
@cache(expire=3600)  # Cache for 1 hour
async def get_entity_cached(entity_id: str):
    """Cached entity lookup."""
    return get_entity(entity_id)
 
# Query result caching (query strings are hashable, so they can key the cache)
@lru_cache(maxsize=1000)
def cached_sparql_query(query: str):
    """Cache SPARQL query results in-process."""
    return list(kg.query(query))

Query Optimization

class QueryOptimizer:
    def __init__(self, kg):
        self.kg = kg
        self.statistics = self._compute_statistics()
 
    def _compute_statistics(self):
        """Compute predicate selectivity statistics."""
        stats = {}
        for p in set(self.kg.predicates()):
            count = len(list(self.kg.triples((None, p, None))))
            stats[str(p)] = count
        return stats
 
    def optimize_query(self, query):
        """Reorder query patterns for better performance."""
        # parse_patterns / reconstruct_query are placeholders for a real
        # SPARQL pattern parser; the key idea is selectivity-based reordering
        patterns = parse_patterns(query)

        # Sort by selectivity (most selective first = fewest matching triples)
        sorted_patterns = sorted(
            patterns,
            key=lambda p: self.statistics.get(p['predicate'], float('inf'))
        )
 
        return reconstruct_query(sorted_patterns)
 
# Materialized views for common queries
class MaterializedView:
    def __init__(self, kg, query, refresh_interval=3600):
        self.kg = kg
        self.query = query
        self.cache = None
        self.refresh_interval = refresh_interval
        self.last_refresh = 0
 
    def get(self):
        """Get cached results, refreshing if stale."""
        import time
        now = time.time()
 
        if self.cache is None or (now - self.last_refresh) > self.refresh_interval:
            self.cache = list(self.kg.query(self.query))
            self.last_refresh = now
 
        return self.cache
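
Hypothetical usage, assuming an http://example.org/hasGenre predicate from the movie KG:

# Materialize a common aggregate query, refreshed at most every 10 minutes
top_genres = MaterializedView(kg, """
    SELECT ?genre (COUNT(?movie) AS ?n)
    WHERE { ?movie <http://example.org/hasGenre> ?genre }
    GROUP BY ?genre
    ORDER BY DESC(?n)
""", refresh_interval=600)

rows = top_genres.get()  # first call runs the query; later calls return the cache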

Indexing Strategies

# For Neo4j
def create_indexes(graph):
    """Create indexes for common query patterns."""
    indexes = [
        "CREATE INDEX entity_name FOR (n:Entity) ON (n.name)",
        "CREATE INDEX entity_type FOR (n:Entity) ON (n.type)",
        "CREATE FULLTEXT INDEX entity_search FOR (n:Entity) ON EACH [n.name, n.description]"
    ]
 
    for index in indexes:
        try:
            graph.run(index)
        except Exception as e:
            print(f"Index may already exist: {e}")
 
# For RDF with rdflib + BerkeleyDB (named "Sleepycat" before rdflib 6.x)
def create_persistent_store():
    """Create an indexed persistent store (requires the berkeleydb package)."""
    from rdflib import Graph

    g = Graph(store="BerkeleyDB")
    g.open("kg_store", create=True)
    return g
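
The fulltext index created above can back a search function. A sketch using the same py2neo-style graph handle (the entity_search index name matches the one defined earlier):

def fulltext_search(graph, text, limit=10):
    """Query the entity_search fulltext index, best matches first."""
    cypher = """
    CALL db.index.fulltext.queryNodes('entity_search', $text)
    YIELD node, score
    RETURN node.name AS name, score
    ORDER BY score DESC LIMIT $limit
    """
    return graph.run(cypher, text=text, limit=limit).data()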

6. Monitoring and Observability

Logging and Metrics

import logging
from prometheus_client import Counter, Histogram
from fastapi import Request
import time
 
# Metrics
REQUEST_COUNT = Counter(
    "kg_requests_total",
    "Total API requests",
    ["method", "endpoint", "status"]
)
 
REQUEST_LATENCY = Histogram(
    "kg_request_duration_seconds",
    "Request latency",
    ["endpoint"]
)
 
QUERY_COUNT = Counter(
    "kg_queries_total",
    "Total SPARQL queries",
    ["query_type"]
)
 
# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("kg-api")
 
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log all requests with timing."""
    start = time.time()
 
    response = await call_next(request)
 
    duration = time.time() - start
    logger.info(
        f"{request.method} {request.url.path} - "
        f"Status: {response.status_code} - "
        f"Duration: {duration:.3f}s"
    )
 
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
 
    REQUEST_LATENCY.labels(endpoint=request.url.path).observe(duration)
 
    return response
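
The counters above are only useful if Prometheus can scrape them. A minimal exposition endpoint using the standard prometheus_client helpers:

from fastapi import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.get("/metrics")
async def metrics():
    """Expose all registered metrics in Prometheus text format."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)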

Health Checks

@app.get("/health")
async def health_check():
    """Comprehensive health check."""
    checks = {
        "api": "healthy",
        "knowledge_graph": check_kg_health(),
        "cache": check_cache_health(),
        "database": check_db_health()
    }
 
    all_healthy = all(v == "healthy" for v in checks.values())
 
    return {
        "status": "healthy" if all_healthy else "degraded",
        "checks": checks
    }
 
def check_kg_health():
    """Verify knowledge graph is accessible."""
    try:
        count = len(kg)
        return "healthy" if count > 0 else "empty"
    except Exception:
        return "unhealthy"

7. Deployment Architecture

Docker Deployment

# Dockerfile
FROM python:3.11-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY . .
 
EXPOSE 8000
 
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: '3.8'
 
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - NEO4J_URI=bolt://neo4j:7687
      - REDIS_URL=redis://redis:6379
    depends_on:
      - neo4j
      - redis
 
  neo4j:
    image: neo4j:5.12
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - neo4j_data:/data
 
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
 
volumes:
  neo4j_data:
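
With both files in place, docker compose up -d --build starts the API, Neo4j, and Redis together; the API is then reachable at http://localhost:8000 with the port mappings configured above.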

Project: Movie Recommendation Knowledge Graph

Project Complete!

| Week | Topic | Project Milestone |
|---|---|---|
| 1 | Ontology Introduction | ✅ Movie domain design completed |
| 2 | RDF & RDFS | ✅ 10 movies converted to RDF |
| 3 | OWL & Reasoning | ✅ Inference rules applied |
| 4 | Knowledge Extraction | ✅ 100 movies auto-collected |
| 5 | Neo4j | ✅ Graph DB constructed |
| 6 | GraphRAG | ✅ Natural language query system completed |
| 7 | Ontology Agents | ✅ Auto-update agent completed |
| 8 | Domain Expansion | ✅ Medical domain expansion completed |
| 9 | Service Deployment | Production deployment |

Week 9 Milestone: Movie Recommendation Service Deployment

Deploy the movie knowledge graph built over 9 weeks as a production service.

Service Architecture:

┌─────────────────────────────────────────────┐
│                   Frontend                   │
│  (React Dashboard + Chat Interface)          │
└─────────────────┬───────────────────────────┘
                  │ REST API
┌─────────────────▼───────────────────────────┐
│                FastAPI Server                │
│  /recommend  /search  /chat  /update        │
└──────┬──────────────┬───────────────────────┘
       │              │
┌──────▼──────┐ ┌─────▼─────┐
│   Neo4j     │ │   LLM     │
│ (Graph DB)  │ │ (OpenAI)  │
└─────────────┘ └───────────┘

API Endpoints:

@app.get("/recommend/{movie_title}")
# Similar movie recommendations
 
@app.post("/chat")
# Natural language queries (GraphRAG)
 
@app.post("/update")
# Add new movies (triggers agent)
 
@app.get("/graph/stats")
# Knowledge graph statistics
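
A minimal sketch of the /recommend endpoint (the HAS_GENRE pattern, shared-genre scoring, and connection details are assumptions; the project notebook builds the full version):

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

@app.get("/recommend/{movie_title}")
async def recommend(movie_title: str, limit: int = 5):
    """Recommend movies that share the most genres with the given title."""
    cypher = """
    MATCH (m:Movie {title: $title})-[:HAS_GENRE]->(g:Genre)
          <-[:HAS_GENRE]-(other:Movie)
    WHERE other <> m
    RETURN other.title AS title, count(g) AS shared_genres
    ORDER BY shared_genres DESC
    LIMIT $limit
    """
    with driver.session() as session:
        result = session.run(cypher, title=movie_title, limit=limit)
        return [{"title": r["title"], "shared_genres": r["shared_genres"]}
                for r in result]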

Deployment Checklist:

  • Neo4j Aura cloud setup
  • FastAPI server Dockerization
  • Vercel/Railway deployment
  • API key environment variables
  • Monitoring (Sentry)

Congratulations! Your 9-week project is complete.

In the project notebook, you will implement:

  • /recommend, /chat, and /update APIs built with FastAPI
  • A knowledge graph visualization dashboard with pyvis
  • A connection to Neo4j Aura cloud
  • One-click deployment with Docker Compose

Complete system: an AI agent that answers "Recommend sci-fi movies like Nolan's style" by reasoning over director-genre-rating relationships in the knowledge graph.


Practice Notebook

For deeper exploration of the theory, the practice notebook covers additional topics:

  • Kubernetes deployment configuration
  • Load testing (Locust) and performance tuning
  • Prometheus + Grafana monitoring
  • CI/CD pipeline setup

Interview Questions

How would you scale a knowledge graph service for high traffic?

Scaling Strategies:

  • Read replicas: Distribute read queries across multiple instances
  • Caching layers: Redis/Memcached for frequent queries
  • Query optimization: Indexes, materialized views, query rewriting
  • Load balancing: Distribute requests across API instances
  • Async processing: Background jobs for heavy computations
  • Graph partitioning: Shard large graphs across nodes

Premium Content

Want complete solutions with detailed explanations and production-ready code?

Check out the Ontology & Knowledge Graph Cookbook Premium for:

  • Complete notebook solutions with step-by-step explanations
  • Real-world case studies and best practices
  • Interview preparation materials
  • Production deployment guides

Congratulations!

You have completed the Ontology & Knowledge Graph Cookbook! You now have a solid foundation in:

  • Ontology engineering and knowledge representation
  • RDF, RDFS, OWL, and semantic web technologies
  • Knowledge extraction from unstructured text
  • Graph databases and Neo4j
  • GraphRAG for enhanced question answering
  • Ontology-based AI agents
  • Domain-specific knowledge graphs
  • Production service architecture

Continue your learning journey by exploring the Premium content for advanced topics and real-world projects.