#!/usr/bin/env python3
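"""Crawl a website and upload its pages to an Open WebUI knowledge base.

The script crawls the target site (optionally with Selenium for
JavaScript-rendered pages), converts each page to Markdown, stores the
results in a temporary directory, and uploads them to a new or existing
knowledge base via the Open WebUI API.

Example invocation (the URLs, token, and knowledge-base name below are
placeholders):

    python owui-site-crawler.py \
        --base-url http://localhost:3000 \
        --token YOUR_API_TOKEN \
        --website-url https://docs.example.com \
        --kb-name "Example Docs" \
        --depth 2 --delay 1.0
"""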
import os
import sys
import argparse
import requests
import time
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from markitdown import MarkItDown
import json
import logging
from io import BytesIO
import re
import tempfile
import shutil
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Try to import Selenium, but make it optional
try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.common.exceptions import TimeoutException, WebDriverException
    SELENIUM_AVAILABLE = True
except ImportError:
    SELENIUM_AVAILABLE = False
    logger.warning("Selenium not available. Falling back to simple crawler.")


class WebScraper:
    def __init__(self, base_url, max_depth=2, delay=1.0, exclude_patterns=None, use_selenium=False):
        self.base_url = base_url
        self.domain = urlparse(base_url).netloc
        self.visited_urls = set()
        self.max_depth = max_depth
        self.delay = delay
        self.exclude_patterns = exclude_patterns or []
        self.pages = {}
        self.use_selenium = use_selenium and SELENIUM_AVAILABLE
        if self.use_selenium:
            self.setup_selenium()
        else:
            self.session = requests.Session()
        self.base_path = urlparse(base_url).path.rstrip('/')

    def setup_selenium(self):
        """Setup Selenium WebDriver with headless Chrome."""
        try:
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_argument("--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
            self.driver = webdriver.Chrome(options=chrome_options)
            self.driver.set_page_load_timeout(30)
            logger.info("Selenium WebDriver initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Selenium: {e}")
            logger.info("Falling back to requests")
            self.use_selenium = False
            self.session = requests.Session()

    def should_exclude(self, url):
        """Check if URL should be excluded based on patterns."""
        for pattern in self.exclude_patterns:
            if pattern in url:
                return True
        return False

    def is_valid_url(self, url):
        """Check if the URL is valid and belongs to the same domain."""
        parsed = urlparse(url)
        if not (parsed.netloc and parsed.netloc == self.domain):
            return False
        return parsed.path.startswith(self.base_path)

    def get_links_selenium(self, url):
        """Extract all links from the page using Selenium."""
        try:
            self.driver.get(url)
            # Wait for page to load
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            # Try to wait for main content to load
            try:
                WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.TAG_NAME, "main"))
                )
            except TimeoutException:
                pass
            # Get page source after JavaScript execution
            html = self.driver.page_source
            self.pages[url] = html
            # Extract links
            links = set()
            for a_tag in self.driver.find_elements(By.TAG_NAME, "a"):
                href = a_tag.get_attribute("href")
                if href:
                    full_url = urljoin(url, href)
                    if self.is_valid_url(full_url) and not self.should_exclude(full_url):
                        links.add(full_url)
            return list(links), html
        except Exception as e:
            logger.error(f"Error getting links with Selenium from {url}: {e}")
            return [], ""

    def get_links_requests(self, url, html):
        """Extract all links from the HTML content using requests."""
        soup = BeautifulSoup(html, 'html.parser')
        links = set()
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            full_url = urljoin(url, href)
            if self.is_valid_url(full_url) and not self.should_exclude(full_url):
                links.add(full_url)
        return list(links)

    def get_page_requests(self, url):
        """Get page content using requests."""
        try:
            response = self.session.get(url, timeout=10)
            if response.status_code == 200:
                return response.text
            else:
                logger.warning(f"Failed to fetch {url}: HTTP {response.status_code}")
                return None
        except Exception as e:
            logger.error(f"Error fetching {url} with requests: {e}")
            return None

    def crawl(self, url=None, depth=0):
        """Crawl the website starting from the URL up to max_depth."""
        if url is None:
            url = self.base_url
        if depth > self.max_depth or url in self.visited_urls:
            return
        self.visited_urls.add(url)
        try:
            logger.info(f"Crawling: {url} (Depth: {depth})")
            if self.use_selenium:
                links, html = self.get_links_selenium(url)
                if html:
                    self.pages[url] = html
            else:
                html = self.get_page_requests(url)
                if html:
                    self.pages[url] = html
                    links = self.get_links_requests(url, html)
                else:
                    links = []
            # Follow links
            if depth < self.max_depth and links:
                logger.info(f"Found {len(links)} links to follow from {url}")
                for link in links:
                    time.sleep(self.delay)
                    self.crawl(link, depth + 1)
        except Exception as e:
            logger.error(f"Error crawling {url}: {e}")

    def get_pages(self):
        """Return the dictionary of crawled pages."""
        return self.pages

    def close(self):
        """Close the requests session or Selenium driver."""
        if self.use_selenium and hasattr(self, 'driver'):
            self.driver.quit()
        elif hasattr(self, 'session'):
            self.session.close()
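

# Standalone usage sketch for WebScraper (the URL and settings below are
# placeholders, not values from this script):
#   scraper = WebScraper("https://docs.example.com", max_depth=1, delay=0.5)
#   scraper.crawl()
#   pages = scraper.get_pages()  # {url: raw HTML}
#   scraper.close()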


class OpenWebUIUploader:
    def __init__(self, base_url, api_token):
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Accept": "application/json"
        })

    def get_knowledge_bases(self):
        """Get a list of all knowledge bases."""
        endpoint = f"{self.base_url}/api/v1/knowledge/list"
        try:
            response = self.session.get(endpoint)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error getting knowledge bases: {e}")
            raise

    def get_knowledge_base_by_name(self, name):
        """Check if a knowledge base with the given name exists, and return its details if it does."""
        try:
            kbs = self.get_knowledge_bases()
            for kb in kbs:
                if kb.get('name') == name:
                    return kb
            return None
        except Exception as e:
            logger.error(f"Error checking for existing knowledge base: {e}")
            return None

    def get_knowledge_base_files(self, kb_id):
        """Get all files in a knowledge base."""
        endpoint = f"{self.base_url}/api/v1/knowledge/{kb_id}"
        try:
            response = self.session.get(endpoint)
            response.raise_for_status()
            kb_data = response.json()
            return kb_data.get('files', [])
        except requests.exceptions.RequestException as e:
            logger.error(f"Error getting knowledge base files: {e}")
            return []

    def file_exists_in_kb(self, kb_id, filename):
        """Check if a file with the given name exists in the knowledge base."""
        files = self.get_knowledge_base_files(kb_id)
        for file in files:
            if 'meta' in file and 'name' in file['meta'] and file['meta']['name'] == filename:
                return file['id']
        return None

    def create_knowledge_base(self, name, purpose=None):
        """Create a new knowledge base in OpenWebUI."""
        endpoint = f"{self.base_url}/api/v1/knowledge/create"
        payload = {
            "name": name,
            "description": purpose or "Documentation"
        }
        try:
            response = self.session.post(endpoint, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error creating knowledge base: {e}")
            raise

    def validate_content(self, content, filename):
        """Validate that content is not empty and has sufficient meaningful text."""
        if not content or not content.strip():
            return False, "Content is empty"
        # Count meaningful lines (not just headers or empty lines)
        lines = [line.strip() for line in content.split('\n') if line.strip()]
        meaningful_lines = [line for line in lines if not line.startswith('#') and len(line) > 20]
        if len(meaningful_lines) < 3:
            return False, f"Not enough meaningful content ({len(meaningful_lines)} lines)"
        # Count words in clean content
        clean_content = re.sub(r'#.*?\n', '', content)
        clean_content = re.sub(r'```.*?```', '', clean_content, flags=re.DOTALL)
        clean_content = re.sub(r'`.*?`', '', clean_content)
        clean_content = re.sub(r'\*.*?\*', '', clean_content)
        clean_content = clean_content.strip()
        words = clean_content.split()
        if len(words) < 50:
            return False, f"Content too short ({len(words)} words after cleaning)"
        return True, "Valid content"

    def upload_file_from_path(self, kb_id, file_path, filename, content_type="text/markdown"):
        """Upload a file to the knowledge base from a file path."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except Exception as e:
            logger.error(f"Error reading file {file_path}: {e}")
            return {"status": "error", "reason": f"read_error: {str(e)}"}
        is_valid, validation_msg = self.validate_content(content, filename)
        if not is_valid:
            logger.warning(f"Skipping invalid file {filename}: {validation_msg}")
            return {"status": "skipped", "reason": validation_msg}
        upload_endpoint = f"{self.base_url}/api/v1/files/"
        try:
            # Step 1: upload the file itself
            with open(file_path, 'rb') as f:
                files = {'file': (filename, f, content_type)}
                upload_response = self.session.post(
                    upload_endpoint,
                    headers={"Authorization": f"Bearer {self.api_token}"},
                    files=files
                )
            upload_response.raise_for_status()
            file_id = upload_response.json().get('id')
            if not file_id:
                raise ValueError("No file ID returned from upload")
            # Step 2: attach the uploaded file to the knowledge base
            add_file_endpoint = f"{self.base_url}/api/v1/knowledge/{kb_id}/file/add"
            add_response = self.session.post(
                add_file_endpoint,
                headers={
                    "Authorization": f"Bearer {self.api_token}",
                    "Content-Type": "application/json"
                },
                json={'file_id': file_id}
            )
            if add_response.status_code == 400:
                error_msg = add_response.text
                if "empty" in error_msg.lower():
                    logger.warning(f"OpenWebUI rejected file {filename} as empty content")
                    # Best-effort cleanup of the orphaned upload
                    try:
                        delete_endpoint = f"{self.base_url}/api/v1/files/{file_id}"
                        self.session.delete(delete_endpoint)
                    except Exception:
                        pass
                    return {"status": "skipped", "reason": "rejected_as_empty_by_openwebui"}
            add_response.raise_for_status()
            return add_response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error uploading file {filename}: {e}")
            if hasattr(e, 'response') and e.response is not None:
                if e.response.status_code == 400 and "empty" in str(e.response.text).lower():
                    logger.warning(f"OpenWebUI rejected file {filename} as empty content")
                    return {"status": "skipped", "reason": "rejected_as_empty_by_openwebui"}
            return {"status": "error", "reason": f"upload_error: {str(e)}"}

    def close(self):
        """Close the requests session."""
        if hasattr(self, 'session') and self.session:
            self.session.close()
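

# Standalone usage sketch for OpenWebUIUploader (the base URL, token, and
# file path below are placeholders, not values from this script):
#   uploader = OpenWebUIUploader("http://localhost:3000", "YOUR_API_TOKEN")
#   kb = uploader.get_knowledge_base_by_name("Example Docs")
#   if kb is None:
#       kb = uploader.create_knowledge_base("Example Docs", "Documentation")
#   uploader.upload_file_from_path(kb.get('id'), "/tmp/page.md", "page.md")
#   uploader.close()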


def extract_clean_text(html_content, url):
    """Extract clean, meaningful text from HTML."""
    soup = BeautifulSoup(html_content, 'html.parser')
    # Remove unwanted elements
    for element in soup(["script", "style", "nav", "header", "footer", "aside",
                         "meta", "link", "button", "form", "input", "select"]):
        element.decompose()
    # Try different content selectors
    content_selectors = [
        'main', 'article', '.content', '#content', '.main', '#main',
        '.documentation', '#documentation', '.doc', '#doc',
        '.page', '#page', '.post', '#post',
        'body'
    ]
    content_element = None
    for selector in content_selectors:
        content_element = soup.select_one(selector)
        if content_element:
            logger.info(f"Found content using selector: {selector}")
            break
    if not content_element:
        content_element = soup
    # Extract text with structure
    text_parts = []
    for element in content_element.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'li', 'div']):
        text = element.get_text(strip=True)
        if text and len(text) > 10:
            if element.name.startswith('h'):
                level = int(element.name[1])
                text_parts.append(f"{'#' * level} {text}")
            elif element.name == 'li':
                text_parts.append(f"- {text}")
            else:
                text_parts.append(text)
    # Fallback to general text extraction
    if len(text_parts) < 3:
        text = content_element.get_text()
        lines = (line.strip() for line in text.splitlines())
        # Split on double spaces so whole phrases are kept rather than single words
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text_parts = [chunk for chunk in chunks if chunk and len(chunk) > 20]
    if text_parts:
        content = f"# Source: {url}\n\n" + "\n\n".join(text_parts)
        logger.info(f"Extracted {len(text_parts)} content blocks, {len(content)} total chars")
        return content
    else:
        logger.warning(f"No meaningful content extracted from {url}")
        return f"# Source: {url}\n\n*No meaningful text content could be extracted from this page.*"


def convert_to_markdown(html_content, url):
    """Convert HTML to Markdown with robust fallbacks."""
    clean_text = extract_clean_text(html_content, url)
    if len(clean_text.strip()) > 200:
        return clean_text
    try:
        md = MarkItDown()
        html_bytes = BytesIO(html_content.encode('utf-8'))
        result = md.convert_stream(html_bytes, mime_type='text/html')
        if result and hasattr(result, 'text_content') and result.text_content:
            markdown_content = result.text_content.strip()
            if markdown_content and len(markdown_content) > 200:
                return f"# Source: {url}\n\n{markdown_content}"
    except Exception as e:
        logger.warning(f"MarkItDown failed for {url}: {e}")
    return clean_text


def is_valid_json(content):
    """Check if content is valid JSON."""
    try:
        json.loads(content)
        return True
    except (ValueError, TypeError):
        return False


def create_unique_filename(url):
    """Create a unique filename from URL including fragment."""
    parsed = urlparse(url)
    path = parsed.path
    if not path or path == '/':
        path = 'index'
    fragment = parsed.fragment
    if fragment:
        fragment_clean = re.sub(r'[^a-zA-Z0-9]', '_', fragment)
        filename = f"{path.strip('/')}_{fragment_clean}"
    else:
        filename = path.strip('/')
    filename = re.sub(r'[^a-zA-Z0-9_.-]', '_', filename)
    if len(filename) < 5:
        domain_part = re.sub(r'[^a-zA-Z0-9]', '_', parsed.netloc)
        filename = f"{domain_part}_{filename}"
    if not filename.endswith('.md'):
        filename = f"{filename}.md"
    return filename
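

# Example mappings (hypothetical URLs, traced through the logic above):
#   "https://docs.example.com/guide/intro#setup" -> "guide_intro_setup.md"
#   "https://docs.example.com/"                  -> "index.md"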


def save_files_to_temp_dir(processed_files, temp_dir):
    """Save processed files to temporary directory."""
    saved_files = []
    for file_info in processed_files:
        try:
            file_path = os.path.join(temp_dir, file_info['filename'])
            counter = 1
            original_path = file_path
            while os.path.exists(file_path):
                name, ext = os.path.splitext(original_path)
                file_path = f"{name}_{counter}{ext}"
                counter += 1
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(file_info['content'])
            saved_files.append({
                'file_path': file_path,
                'filename': os.path.basename(file_path),
                'content_type': file_info['content_type'],
                'url': file_info['url']
            })
            logger.info(f"Saved file to temp directory: {os.path.basename(file_path)}")
        except Exception as e:
            logger.error(f"Error saving file {file_info['filename']}: {e}")
    return saved_files


def main():
    parser = argparse.ArgumentParser(description='Scrape a website and create an Open WebUI knowledge base')
    parser.add_argument('--token', '-t', required=True, help='Your OpenWebUI API token')
    parser.add_argument('--base-url', '-u', required=True, help='Base URL of your OpenWebUI instance (e.g., http://localhost:3000)')
    parser.add_argument('--website-url', '-w', required=True, help='URL of the website to scrape')
    parser.add_argument('--kb-name', '-n', required=True, help='Name for the knowledge base')
    parser.add_argument('--kb-purpose', '-p', help='Purpose description for the knowledge base', default=None)
    parser.add_argument('--depth', '-d', type=int, default=2, help='Maximum depth to crawl (default: 2)')
    parser.add_argument('--delay', type=float, default=1.0, help='Delay between requests in seconds (default: 1.0)')
    parser.add_argument('--exclude', '-e', action='append', help='URL patterns to exclude from crawling (can be specified multiple times)')
    parser.add_argument('--include-json', '-j', action='store_true', help='Include JSON files and API endpoints')
    parser.add_argument('--update', action='store_true', help='Update existing files in the knowledge base')
    parser.add_argument('--skip-existing', action='store_true', help='Skip existing files in the knowledge base')
    parser.add_argument('--min-content-length', type=int, default=200, help='Minimum content length to include (default: 200 characters)')
    parser.add_argument('--keep-temp-files', action='store_true', help='Keep temporary files for debugging')
    parser.add_argument('--use-selenium', action='store_true', help='Use Selenium for JavaScript-rendered sites')
    args = parser.parse_args()
    if args.update and args.skip_existing:
        logger.error("Cannot use both --update and --skip-existing flags at the same time")
        return 1
    # Check if Selenium is requested but not available
    if args.use_selenium and not SELENIUM_AVAILABLE:
        logger.warning("Selenium requested but not available. Install with: pip install selenium webdriver-manager")
        logger.warning("Falling back to simple crawler.")
        args.use_selenium = False
    scraper = None
    uploader = None
    temp_dir = None
    try:
        logger.info(f"Starting web crawl of {args.website_url} to depth {args.depth}")
        logger.info(f"Using {'Selenium' if args.use_selenium else 'simple'} crawler")
        scraper = WebScraper(
            base_url=args.website_url,
            max_depth=args.depth,
            delay=args.delay,
            exclude_patterns=args.exclude or [],
            use_selenium=args.use_selenium
        )
        scraper.crawl()
        crawled_pages = scraper.get_pages()
        logger.info(f"Crawled {len(crawled_pages)} pages")
        if not crawled_pages:
            logger.error("No pages were crawled. Exiting.")
            return 1
        logger.info("Processing crawled content")
        processed_files = []
        empty_files = 0
        for url, html_content in crawled_pages.items():
            if not html_content or len(html_content.strip()) < 100:
                logger.warning(f"Skipping empty page: {url}")
                empty_files += 1
                continue
            if url.endswith('.json') or (is_valid_json(html_content) and args.include_json):
                if is_valid_json(html_content):
                    try:
                        json_obj = json.loads(html_content)
                        pretty_json = json.dumps(json_obj, indent=2)
                        if len(pretty_json.strip()) >= args.min_content_length:
                            filename = create_unique_filename(url)
                            if not filename.endswith('.json'):
                                filename = f"{filename}.json"
                            processed_files.append({
                                'content': pretty_json,
                                'content_type': 'application/json',
                                'filename': filename,
                                'url': url
                            })
                            logger.info(f"Processed JSON content from {url}")
                        else:
                            logger.warning(f"Skipping JSON file {url} - content too short")
                            empty_files += 1
                        continue
                    except ValueError:
                        pass
            markdown_content = convert_to_markdown(html_content, url)
            if not markdown_content or len(markdown_content.strip()) < args.min_content_length:
                logger.warning(f"Skipping {url} - no extractable content found after conversion")
                empty_files += 1
                continue
            filename = create_unique_filename(url)
            processed_files.append({
                'content': markdown_content,
                'content_type': 'text/markdown',
                'filename': filename,
                'url': url
            })
        logger.info(f"Processed {len(processed_files)} files, skipped {empty_files} empty files")
        if not processed_files:
            logger.error("No files with valid content were processed. Exiting.")
            return 1
        script_dir = Path(__file__).parent
        temp_dir = script_dir / "temp_webscraper_files"
        temp_dir.mkdir(exist_ok=True)
        logger.info(f"Created temporary directory: {temp_dir}")
        saved_files = save_files_to_temp_dir(processed_files, temp_dir)
        logger.info(f"Saved {len(saved_files)} files to temporary directory")
logger.info("=== DEBUG: File Content Analysis ===")
for file_info in saved_files:
try:
with open(file_info['file_path'], 'r', encoding='utf-8') as f:
content = f.read()
lines = content.split('\n')
meaningful_lines = [line for line in lines if line.strip() and not line.startswith('#') and len(line.strip()) > 20]
logger.info(f"File: {file_info['filename']}")
logger.info(f" Total size: {len(content)} chars")
logger.info(f" Total lines: {len(lines)}")
logger.info(f" Meaningful lines: {len(meaningful_lines)}")
if meaningful_lines:
logger.info(f" First meaningful line: {meaningful_lines[0][:100]}{'...' if len(meaningful_lines[0]) > 100 else ''}")
except Exception as e:
logger.error(f"Error reading saved file {file_info['filename']}: {e}")
logger.info("=== END DEBUG ===")
        uploader = OpenWebUIUploader(args.base_url, args.token)
        existing_kb = uploader.get_knowledge_base_by_name(args.kb_name)
        if existing_kb:
            kb_id = existing_kb.get('id')
            logger.info(f"Found existing knowledge base '{args.kb_name}' with ID: {kb_id}")
        else:
            logger.info(f"Creating new knowledge base '{args.kb_name}' in Open WebUI")
            kb = uploader.create_knowledge_base(args.kb_name, args.kb_purpose)
            kb_id = kb.get('id')
            if not kb_id:
                logger.error("Failed to get knowledge base ID")
                return 1
            logger.info(f"Created knowledge base with ID: {kb_id}")
        success_count = 0
        skip_count = 0
        update_count = 0
        error_count = 0
        empty_skip_count = 0
        for file_info in saved_files:
            try:
                filename = file_info['filename']
                file_path = file_info['file_path']
                existing_file_id = uploader.file_exists_in_kb(kb_id, filename)
                if existing_file_id and args.skip_existing:
                    logger.info(f"Skipping existing file: {filename}")
                    skip_count += 1
                    continue
                logger.info(f"Uploading file: {filename}")
                result = uploader.upload_file_from_path(
                    kb_id,
                    file_path,
                    filename,
                    file_info['content_type']
                )
                if isinstance(result, dict) and result.get('status') in ['skipped', 'error']:
                    if result.get('status') == 'skipped':
                        empty_skip_count += 1
                    else:
                        error_count += 1
                    logger.warning(f"Failed to upload {filename}: {result.get('reason')}")
                else:
                    success_count += 1
                time.sleep(0.5)
            except Exception as e:
                logger.error(f"Failed to process {file_info['filename']}: {e}")
                error_count += 1
        logger.info(f"Upload complete: {success_count} files uploaded, {update_count} files updated, {skip_count} files skipped, {empty_skip_count} empty/invalid files skipped, {error_count} errors")
        return 0
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return 1
    finally:
        if scraper:
            scraper.close()
        if uploader:
            uploader.close()
        if temp_dir and temp_dir.exists():
            if args.keep_temp_files:
                logger.info(f"Keeping temporary files in: {temp_dir}")
            else:
                try:
                    shutil.rmtree(temp_dir)
                    logger.info("Cleaned up temporary directory")
                except Exception as e:
                    logger.warning(f"Failed to clean up temporary directory {temp_dir}: {e}")


if __name__ == "__main__":
    sys.exit(main())