import os import numpy as np import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from tensorflow import keras from transformers import BertTokenizer, BertModel import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import networkx as nx import psutil from tqdm import tqdm import time import json import logging # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class WordDatabase: """ Manages the word database and provides methods for accessing and updating words. """ def __init__(self, database_path=None): self.words = {} self.relationships = {} self.domain_mappings = {} self.database_path = database_path if database_path and os.path.exists(database_path): self.load_database(database_path) def load_database(self, path): """Load existing word database from disk""" logger.info(f"Loading word database from {path}") try: with open(path, 'r', encoding='utf-8') as f: data = json.load(f) self.words = data.get('words', {}) self.relationships = data.get('relationships', {}) self.domain_mappings = data.get('domain_mappings', {}) logger.info(f"Successfully loaded {len(self.words)} words") except Exception as e: logger.error(f"Error loading database: {e}") # Initialize with empty database self.words = {} self.relationships = {} self.domain_mappings = {} def save_database(self, path=None): """Save current database to disk""" save_path = path or self.database_path if not save_path: logger.warning("No database path specified for saving") return False logger.info(f"Saving database with {len(self.words)} words to {save_path}") try: data = { 'words': self.words, 'relationships': self.relationships, 'domain_mappings': self.domain_mappings } with open(save_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"Error saving database: {e}") return False def add_word(self, word, definition=None, domain=None, metadata=None): """Add a new word to the database or update an existing one""" if word not in self.words: self.words[word] = { 'definition': definition or '', 'domains': [domain] if domain else [], 'usage_count': 1, 'relationships': [], 'metadata': metadata or {}, 'created_at': time.time() } else: self.words[word]['usage_count'] += 1 if domain and domain not in self.words[word]['domains']: self.words[word]['domains'].append(domain) if definition and not self.words[word]['definition']: self.words[word]['definition'] = definition if metadata: self.words[word]['metadata'].update(metadata) # Update domain mappings if domain: if domain not in self.domain_mappings: self.domain_mappings[domain] = [] if word not in self.domain_mappings[domain]: self.domain_mappings[domain].append(word) def add_relationship(self, word1, word2, relationship_type, strength=0.5, metadata=None): """Create a relationship between two words""" # Ensure both words exist if word1 not in self.words: self.add_word(word1) if word2 not in self.words: self.add_word(word2) # Create relationship key rel_key = f"{word1}:{word2}" # Add or update relationship if rel_key not in self.relationships: self.relationships[rel_key] = { 'type': relationship_type, 'strength': strength, 'metadata': metadata or {}, 'created_at': time.time() } else: # Update existing relationship current = self.relationships[rel_key] current['strength'] = (current['strength'] + strength) / 2 # Average strength if metadata: current['metadata'].update(metadata) # Update word relationship lists if word2 not in self.words[word1]['relationships']: self.words[word1]['relationships'].append(word2) if word1 not in self.words[word2]['relationships']: self.words[word2]['relationships'].append(word1) def get_word(self, word): """Retrieve a word and its data from the database""" return self.words.get(word, None) def get_relationships(self, word, max_depth=1): """Get all relationships for a word up to a certain depth""" if word not in self.words: return [] if max_depth <= 0: return [] relationships = [] direct_relationships = self.words[word]['relationships'] for related_word in direct_relationships: rel_key = f"{word}:{related_word}" reverse_key = f"{related_word}:{word}" # Get the relationship data rel_data = self.relationships.get(rel_key, self.relationships.get(reverse_key)) if rel_data: relationships.append({ 'word': related_word, 'relationship_type': rel_data['type'], 'strength': rel_data['strength'], 'metadata': rel_data['metadata'] }) # Recursively get deeper relationships if needed if max_depth > 1: deeper = self.get_relationships(related_word, max_depth - 1) relationships.extend(deeper) return relationships def get_domain_words(self, domain, limit=100): """Get words associated with a particular domain""" if domain not in self.domain_mappings: return [] words = self.domain_mappings[domain] if limit: return words[:limit] return words def get_total_chunks(self, chunk_size=100): """Get the number of chunks when processing database in chunks""" return (len(self.words) + chunk_size - 1) // chunk_size def get_chunk(self, chunk_index, chunk_size=100): """Get a specific chunk of words""" all_words = list(self.words.keys()) start_idx = chunk_index * chunk_size end_idx = min(start_idx + chunk_size, len(all_words)) chunk_words = all_words[start_idx:end_idx] chunk = { 'words': chunk_words, 'data': {word: self.words[word] for word in chunk_words} } return chunk class PatternRecognitionNetwork(nn.Module): """ Neural network for pattern recognition across multiple levels. """ def __init__(self, input_dim, hidden_dim, output_dim): super(PatternRecognitionNetwork, self).__init__() # Token-level pattern recognition self.token_encoder = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Dropout(0.3), nn.Linear(hidden_dim, hidden_dim) ) # Syntactic pattern recognition self.syntactic_encoder = nn.Sequential( nn.Linear(hidden_dim, hidden_dim), nn.ReLU(), nn.Dropout(0.3), nn.Linear(hidden_dim, hidden_dim) ) # Semantic pattern recognition self.semantic_encoder = nn.Sequential( nn.Linear(hidden_dim, hidden_dim), nn.ReLU(), nn.Dropout(0.3), nn.Linear(hidden_dim, hidden_dim) ) # Final output layer self.output_layer = nn.Linear(hidden_dim * 3, output_dim) # Attention mechanism for context self.context_attention = nn.MultiheadAttention(hidden_dim, num_heads=4) def forward(self, x, context=None): # Token-level patterns token_patterns = self.token_encoder(x) # Syntactic patterns syntactic_patterns = self.syntactic_encoder(token_patterns) # Apply context attention if available if context is not None: syntactic_patterns, _ = self.context_attention( syntactic_patterns.unsqueeze(0), context.unsqueeze(0), context.unsqueeze(0) ) syntactic_patterns = syntactic_patterns.squeeze(0) # Semantic patterns semantic_patterns = self.semantic_encoder(syntactic_patterns) # Concatenate all pattern levels combined = torch.cat([token_patterns, syntactic_patterns, semantic_patterns], dim=1) # Final output output = self.output_layer(combined) return output class LanguageModel(nn.Module): """ Language model for processing and generating text. """ def __init__(self, vocab_size, embed_dim, hidden_dim): super(LanguageModel, self).__init__() self.embedding = nn.Embedding(vocab_size, embed_dim) self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, num_layers=2, bidirectional=True) self.output = nn.Linear(hidden_dim * 2, vocab_size) def forward(self, x): embedded = self.embedding(x) output, (hidden, cell) = self.lstm(embedded) logits = self.output(output) return logits class CausalReasoningSystem: """ System for detecting and validating causal relationships. """ def __init__(self): self.causal_graph = {} self.confidence_scores = {} def add_causal_relation(self, cause, effect, confidence=0.5, evidence=None): """Add a causal relationship with evidence""" relation_id = f"{cause}→{effect}" self.causal_graph[relation_id] = { 'cause': cause, 'effect': effect, 'confidence': confidence, 'evidence': evidence or [], 'created_at': time.time() } def verify_relation(self, cause, effect): """Verify if a causal relation is valid and return confidence""" relation_id = f"{cause}→{effect}" if relation_id in self.causal_graph: return self.causal_graph[relation_id]['confidence'] # Check for temporal inconsistency (effect before cause) reverse_id = f"{effect}→{cause}" if reverse_id in self.causal_graph: # Potential logical contradiction return -1 * self.causal_graph[reverse_id]['confidence'] return 0.0 # No known relationship def detect_inconsistencies(self): """Find logical inconsistencies in the causal graph""" inconsistencies = [] # Check for cycles in the causal graph visited = set() for relation_id in self.causal_graph: cause, effect = relation_id.split('→') # Simple cycle detection path = [cause] if self._has_cycle(effect, path, set(path)): inconsistencies.append({ 'type': 'cycle', 'path': path + [effect], 'relations': [f"{path[i]}→{path[i+1]}" for i in range(len(path) - 1)] + [f"{path[-1]}→{effect}"] }) # Check for direct contradictions (A→B and B→A with high confidence) for relation_id in self.causal_graph: cause, effect = relation_id.split('→') reverse_id = f"{effect}→{cause}" if reverse_id in self.causal_graph: confidence = self.causal_graph[relation_id]['confidence'] reverse_confidence = self.causal_graph[reverse_id]['confidence'] if confidence > 0.7 and reverse_confidence > 0.7: inconsistencies.append({ 'type': 'contradiction', 'relations': [relation_id, reverse_id], 'confidences': [confidence, reverse_confidence] }) return inconsistencies def _has_cycle(self, node, path, visited): """Helper method for cycle detection""" if node in visited: return True visited.add(node) path.append(node) for relation_id in self.causal_graph: cause, effect = relation_id.split('→') if cause == node and effect not in visited: if self._has_cycle(effect, path, visited): return True path.pop() return False def resolve_inconsistencies(self, inconsistencies): """Attempt to resolve detected inconsistencies""" resolved = [] for inconsistency in inconsistencies: if inconsistency['type'] == 'contradiction': # For direct contradictions, keep the relation with higher confidence rel1, rel2 = inconsistency['relations'] conf1, conf2 = inconsistency['confidences'] if conf1 > conf2: if rel2 in self.causal_graph: del self.causal_graph[rel2] resolved.append({ 'type': 'contradiction', 'resolved': True, 'kept': rel1, 'removed': rel2 }) else: if rel1 in self.causal_graph: del self.causal_graph[rel1] resolved.append({ 'type': 'contradiction', 'resolved': True, 'kept': rel2, 'removed': rel1 }) elif inconsistency['type'] == 'cycle': # For cycles, remove the weakest link in the cycle weakest_link = None min_confidence = 1.0 for relation in inconsistency['relations']: if relation in self.causal_graph: confidence = self.causal_graph[relation]['confidence'] if confidence < min_confidence: min_confidence = confidence weakest_link = relation if weakest_link and weakest_link in self.causal_graph: del self.causal_graph[weakest_link] resolved.append({ 'type': 'cycle', 'resolved': True, 'removed': weakest_link }) return resolved class MemorySystem: """ Multi-tiered memory system for the autobot. """ def __init__(self, working_memory_limit=100, semantic_memory_limit=10000): # Working memory (currently active processing) self.working_memory = { 'capacity': working_memory_limit, 'items': {}, 'priority_queue': [] } # Semantic memory (long-term knowledge) self.semantic_memory = { 'capacity': semantic_memory_limit, 'items': {}, 'access_frequency': {} } # Episodic memory (past interactions) self.episodic_memory = { 'interactions': [], 'last_accessed': {} } def add_to_working_memory(self, key, data, priority=1.0): """Add an item to working memory with priority""" # Check if we need to make space if len(self.working_memory['items']) >= self.working_memory['capacity']: # Remove lowest priority item self.working_memory['priority_queue'].sort(key=lambda x: x[1]) remove_key, _ = self.working_memory['priority_queue'].pop(0) del self.working_memory['items'][remove_key] # Add new item self.working_memory['items'][key] = { 'data': data, 'created_at': time.time(), 'priority': priority } # Update priority queue self.working_memory['priority_queue'].append((key, priority)) def get_from_working_memory(self, key): """Retrieve an item from working memory""" if key in self.working_memory['items']: return self.working_memory['items'][key]['data'] return None def clear_working_memory(self): """Clear all items from working memory""" self.working_memory['items'] = {} self.working_memory['priority_queue'] = [] def add_to_semantic_memory(self, key, data): """Add knowledge to semantic memory""" if key in self.semantic_memory['items']: # Update existing entry self.semantic_memory['items'][key]['data'] = data self.semantic_memory['items'][key]['updated_at'] = time.time() else: # Add new entry self.semantic_memory['items'][key] = { 'data': data, 'created_at': time.time(), 'updated_at': time.time() } # Update access frequency if key not in self.semantic_memory['access_frequency']: self.semantic_memory['access_frequency'][key] = 0 self.semantic_memory['access_frequency'][key] += 1 def get_from_semantic_memory(self, key): """Retrieve knowledge from semantic memory""" if key in self.semantic_memory['items']: # Update access frequency self.semantic_memory['access_frequency'][key] += 1 return self.semantic_memory['items'][key]['data'] return None def record_interaction(self, question, response, confidence): """Record an interaction in episodic memory""" interaction = { 'timestamp': time.time(), 'question': question, 'response': response, 'confidence': confidence } self.episodic_memory['interactions'].append(interaction) def find_similar_interactions(self, query, limit=5): """Find similar past interactions by simple keyword matching""" # In a real implementation, this would use semantic similarity matches = [] query_words = set(query.lower().split()) for interaction in self.episodic_memory['interactions']: question = interaction['question'].lower() question_words = set(question.split()) # Calculate simple overlap overlap = len(query_words.intersection(question_words)) if overlap > 0: matches.append({ 'interaction': interaction, 'overlap': overlap }) # Sort by overlap score matches.sort(key=lambda x: x['overlap'], reverse=True) return [m['interaction'] for m in matches[:limit]] def get_working_memory_usage(self): """Get current usage of working memory""" return { 'used': len(self.working_memory['items']), 'total': self.working_memory['capacity'], 'percentage': (len(self.working_memory['items']) / self.working_memory['capacity']) * 100 } def get_semantic_memory_usage(self): """Get current usage of semantic memory""" return { 'used': len(self.semantic_memory['items']), 'total': self.semantic_memory['capacity'], 'percentage': (len(self.semantic_memory['items']) / self.semantic_memory['capacity']) * 100 } def get_episodic_memory_size(self): """Get size of episodic memory""" return len(self.episodic_memory['interactions']) class PerformanceMonitor: """ Tracks performance metrics and system resource usage. """ def __init__(self): self.metrics = { 'processing_rates': [], 'memory_usage': [], 'accuracy': [], 'confidence': [], 'response_times': [], 'operations': {} } self.current_task = { 'name': None, 'start_time': None, 'progress': 0, 'status': 'idle' } self.convergence_history = [] self.convergence_threshold = 0.001 self.convergence_window = 5 def start_task(self, task_name): """Start tracking a new task""" self.current_task = { 'name': task_name, 'start_time': time.time(), 'progress': 0, 'status': 'running' } def update_task_progress(self, progress): """Update progress of current task (0-100)""" self.current_task['progress'] = progress def end_task(self, task_name): """End tracking for a task""" if self.current_task['name'] == task_name: duration = time.time() - self.current_task['start_time'] if task_name not in self.metrics['operations']: self.metrics['operations'][task_name] = [] self.metrics['operations'][task_name].append({ 'duration': duration, 'timestamp': time.time() }) self.current_task = { 'name': None, 'start_time': None, 'progress': 0, 'status': 'idle' } def track_memory_usage(self): """Track current memory usage""" usage = { 'timestamp': time.time(), 'system': { 'percent': psutil.virtual_memory().percent, 'used': psutil.virtual_memory().used, 'total': psutil.virtual_memory().total }, 'process': { 'used': psutil.Process().memory_info().rss } } self.metrics['memory_usage'].append(usage) return usage def track_processing_rate(self, items_processed, time_taken): """Track processing rate""" rate = items_processed / time_taken if time_taken > 0 else 0 self.metrics['processing_rates'].append({ 'timestamp': time.time(), 'rate': rate, 'items': items_processed, 'duration': time_taken }) return rate def record_accuracy(self, accuracy): """Record accuracy metric""" self.metrics['accuracy'].append({ 'timestamp': time.time(), 'value': accuracy }) # Update convergence history self.convergence_history.append(accuracy) if len(self.convergence_history) > self.convergence_window: self.convergence_history.pop(0) def has_converged(self): """Check if performance has converged based on recent history""" if len(self.convergence_history) < self.convergence_window: return False # Calculate improvement rate over window improvements = [abs(self.convergence_history[i] - self.convergence_history[i-1]) for i in range(1, len(self.convergence_history))] avg_improvement = sum(improvements) / len(improvements) return avg_improvement < self.convergence_threshold def get_average_processing_rate(self, window=10): """Get average processing rate over recent window""" if not self.metrics['processing_rates']: return 0 recent = self.metrics['processing_rates'][-window:] if not recent: return 0 return sum(r['rate'] for r in recent) / len(recent) def get_current_task_status(self): """Get current task status""" if self.current_task['name']: elapsed = time.time() - self.current_task['start_time'] # Estimate time remaining based on progress estimated_total = 0 if self.current_task['progress'] > 0: estimated_total = elapsed / (self.current_task['progress'] / 100) remaining = max(0, estimated_total - elapsed) return { 'task': self.current_task['name'], 'progress': self.current_task['progress'], 'elapsed': elapsed, 'estimated_remaining': remaining, 'status': self.current_task['status'] } return { 'task': None, 'progress': 0, 'elapsed': 0, 'estimated_remaining': 0, 'status': 'idle' } class WordProcessingTracker: """ Tracks which words are being processed and visualizes the process. """ def __init__(self): self.current_word = None self.processing_stage = 'idle' self.processing_history = [] self.start_time = None self.words_processed = 0 def set_current_word(self, word, stage='processing'): """Set the current word being processed""" if self.current_word: # Record previous word processing time end_time = time.time() duration = end_time - (self.start_time or end_time) self.processing_history.append({ 'word': self.current_word, 'stage': self.processing_stage, 'start_time': self.start_time, 'end_time': end_time, 'duration': duration }) self.words_processed += 1 self.current_word = word self.processing_stage = stage self.start_time = time.time() def get_processing_rate(self, window=60): """Calculate words processed per second over the last time window""" if not self.processing_history: return 0 # Get history entries from the last window seconds cutoff_time = time.time() - window recent = [entry for entry in self.processing_history if entry['end_time'] > cutoff_time] if not recent: return 0 # Count words and time span word_count = len(recent) time_span = recent[-1]['end_time'] - recent[0]['start_time'] if time_span <= 0: return 0 return word_count / time_span def get_current_word_info(self): """Get information about the current word being processed""" if not self.current_word: return { 'word': None, 'stage': 'idle', 'duration': 0 } return { 'word': self.current_word, 'stage': self.processing_stage, 'duration': time.time() - (self.start_time or time.time()) } class StrongAutobot: """ Main class for the Autobot system, implemented with PyTorch and Keras. """ def __init__(self, database_path=None, embedding_dim=300, hidden_dim=512): # Initialize word database self.word_database = WordDatabase(database_path) # Initialize tokenizer self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') self.bert_model = BertModel.from_pretrained('bert-base-uncased') # Initialize PyTorch models self.pattern_recognition = PatternRecognitionNetwork( input_dim=embedding_dim, hidden_dim=hidden_dim, output_dim=embedding_dim ) # Initialize memory systems self.memory_system = MemorySystem() # Initialize reasoning system self.reasoning_system = CausalReasoningSystem() # Initialize monitoring systems self.performance_monitor = PerformanceMonitor() self.word_tracker = WordProcessingTracker() # Configure visualization system self.enable_visualization = True self.visualization = { 'memory_fig': None, 'word_fig': None, 'performance_fig': None } # Initialize metrics self.metrics = { 'accuracy': 0, 'coverage': 0, 'consistency': 0 } self.running = True logger.info("StrongAutobot initialized") def initialize_system(self): """Initialize the system components and start monitoring""" logger.info("Starting system initialization") self.performance_monitor.start_task("system_initialization") # Setup visualization if enabled if self.enable_visualization: self.setup_visualization() total_words = len(self.word_database.words) logger.info(f"Loading initial embeddings for {total_words} words") # Process in chunks to avoid memory issues chunk_size = 100 total_chunks = self.word_database.get_total_chunks(chunk_size) for i in range(total_chunks): self.performance_monitor.update_task_progress((i / total_chunks) * 100) chunk = self.word_database.get_chunk(i, chunk_size) # Process each word in chunk for word in chunk['words']: self.word_tracker.set_current_word(word, 'initializing') # Update visualization if self.enable_visualization: self.update_visualization() # Get word data word_data = chunk['data'][word] # Add to memory system self.memory_system.add_to_semantic_memory(word, word_data) # Track memory usage self.performance_monitor.track_memory_usage() # Calculate processing rate processing_rate = self.word_tracker.get_processing_rate() logger.info(f"Chunk {i+1}/{total_chunks} processed. Rate: {processing_rate:.2f} words/sec") self.performance_monitor.end_task("system_initialization") logger.info("System initialization complete") import os import numpy as np import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from tensorflow import keras import tensorflow as tf from sklearn.metrics.pairwise import cosine_similarity import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import networkx as nx import psutil from tqdm import tqdm import time import json import logging import threading import queue # Continue from previous artifact class StrongAutobot: def setup_visualization(self): """Setup matplotlib visualization figures""" plt.ion() # Enable interactive mode # Create figure for memory visualization self.visualization['memory_fig'], memory_ax = plt.subplots(figsize=(8, 4)) memory_ax.set_title('Memory Usage') memory_ax.set_xlabel('Time') memory_ax.set_ylabel('Memory Usage %') memory_ax.grid(True) # Memory usage lines self.visualization['memory_lines'] = { 'working': memory_ax.plot([], [], 'b-', label='Working Memory')[0], 'semantic': memory_ax.plot([], [], 'g-', label='Semantic Memory')[0], 'system': memory_ax.plot([], [], 'r-', label='System Memory')[0] } memory_ax.legend() # Create figure for word tracking self.visualization['word_fig'], word_ax = plt.subplots(figsize=(10, 6)) word_ax.set_title('Word Relationship Network') self.visualization['word_network'] = nx.Graph() # Create figure for performance tracking self.visualization['performance_fig'], (perf_ax1, perf_ax2) = plt.subplots(2, 1, figsize=(8, 6)) perf_ax1.set_title('Processing Rate') perf_ax1.set_xlabel('Time') perf_ax1.set_ylabel('Words/sec') perf_ax1.grid(True) perf_ax2.set_title('Model Performance') perf_ax2.set_xlabel('Iteration') perf_ax2.set_ylabel('Metrics') perf_ax2.grid(True) # Performance lines self.visualization['performance_lines'] = { 'rate': perf_ax1.plot([], [], 'b-', label='Processing Rate')[0], 'accuracy': perf_ax2.plot([], [], 'g-', label='Accuracy')[0], 'consistency': perf_ax2.plot([], [], 'r-', label='Consistency')[0], 'coverage': perf_ax2.plot([], [], 'y-', label='Coverage')[0] } perf_ax1.legend() perf_ax2.legend() # Show all figures for fig_key in self.visualization: if fig_key.endswith('_fig'): self.visualization[fig_key].tight_layout() self.visualization[fig_key].show() def update_visualization(self): """Update all visualization components""" if not self.enable_visualization: return try: # Update memory usage visualization self._update_memory_visualization() # Update word processing visualization self._update_word_visualization() # Update performance visualization self._update_performance_visualization() # Redraw all figures for fig_key in self.visualization: if fig_key.endswith('_fig'): self.visualization[fig_key].canvas.draw_idle() self.visualization[fig_key].canvas.flush_events() except Exception as e: logger.error(f"Error updating visualization: {e}") def _update_memory_visualization(self): """Update memory usage visualization""" # Get memory data working_memory = self.memory_system.get_working_memory_usage()['percentage'] semantic_memory = self.memory_system.get_semantic_memory_usage()['percentage'] system_memory = psutil.virtual_memory().percent # Update data for memory lines x_time = time.time() - self.start_time # Extend line data x_data = self.visualization['memory_lines']['working'].get_xdata() x_data = np.append(x_data, x_time) # Working memory y_data = self.visualization['memory_lines']['working'].get_ydata() y_data = np.append(y_data, working_memory) self.visualization['memory_lines']['working'].set_data(x_data, y_data) # Semantic memory y_data = self.visualization['memory_lines']['semantic'].get_ydata() y_data = np.append(y_data, semantic_memory) self.visualization['memory_lines']['semantic'].set_data(x_data, y_data) # System memory y_data = self.visualization['memory_lines']['system'].get_ydata() y_data = np.append(y_data, system_memory) self.visualization['memory_lines']['system'].set_data(x_data, y_data) # Update axes limits mem_ax = self.visualization['memory_fig'].axes[0] mem_ax.set_xlim(max(0, x_time - 60), x_time + 5) # Show last 60 seconds mem_ax.set_ylim(0, 110) # 0-110% range # Update title with current word word_info = self.word_tracker.get_current_word_info() if word_info['word']: mem_ax.set_title(f"Memory Usage (Current Word: {word_info['word']})") else: mem_ax.set_title("Memory Usage") def _update_word_visualization(self): """Update word relationship visualization""" word_info = self.word_tracker.get_current_word_info() current_word = word_info['word'] if not current_word: return # Clear previous graph word_ax = self.visualization['word_fig'].axes[0] word_ax.clear() # Reset graph G = nx.Graph() # Add current word as central node G.add_node(current_word) # Get relationships for current word relationships = self.word_database.get_relationships(current_word, max_depth=2) # Add related words and edges for rel in relationships: related_word = rel['word'] strength = rel['strength'] rel_type = rel['relationship_type'] # Add node if not exists if not G.has_node(related_word): G.add_node(related_word) # Add edge with relationship data G.add_edge(current_word, related_word, weight=strength, type=rel_type) # Define colors for relationship types color_map = { 'semantic': 'blue', 'syntactic': 'green', 'etymological': 'purple', 'default': 'gray' } # Define node positions using spring layout pos = nx.spring_layout(G) # Draw nodes nx.draw_networkx_nodes(G, pos, node_color=['red' if node == current_word else 'skyblue' for node in G.nodes()], node_size=[700 if node == current_word else 300 for node in G.nodes()], alpha=0.8, ax=word_ax) # Draw edges with colors based on relationship type for u, v, data in G.edges(data=True): rel_type = data.get('type', 'default') color = color_map.get(rel_type, color_map['default']) width = data.get('weight', 0.5) * 3 # Scale width by strength nx.draw_networkx_edges(G, pos, edgelist=[(u, v)], width=width, alpha=0.7, edge_color=color, ax=word_ax) # Draw labels nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold', ax=word_ax) # Set title word_ax.set_title(f"Word Relationships: {current_word} (Stage: {word_info['stage']})") word_ax.axis('off') def _update_performance_visualization(self): """Update performance visualization""" # Update processing rate rate_ax = self.visualization['performance_fig'].axes[0] processing_rate = self.word_tracker.get_processing_rate() x_time = time.time() - self.start_time # Extend line data for processing rate x_data = self.visualization['performance_lines']['rate'].get_xdata() x_data = np.append(x_data, x_time) y_data = self.visualization['performance_lines']['rate'].get_ydata() y_data = np.append(y_data, processing_rate) self.visualization['performance_lines']['rate'].set_data(x_data, y_data) # Update rate axis limits rate_ax.set_xlim(max(0, x_time - 60), x_time + 5) # Show last 60 seconds rate_ax.set_ylim(0, max(y_data) * 1.2 if len(y_data) > 0 and max(y_data) > 0 else 10) # Update metrics if available performance_data = self.performance_monitor.metrics if 'accuracy' in performance_data and performance_data['accuracy']: acc_data = performance_data['accuracy'] x_iterations = list(range(len(acc_data))) y_accuracy = [d['value'] for d in acc_data] self.visualization['performance_lines']['accuracy'].set_data(x_iterations, y_accuracy) # Update other metrics if available if hasattr(self, 'metrics'): x_iterations_ext = list(range(len(x_iterations) + 1)) if 'consistency' in self.metrics: consistency_data = y_accuracy + [self.metrics['consistency']] self.visualization['performance_lines']['consistency'].set_data(x_iterations_ext, consistency_data) if 'coverage' in self.metrics: coverage_data = y_accuracy + [self.metrics['coverage']] self.visualization['performance_lines']['coverage'].set_data(x_iterations_ext, coverage_data) # Update metrics axis limits metrics_ax = self.visualization['performance_fig'].axes[1] metrics_ax.set_xlim(0, max(len(x_iterations) + 2, 10)) metrics_ax.set_ylim(0, 1.1) # Update task status in title task_status = self.performance_monitor.get_current_task_status() if task_status['task']: progress = task_status['progress'] rate_ax.set_title(f"Processing Rate - {task_status['task']} ({progress:.1f}%)") else: rate_ax.set_title("Processing Rate") def train_on_word_database(self, epochs=3, batch_size=64): """Train the system on the word database""" logger.info(f"Starting training for {epochs} epochs with batch size {batch_size}") self.performance_monitor.start_task("training") # Create optimizer for pattern recognition network optimizer = optim.Adam(self.pattern_recognition.parameters(), lr=0.001) # Set start time for visualization self.start_time = time.time() # Process in epochs for epoch in range(epochs): logger.info(f"Starting epoch {epoch+1}/{epochs}") # Process in chunks chunk_size = 100 total_chunks = self.word_database.get_total_chunks(chunk_size) # Track epoch metrics epoch_loss = 0.0 words_processed = 0 for i in range(total_chunks): # Update progress progress = ((epoch * total_chunks) + i) / (epochs * total_chunks) * 100 self.performance_monitor.update_task_progress(progress) # Get chunk chunk = self.word_database.get_chunk(i, chunk_size) # Skip empty chunks if not chunk['words']: continue # Process each word in chunk for word in chunk['words']: # Update word tracker self.word_tracker.set_current_word(word, 'training') # Get word data and relationships word_data = chunk['data'][word] relationships = word_data.get('relationships', []) # Skip words with no relationships if not relationships: continue # Get embeddings for word and related words word_embedding = self._get_word_embedding(word) # Get context from related words context_embeddings = [] for related in relationships[:5]: # Limit to 5 related words for efficiency rel_embedding = self._get_word_embedding(related) if rel_embedding is not None: context_embeddings.append(rel_embedding) # Skip if no context if not context_embeddings: continue # Average context embeddings context = torch.stack(context_embeddings).mean(dim=0) # Train pattern recognition network self.pattern_recognition.train() optimizer.zero_grad() # Forward pass with word embedding and context output = self.pattern_recognition(word_embedding, context) # Simple loss function: try to predict the context from the word loss = nn.functional.mse_loss(output, context) # Backward pass and optimization loss.backward() optimizer.step() # Update metrics epoch_loss += loss.item() words_processed += 1 # Update memory self.memory_system.add_to_semantic_memory(word, { 'embedding': word_embedding.detach().numpy(), 'data': word_data }) # Update visualization if self.enable_visualization and i % 10 == 0: # Update every 10 words to avoid slowdown self.update_visualization() # Log progress if i % 10 == 0 or i == total_chunks - 1: avg_loss = epoch_loss / max(1, words_processed) processing_rate = self.word_tracker.get_processing_rate() logger.info(f"Epoch {epoch+1}/{epochs}, Chunk {i+1}/{total_chunks}, " f"Loss: {avg_loss:.4f}, Rate: {processing_rate:.2f} words/sec") # Calculate epoch metrics avg_epoch_loss = epoch_loss / max(1, words_processed) # Evaluate model accuracy = self._evaluate_model() # Record metrics self.performance_monitor.record_accuracy(accuracy) # Update metrics dict self.metrics = { 'accuracy': accuracy, 'consistency': self._evaluate_consistency(), 'coverage': self._evaluate_coverage() } logger.info(f"Epoch {epoch+1}/{epochs} completed. " f"Loss: {avg_epoch_loss:.4f}, Accuracy: {accuracy:.4f}, " f"Consistency: {self.metrics['consistency']:.4f}, " f"Coverage: {self.metrics['coverage']:.4f}") # Check for convergence if self.performance_monitor.has_converged(): logger.info(f"Training converged after {epoch+1} epochs") break self.performance_monitor.end_task("training") logger.info("Training completed") def _get_word_embedding(self, word): """Get embedding for a word using BERT model""" # Check if already in memory cached = self.memory_system.get_from_semantic_memory(word) if cached and 'embedding' in cached: return torch.tensor(cached['embedding']) # Get embedding from BERT try: inputs = self.tokenizer(word, return_tensors="pt") with torch.no_grad(): outputs = self.bert_model(**inputs) # Use mean of last hidden state as embedding word_embedding = outputs.last_hidden_state.mean(dim=1).squeeze() return word_embedding except Exception as e: logger.error(f"Error getting embedding for word '{word}': {e}") return None def _evaluate_model(self, test_size=100): """Evaluate model performance on a sample of words""" self.pattern_recognition.eval() # Sample random words for testing all_words = list(self.word_database.words.keys()) if len(all_words) <= test_size: test_words = all_words else: test_words = np.random.choice(all_words, size=test_size, replace=False) correct = 0 total = 0 for word in test_words: # Get word relationships relationships = self.word_database.get_relationships(word) if not relationships: continue # Get word embedding word_embedding = self._get_word_embedding(word) if word_embedding is None: continue # Predict relationships using pattern recognition with torch.no_grad(): predicted_context = self.pattern_recognition(word_embedding) # Get actual related word embeddings actual_embeddings = [] for rel in relationships[:5]: # Top 5 relationships rel_word = rel['word'] rel_embedding = self._get_word_embedding(rel_word) if rel_embedding is not None: actual_embeddings.append(rel_embedding) if not actual_embeddings: continue # Calculate similarity between prediction and actual actual_context = torch.stack(actual_embeddings).mean(dim=0) similarity = torch.cosine_similarity(predicted_context.unsqueeze(0), actual_context.unsqueeze(0)).item() # Consider prediction correct if similarity is high if similarity > 0.7: correct += 1 total += 1 # Calculate accuracy accuracy = correct / max(1, total) return accuracy def _evaluate_consistency(self): """Evaluate logical consistency of the relationships""" # Check for inconsistencies in causal relationships inconsistencies = self.reasoning_system.detect_inconsistencies() # Calculate consistency score (1.0 = no inconsistencies) total_relations = len(self.reasoning_system.causal_graph) if total_relations == 0: return 1.0 consistency = 1.0 - (len(inconsistencies) / total_relations) return max(0.0, consistency) def _evaluate_coverage(self): """Evaluate knowledge coverage of the database""" # Count words with relationships words_with_relations = sum(1 for word in self.word_database.words if self.word_database.words[word]['relationships']) # Calculate coverage score total_words = len(self.word_database.words) if total_words == 0: return 0.0 coverage = words_with_relations / total_words return coverage def process_query(self, question): """Process a user query and generate a response based on the word database""" logger.info(f"Processing query: {question}") self.performance_monitor.start_task("query_processing") # Clear and update working memory self.memory_system.clear_working_memory() self.memory_system.add_to_working_memory('query', question, priority=1.0) # Set query as current "word" for visualization self.word_tracker.set_current_word(question[:30] + "..." if len(question) > 30 else question, 'query_analysis') # Track start time for performance measurement query_start_time = time.time() # Update visualization if self.enable_visualization: self.update_visualization() # Extract key terms from the question key_terms = self._extract_key_terms(question) # Retrieve relevant information for each key term relevant_info = [] for term in key_terms: # Set current word for visualization self.word_tracker.set_current_word(term, 'retrieving') if self.enable_visualization: self.update_visualization() # Get information about this term term_info = self.word_database.get_word(term) if term_info: # Get related words relationships = self.word_database.get_relationships(term) relevant_info.append({ 'term': term, 'info': term_info, 'relationships': relationships }) # Add to working memory self.memory_system.add_to_working_memory(term, term_info, priority=0.8) # Find similar past interactions similar_interactions = self.memory_system.find_similar_interactions(question) # Apply causal reasoning if appropriate self.word_tracker.set_current_word(question[:30] + "..." if len(question) > 30 else question, 'reasoning') if self.enable_visualization: self.update_visualization() # Formulate response based on context and retrieved information self.word_tracker.set_current_word(question[:30] + "..." if len(question) > 30 else question, 'formulating_response') if self.enable_visualization: self.update_visualization() response, confidence = self._formulate_response(question, relevant_info, similar_interactions) # Calculate processing time processing_time = time.time() - query_start_time # Record interaction in episodic memory self.memory_system.record_interaction(question, response, confidence) # Learn from this interaction self._learn_from_interaction(question, key_terms, response, confidence) # End task tracking self.performance_monitor.end_task("query_processing") logger.info(f"Query processed in {processing_time:.2f} seconds with confidence {confidence:.2f}") # Return response with metadata return { 'response': response, 'confidence': confidence, 'processing_time': processing_time, 'key_terms': key_terms, 'relevant_info_count': len(relevant_info) } def _extract_key_terms(self, text): """Extract key terms from input text""" # Tokenize text tokens = [token.lower() for token in text.split()] # Remove stopwords (simplified approach) stopwords = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'can', 'could', 'will', 'would', 'should', 'shall', 'may', 'might', 'must', 'of', 'by', 'as', 'that', 'this', 'these', 'those']) filtered_tokens = [token for token in tokens if token.lower() not in stopwords and len(token) > 2] # Filter to tokens in our database known_terms = [token for token in filtered_tokens if token in self.word_database.words] # If no known terms, try partial matches if not known_terms: for token in filtered_tokens: for word in self.word_database.words: if token in word or word in token: known_terms.append(word) # Limit to top terms (by usage count if available) if len(known_terms) > 5: term_counts = [(term, self.word_database.words.get(term, {}).get('usage_count', 0)) for term in known_terms] term_counts.sort(key=lambda x: x[1], reverse=True) known_terms = [term for term, _ in term_counts[:5]] return known_terms def _formulate_response(self, question, relevant_info, similar_interactions): """Generate a response based on retrieved information""" # If no relevant information found if not relevant_info: if similar_interactions: # Use similar past interaction prev_interaction = similar_interactions[0] confidence = 0.5 # Medium confidence since it's based on similarity return f"Based on similar questions I've seen before, {prev_interaction['response']}", confidence else: return "I don't have enough information to answer that question accurately.", 0.1 # Combine information from all relevant terms combined_info = {} for info in relevant_info: term = info['term'] term_data = info['info'] # Store term definition if 'definition' in term_data and term_data['definition']: combined_info[term] = term_data['definition'] # Add domain information if available if 'domains' in term_data and term_data['domains']: if term not in combined_info: combined_info[term] = "" combined_info[term] += f" (Domains: {', '.join(term_data['domains'])})" # Add relationship information if info['relationships']: top_relations = info['relationships'][:3] # Top 3 relationships related_terms = [rel['word'] for rel in top_relations] if term not in combined_info: combined_info[term] = "" combined_info[term] += f" Related: {', '.join(related_terms)}." # Determine question type (simplified) if question.lower().startswith('what is') or question.lower().startswith('what are'): question_type = 'definition' elif question.lower().startswith('how'): question_type = 'process' elif question.lower().startswith('why'): question_type = 'explanation' elif question.lower().startswith('who'): question_type = 'person' elif question.lower().startswith('where'): question_type = 'location' elif question.lower().startswith('when'): question_type = 'time' else: question_type = 'general' # Format response based on question type response = "" if question_type == 'definition': term = relevant_info[0]['term'] if term in combined_info: response = f"{term.capitalize()} is {combined_info[term]}" else: response = f"I have information about {term}, but no specific definition." elif question_type == 'explanation': # Try to use causal relationships for explanations for info in relevant_info: term = info['term'] for rel in info['relationships']: related = rel['word'] causal_conf = self.reasoning_system.verify_relation(term, related) if causal_conf > 0.5: response = f"This happens because {term} causes {related}. " break causal_conf = self.reasoning_system.verify_relation(related, term) if causal_conf > 0.5: response = f"This happens because {related} causes {term}. " break # If no causal relationships found, provide general information if not response: response = "Based on the information I have, " for term, info in combined_info.items(): response += f"{term} is {info} " else: # General format for other question types response = "Based on my knowledge, " for term, info in combined_info.items(): response += f"{term} is {info} " # Calculate confidence based on relevance and coverage term_coverage = len(relevant_info) / max(1, len(self._extract_key_terms(question))) info_detail = sum(len(str(info)) for info in combined_info.values()) / 100 # Normalize confidence = min(0.9, term_coverage * 0.7 + min(info_detail, 0.3)) return response.strip(), confidence def _learn_from_interaction(self, question, key_terms, response, confidence): """Learn from the interaction to improve future responses""" # Only learn from high-confidence responses if confidence < 0.6: return # Associate question terms with each other for i, term1 in enumerate(key_terms): for term2 in key_terms[i+1:]: # Add relationship between terms self.word_database.add_relationship( term1, term2, relationship_type='co-occurrence', strength=0.7, metadata={'source': 'user_query'} ) # Increment usage count for terms for term in key_terms: if term in self.word_database.words: self.word_database.words[term]['usage_count'] += 1 # Look for potential causal relationships in question-answer for term1 in key_terms: for term2 in key_terms: if term1 != term2: # Define causal phrases to detect potential causal relationships causal_phrases = [ f"{term1} causes {term2}", f"{term1} results in {term2}", f"{term1} leads to {term2}", f"because of {term1}, {term2}", f"{term2} is caused by {term1}" ] # Check if any causal phrase is in the response for phrase in causal_phrases: if phrase.lower() in response.lower(): # Calculate confidence based on phrase detection and response quality phrase_confidence = confidence * 0.8 # Slightly reduced confidence # Add causal relationship to reasoning system self.reasoning_system.add_causal_relation( term1, term2, confidence=phrase_confidence, evidence=[{ 'source': 'interaction', 'question': question, 'response': response, 'detected_phrase': phrase }] ) # Break after first match to avoid duplicate entries break