dvirlabs f8fa847c11
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
refactor the project to make it generic
2026-04-26 01:08:15 +03:00

727 lines
26 KiB
Python

#!/usr/bin/env python3
"""
GitOps Status Server API
Generic multi-server status API for GitOps deployments
Supports multiple server types: rsyslog, Splunk, IBM ITNM, nginx, etc.
Listens on port 5000 and handles status updates from pipelines and cron jobs
"""
import os
import json
import logging
from flask import Flask, request, jsonify
from datetime import datetime, UTC
from flasgger import Swagger
from typing import Dict, List, Optional
app = Flask(__name__)
swagger = Swagger(app)
# Configuration from environment
STATUS_FILE = os.environ.get('STATUS_FILE', '/data/status.json')
API_HOST = os.environ.get('API_HOST', '0.0.0.0')
API_PORT = int(os.environ.get('API_PORT', 5000))
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def load_data() -> Dict:
"""Load the current data structure from file"""
try:
if os.path.exists(STATUS_FILE):
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
# Handle legacy format (v1.x) - migrate to v2 format
if 'version' not in data and 'repo' in data:
logger.info("Migrating from legacy v1 format to v2 format")
legacy_data = data.copy()
server_key = f"{legacy_data.get('server', 'unknown')}/{legacy_data.get('server', 'unknown')}"
# Convert old format to new format
migrated_server = {
"project_name": "gitops-for-servers",
"repo_name": legacy_data.get('repo', 'unknown'),
"server_group": legacy_data.get('server', 'unknown'),
"server_type": legacy_data.get('repo', 'unknown'),
"environment": "unknown",
"host": legacy_data.get('server', 'unknown'),
"status": legacy_data.get('sync_status', 'unknown').lower(),
"changed_files": [f.get('name', '') for f in legacy_data.get('drifted_files', [])] if isinstance(legacy_data.get('drifted_files'), list) else [],
"validation_status": "unknown",
"validation_message": "",
"last_pipeline_run": legacy_data.get('last_check', ''),
"last_cron_check": legacy_data.get('last_check', ''),
"commit_sha": "",
"branch": "",
"updated_by": "legacy_migration",
"timestamp": datetime.now(UTC).isoformat()
}
data = {
"version": "2.0",
"servers": {
server_key: migrated_server
}
}
save_data(data)
logger.info(f"Migration complete: {server_key}")
return data
else:
logger.warning(f"Status file not found: {STATUS_FILE}, creating new v2 structure")
return {
"version": "2.0",
"servers": {}
}
except Exception as e:
logger.error(f"Error loading status: {e}")
return {
"version": "2.0",
"servers": {}
}
def save_data(data: Dict) -> bool:
"""Save the data structure to file"""
try:
# Ensure directory exists
os.makedirs(os.path.dirname(STATUS_FILE), exist_ok=True)
# Write with proper formatting
with open(STATUS_FILE, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Data saved successfully ({len(data.get('servers', {}))} servers)")
return True
except Exception as e:
logger.error(f"Error saving data: {e}")
return False
def get_server_key(server_group: str, host: str) -> str:
"""Generate consistent key for server storage"""
return f"{server_group}/{host}"
def validate_required_fields(payload: Dict, required_fields: List[str]) -> Optional[str]:
"""Validate that required fields are present in payload"""
missing = [field for field in required_fields if field not in payload or payload[field] == ""]
if missing:
return f"Missing required fields: {', '.join(missing)}"
return None
@app.route('/status/pipeline', methods=['POST', 'OPTIONS'])
def status_pipeline():
"""
Pipeline status update endpoint
Updates full deployment status from CI/CD pipeline
---
post:
summary: Update status from pipeline deployment
description: Full status update including deployment details, validation, commit info
parameters:
- in: body
name: body
required: true
schema:
type: object
required:
- server_group
- host
- status
properties:
project_name:
type: string
example: "gitops-for-servers"
repo_name:
type: string
example: "rsyslog"
server_group:
type: string
example: "rsyslog-prod"
server_type:
type: string
example: "rsyslog"
environment:
type: string
example: "prod"
host:
type: string
example: "rsyslog-01"
status:
type: string
enum: ["synced", "out_of_sync", "failed", "unknown"]
example: "synced"
changed_files:
type: array
items:
type: string
example: []
validation_status:
type: string
example: "passed"
validation_message:
type: string
example: "rsyslog syntax validation passed"
commit_sha:
type: string
example: "abc1234"
branch:
type: string
example: "master"
updated_by:
type: string
example: "woodpecker"
last_pipeline_run:
type: string
example: "2026-04-26T00:00:00Z"
responses:
200:
description: Status updated successfully
400:
description: Validation error
500:
description: Failed to save status
"""
if request.method == 'OPTIONS':
return '', 204
try:
payload = request.get_json()
if not payload:
return jsonify({"error": "No JSON data provided"}), 400
# Validate required fields
error = validate_required_fields(payload, ['server_group', 'host', 'status'])
if error:
return jsonify({"error": error}), 400
# Load current data
data = load_data()
# Generate server key
server_key = get_server_key(payload['server_group'], payload['host'])
# Get existing server data if present, or create new entry
server_data = data['servers'].get(server_key, {})
# Update all fields from pipeline (full update)
server_data.update({
"project_name": payload.get('project_name', server_data.get('project_name', '')),
"repo_name": payload.get('repo_name', server_data.get('repo_name', '')),
"server_group": payload['server_group'],
"server_type": payload.get('server_type', server_data.get('server_type', '')),
"environment": payload.get('environment', server_data.get('environment', '')),
"host": payload['host'],
"status": payload['status'],
"changed_files": payload.get('changed_files', []),
"validation_status": payload.get('validation_status', ''),
"validation_message": payload.get('validation_message', ''),
"commit_sha": payload.get('commit_sha', ''),
"branch": payload.get('branch', ''),
"updated_by": payload.get('updated_by', 'pipeline'),
"last_pipeline_run": payload.get('last_pipeline_run', datetime.now(UTC).isoformat()),
"last_cron_check": server_data.get('last_cron_check', ''),
"timestamp": datetime.now(UTC).isoformat()
})
# Save to data structure
data['servers'][server_key] = server_data
if save_data(data):
logger.info(f"Pipeline update: {server_key} -> {payload['status']}")
return jsonify({
"success": True,
"message": "Pipeline status updated successfully",
"server": server_data
}), 200
else:
return jsonify({"error": "Failed to save status"}), 500
except Exception as e:
logger.error(f"Error in POST /status/pipeline: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status/cron', methods=['POST', 'OPTIONS'])
def status_cron():
"""
Cron status update endpoint
Updates only drift detection / health check data
Does NOT overwrite pipeline deployment data
---
post:
summary: Update status from cron drift check
description: Partial update for drift detection - does not overwrite commit_sha, changed_files, or validation data
parameters:
- in: body
name: body
required: true
schema:
type: object
required:
- server_group
- host
properties:
server_group:
type: string
example: "rsyslog-prod"
host:
type: string
example: "rsyslog-01"
status:
type: string
enum: ["synced", "out_of_sync", "failed", "unknown"]
example: "synced"
changed_files:
type: array
items:
type: string
example: []
last_cron_check:
type: string
example: "2026-04-26T01:00:00Z"
responses:
200:
description: Cron status updated successfully
400:
description: Validation error
404:
description: Server not found
500:
description: Failed to save status
"""
if request.method == 'OPTIONS':
return '', 204
try:
payload = request.get_json()
if not payload:
return jsonify({"error": "No JSON data provided"}), 400
# Validate required fields
error = validate_required_fields(payload, ['server_group', 'host'])
if error:
return jsonify({"error": error}), 400
# Load current data
data = load_data()
# Generate server key
server_key = get_server_key(payload['server_group'], payload['host'])
# Check if server exists
if server_key not in data['servers']:
logger.warning(f"Cron update for unknown server: {server_key}")
return jsonify({"error": f"Server {server_key} not found. Use /status/pipeline first."}), 404
server_data = data['servers'][server_key]
# Cron updates only specific fields, preserves pipeline data
if 'status' in payload:
server_data['status'] = payload['status']
if 'changed_files' in payload:
server_data['changed_files'] = payload['changed_files']
server_data['last_cron_check'] = payload.get('last_cron_check', datetime.now(UTC).isoformat())
server_data['timestamp'] = datetime.now(UTC).isoformat()
# Save to data structure
data['servers'][server_key] = server_data
if save_data(data):
logger.info(f"Cron update: {server_key} -> {server_data.get('status', 'unknown')}")
return jsonify({
"success": True,
"message": "Cron status updated successfully",
"server": server_data
}), 200
else:
return jsonify({"error": "Failed to save status"}), 500
except Exception as e:
logger.error(f"Error in POST /status/cron: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status', methods=['GET'])
def get_all_status():
"""
Get all server statuses (Grafana Infinity friendly)
---
get:
summary: Retrieve all server statuses
description: Returns flat array of all servers, optimized for Grafana Infinity datasource
responses:
200:
description: List of all server statuses
schema:
type: array
items:
type: object
properties:
server_group:
type: string
server_type:
type: string
host:
type: string
status:
type: string
changed_files_count:
type: integer
changed_files:
type: array
items:
type: string
validation_status:
type: string
last_pipeline_run:
type: string
last_cron_check:
type: string
commit_sha:
type: string
"""
try:
data = load_data()
# Convert to flat array for Grafana
result = []
for server_key, server_data in data.get('servers', {}).items():
result.append({
"project_name": server_data.get('project_name', ''),
"repo_name": server_data.get('repo_name', ''),
"server_group": server_data.get('server_group', ''),
"server_type": server_data.get('server_type', ''),
"environment": server_data.get('environment', ''),
"host": server_data.get('host', ''),
"status": server_data.get('status', 'unknown'),
"changed_files_count": len(server_data.get('changed_files', [])),
"changed_files": server_data.get('changed_files', []),
"validation_status": server_data.get('validation_status', ''),
"validation_message": server_data.get('validation_message', ''),
"last_pipeline_run": server_data.get('last_pipeline_run', ''),
"last_cron_check": server_data.get('last_cron_check', ''),
"commit_sha": server_data.get('commit_sha', ''),
"branch": server_data.get('branch', ''),
"updated_by": server_data.get('updated_by', ''),
"timestamp": server_data.get('timestamp', '')
})
return jsonify(result), 200
except Exception as e:
logger.error(f"Error in GET /status: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status/<server_group>', methods=['GET'])
def get_server_group_status(server_group: str):
"""
Get status for all servers in a specific server group
---
get:
summary: Retrieve status for a server group
parameters:
- in: path
name: server_group
type: string
required: true
description: The server group name
responses:
200:
description: List of servers in the group
404:
description: Server group not found
"""
try:
data = load_data()
# Filter by server_group
result = []
for server_key, server_data in data.get('servers', {}).items():
if server_data.get('server_group') == server_group:
result.append({
"project_name": server_data.get('project_name', ''),
"repo_name": server_data.get('repo_name', ''),
"server_group": server_data.get('server_group', ''),
"server_type": server_data.get('server_type', ''),
"environment": server_data.get('environment', ''),
"host": server_data.get('host', ''),
"status": server_data.get('status', 'unknown'),
"changed_files_count": len(server_data.get('changed_files', [])),
"changed_files": server_data.get('changed_files', []),
"validation_status": server_data.get('validation_status', ''),
"validation_message": server_data.get('validation_message', ''),
"last_pipeline_run": server_data.get('last_pipeline_run', ''),
"last_cron_check": server_data.get('last_cron_check', ''),
"commit_sha": server_data.get('commit_sha', ''),
"branch": server_data.get('branch', ''),
"updated_by": server_data.get('updated_by', ''),
"timestamp": server_data.get('timestamp', '')
})
if not result:
return jsonify({"error": f"No servers found in group: {server_group}"}), 404
return jsonify(result), 200
except Exception as e:
logger.error(f"Error in GET /status/{server_group}: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status/<server_group>/<host>', methods=['GET'])
def get_host_status(server_group: str, host: str):
"""
Get status for a specific server
---
get:
summary: Retrieve status for a specific server
parameters:
- in: path
name: server_group
type: string
required: true
description: The server group name
- in: path
name: host
type: string
required: true
description: The hostname
responses:
200:
description: Server status
404:
description: Server not found
"""
try:
data = load_data()
server_key = get_server_key(server_group, host)
if server_key not in data.get('servers', {}):
return jsonify({"error": f"Server not found: {server_key}"}), 404
server_data = data['servers'][server_key]
return jsonify({
"project_name": server_data.get('project_name', ''),
"repo_name": server_data.get('repo_name', ''),
"server_group": server_data.get('server_group', ''),
"server_type": server_data.get('server_type', ''),
"environment": server_data.get('environment', ''),
"host": server_data.get('host', ''),
"status": server_data.get('status', 'unknown'),
"changed_files_count": len(server_data.get('changed_files', [])),
"changed_files": server_data.get('changed_files', []),
"validation_status": server_data.get('validation_status', ''),
"validation_message": server_data.get('validation_message', ''),
"last_pipeline_run": server_data.get('last_pipeline_run', ''),
"last_cron_check": server_data.get('last_cron_check', ''),
"commit_sha": server_data.get('commit_sha', ''),
"branch": server_data.get('branch', ''),
"updated_by": server_data.get('updated_by', ''),
"timestamp": server_data.get('timestamp', '')
}), 200
except Exception as e:
logger.error(f"Error in GET /status/{server_group}/{host}: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/status', methods=['GET', 'POST', 'OPTIONS'])
def api_status_legacy():
"""
Legacy API endpoint for backward compatibility
---
get:
summary: (Legacy) Retrieve current status
deprecated: true
description: Use GET /status instead
post:
summary: (Legacy) Update status with old format
deprecated: true
description: Use POST /status/pipeline instead
"""
if request.method == 'OPTIONS':
return '', 204
if request.method == 'GET':
# Return first server in old format, or empty old format
data = load_data()
if not data.get('servers'):
return jsonify({
"repo": "unknown",
"server": "unknown",
"sync_status": "UNKNOWN",
"drift_count": 0,
"deployed_files": [],
"drifted_files": [],
"last_check": ""
}), 200
# Get first server and convert to old format
first_key = list(data['servers'].keys())[0]
server = data['servers'][first_key]
return jsonify({
"repo": server.get('repo_name', 'unknown'),
"server": server.get('server_group', 'unknown'),
"sync_status": server.get('status', 'unknown').upper(),
"drift_count": len(server.get('changed_files', [])),
"deployed_files": [],
"drifted_files": [{"name": f} for f in server.get('changed_files', [])],
"last_check": server.get('last_pipeline_run', '')
}), 200
if request.method == 'POST':
# Accept old format and convert to new format
try:
incoming_data = request.get_json()
if not incoming_data:
return jsonify({"error": "No JSON data provided"}), 400
# Convert old format to new format
server_group = incoming_data.get('server', 'unknown')
host = incoming_data.get('server', 'unknown')
new_payload = {
"project_name": "gitops-for-servers",
"repo_name": incoming_data.get('repo', 'unknown'),
"server_group": server_group,
"server_type": incoming_data.get('repo', 'unknown'),
"environment": "unknown",
"host": host,
"status": incoming_data.get('sync_status', 'unknown').lower(),
"changed_files": [f.get('name', '') for f in incoming_data.get('drifted_files', [])] if isinstance(incoming_data.get('drifted_files'), list) else [],
"validation_status": "unknown",
"validation_message": "",
"commit_sha": "",
"branch": "",
"updated_by": "legacy_api",
"last_pipeline_run": incoming_data.get('last_check', datetime.now(UTC).isoformat())
}
# Load current data
data = load_data()
server_key = get_server_key(server_group, host)
server_data = data['servers'].get(server_key, {})
# Merge with existing data
server_data.update(new_payload)
server_data['timestamp'] = datetime.now(UTC).isoformat()
data['servers'][server_key] = server_data
if save_data(data):
logger.info(f"Legacy API update: {server_key}")
return jsonify({
"success": True,
"message": "Status updated successfully (legacy format)",
"status": incoming_data
}), 200
else:
return jsonify({"error": "Failed to save status"}), 500
except Exception as e:
logger.error(f"Error in POST /api/status (legacy): {e}")
return jsonify({"error": str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
"""
GET /health - Kubernetes liveness probe
---
responses:
200:
description: API is healthy
"""
return jsonify({"status": "healthy"}), 200
@app.route('/ready', methods=['GET'])
def ready():
"""
GET /ready - Kubernetes readiness probe
---
responses:
200:
description: API is ready to serve requests
503:
description: API is not ready
"""
try:
# Check if data directory is writable
data_dir = os.path.dirname(STATUS_FILE)
if not os.path.exists(data_dir):
os.makedirs(data_dir, exist_ok=True)
# Try to read or create status file
data = load_data()
# Verify we can read it
if isinstance(data, dict) and 'version' in data:
return jsonify({"status": "ready", "version": data['version']}), 200
return jsonify({"status": "not_ready", "reason": "invalid data structure"}), 503
except Exception as e:
logger.error(f"Readiness check failed: {e}")
return jsonify({"status": "not_ready", "error": str(e)}), 503
@app.route('/', methods=['GET'])
def root():
"""
GET / - API information and available endpoints
---
responses:
200:
description: API metadata and endpoint list
"""
return jsonify({
"name": "GitOps Status API",
"version": "2.0.0",
"description": "Generic multi-server status API for GitOps deployments",
"endpoints": {
"POST /status/pipeline": "Update full deployment status from pipeline",
"POST /status/cron": "Update drift check status from cron",
"GET /status": "Retrieve all server statuses (Grafana friendly)",
"GET /status/{server_group}": "Retrieve status for server group",
"GET /status/{server_group}/{host}": "Retrieve status for specific server",
"GET /api/status": "(Legacy) Retrieve status in old format",
"POST /api/status": "(Legacy) Update status with old format",
"GET /health": "Liveness probe",
"GET /ready": "Readiness probe"
},
"supported_server_types": [
"rsyslog",
"splunk",
"ibm-itnm",
"nginx",
"any-generic-server-type"
]
}), 200
if __name__ == '__main__':
logger.info(f"Starting GitOps Status API v2.0 on {API_HOST}:{API_PORT}")
logger.info(f"Status file location: {STATUS_FILE}")
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(STATUS_FILE) or '.', exist_ok=True)
app.run(host=API_HOST, port=API_PORT, debug=False, threaded=True)