Update json schema
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

This commit is contained in:
dvirlabs 2026-04-26 02:16:14 +03:00
parent f8fa847c11
commit 688e749030

641
app.py
View File

@ -1,9 +1,8 @@
#!/usr/bin/env python3
"""
GitOps Status Server API
Generic multi-server status API for GitOps deployments
Supports multiple server types: rsyslog, Splunk, IBM ITNM, nginx, etc.
Listens on port 5000 and handles status updates from pipelines and cron jobs
Multi-server status API organized by server type
Supports: rsyslog, Splunk, IBM ITNM, nginx, etc.
"""
import os
import json
@ -11,7 +10,6 @@ import logging
from flask import Flask, request, jsonify
from datetime import datetime, UTC
from flasgger import Swagger
from typing import Dict, List, Optional
app = Flask(__name__)
swagger = Swagger(app)
@ -29,540 +27,114 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
def load_data() -> Dict:
"""Load the current data structure from file"""
def load_data():
"""Load the current data from file"""
try:
if os.path.exists(STATUS_FILE):
with open(STATUS_FILE, 'r') as f:
data = json.load(f)
# Handle legacy format (v1.x) - migrate to v2 format
if 'version' not in data and 'repo' in data:
logger.info("Migrating from legacy v1 format to v2 format")
legacy_data = data.copy()
server_key = f"{legacy_data.get('server', 'unknown')}/{legacy_data.get('server', 'unknown')}"
# Convert old format to new format
migrated_server = {
"project_name": "gitops-for-servers",
"repo_name": legacy_data.get('repo', 'unknown'),
"server_group": legacy_data.get('server', 'unknown'),
"server_type": legacy_data.get('repo', 'unknown'),
"environment": "unknown",
"host": legacy_data.get('server', 'unknown'),
"status": legacy_data.get('sync_status', 'unknown').lower(),
"changed_files": [f.get('name', '') for f in legacy_data.get('drifted_files', [])] if isinstance(legacy_data.get('drifted_files'), list) else [],
"validation_status": "unknown",
"validation_message": "",
"last_pipeline_run": legacy_data.get('last_check', ''),
"last_cron_check": legacy_data.get('last_check', ''),
"commit_sha": "",
"branch": "",
"updated_by": "legacy_migration",
"timestamp": datetime.now(UTC).isoformat()
}
data = {
"version": "2.0",
"servers": {
server_key: migrated_server
}
}
save_data(data)
logger.info(f"Migration complete: {server_key}")
return data
return json.load(f)
else:
logger.warning(f"Status file not found: {STATUS_FILE}, creating new v2 structure")
return {
"version": "2.0",
"servers": {}
}
logger.warning(f"Status file not found: {STATUS_FILE}, creating new structure")
return {}
except Exception as e:
logger.error(f"Error loading status: {e}")
return {
"version": "2.0",
"servers": {}
}
return {}
def save_data(data: Dict) -> bool:
"""Save the data structure to file"""
def save_data(data):
"""Save the data to file"""
try:
# Ensure directory exists
os.makedirs(os.path.dirname(STATUS_FILE), exist_ok=True)
os.makedirs(os.path.dirname(STATUS_FILE) or '.', exist_ok=True)
# Write with proper formatting
with open(STATUS_FILE, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Data saved successfully ({len(data.get('servers', {}))} servers)")
logger.info(f"Data saved successfully")
return True
except Exception as e:
logger.error(f"Error saving data: {e}")
return False
def get_server_key(server_group: str, host: str) -> str:
"""Generate consistent key for server storage"""
return f"{server_group}/{host}"
def validate_required_fields(payload: Dict, required_fields: List[str]) -> Optional[str]:
"""Validate that required fields are present in payload"""
missing = [field for field in required_fields if field not in payload or payload[field] == ""]
if missing:
return f"Missing required fields: {', '.join(missing)}"
return None
@app.route('/status/pipeline', methods=['POST', 'OPTIONS'])
def status_pipeline():
@app.route('/api/<server_type>/status', methods=['GET', 'POST', 'OPTIONS'])
def api_server_status(server_type):
"""
Pipeline status update endpoint
Updates full deployment status from CI/CD pipeline
Server type status endpoint
GET: Retrieve status for a server type
POST: Update status for a server type
---
post:
summary: Update status from pipeline deployment
description: Full status update including deployment details, validation, commit info
get:
summary: Retrieve status for server type
parameters:
- in: path
name: server_type
type: string
required: true
description: Server type (rsyslog, splunk, nginx, etc.)
responses:
200:
description: Server status
post:
summary: Update status for server type
parameters:
- in: path
name: server_type
type: string
required: true
description: Server type (rsyslog, splunk, nginx, etc.)
- in: body
name: body
required: true
schema:
type: object
required:
- server_group
- host
- status
properties:
project_name:
server:
type: string
example: "gitops-for-servers"
repo_name:
example: "rsyslog-lab"
sync_status:
type: string
example: "rsyslog"
server_group:
type: string
example: "rsyslog-prod"
server_type:
type: string
example: "rsyslog"
environment:
type: string
example: "prod"
host:
type: string
example: "rsyslog-01"
status:
type: string
enum: ["synced", "out_of_sync", "failed", "unknown"]
example: "synced"
changed_files:
enum: ["SYNCED", "OUT_OF_SYNC", "UNKNOWN", "FAILED"]
example: "SYNCED"
drift_count:
type: integer
example: 0
deployed_files:
type: array
items:
type: object
properties:
name:
type: string
example: [{"name": "rsyslog.conf"}]
drifted_files:
type: array
items:
type: object
properties:
name:
type: string
example: []
validation_status:
type: string
example: "passed"
validation_message:
type: string
example: "rsyslog syntax validation passed"
commit_sha:
type: string
example: "abc1234"
branch:
type: string
example: "master"
updated_by:
type: string
example: "woodpecker"
last_pipeline_run:
last_check:
type: string
example: "2026-04-26T00:00:00Z"
responses:
200:
description: Status updated successfully
400:
description: Validation error
500:
description: Failed to save status
"""
if request.method == 'OPTIONS':
return '', 204
try:
payload = request.get_json()
if not payload:
return jsonify({"error": "No JSON data provided"}), 400
# Validate required fields
error = validate_required_fields(payload, ['server_group', 'host', 'status'])
if error:
return jsonify({"error": error}), 400
# Load current data
data = load_data()
# Generate server key
server_key = get_server_key(payload['server_group'], payload['host'])
# Get existing server data if present, or create new entry
server_data = data['servers'].get(server_key, {})
# Update all fields from pipeline (full update)
server_data.update({
"project_name": payload.get('project_name', server_data.get('project_name', '')),
"repo_name": payload.get('repo_name', server_data.get('repo_name', '')),
"server_group": payload['server_group'],
"server_type": payload.get('server_type', server_data.get('server_type', '')),
"environment": payload.get('environment', server_data.get('environment', '')),
"host": payload['host'],
"status": payload['status'],
"changed_files": payload.get('changed_files', []),
"validation_status": payload.get('validation_status', ''),
"validation_message": payload.get('validation_message', ''),
"commit_sha": payload.get('commit_sha', ''),
"branch": payload.get('branch', ''),
"updated_by": payload.get('updated_by', 'pipeline'),
"last_pipeline_run": payload.get('last_pipeline_run', datetime.now(UTC).isoformat()),
"last_cron_check": server_data.get('last_cron_check', ''),
"timestamp": datetime.now(UTC).isoformat()
})
# Save to data structure
data['servers'][server_key] = server_data
if save_data(data):
logger.info(f"Pipeline update: {server_key} -> {payload['status']}")
return jsonify({
"success": True,
"message": "Pipeline status updated successfully",
"server": server_data
}), 200
else:
return jsonify({"error": "Failed to save status"}), 500
except Exception as e:
logger.error(f"Error in POST /status/pipeline: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status/cron', methods=['POST', 'OPTIONS'])
def status_cron():
"""
Cron status update endpoint
Updates only drift detection / health check data
Does NOT overwrite pipeline deployment data
---
post:
summary: Update status from cron drift check
description: Partial update for drift detection - does not overwrite commit_sha, changed_files, or validation data
parameters:
- in: body
name: body
required: true
schema:
type: object
required:
- server_group
- host
properties:
server_group:
type: string
example: "rsyslog-prod"
host:
type: string
example: "rsyslog-01"
status:
type: string
enum: ["synced", "out_of_sync", "failed", "unknown"]
example: "synced"
changed_files:
type: array
items:
type: string
example: []
last_cron_check:
type: string
example: "2026-04-26T01:00:00Z"
responses:
200:
description: Cron status updated successfully
400:
description: Validation error
404:
description: Server not found
500:
description: Failed to save status
"""
if request.method == 'OPTIONS':
return '', 204
try:
payload = request.get_json()
if not payload:
return jsonify({"error": "No JSON data provided"}), 400
# Validate required fields
error = validate_required_fields(payload, ['server_group', 'host'])
if error:
return jsonify({"error": error}), 400
# Load current data
data = load_data()
# Generate server key
server_key = get_server_key(payload['server_group'], payload['host'])
# Check if server exists
if server_key not in data['servers']:
logger.warning(f"Cron update for unknown server: {server_key}")
return jsonify({"error": f"Server {server_key} not found. Use /status/pipeline first."}), 404
server_data = data['servers'][server_key]
# Cron updates only specific fields, preserves pipeline data
if 'status' in payload:
server_data['status'] = payload['status']
if 'changed_files' in payload:
server_data['changed_files'] = payload['changed_files']
server_data['last_cron_check'] = payload.get('last_cron_check', datetime.now(UTC).isoformat())
server_data['timestamp'] = datetime.now(UTC).isoformat()
# Save to data structure
data['servers'][server_key] = server_data
if save_data(data):
logger.info(f"Cron update: {server_key} -> {server_data.get('status', 'unknown')}")
return jsonify({
"success": True,
"message": "Cron status updated successfully",
"server": server_data
}), 200
else:
return jsonify({"error": "Failed to save status"}), 500
except Exception as e:
logger.error(f"Error in POST /status/cron: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status', methods=['GET'])
def get_all_status():
"""
Get all server statuses (Grafana Infinity friendly)
---
get:
summary: Retrieve all server statuses
description: Returns flat array of all servers, optimized for Grafana Infinity datasource
responses:
200:
description: List of all server statuses
schema:
type: array
items:
type: object
properties:
server_group:
type: string
server_type:
type: string
host:
type: string
status:
type: string
changed_files_count:
type: integer
changed_files:
type: array
items:
type: string
validation_status:
type: string
last_pipeline_run:
type: string
last_cron_check:
type: string
commit_sha:
type: string
"""
try:
data = load_data()
# Convert to flat array for Grafana
result = []
for server_key, server_data in data.get('servers', {}).items():
result.append({
"project_name": server_data.get('project_name', ''),
"repo_name": server_data.get('repo_name', ''),
"server_group": server_data.get('server_group', ''),
"server_type": server_data.get('server_type', ''),
"environment": server_data.get('environment', ''),
"host": server_data.get('host', ''),
"status": server_data.get('status', 'unknown'),
"changed_files_count": len(server_data.get('changed_files', [])),
"changed_files": server_data.get('changed_files', []),
"validation_status": server_data.get('validation_status', ''),
"validation_message": server_data.get('validation_message', ''),
"last_pipeline_run": server_data.get('last_pipeline_run', ''),
"last_cron_check": server_data.get('last_cron_check', ''),
"commit_sha": server_data.get('commit_sha', ''),
"branch": server_data.get('branch', ''),
"updated_by": server_data.get('updated_by', ''),
"timestamp": server_data.get('timestamp', '')
})
return jsonify(result), 200
except Exception as e:
logger.error(f"Error in GET /status: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status/<server_group>', methods=['GET'])
def get_server_group_status(server_group: str):
"""
Get status for all servers in a specific server group
---
get:
summary: Retrieve status for a server group
parameters:
- in: path
name: server_group
type: string
required: true
description: The server group name
responses:
200:
description: List of servers in the group
404:
description: Server group not found
"""
try:
data = load_data()
# Filter by server_group
result = []
for server_key, server_data in data.get('servers', {}).items():
if server_data.get('server_group') == server_group:
result.append({
"project_name": server_data.get('project_name', ''),
"repo_name": server_data.get('repo_name', ''),
"server_group": server_data.get('server_group', ''),
"server_type": server_data.get('server_type', ''),
"environment": server_data.get('environment', ''),
"host": server_data.get('host', ''),
"status": server_data.get('status', 'unknown'),
"changed_files_count": len(server_data.get('changed_files', [])),
"changed_files": server_data.get('changed_files', []),
"validation_status": server_data.get('validation_status', ''),
"validation_message": server_data.get('validation_message', ''),
"last_pipeline_run": server_data.get('last_pipeline_run', ''),
"last_cron_check": server_data.get('last_cron_check', ''),
"commit_sha": server_data.get('commit_sha', ''),
"branch": server_data.get('branch', ''),
"updated_by": server_data.get('updated_by', ''),
"timestamp": server_data.get('timestamp', '')
})
if not result:
return jsonify({"error": f"No servers found in group: {server_group}"}), 404
return jsonify(result), 200
except Exception as e:
logger.error(f"Error in GET /status/{server_group}: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/status/<server_group>/<host>', methods=['GET'])
def get_host_status(server_group: str, host: str):
"""
Get status for a specific server
---
get:
summary: Retrieve status for a specific server
parameters:
- in: path
name: server_group
type: string
required: true
description: The server group name
- in: path
name: host
type: string
required: true
description: The hostname
responses:
200:
description: Server status
404:
description: Server not found
"""
try:
data = load_data()
server_key = get_server_key(server_group, host)
if server_key not in data.get('servers', {}):
return jsonify({"error": f"Server not found: {server_key}"}), 404
server_data = data['servers'][server_key]
return jsonify({
"project_name": server_data.get('project_name', ''),
"repo_name": server_data.get('repo_name', ''),
"server_group": server_data.get('server_group', ''),
"server_type": server_data.get('server_type', ''),
"environment": server_data.get('environment', ''),
"host": server_data.get('host', ''),
"status": server_data.get('status', 'unknown'),
"changed_files_count": len(server_data.get('changed_files', [])),
"changed_files": server_data.get('changed_files', []),
"validation_status": server_data.get('validation_status', ''),
"validation_message": server_data.get('validation_message', ''),
"last_pipeline_run": server_data.get('last_pipeline_run', ''),
"last_cron_check": server_data.get('last_cron_check', ''),
"commit_sha": server_data.get('commit_sha', ''),
"branch": server_data.get('branch', ''),
"updated_by": server_data.get('updated_by', ''),
"timestamp": server_data.get('timestamp', '')
}), 200
except Exception as e:
logger.error(f"Error in GET /status/{server_group}/{host}: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/status', methods=['GET', 'POST', 'OPTIONS'])
def api_status_legacy():
"""
Legacy API endpoint for backward compatibility
---
get:
summary: (Legacy) Retrieve current status
deprecated: true
description: Use GET /status instead
post:
summary: (Legacy) Update status with old format
deprecated: true
description: Use POST /status/pipeline instead
"""
if request.method == 'OPTIONS':
return '', 204
if request.method == 'GET':
# Return first server in old format, or empty old format
try:
data = load_data()
if not data.get('servers'):
# Return status for this server type
if server_type not in data:
return jsonify({
"repo": "unknown",
"repo": server_type,
"server": "unknown",
"sync_status": "UNKNOWN",
"drift_count": 0,
@ -571,75 +143,53 @@ def api_status_legacy():
"last_check": ""
}), 200
# Get first server and convert to old format
first_key = list(data['servers'].keys())[0]
server = data['servers'][first_key]
return jsonify(data[server_type]), 200
return jsonify({
"repo": server.get('repo_name', 'unknown'),
"server": server.get('server_group', 'unknown'),
"sync_status": server.get('status', 'unknown').upper(),
"drift_count": len(server.get('changed_files', [])),
"deployed_files": [],
"drifted_files": [{"name": f} for f in server.get('changed_files', [])],
"last_check": server.get('last_pipeline_run', '')
}), 200
except Exception as e:
logger.error(f"Error in GET /api/{server_type}/status: {e}")
return jsonify({"error": str(e)}), 500
if request.method == 'POST':
# Accept old format and convert to new format
try:
incoming_data = request.get_json()
if not incoming_data:
return jsonify({"error": "No JSON data provided"}), 400
# Convert old format to new format
server_group = incoming_data.get('server', 'unknown')
host = incoming_data.get('server', 'unknown')
new_payload = {
"project_name": "gitops-for-servers",
"repo_name": incoming_data.get('repo', 'unknown'),
"server_group": server_group,
"server_type": incoming_data.get('repo', 'unknown'),
"environment": "unknown",
"host": host,
"status": incoming_data.get('sync_status', 'unknown').lower(),
"changed_files": [f.get('name', '') for f in incoming_data.get('drifted_files', [])] if isinstance(incoming_data.get('drifted_files'), list) else [],
"validation_status": "unknown",
"validation_message": "",
"commit_sha": "",
"branch": "",
"updated_by": "legacy_api",
"last_pipeline_run": incoming_data.get('last_check', datetime.now(UTC).isoformat())
}
# Load current data
data = load_data()
server_key = get_server_key(server_group, host)
server_data = data['servers'].get(server_key, {})
# Merge with existing data
server_data.update(new_payload)
server_data['timestamp'] = datetime.now(UTC).isoformat()
# Get existing data for this server type or create new
if server_type not in data:
data[server_type] = {}
data['servers'][server_key] = server_data
# Update with incoming data (merge)
data[server_type].update(incoming_data)
# Set repo field
data[server_type]['repo'] = server_type
# Add/update timestamp if not present
if 'last_check' not in data[server_type] or not data[server_type]['last_check']:
data[server_type]['last_check'] = datetime.now(UTC).isoformat()
# Save updated data
if save_data(data):
logger.info(f"Legacy API update: {server_key}")
logger.info(f"Status updated for {server_type}: {data[server_type].get('server', 'unknown')} -> {data[server_type].get('sync_status', 'UNKNOWN')}")
return jsonify({
"success": True,
"message": "Status updated successfully (legacy format)",
"status": incoming_data
"message": "Status updated successfully",
"status": data[server_type]
}), 200
else:
return jsonify({"error": "Failed to save status"}), 500
except json.JSONDecodeError:
return jsonify({"error": "Invalid JSON"}), 400
except Exception as e:
logger.error(f"Error in POST /api/status (legacy): {e}")
logger.error(f"Error in POST /api/{server_type}/status: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
"""
@ -673,8 +223,8 @@ def ready():
data = load_data()
# Verify we can read it
if isinstance(data, dict) and 'version' in data:
return jsonify({"status": "ready", "version": data['version']}), 200
if isinstance(data, dict):
return jsonify({"status": "ready"}), 200
return jsonify({"status": "not_ready", "reason": "invalid data structure"}), 503
except Exception as e:
@ -694,15 +244,10 @@ def root():
return jsonify({
"name": "GitOps Status API",
"version": "2.0.0",
"description": "Generic multi-server status API for GitOps deployments",
"description": "Multi-server status API organized by server type",
"endpoints": {
"POST /status/pipeline": "Update full deployment status from pipeline",
"POST /status/cron": "Update drift check status from cron",
"GET /status": "Retrieve all server statuses (Grafana friendly)",
"GET /status/{server_group}": "Retrieve status for server group",
"GET /status/{server_group}/{host}": "Retrieve status for specific server",
"GET /api/status": "(Legacy) Retrieve status in old format",
"POST /api/status": "(Legacy) Update status with old format",
"GET /api/{server_type}/status": "Retrieve status for server type (rsyslog, splunk, nginx, etc.)",
"POST /api/{server_type}/status": "Update status for server type",
"GET /health": "Liveness probe",
"GET /ready": "Readiness probe"
},
@ -711,8 +256,13 @@ def root():
"splunk",
"ibm-itnm",
"nginx",
"any-generic-server-type"
]
"any-server-type"
],
"examples": {
"rsyslog": "GET/POST /api/rsyslog/status",
"splunk": "GET/POST /api/splunk/status",
"nginx": "GET/POST /api/nginx/status"
}
}), 200
@ -724,3 +274,4 @@ if __name__ == '__main__':
os.makedirs(os.path.dirname(STATUS_FILE) or '.', exist_ok=True)
app.run(host=API_HOST, port=API_PORT, debug=False, threaded=True)