import os import uuid import shutil import zipfile import tarfile import subprocess import tempfile import yaml import asyncio from pathlib import Path from typing import Optional, List, Dict, Any from datetime import datetime from fastapi import FastAPI, UploadFile, File, HTTPException, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel, Field # Configuration WORKSPACE_BASE_PATH = Path(os.getenv("WORKSPACE_BASE_PATH", "/tmp/helmview_workspaces")) MAX_UPLOAD_SIZE = int(os.getenv("MAX_UPLOAD_SIZE", 104857600)) # 100MB HELM_TIMEOUT = int(os.getenv("HELM_TIMEOUT", 60)) WORKSPACE_BASE_PATH.mkdir(parents=True, exist_ok=True) app = FastAPI(title="HelmView API", version="1.0.0") # CORS configuration app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:5173", "http://localhost:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============================================================================ # MODELS # ============================================================================ class ProjectCreate(BaseModel): name: str = Field(..., min_length=1, max_length=100) description: Optional[str] = None class RenderOptions(BaseModel): release_name: str = Field(default="release", min_length=1) namespace: str = Field(default="default", min_length=1) values_override: Optional[str] = None # YAML string set_flags: Optional[List[str]] = None # e.g., ["key1=value1", "key2=value2"] set_string_flags: Optional[List[str]] = None class ResourceUpdate(BaseModel): yaml_content: str class Project(BaseModel): id: str name: str description: Optional[str] created_at: str chart_uploaded: bool chart_name: Optional[str] class Resource(BaseModel): uid: str apiVersion: str kind: str name: str namespace: Optional[str] labels: Optional[Dict[str, str]] rawYaml: str modified: bool = False class RenderResult(BaseModel): success: bool resources: List[Resource] = [] lint_output: str = "" lint_errors: List[str] = [] template_output: str = "" template_errors: List[str] = [] class ExportResult(BaseModel): success: bool errors: List[str] = [] chart_path: Optional[str] = None lint_output: Optional[str] = None version: Optional[str] = None # ============================================================================ # STORAGE (In-memory for now, can be replaced with DB) # ============================================================================ projects_db: Dict[str, Dict[str, Any]] = {} # ============================================================================ # HELPER FUNCTIONS # ============================================================================ def get_project_path(project_id: str) -> Path: """Get the filesystem path for a project.""" return WORKSPACE_BASE_PATH / project_id def get_chart_path(project_id: str) -> Path: """Get the path to the extracted chart directory.""" return get_project_path(project_id) / "chart" def safe_extract_archive(file_path: Path, extract_to: Path) -> Path: """ Safely extract tar.gz or zip archive, protecting against zip-slip. Returns the path to the chart directory. """ extract_to.mkdir(parents=True, exist_ok=True) # Determine file type and extract if file_path.suffix == ".tgz" or file_path.name.endswith(".tar.gz"): with tarfile.open(file_path, "r:gz") as tar: # Check for zip-slip for member in tar.getmembers(): member_path = extract_to / member.name if not member_path.resolve().is_relative_to(extract_to.resolve()): raise HTTPException(400, "Archive contains invalid paths") tar.extractall(extract_to) elif file_path.suffix == ".zip": with zipfile.ZipFile(file_path, "r") as zip_ref: # Check for zip-slip for name in zip_ref.namelist(): member_path = extract_to / name if not member_path.resolve().is_relative_to(extract_to.resolve()): raise HTTPException(400, "Archive contains invalid paths") zip_ref.extractall(extract_to) else: raise HTTPException(400, "Unsupported archive format. Use .tgz or .zip") # Find the chart directory (search recursively for Chart.yaml) # First check root if (extract_to / "Chart.yaml").exists(): return extract_to # Then check immediate subdirectories (common for packaged charts) chart_dirs = list(extract_to.glob("*/Chart.yaml")) if chart_dirs: return chart_dirs[0].parent # Search deeper (up to 3 levels) for GitHub zip downloads chart_dirs = list(extract_to.glob("**/Chart.yaml")) if chart_dirs: # Return the first (shallowest) match return sorted(chart_dirs, key=lambda p: len(p.parts))[0].parent raise HTTPException(400, "No Chart.yaml found in archive") async def run_helm_command(cmd: List[str], cwd: Path, timeout: int = HELM_TIMEOUT) -> tuple[str, str, int]: """ Run a Helm command safely with timeout. Returns (stdout, stderr, returncode). """ try: process = await asyncio.create_subprocess_exec( *cmd, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) return stdout.decode(), stderr.decode(), process.returncode except asyncio.TimeoutError: try: process.kill() except: pass raise HTTPException(408, f"Helm command timed out after {timeout}s") except Exception as e: raise HTTPException(500, f"Failed to execute Helm command: {str(e)}") def parse_kubernetes_resources(yaml_content: str) -> List[Resource]: """ Parse multi-document YAML from helm template output. Returns a list of Resource objects. """ resources = [] print(f"\n{'='*60}") print(f"PARSING HELM TEMPLATE OUTPUT") print(f"{'='*60}") print(f"Total output length: {len(yaml_content)} characters") # Split by document separator (handle both --- and ---\n) documents = [] for doc in yaml_content.split("\n---"): doc = doc.strip() if doc: documents.append(doc) # Also try splitting by ---\n if we didn't get many docs if len(documents) <= 1: for doc in yaml_content.split("---"): doc = doc.strip() if doc: documents.append(doc) print(f"Split into {len(documents)} documents") print(f"{'-'*60}") for idx, doc in enumerate(documents): doc = doc.strip() print(f"\nDocument {idx + 1}/{len(documents)}:") print(f" Length: {len(doc)} chars") print(f" First 80 chars: {doc[:80].replace(chr(10), ' ')}") # Skip empty docs or pure comments if not doc: print(f" → SKIPPED: Empty document") continue # Remove leading comments but keep the YAML lines = doc.split('\n') yaml_lines = [] for line in lines: stripped = line.strip() # Skip comment-only lines at the start if not yaml_lines and (stripped.startswith('#') or not stripped): continue yaml_lines.append(line) doc = '\n'.join(yaml_lines) if not doc: print(f" → SKIPPED: Only comments") continue try: obj = yaml.safe_load(doc) if not obj or not isinstance(obj, dict): print(f" → SKIPPED: Not a dict (type={type(obj).__name__})") continue # Skip if not a Kubernetes resource if "apiVersion" not in obj or "kind" not in obj: print(f" → SKIPPED: Missing apiVersion or kind (keys={list(obj.keys())[:5]})") continue metadata = obj.get("metadata", {}) name = metadata.get("name", "unknown") namespace = metadata.get("namespace") labels = metadata.get("labels", {}) # Generate UID uid = f"{obj['kind']}-{name}-{str(uuid.uuid4())[:8]}" print(f" → ✓ ADDED: {obj['kind']}/{name}" + (f" (ns: {namespace})" if namespace else "")) resource = Resource( uid=uid, apiVersion=obj["apiVersion"], kind=obj["kind"], name=name, namespace=namespace, labels=labels, rawYaml=doc ) resources.append(resource) except yaml.YAMLError as e: # Log but continue print(f" → ERROR: YAML parse failed: {str(e)}") continue except Exception as e: print(f" → ERROR: Processing failed: {str(e)}") continue print(f"\n{'='*60}") print(f"PARSING COMPLETE: {len(resources)} resources extracted") print(f"{'='*60}\n") return resources def update_chart_version(chart_yaml_path: Path) -> str: """ Bump the patch version in Chart.yaml. Returns the new version. """ with open(chart_yaml_path, "r") as f: chart = yaml.safe_load(f) version = chart.get("version", "0.1.0") parts = version.split(".") # Increment patch version if len(parts) >= 3: parts[2] = str(int(parts[2]) + 1) else: parts = ["0", "1", "1"] new_version = ".".join(parts) chart["version"] = new_version with open(chart_yaml_path, "w") as f: yaml.safe_dump(chart, f) return new_version # ============================================================================ # API ENDPOINTS # ============================================================================ @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()} @app.post("/api/projects", response_model=Project) async def create_project(project: ProjectCreate): """Create a new project.""" project_id = str(uuid.uuid4()) project_path = get_project_path(project_id) project_path.mkdir(parents=True, exist_ok=True) project_data = { "id": project_id, "name": project.name, "description": project.description, "created_at": datetime.utcnow().isoformat(), "chart_uploaded": False, "chart_name": None, "resources": {}, "modified_resources": {} } projects_db[project_id] = project_data return Project(**project_data) @app.get("/api/projects") async def list_projects(): """List all projects.""" return [Project(**proj) for proj in projects_db.values()] @app.get("/api/projects/{project_id}", response_model=Project) async def get_project(project_id: str): """Get project details.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") return Project(**projects_db[project_id]) @app.post("/api/projects/{project_id}/upload") async def upload_chart(project_id: str, file: UploadFile = File(...)): """Upload and extract a Helm chart.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") # Check file size file.file.seek(0, 2) size = file.file.tell() file.file.seek(0) if size > MAX_UPLOAD_SIZE: raise HTTPException(413, f"File too large. Max size: {MAX_UPLOAD_SIZE} bytes") project_path = get_project_path(project_id) # Save uploaded file with original extension file_extension = Path(file.filename).suffix or ".tgz" upload_path = project_path / f"uploaded_chart{file_extension}" upload_path.parent.mkdir(parents=True, exist_ok=True) with open(upload_path, "wb") as f: content = await file.read() f.write(content) # Extract archive try: chart_path = safe_extract_archive(upload_path, project_path) # Move extracted chart to standard location final_chart_path = get_chart_path(project_id) if final_chart_path.exists(): shutil.rmtree(final_chart_path) shutil.move(str(chart_path), str(final_chart_path)) # Read Chart.yaml to get chart name chart_yaml_path = final_chart_path / "Chart.yaml" chart_name = "unknown" if chart_yaml_path.exists(): try: with open(chart_yaml_path, "r", encoding="utf-8") as f: chart_yaml = yaml.safe_load(f) if chart_yaml and isinstance(chart_yaml, dict): chart_name = chart_yaml.get("name", "unknown") except Exception as e: # If Chart.yaml parsing fails, continue with "unknown" chart_name = "unknown" # Update project projects_db[project_id]["chart_uploaded"] = True projects_db[project_id]["chart_name"] = chart_name return { "success": True, "message": "Chart uploaded and extracted successfully", "chart_name": chart_name } except HTTPException: raise except Exception as e: raise HTTPException(400, f"Failed to extract chart: {str(e)}") finally: # Clean up uploaded file if upload_path.exists(): upload_path.unlink() class GitUploadRequest(BaseModel): git_url: str = Field(..., min_length=1) chart_path: Optional[str] = None @app.post("/api/projects/{project_id}/upload-git") async def upload_chart_from_git(project_id: str, request: GitUploadRequest): """Clone a Git repository and extract Helm chart.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") project_path = get_project_path(project_id) temp_clone_dir = project_path / "temp_git_clone" try: # Clean up any existing temp directory if temp_clone_dir.exists(): shutil.rmtree(temp_clone_dir) temp_clone_dir.mkdir(parents=True, exist_ok=True) # Clone the repository (shallow clone for speed) clone_command = [ "git", "clone", "--depth", "1", request.git_url, str(temp_clone_dir) ] result = subprocess.run( clone_command, capture_output=True, text=True, timeout=120 # 2 minute timeout for git clone ) if result.returncode != 0: raise HTTPException(400, f"Git clone failed: {result.stderr}") # Find chart directory if request.chart_path: chart_source = temp_clone_dir / request.chart_path if not chart_source.exists(): raise HTTPException(400, f"Chart path '{request.chart_path}' not found in repository") else: # Look for Chart.yaml in root or common locations possible_locations = [ temp_clone_dir, temp_clone_dir / "chart", temp_clone_dir / "charts", ] # Also search recursively for Chart.yaml chart_yamls = list(temp_clone_dir.glob("**/Chart.yaml")) chart_source = None for location in possible_locations: if (location / "Chart.yaml").exists(): chart_source = location break if not chart_source and chart_yamls: # Use the first found Chart.yaml location chart_source = chart_yamls[0].parent if not chart_source: raise HTTPException(400, "No Chart.yaml found in repository. Please specify chart_path.") # Validate it's a helm chart if not (chart_source / "Chart.yaml").exists(): raise HTTPException(400, "Invalid Helm chart: Chart.yaml not found") # Copy to final location final_chart_path = get_chart_path(project_id) if final_chart_path.exists(): shutil.rmtree(final_chart_path) shutil.copytree(chart_source, final_chart_path) # Read Chart.yaml to get chart name chart_yaml_path = final_chart_path / "Chart.yaml" chart_name = "unknown" try: with open(chart_yaml_path, "r", encoding="utf-8") as f: chart_yaml = yaml.safe_load(f) if chart_yaml and isinstance(chart_yaml, dict): chart_name = chart_yaml.get("name", "unknown") except Exception: chart_name = "unknown" # Update project projects_db[project_id]["chart_uploaded"] = True projects_db[project_id]["chart_name"] = chart_name return { "success": True, "message": "Chart downloaded from Git successfully", "chart_name": chart_name, "chart_path": str(chart_source.relative_to(temp_clone_dir)) } except subprocess.TimeoutExpired: raise HTTPException(408, "Git clone timed out") except HTTPException: raise except Exception as e: raise HTTPException(400, f"Failed to download chart from Git: {str(e)}") finally: # Clean up temp clone directory if temp_clone_dir.exists(): try: shutil.rmtree(temp_clone_dir) except Exception: pass # Best effort cleanup @app.get("/api/projects/{project_id}/chart-info") async def get_chart_info(project_id: str): """Get Chart.yaml validation status.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") if not projects_db[project_id]["chart_uploaded"]: raise HTTPException(400, "No chart uploaded for this project") chart_path = get_chart_path(project_id) chart_yaml_path = chart_path / "Chart.yaml" result = { "valid": False, "errors": [] } # Validate Chart.yaml if chart_yaml_path.exists(): try: with open(chart_yaml_path, "r", encoding="utf-8") as f: content = f.read() # Check if file is empty if not content.strip(): result["errors"].append("⚠ Chart.yaml file is EMPTY") result["errors"].append("The file exists but has no content") return result # Try to parse YAML chart_yaml = yaml.safe_load(content) if not chart_yaml or not isinstance(chart_yaml, dict): result["errors"].append("⚠ Chart.yaml has invalid YAML format") result["errors"].append(f"File contains: {content[:200]}") return result # Check required fields if not chart_yaml.get("name"): result["errors"].append("⚠ Missing REQUIRED: 'name' field in Chart.yaml") if not chart_yaml.get("apiVersion"): result["errors"].append("⚠ Missing REQUIRED: 'apiVersion' field (must be 'v1' or 'v2')") elif chart_yaml.get("apiVersion") not in ["v1", "v2"]: result["errors"].append("⚠ Invalid apiVersion - must be 'v1' or 'v2'") if not chart_yaml.get("version"): result["errors"].append("⚠ Missing REQUIRED: 'version' field (e.g., '0.1.0')") if len(result["errors"]) == 0: result["valid"] = True except yaml.YAMLError as e: result["errors"].append(f"⚠ YAML syntax error: {str(e)}") except Exception as e: result["errors"].append(f"Failed to read Chart.yaml: {str(e)}") else: result["errors"].append("Chart.yaml file not found") return result @app.post("/api/projects/{project_id}/fix-chart-yaml") async def fix_chart_yaml(project_id: str): """Attempt to fix or create a valid Chart.yaml file.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") if not projects_db[project_id]["chart_uploaded"]: raise HTTPException(400, "No chart uploaded for this project") chart_path = get_chart_path(project_id) chart_yaml_path = chart_path / "Chart.yaml" # Try to read existing Chart.yaml existing_name = "my-chart" if chart_yaml_path.exists(): try: with open(chart_yaml_path, "r", encoding="utf-8") as f: content = f.read() if content.strip(): existing = yaml.safe_load(content) if existing and isinstance(existing, dict): existing_name = existing.get("name", "my-chart") except: pass # Create a valid Chart.yaml default_chart = { "apiVersion": "v2", "name": existing_name, "description": "A Helm chart for Kubernetes", "type": "application", "version": "0.1.0", "appVersion": "1.0.0" } # Write the fixed Chart.yaml with open(chart_yaml_path, "w", encoding="utf-8") as f: yaml.safe_dump(default_chart, f, default_flow_style=False, sort_keys=False) # Update project projects_db[project_id]["chart_name"] = existing_name return { "success": True, "message": "Chart.yaml has been fixed with default values", "chart_yaml": default_chart } @app.get("/api/projects/{project_id}/values") async def get_chart_values(project_id: str): """Get the default values.yaml from the chart.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") if not projects_db[project_id]["chart_uploaded"]: raise HTTPException(400, "No chart uploaded for this project") chart_path = get_chart_path(project_id) values_path = chart_path / "values.yaml" if not values_path.exists(): return {"values": "# No values.yaml found in chart\n# Add your custom values below:\n\nreplicaCount: 1\n\nimage:\n repository: nginx\n tag: latest\n pullPolicy: IfNotPresent\n"} try: with open(values_path, "r", encoding="utf-8") as f: values_content = f.read().strip() # If values.yaml is empty, provide a template if not values_content: values_content = "# Values file is empty\n# Add your custom values below:\n\nreplicaCount: 1\n" # Add helpful header header = """# Default values for this Helm chart # Edit the values below to customize your deployment # Lines marked with ##OPTIONAL can be removed or commented out # TIP: Many charts have features disabled by default (like ingress, autoscaling) # Look for 'enabled: false' and change to 'enabled: true' to enable them """ # Add ##OPTIONAL marker and enable suggestions lines = values_content.split('\n') enhanced_lines = [] optional_keywords = [ 'ingress:', 'enabled:', 'annotations:', 'labels:', 'resources:', 'limits:', 'requests:', 'nodeSelector:', 'tolerations:', 'affinity:', 'podAnnotations:', 'podLabels:', 'autoscaling:', 'serviceAccount:', 'securityContext:', 'livenessProbe:', 'readinessProbe:', 'volumeMounts:', 'volumes:' ] for i, line in enumerate(lines): stripped = line.lstrip() # Suggest enabling disabled features if 'enabled: false' in line.lower() or 'enabled:false' in line.lower(): enhanced_lines.append(f"{line} ##OPTIONAL - Change to 'true' to enable") # Check if this line starts a common optional section elif any(stripped.startswith(kw) for kw in optional_keywords): enhanced_lines.append(f"{line} ##OPTIONAL") else: enhanced_lines.append(line) return {"values": header + '\n'.join(enhanced_lines)} except Exception as e: raise HTTPException(500, f"Failed to read values.yaml: {str(e)}") @app.get("/api/projects/{project_id}/templates") async def get_chart_templates(project_id: str): """Get list of all template files in the chart.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") if not projects_db[project_id]["chart_uploaded"]: raise HTTPException(400, "No chart uploaded for this project") chart_path = get_chart_path(project_id) templates_path = chart_path / "templates" if not templates_path.exists(): return {"templates": [], "count": 0} # Find all YAML and tpl files recursively templates = [] for ext in ["*.yaml", "*.yml", "*.tpl"]: for template_file in templates_path.rglob(ext): if template_file.is_file(): rel_path = template_file.relative_to(chart_path) templates.append({ "path": str(rel_path), "name": template_file.name, "size": template_file.stat().st_size }) return {"templates": templates, "count": len(templates)} @app.post("/api/projects/{project_id}/render", response_model=RenderResult) async def render_chart(project_id: str, options: RenderOptions): """Render Helm chart with helm lint and helm template.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") if not projects_db[project_id]["chart_uploaded"]: raise HTTPException(400, "No chart uploaded for this project") chart_path = get_chart_path(project_id) if not chart_path.exists(): raise HTTPException(400, "Chart directory not found") result = RenderResult(success=True) # Save override values if provided values_override_path = None if options.values_override: values_override_path = chart_path / "values-override.yaml" with open(values_override_path, "w") as f: f.write(options.values_override) # Run helm lint lint_cmd = ["helm", "lint", str(chart_path)] lint_stdout, lint_stderr, lint_code = await run_helm_command(lint_cmd, chart_path) result.lint_output = lint_stdout + "\n" + lint_stderr if lint_code != 0: result.lint_errors = lint_stderr.split("\n") # Run helm template template_cmd = [ "helm", "template", options.release_name, str(chart_path), "--namespace", options.namespace, "--debug" # Add debug flag for more output ] if values_override_path: template_cmd.extend(["-f", str(values_override_path)]) if options.set_flags: for flag in options.set_flags: template_cmd.extend(["--set", flag]) if options.set_string_flags: for flag in options.set_string_flags: template_cmd.extend(["--set-string", flag]) template_stdout, template_stderr, template_code = await run_helm_command(template_cmd, chart_path) result.template_output = template_stdout if template_code != 0: result.success = False result.template_errors = template_stderr.split("\n") return result # Parse resources resources = parse_kubernetes_resources(template_stdout) result.resources = resources # Store template output for debugging projects_db[project_id]["last_template_output"] = template_stdout projects_db[project_id]["last_template_stderr"] = template_stderr # Store resources in project projects_db[project_id]["resources"] = {r.uid: r.dict() for r in resources} # Clean up override values if values_override_path and values_override_path.exists(): values_override_path.unlink() return result @app.get("/api/projects/{project_id}/debug-output") async def get_debug_output(project_id: str): """Get the raw helm template output for debugging.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") output = projects_db[project_id].get("last_template_output", "") stderr = projects_db[project_id].get("last_template_stderr", "") # Count documents in output doc_count = len([d for d in output.split("---") if d.strip()]) return { "raw_output": output, "stderr": stderr, "document_count": doc_count, "output_length": len(output) } @app.get("/api/projects/{project_id}/resources") async def get_resources(project_id: str): """Get all resources for a project.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") resources = projects_db[project_id].get("resources", {}) return list(resources.values()) @app.put("/api/projects/{project_id}/resources/{resource_uid}") async def update_resource(project_id: str, resource_uid: str, update: ResourceUpdate): """Update a resource's YAML content.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") resources = projects_db[project_id].get("resources", {}) if resource_uid not in resources: raise HTTPException(404, "Resource not found") # Validate YAML syntax try: yaml.safe_load(update.yaml_content) except yaml.YAMLError as e: raise HTTPException(400, f"Invalid YAML: {str(e)}") # Update resource resources[resource_uid]["rawYaml"] = update.yaml_content resources[resource_uid]["modified"] = True # Track modified resources if "modified_resources" not in projects_db[project_id]: projects_db[project_id]["modified_resources"] = {} projects_db[project_id]["modified_resources"][resource_uid] = update.yaml_content return {"success": True, "message": "Resource updated"} @app.post("/api/projects/{project_id}/resources/{resource_uid}/template-diff") async def get_template_diff(project_id: str, resource_uid: str, update: ResourceUpdate): """Generate diff showing how the template would change.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") resources = projects_db[project_id].get("resources", {}) if resource_uid not in resources: raise HTTPException(404, "Resource not found") resource = resources[resource_uid] chart_path = get_chart_path(project_id) templates_dir = chart_path / "templates" if not templates_dir.exists(): raise HTTPException(404, "Templates directory not found") # Find template file matching the resource kind template_file = None kind_lower = resource["kind"].lower() # Try common naming patterns for pattern in [f"{kind_lower}.yaml", f"{kind_lower}.yml"]: potential_file = templates_dir / pattern if potential_file.exists(): template_file = potential_file break # Search all templates for matching kind if not template_file: for tpl in templates_dir.glob("*.yaml"): if tpl.stem.startswith('_'): continue try: with open(tpl, "r", encoding="utf-8") as f: content = f.read() if f"kind: {resource['kind']}" in content: template_file = tpl break except Exception: continue if not template_file: raise HTTPException(404, f"Could not find template for {resource['kind']}") # Read original template with open(template_file, "r", encoding="utf-8") as f: original_template = f.read() return { "template_file": str(template_file.relative_to(chart_path)), "original_template": original_template, "modified_template": update.yaml_content, "additions": 0, "deletions": 0, } @app.post("/api/projects/{project_id}/resources/{resource_uid}/update-template") async def update_template(project_id: str, resource_uid: str, update: ResourceUpdate): """Update the template file with changes from edited resource.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") resources = projects_db[project_id].get("resources", {}) if resource_uid not in resources: raise HTTPException(404, "Resource not found") resource = resources[resource_uid] chart_path = get_chart_path(project_id) templates_dir = chart_path / "templates" if not templates_dir.exists(): raise HTTPException(404, "Templates directory not found") # Find template file template_file = None kind_lower = resource["kind"].lower() for pattern in [f"{kind_lower}.yaml", f"{kind_lower}.yml"]: potential_file = templates_dir / pattern if potential_file.exists(): template_file = potential_file break if not template_file: for tpl in templates_dir.glob("*.yaml"): if tpl.stem.startswith('_'): continue try: with open(tpl, "r", encoding="utf-8") as f: content = f.read() if f"kind: {resource['kind']}" in content: template_file = tpl break except Exception: continue if not template_file: raise HTTPException(404, f"Could not find template for {resource['kind']}") # Write updated YAML to template file with open(template_file, "w", encoding="utf-8") as f: f.write(update.yaml_content) return { "success": True, "message": f"Template {template_file.name} updated", "template_file": str(template_file.relative_to(chart_path)) } @app.post("/api/projects/{project_id}/export", response_model=ExportResult) async def export_chart(project_id: str): """ Export the modified chart as a new .tgz package. Runs helm lint and helm template to validate before export. """ if project_id not in projects_db: raise HTTPException(404, "Project not found") if not projects_db[project_id]["chart_uploaded"]: raise HTTPException(400, "No chart uploaded for this project") chart_path = get_chart_path(project_id) if not chart_path.exists(): raise HTTPException(400, "Chart directory not found") result = ExportResult(success=True) # Create modified chart in temporary location modified_chart_path = get_project_path(project_id) / "modified_chart" if modified_chart_path.exists(): shutil.rmtree(modified_chart_path) shutil.copytree(chart_path, modified_chart_path) # Apply modifications (simplified strategy: create overlay templates) modified_resources = projects_db[project_id].get("modified_resources", {}) if modified_resources: overlay_path = modified_chart_path / "templates" / "_helmview_generated.yaml" overlay_path.parent.mkdir(parents=True, exist_ok=True) with open(overlay_path, "w") as f: f.write("# HelmView Generated Overrides\n") f.write("# These resources were modified in the HelmView editor\n\n") for resource_yaml in modified_resources.values(): f.write("---\n") f.write(resource_yaml) f.write("\n") # Update Chart.yaml version chart_yaml_path = modified_chart_path / "Chart.yaml" new_version = update_chart_version(chart_yaml_path) result.version = new_version # Run helm lint lint_cmd = ["helm", "lint", str(modified_chart_path)] lint_stdout, lint_stderr, lint_code = await run_helm_command(lint_cmd, modified_chart_path) result.lint_output = lint_stdout + "\n" + lint_stderr if lint_code != 0: result.success = False result.errors.append(f"Helm lint failed: {lint_stderr}") return result # Run helm template to validate template_cmd = ["helm", "template", "test", str(modified_chart_path)] template_stdout, template_stderr, template_code = await run_helm_command(template_cmd, modified_chart_path) if template_code != 0: result.success = False result.errors.append(f"Helm template failed: {template_stderr}") return result # Package chart package_cmd = ["helm", "package", str(modified_chart_path), "-d", str(get_project_path(project_id))] package_stdout, package_stderr, package_code = await run_helm_command(package_cmd, modified_chart_path) if package_code != 0: result.success = False result.errors.append(f"Helm package failed: {package_stderr}") return result # Find the packaged chart chart_name = projects_db[project_id]["chart_name"] package_pattern = f"{chart_name}-{new_version}.tgz" package_path = get_project_path(project_id) / package_pattern if not package_path.exists(): # Try to find any .tgz file tgz_files = list(get_project_path(project_id).glob("*.tgz")) if tgz_files: package_path = tgz_files[0] else: result.success = False result.errors.append("Packaged chart file not found") return result result.chart_path = str(package_path) return result @app.get("/api/projects/{project_id}/download") async def download_chart(project_id: str): """Download the exported chart package.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") # Find the most recent .tgz file project_path = get_project_path(project_id) tgz_files = list(project_path.glob("*.tgz")) if not tgz_files: raise HTTPException(404, "No exported chart found. Run export first.") # Get the most recent file latest_file = max(tgz_files, key=lambda p: p.stat().st_mtime) return FileResponse( path=str(latest_file), filename=latest_file.name, media_type="application/gzip" ) @app.delete("/api/projects/{project_id}") async def delete_project(project_id: str): """Delete a project and its files.""" if project_id not in projects_db: raise HTTPException(404, "Project not found") # Delete filesystem project_path = get_project_path(project_id) if project_path.exists(): shutil.rmtree(project_path) # Delete from DB del projects_db[project_id] return {"success": True, "message": "Project deleted"} @app.get("/api/kubernetes-schemas/{kind}") async def get_kubernetes_schema(kind: str): """ Get Kubernetes schema for a specific resource kind. This is a simplified version. In production, use actual Kubernetes OpenAPI schemas. """ # This would ideally load from Kubernetes OpenAPI spec # For now, return a basic structure schemas = { "Deployment": { "apiVersion": ["apps/v1"], "kind": "Deployment", "metadata": {"name": "string", "namespace": "string", "labels": "object"}, "spec": { "replicas": "integer", "selector": "object", "template": "object" } }, "Service": { "apiVersion": ["v1"], "kind": "Service", "metadata": {"name": "string", "namespace": "string"}, "spec": { "type": ["ClusterIP", "NodePort", "LoadBalancer"], "ports": "array", "selector": "object" } } } return schemas.get(kind, {}) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)