commit e049a302d3e58ef807e04cd4b4b856449f327fa5 Author: y9938 Date: Sat Oct 18 00:52:08 2025 +0300 Initial commit diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..bc23fa0 --- /dev/null +++ b/.env.example @@ -0,0 +1,15 @@ +# Source Server +SOURCE_URL="https://example.com/jsonrpc.php" +SOURCE_USER="jsonrpc" +SOURCE_TOKEN="SECRET" + +# Target Server +TARGET_URL="https://example.com/jsonrpc.php" +TARGET_USER="jsonrpc" +TARGET_TOKEN="SECRET" + +# Optional +TEMP_USER_PASSWORD= +# Если не задать TEMP_USER_PASSWORD, который всем пользователям поставит +# одинаковый пароль, то автоматически пароль будет равен username +# например user62 пароль: user62 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a7a0acd --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +**/__pycache__/** +venv/ +.venv/ +.env + +user_mapping.json +project_mapping.json +tag_mapping.json +column_mapping.json diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e9d9de0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +MIT License + +Copyright (c) 2025 y9938 + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +associated documentation files (the "Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the +following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial +portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT +LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO +EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f4bf018 --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +# Kanboard API + +> Взаимодействие с API, миграция с одного instance на другой. + +> Для вас миграция может быть не идеальная. + +> По документации https://docs.kanboard.org/v1/api/ + +Код затрагивает работу с API Reference по документации: + +- Column API Procedures +- Project API Procedures +- Project File API Procedures +- Project Metadata API Procedures +- Project Permission API Procedures +- Subtask API Procedures +- Tags API Procedures +- Task API Procedures +- Task File API Procedures +- Task Metadata API Procedures +- User API Procedures + +## Начальные шаги + +```bash +cp .env.example .env +chmod +x main.py +``` +Можно запустить main.py и другим способом без `uv`. + +--- + +### Демонстрация + +![help](assets/help.png) + +![run full migration](assets/full_migration.png) +- Без указания COMMAND выполняется полная миграция для users, projects, tags и tasks + +![interactive mode](assets/interactive.png) + diff --git a/assets/full_migration.png b/assets/full_migration.png new file mode 100644 index 0000000..5b53ee3 Binary files /dev/null and b/assets/full_migration.png differ diff --git a/assets/help.png b/assets/help.png new file mode 100644 index 0000000..1270cf5 Binary files /dev/null and b/assets/help.png differ diff --git a/assets/interactive.png b/assets/interactive.png new file mode 100644 index 0000000..c90a8f9 Binary files /dev/null and b/assets/interactive.png differ diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/base_migrator.py b/lib/base_migrator.py new file mode 100644 index 0000000..20d98f7 --- /dev/null +++ b/lib/base_migrator.py @@ -0,0 +1,78 @@ +import json +import os +from typing import Dict, Any +from abc import abstractmethod + +class BaseMigrator: + """Базовый класс для миграторов с общими методами работы с файлами маппинга.""" + + def _load_mapping(self, filename: str) -> Dict[int, int]: + """Загружает маппинг из JSON файла.""" + if os.path.exists(filename): + with open(filename, "r", encoding="utf-8") as f: + data = json.load(f) + return {int(k): int(v) for k, v in data.items()} + return {} + + def _load_nested_mapping(self, filename: str) -> Dict[Any, Any]: + """Загрузка вложенного маппинга из JSON-файла (для колонок).""" + if os.path.exists(filename): + with open(filename, 'r') as f: + data = json.load(f) + + # Для вложенного маппинга колонок: {project_id: {src_col_id: target_col_id}} + result = {} + for project_id_str, column_map in data.items(): + project_id = int(project_id_str) + result[project_id] = {} + for src_col_str, target_col_id in column_map.items(): + result[project_id][int(src_col_str)] = int(target_col_id) + return result + return {} + + def _save_mapping(self, filename: str, mapping: Dict[int, int]): + """Сохраняет маппинг в JSON файл.""" + with open(filename, "w", encoding="utf-8") as f: + json.dump(mapping, f, ensure_ascii=False, indent=2) + + def _save_nested_mapping(self, filename: str, mapping: Dict[Any, Any]): + """Сохранение вложенного маппинга в JSON-файл.""" + # Преобразуем в формат, который можно сериализовать в JSON + serializable_mapping = {} + for project_id, column_map in mapping.items(): + serializable_mapping[str(project_id)] = {str(k): v for k, v in column_map.items()} + + with open(filename, 'w') as f: + json.dump(serializable_mapping, f, indent=2) + + def _delete_mapping_file(self, filename: str) -> bool: + """ + Удаляет файл маппинга, если он существует. + + Args: + filename: Путь к файлу маппинга. + Returns: + bool: True, если файл удален или не существует, False в случае ошибки. + """ + try: + if filename and os.path.exists(filename): + os.remove(filename) + print(f"Файл маппинга {filename} удалён") + return True + else: + print("Файл маппинга не найден или не задан") + return True + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении файла маппинга: {e}") + return False + + + @abstractmethod + def migrate(self): + """Абстрактный метод для миграции данных.""" + pass + + @abstractmethod + def delete_all(self): + """Абстрактный метод для удаления всех данных.""" + pass diff --git a/lib/interactive.py b/lib/interactive.py new file mode 100644 index 0000000..d3eb81b --- /dev/null +++ b/lib/interactive.py @@ -0,0 +1,90 @@ +import json +from lib.kanboard_api import KanboardAPI + + +class KanboardInteractive: + """Позволяет выполнять произвольные JSON-RPC запросы через KanboardAPI.""" + + def __init__(self, api: KanboardAPI): + self.api = api + + def parse_simple_syntax(self, line): + """Парсит упрощенный синтаксис: метод param1=value1 param2=value2""" + parts = line.strip().split() + if not parts: + return None, None + + method = parts[0] + params = {} + + for part in parts[1:]: + # Парсим key=value + if '=' in part: + key, value = part.split('=', 1) + # Пробуем преобразовать значение в число, если возможно + try: + value = int(value) + except ValueError: + try: + value = float(value) + except ValueError: + # Оставляем как строку + pass + params[key] = value + else: + # Если нет '=', считаем позиционным параметром с числовым ключом + try: + value = int(part) + except ValueError: + value = part + params[str(len(params))] = value + + return method, params + + def convert_to_array_params(self, params): + """Конвертирует словарь параметров в массив, если ключи числовые""" + if all(key.isdigit() for key in params.keys()): + # Сортируем по числовым ключам и возвращаем массив значений + return [params[str(i)] for i in range(len(params))] + return params + + def run(self): + print("Введите метод Kanboard API и параметры.") + print("Форматы:") + print(' JSON: {"method": "getProjectById", "params": {"project_id": 1}}') + print(' Простой: getProjectUsers project_id=32') + print(' Позиционный: getProjectUserRole 32 8') + print("Для выхода введите 'exit'") + + while True: + line = input("Kanboard> ").strip() + if line.lower() in ("exit", "quit"): + break + if not line: + continue + + # Определяем формат ввода + if line.startswith('{'): + # JSON формат + try: + data = json.loads(line) + method = data.get("method") + params = data.get("params", {}) + except json.JSONDecodeError as e: + print(f"Ошибка JSON: {e}") + continue + else: + # Упрощенный формат + method, params = self.parse_simple_syntax(line) + if not method: + print("Не указан метод.") + continue + # Для методов, которые ожидают массив параметров, конвертируем + params = self.convert_to_array_params(params) + + try: + result = self.api.call(method, params) + print(json.dumps(result, ensure_ascii=False, indent=2)) + except Exception as e: + print(f"Ошибка API: {e}") + diff --git a/lib/kanboard_api.py b/lib/kanboard_api.py new file mode 100755 index 0000000..0b0a12c --- /dev/null +++ b/lib/kanboard_api.py @@ -0,0 +1,46 @@ +import json +import requests +from typing import Any, Dict, Tuple + +class KanboardAPI: + """Базовый класс для работы с Kanboard через JSON-RPC.""" + + def __init__(self, url: str, user: str, token: str): + self.url: str = url + self.auth: Tuple[str, str] = (user, token) + self.request_id: int = 0 + + def call(self, method: str, params: Any = None) -> Any: + """Выполнить JSON-RPC запрос с кодировкой UTF-8.""" + self.request_id += 1 + payload: Dict[str, Any] = { + "jsonrpc": "2.0", + "method": method, + "id": self.request_id, + "params": params or {} + } + + try: + json_data: bytes = json.dumps(payload, ensure_ascii=False).encode('utf-8') + except Exception as e: + raise ValueError(f"Ошибка кодирования JSON: {e}") + + headers = {'Content-Type': 'application/json; charset=utf-8'} + + try: + response = requests.post(self.url, data=json_data, auth=self.auth, headers=headers) + response.raise_for_status() + response.encoding = 'utf-8' + except requests.exceptions.RequestException as e: + raise ConnectionError(f"Сетевая ошибка при вызове API {method}: {e}") + + try: + result = response.json() + except json.JSONDecodeError as e: + raise ValueError(f"Ошибка декодирования JSON: {e}. Ответ: {response.text[:100]}...") + + if 'error' in result: + raise Exception(f"API Error: {result['error']}") + + return result.get('result') + diff --git a/lib/projects.py b/lib/projects.py new file mode 100644 index 0000000..3789a29 --- /dev/null +++ b/lib/projects.py @@ -0,0 +1,408 @@ +from typing import Any, Dict, Optional, List +from lib.kanboard_api import KanboardAPI +from lib.base_migrator import BaseMigrator + + +class ProjectsMigrator(BaseMigrator): + """Миграция и удаление проектов в Kanboard.""" + + def __init__( + self, + source_api: KanboardAPI, + target_api: KanboardAPI, + project_mapping_file: str = "project_mapping.json", + user_mapping_file: str = "user_mapping.json", + column_mapping_file: str = "column_mapping.json" + ): + self.source_api = source_api + self.target_api = target_api + self.project_mapping_file = project_mapping_file + self.user_mapping_file = user_mapping_file + self.column_mapping_file = column_mapping_file + self.project_mapping = self._load_mapping(self.project_mapping_file) + self.user_mapping = self._load_mapping(self.user_mapping_file) + self.column_mapping = self._load_nested_mapping(self.column_mapping_file) + + # === Helpers === + def _create_or_find_project(self, src_proj: Dict[str, Any]) -> Optional[int]: + name = src_proj.get("name") or "" + identifier = src_proj.get("identifier") or "" + description = src_proj.get("description") or "" + owner_id = src_proj.get("owner_id") + email = src_proj.get("email") + + target_proj = None + found = False + if identifier: + try: + target_proj = self.target_api.call("getProjectByIdentifier", {"identifier": identifier}) + except Exception: + target_proj = None + if not target_proj and name: + try: + target_proj = self.target_api.call("getProjectByName", {"name": name}) + except Exception: + target_proj = None + if target_proj: + try: + found = True + return int(target_proj["id"]), found + except Exception: + return None, found + + create_params: Dict[str, Any] = {"name": name} + if description is not None: + create_params["description"] = description + if identifier: + create_params["identifier"] = identifier + if email: + create_params["email"] = email + + if owner_id: + try: + mapped_owner = self.user_mapping.get(int(owner_id)) + if mapped_owner: + create_params["owner_id"] = mapped_owner + except Exception: + pass + + try: + new_id = self.target_api.call("createProject", create_params) + if new_id: + return int(new_id), False + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при createProject '{name}': {e}") + return None, found + + + def _migrate_columns(self, src_project_id: int, target_project_id: int): + """Миграция колонок - создаем такие же как в source и сохраняем маппинг.""" + try: + src_columns = self.source_api.call("getColumns", [src_project_id]) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить колонки {src_project_id}: {e}") + return + + try: + target_columns = self.target_api.call("getColumns", [target_project_id]) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить колонки target-проекта {target_project_id}: {e}") + target_columns = [] + + # Инициализируем маппинг колонок для этого проекта + if src_project_id not in self.column_mapping: + self.column_mapping[src_project_id] = {} + + project_column_mapping = self.column_mapping[src_project_id] + created_columns: List[int] = [] + existing_titles = {col.get("title", "").lower(): col for col in target_columns} + + for src_col in sorted(src_columns, key=lambda c: int(c.get("position", 0))): + src_col_id = int(src_col.get("id")) + title = src_col.get("title", "") + task_limit = None + try: + tl = int(src_col.get("task_limit", 0)) + if tl > 0: + task_limit = tl + except Exception: + pass + description = src_col.get("description") + + # Проверяем, есть ли колонка с таким названием + if title.lower() in existing_titles: + # Колонка уже существует, находим ее ID и сохраняем маппинг + target_col = existing_titles[title.lower()] + target_col_id = int(target_col.get("id")) + project_column_mapping[src_col_id] = target_col_id + created_columns.append(target_col_id) + print(f"Колонка '{title}' уже существует ({src_col_id} -> {target_col_id})") + continue + + # Создаем новую колонку + params = {"project_id": target_project_id, "title": title} + if task_limit is not None: + params["task_limit"] = task_limit + if description: + params["description"] = description + + try: + new_col_id = self.target_api.call("addColumn", params) + if new_col_id: + new_col_id_int = int(new_col_id) + created_columns.append(new_col_id_int) + # Сохраняем маппинг новой колонки + project_column_mapping[src_col_id] = new_col_id_int + print(f"Добавлена колонка '{title}' ({src_col_id} -> {new_col_id_int})") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при addColumn '{title}': {e}") + + # Обновляем позиции колонок + for position, col_id in enumerate(created_columns, start=1): + try: + ok = self.target_api.call("changeColumnPosition", [target_project_id, col_id, position]) + if not ok: + print(f"Не удалось установить позицию {position} для колонки {col_id}") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при changeColumnPosition {col_id}: {e}") + + # Сохраняем маппинг колонок после обработки всех колонок проекта + self._save_nested_mapping(self.column_mapping_file, self.column_mapping) + + def _migrate_columns(self, src_project_id: int, target_project_id: int): + """Миграция колонок - создаем такие же как в source и сохраняем маппинг.""" + try: + src_columns = self.source_api.call("getColumns", [src_project_id]) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить колонки {src_project_id}: {e}") + return + + try: + target_columns = self.target_api.call("getColumns", [target_project_id]) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить колонки target-проекта {target_project_id}: {e}") + target_columns = [] + + # Инициализируем маппинг колонок для этого проекта + if str(src_project_id) not in self.column_mapping: + self.column_mapping[str(src_project_id)] = {} + + project_column_mapping = self.column_mapping[str(src_project_id)] + created_columns: List[int] = [] + existing_titles = {col.get("title", "").lower(): col for col in target_columns} + + for src_col in sorted(src_columns, key=lambda c: int(c.get("position", 0))): + src_col_id = src_col.get("id") + title = src_col.get("title", "") + task_limit = None + try: + tl = int(src_col.get("task_limit", 0)) + if tl > 0: + task_limit = tl + except Exception: + pass + description = src_col.get("description") + + # Проверяем, есть ли колонка с таким названием + if title.lower() in existing_titles: + # Колонка уже существует, находим ее ID и сохраняем маппинг + target_col = existing_titles[title.lower()] + target_col_id = target_col.get("id") + project_column_mapping[str(src_col_id)] = target_col_id + created_columns.append(int(target_col_id)) + print(f"Колонка '{title}' уже существует ({src_col_id} -> {target_col_id})") + continue + + # Создаем новую колонку + params = {"project_id": target_project_id, "title": title} + if task_limit is not None: + params["task_limit"] = task_limit + if description: + params["description"] = description + + try: + new_col_id = self.target_api.call("addColumn", params) + if new_col_id: + new_col_id_int = int(new_col_id) + created_columns.append(new_col_id_int) + # Сохраняем маппинг новой колонки + project_column_mapping[str(src_col_id)] = new_col_id_int + print(f"Добавлена колонка '{title}' ({src_col_id} -> {new_col_id_int})") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при addColumn '{title}': {e}") + + # Обновляем позиции колонок + for position, col_id in enumerate(created_columns, start=1): + try: + ok = self.target_api.call("changeColumnPosition", [target_project_id, col_id, position]) + if not ok: + print(f"Не удалось установить позицию {position} для колонки {col_id}") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при changeColumnPosition {col_id}: {e}") + + # Сохраняем маппинг колонок после обработки всех колонок проекта + self._save_mapping(self.column_mapping_file, self.column_mapping) + def _migrate_project_users(self, src_project_id: int, target_project_id: int): + try: + src_members = self.source_api.call("getProjectUsers", [src_project_id]) or {} + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить пользователей проекта {src_project_id}: {e}") + src_members = {} + + for src_uid_str, username in list(src_members.items()): + try: + src_uid = int(src_uid_str) + except Exception: + continue + try: + role = self.source_api.call("getProjectUserRole", [src_project_id, src_uid]) + except Exception: + role = None + + target_uid = self.user_mapping.get(src_uid) + if not target_uid: + print(f" Пропускаем пользователя '{username}' ({src_uid}) — нет в user_mapping") + continue + + try: + ok = self.target_api.call( + "addProjectUser", + [target_project_id, target_uid, role] if role else [target_project_id, target_uid] + ) + if ok: + print(f" Добавлен пользователь '{username}' -> target_id {target_uid} роль='{role}'") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при addProjectUser '{username}': {e}") + + def _migrate_project_files(self, src_project_id: int, target_project_id: int): + """Миграция файлов проекта.""" + try: + files = self.source_api.call("getAllProjectFiles", {"project_id": src_project_id}) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить файлы проекта {src_project_id}: {e}") + return + + for f in files: + file_id = int(f.get("id")) + filename = f.get("name") + try: + data_b64 = self.source_api.call("downloadProjectFile", [src_project_id, file_id]) + if not data_b64: + print(f" Пропущен файл '{filename}' — пустой контент") + continue + new_id = self.target_api.call("createProjectFile", [target_project_id, filename, data_b64]) + if new_id: + print(f" Файл '{filename}' перенесён ({file_id} -> {new_id})") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}") + + def _apply_project_settings(self, src_project_id: int, target_project_id: int): + try: + src_info = self.source_api.call("getProjectById", {"project_id": src_project_id}) or {} + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить info проекта {src_project_id}: {e}") + return + + try: + if str(src_info.get("is_active", "1")) == "1": + self.target_api.call("enableProject", [target_project_id]) + else: + self.target_api.call("disableProject", [target_project_id]) + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при установке is_active {target_project_id}: {e}") + + try: + if str(src_info.get("is_public", "0")) == "1": + self.target_api.call("enableProjectPublicAccess", [target_project_id]) + else: + self.target_api.call("disableProjectPublicAccess", [target_project_id]) + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при установке public access {target_project_id}: {e}") + + updatable = {} + for fld in ( + "start_date", "end_date", "priority_default", + "priority_start", "priority_end", "email", + "identifier", "description", "name" + ): + if fld in src_info and src_info.get(fld) is not None: + updatable[fld] = src_info.get(fld) + + if updatable: + updatable["project_id"] = target_project_id + try: + ok = self.target_api.call("updateProject", updatable) + if not ok: + print(f" Не удалось применить дополнительные настройки для проекта {target_project_id}") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при updateProject {target_project_id}: {e}") + + def _delete_project_files(self, project_id: int): + """Удаление всех файлов проекта перед удалением.""" + try: + files = self.target_api.call("getAllProjectFiles", {"project_id": project_id}) or [] + if files: + self.target_api.call("removeAllProjectFiles", {"project_id": project_id}) + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении файлов проекта {project_id}: {e}") + + # === Public API === + def migrate(self): + print("Начало миграции проектов...") + try: + source_projects = self.source_api.call("getAllProjects") or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить список проектов: {e}") + source_projects = [] + + for proj in source_projects: + try: + src_id = int(proj.get("id")) + except Exception: + continue + target_id, found = self._create_or_find_project(proj) + if not target_id: + print(f"Не удалось создать или найти проект '{proj.get('name')}' ({src_id})") + continue + + self.project_mapping[src_id] = target_id + if found: + print(f"Проект '{proj.get('name')}' уже существует ({src_id} -> {target_id})") + else: + print(f"Проект '{proj.get('name')}' создан ({src_id} -> {target_id})") + + self._migrate_columns(src_id, target_id) + self._migrate_project_users(src_id, target_id) + self._migrate_project_files(src_id, target_id) + self._apply_project_settings(src_id, target_id) + + self._save_mapping(self.project_mapping_file, self.project_mapping) + self._save_mapping(self.column_mapping_file, self.column_mapping) + print(f"Миграция проектов завершена.") + print(f"Маппинг проектов сохранён в {self.project_mapping_file}") + print(f"Маппинг колонок сохранён в {self.column_mapping_file}") + + def delete_all(self, exclude_project_ids: Optional[List[int]] = None, **kwargs): + """ + Удаление всех проектов. + + Args: + exclude_project_ids: Список ID проектов, которые нужно исключить из удаления + """ + exclude_project_ids = set(exclude_project_ids or []) + print("Начало удаления проектов...") + try: + all_projects = self.target_api.call("getAllProjects") or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить проекты target: {e}") + return + + for p in all_projects: + try: + pid = int(p.get("id")) + except Exception: + continue + if pid in exclude_project_ids: + continue + try: + self._delete_project_files(pid) + ok = self.target_api.call("removeProject", {"project_id": pid}) + if ok: + print(f"Проект '{p.get('name')}' (ID {pid}) удалён") + else: + print(f"Проект '{p.get('name')}' (ID {pid}) не удалён") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении проекта {pid}: {e}") + + if hasattr(self, 'project_mapping_file'): + self._delete_mapping_file(self.project_mapping_file) + if hasattr(self, 'column_mapping_file'): + self._delete_mapping_file(self.column_mapping_file) + + if hasattr(self, 'project_mapping_file'): + self._delete_mapping_file(self.project_mapping_file) + if hasattr(self, 'project_mapping'): + self.project_mapping.clear() + + print("Удаление проектов завершено.") + diff --git a/lib/tags.py b/lib/tags.py new file mode 100644 index 0000000..31a1684 --- /dev/null +++ b/lib/tags.py @@ -0,0 +1,85 @@ +from lib.kanboard_api import KanboardAPI +from lib.base_migrator import BaseMigrator + + +class TagsMigrator(BaseMigrator): + """Миграция и полное удаление тегов в Kanboard.""" + + def __init__( + self, + source_api: KanboardAPI, + target_api: KanboardAPI, + tag_mapping_file: str = "tag_mapping.json", + project_mapping_file: str = "project_mapping.json" + ): + self.source_api = source_api + self.target_api = target_api + self.tag_mapping_file = tag_mapping_file + self.project_mapping_file = project_mapping_file + self.tag_mapping = self._load_mapping(self.tag_mapping_file) + self.project_mapping = self._load_mapping(self.project_mapping_file) + + # === Миграция === + def migrate(self): + print("Начало миграции тегов...") + source_tags = self.source_api.call("getAllTags") or [] + + for tag in source_tags: + tag_id = int(tag["id"]) + tag_name = tag["name"] + project_id = int(tag["project_id"]) + + # Определяем target_project_id + if project_id == 0: + target_project_id = 0 # глобальный тег + else: + target_project_id = self.project_mapping.get(project_id) + if not target_project_id: + print(f"Проект исходного тега {project_id} не найден на целевом сервере, пропускаем тег '{tag_name}'") + continue + + # Проверяем, есть ли такой тег уже на целевом экземпляре + existing_tags = self.target_api.call("getTagsByProject", [target_project_id]) or [] + if any(t["name"] == tag_name for t in existing_tags): + continue + + try: + new_id = self.target_api.call("createTag", [target_project_id, tag_name]) + if new_id: + self.tag_mapping[tag_id] = new_id + location = "глобальный" if target_project_id == 0 else f"проект {target_project_id}" + print(f"Перенесен тег '{tag_name}' ({tag_id} -> {new_id}) в {location}") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе '{tag_name}': {e}") + + self._save_mapping(self.tag_mapping_file, self.tag_mapping) + print("Миграция тегов завершена.") + print(f"Маппинг тегов сохранён в {self.tag_mapping_file}") + + # === Полное удаление всех тегов === + def delete_all(self, **kwargs): + print("Начало удаления всех тегов...") + all_tags = self.target_api.call("getAllTags") or [] + if not all_tags: + print("Теги не найдены.") + return + + for tag in all_tags: + tag_id = int(tag["id"]) + tag_name = tag["name"] + try: + ok = self.target_api.call("removeTag", [tag_id]) + if ok: + print(f"Тег '{tag_name}' (ID {tag_id}) удалён") + else: + print(f"Тег '{tag_name}' (ID {tag_id}) не удалён") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении '{tag_name}': {e}") + + if hasattr(self, 'tag_mapping_file'): + self._delete_mapping_file(self.tag_mapping_file) + if hasattr(self, 'tag_mapping'): + self.tag_mapping.clear() + + print("Удаление тегов завершено.") + diff --git a/lib/tasks-word.py b/lib/tasks-word.py new file mode 100644 index 0000000..5f15cf5 --- /dev/null +++ b/lib/tasks-word.py @@ -0,0 +1,227 @@ +from typing import Optional +from datetime import datetime + +from lib.kanboard_api import KanboardAPI +from lib.base_migrator import BaseMigrator + +class TasksMigrator(BaseMigrator): + """Миграция задач в Kanboard.""" + + def __init__( + self, + source_api: KanboardAPI, + target_api: KanboardAPI, + project_mapping_file: str = "project_mapping.json", + user_mapping_file: str = "user_mapping.json", + tag_mapping_file: str = "tag_mapping.json", + column_mapping_file: str = "column_mapping.json" + ): + self.source_api = source_api + self.target_api = target_api + self.project_mapping_file = project_mapping_file + self.user_mapping_file = user_mapping_file + self.tag_mapping_file = tag_mapping_file + self.column_mapping_file = column_mapping_file + self.project_mapping = self._load_mapping(self.project_mapping_file) + self.user_mapping = self._load_mapping(self.user_mapping_file) + self.tag_mapping = self._load_mapping(self.tag_mapping_file) + self.column_mapping = self._load_nested_mapping(self.column_mapping_file) + + def migrate(self): + """Миграция всех задач из исходных проектов в целевые.""" + print("Начало миграции задач...") + + for source_project_id, target_project_id in self.project_mapping.items(): + print(f"Миграция задач из проекта {source_project_id} -> {target_project_id}") + + # Получаем задачи с учетом статуса (1 = активные, 0 = закрытые) + source_tasks = self.source_api.call("getAllTasks", [int(source_project_id), 1]) or [] + + for task in source_tasks: + self._migrate_task(task, int(source_project_id), int(target_project_id)) + + print("Миграция задач завершена.") + + def _migrate_task(self, task, source_project_id: int, target_project_id: int): + """Миграция одной задачи.""" + task_id = int(task["id"]) + task_title = task["title"] + + # Получаем маппинг колонки + source_column_id = task.get("column_id") + target_column_id = None + + if source_column_id: + target_column_id = self.get_column_mapping(source_project_id, source_column_id) + if not target_column_id: + print(f" Предупреждение: не найден маппинг для колонки {source_column_id} проекта {source_project_id}") + + params = { + "project_id": target_project_id, + "title": task_title, + "description": task.get("description", ""), + "color_id": task.get("color_id", "yellow"), + "owner_id": self._safe_map_id(task.get("owner_id"), self.user_mapping), + "creator_id": self._safe_map_id(task.get("creator_id"), self.user_mapping), + "score": int(task.get("score", 0)), + "priority": int(task.get("priority", 0)), + "reference": task.get("reference", ""), + "time_estimated": int(task.get("time_estimated", 0)), + "time_spent": int(task.get("time_spent", 0)), + } + + # Добавляем column_id если нашли маппинг + if target_column_id: + params["column_id"] = target_column_id + + # Обрабатываем даты + if task.get("date_due") and task["date_due"] != "0": + params["date_due"] = self._timestamp_to_kanboard_date(task["date_due"]) + + if task.get("date_started") and task["date_started"] != "0": + params["date_started"] = self._timestamp_to_kanboard_date(task["date_started"]) + + # Обрабатываем теги задачи + task_tags = self._get_task_tags(task_id) + if task_tags: + params["tags"] = task_tags + + try: + new_task_id = self.target_api.call("createTask", params) + if new_task_id: + column_info = f" -> колонка {target_column_id}" if target_column_id else "" + print(f" Перенесена задача '{task_title}' ({task_id} -> {new_task_id}{column_info})") + + # Мигрируем файлы задачи после создания задачи + self._migrate_task_files(task_id, new_task_id, target_project_id) + + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе задачи '{task_title}': {e}") + + def _migrate_task_files(self, source_task_id: int, target_task_id: int, target_project_id: int): + """Миграция файлов задачи.""" + try: + files = self.source_api.call("getAllTaskFiles", {"task_id": source_task_id}) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить файлы задачи {source_task_id}: {e}") + return + + if not files: + return + + print(f" Миграция файлов задачи ({len(files)} файлов)...") + + for file_info in files: + file_id = int(file_info.get("id")) + filename = file_info.get("name") + + try: + # Скачиваем файл из исходной системы + data_b64 = self.source_api.call("downloadTaskFile", [file_id]) + if not data_b64: + print(f" Пропущен файл '{filename}' — пустой контент") + continue + + # Загружаем файл в целевую систему + new_file_id = self.target_api.call( + "createTaskFile", + [target_project_id, target_task_id, filename, data_b64] + ) + + if new_file_id: + print(f" Файл '{filename}' перенесён ({file_id} -> {new_file_id})") + else: + print(f" Не удалось перенести файл '{filename}'") + + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}") + + def get_column_mapping(self, src_project_id: int, src_column_id: int) -> Optional[int]: + """ + Получить ID целевой колонки по ID исходной колонки и проекта. + + Args: + src_project_id: ID исходного проекта + src_column_id: ID исходной колонки + + Returns: + ID целевой колонки или None если маппинг не найден + """ + project_mapping = self.column_mapping.get(src_project_id, {}) + return project_mapping.get(src_column_id) + + def _safe_map_id(self, source_id, mapping_dict): + """Безопасный маппинг ID с преобразованием типов.""" + if not source_id or source_id in ("0", 0): + return 0 + + try: + source_id_int = int(source_id) + mapped_id = mapping_dict.get(source_id_int) + + if mapped_id is not None: + return mapped_id + except (ValueError, TypeError): + pass + + return 0 + + def _get_task_tags(self, task_id): + """Получение тегов задачи - используем оригинальные названия тегов.""" + source_tags = self.source_api.call("getTaskTags", [task_id]) or {} + + if not source_tags: + return [] + + # Просто возвращаем названия тегов из source + # Kanboard автоматически создаст теги с такими названиями или свяжет с существующими + return list(source_tags.values()) + + def _timestamp_to_kanboard_date(self, timestamp): + """Конвертация timestamp в формат даты Kanboard.""" + try: + dt = datetime.fromtimestamp(int(timestamp)) + return dt.strftime("%Y-%m-%d %H:%M") + except (ValueError, TypeError): + return "" + + def delete_all(self, **kwargs): + """Удаление всех задач из целевых проектов.""" + print("Начало удаления всех задач...") + + for target_project_id in self.project_mapping.values(): + print(f"Удаление задач из проекта {target_project_id}") + + tasks = self.target_api.call("getAllTasks", [int(target_project_id), 1]) or [] + + for task in tasks: + task_id = int(task["id"]) + task_title = task["title"] + + try: + # Сначала удаляем все файлы задачи + self._delete_task_files(task_id) + + # Затем удаляем саму задачу + ok = self.target_api.call("removeTask", [task_id]) + if ok: + print(f" Задача '{task_title}' (ID {task_id}) удалена") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении '{task_title}': {e}") + + print("Удаление задач завершено.") + + def _delete_task_files(self, task_id: int): + """Удаление всех файлов задачи.""" + try: + files = self.target_api.call("getAllTaskFiles", {"task_id": task_id}) or [] + if files: + # Используем removeAllTaskFiles для удаления всех файлов сразу + ok = self.target_api.call("removeAllTaskFiles", {"task_id": task_id}) + if ok: + print(f" Удалены все файлы задачи {task_id} ({len(files)} файлов)") + else: + print(f" Не удалось удалить файлы задачи {task_id}") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении файлов задачи {task_id}: {e}") + diff --git a/lib/tasks.py b/lib/tasks.py new file mode 100644 index 0000000..335dace --- /dev/null +++ b/lib/tasks.py @@ -0,0 +1,274 @@ +from typing import Optional +from datetime import datetime + +from lib.kanboard_api import KanboardAPI +from lib.base_migrator import BaseMigrator + +class TasksMigrator(BaseMigrator): + """Миграция задач в Kanboard.""" + + def __init__( + self, + source_api: KanboardAPI, + target_api: KanboardAPI, + project_mapping_file: str = "project_mapping.json", + user_mapping_file: str = "user_mapping.json", + tag_mapping_file: str = "tag_mapping.json", + column_mapping_file: str = "column_mapping.json" + ): + self.source_api = source_api + self.target_api = target_api + self.project_mapping_file = project_mapping_file + self.user_mapping_file = user_mapping_file + self.tag_mapping_file = tag_mapping_file + self.column_mapping_file = column_mapping_file + self.project_mapping = self._load_mapping(self.project_mapping_file) + self.user_mapping = self._load_mapping(self.user_mapping_file) + self.tag_mapping = self._load_mapping(self.tag_mapping_file) + self.column_mapping = self._load_nested_mapping(self.column_mapping_file) + + def migrate(self): + """Миграция всех задач из исходных проектов в целевые.""" + print("Начало миграции задач...") + + for source_project_id, target_project_id in self.project_mapping.items(): + print(f"Миграция задач из проекта {source_project_id} -> {target_project_id}") + + # Получаем задачи с учетом статуса (1 = активные, 0 = закрытые) + source_tasks = self.source_api.call("getAllTasks", [int(source_project_id), 1]) or [] + + for task in source_tasks: + self._migrate_task(task, int(source_project_id), int(target_project_id)) + + print("Миграция задач завершена.") + + def _migrate_task(self, task, source_project_id: int, target_project_id: int): + """Миграция одной задачи.""" + task_id = int(task["id"]) + task_title = task["title"] + + # Получаем маппинг колонки + source_column_id = task.get("column_id") + target_column_id = None + + if source_column_id: + target_column_id = self.get_column_mapping(source_project_id, source_column_id) + if not target_column_id: + print(f" Предупреждение: не найден маппинг для колонки {source_column_id} проекта {source_project_id}") + + params = { + "project_id": target_project_id, + "title": task_title, + "description": task.get("description", ""), + "color_id": task.get("color_id", "yellow"), + "owner_id": self._safe_map_id(task.get("owner_id"), self.user_mapping), + "creator_id": self._safe_map_id(task.get("creator_id"), self.user_mapping), + "score": int(task.get("score", 0)), + "priority": int(task.get("priority", 0)), + "reference": task.get("reference", ""), + "time_estimated": int(task.get("time_estimated", 0)), + "time_spent": int(task.get("time_spent", 0)), + } + + # Добавляем column_id если нашли маппинг + if target_column_id: + params["column_id"] = target_column_id + + # Обрабатываем даты + if task.get("date_due") and task["date_due"] != "0": + params["date_due"] = self._timestamp_to_kanboard_date(task["date_due"]) + + if task.get("date_started") and task["date_started"] != "0": + params["date_started"] = self._timestamp_to_kanboard_date(task["date_started"]) + + # Обрабатываем теги задачи + task_tags = self._get_task_tags(task_id) + if task_tags: + params["tags"] = task_tags + + try: + new_task_id = self.target_api.call("createTask", params) + if new_task_id: + column_info = f" -> колонка {target_column_id}" if target_column_id else "" + print(f" Перенесена задача '{task_title}' ({task_id} -> {new_task_id}{column_info})") + + # Мигрируем дополнительные данные задачи + self._migrate_task_metadata(task_id, new_task_id) + self._migrate_subtasks(task_id, new_task_id) + self._migrate_task_files(task_id, new_task_id, target_project_id) + + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе задачи '{task_title}': {e}") + + def _migrate_task_metadata(self, source_task_id: int, target_task_id: int): + """Миграция метаданных задачи.""" + try: + metadata = self.source_api.call("getTaskMetadata", [source_task_id]) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить метаданные задачи {source_task_id}: {e}") + return + + if not metadata: + return + + # Метаданные возвращаются как список словарей, берем первый + if isinstance(metadata, list) and len(metadata) > 0: + metadata_dict = metadata[0] + else: + metadata_dict = metadata + + if not isinstance(metadata_dict, dict) or not metadata_dict: + return + + try: + ok = self.target_api.call("saveTaskMetadata", [target_task_id, metadata_dict]) + if ok: + print(f" Метаданные задачи перенесены ({len(metadata_dict)} ключей)") + else: + print(f" Не удалось сохранить метаданные задачи {target_task_id}") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при сохранении метаданных задачи {target_task_id}: {e}") + + def _migrate_subtasks(self, source_task_id: int, target_task_id: int): + """Миграция подзадач.""" + try: + subtasks = self.source_api.call("getAllSubtasks", {"task_id": source_task_id}) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить подзадачи {source_task_id}: {e}") + return + + if not subtasks: + return + + print(f" Миграция подзадач ({len(subtasks)} подзадач)...") + + for subtask in subtasks: + params = { + "task_id": target_task_id, + "title": subtask.get("title", ""), + "user_id": self._safe_map_id(subtask.get("user_id"), self.user_mapping), + "time_estimated": int(subtask.get("time_estimated", 0)), + "time_spent": int(subtask.get("time_spent", 0)), + "status": int(subtask.get("status", 0)), + } + + try: + new_subtask_id = self.target_api.call("createSubtask", params) + if new_subtask_id: + print(f" Подзадача '{subtask.get('title')}' перенесена ({subtask.get('id')} -> {new_subtask_id})") + else: + print(f" Не удалось создать подзадачу '{subtask.get('title')}'") + + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе подзадачи: {e}") + + def _migrate_task_files(self, source_task_id: int, target_task_id: int, target_project_id: int): + """Миграция файлов задачи.""" + try: + files = self.source_api.call("getAllTaskFiles", {"task_id": source_task_id}) or [] + except Exception as e: + print(f"[{type(e).__name__}] Не удалось получить файлы задачи {source_task_id}: {e}") + return + + if not files: + return + + print(f" Миграция файлов задачи ({len(files)} файлов)...") + + for file_info in files: + file_id = int(file_info.get("id")) + filename = file_info.get("name") + + try: + # Скачиваем файл из исходной системы + data_b64 = self.source_api.call("downloadTaskFile", [file_id]) + if not data_b64: + print(f" Пропущен файл '{filename}' — пустой контент") + continue + + # Загружаем файл в целевую систему + new_file_id = self.target_api.call( + "createTaskFile", + [target_project_id, target_task_id, filename, data_b64] + ) + + if new_file_id: + print(f" Файл '{filename}' перенесён ({file_id} -> {new_file_id})") + else: + print(f" Не удалось перенести файл '{filename}'") + + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}") + + def get_column_mapping(self, src_project_id: int, src_column_id: int) -> Optional[int]: + """ + Получить ID целевой колонки по ID исходной колонки и проекта. + + Args: + src_project_id: ID исходного проекта + src_column_id: ID исходной колонки + + Returns: + ID целевой колонки или None если маппинг не найден + """ + project_mapping = self.column_mapping.get(src_project_id, {}) + return project_mapping.get(src_column_id) + + def _safe_map_id(self, source_id, mapping_dict): + """Безопасный маппинг ID с преобразованием типов.""" + if not source_id or source_id in ("0", 0): + return 0 + + try: + source_id_int = int(source_id) + mapped_id = mapping_dict.get(source_id_int) + + if mapped_id is not None: + return mapped_id + except (ValueError, TypeError): + pass + + return 0 + + def _get_task_tags(self, task_id): + """Получение тегов задачи - используем оригинальные названия тегов.""" + source_tags = self.source_api.call("getTaskTags", [task_id]) or {} + + if not source_tags: + return [] + + # Просто возвращаем названия тегов из source + # Kanboard автоматически создаст теги с такими названиями или свяжет с существующими + return list(source_tags.values()) + + def _timestamp_to_kanboard_date(self, timestamp): + """Конвертация timestamp в формат даты Kanboard.""" + try: + dt = datetime.fromtimestamp(int(timestamp)) + return dt.strftime("%Y-%m-%d %H:%M") + except (ValueError, TypeError): + return "" + + def delete_all(self, **kwargs): + """Удаление всех задач из целевых проектов.""" + print("Начало удаления всех задач...") + + for target_project_id in self.project_mapping.values(): + print(f"Удаление задач из проекта {target_project_id}") + + tasks = self.target_api.call("getAllTasks", [int(target_project_id), 1]) or [] + + for task in tasks: + task_id = int(task["id"]) + task_title = task["title"] + + try: + # Затем удаляем саму задачу + ok = self.target_api.call("removeTask", [task_id]) + if ok: + print(f" Задача '{task_title}' (ID {task_id}) удалена") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении '{task_title}': {e}") + + print("Удаление задач завершено.") + diff --git a/lib/users.py b/lib/users.py new file mode 100644 index 0000000..b0b9df8 --- /dev/null +++ b/lib/users.py @@ -0,0 +1,79 @@ +from lib.kanboard_api import KanboardAPI +from lib.base_migrator import BaseMigrator +import os + + +class UsersMigrator(BaseMigrator): + """Миграция и удаление пользователей в Kanboard.""" + + def __init__(self, source_api: KanboardAPI, target_api: KanboardAPI, user_mapping_file: str = "user_mapping.json"): + self.source_api = source_api + self.target_api = target_api + self.user_mapping_file = user_mapping_file + self.user_mapping = self._load_mapping(self.user_mapping_file) + + # === Миграция === + def migrate(self): + print("Начало миграции пользователей...") + source_users = self.source_api.call("getAllUsers") or [] + target_users = self.target_api.call("getAllUsers") or [] + existing_usernames = {u["username"] for u in target_users} + temp_password = os.getenv("TEMP_USER_PASSWORD") + + for user in source_users: + source_id = int(user["id"]) + username = user["username"] + name = user.get("name") or "" + email = user.get("email") or "" + role = user.get("role") or "app-user" + + if username in existing_usernames: + print(f"Пользователь '{username}' уже существует, пропускаем") + continue + + password = temp_password if temp_password else username + + try: + new_id = self.target_api.call("createUser", { + "username": username, + "password": password, + "name": name, + "email": email, + "role": role + }) + if new_id: + self.user_mapping[source_id] = new_id + print(f"Пользователь '{username}' ({source_id} -> {new_id}) создан") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при переносе '{username}': {e}") + + self._save_mapping(self.user_mapping_file, self.user_mapping) + print(f"Миграция пользователей завершена. Маппинг сохранён в {self.user_mapping_file}") + + # === Удаление === + def delete_all(self, exclude_admin: bool = True, **kwargs): + print("Начало удаления пользователей...") + all_users = self.target_api.call("getAllUsers") or [] + + for user in all_users: + user_id = int(user["id"]) + username = user["username"] + if exclude_admin and username == "admin": + continue + + try: + ok = self.target_api.call("removeUser", {"user_id": user_id}) + if ok: + print(f"Пользователь '{username}' (ID {user_id}) удалён") + else: + print(f"Пользователь '{username}' (ID {user_id}) не удалён") + except Exception as e: + print(f"[{type(e).__name__}] Ошибка при удалении '{username}': {e}") + + if hasattr(self, 'user_mapping_file'): + self._delete_mapping_file(self.user_mapping_file) + if hasattr(self, 'user_mapping'): + self.user_mapping.clear() + + print("Удаление пользователей завершено.") + diff --git a/main.py b/main.py new file mode 100755 index 0000000..f9e9b54 --- /dev/null +++ b/main.py @@ -0,0 +1,167 @@ +#!/usr/bin/env -S uv run --with-requirements requirements.txt + +import os +import sys +import click +from dotenv import load_dotenv + +from lib.kanboard_api import KanboardAPI +from lib.interactive import KanboardInteractive +from lib.users import UsersMigrator +from lib.projects import ProjectsMigrator +from lib.tags import TagsMigrator +from lib.tasks import TasksMigrator + +load_dotenv() + +MAPPING_FILES = { + "user": "user_mapping.json", + "project": "project_mapping.json", + "column": "column_mapping.json", + "tag": "tag_mapping.json", +} + +SCOPES_ORDER = ["users", "projects", "tags", "tasks"] + +def get_env_var(name: str) -> str: + value = os.environ.get(name) + if not value: + raise EnvironmentError(f"Не найдена переменная окружения: {name}") + return value + +def setup_apis(): + try: + source = KanboardAPI( + url=get_env_var("SOURCE_URL"), + user=get_env_var("SOURCE_USER"), + token=get_env_var("SOURCE_TOKEN") + ) + target = KanboardAPI( + url=get_env_var("TARGET_URL"), + user=get_env_var("TARGET_USER"), + token=get_env_var("TARGET_TOKEN") + ) + return source, target + except EnvironmentError as e: + print(e) + sys.exit(1) + +def create_migrator(scope_name, source_api, target_api): + """Создание одного мигратора для указанной области.""" + if scope_name == "users": + return UsersMigrator(source_api, target_api, MAPPING_FILES["user"]) + elif scope_name == "projects": + return ProjectsMigrator(source_api, target_api, MAPPING_FILES["project"], MAPPING_FILES["user"], MAPPING_FILES["column"]) + elif scope_name == "tags": + return TagsMigrator(source_api, target_api, MAPPING_FILES["tag"], MAPPING_FILES["project"]) + elif scope_name == "tasks": + return TasksMigrator(source_api, target_api, MAPPING_FILES["project"], MAPPING_FILES["user"], MAPPING_FILES["tag"], MAPPING_FILES["column"]) + else: + raise ValueError(f"Неизвестный scope: {scope_name}") + +def create_plan(scope, reverse_order=False): + """ + Создание плана выполнения на основе области. + + Args: + scope: Область выполнения (None для полного выполнения) + reverse_order: Если True, используется обратный порядок для полного выполнения + + Returns: + Список областей для выполнения + """ + if scope is None: + # Полное выполнение + order = list(reversed(SCOPES_ORDER)) if reverse_order else SCOPES_ORDER + return order + else: + # Выполнение только указанной области + return [scope] + +# +# Основные операции +# + +def run_interactive(source, target, instance): + if instance not in ["source", "target"]: + print(f"Неизвестный экземпляр: '{instance}'. Должен быть 'source' или 'target'") + sys.exit(1) + api = source if instance == "source" else target + interactive = KanboardInteractive(api) + interactive.run() + +def run_migrate(source_api, target_api, scope): + """Запуск миграции с созданием миграторов по мере выполнения.""" + + if scope and scope not in SCOPES_ORDER: + click.echo(f"Неизвестный scope: '{scope}'. Доступные: {', '.join(SCOPES_ORDER)}", err=True) + sys.exit(1) + + migration_plan = create_plan(scope, reverse_order=False) + + for scope_name in migration_plan: + # Создаем мигратор непосредственно перед выполнением + migrator = create_migrator(scope_name, source_api, target_api) + try: + click.echo(f"=== Выполнение миграции: {scope_name} ===") + migrator.migrate() + click.echo(f"=== Миграция {scope_name} завершена ===\n") + except Exception as e: + click.echo(f"Ошибка при миграции {scope_name}: {e}", err=True) + sys.exit(1) + +def run_delete(source_api, target_api, scope): + """Удаление данных из целевого экземпляра с созданием миграторов по мере выполнения.""" + + if scope and scope not in SCOPES_ORDER: + click.echo(f"Неизвестный scope: '{scope}'. Доступные: {', '.join(SCOPES_ORDER)}", err=True) + sys.exit(1) + + deletion_plan = create_plan(scope, reverse_order=True) + + for scope_name in deletion_plan: + # Создаем мигратор непосредственно перед выполнением + deleter = create_migrator(scope_name, source_api, target_api) + try: + click.echo(f"=== Выполнение удаления: {scope_name} ===") + deleter.delete_all() + click.echo(f"=== Удаление {scope_name} завершено ===\n") + except Exception as e: + click.echo(f"Ошибка при удалении {scope_name}: {e}", err=True) + sys.exit(1) + +@click.group() +def cli(): + """Скрипт для миграции Kanboard. Используйте 'command --help' для деталей.""" + +@cli.command() +@click.argument('instance', + type=click.Choice(['source', 'target'])) +def interactive(instance): + """Интерактивный режим: укажите source или target.""" + source, target = setup_apis() + run_interactive(source, target, instance) + +@cli.command() +@click.option('--scope', + type=click.Choice(SCOPES_ORDER), + default=None, + help=f"Scope: {', '.join(SCOPES_ORDER)} (пример: --scope projects)") +def migrate(scope): + """Миграция данных: укажите --scope или используйте для полного режима.""" + source, target = setup_apis() + run_migrate(source, target, scope) + +@cli.command() +@click.option('--scope', + type=click.Choice(SCOPES_ORDER), + default=None, + help=f"Scope: {', '.join(SCOPES_ORDER)} (пример: --scope tags)") +def delete(scope): + """Удаление данных: укажите --scope или используйте для полного режима.""" + source, target = setup_apis() + run_delete(source, target, scope) + +if __name__ == '__main__': + cli() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bab0c60 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +dotenv +requests +click +