2026-01-19 02:16:19 +02:00

1130 lines
40 KiB
Python

import os
import uuid
import shutil
import zipfile
import tarfile
import subprocess
import tempfile
import yaml
import asyncio
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel, Field
# Configuration
WORKSPACE_BASE_PATH = Path(os.getenv("WORKSPACE_BASE_PATH", "/tmp/helmview_workspaces"))
MAX_UPLOAD_SIZE = int(os.getenv("MAX_UPLOAD_SIZE", 104857600)) # 100MB
HELM_TIMEOUT = int(os.getenv("HELM_TIMEOUT", 60))
WORKSPACE_BASE_PATH.mkdir(parents=True, exist_ok=True)
app = FastAPI(title="HelmView API", version="1.0.0")
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================================================
# MODELS
# ============================================================================
class ProjectCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = None
class RenderOptions(BaseModel):
release_name: str = Field(default="release", min_length=1)
namespace: str = Field(default="default", min_length=1)
values_override: Optional[str] = None # YAML string
set_flags: Optional[List[str]] = None # e.g., ["key1=value1", "key2=value2"]
set_string_flags: Optional[List[str]] = None
class ResourceUpdate(BaseModel):
yaml_content: str
class Project(BaseModel):
id: str
name: str
description: Optional[str]
created_at: str
chart_uploaded: bool
chart_name: Optional[str]
class Resource(BaseModel):
uid: str
apiVersion: str
kind: str
name: str
namespace: Optional[str]
labels: Optional[Dict[str, str]]
rawYaml: str
modified: bool = False
class RenderResult(BaseModel):
success: bool
resources: List[Resource] = []
lint_output: str = ""
lint_errors: List[str] = []
template_output: str = ""
template_errors: List[str] = []
class ExportResult(BaseModel):
success: bool
errors: List[str] = []
chart_path: Optional[str] = None
lint_output: Optional[str] = None
version: Optional[str] = None
# ============================================================================
# STORAGE (In-memory for now, can be replaced with DB)
# ============================================================================
projects_db: Dict[str, Dict[str, Any]] = {}
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def get_project_path(project_id: str) -> Path:
"""Get the filesystem path for a project."""
return WORKSPACE_BASE_PATH / project_id
def get_chart_path(project_id: str) -> Path:
"""Get the path to the extracted chart directory."""
return get_project_path(project_id) / "chart"
def safe_extract_archive(file_path: Path, extract_to: Path) -> Path:
"""
Safely extract tar.gz or zip archive, protecting against zip-slip.
Returns the path to the chart directory.
"""
extract_to.mkdir(parents=True, exist_ok=True)
# Determine file type and extract
if file_path.suffix == ".tgz" or file_path.name.endswith(".tar.gz"):
with tarfile.open(file_path, "r:gz") as tar:
# Check for zip-slip
for member in tar.getmembers():
member_path = extract_to / member.name
if not member_path.resolve().is_relative_to(extract_to.resolve()):
raise HTTPException(400, "Archive contains invalid paths")
tar.extractall(extract_to)
elif file_path.suffix == ".zip":
with zipfile.ZipFile(file_path, "r") as zip_ref:
# Check for zip-slip
for name in zip_ref.namelist():
member_path = extract_to / name
if not member_path.resolve().is_relative_to(extract_to.resolve()):
raise HTTPException(400, "Archive contains invalid paths")
zip_ref.extractall(extract_to)
else:
raise HTTPException(400, "Unsupported archive format. Use .tgz or .zip")
# Find the chart directory (search recursively for Chart.yaml)
# First check root
if (extract_to / "Chart.yaml").exists():
return extract_to
# Then check immediate subdirectories (common for packaged charts)
chart_dirs = list(extract_to.glob("*/Chart.yaml"))
if chart_dirs:
return chart_dirs[0].parent
# Search deeper (up to 3 levels) for GitHub zip downloads
chart_dirs = list(extract_to.glob("**/Chart.yaml"))
if chart_dirs:
# Return the first (shallowest) match
return sorted(chart_dirs, key=lambda p: len(p.parts))[0].parent
raise HTTPException(400, "No Chart.yaml found in archive")
async def run_helm_command(cmd: List[str], cwd: Path, timeout: int = HELM_TIMEOUT) -> tuple[str, str, int]:
"""
Run a Helm command safely with timeout.
Returns (stdout, stderr, returncode).
"""
try:
process = await asyncio.create_subprocess_exec(
*cmd,
cwd=str(cwd),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
return stdout.decode(), stderr.decode(), process.returncode
except asyncio.TimeoutError:
try:
process.kill()
except:
pass
raise HTTPException(408, f"Helm command timed out after {timeout}s")
except Exception as e:
raise HTTPException(500, f"Failed to execute Helm command: {str(e)}")
def parse_kubernetes_resources(yaml_content: str) -> List[Resource]:
"""
Parse multi-document YAML from helm template output.
Returns a list of Resource objects.
"""
resources = []
print(f"\n{'='*60}")
print(f"PARSING HELM TEMPLATE OUTPUT")
print(f"{'='*60}")
print(f"Total output length: {len(yaml_content)} characters")
# Split by document separator (handle both --- and ---\n)
documents = []
for doc in yaml_content.split("\n---"):
doc = doc.strip()
if doc:
documents.append(doc)
# Also try splitting by ---\n if we didn't get many docs
if len(documents) <= 1:
for doc in yaml_content.split("---"):
doc = doc.strip()
if doc:
documents.append(doc)
print(f"Split into {len(documents)} documents")
print(f"{'-'*60}")
for idx, doc in enumerate(documents):
doc = doc.strip()
print(f"\nDocument {idx + 1}/{len(documents)}:")
print(f" Length: {len(doc)} chars")
print(f" First 80 chars: {doc[:80].replace(chr(10), ' ')}")
# Skip empty docs or pure comments
if not doc:
print(f" → SKIPPED: Empty document")
continue
# Remove leading comments but keep the YAML
lines = doc.split('\n')
yaml_lines = []
for line in lines:
stripped = line.strip()
# Skip comment-only lines at the start
if not yaml_lines and (stripped.startswith('#') or not stripped):
continue
yaml_lines.append(line)
doc = '\n'.join(yaml_lines)
if not doc:
print(f" → SKIPPED: Only comments")
continue
try:
obj = yaml.safe_load(doc)
if not obj or not isinstance(obj, dict):
print(f" → SKIPPED: Not a dict (type={type(obj).__name__})")
continue
# Skip if not a Kubernetes resource
if "apiVersion" not in obj or "kind" not in obj:
print(f" → SKIPPED: Missing apiVersion or kind (keys={list(obj.keys())[:5]})")
continue
metadata = obj.get("metadata", {})
name = metadata.get("name", "unknown")
namespace = metadata.get("namespace")
labels = metadata.get("labels", {})
# Generate UID
uid = f"{obj['kind']}-{name}-{str(uuid.uuid4())[:8]}"
print(f" → ✓ ADDED: {obj['kind']}/{name}" + (f" (ns: {namespace})" if namespace else ""))
resource = Resource(
uid=uid,
apiVersion=obj["apiVersion"],
kind=obj["kind"],
name=name,
namespace=namespace,
labels=labels,
rawYaml=doc
)
resources.append(resource)
except yaml.YAMLError as e:
# Log but continue
print(f" → ERROR: YAML parse failed: {str(e)}")
continue
except Exception as e:
print(f" → ERROR: Processing failed: {str(e)}")
continue
print(f"\n{'='*60}")
print(f"PARSING COMPLETE: {len(resources)} resources extracted")
print(f"{'='*60}\n")
return resources
def update_chart_version(chart_yaml_path: Path) -> str:
"""
Bump the patch version in Chart.yaml.
Returns the new version.
"""
with open(chart_yaml_path, "r") as f:
chart = yaml.safe_load(f)
version = chart.get("version", "0.1.0")
parts = version.split(".")
# Increment patch version
if len(parts) >= 3:
parts[2] = str(int(parts[2]) + 1)
else:
parts = ["0", "1", "1"]
new_version = ".".join(parts)
chart["version"] = new_version
with open(chart_yaml_path, "w") as f:
yaml.safe_dump(chart, f)
return new_version
# ============================================================================
# API ENDPOINTS
# ============================================================================
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
@app.post("/api/projects", response_model=Project)
async def create_project(project: ProjectCreate):
"""Create a new project."""
project_id = str(uuid.uuid4())
project_path = get_project_path(project_id)
project_path.mkdir(parents=True, exist_ok=True)
project_data = {
"id": project_id,
"name": project.name,
"description": project.description,
"created_at": datetime.utcnow().isoformat(),
"chart_uploaded": False,
"chart_name": None,
"resources": {},
"modified_resources": {}
}
projects_db[project_id] = project_data
return Project(**project_data)
@app.get("/api/projects")
async def list_projects():
"""List all projects."""
return [Project(**proj) for proj in projects_db.values()]
@app.get("/api/projects/{project_id}", response_model=Project)
async def get_project(project_id: str):
"""Get project details."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
return Project(**projects_db[project_id])
@app.post("/api/projects/{project_id}/upload")
async def upload_chart(project_id: str, file: UploadFile = File(...)):
"""Upload and extract a Helm chart."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
# Check file size
file.file.seek(0, 2)
size = file.file.tell()
file.file.seek(0)
if size > MAX_UPLOAD_SIZE:
raise HTTPException(413, f"File too large. Max size: {MAX_UPLOAD_SIZE} bytes")
project_path = get_project_path(project_id)
# Save uploaded file with original extension
file_extension = Path(file.filename).suffix or ".tgz"
upload_path = project_path / f"uploaded_chart{file_extension}"
upload_path.parent.mkdir(parents=True, exist_ok=True)
with open(upload_path, "wb") as f:
content = await file.read()
f.write(content)
# Extract archive
try:
chart_path = safe_extract_archive(upload_path, project_path)
# Move extracted chart to standard location
final_chart_path = get_chart_path(project_id)
if final_chart_path.exists():
shutil.rmtree(final_chart_path)
shutil.move(str(chart_path), str(final_chart_path))
# Read Chart.yaml to get chart name
chart_yaml_path = final_chart_path / "Chart.yaml"
chart_name = "unknown"
if chart_yaml_path.exists():
try:
with open(chart_yaml_path, "r", encoding="utf-8") as f:
chart_yaml = yaml.safe_load(f)
if chart_yaml and isinstance(chart_yaml, dict):
chart_name = chart_yaml.get("name", "unknown")
except Exception as e:
# If Chart.yaml parsing fails, continue with "unknown"
chart_name = "unknown"
# Update project
projects_db[project_id]["chart_uploaded"] = True
projects_db[project_id]["chart_name"] = chart_name
return {
"success": True,
"message": "Chart uploaded and extracted successfully",
"chart_name": chart_name
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(400, f"Failed to extract chart: {str(e)}")
finally:
# Clean up uploaded file
if upload_path.exists():
upload_path.unlink()
class GitUploadRequest(BaseModel):
git_url: str = Field(..., min_length=1)
chart_path: Optional[str] = None
@app.post("/api/projects/{project_id}/upload-git")
async def upload_chart_from_git(project_id: str, request: GitUploadRequest):
"""Clone a Git repository and extract Helm chart."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
project_path = get_project_path(project_id)
temp_clone_dir = project_path / "temp_git_clone"
try:
# Clean up any existing temp directory
if temp_clone_dir.exists():
shutil.rmtree(temp_clone_dir)
temp_clone_dir.mkdir(parents=True, exist_ok=True)
# Clone the repository (shallow clone for speed)
clone_command = [
"git", "clone",
"--depth", "1",
request.git_url,
str(temp_clone_dir)
]
result = subprocess.run(
clone_command,
capture_output=True,
text=True,
timeout=120 # 2 minute timeout for git clone
)
if result.returncode != 0:
raise HTTPException(400, f"Git clone failed: {result.stderr}")
# Find chart directory
if request.chart_path:
chart_source = temp_clone_dir / request.chart_path
if not chart_source.exists():
raise HTTPException(400, f"Chart path '{request.chart_path}' not found in repository")
else:
# Look for Chart.yaml in root or common locations
possible_locations = [
temp_clone_dir,
temp_clone_dir / "chart",
temp_clone_dir / "charts",
]
# Also search recursively for Chart.yaml
chart_yamls = list(temp_clone_dir.glob("**/Chart.yaml"))
chart_source = None
for location in possible_locations:
if (location / "Chart.yaml").exists():
chart_source = location
break
if not chart_source and chart_yamls:
# Use the first found Chart.yaml location
chart_source = chart_yamls[0].parent
if not chart_source:
raise HTTPException(400, "No Chart.yaml found in repository. Please specify chart_path.")
# Validate it's a helm chart
if not (chart_source / "Chart.yaml").exists():
raise HTTPException(400, "Invalid Helm chart: Chart.yaml not found")
# Copy to final location
final_chart_path = get_chart_path(project_id)
if final_chart_path.exists():
shutil.rmtree(final_chart_path)
shutil.copytree(chart_source, final_chart_path)
# Read Chart.yaml to get chart name
chart_yaml_path = final_chart_path / "Chart.yaml"
chart_name = "unknown"
try:
with open(chart_yaml_path, "r", encoding="utf-8") as f:
chart_yaml = yaml.safe_load(f)
if chart_yaml and isinstance(chart_yaml, dict):
chart_name = chart_yaml.get("name", "unknown")
except Exception:
chart_name = "unknown"
# Update project
projects_db[project_id]["chart_uploaded"] = True
projects_db[project_id]["chart_name"] = chart_name
return {
"success": True,
"message": "Chart downloaded from Git successfully",
"chart_name": chart_name,
"chart_path": str(chart_source.relative_to(temp_clone_dir))
}
except subprocess.TimeoutExpired:
raise HTTPException(408, "Git clone timed out")
except HTTPException:
raise
except Exception as e:
raise HTTPException(400, f"Failed to download chart from Git: {str(e)}")
finally:
# Clean up temp clone directory
if temp_clone_dir.exists():
try:
shutil.rmtree(temp_clone_dir)
except Exception:
pass # Best effort cleanup
@app.get("/api/projects/{project_id}/chart-info")
async def get_chart_info(project_id: str):
"""Get Chart.yaml validation status."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
if not projects_db[project_id]["chart_uploaded"]:
raise HTTPException(400, "No chart uploaded for this project")
chart_path = get_chart_path(project_id)
chart_yaml_path = chart_path / "Chart.yaml"
result = {
"valid": False,
"errors": []
}
# Validate Chart.yaml
if chart_yaml_path.exists():
try:
with open(chart_yaml_path, "r", encoding="utf-8") as f:
content = f.read()
# Check if file is empty
if not content.strip():
result["errors"].append("⚠ Chart.yaml file is EMPTY")
result["errors"].append("The file exists but has no content")
return result
# Try to parse YAML
chart_yaml = yaml.safe_load(content)
if not chart_yaml or not isinstance(chart_yaml, dict):
result["errors"].append("⚠ Chart.yaml has invalid YAML format")
result["errors"].append(f"File contains: {content[:200]}")
return result
# Check required fields
if not chart_yaml.get("name"):
result["errors"].append("⚠ Missing REQUIRED: 'name' field in Chart.yaml")
if not chart_yaml.get("apiVersion"):
result["errors"].append("⚠ Missing REQUIRED: 'apiVersion' field (must be 'v1' or 'v2')")
elif chart_yaml.get("apiVersion") not in ["v1", "v2"]:
result["errors"].append("⚠ Invalid apiVersion - must be 'v1' or 'v2'")
if not chart_yaml.get("version"):
result["errors"].append("⚠ Missing REQUIRED: 'version' field (e.g., '0.1.0')")
if len(result["errors"]) == 0:
result["valid"] = True
except yaml.YAMLError as e:
result["errors"].append(f"⚠ YAML syntax error: {str(e)}")
except Exception as e:
result["errors"].append(f"Failed to read Chart.yaml: {str(e)}")
else:
result["errors"].append("Chart.yaml file not found")
return result
@app.post("/api/projects/{project_id}/fix-chart-yaml")
async def fix_chart_yaml(project_id: str):
"""Attempt to fix or create a valid Chart.yaml file."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
if not projects_db[project_id]["chart_uploaded"]:
raise HTTPException(400, "No chart uploaded for this project")
chart_path = get_chart_path(project_id)
chart_yaml_path = chart_path / "Chart.yaml"
# Try to read existing Chart.yaml
existing_name = "my-chart"
if chart_yaml_path.exists():
try:
with open(chart_yaml_path, "r", encoding="utf-8") as f:
content = f.read()
if content.strip():
existing = yaml.safe_load(content)
if existing and isinstance(existing, dict):
existing_name = existing.get("name", "my-chart")
except:
pass
# Create a valid Chart.yaml
default_chart = {
"apiVersion": "v2",
"name": existing_name,
"description": "A Helm chart for Kubernetes",
"type": "application",
"version": "0.1.0",
"appVersion": "1.0.0"
}
# Write the fixed Chart.yaml
with open(chart_yaml_path, "w", encoding="utf-8") as f:
yaml.safe_dump(default_chart, f, default_flow_style=False, sort_keys=False)
# Update project
projects_db[project_id]["chart_name"] = existing_name
return {
"success": True,
"message": "Chart.yaml has been fixed with default values",
"chart_yaml": default_chart
}
@app.get("/api/projects/{project_id}/values")
async def get_chart_values(project_id: str):
"""Get the default values.yaml from the chart."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
if not projects_db[project_id]["chart_uploaded"]:
raise HTTPException(400, "No chart uploaded for this project")
chart_path = get_chart_path(project_id)
values_path = chart_path / "values.yaml"
if not values_path.exists():
return {"values": "# No values.yaml found in chart\n# Add your custom values below:\n\nreplicaCount: 1\n\nimage:\n repository: nginx\n tag: latest\n pullPolicy: IfNotPresent\n"}
try:
with open(values_path, "r", encoding="utf-8") as f:
values_content = f.read().strip()
# If values.yaml is empty, provide a template
if not values_content:
values_content = "# Values file is empty\n# Add your custom values below:\n\nreplicaCount: 1\n"
# Add helpful header
header = """# Default values for this Helm chart
# Edit the values below to customize your deployment
# Lines marked with ##OPTIONAL can be removed or commented out
# TIP: Many charts have features disabled by default (like ingress, autoscaling)
# Look for 'enabled: false' and change to 'enabled: true' to enable them
"""
# Add ##OPTIONAL marker and enable suggestions
lines = values_content.split('\n')
enhanced_lines = []
optional_keywords = [
'ingress:', 'enabled:', 'annotations:', 'labels:',
'resources:', 'limits:', 'requests:', 'nodeSelector:',
'tolerations:', 'affinity:', 'podAnnotations:', 'podLabels:',
'autoscaling:', 'serviceAccount:', 'securityContext:',
'livenessProbe:', 'readinessProbe:', 'volumeMounts:', 'volumes:'
]
for i, line in enumerate(lines):
stripped = line.lstrip()
# Suggest enabling disabled features
if 'enabled: false' in line.lower() or 'enabled:false' in line.lower():
enhanced_lines.append(f"{line} ##OPTIONAL - Change to 'true' to enable")
# Check if this line starts a common optional section
elif any(stripped.startswith(kw) for kw in optional_keywords):
enhanced_lines.append(f"{line} ##OPTIONAL")
else:
enhanced_lines.append(line)
return {"values": header + '\n'.join(enhanced_lines)}
except Exception as e:
raise HTTPException(500, f"Failed to read values.yaml: {str(e)}")
@app.get("/api/projects/{project_id}/templates")
async def get_chart_templates(project_id: str):
"""Get list of all template files in the chart."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
if not projects_db[project_id]["chart_uploaded"]:
raise HTTPException(400, "No chart uploaded for this project")
chart_path = get_chart_path(project_id)
templates_path = chart_path / "templates"
if not templates_path.exists():
return {"templates": [], "count": 0}
# Find all YAML and tpl files recursively
templates = []
for ext in ["*.yaml", "*.yml", "*.tpl"]:
for template_file in templates_path.rglob(ext):
if template_file.is_file():
rel_path = template_file.relative_to(chart_path)
templates.append({
"path": str(rel_path),
"name": template_file.name,
"size": template_file.stat().st_size
})
return {"templates": templates, "count": len(templates)}
@app.post("/api/projects/{project_id}/render", response_model=RenderResult)
async def render_chart(project_id: str, options: RenderOptions):
"""Render Helm chart with helm lint and helm template."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
if not projects_db[project_id]["chart_uploaded"]:
raise HTTPException(400, "No chart uploaded for this project")
chart_path = get_chart_path(project_id)
if not chart_path.exists():
raise HTTPException(400, "Chart directory not found")
result = RenderResult(success=True)
# Save override values if provided
values_override_path = None
if options.values_override:
values_override_path = chart_path / "values-override.yaml"
with open(values_override_path, "w") as f:
f.write(options.values_override)
# Run helm lint
lint_cmd = ["helm", "lint", str(chart_path)]
lint_stdout, lint_stderr, lint_code = await run_helm_command(lint_cmd, chart_path)
result.lint_output = lint_stdout + "\n" + lint_stderr
if lint_code != 0:
result.lint_errors = lint_stderr.split("\n")
# Run helm template
template_cmd = [
"helm", "template",
options.release_name,
str(chart_path),
"--namespace", options.namespace,
"--debug" # Add debug flag for more output
]
if values_override_path:
template_cmd.extend(["-f", str(values_override_path)])
if options.set_flags:
for flag in options.set_flags:
template_cmd.extend(["--set", flag])
if options.set_string_flags:
for flag in options.set_string_flags:
template_cmd.extend(["--set-string", flag])
template_stdout, template_stderr, template_code = await run_helm_command(template_cmd, chart_path)
result.template_output = template_stdout
if template_code != 0:
result.success = False
result.template_errors = template_stderr.split("\n")
return result
# Parse resources
resources = parse_kubernetes_resources(template_stdout)
result.resources = resources
# Store template output for debugging
projects_db[project_id]["last_template_output"] = template_stdout
projects_db[project_id]["last_template_stderr"] = template_stderr
# Store resources in project
projects_db[project_id]["resources"] = {r.uid: r.dict() for r in resources}
# Clean up override values
if values_override_path and values_override_path.exists():
values_override_path.unlink()
return result
@app.get("/api/projects/{project_id}/debug-output")
async def get_debug_output(project_id: str):
"""Get the raw helm template output for debugging."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
output = projects_db[project_id].get("last_template_output", "")
stderr = projects_db[project_id].get("last_template_stderr", "")
# Count documents in output
doc_count = len([d for d in output.split("---") if d.strip()])
return {
"raw_output": output,
"stderr": stderr,
"document_count": doc_count,
"output_length": len(output)
}
@app.get("/api/projects/{project_id}/resources")
async def get_resources(project_id: str):
"""Get all resources for a project."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
resources = projects_db[project_id].get("resources", {})
return list(resources.values())
@app.put("/api/projects/{project_id}/resources/{resource_uid}")
async def update_resource(project_id: str, resource_uid: str, update: ResourceUpdate):
"""Update a resource's YAML content."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
resources = projects_db[project_id].get("resources", {})
if resource_uid not in resources:
raise HTTPException(404, "Resource not found")
# Validate YAML syntax
try:
yaml.safe_load(update.yaml_content)
except yaml.YAMLError as e:
raise HTTPException(400, f"Invalid YAML: {str(e)}")
# Update resource
resources[resource_uid]["rawYaml"] = update.yaml_content
resources[resource_uid]["modified"] = True
# Track modified resources
if "modified_resources" not in projects_db[project_id]:
projects_db[project_id]["modified_resources"] = {}
projects_db[project_id]["modified_resources"][resource_uid] = update.yaml_content
return {"success": True, "message": "Resource updated"}
@app.post("/api/projects/{project_id}/resources/{resource_uid}/template-diff")
async def get_template_diff(project_id: str, resource_uid: str, update: ResourceUpdate):
"""Generate diff showing how the template would change."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
resources = projects_db[project_id].get("resources", {})
if resource_uid not in resources:
raise HTTPException(404, "Resource not found")
resource = resources[resource_uid]
chart_path = get_chart_path(project_id)
templates_dir = chart_path / "templates"
if not templates_dir.exists():
raise HTTPException(404, "Templates directory not found")
# Find template file matching the resource kind
template_file = None
kind_lower = resource["kind"].lower()
# Try common naming patterns
for pattern in [f"{kind_lower}.yaml", f"{kind_lower}.yml"]:
potential_file = templates_dir / pattern
if potential_file.exists():
template_file = potential_file
break
# Search all templates for matching kind
if not template_file:
for tpl in templates_dir.glob("*.yaml"):
if tpl.stem.startswith('_'):
continue
try:
with open(tpl, "r", encoding="utf-8") as f:
content = f.read()
if f"kind: {resource['kind']}" in content:
template_file = tpl
break
except Exception:
continue
if not template_file:
raise HTTPException(404, f"Could not find template for {resource['kind']}")
# Read original template
with open(template_file, "r", encoding="utf-8") as f:
original_template = f.read()
return {
"template_file": str(template_file.relative_to(chart_path)),
"original_template": original_template,
"modified_template": update.yaml_content,
"additions": 0,
"deletions": 0,
}
@app.post("/api/projects/{project_id}/resources/{resource_uid}/update-template")
async def update_template(project_id: str, resource_uid: str, update: ResourceUpdate):
"""Update the template file with changes from edited resource."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
resources = projects_db[project_id].get("resources", {})
if resource_uid not in resources:
raise HTTPException(404, "Resource not found")
resource = resources[resource_uid]
chart_path = get_chart_path(project_id)
templates_dir = chart_path / "templates"
if not templates_dir.exists():
raise HTTPException(404, "Templates directory not found")
# Find template file
template_file = None
kind_lower = resource["kind"].lower()
for pattern in [f"{kind_lower}.yaml", f"{kind_lower}.yml"]:
potential_file = templates_dir / pattern
if potential_file.exists():
template_file = potential_file
break
if not template_file:
for tpl in templates_dir.glob("*.yaml"):
if tpl.stem.startswith('_'):
continue
try:
with open(tpl, "r", encoding="utf-8") as f:
content = f.read()
if f"kind: {resource['kind']}" in content:
template_file = tpl
break
except Exception:
continue
if not template_file:
raise HTTPException(404, f"Could not find template for {resource['kind']}")
# Write updated YAML to template file
with open(template_file, "w", encoding="utf-8") as f:
f.write(update.yaml_content)
return {
"success": True,
"message": f"Template {template_file.name} updated",
"template_file": str(template_file.relative_to(chart_path))
}
@app.post("/api/projects/{project_id}/export", response_model=ExportResult)
async def export_chart(project_id: str):
"""
Export the modified chart as a new .tgz package.
Runs helm lint and helm template to validate before export.
"""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
if not projects_db[project_id]["chart_uploaded"]:
raise HTTPException(400, "No chart uploaded for this project")
chart_path = get_chart_path(project_id)
if not chart_path.exists():
raise HTTPException(400, "Chart directory not found")
result = ExportResult(success=True)
# Create modified chart in temporary location
modified_chart_path = get_project_path(project_id) / "modified_chart"
if modified_chart_path.exists():
shutil.rmtree(modified_chart_path)
shutil.copytree(chart_path, modified_chart_path)
# Apply modifications (simplified strategy: create overlay templates)
modified_resources = projects_db[project_id].get("modified_resources", {})
if modified_resources:
overlay_path = modified_chart_path / "templates" / "_helmview_generated.yaml"
overlay_path.parent.mkdir(parents=True, exist_ok=True)
with open(overlay_path, "w") as f:
f.write("# HelmView Generated Overrides\n")
f.write("# These resources were modified in the HelmView editor\n\n")
for resource_yaml in modified_resources.values():
f.write("---\n")
f.write(resource_yaml)
f.write("\n")
# Update Chart.yaml version
chart_yaml_path = modified_chart_path / "Chart.yaml"
new_version = update_chart_version(chart_yaml_path)
result.version = new_version
# Run helm lint
lint_cmd = ["helm", "lint", str(modified_chart_path)]
lint_stdout, lint_stderr, lint_code = await run_helm_command(lint_cmd, modified_chart_path)
result.lint_output = lint_stdout + "\n" + lint_stderr
if lint_code != 0:
result.success = False
result.errors.append(f"Helm lint failed: {lint_stderr}")
return result
# Run helm template to validate
template_cmd = ["helm", "template", "test", str(modified_chart_path)]
template_stdout, template_stderr, template_code = await run_helm_command(template_cmd, modified_chart_path)
if template_code != 0:
result.success = False
result.errors.append(f"Helm template failed: {template_stderr}")
return result
# Package chart
package_cmd = ["helm", "package", str(modified_chart_path), "-d", str(get_project_path(project_id))]
package_stdout, package_stderr, package_code = await run_helm_command(package_cmd, modified_chart_path)
if package_code != 0:
result.success = False
result.errors.append(f"Helm package failed: {package_stderr}")
return result
# Find the packaged chart
chart_name = projects_db[project_id]["chart_name"]
package_pattern = f"{chart_name}-{new_version}.tgz"
package_path = get_project_path(project_id) / package_pattern
if not package_path.exists():
# Try to find any .tgz file
tgz_files = list(get_project_path(project_id).glob("*.tgz"))
if tgz_files:
package_path = tgz_files[0]
else:
result.success = False
result.errors.append("Packaged chart file not found")
return result
result.chart_path = str(package_path)
return result
@app.get("/api/projects/{project_id}/download")
async def download_chart(project_id: str):
"""Download the exported chart package."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
# Find the most recent .tgz file
project_path = get_project_path(project_id)
tgz_files = list(project_path.glob("*.tgz"))
if not tgz_files:
raise HTTPException(404, "No exported chart found. Run export first.")
# Get the most recent file
latest_file = max(tgz_files, key=lambda p: p.stat().st_mtime)
return FileResponse(
path=str(latest_file),
filename=latest_file.name,
media_type="application/gzip"
)
@app.delete("/api/projects/{project_id}")
async def delete_project(project_id: str):
"""Delete a project and its files."""
if project_id not in projects_db:
raise HTTPException(404, "Project not found")
# Delete filesystem
project_path = get_project_path(project_id)
if project_path.exists():
shutil.rmtree(project_path)
# Delete from DB
del projects_db[project_id]
return {"success": True, "message": "Project deleted"}
@app.get("/api/kubernetes-schemas/{kind}")
async def get_kubernetes_schema(kind: str):
"""
Get Kubernetes schema for a specific resource kind.
This is a simplified version. In production, use actual Kubernetes OpenAPI schemas.
"""
# This would ideally load from Kubernetes OpenAPI spec
# For now, return a basic structure
schemas = {
"Deployment": {
"apiVersion": ["apps/v1"],
"kind": "Deployment",
"metadata": {"name": "string", "namespace": "string", "labels": "object"},
"spec": {
"replicas": "integer",
"selector": "object",
"template": "object"
}
},
"Service": {
"apiVersion": ["v1"],
"kind": "Service",
"metadata": {"name": "string", "namespace": "string"},
"spec": {
"type": ["ClusterIP", "NodePort", "LoadBalancer"],
"ports": "array",
"selector": "object"
}
}
}
return schemas.get(kind, {})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)