Fetch Data Function

use LLM7.io for batch ask LLM models then cache it.
target: inputs.txt
This commit is contained in:
NekoMonci12
2025-06-03 18:41:24 +07:00
parent a2c81409f0
commit 9ba4724282
3 changed files with 217 additions and 4 deletions

207
batchpull.js Normal file
View File

@@ -0,0 +1,207 @@
require('dotenv').config();
const fs = require('fs');
const axios = require('axios');
const readline = require('readline');
const crypto = require('crypto');
const { Worker } = require('worker_threads');
const path = require('path');
const MongoCacheManager = require('./mongoCacheManager');
// Initialize MongoCacheManager singleton
const mongoCache = new MongoCacheManager(
process.env.MONGO_URL,
process.env.MONGO_DB_NAME,
'cache'
);
const models = [
'llama-4-scout-17b-16e-instruct',
'ministral-8b-2410',
'mistral-large-2411',
'qwen2.5-coder-32b-instruct:int8',
'gpt-4.1'
];
// Limit concurrency (ESM import dynamic)
async function getPLimit() {
const pLimit = (await import('p-limit')).default;
return pLimit;
}
// Simple file loggers
const successLog = fs.createWriteStream('success.log', { flags: 'a' });
const failedLog = fs.createWriteStream('failed.log', { flags: 'a' });
// Hash input text (for exact match caching)
function hashInput(input) {
return crypto.createHash('sha256').update(input.trim().toLowerCase()).digest('hex');
}
// Get embedding for text (calls your embedding module)
async function getEmbedding(text) {
const { getVoyageEmbeddings } = require('./embedding');
const embeddings = await getVoyageEmbeddings([text]);
return embeddings[0];
}
// Check cache: exact hash match or semantic similarity
async function getCachedResult(input, threshold = 0.9) {
const inputHash = hashInput(input);
// Exact hash match
const exactMatch = await mongoCache.getByHash(inputHash);
if (exactMatch) {
console.log("[HybridCache] Exact hash match found.");
return exactMatch.value;
}
// Semantic search with embedding & worker thread
const inputEmbedding = await getEmbedding(input);
const cachedEntries = await mongoCache.getAllEmbeddings();
if (cachedEntries.length === 0) {
console.log("[HybridCache] No cache entries with embeddings found.");
return null;
}
return new Promise((resolve, reject) => {
const worker = new Worker(path.resolve(__dirname, './cosineSimilarityWorker.js'));
let resolved = false;
worker.postMessage({ inputEmbedding, cachedEntries, threshold });
worker.on('message', (msg) => {
if (msg.error) {
console.error('[HybridCache] Worker error:', msg.error);
if (!resolved) {
resolved = true;
resolve(null);
worker.terminate();
}
} else {
const { bestMatch, bestScore } = msg;
if (!resolved) {
resolved = true;
if (bestMatch) {
console.log(`[HybridCache] Semantic match found with similarity ${bestScore.toFixed(2)}`);
resolve(bestMatch.value);
} else {
console.log("[HybridCache] No suitable semantic cache match found.");
resolve(null);
}
// Terminate after a short delay to ensure message flush
setTimeout(() => worker.terminate(), 50);
}
}
});
worker.on('error', (err) => {
console.error("[HybridCache] Worker thread error:", err);
if (!resolved) {
resolved = true;
reject(err);
}
});
worker.on('exit', (code) => {
if (code !== 0) {
console.warn(`[HybridCache] Worker stopped with exit code ${code}`);
}
});
});
}
// Store input + response in cache with embedding and hash
async function setCache(input, value) {
const embedding = await getEmbedding(input);
const hash = hashInput(input);
await mongoCache.setCache(input, value, embedding, hash);
console.log("[HybridCache] Stored new cache entry with embedding and hash.");
}
// Read inputs from file
async function readInputs(filePath) {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity
});
const inputs = [];
for await (const line of rl) {
const trimmed = line.trim();
if (trimmed) inputs.push(trimmed);
}
return inputs;
}
async function fetchLLMResponse(input, retries = 5, backoff = 1000) {
for (let attempt = 0; attempt < retries; attempt++) {
try {
const randomModel = models[Math.floor(Math.random() * models.length)];
const res = await axios.post(
'https://api.llm7.io/v1/chat/completions',
{
model: randomModel,
messages: [{ role: 'user', content: input }]
},
{
headers: {
'Content-Type': 'application/json'
}
}
);
return res.data.choices[0].message.content.trim();
} catch (err) {
if (err.response && err.response.status === 429) {
// Rate limit hit, wait then retry
const waitTime = backoff * Math.pow(2, attempt); // Exponential backoff
console.warn(`⚠️ Rate limit hit. Waiting ${waitTime}ms before retrying... (Attempt ${attempt + 1}/${retries})`);
await new Promise(r => setTimeout(r, waitTime));
} else {
// Other errors - rethrow immediately
throw err;
}
}
}
throw new Error('Max retries reached due to rate limiting.');
}
// Process a single input line: check cache, fetch if needed, store result
async function processInput(input) {
try {
const cached = await getCachedResult(input);
if (cached) {
console.log(`🔁 Cache hit: "${input}"`);
return;
}
const response = await fetchLLMResponse(input);
await setCache(input, response);
console.log(`✅ Stored: "${input}"`);
successLog.write(`${input}\n`);
} catch (err) {
console.error(`❌ Failed: "${input}" → ${err.message}`);
failedLog.write(`${input} | ${err.message}\n`);
}
}
// Main async runner
async function main() {
const pLimit = await getPLimit();
const CONCURRENCY = 5;
const limit = pLimit(CONCURRENCY);
const inputs = await readInputs('./inputs.txt');
await Promise.all(inputs.map(input => limit(() => processInput(input))));
successLog.end();
failedLog.end();
console.log('📝 Done! Check success.log and failed.log.');
}
main();