Initial commit

This commit is contained in:
2025-10-18 00:52:08 +03:00
commit e049a302d3
18 changed files with 1541 additions and 0 deletions

15
.env.example Normal file
View File

@@ -0,0 +1,15 @@
# Source Server
SOURCE_URL="https://example.com/jsonrpc.php"
SOURCE_USER="jsonrpc"
SOURCE_TOKEN="SECRET"
# Target Server
TARGET_URL="https://example.com/jsonrpc.php"
TARGET_USER="jsonrpc"
TARGET_TOKEN="SECRET"
# Optional
TEMP_USER_PASSWORD=
# Если не задать TEMP_USER_PASSWORD, который всем пользователям поставит
# одинаковый пароль, то автоматически пароль будет равен username
# например user62 пароль: user62

9
.gitignore vendored Normal file
View File

@@ -0,0 +1,9 @@
**/__pycache__/**
venv/
.venv/
.env
user_mapping.json
project_mapping.json
tag_mapping.json
column_mapping.json

18
LICENSE Normal file
View File

@@ -0,0 +1,18 @@
MIT License
Copyright (c) 2025 y9938
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the "Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the
following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO
EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.

41
README.md Normal file
View File

@@ -0,0 +1,41 @@
# Kanboard API
> Взаимодействие с API, миграция с одного instance на другой.
> Для вас миграция может быть не идеальная.
> По документации https://docs.kanboard.org/v1/api/
Код затрагивает работу с API Reference по документации:
- Column API Procedures
- Project API Procedures
- Project File API Procedures
- Project Metadata API Procedures
- Project Permission API Procedures
- Subtask API Procedures
- Tags API Procedures
- Task API Procedures
- Task File API Procedures
- Task Metadata API Procedures
- User API Procedures
## Начальные шаги
```bash
cp .env.example .env
chmod +x main.py
```
Можно запустить main.py и другим способом без `uv`.
---
### Демонстрация
![help](assets/help.png)
![run full migration](assets/full_migration.png)
- Без указания COMMAND выполняется полная миграция для users, projects, tags и tasks
![interactive mode](assets/interactive.png)

BIN
assets/full_migration.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

BIN
assets/help.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

BIN
assets/interactive.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

0
lib/__init__.py Normal file
View File

78
lib/base_migrator.py Normal file
View File

@@ -0,0 +1,78 @@
import json
import os
from typing import Dict, Any
from abc import abstractmethod
class BaseMigrator:
"""Базовый класс для миграторов с общими методами работы с файлами маппинга."""
def _load_mapping(self, filename: str) -> Dict[int, int]:
"""Загружает маппинг из JSON файла."""
if os.path.exists(filename):
with open(filename, "r", encoding="utf-8") as f:
data = json.load(f)
return {int(k): int(v) for k, v in data.items()}
return {}
def _load_nested_mapping(self, filename: str) -> Dict[Any, Any]:
"""Загрузка вложенного маппинга из JSON-файла (для колонок)."""
if os.path.exists(filename):
with open(filename, 'r') as f:
data = json.load(f)
# Для вложенного маппинга колонок: {project_id: {src_col_id: target_col_id}}
result = {}
for project_id_str, column_map in data.items():
project_id = int(project_id_str)
result[project_id] = {}
for src_col_str, target_col_id in column_map.items():
result[project_id][int(src_col_str)] = int(target_col_id)
return result
return {}
def _save_mapping(self, filename: str, mapping: Dict[int, int]):
"""Сохраняет маппинг в JSON файл."""
with open(filename, "w", encoding="utf-8") as f:
json.dump(mapping, f, ensure_ascii=False, indent=2)
def _save_nested_mapping(self, filename: str, mapping: Dict[Any, Any]):
"""Сохранение вложенного маппинга в JSON-файл."""
# Преобразуем в формат, который можно сериализовать в JSON
serializable_mapping = {}
for project_id, column_map in mapping.items():
serializable_mapping[str(project_id)] = {str(k): v for k, v in column_map.items()}
with open(filename, 'w') as f:
json.dump(serializable_mapping, f, indent=2)
def _delete_mapping_file(self, filename: str) -> bool:
"""
Удаляет файл маппинга, если он существует.
Args:
filename: Путь к файлу маппинга.
Returns:
bool: True, если файл удален или не существует, False в случае ошибки.
"""
try:
if filename and os.path.exists(filename):
os.remove(filename)
print(f"Файл маппинга {filename} удалён")
return True
else:
print("Файл маппинга не найден или не задан")
return True
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении файла маппинга: {e}")
return False
@abstractmethod
def migrate(self):
"""Абстрактный метод для миграции данных."""
pass
@abstractmethod
def delete_all(self):
"""Абстрактный метод для удаления всех данных."""
pass

90
lib/interactive.py Normal file
View File

@@ -0,0 +1,90 @@
import json
from lib.kanboard_api import KanboardAPI
class KanboardInteractive:
"""Позволяет выполнять произвольные JSON-RPC запросы через KanboardAPI."""
def __init__(self, api: KanboardAPI):
self.api = api
def parse_simple_syntax(self, line):
"""Парсит упрощенный синтаксис: метод param1=value1 param2=value2"""
parts = line.strip().split()
if not parts:
return None, None
method = parts[0]
params = {}
for part in parts[1:]:
# Парсим key=value
if '=' in part:
key, value = part.split('=', 1)
# Пробуем преобразовать значение в число, если возможно
try:
value = int(value)
except ValueError:
try:
value = float(value)
except ValueError:
# Оставляем как строку
pass
params[key] = value
else:
# Если нет '=', считаем позиционным параметром с числовым ключом
try:
value = int(part)
except ValueError:
value = part
params[str(len(params))] = value
return method, params
def convert_to_array_params(self, params):
"""Конвертирует словарь параметров в массив, если ключи числовые"""
if all(key.isdigit() for key in params.keys()):
# Сортируем по числовым ключам и возвращаем массив значений
return [params[str(i)] for i in range(len(params))]
return params
def run(self):
print("Введите метод Kanboard API и параметры.")
print("Форматы:")
print(' JSON: {"method": "getProjectById", "params": {"project_id": 1}}')
print(' Простой: getProjectUsers project_id=32')
print(' Позиционный: getProjectUserRole 32 8')
print("Для выхода введите 'exit'")
while True:
line = input("Kanboard> ").strip()
if line.lower() in ("exit", "quit"):
break
if not line:
continue
# Определяем формат ввода
if line.startswith('{'):
# JSON формат
try:
data = json.loads(line)
method = data.get("method")
params = data.get("params", {})
except json.JSONDecodeError as e:
print(f"Ошибка JSON: {e}")
continue
else:
# Упрощенный формат
method, params = self.parse_simple_syntax(line)
if not method:
print("Не указан метод.")
continue
# Для методов, которые ожидают массив параметров, конвертируем
params = self.convert_to_array_params(params)
try:
result = self.api.call(method, params)
print(json.dumps(result, ensure_ascii=False, indent=2))
except Exception as e:
print(f"Ошибка API: {e}")

46
lib/kanboard_api.py Executable file
View File

@@ -0,0 +1,46 @@
import json
import requests
from typing import Any, Dict, Tuple
class KanboardAPI:
"""Базовый класс для работы с Kanboard через JSON-RPC."""
def __init__(self, url: str, user: str, token: str):
self.url: str = url
self.auth: Tuple[str, str] = (user, token)
self.request_id: int = 0
def call(self, method: str, params: Any = None) -> Any:
"""Выполнить JSON-RPC запрос с кодировкой UTF-8."""
self.request_id += 1
payload: Dict[str, Any] = {
"jsonrpc": "2.0",
"method": method,
"id": self.request_id,
"params": params or {}
}
try:
json_data: bytes = json.dumps(payload, ensure_ascii=False).encode('utf-8')
except Exception as e:
raise ValueError(f"Ошибка кодирования JSON: {e}")
headers = {'Content-Type': 'application/json; charset=utf-8'}
try:
response = requests.post(self.url, data=json_data, auth=self.auth, headers=headers)
response.raise_for_status()
response.encoding = 'utf-8'
except requests.exceptions.RequestException as e:
raise ConnectionError(f"Сетевая ошибка при вызове API {method}: {e}")
try:
result = response.json()
except json.JSONDecodeError as e:
raise ValueError(f"Ошибка декодирования JSON: {e}. Ответ: {response.text[:100]}...")
if 'error' in result:
raise Exception(f"API Error: {result['error']}")
return result.get('result')

408
lib/projects.py Normal file
View File

@@ -0,0 +1,408 @@
from typing import Any, Dict, Optional, List
from lib.kanboard_api import KanboardAPI
from lib.base_migrator import BaseMigrator
class ProjectsMigrator(BaseMigrator):
"""Миграция и удаление проектов в Kanboard."""
def __init__(
self,
source_api: KanboardAPI,
target_api: KanboardAPI,
project_mapping_file: str = "project_mapping.json",
user_mapping_file: str = "user_mapping.json",
column_mapping_file: str = "column_mapping.json"
):
self.source_api = source_api
self.target_api = target_api
self.project_mapping_file = project_mapping_file
self.user_mapping_file = user_mapping_file
self.column_mapping_file = column_mapping_file
self.project_mapping = self._load_mapping(self.project_mapping_file)
self.user_mapping = self._load_mapping(self.user_mapping_file)
self.column_mapping = self._load_nested_mapping(self.column_mapping_file)
# === Helpers ===
def _create_or_find_project(self, src_proj: Dict[str, Any]) -> Optional[int]:
name = src_proj.get("name") or ""
identifier = src_proj.get("identifier") or ""
description = src_proj.get("description") or ""
owner_id = src_proj.get("owner_id")
email = src_proj.get("email")
target_proj = None
found = False
if identifier:
try:
target_proj = self.target_api.call("getProjectByIdentifier", {"identifier": identifier})
except Exception:
target_proj = None
if not target_proj and name:
try:
target_proj = self.target_api.call("getProjectByName", {"name": name})
except Exception:
target_proj = None
if target_proj:
try:
found = True
return int(target_proj["id"]), found
except Exception:
return None, found
create_params: Dict[str, Any] = {"name": name}
if description is not None:
create_params["description"] = description
if identifier:
create_params["identifier"] = identifier
if email:
create_params["email"] = email
if owner_id:
try:
mapped_owner = self.user_mapping.get(int(owner_id))
if mapped_owner:
create_params["owner_id"] = mapped_owner
except Exception:
pass
try:
new_id = self.target_api.call("createProject", create_params)
if new_id:
return int(new_id), False
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при createProject '{name}': {e}")
return None, found
def _migrate_columns(self, src_project_id: int, target_project_id: int):
"""Миграция колонок - создаем такие же как в source и сохраняем маппинг."""
try:
src_columns = self.source_api.call("getColumns", [src_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки {src_project_id}: {e}")
return
try:
target_columns = self.target_api.call("getColumns", [target_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки target-проекта {target_project_id}: {e}")
target_columns = []
# Инициализируем маппинг колонок для этого проекта
if src_project_id not in self.column_mapping:
self.column_mapping[src_project_id] = {}
project_column_mapping = self.column_mapping[src_project_id]
created_columns: List[int] = []
existing_titles = {col.get("title", "").lower(): col for col in target_columns}
for src_col in sorted(src_columns, key=lambda c: int(c.get("position", 0))):
src_col_id = int(src_col.get("id"))
title = src_col.get("title", "")
task_limit = None
try:
tl = int(src_col.get("task_limit", 0))
if tl > 0:
task_limit = tl
except Exception:
pass
description = src_col.get("description")
# Проверяем, есть ли колонка с таким названием
if title.lower() in existing_titles:
# Колонка уже существует, находим ее ID и сохраняем маппинг
target_col = existing_titles[title.lower()]
target_col_id = int(target_col.get("id"))
project_column_mapping[src_col_id] = target_col_id
created_columns.append(target_col_id)
print(f"Колонка '{title}' уже существует ({src_col_id} -> {target_col_id})")
continue
# Создаем новую колонку
params = {"project_id": target_project_id, "title": title}
if task_limit is not None:
params["task_limit"] = task_limit
if description:
params["description"] = description
try:
new_col_id = self.target_api.call("addColumn", params)
if new_col_id:
new_col_id_int = int(new_col_id)
created_columns.append(new_col_id_int)
# Сохраняем маппинг новой колонки
project_column_mapping[src_col_id] = new_col_id_int
print(f"Добавлена колонка '{title}' ({src_col_id} -> {new_col_id_int})")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при addColumn '{title}': {e}")
# Обновляем позиции колонок
for position, col_id in enumerate(created_columns, start=1):
try:
ok = self.target_api.call("changeColumnPosition", [target_project_id, col_id, position])
if not ok:
print(f"Не удалось установить позицию {position} для колонки {col_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при changeColumnPosition {col_id}: {e}")
# Сохраняем маппинг колонок после обработки всех колонок проекта
self._save_nested_mapping(self.column_mapping_file, self.column_mapping)
def _migrate_columns(self, src_project_id: int, target_project_id: int):
"""Миграция колонок - создаем такие же как в source и сохраняем маппинг."""
try:
src_columns = self.source_api.call("getColumns", [src_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки {src_project_id}: {e}")
return
try:
target_columns = self.target_api.call("getColumns", [target_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки target-проекта {target_project_id}: {e}")
target_columns = []
# Инициализируем маппинг колонок для этого проекта
if str(src_project_id) not in self.column_mapping:
self.column_mapping[str(src_project_id)] = {}
project_column_mapping = self.column_mapping[str(src_project_id)]
created_columns: List[int] = []
existing_titles = {col.get("title", "").lower(): col for col in target_columns}
for src_col in sorted(src_columns, key=lambda c: int(c.get("position", 0))):
src_col_id = src_col.get("id")
title = src_col.get("title", "")
task_limit = None
try:
tl = int(src_col.get("task_limit", 0))
if tl > 0:
task_limit = tl
except Exception:
pass
description = src_col.get("description")
# Проверяем, есть ли колонка с таким названием
if title.lower() in existing_titles:
# Колонка уже существует, находим ее ID и сохраняем маппинг
target_col = existing_titles[title.lower()]
target_col_id = target_col.get("id")
project_column_mapping[str(src_col_id)] = target_col_id
created_columns.append(int(target_col_id))
print(f"Колонка '{title}' уже существует ({src_col_id} -> {target_col_id})")
continue
# Создаем новую колонку
params = {"project_id": target_project_id, "title": title}
if task_limit is not None:
params["task_limit"] = task_limit
if description:
params["description"] = description
try:
new_col_id = self.target_api.call("addColumn", params)
if new_col_id:
new_col_id_int = int(new_col_id)
created_columns.append(new_col_id_int)
# Сохраняем маппинг новой колонки
project_column_mapping[str(src_col_id)] = new_col_id_int
print(f"Добавлена колонка '{title}' ({src_col_id} -> {new_col_id_int})")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при addColumn '{title}': {e}")
# Обновляем позиции колонок
for position, col_id in enumerate(created_columns, start=1):
try:
ok = self.target_api.call("changeColumnPosition", [target_project_id, col_id, position])
if not ok:
print(f"Не удалось установить позицию {position} для колонки {col_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при changeColumnPosition {col_id}: {e}")
# Сохраняем маппинг колонок после обработки всех колонок проекта
self._save_mapping(self.column_mapping_file, self.column_mapping)
def _migrate_project_users(self, src_project_id: int, target_project_id: int):
try:
src_members = self.source_api.call("getProjectUsers", [src_project_id]) or {}
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить пользователей проекта {src_project_id}: {e}")
src_members = {}
for src_uid_str, username in list(src_members.items()):
try:
src_uid = int(src_uid_str)
except Exception:
continue
try:
role = self.source_api.call("getProjectUserRole", [src_project_id, src_uid])
except Exception:
role = None
target_uid = self.user_mapping.get(src_uid)
if not target_uid:
print(f" Пропускаем пользователя '{username}' ({src_uid}) — нет в user_mapping")
continue
try:
ok = self.target_api.call(
"addProjectUser",
[target_project_id, target_uid, role] if role else [target_project_id, target_uid]
)
if ok:
print(f" Добавлен пользователь '{username}' -> target_id {target_uid} роль='{role}'")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при addProjectUser '{username}': {e}")
def _migrate_project_files(self, src_project_id: int, target_project_id: int):
"""Миграция файлов проекта."""
try:
files = self.source_api.call("getAllProjectFiles", {"project_id": src_project_id}) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить файлы проекта {src_project_id}: {e}")
return
for f in files:
file_id = int(f.get("id"))
filename = f.get("name")
try:
data_b64 = self.source_api.call("downloadProjectFile", [src_project_id, file_id])
if not data_b64:
print(f" Пропущен файл '{filename}' — пустой контент")
continue
new_id = self.target_api.call("createProjectFile", [target_project_id, filename, data_b64])
if new_id:
print(f" Файл '{filename}' перенесён ({file_id} -> {new_id})")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}")
def _apply_project_settings(self, src_project_id: int, target_project_id: int):
try:
src_info = self.source_api.call("getProjectById", {"project_id": src_project_id}) or {}
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить info проекта {src_project_id}: {e}")
return
try:
if str(src_info.get("is_active", "1")) == "1":
self.target_api.call("enableProject", [target_project_id])
else:
self.target_api.call("disableProject", [target_project_id])
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при установке is_active {target_project_id}: {e}")
try:
if str(src_info.get("is_public", "0")) == "1":
self.target_api.call("enableProjectPublicAccess", [target_project_id])
else:
self.target_api.call("disableProjectPublicAccess", [target_project_id])
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при установке public access {target_project_id}: {e}")
updatable = {}
for fld in (
"start_date", "end_date", "priority_default",
"priority_start", "priority_end", "email",
"identifier", "description", "name"
):
if fld in src_info and src_info.get(fld) is not None:
updatable[fld] = src_info.get(fld)
if updatable:
updatable["project_id"] = target_project_id
try:
ok = self.target_api.call("updateProject", updatable)
if not ok:
print(f" Не удалось применить дополнительные настройки для проекта {target_project_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при updateProject {target_project_id}: {e}")
def _delete_project_files(self, project_id: int):
"""Удаление всех файлов проекта перед удалением."""
try:
files = self.target_api.call("getAllProjectFiles", {"project_id": project_id}) or []
if files:
self.target_api.call("removeAllProjectFiles", {"project_id": project_id})
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении файлов проекта {project_id}: {e}")
# === Public API ===
def migrate(self):
print("Начало миграции проектов...")
try:
source_projects = self.source_api.call("getAllProjects") or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить список проектов: {e}")
source_projects = []
for proj in source_projects:
try:
src_id = int(proj.get("id"))
except Exception:
continue
target_id, found = self._create_or_find_project(proj)
if not target_id:
print(f"Не удалось создать или найти проект '{proj.get('name')}' ({src_id})")
continue
self.project_mapping[src_id] = target_id
if found:
print(f"Проект '{proj.get('name')}' уже существует ({src_id} -> {target_id})")
else:
print(f"Проект '{proj.get('name')}' создан ({src_id} -> {target_id})")
self._migrate_columns(src_id, target_id)
self._migrate_project_users(src_id, target_id)
self._migrate_project_files(src_id, target_id)
self._apply_project_settings(src_id, target_id)
self._save_mapping(self.project_mapping_file, self.project_mapping)
self._save_mapping(self.column_mapping_file, self.column_mapping)
print(f"Миграция проектов завершена.")
print(f"Маппинг проектов сохранён в {self.project_mapping_file}")
print(f"Маппинг колонок сохранён в {self.column_mapping_file}")
def delete_all(self, exclude_project_ids: Optional[List[int]] = None, **kwargs):
"""
Удаление всех проектов.
Args:
exclude_project_ids: Список ID проектов, которые нужно исключить из удаления
"""
exclude_project_ids = set(exclude_project_ids or [])
print("Начало удаления проектов...")
try:
all_projects = self.target_api.call("getAllProjects") or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить проекты target: {e}")
return
for p in all_projects:
try:
pid = int(p.get("id"))
except Exception:
continue
if pid in exclude_project_ids:
continue
try:
self._delete_project_files(pid)
ok = self.target_api.call("removeProject", {"project_id": pid})
if ok:
print(f"Проект '{p.get('name')}' (ID {pid}) удалён")
else:
print(f"Проект '{p.get('name')}' (ID {pid}) не удалён")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении проекта {pid}: {e}")
if hasattr(self, 'project_mapping_file'):
self._delete_mapping_file(self.project_mapping_file)
if hasattr(self, 'column_mapping_file'):
self._delete_mapping_file(self.column_mapping_file)
if hasattr(self, 'project_mapping_file'):
self._delete_mapping_file(self.project_mapping_file)
if hasattr(self, 'project_mapping'):
self.project_mapping.clear()
print("Удаление проектов завершено.")

85
lib/tags.py Normal file
View File

@@ -0,0 +1,85 @@
from lib.kanboard_api import KanboardAPI
from lib.base_migrator import BaseMigrator
class TagsMigrator(BaseMigrator):
"""Миграция и полное удаление тегов в Kanboard."""
def __init__(
self,
source_api: KanboardAPI,
target_api: KanboardAPI,
tag_mapping_file: str = "tag_mapping.json",
project_mapping_file: str = "project_mapping.json"
):
self.source_api = source_api
self.target_api = target_api
self.tag_mapping_file = tag_mapping_file
self.project_mapping_file = project_mapping_file
self.tag_mapping = self._load_mapping(self.tag_mapping_file)
self.project_mapping = self._load_mapping(self.project_mapping_file)
# === Миграция ===
def migrate(self):
print("Начало миграции тегов...")
source_tags = self.source_api.call("getAllTags") or []
for tag in source_tags:
tag_id = int(tag["id"])
tag_name = tag["name"]
project_id = int(tag["project_id"])
# Определяем target_project_id
if project_id == 0:
target_project_id = 0 # глобальный тег
else:
target_project_id = self.project_mapping.get(project_id)
if not target_project_id:
print(f"Проект исходного тега {project_id} не найден на целевом сервере, пропускаем тег '{tag_name}'")
continue
# Проверяем, есть ли такой тег уже на целевом экземпляре
existing_tags = self.target_api.call("getTagsByProject", [target_project_id]) or []
if any(t["name"] == tag_name for t in existing_tags):
continue
try:
new_id = self.target_api.call("createTag", [target_project_id, tag_name])
if new_id:
self.tag_mapping[tag_id] = new_id
location = "глобальный" if target_project_id == 0 else f"проект {target_project_id}"
print(f"Перенесен тег '{tag_name}' ({tag_id} -> {new_id}) в {location}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе '{tag_name}': {e}")
self._save_mapping(self.tag_mapping_file, self.tag_mapping)
print("Миграция тегов завершена.")
print(f"Маппинг тегов сохранён в {self.tag_mapping_file}")
# === Полное удаление всех тегов ===
def delete_all(self, **kwargs):
print("Начало удаления всех тегов...")
all_tags = self.target_api.call("getAllTags") or []
if not all_tags:
print("Теги не найдены.")
return
for tag in all_tags:
tag_id = int(tag["id"])
tag_name = tag["name"]
try:
ok = self.target_api.call("removeTag", [tag_id])
if ok:
print(f"Тег '{tag_name}' (ID {tag_id}) удалён")
else:
print(f"Тег '{tag_name}' (ID {tag_id}) не удалён")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении '{tag_name}': {e}")
if hasattr(self, 'tag_mapping_file'):
self._delete_mapping_file(self.tag_mapping_file)
if hasattr(self, 'tag_mapping'):
self.tag_mapping.clear()
print("Удаление тегов завершено.")

227
lib/tasks-word.py Normal file
View File

@@ -0,0 +1,227 @@
from typing import Optional
from datetime import datetime
from lib.kanboard_api import KanboardAPI
from lib.base_migrator import BaseMigrator
class TasksMigrator(BaseMigrator):
"""Миграция задач в Kanboard."""
def __init__(
self,
source_api: KanboardAPI,
target_api: KanboardAPI,
project_mapping_file: str = "project_mapping.json",
user_mapping_file: str = "user_mapping.json",
tag_mapping_file: str = "tag_mapping.json",
column_mapping_file: str = "column_mapping.json"
):
self.source_api = source_api
self.target_api = target_api
self.project_mapping_file = project_mapping_file
self.user_mapping_file = user_mapping_file
self.tag_mapping_file = tag_mapping_file
self.column_mapping_file = column_mapping_file
self.project_mapping = self._load_mapping(self.project_mapping_file)
self.user_mapping = self._load_mapping(self.user_mapping_file)
self.tag_mapping = self._load_mapping(self.tag_mapping_file)
self.column_mapping = self._load_nested_mapping(self.column_mapping_file)
def migrate(self):
"""Миграция всех задач из исходных проектов в целевые."""
print("Начало миграции задач...")
for source_project_id, target_project_id in self.project_mapping.items():
print(f"Миграция задач из проекта {source_project_id} -> {target_project_id}")
# Получаем задачи с учетом статуса (1 = активные, 0 = закрытые)
source_tasks = self.source_api.call("getAllTasks", [int(source_project_id), 1]) or []
for task in source_tasks:
self._migrate_task(task, int(source_project_id), int(target_project_id))
print("Миграция задач завершена.")
def _migrate_task(self, task, source_project_id: int, target_project_id: int):
"""Миграция одной задачи."""
task_id = int(task["id"])
task_title = task["title"]
# Получаем маппинг колонки
source_column_id = task.get("column_id")
target_column_id = None
if source_column_id:
target_column_id = self.get_column_mapping(source_project_id, source_column_id)
if not target_column_id:
print(f" Предупреждение: не найден маппинг для колонки {source_column_id} проекта {source_project_id}")
params = {
"project_id": target_project_id,
"title": task_title,
"description": task.get("description", ""),
"color_id": task.get("color_id", "yellow"),
"owner_id": self._safe_map_id(task.get("owner_id"), self.user_mapping),
"creator_id": self._safe_map_id(task.get("creator_id"), self.user_mapping),
"score": int(task.get("score", 0)),
"priority": int(task.get("priority", 0)),
"reference": task.get("reference", ""),
"time_estimated": int(task.get("time_estimated", 0)),
"time_spent": int(task.get("time_spent", 0)),
}
# Добавляем column_id если нашли маппинг
if target_column_id:
params["column_id"] = target_column_id
# Обрабатываем даты
if task.get("date_due") and task["date_due"] != "0":
params["date_due"] = self._timestamp_to_kanboard_date(task["date_due"])
if task.get("date_started") and task["date_started"] != "0":
params["date_started"] = self._timestamp_to_kanboard_date(task["date_started"])
# Обрабатываем теги задачи
task_tags = self._get_task_tags(task_id)
if task_tags:
params["tags"] = task_tags
try:
new_task_id = self.target_api.call("createTask", params)
if new_task_id:
column_info = f" -> колонка {target_column_id}" if target_column_id else ""
print(f" Перенесена задача '{task_title}' ({task_id} -> {new_task_id}{column_info})")
# Мигрируем файлы задачи после создания задачи
self._migrate_task_files(task_id, new_task_id, target_project_id)
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе задачи '{task_title}': {e}")
def _migrate_task_files(self, source_task_id: int, target_task_id: int, target_project_id: int):
"""Миграция файлов задачи."""
try:
files = self.source_api.call("getAllTaskFiles", {"task_id": source_task_id}) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить файлы задачи {source_task_id}: {e}")
return
if not files:
return
print(f" Миграция файлов задачи ({len(files)} файлов)...")
for file_info in files:
file_id = int(file_info.get("id"))
filename = file_info.get("name")
try:
# Скачиваем файл из исходной системы
data_b64 = self.source_api.call("downloadTaskFile", [file_id])
if not data_b64:
print(f" Пропущен файл '{filename}' — пустой контент")
continue
# Загружаем файл в целевую систему
new_file_id = self.target_api.call(
"createTaskFile",
[target_project_id, target_task_id, filename, data_b64]
)
if new_file_id:
print(f" Файл '{filename}' перенесён ({file_id} -> {new_file_id})")
else:
print(f" Не удалось перенести файл '{filename}'")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}")
def get_column_mapping(self, src_project_id: int, src_column_id: int) -> Optional[int]:
"""
Получить ID целевой колонки по ID исходной колонки и проекта.
Args:
src_project_id: ID исходного проекта
src_column_id: ID исходной колонки
Returns:
ID целевой колонки или None если маппинг не найден
"""
project_mapping = self.column_mapping.get(src_project_id, {})
return project_mapping.get(src_column_id)
def _safe_map_id(self, source_id, mapping_dict):
"""Безопасный маппинг ID с преобразованием типов."""
if not source_id or source_id in ("0", 0):
return 0
try:
source_id_int = int(source_id)
mapped_id = mapping_dict.get(source_id_int)
if mapped_id is not None:
return mapped_id
except (ValueError, TypeError):
pass
return 0
def _get_task_tags(self, task_id):
"""Получение тегов задачи - используем оригинальные названия тегов."""
source_tags = self.source_api.call("getTaskTags", [task_id]) or {}
if not source_tags:
return []
# Просто возвращаем названия тегов из source
# Kanboard автоматически создаст теги с такими названиями или свяжет с существующими
return list(source_tags.values())
def _timestamp_to_kanboard_date(self, timestamp):
"""Конвертация timestamp в формат даты Kanboard."""
try:
dt = datetime.fromtimestamp(int(timestamp))
return dt.strftime("%Y-%m-%d %H:%M")
except (ValueError, TypeError):
return ""
def delete_all(self, **kwargs):
"""Удаление всех задач из целевых проектов."""
print("Начало удаления всех задач...")
for target_project_id in self.project_mapping.values():
print(f"Удаление задач из проекта {target_project_id}")
tasks = self.target_api.call("getAllTasks", [int(target_project_id), 1]) or []
for task in tasks:
task_id = int(task["id"])
task_title = task["title"]
try:
# Сначала удаляем все файлы задачи
self._delete_task_files(task_id)
# Затем удаляем саму задачу
ok = self.target_api.call("removeTask", [task_id])
if ok:
print(f" Задача '{task_title}' (ID {task_id}) удалена")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении '{task_title}': {e}")
print("Удаление задач завершено.")
def _delete_task_files(self, task_id: int):
"""Удаление всех файлов задачи."""
try:
files = self.target_api.call("getAllTaskFiles", {"task_id": task_id}) or []
if files:
# Используем removeAllTaskFiles для удаления всех файлов сразу
ok = self.target_api.call("removeAllTaskFiles", {"task_id": task_id})
if ok:
print(f" Удалены все файлы задачи {task_id} ({len(files)} файлов)")
else:
print(f" Не удалось удалить файлы задачи {task_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении файлов задачи {task_id}: {e}")

274
lib/tasks.py Normal file
View File

@@ -0,0 +1,274 @@
from typing import Optional
from datetime import datetime
from lib.kanboard_api import KanboardAPI
from lib.base_migrator import BaseMigrator
class TasksMigrator(BaseMigrator):
"""Миграция задач в Kanboard."""
def __init__(
self,
source_api: KanboardAPI,
target_api: KanboardAPI,
project_mapping_file: str = "project_mapping.json",
user_mapping_file: str = "user_mapping.json",
tag_mapping_file: str = "tag_mapping.json",
column_mapping_file: str = "column_mapping.json"
):
self.source_api = source_api
self.target_api = target_api
self.project_mapping_file = project_mapping_file
self.user_mapping_file = user_mapping_file
self.tag_mapping_file = tag_mapping_file
self.column_mapping_file = column_mapping_file
self.project_mapping = self._load_mapping(self.project_mapping_file)
self.user_mapping = self._load_mapping(self.user_mapping_file)
self.tag_mapping = self._load_mapping(self.tag_mapping_file)
self.column_mapping = self._load_nested_mapping(self.column_mapping_file)
def migrate(self):
"""Миграция всех задач из исходных проектов в целевые."""
print("Начало миграции задач...")
for source_project_id, target_project_id in self.project_mapping.items():
print(f"Миграция задач из проекта {source_project_id} -> {target_project_id}")
# Получаем задачи с учетом статуса (1 = активные, 0 = закрытые)
source_tasks = self.source_api.call("getAllTasks", [int(source_project_id), 1]) or []
for task in source_tasks:
self._migrate_task(task, int(source_project_id), int(target_project_id))
print("Миграция задач завершена.")
def _migrate_task(self, task, source_project_id: int, target_project_id: int):
"""Миграция одной задачи."""
task_id = int(task["id"])
task_title = task["title"]
# Получаем маппинг колонки
source_column_id = task.get("column_id")
target_column_id = None
if source_column_id:
target_column_id = self.get_column_mapping(source_project_id, source_column_id)
if not target_column_id:
print(f" Предупреждение: не найден маппинг для колонки {source_column_id} проекта {source_project_id}")
params = {
"project_id": target_project_id,
"title": task_title,
"description": task.get("description", ""),
"color_id": task.get("color_id", "yellow"),
"owner_id": self._safe_map_id(task.get("owner_id"), self.user_mapping),
"creator_id": self._safe_map_id(task.get("creator_id"), self.user_mapping),
"score": int(task.get("score", 0)),
"priority": int(task.get("priority", 0)),
"reference": task.get("reference", ""),
"time_estimated": int(task.get("time_estimated", 0)),
"time_spent": int(task.get("time_spent", 0)),
}
# Добавляем column_id если нашли маппинг
if target_column_id:
params["column_id"] = target_column_id
# Обрабатываем даты
if task.get("date_due") and task["date_due"] != "0":
params["date_due"] = self._timestamp_to_kanboard_date(task["date_due"])
if task.get("date_started") and task["date_started"] != "0":
params["date_started"] = self._timestamp_to_kanboard_date(task["date_started"])
# Обрабатываем теги задачи
task_tags = self._get_task_tags(task_id)
if task_tags:
params["tags"] = task_tags
try:
new_task_id = self.target_api.call("createTask", params)
if new_task_id:
column_info = f" -> колонка {target_column_id}" if target_column_id else ""
print(f" Перенесена задача '{task_title}' ({task_id} -> {new_task_id}{column_info})")
# Мигрируем дополнительные данные задачи
self._migrate_task_metadata(task_id, new_task_id)
self._migrate_subtasks(task_id, new_task_id)
self._migrate_task_files(task_id, new_task_id, target_project_id)
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе задачи '{task_title}': {e}")
def _migrate_task_metadata(self, source_task_id: int, target_task_id: int):
"""Миграция метаданных задачи."""
try:
metadata = self.source_api.call("getTaskMetadata", [source_task_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить метаданные задачи {source_task_id}: {e}")
return
if not metadata:
return
# Метаданные возвращаются как список словарей, берем первый
if isinstance(metadata, list) and len(metadata) > 0:
metadata_dict = metadata[0]
else:
metadata_dict = metadata
if not isinstance(metadata_dict, dict) or not metadata_dict:
return
try:
ok = self.target_api.call("saveTaskMetadata", [target_task_id, metadata_dict])
if ok:
print(f" Метаданные задачи перенесены ({len(metadata_dict)} ключей)")
else:
print(f" Не удалось сохранить метаданные задачи {target_task_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при сохранении метаданных задачи {target_task_id}: {e}")
def _migrate_subtasks(self, source_task_id: int, target_task_id: int):
"""Миграция подзадач."""
try:
subtasks = self.source_api.call("getAllSubtasks", {"task_id": source_task_id}) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить подзадачи {source_task_id}: {e}")
return
if not subtasks:
return
print(f" Миграция подзадач ({len(subtasks)} подзадач)...")
for subtask in subtasks:
params = {
"task_id": target_task_id,
"title": subtask.get("title", ""),
"user_id": self._safe_map_id(subtask.get("user_id"), self.user_mapping),
"time_estimated": int(subtask.get("time_estimated", 0)),
"time_spent": int(subtask.get("time_spent", 0)),
"status": int(subtask.get("status", 0)),
}
try:
new_subtask_id = self.target_api.call("createSubtask", params)
if new_subtask_id:
print(f" Подзадача '{subtask.get('title')}' перенесена ({subtask.get('id')} -> {new_subtask_id})")
else:
print(f" Не удалось создать подзадачу '{subtask.get('title')}'")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе подзадачи: {e}")
def _migrate_task_files(self, source_task_id: int, target_task_id: int, target_project_id: int):
"""Миграция файлов задачи."""
try:
files = self.source_api.call("getAllTaskFiles", {"task_id": source_task_id}) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить файлы задачи {source_task_id}: {e}")
return
if not files:
return
print(f" Миграция файлов задачи ({len(files)} файлов)...")
for file_info in files:
file_id = int(file_info.get("id"))
filename = file_info.get("name")
try:
# Скачиваем файл из исходной системы
data_b64 = self.source_api.call("downloadTaskFile", [file_id])
if not data_b64:
print(f" Пропущен файл '{filename}' — пустой контент")
continue
# Загружаем файл в целевую систему
new_file_id = self.target_api.call(
"createTaskFile",
[target_project_id, target_task_id, filename, data_b64]
)
if new_file_id:
print(f" Файл '{filename}' перенесён ({file_id} -> {new_file_id})")
else:
print(f" Не удалось перенести файл '{filename}'")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}")
def get_column_mapping(self, src_project_id: int, src_column_id: int) -> Optional[int]:
"""
Получить ID целевой колонки по ID исходной колонки и проекта.
Args:
src_project_id: ID исходного проекта
src_column_id: ID исходной колонки
Returns:
ID целевой колонки или None если маппинг не найден
"""
project_mapping = self.column_mapping.get(src_project_id, {})
return project_mapping.get(src_column_id)
def _safe_map_id(self, source_id, mapping_dict):
"""Безопасный маппинг ID с преобразованием типов."""
if not source_id or source_id in ("0", 0):
return 0
try:
source_id_int = int(source_id)
mapped_id = mapping_dict.get(source_id_int)
if mapped_id is not None:
return mapped_id
except (ValueError, TypeError):
pass
return 0
def _get_task_tags(self, task_id):
"""Получение тегов задачи - используем оригинальные названия тегов."""
source_tags = self.source_api.call("getTaskTags", [task_id]) or {}
if not source_tags:
return []
# Просто возвращаем названия тегов из source
# Kanboard автоматически создаст теги с такими названиями или свяжет с существующими
return list(source_tags.values())
def _timestamp_to_kanboard_date(self, timestamp):
"""Конвертация timestamp в формат даты Kanboard."""
try:
dt = datetime.fromtimestamp(int(timestamp))
return dt.strftime("%Y-%m-%d %H:%M")
except (ValueError, TypeError):
return ""
def delete_all(self, **kwargs):
"""Удаление всех задач из целевых проектов."""
print("Начало удаления всех задач...")
for target_project_id in self.project_mapping.values():
print(f"Удаление задач из проекта {target_project_id}")
tasks = self.target_api.call("getAllTasks", [int(target_project_id), 1]) or []
for task in tasks:
task_id = int(task["id"])
task_title = task["title"]
try:
# Затем удаляем саму задачу
ok = self.target_api.call("removeTask", [task_id])
if ok:
print(f" Задача '{task_title}' (ID {task_id}) удалена")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении '{task_title}': {e}")
print("Удаление задач завершено.")

79
lib/users.py Normal file
View File

@@ -0,0 +1,79 @@
from lib.kanboard_api import KanboardAPI
from lib.base_migrator import BaseMigrator
import os
class UsersMigrator(BaseMigrator):
"""Миграция и удаление пользователей в Kanboard."""
def __init__(self, source_api: KanboardAPI, target_api: KanboardAPI, user_mapping_file: str = "user_mapping.json"):
self.source_api = source_api
self.target_api = target_api
self.user_mapping_file = user_mapping_file
self.user_mapping = self._load_mapping(self.user_mapping_file)
# === Миграция ===
def migrate(self):
print("Начало миграции пользователей...")
source_users = self.source_api.call("getAllUsers") or []
target_users = self.target_api.call("getAllUsers") or []
existing_usernames = {u["username"] for u in target_users}
temp_password = os.getenv("TEMP_USER_PASSWORD")
for user in source_users:
source_id = int(user["id"])
username = user["username"]
name = user.get("name") or ""
email = user.get("email") or ""
role = user.get("role") or "app-user"
if username in existing_usernames:
print(f"Пользователь '{username}' уже существует, пропускаем")
continue
password = temp_password if temp_password else username
try:
new_id = self.target_api.call("createUser", {
"username": username,
"password": password,
"name": name,
"email": email,
"role": role
})
if new_id:
self.user_mapping[source_id] = new_id
print(f"Пользователь '{username}' ({source_id} -> {new_id}) создан")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе '{username}': {e}")
self._save_mapping(self.user_mapping_file, self.user_mapping)
print(f"Миграция пользователей завершена. Маппинг сохранён в {self.user_mapping_file}")
# === Удаление ===
def delete_all(self, exclude_admin: bool = True, **kwargs):
print("Начало удаления пользователей...")
all_users = self.target_api.call("getAllUsers") or []
for user in all_users:
user_id = int(user["id"])
username = user["username"]
if exclude_admin and username == "admin":
continue
try:
ok = self.target_api.call("removeUser", {"user_id": user_id})
if ok:
print(f"Пользователь '{username}' (ID {user_id}) удалён")
else:
print(f"Пользователь '{username}' (ID {user_id}) не удалён")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении '{username}': {e}")
if hasattr(self, 'user_mapping_file'):
self._delete_mapping_file(self.user_mapping_file)
if hasattr(self, 'user_mapping'):
self.user_mapping.clear()
print("Удаление пользователей завершено.")

167
main.py Executable file
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env -S uv run --with-requirements requirements.txt
import os
import sys
import click
from dotenv import load_dotenv
from lib.kanboard_api import KanboardAPI
from lib.interactive import KanboardInteractive
from lib.users import UsersMigrator
from lib.projects import ProjectsMigrator
from lib.tags import TagsMigrator
from lib.tasks import TasksMigrator
load_dotenv()
MAPPING_FILES = {
"user": "user_mapping.json",
"project": "project_mapping.json",
"column": "column_mapping.json",
"tag": "tag_mapping.json",
}
SCOPES_ORDER = ["users", "projects", "tags", "tasks"]
def get_env_var(name: str) -> str:
value = os.environ.get(name)
if not value:
raise EnvironmentError(f"Не найдена переменная окружения: {name}")
return value
def setup_apis():
try:
source = KanboardAPI(
url=get_env_var("SOURCE_URL"),
user=get_env_var("SOURCE_USER"),
token=get_env_var("SOURCE_TOKEN")
)
target = KanboardAPI(
url=get_env_var("TARGET_URL"),
user=get_env_var("TARGET_USER"),
token=get_env_var("TARGET_TOKEN")
)
return source, target
except EnvironmentError as e:
print(e)
sys.exit(1)
def create_migrator(scope_name, source_api, target_api):
"""Создание одного мигратора для указанной области."""
if scope_name == "users":
return UsersMigrator(source_api, target_api, MAPPING_FILES["user"])
elif scope_name == "projects":
return ProjectsMigrator(source_api, target_api, MAPPING_FILES["project"], MAPPING_FILES["user"], MAPPING_FILES["column"])
elif scope_name == "tags":
return TagsMigrator(source_api, target_api, MAPPING_FILES["tag"], MAPPING_FILES["project"])
elif scope_name == "tasks":
return TasksMigrator(source_api, target_api, MAPPING_FILES["project"], MAPPING_FILES["user"], MAPPING_FILES["tag"], MAPPING_FILES["column"])
else:
raise ValueError(f"Неизвестный scope: {scope_name}")
def create_plan(scope, reverse_order=False):
"""
Создание плана выполнения на основе области.
Args:
scope: Область выполнения (None для полного выполнения)
reverse_order: Если True, используется обратный порядок для полного выполнения
Returns:
Список областей для выполнения
"""
if scope is None:
# Полное выполнение
order = list(reversed(SCOPES_ORDER)) if reverse_order else SCOPES_ORDER
return order
else:
# Выполнение только указанной области
return [scope]
#
# Основные операции
#
def run_interactive(source, target, instance):
if instance not in ["source", "target"]:
print(f"Неизвестный экземпляр: '{instance}'. Должен быть 'source' или 'target'")
sys.exit(1)
api = source if instance == "source" else target
interactive = KanboardInteractive(api)
interactive.run()
def run_migrate(source_api, target_api, scope):
"""Запуск миграции с созданием миграторов по мере выполнения."""
if scope and scope not in SCOPES_ORDER:
click.echo(f"Неизвестный scope: '{scope}'. Доступные: {', '.join(SCOPES_ORDER)}", err=True)
sys.exit(1)
migration_plan = create_plan(scope, reverse_order=False)
for scope_name in migration_plan:
# Создаем мигратор непосредственно перед выполнением
migrator = create_migrator(scope_name, source_api, target_api)
try:
click.echo(f"=== Выполнение миграции: {scope_name} ===")
migrator.migrate()
click.echo(f"=== Миграция {scope_name} завершена ===\n")
except Exception as e:
click.echo(f"Ошибка при миграции {scope_name}: {e}", err=True)
sys.exit(1)
def run_delete(source_api, target_api, scope):
"""Удаление данных из целевого экземпляра с созданием миграторов по мере выполнения."""
if scope and scope not in SCOPES_ORDER:
click.echo(f"Неизвестный scope: '{scope}'. Доступные: {', '.join(SCOPES_ORDER)}", err=True)
sys.exit(1)
deletion_plan = create_plan(scope, reverse_order=True)
for scope_name in deletion_plan:
# Создаем мигратор непосредственно перед выполнением
deleter = create_migrator(scope_name, source_api, target_api)
try:
click.echo(f"=== Выполнение удаления: {scope_name} ===")
deleter.delete_all()
click.echo(f"=== Удаление {scope_name} завершено ===\n")
except Exception as e:
click.echo(f"Ошибка при удалении {scope_name}: {e}", err=True)
sys.exit(1)
@click.group()
def cli():
"""Скрипт для миграции Kanboard. Используйте 'command --help' для деталей."""
@cli.command()
@click.argument('instance',
type=click.Choice(['source', 'target']))
def interactive(instance):
"""Интерактивный режим: укажите source или target."""
source, target = setup_apis()
run_interactive(source, target, instance)
@cli.command()
@click.option('--scope',
type=click.Choice(SCOPES_ORDER),
default=None,
help=f"Scope: {', '.join(SCOPES_ORDER)} (пример: --scope projects)")
def migrate(scope):
"""Миграция данных: укажите --scope или используйте для полного режима."""
source, target = setup_apis()
run_migrate(source, target, scope)
@cli.command()
@click.option('--scope',
type=click.Choice(SCOPES_ORDER),
default=None,
help=f"Scope: {', '.join(SCOPES_ORDER)} (пример: --scope tags)")
def delete(scope):
"""Удаление данных: укажите --scope или используйте для полного режима."""
source, target = setup_apis()
run_delete(source, target, scope)
if __name__ == '__main__':
cli()

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
dotenv
requests
click