Files
kanboard-api/lib/projects.py
2025-10-18 00:52:08 +03:00

409 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Any, Dict, Optional, List
from lib.kanboard_api import KanboardAPI
from lib.base_migrator import BaseMigrator
class ProjectsMigrator(BaseMigrator):
"""Миграция и удаление проектов в Kanboard."""
def __init__(
self,
source_api: KanboardAPI,
target_api: KanboardAPI,
project_mapping_file: str = "project_mapping.json",
user_mapping_file: str = "user_mapping.json",
column_mapping_file: str = "column_mapping.json"
):
self.source_api = source_api
self.target_api = target_api
self.project_mapping_file = project_mapping_file
self.user_mapping_file = user_mapping_file
self.column_mapping_file = column_mapping_file
self.project_mapping = self._load_mapping(self.project_mapping_file)
self.user_mapping = self._load_mapping(self.user_mapping_file)
self.column_mapping = self._load_nested_mapping(self.column_mapping_file)
# === Helpers ===
def _create_or_find_project(self, src_proj: Dict[str, Any]) -> Optional[int]:
name = src_proj.get("name") or ""
identifier = src_proj.get("identifier") or ""
description = src_proj.get("description") or ""
owner_id = src_proj.get("owner_id")
email = src_proj.get("email")
target_proj = None
found = False
if identifier:
try:
target_proj = self.target_api.call("getProjectByIdentifier", {"identifier": identifier})
except Exception:
target_proj = None
if not target_proj and name:
try:
target_proj = self.target_api.call("getProjectByName", {"name": name})
except Exception:
target_proj = None
if target_proj:
try:
found = True
return int(target_proj["id"]), found
except Exception:
return None, found
create_params: Dict[str, Any] = {"name": name}
if description is not None:
create_params["description"] = description
if identifier:
create_params["identifier"] = identifier
if email:
create_params["email"] = email
if owner_id:
try:
mapped_owner = self.user_mapping.get(int(owner_id))
if mapped_owner:
create_params["owner_id"] = mapped_owner
except Exception:
pass
try:
new_id = self.target_api.call("createProject", create_params)
if new_id:
return int(new_id), False
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при createProject '{name}': {e}")
return None, found
def _migrate_columns(self, src_project_id: int, target_project_id: int):
"""Миграция колонок - создаем такие же как в source и сохраняем маппинг."""
try:
src_columns = self.source_api.call("getColumns", [src_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки {src_project_id}: {e}")
return
try:
target_columns = self.target_api.call("getColumns", [target_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки target-проекта {target_project_id}: {e}")
target_columns = []
# Инициализируем маппинг колонок для этого проекта
if src_project_id not in self.column_mapping:
self.column_mapping[src_project_id] = {}
project_column_mapping = self.column_mapping[src_project_id]
created_columns: List[int] = []
existing_titles = {col.get("title", "").lower(): col for col in target_columns}
for src_col in sorted(src_columns, key=lambda c: int(c.get("position", 0))):
src_col_id = int(src_col.get("id"))
title = src_col.get("title", "")
task_limit = None
try:
tl = int(src_col.get("task_limit", 0))
if tl > 0:
task_limit = tl
except Exception:
pass
description = src_col.get("description")
# Проверяем, есть ли колонка с таким названием
if title.lower() in existing_titles:
# Колонка уже существует, находим ее ID и сохраняем маппинг
target_col = existing_titles[title.lower()]
target_col_id = int(target_col.get("id"))
project_column_mapping[src_col_id] = target_col_id
created_columns.append(target_col_id)
print(f"Колонка '{title}' уже существует ({src_col_id} -> {target_col_id})")
continue
# Создаем новую колонку
params = {"project_id": target_project_id, "title": title}
if task_limit is not None:
params["task_limit"] = task_limit
if description:
params["description"] = description
try:
new_col_id = self.target_api.call("addColumn", params)
if new_col_id:
new_col_id_int = int(new_col_id)
created_columns.append(new_col_id_int)
# Сохраняем маппинг новой колонки
project_column_mapping[src_col_id] = new_col_id_int
print(f"Добавлена колонка '{title}' ({src_col_id} -> {new_col_id_int})")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при addColumn '{title}': {e}")
# Обновляем позиции колонок
for position, col_id in enumerate(created_columns, start=1):
try:
ok = self.target_api.call("changeColumnPosition", [target_project_id, col_id, position])
if not ok:
print(f"Не удалось установить позицию {position} для колонки {col_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при changeColumnPosition {col_id}: {e}")
# Сохраняем маппинг колонок после обработки всех колонок проекта
self._save_nested_mapping(self.column_mapping_file, self.column_mapping)
def _migrate_columns(self, src_project_id: int, target_project_id: int):
"""Миграция колонок - создаем такие же как в source и сохраняем маппинг."""
try:
src_columns = self.source_api.call("getColumns", [src_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки {src_project_id}: {e}")
return
try:
target_columns = self.target_api.call("getColumns", [target_project_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить колонки target-проекта {target_project_id}: {e}")
target_columns = []
# Инициализируем маппинг колонок для этого проекта
if str(src_project_id) not in self.column_mapping:
self.column_mapping[str(src_project_id)] = {}
project_column_mapping = self.column_mapping[str(src_project_id)]
created_columns: List[int] = []
existing_titles = {col.get("title", "").lower(): col for col in target_columns}
for src_col in sorted(src_columns, key=lambda c: int(c.get("position", 0))):
src_col_id = src_col.get("id")
title = src_col.get("title", "")
task_limit = None
try:
tl = int(src_col.get("task_limit", 0))
if tl > 0:
task_limit = tl
except Exception:
pass
description = src_col.get("description")
# Проверяем, есть ли колонка с таким названием
if title.lower() in existing_titles:
# Колонка уже существует, находим ее ID и сохраняем маппинг
target_col = existing_titles[title.lower()]
target_col_id = target_col.get("id")
project_column_mapping[str(src_col_id)] = target_col_id
created_columns.append(int(target_col_id))
print(f"Колонка '{title}' уже существует ({src_col_id} -> {target_col_id})")
continue
# Создаем новую колонку
params = {"project_id": target_project_id, "title": title}
if task_limit is not None:
params["task_limit"] = task_limit
if description:
params["description"] = description
try:
new_col_id = self.target_api.call("addColumn", params)
if new_col_id:
new_col_id_int = int(new_col_id)
created_columns.append(new_col_id_int)
# Сохраняем маппинг новой колонки
project_column_mapping[str(src_col_id)] = new_col_id_int
print(f"Добавлена колонка '{title}' ({src_col_id} -> {new_col_id_int})")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при addColumn '{title}': {e}")
# Обновляем позиции колонок
for position, col_id in enumerate(created_columns, start=1):
try:
ok = self.target_api.call("changeColumnPosition", [target_project_id, col_id, position])
if not ok:
print(f"Не удалось установить позицию {position} для колонки {col_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при changeColumnPosition {col_id}: {e}")
# Сохраняем маппинг колонок после обработки всех колонок проекта
self._save_mapping(self.column_mapping_file, self.column_mapping)
def _migrate_project_users(self, src_project_id: int, target_project_id: int):
try:
src_members = self.source_api.call("getProjectUsers", [src_project_id]) or {}
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить пользователей проекта {src_project_id}: {e}")
src_members = {}
for src_uid_str, username in list(src_members.items()):
try:
src_uid = int(src_uid_str)
except Exception:
continue
try:
role = self.source_api.call("getProjectUserRole", [src_project_id, src_uid])
except Exception:
role = None
target_uid = self.user_mapping.get(src_uid)
if not target_uid:
print(f" Пропускаем пользователя '{username}' ({src_uid}) — нет в user_mapping")
continue
try:
ok = self.target_api.call(
"addProjectUser",
[target_project_id, target_uid, role] if role else [target_project_id, target_uid]
)
if ok:
print(f" Добавлен пользователь '{username}' -> target_id {target_uid} роль='{role}'")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при addProjectUser '{username}': {e}")
def _migrate_project_files(self, src_project_id: int, target_project_id: int):
"""Миграция файлов проекта."""
try:
files = self.source_api.call("getAllProjectFiles", {"project_id": src_project_id}) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить файлы проекта {src_project_id}: {e}")
return
for f in files:
file_id = int(f.get("id"))
filename = f.get("name")
try:
data_b64 = self.source_api.call("downloadProjectFile", [src_project_id, file_id])
if not data_b64:
print(f" Пропущен файл '{filename}' — пустой контент")
continue
new_id = self.target_api.call("createProjectFile", [target_project_id, filename, data_b64])
if new_id:
print(f" Файл '{filename}' перенесён ({file_id} -> {new_id})")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}")
def _apply_project_settings(self, src_project_id: int, target_project_id: int):
try:
src_info = self.source_api.call("getProjectById", {"project_id": src_project_id}) or {}
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить info проекта {src_project_id}: {e}")
return
try:
if str(src_info.get("is_active", "1")) == "1":
self.target_api.call("enableProject", [target_project_id])
else:
self.target_api.call("disableProject", [target_project_id])
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при установке is_active {target_project_id}: {e}")
try:
if str(src_info.get("is_public", "0")) == "1":
self.target_api.call("enableProjectPublicAccess", [target_project_id])
else:
self.target_api.call("disableProjectPublicAccess", [target_project_id])
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при установке public access {target_project_id}: {e}")
updatable = {}
for fld in (
"start_date", "end_date", "priority_default",
"priority_start", "priority_end", "email",
"identifier", "description", "name"
):
if fld in src_info and src_info.get(fld) is not None:
updatable[fld] = src_info.get(fld)
if updatable:
updatable["project_id"] = target_project_id
try:
ok = self.target_api.call("updateProject", updatable)
if not ok:
print(f" Не удалось применить дополнительные настройки для проекта {target_project_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при updateProject {target_project_id}: {e}")
def _delete_project_files(self, project_id: int):
"""Удаление всех файлов проекта перед удалением."""
try:
files = self.target_api.call("getAllProjectFiles", {"project_id": project_id}) or []
if files:
self.target_api.call("removeAllProjectFiles", {"project_id": project_id})
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении файлов проекта {project_id}: {e}")
# === Public API ===
def migrate(self):
print("Начало миграции проектов...")
try:
source_projects = self.source_api.call("getAllProjects") or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить список проектов: {e}")
source_projects = []
for proj in source_projects:
try:
src_id = int(proj.get("id"))
except Exception:
continue
target_id, found = self._create_or_find_project(proj)
if not target_id:
print(f"Не удалось создать или найти проект '{proj.get('name')}' ({src_id})")
continue
self.project_mapping[src_id] = target_id
if found:
print(f"Проект '{proj.get('name')}' уже существует ({src_id} -> {target_id})")
else:
print(f"Проект '{proj.get('name')}' создан ({src_id} -> {target_id})")
self._migrate_columns(src_id, target_id)
self._migrate_project_users(src_id, target_id)
self._migrate_project_files(src_id, target_id)
self._apply_project_settings(src_id, target_id)
self._save_mapping(self.project_mapping_file, self.project_mapping)
self._save_mapping(self.column_mapping_file, self.column_mapping)
print(f"Миграция проектов завершена.")
print(f"Маппинг проектов сохранён в {self.project_mapping_file}")
print(f"Маппинг колонок сохранён в {self.column_mapping_file}")
def delete_all(self, exclude_project_ids: Optional[List[int]] = None, **kwargs):
"""
Удаление всех проектов.
Args:
exclude_project_ids: Список ID проектов, которые нужно исключить из удаления
"""
exclude_project_ids = set(exclude_project_ids or [])
print("Начало удаления проектов...")
try:
all_projects = self.target_api.call("getAllProjects") or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить проекты target: {e}")
return
for p in all_projects:
try:
pid = int(p.get("id"))
except Exception:
continue
if pid in exclude_project_ids:
continue
try:
self._delete_project_files(pid)
ok = self.target_api.call("removeProject", {"project_id": pid})
if ok:
print(f"Проект '{p.get('name')}' (ID {pid}) удалён")
else:
print(f"Проект '{p.get('name')}' (ID {pid}) не удалён")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении проекта {pid}: {e}")
if hasattr(self, 'project_mapping_file'):
self._delete_mapping_file(self.project_mapping_file)
if hasattr(self, 'column_mapping_file'):
self._delete_mapping_file(self.column_mapping_file)
if hasattr(self, 'project_mapping_file'):
self._delete_mapping_file(self.project_mapping_file)
if hasattr(self, 'project_mapping'):
self.project_mapping.clear()
print("Удаление проектов завершено.")