Files
kanboard-api/lib/tasks.py
2025-10-18 00:52:08 +03:00

275 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Optional
from datetime import datetime
from lib.kanboard_api import KanboardAPI
from lib.base_migrator import BaseMigrator
class TasksMigrator(BaseMigrator):
"""Миграция задач в Kanboard."""
def __init__(
self,
source_api: KanboardAPI,
target_api: KanboardAPI,
project_mapping_file: str = "project_mapping.json",
user_mapping_file: str = "user_mapping.json",
tag_mapping_file: str = "tag_mapping.json",
column_mapping_file: str = "column_mapping.json"
):
self.source_api = source_api
self.target_api = target_api
self.project_mapping_file = project_mapping_file
self.user_mapping_file = user_mapping_file
self.tag_mapping_file = tag_mapping_file
self.column_mapping_file = column_mapping_file
self.project_mapping = self._load_mapping(self.project_mapping_file)
self.user_mapping = self._load_mapping(self.user_mapping_file)
self.tag_mapping = self._load_mapping(self.tag_mapping_file)
self.column_mapping = self._load_nested_mapping(self.column_mapping_file)
def migrate(self):
"""Миграция всех задач из исходных проектов в целевые."""
print("Начало миграции задач...")
for source_project_id, target_project_id in self.project_mapping.items():
print(f"Миграция задач из проекта {source_project_id} -> {target_project_id}")
# Получаем задачи с учетом статуса (1 = активные, 0 = закрытые)
source_tasks = self.source_api.call("getAllTasks", [int(source_project_id), 1]) or []
for task in source_tasks:
self._migrate_task(task, int(source_project_id), int(target_project_id))
print("Миграция задач завершена.")
def _migrate_task(self, task, source_project_id: int, target_project_id: int):
"""Миграция одной задачи."""
task_id = int(task["id"])
task_title = task["title"]
# Получаем маппинг колонки
source_column_id = task.get("column_id")
target_column_id = None
if source_column_id:
target_column_id = self.get_column_mapping(source_project_id, source_column_id)
if not target_column_id:
print(f" Предупреждение: не найден маппинг для колонки {source_column_id} проекта {source_project_id}")
params = {
"project_id": target_project_id,
"title": task_title,
"description": task.get("description", ""),
"color_id": task.get("color_id", "yellow"),
"owner_id": self._safe_map_id(task.get("owner_id"), self.user_mapping),
"creator_id": self._safe_map_id(task.get("creator_id"), self.user_mapping),
"score": int(task.get("score", 0)),
"priority": int(task.get("priority", 0)),
"reference": task.get("reference", ""),
"time_estimated": int(task.get("time_estimated", 0)),
"time_spent": int(task.get("time_spent", 0)),
}
# Добавляем column_id если нашли маппинг
if target_column_id:
params["column_id"] = target_column_id
# Обрабатываем даты
if task.get("date_due") and task["date_due"] != "0":
params["date_due"] = self._timestamp_to_kanboard_date(task["date_due"])
if task.get("date_started") and task["date_started"] != "0":
params["date_started"] = self._timestamp_to_kanboard_date(task["date_started"])
# Обрабатываем теги задачи
task_tags = self._get_task_tags(task_id)
if task_tags:
params["tags"] = task_tags
try:
new_task_id = self.target_api.call("createTask", params)
if new_task_id:
column_info = f" -> колонка {target_column_id}" if target_column_id else ""
print(f" Перенесена задача '{task_title}' ({task_id} -> {new_task_id}{column_info})")
# Мигрируем дополнительные данные задачи
self._migrate_task_metadata(task_id, new_task_id)
self._migrate_subtasks(task_id, new_task_id)
self._migrate_task_files(task_id, new_task_id, target_project_id)
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе задачи '{task_title}': {e}")
def _migrate_task_metadata(self, source_task_id: int, target_task_id: int):
"""Миграция метаданных задачи."""
try:
metadata = self.source_api.call("getTaskMetadata", [source_task_id]) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить метаданные задачи {source_task_id}: {e}")
return
if not metadata:
return
# Метаданные возвращаются как список словарей, берем первый
if isinstance(metadata, list) and len(metadata) > 0:
metadata_dict = metadata[0]
else:
metadata_dict = metadata
if not isinstance(metadata_dict, dict) or not metadata_dict:
return
try:
ok = self.target_api.call("saveTaskMetadata", [target_task_id, metadata_dict])
if ok:
print(f" Метаданные задачи перенесены ({len(metadata_dict)} ключей)")
else:
print(f" Не удалось сохранить метаданные задачи {target_task_id}")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при сохранении метаданных задачи {target_task_id}: {e}")
def _migrate_subtasks(self, source_task_id: int, target_task_id: int):
"""Миграция подзадач."""
try:
subtasks = self.source_api.call("getAllSubtasks", {"task_id": source_task_id}) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить подзадачи {source_task_id}: {e}")
return
if not subtasks:
return
print(f" Миграция подзадач ({len(subtasks)} подзадач)...")
for subtask in subtasks:
params = {
"task_id": target_task_id,
"title": subtask.get("title", ""),
"user_id": self._safe_map_id(subtask.get("user_id"), self.user_mapping),
"time_estimated": int(subtask.get("time_estimated", 0)),
"time_spent": int(subtask.get("time_spent", 0)),
"status": int(subtask.get("status", 0)),
}
try:
new_subtask_id = self.target_api.call("createSubtask", params)
if new_subtask_id:
print(f" Подзадача '{subtask.get('title')}' перенесена ({subtask.get('id')} -> {new_subtask_id})")
else:
print(f" Не удалось создать подзадачу '{subtask.get('title')}'")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе подзадачи: {e}")
def _migrate_task_files(self, source_task_id: int, target_task_id: int, target_project_id: int):
"""Миграция файлов задачи."""
try:
files = self.source_api.call("getAllTaskFiles", {"task_id": source_task_id}) or []
except Exception as e:
print(f"[{type(e).__name__}] Не удалось получить файлы задачи {source_task_id}: {e}")
return
if not files:
return
print(f" Миграция файлов задачи ({len(files)} файлов)...")
for file_info in files:
file_id = int(file_info.get("id"))
filename = file_info.get("name")
try:
# Скачиваем файл из исходной системы
data_b64 = self.source_api.call("downloadTaskFile", [file_id])
if not data_b64:
print(f" Пропущен файл '{filename}' — пустой контент")
continue
# Загружаем файл в целевую систему
new_file_id = self.target_api.call(
"createTaskFile",
[target_project_id, target_task_id, filename, data_b64]
)
if new_file_id:
print(f" Файл '{filename}' перенесён ({file_id} -> {new_file_id})")
else:
print(f" Не удалось перенести файл '{filename}'")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}")
def get_column_mapping(self, src_project_id: int, src_column_id: int) -> Optional[int]:
"""
Получить ID целевой колонки по ID исходной колонки и проекта.
Args:
src_project_id: ID исходного проекта
src_column_id: ID исходной колонки
Returns:
ID целевой колонки или None если маппинг не найден
"""
project_mapping = self.column_mapping.get(src_project_id, {})
return project_mapping.get(src_column_id)
def _safe_map_id(self, source_id, mapping_dict):
"""Безопасный маппинг ID с преобразованием типов."""
if not source_id or source_id in ("0", 0):
return 0
try:
source_id_int = int(source_id)
mapped_id = mapping_dict.get(source_id_int)
if mapped_id is not None:
return mapped_id
except (ValueError, TypeError):
pass
return 0
def _get_task_tags(self, task_id):
"""Получение тегов задачи - используем оригинальные названия тегов."""
source_tags = self.source_api.call("getTaskTags", [task_id]) or {}
if not source_tags:
return []
# Просто возвращаем названия тегов из source
# Kanboard автоматически создаст теги с такими названиями или свяжет с существующими
return list(source_tags.values())
def _timestamp_to_kanboard_date(self, timestamp):
"""Конвертация timestamp в формат даты Kanboard."""
try:
dt = datetime.fromtimestamp(int(timestamp))
return dt.strftime("%Y-%m-%d %H:%M")
except (ValueError, TypeError):
return ""
def delete_all(self, **kwargs):
"""Удаление всех задач из целевых проектов."""
print("Начало удаления всех задач...")
for target_project_id in self.project_mapping.values():
print(f"Удаление задач из проекта {target_project_id}")
tasks = self.target_api.call("getAllTasks", [int(target_project_id), 1]) or []
for task in tasks:
task_id = int(task["id"])
task_title = task["title"]
try:
# Затем удаляем саму задачу
ok = self.target_api.call("removeTask", [task_id])
if ok:
print(f" Задача '{task_title}' (ID {task_id}) удалена")
except Exception as e:
print(f"[{type(e).__name__}] Ошибка при удалении '{task_title}': {e}")
print("Удаление задач завершено.")