from typing import Optional from datetime import datetime from lib.kanboard_api import KanboardAPI from lib.base_migrator import BaseMigrator class TasksMigrator(BaseMigrator): """Миграция задач в Kanboard.""" def __init__( self, source_api: KanboardAPI, target_api: KanboardAPI, project_mapping_file: str = "project_mapping.json", user_mapping_file: str = "user_mapping.json", tag_mapping_file: str = "tag_mapping.json", column_mapping_file: str = "column_mapping.json" ): self.source_api = source_api self.target_api = target_api self.project_mapping_file = project_mapping_file self.user_mapping_file = user_mapping_file self.tag_mapping_file = tag_mapping_file self.column_mapping_file = column_mapping_file self.project_mapping = self._load_mapping(self.project_mapping_file) self.user_mapping = self._load_mapping(self.user_mapping_file) self.tag_mapping = self._load_mapping(self.tag_mapping_file) self.column_mapping = self._load_nested_mapping(self.column_mapping_file) def migrate(self): """Миграция всех задач из исходных проектов в целевые.""" print("Начало миграции задач...") for source_project_id, target_project_id in self.project_mapping.items(): print(f"Миграция задач из проекта {source_project_id} -> {target_project_id}") # Получаем задачи с учетом статуса (1 = активные, 0 = закрытые) source_tasks = self.source_api.call("getAllTasks", [int(source_project_id), 1]) or [] for task in source_tasks: self._migrate_task(task, int(source_project_id), int(target_project_id)) print("Миграция задач завершена.") def _migrate_task(self, task, source_project_id: int, target_project_id: int): """Миграция одной задачи.""" task_id = int(task["id"]) task_title = task["title"] # Получаем маппинг колонки source_column_id = task.get("column_id") target_column_id = None if source_column_id: target_column_id = self.get_column_mapping(source_project_id, source_column_id) if not target_column_id: print(f" Предупреждение: не найден маппинг для колонки {source_column_id} проекта {source_project_id}") params = { "project_id": target_project_id, "title": task_title, "description": task.get("description", ""), "color_id": task.get("color_id", "yellow"), "owner_id": self._safe_map_id(task.get("owner_id"), self.user_mapping), "creator_id": self._safe_map_id(task.get("creator_id"), self.user_mapping), "score": int(task.get("score", 0)), "priority": int(task.get("priority", 0)), "reference": task.get("reference", ""), "time_estimated": int(task.get("time_estimated", 0)), "time_spent": int(task.get("time_spent", 0)), } # Добавляем column_id если нашли маппинг if target_column_id: params["column_id"] = target_column_id # Обрабатываем даты if task.get("date_due") and task["date_due"] != "0": params["date_due"] = self._timestamp_to_kanboard_date(task["date_due"]) if task.get("date_started") and task["date_started"] != "0": params["date_started"] = self._timestamp_to_kanboard_date(task["date_started"]) # Обрабатываем теги задачи task_tags = self._get_task_tags(task_id) if task_tags: params["tags"] = task_tags try: new_task_id = self.target_api.call("createTask", params) if new_task_id: column_info = f" -> колонка {target_column_id}" if target_column_id else "" print(f" Перенесена задача '{task_title}' ({task_id} -> {new_task_id}{column_info})") # Мигрируем файлы задачи после создания задачи self._migrate_task_files(task_id, new_task_id, target_project_id) except Exception as e: print(f"[{type(e).__name__}] Ошибка при переносе задачи '{task_title}': {e}") def _migrate_task_files(self, source_task_id: int, target_task_id: int, target_project_id: int): """Миграция файлов задачи.""" try: files = self.source_api.call("getAllTaskFiles", {"task_id": source_task_id}) or [] except Exception as e: print(f"[{type(e).__name__}] Не удалось получить файлы задачи {source_task_id}: {e}") return if not files: return print(f" Миграция файлов задачи ({len(files)} файлов)...") for file_info in files: file_id = int(file_info.get("id")) filename = file_info.get("name") try: # Скачиваем файл из исходной системы data_b64 = self.source_api.call("downloadTaskFile", [file_id]) if not data_b64: print(f" Пропущен файл '{filename}' — пустой контент") continue # Загружаем файл в целевую систему new_file_id = self.target_api.call( "createTaskFile", [target_project_id, target_task_id, filename, data_b64] ) if new_file_id: print(f" Файл '{filename}' перенесён ({file_id} -> {new_file_id})") else: print(f" Не удалось перенести файл '{filename}'") except Exception as e: print(f"[{type(e).__name__}] Ошибка при переносе файла '{filename}': {e}") def get_column_mapping(self, src_project_id: int, src_column_id: int) -> Optional[int]: """ Получить ID целевой колонки по ID исходной колонки и проекта. Args: src_project_id: ID исходного проекта src_column_id: ID исходной колонки Returns: ID целевой колонки или None если маппинг не найден """ project_mapping = self.column_mapping.get(src_project_id, {}) return project_mapping.get(src_column_id) def _safe_map_id(self, source_id, mapping_dict): """Безопасный маппинг ID с преобразованием типов.""" if not source_id or source_id in ("0", 0): return 0 try: source_id_int = int(source_id) mapped_id = mapping_dict.get(source_id_int) if mapped_id is not None: return mapped_id except (ValueError, TypeError): pass return 0 def _get_task_tags(self, task_id): """Получение тегов задачи - используем оригинальные названия тегов.""" source_tags = self.source_api.call("getTaskTags", [task_id]) or {} if not source_tags: return [] # Просто возвращаем названия тегов из source # Kanboard автоматически создаст теги с такими названиями или свяжет с существующими return list(source_tags.values()) def _timestamp_to_kanboard_date(self, timestamp): """Конвертация timestamp в формат даты Kanboard.""" try: dt = datetime.fromtimestamp(int(timestamp)) return dt.strftime("%Y-%m-%d %H:%M") except (ValueError, TypeError): return "" def delete_all(self, **kwargs): """Удаление всех задач из целевых проектов.""" print("Начало удаления всех задач...") for target_project_id in self.project_mapping.values(): print(f"Удаление задач из проекта {target_project_id}") tasks = self.target_api.call("getAllTasks", [int(target_project_id), 1]) or [] for task in tasks: task_id = int(task["id"]) task_title = task["title"] try: # Сначала удаляем все файлы задачи self._delete_task_files(task_id) # Затем удаляем саму задачу ok = self.target_api.call("removeTask", [task_id]) if ok: print(f" Задача '{task_title}' (ID {task_id}) удалена") except Exception as e: print(f"[{type(e).__name__}] Ошибка при удалении '{task_title}': {e}") print("Удаление задач завершено.") def _delete_task_files(self, task_id: int): """Удаление всех файлов задачи.""" try: files = self.target_api.call("getAllTaskFiles", {"task_id": task_id}) or [] if files: # Используем removeAllTaskFiles для удаления всех файлов сразу ok = self.target_api.call("removeAllTaskFiles", {"task_id": task_id}) if ok: print(f" Удалены все файлы задачи {task_id} ({len(files)} файлов)") else: print(f" Не удалось удалить файлы задачи {task_id}") except Exception as e: print(f"[{type(e).__name__}] Ошибка при удалении файлов задачи {task_id}: {e}")