Week 9: Service Architecture
Learning Objectives
This final week covers deploying knowledge graph systems in production. You will learn about visualization, building APIs with FastAPI, and optimization techniques.
1. Knowledge Graph Architecture
The "Restaurant Service" Analogy
We have built a powerful system, but it's still in the "Kitchen".
- Information: Ingredients.
- Ontology: Recipe.
- Graph DB: The Kitchen.
- LLM: The Chef.
Service Architecture is about opening the restaurant:
- API (FastAPI): The Wait Staff. Takes orders (queries) and brings food (results).
- Visualization: The Plating. Makes the data consumable.
- Caching: Pre-prepped ingredients. Speeds up service.
2. Knowledge Graph Visualization
Effective visualization helps users explore and understand knowledge graphs.
Visualization Libraries
| Library | Use Case | Features |
|---|---|---|
| Pyvis | Rapid prototyping in Python | Interactive HTML export |
| NetworkX + Matplotlib | Static graphs | Integration with analysis |
| D3.js | Web applications | Highly customizable |
| Cytoscape.js | Complex networks | Layouts, styling |
| Neo4j Browser | Neo4j-specific | Built-in, Cypher integration |
Interactive Visualization with Pyvis
from pyvis.network import Network
from rdflib import Graph, URIRef
from rdflib.namespace import RDF
def visualize_kg(rdf_graph, output_file="knowledge_graph.html"):
"""Create interactive visualization from RDF graph."""
net = Network(height="750px", width="100%", directed=True)
# Configure physics
net.set_options("""
{
"physics": {
"forceAtlas2Based": {
"gravitationalConstant": -50,
"springLength": 100
},
"solver": "forceAtlas2Based"
}
}
""")
# Extract nodes and edges
nodes = set()
for s, p, o in rdf_graph:
nodes.add(str(s))
        if not str(o).startswith("http://www.w3.org"):  # Skip W3C vocabulary terms (rdf:/rdfs:/owl: classes)
nodes.add(str(o))
# Add nodes with colors based on type
for node in nodes:
color = get_node_color(node, rdf_graph)
label = node.split("/")[-1].split("#")[-1]
net.add_node(node, label=label, color=color)
# Add edges
for s, p, o in rdf_graph:
pred_label = str(p).split("/")[-1].split("#")[-1]
if str(o) in nodes:
net.add_edge(str(s), str(o), label=pred_label)
net.save_graph(output_file)
return output_file
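# Example usage (a sketch with hypothetical file names):
#   g = Graph()
#   g.parse("movies.ttl", format="turtle")
#   visualize_kg(g, "movies.html")  # writes an interactive HTML file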
def get_node_color(node, graph):
"""Assign color based on node type."""
type_colors = {
"Person": "#FF6B6B",
"Organization": "#4ECDC4",
"Location": "#45B7D1",
"Event": "#96CEB4",
"Concept": "#FFEAA7"
}
# Query for type
for s, p, o in graph.triples((URIRef(node), RDF.type, None)):
type_name = str(o).split("/")[-1].split("#")[-1]
if type_name in type_colors:
return type_colors[type_name]
return "#95A5A6" # Default grayD3.js Force-Directed Graph
// Frontend JavaScript for D3 visualization
function createKnowledgeGraph(data, container) {
const width = 960;
const height = 600;
const svg = d3.select(container)
.append("svg")
.attr("width", width)
.attr("height", height);
const simulation = d3.forceSimulation(data.nodes)
.force("link", d3.forceLink(data.links).id(d => d.id))
.force("charge", d3.forceManyBody().strength(-300))
.force("center", d3.forceCenter(width / 2, height / 2));
// Create links
const link = svg.append("g")
.selectAll("line")
.data(data.links)
.enter().append("line")
.attr("stroke", "#999")
.attr("stroke-width", 2);
// Create link labels
const linkLabel = svg.append("g")
.selectAll("text")
.data(data.links)
.enter().append("text")
.text(d => d.label)
.attr("font-size", "10px");
// Create nodes
const node = svg.append("g")
.selectAll("circle")
.data(data.nodes)
.enter().append("circle")
.attr("r", 10)
.attr("fill", d => d.color)
        .call(drag(simulation));  // assumes the standard d3-drag helper (dragstarted/dragged/dragended) is defined elsewhere
// Node labels
const label = svg.append("g")
.selectAll("text")
.data(data.nodes)
.enter().append("text")
.text(d => d.label)
.attr("font-size", "12px");
simulation.on("tick", () => {
link
.attr("x1", d => d.source.x)
.attr("y1", d => d.source.y)
.attr("x2", d => d.target.x)
.attr("y2", d => d.target.y);
node
.attr("cx", d => d.x)
.attr("cy", d => d.y);
label
.attr("x", d => d.x + 12)
.attr("y", d => d.y + 4);
});
}

3. FastAPI for Knowledge Graph Services
Basic API Structure
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import List, Optional
from rdflib import Graph, Namespace
app = FastAPI(
title="Knowledge Graph API",
description="API for querying and manipulating knowledge graphs",
version="1.0.0"
)
# Knowledge Graph instance
kg = Graph()
kg.parse("knowledge_graph.ttl", format="turtle")
class Triple(BaseModel):
subject: str
predicate: str
object: str
class QueryResult(BaseModel):
bindings: List[dict]
count: int
@app.get("/")
async def root():
return {"message": "Knowledge Graph API", "triples": len(kg)}CRUD Endpoints
@app.get("/entities/{entity_id}")
async def get_entity(entity_id: str):
"""Get all triples for an entity."""
EX = Namespace("http://example.org/")
entity_uri = EX[entity_id]
triples = []
for s, p, o in kg.triples((entity_uri, None, None)):
triples.append({
"subject": str(s),
"predicate": str(p),
"object": str(o)
})
if not triples:
raise HTTPException(status_code=404, detail="Entity not found")
return {"entity": entity_id, "triples": triples}
@app.post("/entities")
async def create_entity(triple: Triple):
"""Add a new triple to the knowledge graph."""
from rdflib import URIRef, Literal
subject = URIRef(triple.subject)
predicate = URIRef(triple.predicate)
# Determine if object is URI or literal
if triple.object.startswith("http"):
obj = URIRef(triple.object)
else:
obj = Literal(triple.object)
kg.add((subject, predicate, obj))
return {"message": "Triple added", "triple": triple}
@app.delete("/entities/{entity_id}")
async def delete_entity(entity_id: str):
"""Remove all triples for an entity."""
EX = Namespace("http://example.org/")
entity_uri = EX[entity_id]
count = 0
for triple in list(kg.triples((entity_uri, None, None))):
kg.remove(triple)
count += 1
return {"message": f"Removed {count} triples"}SPARQL Endpoint
@app.post("/sparql")
async def execute_sparql(query: str):
"""Execute a SPARQL query."""
try:
results = kg.query(query)
bindings = []
for row in results:
binding = {}
for var in results.vars:
value = getattr(row, str(var), None)
                binding[str(var)] = str(value) if value is not None else None
bindings.append(binding)
return QueryResult(bindings=bindings, count=len(bindings))
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
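# Example call with httpx (hypothetical host; note the query travels as a URL parameter):
#   import httpx
#   r = httpx.post("http://localhost:8000/sparql",
#                  params={"query": "SELECT ?s WHERE { ?s ?p ?o } LIMIT 5"})
#   print(r.json()["count"])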
@app.get("/search")
async def search_entities(
q: str = Query(..., description="Search query"),
limit: int = Query(10, le=100)
):
"""Full-text search across entity labels."""
query = f"""
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?entity ?label
WHERE {{
?entity rdfs:label ?label .
FILTER(CONTAINS(LCASE(?label), LCASE("{q}")))
}}
LIMIT {limit}
"""
results = kg.query(query)
return [{"entity": str(row.entity), "label": str(row.label)}
            for row in results]

GraphRAG API
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
class QuestionRequest(BaseModel):
question: str
max_hops: int = 2
class AnswerResponse(BaseModel):
answer: str
sources: List[str]
confidence: float
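# --- Hedged sketches of the helper functions used by /ask below ---
# (illustrative only, not from any library; a real system would use an NER
#  model for entity extraction and a proper graph traversal)
from rdflib import URIRef
from rdflib.namespace import RDFS

def extract_entities(question: str) -> List[str]:
    """Naive entity spotting: match question words against rdfs:label values."""
    words = set(question.lower().split())
    return [str(s) for s, o in kg.subject_objects(RDFS.label)
            if str(o).lower() in words]

def get_subgraph(graph, entities, max_hops: int):
    """Breadth-first expansion: collect triples within max_hops of the seeds."""
    frontier = {URIRef(e) for e in entities}
    triples = set()
    for _ in range(max_hops):
        next_frontier = set()
        for node in frontier:
            for t in graph.triples((node, None, None)):
                triples.add(t)
                next_frontier.add(t[2])
        frontier = next_frontier
    return triples

def subgraph_to_text(triples) -> str:
    """Render each triple as one 'subject predicate object' line."""
    short = lambda u: str(u).split("/")[-1].split("#")[-1]
    return "\n".join(f"{short(s)} {short(p)} {short(o)}" for s, p, o in triples)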
@app.post("/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest):
"""Answer questions using GraphRAG."""
# Extract entities from question
entities = extract_entities(request.question)
# Get relevant subgraph
subgraph = get_subgraph(kg, entities, request.max_hops)
context = subgraph_to_text(subgraph)
# Generate answer
prompt = f"""
Context from knowledge graph:
{context}
Question: {request.question}
Answer based on the context. If the answer is not in the context, say so.
"""
response = llm.invoke(prompt)
return AnswerResponse(
answer=response.content,
sources=entities,
        confidence=0.85  # placeholder; a real system should estimate confidence
    )

4. Authentication and Authorization
API Security
from fastapi import Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt  # PyJWT

SECRET_KEY = "change-me"  # in production, load from an environment variable
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
"""Verify JWT token."""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/entities", dependencies=[Depends(verify_token)])
async def create_entity_secured(triple: Triple):
"""Secured endpoint for adding triples."""
# Only authenticated users can add data
pass
# Role-based access
async def require_admin(payload: dict = Depends(verify_token)):
if payload.get("role") != "admin":
raise HTTPException(status_code=403, detail="Admin access required")
return payload
@app.delete("/entities/{entity_id}", dependencies=[Depends(require_admin)])
async def delete_entity_admin(entity_id: str):
"""Only admins can delete entities."""
    pass

5. Performance Optimization
Caching Strategies
from functools import lru_cache
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import redis
# Initialize Redis cache
@app.on_event("startup")
async def startup():
redis_client = redis.Redis(host="localhost", port=6379, db=0)
FastAPICache.init(RedisBackend(redis_client), prefix="kg-cache")
@app.get("/entities/{entity_id}")
@cache(expire=3600) # Cache for 1 hour
async def get_entity_cached(entity_id: str):
"""Cached entity lookup."""
return get_entity(entity_id)
# Query result caching
@lru_cache(maxsize=1000)
def cached_sparql_query(query_hash: str):
"""Cache SPARQL query results."""
query = decode_query(query_hash)
    return execute_query(query)  # decode_query/execute_query are placeholders, not library calls

Query Optimization
class QueryOptimizer:
def __init__(self, kg):
self.kg = kg
self.statistics = self._compute_statistics()
def _compute_statistics(self):
"""Compute predicate selectivity statistics."""
stats = {}
for p in set(self.kg.predicates()):
count = len(list(self.kg.triples((None, p, None))))
stats[str(p)] = count
return stats
def optimize_query(self, query):
"""Reorder query patterns for better performance."""
        # Parse query patterns (parse_patterns/reconstruct_query are
        # placeholders for a real SPARQL algebra parser)
        patterns = parse_patterns(query)
# Sort by selectivity (most selective first)
sorted_patterns = sorted(
patterns,
key=lambda p: self.statistics.get(p['predicate'], float('inf'))
)
return reconstruct_query(sorted_patterns)
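# Usage sketch (slow_query is a hypothetical SPARQL string):
#   optimizer = QueryOptimizer(kg)
#   fast_query = optimizer.optimize_query(slow_query)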
# Materialized views for common queries
class MaterializedView:
def __init__(self, kg, query, refresh_interval=3600):
self.kg = kg
self.query = query
self.cache = None
self.refresh_interval = refresh_interval
self.last_refresh = 0
def get(self):
"""Get cached results, refreshing if stale."""
import time
now = time.time()
if self.cache is None or (now - self.last_refresh) > self.refresh_interval:
self.cache = list(self.kg.query(self.query))
self.last_refresh = now
        return self.cache

Indexing Strategies
# For Neo4j
def create_indexes(graph):
"""Create indexes for common query patterns."""
indexes = [
"CREATE INDEX entity_name FOR (n:Entity) ON (n.name)",
"CREATE INDEX entity_type FOR (n:Entity) ON (n.type)",
"CREATE FULLTEXT INDEX entity_search FOR (n:Entity) ON EACH [n.name, n.description]"
]
for index in indexes:
try:
graph.run(index)
except Exception as e:
print(f"Index may already exist: {e}")
# For RDF with rdflib + Berkeley DB (the "Sleepycat" store)
def create_persistent_store():
    """Create an indexed persistent store.

    Requires the berkeleydb package; recent rdflib releases ship the
    Sleepycat store as a separate plugin rather than in core.
    """
    from rdflib import Graph
    g = Graph(store="Sleepycat")
    g.open("kg_store", create=True)
    return g

6. Monitoring and Observability
Logging and Metrics
import logging
from prometheus_client import Counter, Histogram
from fastapi import Request
import time
# Metrics
REQUEST_COUNT = Counter(
"kg_requests_total",
"Total API requests",
["method", "endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
"kg_request_duration_seconds",
"Request latency",
["endpoint"]
)
QUERY_COUNT = Counter(
"kg_queries_total",
"Total SPARQL queries",
["query_type"]
)
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("kg-api")
@app.middleware("http")
async def log_requests(request: Request, call_next):
"""Log all requests with timing."""
start = time.time()
response = await call_next(request)
duration = time.time() - start
logger.info(
f"{request.method} {request.url.path} - "
f"Status: {response.status_code} - "
f"Duration: {duration:.3f}s"
)
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_LATENCY.labels(endpoint=request.url.path).observe(duration)
    return response

Health Checks
@app.get("/health")
async def health_check():
"""Comprehensive health check."""
checks = {
"api": "healthy",
"knowledge_graph": check_kg_health(),
"cache": check_cache_health(),
"database": check_db_health()
}
all_healthy = all(v == "healthy" for v in checks.values())
return {
"status": "healthy" if all_healthy else "degraded",
"checks": checks
}
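# Companion checks, sketched to mirror check_kg_health below
# (assumptions: a local Redis instance and a py2neo-style handle named neo4j_graph):
def check_cache_health():
    try:
        redis.Redis(host="localhost", port=6379).ping()
        return "healthy"
    except Exception:
        return "unhealthy"

def check_db_health():
    try:
        neo4j_graph.run("RETURN 1")
        return "healthy"
    except Exception:
        return "unhealthy"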
def check_kg_health():
"""Verify knowledge graph is accessible."""
try:
count = len(kg)
return "healthy" if count > 0 else "empty"
except Exception:
return "unhealthy"6. Deployment Architecture
Docker Deployment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- NEO4J_URI=bolt://neo4j:7687
- REDIS_URL=redis://redis:6379
depends_on:
- neo4j
- redis
neo4j:
image: neo4j:5.12
ports:
- "7474:7474"
- "7687:7687"
volumes:
- neo4j_data:/data
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
  neo4j_data:

Project: Movie Recommendation Knowledge Graph
Project Complete!
| Week | Topic | Project Milestone |
|---|---|---|
| 1 | Ontology Introduction | ✅ Movie domain design completed |
| 2 | RDF & RDFS | ✅ 10 movies converted to RDF |
| 3 | OWL & Reasoning | ✅ Inference rules applied |
| 4 | Knowledge Extraction | ✅ 100 movies auto-collected |
| 5 | Neo4j | ✅ Graph DB constructed |
| 6 | GraphRAG | ✅ Natural language query system completed |
| 7 | Ontology Agents | ✅ Auto-update agent completed |
| 8 | Domain Expansion | ✅ Medical domain expansion completed |
| 9 | Service Deployment | Production deployment |
Week 9 Milestone: Movie Recommendation Service Deployment
Deploy the movie knowledge graph built over 9 weeks as a production service.
Service Architecture:
┌─────────────────────────────────────────────┐
│ Frontend │
│ (React Dashboard + Chat Interface) │
└─────────────────┬───────────────────────────┘
│ REST API
┌─────────────────▼───────────────────────────┐
│ FastAPI Server │
│ /recommend /search /chat /update │
└──────┬──────────────┬───────────────────────┘
│ │
┌──────▼──────┐ ┌─────▼─────┐
│ Neo4j │ │ LLM │
│ (Graph DB) │ │ (OpenAI) │
└─────────────┘ └───────────┘

API Endpoints:
@app.get("/recommend/{movie_title}")
# Similar movie recommendations
@app.post("/chat")
# Natural language queries (GraphRAG)
@app.post("/update")
# Add new movies (triggers agent)
@app.get("/graph/stats")
# Knowledge graph statistics

Deployment Checklist:
- Neo4j Aura cloud setup
- FastAPI server Dockerization
- Vercel/Railway deployment
- API key environment variables (see the sketch after this list)
- Monitoring (Sentry)
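For the environment-variable item, a minimal pattern is sketched below (variable names are illustrative; adapt them to your deployment platform):

```python
import os

# Fail fast if the key is missing; never hard-code secrets
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
```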
Congratulations! Your 9-week project is complete.
In the project notebook, you will implement:
- Build /recommend, /chat, /update APIs with FastAPI
- Knowledge graph visualization dashboard with pyvis
- Connect to Neo4j Aura cloud
- One-click deployment with Docker Compose
Complete system: an AI agent that answers "Recommend sci-fi movies in Nolan's style" by reasoning over director-genre-rating relationships in the knowledge graph.
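For flavor, here is the kind of Cypher such a request might boil down to. This is a hedged sketch: the labels, relationship types, and property names are assumptions based on the course project, not a fixed schema.

```python
# Hedged sketch: movies sharing genres with a director's films,
# ranked by genre overlap and rating.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

CYPHER = """
MATCH (d:Director {name: $director})-[:DIRECTED]->(:Movie)-[:HAS_GENRE]->(g:Genre)
MATCH (rec:Movie)-[:HAS_GENRE]->(g)
WHERE NOT (d)-[:DIRECTED]->(rec)
RETURN rec.title AS title, count(g) AS overlap, rec.rating AS rating
ORDER BY overlap DESC, rating DESC
LIMIT $limit
"""

with driver.session() as session:
    for row in session.run(CYPHER, director="Christopher Nolan", limit=5):
        print(row["title"], row["overlap"], row["rating"])
```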
Practice Notebook
For deeper exploration of the theory, the practice notebook covers additional topics:
- Kubernetes deployment configuration
- Load testing (Locust) and performance tuning
- Prometheus + Grafana monitoring
- CI/CD pipeline setup
Interview Questions
How would you scale a knowledge graph service for high traffic?
Scaling Strategies:
- Read replicas: Distribute read queries across multiple instances
- Caching layers: Redis/Memcached for frequent queries (see the sketch after this list)
- Query optimization: Indexes, materialized views, query rewriting
- Load balancing: Distribute requests across API instances
- Async processing: Background jobs for heavy computations
- Graph partitioning: Shard large graphs across nodes
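As a minimal illustration of the caching layer, here is a hypothetical helper (not part of the course code) that memoizes SPARQL results in Redis:

```python
import hashlib
import json
import redis

r = redis.Redis(host="localhost", port=6379)

def cached_query(kg, sparql: str, ttl: int = 300):
    """Cache SPARQL results in Redis, keyed by a hash of the query text."""
    key = "sparql:" + hashlib.sha256(sparql.encode()).hexdigest()
    hit = r.get(key)
    if hit is not None:
        return json.loads(hit)  # cache hit: skip the graph entirely
    rows = [[str(v) for v in row] for row in kg.query(sparql)]
    r.setex(key, ttl, json.dumps(rows))  # expire after ttl seconds
    return rows
```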
Premium Content
Want complete solutions with detailed explanations and production-ready code?
Check out the Ontology & Knowledge Graph Cookbook Premium for:
- Complete notebook solutions with step-by-step explanations
- Real-world case studies and best practices
- Interview preparation materials
- Production deployment guides
Congratulations!
You have completed the Ontology & Knowledge Graph Cookbook! You now have a solid foundation in:
- Ontology engineering and knowledge representation
- RDF, RDFS, OWL, and semantic web technologies
- Knowledge extraction from unstructured text
- Graph databases and Neo4j
- GraphRAG for enhanced question answering
- Ontology-based AI agents
- Domain-specific knowledge graphs
- Production service architecture
Continue your learning journey by exploring the Premium content for advanced topics and real-world projects.