Files
rag-llm/main.py

567 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Simplified RAG Learning Assistant
Tracks learning progress across 17 subjects and provides AI tutoring guidance.
"""
import os
import json
import hashlib
import asyncio
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Set, Optional
from dataclasses import dataclass
from rich.console import Console
from prompt_toolkit import PromptSession
from prompt_toolkit.styles import Style
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# =========================
# CONFIGURATION
# =========================
console = Console(color_system="standard", force_terminal=True)
session = PromptSession()
style = Style.from_dict({"prompt": "bold #6a0dad"})
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
MD_DIRECTORY = os.getenv("MD_FOLDER", "./notes")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "mxbai-embed-large:latest")
LLM_MODEL = os.getenv("LLM_MODEL", "qwen2.5:7b-instruct-q8_0")
CHROMA_PATH = "./.cache/chroma_db"
KNOWLEDGE_STATE_PATH = "./.cache/knowledge_state.json"
FILE_HASHES_PATH = "./.cache/file_hashes.json"
CHUNK_SIZE = 400
CHUNK_OVERLAP = 50
TOP_K = 6
COLLECTION_NAME = "learning_rag"
MAX_CONTEXT_CHARS = 8000
# =========================
# SUBJECT CONFIGURATION
# =========================
SUBJECTS = {
"computer_science": "Computer Science",
"math": "Математика",
"english": "Английский язык",
"programming": "Основы программирования",
"linux": "Операционные системы Linux",
"windows": "Операционные системы Windows",
"networking": "Сетевые технологии",
"databases": "Базы данных и SQL",
"web": "Веб-технологии",
"cryptography": "Криптография",
"cybersecurity": "Базовые принципы кибербезопасности",
"pentest": "Тестирование на проникновение (Red Team)",
"soc": "SOC и Blue Team",
"devsecops": "DevSecOps",
"tools": "Инструменты и практика",
"certifications": "Сертификации и карьера",
"professional": "Профессиональное развитие"
}
SUBJECT_KEYWORDS = {
"computer_science": ["computer science", "алгоритмы", "data structures", "oop", "структуры данных"],
"math": ["math", "математика", "алгебра", "геометрия", "дискретная", "logic", "логика", "теория чисел"],
"english": ["english", "английский", "vocabulary", "grammar", "перевод", "словарь", "грамматика"],
"programming": ["programming", "python", "код", "code", "разработка", "программирование"],
"linux": ["linux", "kali", "bash", "terminal", "command line", "скрипт", "администрирование"],
"windows": ["windows", "powershell", "администрирование windows"],
"networking": ["network", "сеть", "tcp", "ip", "osi", "маршрутизация", "vlan", "протокол"],
"databases": ["database", "sql", "база данных", "postgresql", "mysql"],
"web": ["web", "html", "css", "javascript", "http", "frontend", "backend"],
"cryptography": ["cryptography", "криптография", "шифрование", "rsa", "aes"],
"cybersecurity": ["cybersecurity", "безопасность", "owasp", "уязвимость", "pentest"],
"pentest": ["pentest", "pentesting", "red team", "тестирование на проникновение"],
"soc": ["soc", "blue team", "security operations", "siem"],
"devsecops": ["devsecops", "ci/cd", "security automation"],
"tools": ["tools", "инструменты", "nmap", "burp", "metasploit", "wireshark"],
"certifications": ["certification", "сертификация", "ceh", "oscp", "cissp"],
"professional": ["github", "portfolio", "linkedin", "блог", "конференция"]
}
SYSTEM_PROMPT = """Ты — наставник-преподаватель по кибербезопасности. Твоя цель — довести ученика с уровня "пользователь ПК" до уровня junior в кибербезопасности.
КУРСОВАЯ СТРУКТУРА (17 модулей):
1) Computer Science: с полного нуля до уровня стандарта мировых вузов
2) Математика: с полного нуля до уровня стандарта мировых вузов
3) Английский язык: с полного нуля до уровня B2
4) Основы программирования: с полного нуля до уровня стандарта мировых вузов
5) Операционные системы Linux: с полного нуля до уровня стандарта мировых вузов
6) Операционные системы Windows: с уровня пользователя до уровня стандарта мировых вузов
7) Сетевые технологии: с полного нуля до уровня стандарта мировых вузов
8) Базы данных и SQL: с полного нуля до уровня стандарта мировых вузов
9) Веб-технологии: с полного нуля до уровня стандарта мировых вузов
10) Криптография: с полного нуля до уровня стандарта мировых вузов
11) Базовые принципы кибербезопасности: с полного нуля до уровня стандарта мировых вузов
12) Тестирование на проникновение (Red Team): с полного нуля до уровня стандарта мировых вузов
13) SOC и Blue Team: с полного нуля до уровня стандарта мировых вузов
14) DevSecOps: с полного нуля до уровня стандарта мировых вузов
15) Инструменты и практика: список тем для изучения, без практических
16) Сертификации и карьера: список тем для изучения, без практических
17) Профессиональное развитие: GitHub портфолио, блог, нетворкинг, конференции
МЕТОДОЛОГИЯ:
- Каждый предмет = числовая прямая от 0 до ∞
- Темы = точки на прямой (например: "цифры" = 0.01, "дроби" = 0.04)
- Без усвоения базы — не переходить дальше
- Адаптация вектора обучения по прогрессу
ФОРМАТ ОТВЕТА:
"В [ПРЕДМЕТ] будем проходить [ТЕМА_1] и [ТЕМА_2].
[Дополнительные инструкции по структуре изучения]"
ПРАВИЛА:
- Проверяй усвоение предыдущих тем
- Не суди по одному слову вне контекста
- Учитывай межпредметные связи
- Корректируй траекторию обучения динамически
- Отвечай всегда на русском языке"""
USER_PROMPT_TEMPLATE = """Текущий прогресс обучения:
{progress}
Контекст из заметок по предмету:
{context}
Вопрос ученика: {question}"""
# =========================
# DATA STRUCTURES
# =========================
@dataclass
class SubjectProgress:
name: str
topics_covered: Set[str]
last_studied: Optional[str]
confidence_level: float = 0.0
def to_dict(self):
return {
"name": self.name,
"topics_covered": list(self.topics_covered),
"last_studied": self.last_studied,
"confidence_level": self.confidence_level
}
@classmethod
def from_dict(cls, data):
return cls(
name=data["name"],
topics_covered=set(data.get("topics_covered", [])),
last_studied=data.get("last_studied"),
confidence_level=data.get("confidence_level", 0.0)
)
@dataclass
class KnowledgeState:
subjects: Dict[str, SubjectProgress]
last_analysis: str
file_hashes: Dict[str, str]
def to_dict(self):
return {
"subjects": {name: subject.to_dict() for name, subject in self.subjects.items()},
"last_analysis": self.last_analysis,
"file_hashes": self.file_hashes
}
@classmethod
def from_dict(cls, data):
subjects = {}
for name, subject_data in data.get("subjects", {}).items():
subjects[name] = SubjectProgress.from_dict(subject_data)
return cls(
subjects=subjects,
last_analysis=data.get("last_analysis", ""),
file_hashes=data.get("file_hashes", {})
)
# =========================
# UTILITY FUNCTIONS
# =========================
def get_file_hash(file_path: str) -> str:
return hashlib.md5(Path(file_path).read_bytes()).hexdigest()
def load_json_cache(file_path: str) -> dict:
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
if Path(file_path).exists():
try:
return json.loads(Path(file_path).read_text())
except json.JSONDecodeError:
return {}
return {}
def save_json_cache(data, file_path: str):
try:
Path(file_path).write_text(json.dumps(data, indent=2, ensure_ascii=False))
except Exception as e:
console.print(f"[red]✗ Failed to save cache {file_path}: {e}[/red]")
# =========================
# SUBJECT DETECTION
# =========================
def detect_subjects_from_query(query: str) -> List[str]:
query_lower = query.lower()
detected = []
for subject, keywords in SUBJECT_KEYWORDS.items():
for keyword in keywords:
if keyword.lower() in query_lower:
if subject not in detected:
detected.append(subject)
break
return detected
def detect_subject_from_content(text: str) -> Optional[str]:
text_lower = text.lower()
subject_scores = {subject: 0 for subject in SUBJECT_KEYWORDS.keys()}
for subject, keywords in SUBJECT_KEYWORDS.items():
for keyword in keywords:
if keyword.lower() in text_lower:
subject_scores[subject] += 1
best_subject = max(subject_scores.items(), key=lambda x: x[1])
return best_subject[0] if best_subject[1] > 0 else None
# =========================
# LEARNING ASSISTANT
# =========================
class LearningAssistant:
def __init__(self):
self.embeddings = OllamaEmbeddings(
model=EMBEDDING_MODEL,
base_url=OLLAMA_BASE_URL
)
self.vectorstore = Chroma(
collection_name=COLLECTION_NAME,
persist_directory=CHROMA_PATH,
embedding_function=self.embeddings
)
self.llm = ChatOllama(
model=LLM_MODEL,
temperature=0.2,
base_url=OLLAMA_BASE_URL
)
self.prompt = ChatPromptTemplate.from_messages([
("system", SYSTEM_PROMPT),
("human", USER_PROMPT_TEMPLATE)
])
self.chain = self.prompt | self.llm | StrOutputParser()
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP,
separators=["\n\n", "\n", ". ", " "]
)
async def initialize(self):
console.print("[bold cyan]🎓 RAG Learning System - Educational Assistant[/bold cyan]")
console.print(f"📂 Notes Directory: {MD_DIRECTORY}")
console.print(f"🧠 Model: {LLM_MODEL}\n")
knowledge_state = await self.load_or_analyze_knowledge()
console.print("[green]✓ System initialized successfully![/green]")
console.print("[dim]💡 Tip: /help[/dim]\n")
return knowledge_state
async def load_or_analyze_knowledge(self) -> KnowledgeState:
file_hashes = self.get_file_hashes()
state_data = load_json_cache(KNOWLEDGE_STATE_PATH)
if state_data:
knowledge_state = KnowledgeState.from_dict(state_data)
if self.have_files_changed(file_hashes, knowledge_state.file_hashes):
console.print("[yellow]📁 Files changed, re-analyzing knowledge...[/yellow]")
knowledge_state = await self.analyze_all_notes(file_hashes)
save_json_cache(knowledge_state.to_dict(), KNOWLEDGE_STATE_PATH)
else:
console.print("[green]✓ Knowledge state up to date[/green]")
else:
console.print("[yellow]📊 First time setup - analyzing all notes...[/yellow]")
knowledge_state = await self.analyze_all_notes(file_hashes)
save_json_cache(knowledge_state.to_dict(), KNOWLEDGE_STATE_PATH)
return knowledge_state
def get_file_hashes(self) -> Dict[str, str]:
file_hashes = {}
for root, _, files in os.walk(MD_DIRECTORY):
for file in files:
if file.endswith(".md"):
file_path = os.path.join(root, file)
try:
file_hashes[file_path] = get_file_hash(file_path)
except Exception as e:
console.print(f"[red]✗ Error reading {file}: {e}[/red]")
return file_hashes
def have_files_changed(self, current: Dict[str, str], cached: Dict[str, str]) -> bool:
if len(current) != len(cached):
return True
for path, hash_val in current.items():
if path not in cached or cached[path] != hash_val:
return True
return False
async def analyze_all_notes(self, file_hashes: Dict[str, str]) -> KnowledgeState:
console.print("[cyan]🔍 Analyzing all notes for learning progress...[/cyan]")
subjects = {
name: SubjectProgress(name=name, topics_covered=set(), last_studied=None)
for name in SUBJECTS.keys()
}
try:
db_data = await asyncio.to_thread(self.vectorstore.get)
if db_data and db_data['documents']:
for text, metadata in zip(db_data['documents'], db_data['metadatas']):
if not metadata or 'source' not in metadata:
continue
subject = detect_subject_from_content(text)
if subject:
subjects[subject].topics_covered.add(text[:100])
file_path = metadata['source']
if file_path in file_hashes:
subjects[subject].last_studied = file_hashes[file_path]
for subject in subjects.values():
subject.confidence_level = min(len(subject.topics_covered) / 10.0, 1.0)
studied_count = len([s for s in subjects.values() if s.topics_covered])
console.print(f"[green]✓ Analysis complete. Found progress in {studied_count} subjects[/green]")
except Exception as e:
console.print(f"[red]✗ Error during analysis: {e}[/red]")
return KnowledgeState(subjects, datetime.now().isoformat(), file_hashes)
async def get_relevant_context(self, subject: str) -> str:
try:
db_data = await asyncio.to_thread(self.vectorstore.get)
if not db_data or not db_data['documents']:
return "Нет доступных заметок для данного предмета."
relevant_docs = []
for text, metadata in zip(db_data['documents'], db_data['metadatas']):
detected_subject = detect_subject_from_content(text)
if detected_subject == subject:
relevant_docs.append({
"text": text,
"source": Path(metadata.get('source', 'unknown')).name
})
context = f"Найдено {len(relevant_docs)} заметок по предмету:\n"
char_count = len(context)
for doc in relevant_docs[:TOP_K]:
doc_text = f"\n---\nИсточник: {doc['source']}\n{doc['text']}\n"
if char_count + len(doc_text) > MAX_CONTEXT_CHARS:
context += "\n[... Контекст обрезан из-за лимита ...]"
break
context += doc_text
char_count += len(doc_text)
if not relevant_docs:
return f"Заметок по предмету '{SUBJECTS.get(subject, subject)}' не найдено."
return context
except Exception as e:
console.print(f"[red]✗ Error getting context: {e}[/red]")
return "Ошибка при получении контекста."
def get_progress_summary(self, knowledge_state: KnowledgeState, subjects: List[str]) -> str:
summary = "Текущий прогресс обучения:\n"
for subject in subjects:
if subject in knowledge_state.subjects:
subj = knowledge_state.subjects[subject]
if subj.topics_covered:
summary += f"- {SUBJECTS[subject]}: {len(subj.topics_covered)} тем, уверенность {subj.confidence_level:.1%}\n"
else:
summary += f"- {SUBJECTS[subject]}: изучение с нуля\n"
return summary
async def process_learning_query(self, query: str, knowledge_state: KnowledgeState) -> str:
subjects = detect_subjects_from_query(query)
if not subjects:
return "Пожалуйста, уточните предмет для изучения (например: 'изучаем английский', 'учим математику')."
responses = []
for subject in subjects:
context = await self.get_relevant_context(subject)
progress = self.get_progress_summary(knowledge_state, [subject])
console.print(f"[blue]🔍 Анализирую прогресс по предмету: {SUBJECTS[subject]}[/blue]")
response = ""
console.print("[bold blue]Ассистент:[/bold blue] ", end="")
async for chunk in self.chain.astream({
"context": context,
"question": query,
"progress": progress
}):
console.print(chunk, end="", style="blue")
response += chunk
console.print("\n")
responses.append(response)
return "\n\n".join(responses) if len(responses) > 1 else responses[0]
async def index_files(self, file_paths: List[str]) -> bool:
all_chunks = []
for file_path in file_paths:
try:
loader = UnstructuredMarkdownLoader(file_path)
documents = loader.load()
if documents:
for doc in documents:
doc.metadata["source"] = file_path
chunks = self.text_splitter.split_documents(documents)
all_chunks.extend(chunks)
except Exception as e:
console.print(f"[red]✗ Error processing {Path(file_path).name}: {e}[/red]")
if not all_chunks:
return False
try:
await asyncio.to_thread(self.vectorstore.reset_collection)
batch_size = 20
for i in range(0, len(all_chunks), batch_size):
batch = all_chunks[i:i + batch_size]
await asyncio.to_thread(self.vectorstore.add_documents, batch)
return True
except Exception as e:
console.print(f"[red]✗ Error indexing documents: {e}[/red]")
return False
# =========================
# MAIN APPLICATION
# =========================
async def main():
Path(MD_DIRECTORY).mkdir(parents=True, exist_ok=True)
assistant = LearningAssistant()
try:
knowledge_state = await assistant.initialize()
while True:
query = await session.prompt_async("> ", style=style)
query = query.strip()
if not query:
continue
if query.lower() in ['/exit', '/quit', 'exit', 'quit', 'выход']:
console.print("\n👋 Goodbye!", style="yellow")
break
if query.lower() in ['/help', 'help', 'помощь']:
await show_help()
continue
if query.lower() in ['/reindex', 'reindex']:
console.print("[yellow]🔄 Переиндексирую все файлы...[/yellow]")
files = [os.path.join(root, f) for root, _, files in os.walk(MD_DIRECTORY)
for f in files if f.endswith(".md")]
if not files:
console.print("[yellow]⚠️ Markdown файлы не найдены[/yellow]")
continue
success = await assistant.index_files(files)
if success:
console.print("[cyan]📊 Анализирую знания...[/cyan]")
knowledge_state = await assistant.analyze_all_notes(
assistant.get_file_hashes()
)
save_json_cache(knowledge_state.to_dict(), KNOWLEDGE_STATE_PATH)
console.print("[green]✓ Индексация завершена![/green]")
else:
console.print("[red]✗ Ошибка индексации[/red]")
continue
await assistant.process_learning_query(query, knowledge_state)
except KeyboardInterrupt:
console.print("\n👋 Goodbye!", style="yellow")
except Exception as e:
console.print(f"[red]✗ Unexpected error: {e}[/red]")
async def show_help():
console.print("\n[bold cyan]🎓 RAG Learning System - Справка[/bold cyan]")
console.print("=" * 60, style="dim")
console.print("\n[bold green]Использование:[/bold green]")
console.print("Просто напишите, что хотите изучать:")
console.print("'изучаем английский'")
console.print("'учим математику и программирование'")
console.print("'давай по сетям'")
console.print("'пора изучать кибербезопасность'\n")
console.print("[bold green]Доступные предметы:[/bold green]")
for key, name in SUBJECTS.items():
console.print(f"{name}")
console.print("\n[bold green]Команды:[/bold green]")
console.print(" • /help или помощь - показать эту справку")
console.print(" • /reindex - переиндексировать все файлы")
console.print(" • exit, quit, выход - выйти из программы")
console.print("\n[bold green]Как работает система:[/bold green]")
console.print("1. Система анализирует все ваши .md файлы при запуске")
console.print("2. Определяет, по каким предметам у вас есть заметки")
console.print("3. Когда вы указываете предмет, находит релевантные заметки")
console.print("4. AI ассистент строит обучение на основе ваших заметок")
console.print("5. Если заметок нет - начинает обучение с нуля\n")
if __name__ == "__main__":
import nest_asyncio
nest_asyncio.apply()
try:
asyncio.run(main())
except KeyboardInterrupt:
console.print("\n👋 Goodbye!", style="yellow")