From 9ba47242822d5c5ff9c3f78f44ce40248e71af79 Mon Sep 17 00:00:00 2001
From: NekoMonci12
Date: Tue, 3 Jun 2025 18:41:24 +0700
Subject: [PATCH] Fetch Data Function use LLM7.io for batch ask LLM models
 then cache it. target: inputs.txt

---
 .gitignore   |   5 ++
 batchpull.js | 207 +++++++++++++++++++++++++++++++++++++++++++++++++++
 package.json |   9 ++-
 3 files changed, 217 insertions(+), 4 deletions(-)
 create mode 100644 batchpull.js

diff --git a/.gitignore b/.gitignore
index c6bba59..576d577 100644
--- a/.gitignore
+++ b/.gitignore
@@ -128,3 +128,8 @@ dist
 .yarn/build-state.yml
 .yarn/install-state.gz
 .pnp.*
+
+# Inputs Files
+inputs.txt
+success.log
+failed.log

diff --git a/batchpull.js b/batchpull.js
new file mode 100644
index 0000000..152122e
--- /dev/null
+++ b/batchpull.js
@@ -0,0 +1,207 @@
require('dotenv').config();

const fs = require('fs');
const axios = require('axios');
const readline = require('readline');
const crypto = require('crypto');
const { Worker } = require('worker_threads');
const path = require('path');
const MongoCacheManager = require('./mongoCacheManager');

// Initialize MongoCacheManager singleton
const mongoCache = new MongoCacheManager(
  process.env.MONGO_URL,
  process.env.MONGO_DB_NAME,
  'cache'
);

// Candidate LLM7.io model names; one is picked at random per request.
const models = [
  'llama-4-scout-17b-16e-instruct',
  'ministral-8b-2410',
  'mistral-large-2411',
  'qwen2.5-coder-32b-instruct:int8',
  'gpt-4.1'
];

// p-limit v4+ is ESM-only, so it must be loaded with a dynamic
// import() from this CommonJS module.
async function getPLimit() {
  const pLimit = (await import('p-limit')).default;
  return pLimit;
}

// Simple append-mode file loggers
const successLog = fs.createWriteStream('success.log', { flags: 'a' });
const failedLog = fs.createWriteStream('failed.log', { flags: 'a' });

// Hash input text (for exact-match caching). Normalised with
// trim + lowercase so trivially different spellings of the same
// prompt share one cache entry.
function hashInput(input) {
  return crypto.createHash('sha256').update(input.trim().toLowerCase()).digest('hex');
}

// Get embedding for text (calls your embedding module).
// Required lazily so the embedding client is only loaded on first use;
// Node caches the require, so this costs nothing on later calls.
async function getEmbedding(text) {
  const { getVoyageEmbeddings } = require('./embedding');
  const embeddings = await getVoyageEmbeddings([text]);
  return embeddings[0];
}

// Check cache: exact hash match first, then semantic similarity
// (cosine similarity computed in a worker thread so the event loop
// is not blocked). Returns the cached value, or null on a miss.
async function getCachedResult(input, threshold = 0.9) {
  const inputHash = hashInput(input);

  // Exact hash match
  const exactMatch = await mongoCache.getByHash(inputHash);
  if (exactMatch) {
    console.log("[HybridCache] Exact hash match found.");
    return exactMatch.value;
  }

  // Semantic search with embedding & worker thread
  const inputEmbedding = await getEmbedding(input);
  const cachedEntries = await mongoCache.getAllEmbeddings();

  if (cachedEntries.length === 0) {
    console.log("[HybridCache] No cache entries with embeddings found.");
    return null;
  }

  return new Promise((resolve, reject) => {
    const worker = new Worker(path.resolve(__dirname, './cosineSimilarityWorker.js'));

    // Guard so the promise settles exactly once even if the worker
    // emits more than one event.
    let resolved = false;

    worker.postMessage({ inputEmbedding, cachedEntries, threshold });

    worker.on('message', (msg) => {
      if (msg.error) {
        // Worker reported an application-level error: treat as a
        // cache miss rather than failing the whole input.
        console.error('[HybridCache] Worker error:', msg.error);
        if (!resolved) {
          resolved = true;
          resolve(null);
          worker.terminate();
        }
      } else {
        const { bestMatch, bestScore } = msg;
        if (!resolved) {
          resolved = true;
          if (bestMatch) {
            console.log(`[HybridCache] Semantic match found with similarity ${bestScore.toFixed(2)}`);
            resolve(bestMatch.value);
          } else {
            console.log("[HybridCache] No suitable semantic cache match found.");
            resolve(null);
          }
          // Terminate after a short delay to ensure message flush
          setTimeout(() => worker.terminate(), 50);
        }
      }
    });

    worker.on('error', (err) => {
      // Thread-level failure (e.g. worker script crashed): propagate.
      console.error("[HybridCache] Worker thread error:", err);
      if (!resolved) {
        resolved = true;
        reject(err);
      }
    });

    worker.on('exit', (code) => {
      if (code !== 0) {
        console.warn(`[HybridCache] Worker stopped with exit code ${code}`);
      }
    });
  });

}

