Initial commit

This commit is contained in:
Codex
2026-01-23 11:12:31 +01:00
commit 0c420a8697
27 changed files with 1767 additions and 0 deletions

1
client/src/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Backup client package."""

4
client/src/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
from .ui import main
if __name__ == "__main__":
main()

120
client/src/api_client.py Normal file
View File

@@ -0,0 +1,120 @@
from __future__ import annotations
import json
from datetime import datetime
from typing import Any, Dict, List
import requests
import urllib3
from .config import (
AppConfig,
ProfileConfig,
is_token_valid,
save_config,
update_token,
)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class ApiClientError(Exception):
pass
class ApiClient:
def __init__(self, config: AppConfig) -> None:
self.config = config
self._session = requests.Session()
self._session.verify = False
def _headers(self) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
token = self.config.token
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def login(self, password: str) -> None:
payload = {"username": self.config.api_user, "password": password}
response = self._session.post(f"{self.config.server_url}/auth/login", data=payload, timeout=10)
if response.status_code != 200:
raise ApiClientError("Login failed")
body = response.json()
token = body.get("access_token")
expires_at_raw = body.get("expires_at")
if not token or not expires_at_raw:
raise ApiClientError("Invalid auth payload")
expires_at = datetime.fromisoformat(expires_at_raw)
update_token(self.config, token, expires_at)
def ensure_authenticated(self) -> None:
if is_token_valid(self.config):
return
raise ApiClientError("Client not authenticated (login required)")
def ensure_remote_profile(self, profile: ProfileConfig) -> ProfileConfig:
if profile.server_id:
return profile
self.ensure_authenticated()
payload = {
"name": profile.name,
"folders": profile.folders,
"description": None,
"schedule_enabled": profile.schedule_enabled,
}
response = self._session.post(
f"{self.config.server_url}/profiles", headers=self._headers(), json=payload, timeout=10
)
if response.status_code not in (200, 201):
raise ApiClientError("Unable to create profile on server")
data = response.json()
profile.server_id = data.get("id")
save_config(self.config)
return profile
def start_backup(
self,
*,
profile: ProfileConfig,
folders: List[str],
client_host: str,
ssh_username: str,
ssh_password: str,
) -> int:
self.ensure_authenticated()
remote_profile = self.ensure_remote_profile(profile)
if not remote_profile.server_id:
raise ApiClientError("Profile missing server identifier")
payload: Dict[str, Any] = {
"profile_id": remote_profile.server_id,
"client_host": client_host,
"ssh_username": ssh_username,
"ssh_password": ssh_password,
"folders": folders,
}
response = self._session.post(
f"{self.config.server_url}/backup/start",
headers=self._headers(),
json=payload,
timeout=20,
)
if response.status_code != 200:
raise ApiClientError(f"Backup start failed: {response.text}")
return response.json().get("job_id")
def job_status(self, job_id: int) -> Dict[str, Any]:
response = self._session.get(
f"{self.config.server_url}/backup/status/{job_id}", headers=self._headers(), timeout=10
)
if response.status_code != 200:
raise ApiClientError("Unable to fetch job status")
return response.json()
def job_log(self, job_id: int) -> List[str]:
response = self._session.get(
f"{self.config.server_url}/backup/log/{job_id}", headers=self._headers(), timeout=10
)
if response.status_code != 200:
raise ApiClientError("Unable to fetch job log")
return response.json().get("lines", [])

110
client/src/config.py Normal file
View File

@@ -0,0 +1,110 @@
from __future__ import annotations
import json
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional
CONFIG_PATH = Path(sys.argv[0]).resolve().parent / "config.json"
def _ensure_config_path() -> Path:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
return CONFIG_PATH
@dataclass
class ProfileConfig:
name: str
folders: List[str]
schedule_enabled: bool = False
server_id: Optional[int] = None
@dataclass
class AppConfig:
server_url: str = "https://backup-server.local:8443"
api_user: str = "backup-admin"
token: Optional[str] = None
token_expires_at: Optional[str] = None
profiles: List[ProfileConfig] = field(default_factory=list)
active_profile: Optional[str] = None
scheduler_enabled: bool = False
@property
def active_profile_obj(self) -> Optional[ProfileConfig]:
if not self.active_profile:
return None
for profile in self.profiles:
if profile.name == self.active_profile:
return profile
return None
def to_dict(self) -> dict:
payload = asdict(self)
payload["profiles"] = [asdict(profile) for profile in self.profiles]
return payload
def load_config() -> AppConfig:
path = _ensure_config_path()
if not path.exists():
default = AppConfig()
save_config(default)
return default
raw = json.loads(path.read_text(encoding="utf-8"))
profiles = [ProfileConfig(**profile) for profile in raw.get("profiles", [])]
return AppConfig(
server_url=raw.get("server_url", "https://backup-server.local:8443"),
api_user=raw.get("api_user", "backup-admin"),
token=raw.get("token"),
token_expires_at=raw.get("token_expires_at"),
profiles=profiles,
active_profile=raw.get("active_profile"),
scheduler_enabled=raw.get("scheduler_enabled", False),
)
def save_config(config: AppConfig) -> None:
path = _ensure_config_path()
path.write_text(json.dumps(config.to_dict(), indent=2), encoding="utf-8")
def clear_token(config: AppConfig) -> None:
config.token = None
config.token_expires_at = None
save_config(config)
def is_token_valid(config: AppConfig) -> bool:
if not config.token or not config.token_expires_at:
return False
try:
expires = datetime.fromisoformat(config.token_expires_at)
except ValueError:
return False
return expires > datetime.utcnow()
def update_token(config: AppConfig, token: str, expires_at: datetime) -> None:
config.token = token
config.token_expires_at = expires_at.isoformat()
save_config(config)
def find_profile(config: AppConfig, name: str) -> Optional[ProfileConfig]:
for profile in config.profiles:
if profile.name == name:
return profile
return None
def ensure_profile(config: AppConfig, profile: ProfileConfig) -> ProfileConfig:
existing = find_profile(config, profile.name)
if existing:
return existing
config.profiles.append(profile)
save_config(config)
return profile

76
client/src/scheduler.py Normal file
View File

@@ -0,0 +1,76 @@
from __future__ import annotations
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Iterable
class SchedulerError(Exception):
"""Raised when scheduler helper fails."""
def _sanitize(name: str) -> str:
return "".join(ch for ch in name if ch.isalnum() or ch in "-_").strip() or "default"
def _script_path() -> Path:
"""Return the path to the helper PowerShell script relative to the executable."""
base = Path(sys.argv[0]).resolve().parent
return base / "scheduler" / "manage_scheduler.ps1"
def _powershell_args(
action: str,
task_name: str,
executable: Path,
profile_name: str,
trigger: str = "ONLOGON",
start_time: str = "02:00",
) -> list[str]:
return [
shutil.which("powershell"),
"-ExecutionPolicy",
"Bypass",
"-File",
str(_script_path()),
"-Action",
action,
"-TaskName",
task_name,
"-ExecutablePath",
str(executable),
"-ProfileName",
profile_name,
"-Trigger",
trigger,
"-StartTime",
start_time,
]
def _run(action: str, task_name: str, profile_name: str, executable: Path) -> str:
script = _script_path()
if not script.exists():
raise SchedulerError(f"Script helpers not found at {script}")
ps_cmd = _powershell_args(action, task_name, executable, profile_name)
if not ps_cmd[0]:
raise SchedulerError("PowerShell not found in PATH")
result = subprocess.run(
[arg for arg in ps_cmd if arg], # filter None
capture_output=True,
text=True,
)
output = result.stdout.strip() or result.stderr.strip()
if result.returncode != 0:
raise SchedulerError(output or "Errore sconosciuto durante la configurazione dello scheduler")
return output
def enable_scheduler(task_name: str, profile_name: str, executable: Path) -> str:
return _run("Enable", task_name, _sanitize(profile_name), executable)
def disable_scheduler(task_name: str, profile_name: str, executable: Path) -> str:
return _run("Disable", task_name, _sanitize(profile_name), executable)

382
client/src/ui.py Normal file
View File

@@ -0,0 +1,382 @@
from __future__ import annotations
import sys
import time
from pathlib import Path
from typing import List, Optional
from PySide6.QtCore import QThread, Signal
from PySide6.QtWidgets import (
QApplication,
QCheckBox,
QFormLayout,
QGridLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QMainWindow,
QMessageBox,
QPushButton,
QPlainTextEdit,
QProgressBar,
QVBoxLayout,
QWidget,
QFileDialog,
QComboBox,
QInputDialog,
)
from .api_client import ApiClient, ApiClientError
from .config import (
AppConfig,
ProfileConfig,
clear_token,
find_profile,
is_token_valid,
load_config,
save_config,
)
from .scheduler import SchedulerError, disable_scheduler, enable_scheduler
class BackupWorker(QThread):
status_update = Signal(dict)
log_update = Signal(list)
completed = Signal(bool, str)
def __init__(
self,
api_client: ApiClient,
profile: ProfileConfig,
folders: List[str],
client_host: str,
ssh_username: str,
ssh_password: str,
) -> None:
super().__init__()
self.api_client = api_client
self.profile = profile
self.folders = folders
self.client_host = client_host
self.ssh_username = ssh_username
self.ssh_password = ssh_password
def run(self) -> None:
try:
job_id = self.api_client.start_backup(
profile=self.profile,
folders=self.folders,
client_host=self.client_host,
ssh_username=self.ssh_username,
ssh_password=self.ssh_password,
)
except ApiClientError as exc:
self.completed.emit(False, str(exc))
return
status: Optional[dict] = None
while True:
try:
status = self.api_client.job_status(job_id)
except ApiClientError as exc:
self.completed.emit(False, str(exc))
return
self.status_update.emit(status)
self.log_update.emit(status.get("last_log_lines", []))
state = status.get("status")
if state in ("COMPLETED", "FAILED"):
break
time.sleep(2)
try:
final_log = self.api_client.job_log(job_id)
self.log_update.emit(final_log)
except ApiClientError:
pass
summary = status.get("summary") if status else ""
success = status and status.get("status") == "COMPLETED"
self.completed.emit(success, summary or ("Backup completato" if success else "Backup fallito"))
class MainWindow(QMainWindow):
def __init__(self, config: AppConfig) -> None:
super().__init__()
self.config = config
self.api_client = ApiClient(config)
self.worker: Optional[BackupWorker] = None
self._log_buffer: List[str] = []
self._setup_ui()
self._load_profiles()
self._sync_scheduler()
self._update_run_state()
def _setup_ui(self) -> None:
self.setWindowTitle("Backup Client Windows")
central = QWidget()
self.setCentralWidget(central)
layout = QVBoxLayout(central)
profile_layout = QGridLayout()
profile_layout.addWidget(QLabel("Profilo:"), 0, 0)
self.profile_combo = QComboBox()
profile_layout.addWidget(self.profile_combo, 0, 1)
profile_layout.addWidget(QLabel("Nome:"), 1, 0)
self.profile_line = QLineEdit()
profile_layout.addWidget(self.profile_line, 1, 1)
self.save_profile_btn = QPushButton("Salva profilo")
profile_layout.addWidget(self.save_profile_btn, 2, 0)
self.delete_profile_btn = QPushButton("Elimina profilo")
profile_layout.addWidget(self.delete_profile_btn, 2, 1)
layout.addLayout(profile_layout)
folders_layout = QHBoxLayout()
self.folders_list = QListWidget()
folders_layout.addWidget(self.folders_list)
folders_btn_layout = QVBoxLayout()
self.add_folder_btn = QPushButton("Aggiungi cartella")
self.remove_folder_btn = QPushButton("Rimuovi cartella")
folders_btn_layout.addWidget(self.add_folder_btn)
folders_btn_layout.addWidget(self.remove_folder_btn)
folders_layout.addLayout(folders_btn_layout)
layout.addLayout(folders_layout)
creds_layout = QFormLayout()
self.client_host_input = QLineEdit()
self.ssh_user_input = QLineEdit()
self.ssh_password_input = QLineEdit()
self.ssh_password_input.setEchoMode(QLineEdit.Password)
creds_layout.addRow("Host client:", self.client_host_input)
creds_layout.addRow("SSH user:", self.ssh_user_input)
creds_layout.addRow("SSH password:", self.ssh_password_input)
layout.addLayout(creds_layout)
self.scheduler_checkbox = QCheckBox("Attiva Task Scheduler locale")
layout.addWidget(self.scheduler_checkbox)
self.run_button = QPushButton("Esegui backup")
layout.addWidget(self.run_button)
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 100)
layout.addWidget(self.progress_bar)
self.status_label = QLabel("Pronto")
layout.addWidget(self.status_label)
self.log_output = QPlainTextEdit()
self.log_output.setReadOnly(True)
layout.addWidget(self.log_output)
self.profile_combo.currentTextChanged.connect(self._on_profile_changed)
self.save_profile_btn.clicked.connect(self.save_profile)
self.delete_profile_btn.clicked.connect(self.delete_profile)
self.add_folder_btn.clicked.connect(self.add_folder)
self.remove_folder_btn.clicked.connect(self.remove_folder)
self.folders_list.itemSelectionChanged.connect(self._update_run_state)
self.scheduler_checkbox.toggled.connect(self._on_scheduler_toggled)
self.run_button.clicked.connect(self.run_backup)
def _load_profiles(self) -> None:
self.profile_combo.blockSignals(True)
self.profile_combo.clear()
for profile in self.config.profiles:
self.profile_combo.addItem(profile.name)
self.profile_combo.blockSignals(False)
target = self.config.active_profile
if target:
index = self.profile_combo.findText(target)
if index >= 0:
self.profile_combo.setCurrentIndex(index)
profile = find_profile(self.config, target)
if profile:
self._apply_profile(profile)
elif self.config.profiles:
first = self.config.profiles[0]
self.profile_combo.setCurrentIndex(0)
self._apply_profile(first)
def _apply_profile(self, profile: ProfileConfig) -> None:
self.profile_line.setText(profile.name)
self.folders_list.clear()
for folder in profile.folders:
self.folders_list.addItem(folder)
self.config.active_profile = profile.name
save_config(self.config)
self._update_run_state()
def _sync_scheduler(self) -> None:
self.scheduler_checkbox.setChecked(self.config.scheduler_enabled)
def _on_scheduler_toggled(self, state: bool) -> None:
self.config.scheduler_enabled = state
task_name = f"BackupClient-{self.profile_line.text().strip() or 'default'}"
exe_path = Path(sys.argv[0]).resolve()
prev_state = self.config.scheduler_enabled
profile_name = self.profile_line.text().strip() or "default"
try:
message = (
enable_scheduler(task_name, profile_name, exe_path) if state else disable_scheduler(task_name, profile_name, exe_path)
)
self.config.scheduler_enabled = state
save_config(self.config)
self.status_label.setText(message)
except SchedulerError as exc:
QMessageBox.warning(self, "Scheduler", str(exc))
self.scheduler_checkbox.blockSignals(True)
self.scheduler_checkbox.setChecked(prev_state)
self.scheduler_checkbox.blockSignals(False)
self.status_label.setText("Scheduler non aggiornato")
def _on_profile_changed(self, name: str) -> None:
profile = find_profile(self.config, name)
if profile:
self._apply_profile(profile)
else:
self.profile_line.clear()
self.folders_list.clear()
self.config.active_profile = None
save_config(self.config)
self._update_run_state()
def add_folder(self) -> None:
folder = QFileDialog.getExistingDirectory(self, "Seleziona cartella")
if folder:
if folder not in self._current_folders():
self.folders_list.addItem(folder)
self._update_run_state()
def remove_folder(self) -> None:
selected = self.folders_list.selectedItems()
for item in selected:
self.folders_list.takeItem(self.folders_list.row(item))
self._update_run_state()
def _current_folders(self) -> List[str]:
return [self.folders_list.item(i).text() for i in range(self.folders_list.count())]
def _update_run_state(self) -> None:
has_folders = bool(self._current_folders())
self.run_button.setEnabled(has_folders)
def save_profile(self) -> None:
name = self.profile_line.text().strip()
if not name:
QMessageBox.warning(self, "Profilo", "Inserisci un nome profilo")
return
folders = self._current_folders()
if not folders:
QMessageBox.warning(self, "Profilo", "Seleziona almeno una cartella")
return
profile = find_profile(self.config, name)
if profile:
profile.folders = folders
profile.schedule_enabled = self.scheduler_checkbox.isChecked()
else:
profile = ProfileConfig(name=name, folders=folders, schedule_enabled=self.scheduler_checkbox.isChecked())
self.config.profiles.append(profile)
self.config.active_profile = name
save_config(self.config)
self._load_profiles()
self.status_label.setText(f"Profilo '{name}' salvato")
def delete_profile(self) -> None:
name = self.profile_line.text().strip()
profile = find_profile(self.config, name)
if not profile:
return
self.config.profiles.remove(profile)
if self.config.active_profile == name:
self.config.active_profile = None
save_config(self.config)
self._load_profiles()
self.status_label.setText(f"Profilo '{name}' eliminato")
def run_backup(self) -> None:
if self.worker and self.worker.isRunning():
return
if not is_token_valid(self.config):
if not self._prompt_login():
return
profile_name = self.profile_line.text().strip()
if not profile_name:
QMessageBox.warning(self, "Backup", "Inserisci un nome profilo")
return
folders = self._current_folders()
if not folders:
QMessageBox.warning(self, "Backup", "Seleziona almeno una cartella")
return
client_host = self.client_host_input.text().strip()
ssh_user = self.ssh_user_input.text().strip()
ssh_password = self.ssh_password_input.text().strip()
if not client_host or not ssh_user or not ssh_password:
QMessageBox.warning(self, "Backup", "Inserire host e credenziali SSH")
return
profile = find_profile(self.config, profile_name)
if not profile:
profile = ProfileConfig(name=profile_name, folders=folders, schedule_enabled=self.scheduler_checkbox.isChecked())
self.config.profiles.append(profile)
else:
profile.folders = folders
profile.schedule_enabled = self.scheduler_checkbox.isChecked()
self.config.active_profile = profile_name
save_config(self.config)
self.worker = BackupWorker(
api_client=self.api_client,
profile=profile,
folders=folders,
client_host=client_host,
ssh_username=ssh_user,
ssh_password=ssh_password,
)
self.worker.status_update.connect(self._handle_status)
self.worker.log_update.connect(self._handle_log)
self.worker.completed.connect(self._handle_completion)
self.run_button.setEnabled(False)
self.progress_bar.setValue(0)
self.status_label.setText("Avvio backup...")
self.worker.start()
def _handle_status(self, data: dict) -> None:
progress = data.get("progress", 0)
status = data.get("status", "")
self.progress_bar.setValue(progress)
self.status_label.setText(status)
def _handle_log(self, lines: List[str]) -> None:
if not lines:
return
self._log_buffer.extend(lines)
self._log_buffer = self._log_buffer[-500:]
self.log_output.setPlainText("\n".join(self._log_buffer))
self.log_output.verticalScrollBar().setValue(self.log_output.verticalScrollBar().maximum())
def _handle_completion(self, success: bool, message: str) -> None:
self.run_button.setEnabled(True)
self.status_label.setText(message)
QMessageBox.information(self, "Backup", "Backup completato" if success else f"Errore: {message}")
def _prompt_login(self) -> bool:
password, ok = QInputDialog.getText(self, "Accesso API", "Password API", QLineEdit.Password)
if not ok:
return False
try:
self.api_client.login(password)
self.status_label.setText("Autenticato")
return True
except ApiClientError as exc:
QMessageBox.warning(self, "Accesso", str(exc))
clear_token(self.config)
return False
def main() -> None:
app = QApplication(sys.argv)
window = MainWindow(load_config())
window.show()
app.exec()
if __name__ == "__main__":
main()