#!/usr/bin/env python3
"""
RAG Learning System - Simplified Educational Assistant
Tracks learning progress across subjects and provides AI tutoring guidance.
"""

import os
import sys
import json
import hashlib
import asyncio
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Set, Optional
from dataclasses import dataclass

from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from prompt_toolkit import PromptSession
from prompt_toolkit.styles import Style

from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# =========================
# CONFIGURATION
# =========================

console = Console(color_system="standard", force_terminal=True)
session = PromptSession()
load_dotenv()
style = Style.from_dict({"prompt": "bold #6a0dad"})

# Core Configuration
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
ANSWER_COLOR = os.getenv("ANSWER_COLOR", "blue")

# Subject-specific keywords for filtering
SUBJECT_KEYWORDS = {
    "english": ["english", "английский", "vocabulary", "grammar", "перевод", "словарь", "грамматика"],
    "math": ["math", "математика", "алгебра", "геометрия", "calculus", "дискретная", "logic", "логика"],
    "cs": ["computer science", "алгоритмы", "data structures", "oop", "python", "programming", "код"],
    "electronics": ["electronics", "электротехника", "circuit", "микроконтроллер", "arduino", "цифровая"],
    "linux": ["linux", "kali", "bash", "terminal", "command line", "скрипт", "администрирование"],
    "networking": ["network", "сеть", "tcp", "ip", "osi", "маршрутизация", "vlan", "протокол"],
    "cybersecurity": ["cybersecurity", "безопасность", "owasp", "уязвимость", "pentest", "hack", "хак"],
    "sql": ["sql"]
}

# System Prompt for Educational Assistant
SYSTEM_PROMPT = """Ты — наставник-преподаватель по кибербезопасности.
Твоя цель — довести ученика с уровня "пользователь ПК" до уровня junior в кибербезопасности.

КУРСОВАЯ СТРУКТУРА
Модули (7 независимых курсов):
1. Computer Science (фундамент)
2. Математика
3. Основы электротехники
4. Linux + Kali Linux
5. Основы сетей
6. Введение в кибербезопасность
7. Английский язык

СТРУКТУРА КАЖДОГО МОДУЛЯ
• Цель урока
• Темы в хронологическом порядке (от простого к сложному)
• Практические задания
• Прогресс-бар (по нормам Минобрнауки РФ)
• Блок вопросов для самопроверки
• Названия тем для поиска в YouTube/статьях

ОТСЛЕЖИВАНИЕ ПРОГРЕССА
Методология:
• Каждый предмет = числовая прямая от 0 до ∞
• Темы = точки на прямой (например: "цифры" = 0.01, "дроби" = 0.04)
• Без усвоения базы — не переходить дальше
• Адаптация вектора обучения по прогрессу

Критерии Junior-уровня:
• CS: Алгоритмы, структуры данных, ООП
• Математика: Дискретная математика, логика, теория чисел
• Электротехника: Цифровая логика, микроконтроллеры
• Linux: CLI, bash-скрипты, системное администрирование
• Сети: OSI, TCP/IP, маршрутизация, VLAN
• Кибербезопасность: OWASP Top 10, базовые уязвимости, инструменты
• Английский: Технический английский, терминология

РАБОЧИЙ ПРОЦЕСС
Ответ пользователю:
1. Определи стартовую точку по заметкам Obsidian
2. Построй фундамент текущего урока
3. Сверяйся с заметками ученика
4. Комбинируй стиль живого наставника и учебника

Формат ответа:
"В [ПРЕДМЕТ] будем проходить [ТЕМА_1] и [ТЕМА_2]. [Дополнительные инструкции по структуре изучения]"

ПРАВИЛА ПРОГРЕССИИ
• Проверяй усвоение предыдущих тем
• Не суди по одному слову вне контекста
• Учитывай межпредметные связи
• Корректируй траекторию обучения динамически

ПОИСКОВЫЕ ЗАПРОСЫ
Формируй темы для поиска в формате: "[ПРЕДМЕТ] [УРОВЕНЬ] [ТЕМА] [ЯЗЫК]"
Пример: "Computer Science beginner algorithms Russian"
"""

USER_PROMPT_TEMPLATE = """Текущий прогресс обучения:
{progress}

Контекст из заметок:
{context}

Вопрос ученика: {question}"""

# Paths and Models
MD_DIRECTORY = os.getenv("MD_FOLDER", "./notes")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "mxbai-embed-large:latest")
LLM_MODEL = os.getenv("LLM_MODEL", "qwen2.5:7b-instruct-q8_0")
CHROMA_PATH = "./.cache/chroma_db"
KNOWLEDGE_STATE_PATH = "./.cache/knowledge_state.json"
FILE_HASHES_PATH = "./.cache/file_hashes.json"

# Processing Configuration
CHUNK_SIZE = 400
CHUNK_OVERLAP = 50
TOP_K = 6
COLLECTION_NAME = "learning_rag"
MAX_CONTEXT_CHARS = 8000


# =========================
# DATA STRUCTURES
# =========================

@dataclass
class SubjectProgress:
    """Track progress for a specific subject."""
    name: str
    topics_covered: Set[str]
    last_studied: Optional[str]
    confidence_level: float = 0.0  # 0.0 to 1.0

    def to_dict(self):
        return {
            "name": self.name,
            "topics_covered": list(self.topics_covered),
            "last_studied": self.last_studied,
            "confidence_level": self.confidence_level
        }

    @classmethod
    def from_dict(cls, data):
        return cls(
            name=data["name"],
            topics_covered=set(data.get("topics_covered", [])),
            last_studied=data.get("last_studied"),
            confidence_level=data.get("confidence_level", 0.0)
        )


@dataclass
class KnowledgeState:
    """Complete learning state across all subjects."""
    subjects: Dict[str, SubjectProgress]
    last_analysis: str
    file_hashes: Dict[str, str]

    def to_dict(self):
        return {
            "subjects": {name: subject.to_dict() for name, subject in self.subjects.items()},
            "last_analysis": self.last_analysis,
            "file_hashes": self.file_hashes
        }

    @classmethod
    def from_dict(cls, data):
        subjects = {}
        for name, subject_data in data.get("subjects", {}).items():
            subjects[name] = SubjectProgress.from_dict(subject_data)
        return cls(
            subjects=subjects,
            last_analysis=data.get("last_analysis", ""),
            file_hashes=data.get("file_hashes", {})
        )


# =========================
# UTILITY FUNCTIONS
# =========================

def get_file_hash(file_path: str) -> str:
    """Generate an MD5 hash of a file for change detection."""
    return hashlib.md5(Path(file_path).read_bytes()).hexdigest()


def load_json_cache(file_path: str) -> dict:
    """Load a JSON cache file, resetting it if it is missing or corrupted."""
    Path(file_path).parent.mkdir(parents=True, exist_ok=True)
    if Path(file_path).exists():
        try:
            return json.loads(Path(file_path).read_text())
        except json.JSONDecodeError:
            console.print(f"[yellow]⚠️ Corrupted cache: {file_path}. Resetting.[/yellow]")
Resetting.[/yellow]") return {} return {} def save_json_cache(data, file_path: str): """Save JSON cache with error handling""" try: Path(file_path).write_text(json.dumps(data, indent=2, ensure_ascii=False)) except Exception as e: console.print(f"[red]✗ Failed to save cache {file_path}: {e}[/red]") # ========================= # SUBJECT DETECTION # ========================= def detect_subject_from_query(query: str) -> Optional[str]: """Detect which subject the user wants to study""" query_lower = query.lower() # Check for explicit subject mentions for subject, keywords in SUBJECT_KEYWORDS.items(): for keyword in keywords: if keyword.lower() in query_lower: return subject return None def detect_subject_from_content(text: str) -> Optional[str]: """Detect subject from note content""" text_lower = text.lower() subject_scores = {subject: 0 for subject in SUBJECT_KEYWORDS.keys()} for subject, keywords in SUBJECT_KEYWORDS.items(): for keyword in keywords: if keyword.lower() in text_lower: subject_scores[subject] += 1 # Return subject with highest score, if any matches best_subject = max(subject_scores.items(), key=lambda x: x[1]) return best_subject[0] if best_subject[1] > 0 else None # ========================= # KNOWLEDGE ANALYSIS # ========================= class KnowledgeAnalyzer: """Analyze learning progress from notes""" def __init__(self, vectorstore): self.vectorstore = vectorstore async def analyze_all_notes(self, file_hashes: Dict[str, str]) -> KnowledgeState: """Analyze all notes to build complete knowledge state""" console.print("[cyan]🔍 Analyzing all notes for learning progress...[/cyan]") # Initialize subjects subjects = { name: SubjectProgress(name=name, topics_covered=set(), last_studied=None) for name in SUBJECT_KEYWORDS.keys() } # Get all documents from vectorstore try: db_data = await asyncio.to_thread(self.vectorstore.get) if not db_data or not db_data['documents']: console.print("[yellow]⚠️ No documents found in vectorstore[/yellow]") return KnowledgeState(subjects, datetime.now().isoformat(), file_hashes) # Process each document for text, metadata in zip(db_data['documents'], db_data['metadatas']): if not metadata or 'source' not in metadata: continue # Detect subject subject = detect_subject_from_content(text) if subject: subjects[subject].topics_covered.add(text[:100]) # Use first 100 chars as topic identifier # Update last studied timestamp file_path = metadata['source'] if file_path in file_hashes: subjects[subject].last_studied = file_hashes[file_path] # Calculate confidence levels based on topic coverage for subject in subjects.values(): subject.confidence_level = min(len(subject.topics_covered) / 10.0, 1.0) console.print(f"[green]✓ Analysis complete. 
        except Exception as e:
            console.print(f"[red]✗ Error during analysis: {e}[/red]")

        return KnowledgeState(subjects, datetime.now().isoformat(), file_hashes)

    def get_progress_summary(self, knowledge_state: KnowledgeState, subject: Optional[str] = None) -> str:
        """Generate a human-readable progress summary."""
        if subject and subject in knowledge_state.subjects:
            subj = knowledge_state.subjects[subject]
            return f"Предмет: {subj.name}\n" \
                   f"Тем изучено: {len(subj.topics_covered)}\n" \
                   f"Уровень уверенности: {subj.confidence_level:.1%}"

        # Return a summary across all subjects
        summary = "Текущий прогресс обучения:\n"
        for subj in knowledge_state.subjects.values():
            if subj.topics_covered:
                summary += f"- {subj.name}: {len(subj.topics_covered)} тем, уверенность {subj.confidence_level:.1%}\n"
        return summary


# =========================
# DOCUMENT PROCESSING
# =========================

class DocumentProcessor:
    """Process markdown documents for the learning system."""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
            separators=["\n\n", "\n", ". ", " "]
        )

    async def process_file(self, file_path: str) -> List[Document]:
        """Load and split a single markdown file into chunks."""
        try:
            loader = UnstructuredMarkdownLoader(file_path)
            documents = loader.load()
            if not documents:
                return []

            # Add source metadata
            for doc in documents:
                doc.metadata["source"] = file_path

            # Split into chunks
            chunks = self.text_splitter.split_documents(documents)
            return chunks
        except Exception as e:
            console.print(f"[red]✗ Error processing {Path(file_path).name}: {e}[/red]")
            return []

    async def index_files(self, file_paths: List[str]) -> bool:
        """Index multiple files into the vectorstore in batches."""
        all_chunks = []
        for file_path in file_paths:
            chunks = await self.process_file(file_path)
            all_chunks.extend(chunks)

        if not all_chunks:
            return False

        batch_size = 20
        total_batches = (len(all_chunks) + batch_size - 1) // batch_size
        try:
            await asyncio.to_thread(self.vectorstore.reset_collection)
            for i in range(0, len(all_chunks), batch_size):
                batch = all_chunks[i:i + batch_size]
                await asyncio.to_thread(self.vectorstore.add_documents, batch)
                console.print(f" [dim]Пакет {i//batch_size + 1}/{total_batches} проиндексирован[/dim]")
            return True
        except Exception as e:
            console.print(f"[red]✗ Error indexing documents: {e}[/red]")
            return False


# =========================
# LEARNING ASSISTANT
# =========================

class LearningAssistant:
    """Main learning assistant class."""

    def __init__(self):
        self.embeddings = OllamaEmbeddings(
            model=EMBEDDING_MODEL,
            base_url=OLLAMA_BASE_URL
        )
        self.vectorstore = Chroma(
            collection_name=COLLECTION_NAME,
            persist_directory=CHROMA_PATH,
            embedding_function=self.embeddings
        )
        self.llm = ChatOllama(
            model=LLM_MODEL,
            temperature=0.2,
            base_url=OLLAMA_BASE_URL
        )
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", SYSTEM_PROMPT),
            ("human", USER_PROMPT_TEMPLATE)
        ])
        self.chain = self.prompt | self.llm | StrOutputParser()
        self.processor = DocumentProcessor(self.vectorstore)
        self.analyzer = KnowledgeAnalyzer(self.vectorstore)

    async def initialize(self):
        """Initialize the learning system."""
        console.print(Panel.fit(
            "[bold cyan]🎓 RAG Learning System - Educational Assistant[/bold cyan]\n"
            "📂 Notes Directory: {}\n"
            "🧠 Model: {}\n"
            "[dim]Analyzing your learning progress...[/dim]".format(
                MD_DIRECTORY, LLM_MODEL
            ),
            border_style="cyan"
        ))

        # Load or create knowledge state
        knowledge_state = await self.load_or_analyze_knowledge()

        console.print("[green]✓ System initialized successfully![/green]")
        console.print("[dim]💡 Tip: /help[/dim]\n")
        return knowledge_state

    async def load_or_analyze_knowledge(self) -> KnowledgeState:
        """Load the existing knowledge state or analyze all notes from scratch."""
        # Load file hashes
        file_hashes = self.get_file_hashes()

        # Load knowledge state
        state_data = load_json_cache(KNOWLEDGE_STATE_PATH)
        if state_data:
            knowledge_state = KnowledgeState.from_dict(state_data)
            # Check if files have changed
            if self.have_files_changed(file_hashes, knowledge_state.file_hashes):
                console.print("[yellow]📁 Files changed, re-analyzing knowledge...[/yellow]")
                knowledge_state = await self.analyzer.analyze_all_notes(file_hashes)
                save_json_cache(knowledge_state.to_dict(), KNOWLEDGE_STATE_PATH)
            else:
                console.print("[green]✓ Knowledge state up to date[/green]")
        else:
            console.print("[yellow]📊 First time setup - analyzing all notes...[/yellow]")
            knowledge_state = await self.analyzer.analyze_all_notes(file_hashes)
            save_json_cache(knowledge_state.to_dict(), KNOWLEDGE_STATE_PATH)

        return knowledge_state

    def get_file_hashes(self) -> Dict[str, str]:
        """Collect hashes for all markdown files in the notes directory."""
        file_hashes = {}
        for root, _, files in os.walk(MD_DIRECTORY):
            for file in files:
                if file.endswith(".md"):
                    file_path = os.path.join(root, file)
                    try:
                        file_hashes[file_path] = get_file_hash(file_path)
                    except Exception as e:
                        console.print(f"[red]✗ Error reading {file}: {e}[/red]")
        return file_hashes

    def have_files_changed(self, current_hashes: Dict[str, str], cached_hashes: Dict[str, str]) -> bool:
        """Check whether any markdown files were added, removed, or modified."""
        if len(current_hashes) != len(cached_hashes):
            return True
        for file_path, current_hash in current_hashes.items():
            if file_path not in cached_hashes or cached_hashes[file_path] != current_hash:
                return True
        return False

    async def get_relevant_context(self, subject: str, knowledge_state: KnowledgeState) -> str:
        """Get note context relevant to the specified subject."""
        try:
            # Get all documents and filter by subject
            db_data = await asyncio.to_thread(self.vectorstore.get)
            if not db_data or not db_data['documents']:
                return "Нет доступных заметок для данного предмета."

            relevant_docs = []
            for text, metadata in zip(db_data['documents'], db_data['metadatas']):
                detected_subject = detect_subject_from_content(text)
                if detected_subject == subject:
                    relevant_docs.append({
                        "text": text,
                        "source": Path(metadata.get('source', 'unknown')).name
                    })

            if not relevant_docs:
                return f"Заметок по предмету '{subject}' не найдено."

            # Build context string
            context = f"Найдено {len(relevant_docs)} заметок по предмету:\n"
            char_count = len(context)
            for doc in relevant_docs[:TOP_K]:  # Limit to top K documents
                doc_text = f"\n---\nИсточник: {doc['source']}\n{doc['text']}\n"
                if char_count + len(doc_text) > MAX_CONTEXT_CHARS:
                    context += "\n[... Контекст обрезан из-за лимита ...]"
                    break
                context += doc_text
                char_count += len(doc_text)

            return context
        except Exception as e:
            console.print(f"[red]✗ Error getting context: {e}[/red]")
            return "Ошибка при получении контекста."

    async def process_learning_query(self, query: str, knowledge_state: KnowledgeState) -> str:
        """Process a learning query from the user."""
        # Detect subject from the query
        subject = detect_subject_from_query(query)

        if not subject:
            # Ask for clarification when no subject can be inferred from the query
            return "Пожалуйста, уточните предмет для изучения (например: 'изучаем английский', 'учим математику')."
        # Get relevant context
        context = await self.get_relevant_context(subject, knowledge_state)

        # Get progress summary
        progress = self.analyzer.get_progress_summary(knowledge_state, subject)

        # Generate response
        console.print(f"[blue]🔍 Анализирую прогресс по предмету: {subject}[/blue]")
        console.print(f"[dim]Контекст: {len(context)} символов[/dim]\n")

        response = ""
        console.print("[bold blue]Ассистент:[/bold blue] ", end="")
        async for chunk in self.chain.astream({
            "context": context,
            "question": query,
            "progress": progress
        }):
            console.print(chunk, end="", style=ANSWER_COLOR)
            response += chunk
        console.print("\n")
        return response


# =========================
# MAIN APPLICATION
# =========================

async def main():
    """Main application entry point."""
    # Setup directories
    Path(MD_DIRECTORY).mkdir(parents=True, exist_ok=True)

    assistant = LearningAssistant()

    try:
        # Initialize system
        knowledge_state = await assistant.initialize()

        # Main interaction loop
        while True:
            # Get user input
            query = await session.prompt_async("> ", style=style)
            query = query.strip()
            if not query:
                continue

            # Handle exit commands
            if query.lower() in ['/exit', '/quit', 'exit', 'quit', 'выход']:
                console.print("\n👋 До свидания! Удачи в обучении!", style="yellow")
                break

            # Handle help
            if query.lower() in ['/help', 'help', 'помощь']:
                await show_help()
                continue

            # Handle reindex command
            if query.lower() in ['/reindex', 'reindex']:
                console.print("[yellow]🔄 Переиндексирую все файлы...[/yellow]")
                files = [os.path.join(root, f)
                         for root, _, files in os.walk(MD_DIRECTORY)
                         for f in files if f.endswith(".md")]
                if not files:
                    console.print("[yellow]⚠️ Markdown файлы не найдены[/yellow]")
                    continue

                # Call index_files directly; it prints its own batch progress
                success = await assistant.processor.index_files(files)
                if success:
                    console.print("[cyan]📊 Анализирую знания...[/cyan]")
                    knowledge_state = await assistant.analyzer.analyze_all_notes(
                        assistant.get_file_hashes()
                    )
                    save_json_cache(knowledge_state.to_dict(), KNOWLEDGE_STATE_PATH)
                    console.print("[green]✓ Индексация завершена![/green]")
                else:
                    console.print("[red]✗ Ошибка индексации[/red]")
                continue

            # Process learning query
            await assistant.process_learning_query(query, knowledge_state)

    except KeyboardInterrupt:
        console.print("\n👋 До свидания! Удачи в обучении!", style="yellow")
    except Exception as e:
        console.print(f"[red]✗ Unexpected error: {e}[/red]")
        console.print_exception()


async def show_help():
    """Display help information."""
    console.print("\n[bold cyan]🎓 RAG Learning System - Справка[/bold cyan]")
    console.print("=" * 60, style="dim")
    console.print("\n[bold green]Использование:[/bold green]")
    console.print("Просто напишите, что хотите изучать:")
    console.print(" • 'изучаем английский'")
    console.print(" • 'учим математику'")
    console.print(" • 'погнали по сетям'")
    console.print(" • 'давай python'\n")
    console.print("[bold green]Доступные предметы:[/bold green]")
    for subject, keywords in SUBJECT_KEYWORDS.items():
        console.print(f" • {subject}: {', '.join(keywords[:3])}...")
    console.print("\n[bold green]Команды:[/bold green]")
    console.print(" • /help или помощь - показать эту справку")
    console.print(" • /reindex - переиндексировать все файлы")
    console.print(" • exit, quit, выход - выйти из программы")
    console.print("\n[bold green]Как работает система:[/bold green]")
    console.print("1. Система анализирует все ваши .md файлы при запуске")
    console.print("2. Определяет, по каким предметам у вас есть заметки")
    console.print("3. Когда вы указываете предмет, находит релевантные заметки")
    console.print("4. AI ассистент строит обучение на основе ваших заметок")
    console.print("5. Если заметок нет - начинает обучение с нуля\n")


if __name__ == "__main__":
    import nest_asyncio

    # Allow asyncio.run() inside environments that already run an event loop
    nest_asyncio.apply()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        console.print("\n👋 До свидания! Удачи в обучении!", style="yellow")
        sys.exit(0)
    except Exception as e:
        console.print(f"[red]✗ Unexpected error: {e}[/red]")
        sys.exit(1)
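
# ---------------------------------------------------------------------------
# Example .env configuration (a minimal sketch, not part of this script).
# load_dotenv() above reads these variables before os.getenv() is called;
# the values shown simply restate the in-code defaults and are illustrative.
#
#   OLLAMA_BASE_URL=http://localhost:11434
#   MD_FOLDER=./notes
#   EMBEDDING_MODEL=mxbai-embed-large:latest
#   LLM_MODEL=qwen2.5:7b-instruct-q8_0
#   ANSWER_COLOR=blue
# ---------------------------------------------------------------------------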