// Store input + response in cache with embedding and hash
// Store a new input/response pair, keyed by both its embedding
// (for semantic lookups) and its normalised hash (for exact lookups).
async function setCache(input, value) {
  const embedding = await getEmbedding(input);
  const hash = hashInput(input);
  await mongoCache.setCache(input, value, embedding, hash);
  console.log("[HybridCache] Stored new cache entry with embedding and hash.");
}

// Read inputs from file: one prompt per line, blank lines skipped.
async function readInputs(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity
  });

  const inputs = [];
  for await (const line of rl) {
    const trimmed = line.trim();
    if (trimmed) inputs.push(trimmed);
  }
  return inputs;
}

// Ask a randomly chosen LLM7.io model for a completion.
// Retries with exponential backoff on HTTP 429 (rate limit) only;
// any other error is rethrown immediately.
async function fetchLLMResponse(input, retries = 5, backoff = 1000) {
  for (let attempt = 0; attempt < retries; attempt++) {
    try {
      const randomModel = models[Math.floor(Math.random() * models.length)];
      const res = await axios.post(
        'https://api.llm7.io/v1/chat/completions',
        {
          model: randomModel,
          messages: [{ role: 'user', content: input }]
        },
        {
          headers: {
            'Content-Type': 'application/json'
          }
        }
      );
      return res.data.choices[0].message.content.trim();
    } catch (err) {
      if (err.response && err.response.status === 429) {
        // Rate limit hit, wait then retry
        const waitTime = backoff * Math.pow(2, attempt); // Exponential backoff
        console.warn(`⚠️ Rate limit hit. Waiting ${waitTime}ms before retrying... (Attempt ${attempt + 1}/${retries})`);
        await new Promise(r => setTimeout(r, waitTime));
      } else {
        // Other errors - rethrow immediately
        throw err;
      }
    }
  }
  throw new Error('Max retries reached due to rate limiting.');
}


// Process a single input line: check cache, fetch if needed, store result.
// Errors are logged to failed.log and swallowed so one bad input does not
// abort the whole batch.
async function processInput(input) {
  try {
    const cached = await getCachedResult(input);
    // getCachedResult signals a miss with null; compare explicitly so a
    // legitimately cached empty-string response still counts as a hit.
    if (cached !== null) {
      console.log(`🔁 Cache hit: "${input}"`);
      return;
    }

    const response = await fetchLLMResponse(input);
    await setCache(input, response);

    console.log(`✅ Stored: "${input}"`);
    successLog.write(`${input}\n`);
  } catch (err) {
    console.error(`❌ Failed: "${input}" → ${err.message}`);
    failedLog.write(`${input} | ${err.message}\n`);
  }
}

// Main async runner: process all inputs with bounded concurrency.
async function main() {
  const pLimit = await getPLimit();
  const CONCURRENCY = 5;
  const limit = pLimit(CONCURRENCY);

  const inputs = await readInputs('./inputs.txt');
  await Promise.all(inputs.map(input => limit(() => processInput(input))));

  successLog.end();
  failedLog.end();
  // NOTE(review): mongoCache's connection is never closed here; if the
  // process hangs after "Done!", add a close/disconnect call — confirm
  // against mongoCacheManager's API.
  console.log('📝 Done! Check success.log and failed.log.');
}

// Surface any top-level failure instead of leaving a floating promise
// with an unhandled rejection.
main().catch(err => {
  console.error('❌ Fatal:', err);
  process.exit(1);
});
diff --git a/package.json b/package.json
index c0dee09..10e4067 100644
--- a/package.json
+++ b/package.json
@@ -1,14 +1,15 @@
 {
   "dependencies": {
-    "express": "^4.18.2",
+    "axios": "^1.4.0",
     "discord.js": "^14.11.0",
     "dotenv": "^16.0.0",
+    "express": "^4.18.2",
+    "mongodb": "^5.7.0",
     "mysql2": "^3.2.0",
     "openai": "^4.104.0",
-    "axios": "^1.4.0",
     "qs": "^6.11.2",
     "redis": "^4.6.7",
-    "mongodb": "^5.7.0",
-    "uuid": "^9.0.0"
+    "p-limit": "^4.0.0",
+    "uuid": "^9.0.0"
   }
 }