from lib.kanboard_api import KanboardAPI from lib.base_migrator import BaseMigrator import os class UsersMigrator(BaseMigrator): """Миграция и удаление пользователей в Kanboard.""" def __init__(self, source_api: KanboardAPI, target_api: KanboardAPI, user_mapping_file: str = "user_mapping.json"): self.source_api = source_api self.target_api = target_api self.user_mapping_file = user_mapping_file self.user_mapping = self._load_mapping(self.user_mapping_file) # === Миграция === def migrate(self): print("Начало миграции пользователей...") source_users = self.source_api.call("getAllUsers") or [] target_users = self.target_api.call("getAllUsers") or [] existing_usernames = {u["username"] for u in target_users} temp_password = os.getenv("TEMP_USER_PASSWORD") for user in source_users: source_id = int(user["id"]) username = user["username"] name = user.get("name") or "" email = user.get("email") or "" role = user.get("role") or "app-user" if username in existing_usernames: print(f"Пользователь '{username}' уже существует, пропускаем") continue password = temp_password if temp_password else username try: new_id = self.target_api.call("createUser", { "username": username, "password": password, "name": name, "email": email, "role": role }) if new_id: self.user_mapping[source_id] = new_id print(f"Пользователь '{username}' ({source_id} -> {new_id}) создан") except Exception as e: print(f"[{type(e).__name__}] Ошибка при переносе '{username}': {e}") self._save_mapping(self.user_mapping_file, self.user_mapping) print(f"Миграция пользователей завершена. Маппинг сохранён в {self.user_mapping_file}") # === Удаление === def delete_all(self, exclude_admin: bool = True, **kwargs): print("Начало удаления пользователей...") all_users = self.target_api.call("getAllUsers") or [] for user in all_users: user_id = int(user["id"]) username = user["username"] if exclude_admin and username == "admin": continue try: ok = self.target_api.call("removeUser", {"user_id": user_id}) if ok: print(f"Пользователь '{username}' (ID {user_id}) удалён") else: print(f"Пользователь '{username}' (ID {user_id}) не удалён") except Exception as e: print(f"[{type(e).__name__}] Ошибка при удалении '{username}': {e}") if hasattr(self, 'user_mapping_file'): self._delete_mapping_file(self.user_mapping_file) if hasattr(self, 'user_mapping'): self.user_mapping.clear() print("Удаление пользователей завершено.")