// Source: RakunNakun-AI/batchpull.js
// Last modified 2025-06-04 07:26:07 +07:00 — 196 lines, 5.7 KiB, JavaScript
require('dotenv').config();
const fs = require('fs');
const axios = require('axios');
const readline = require('readline');
const crypto = require('crypto');
const { Worker } = require('worker_threads');
const path = require('path');
const MongoCacheManager = require('./mongoCacheManager');
// Singleton cache manager backed by MongoDB; connection details come from
// the environment (.env is loaded at the top of this file). Serves both
// exact-hash lookups and paginated embedding reads for semantic search.
const mongoCache = new MongoCacheManager(
process.env.MONGO_URL,
process.env.MONGO_DB_NAME,
process.env.MONGO_COLLECTION_NAME
);
// Model pool for the llm7.io endpoint; fetchLLMResponse picks one uniformly
// at random per request to spread load across models.
const models = [
'llama-4-scout-17b-16e-instruct',
'ministral-8b-2410',
'mistral-large-2411',
'qwen2.5-coder-32b-instruct:int8',
'gpt-4.1'
];
// Lazily load the concurrency limiter. `p-limit` ships as an ES module, so
// this CommonJS file must pull it in via dynamic import(); resolves to the
// package's default export (the pLimit factory).
async function getPLimit() {
  const mod = await import('p-limit');
  return mod.default;
}
// Append-only file loggers: one line per processed input, written by
// processInput() and flushed/closed by main().
const successLog = fs.createWriteStream('success.log', { flags: 'a' });
const failedLog = fs.createWriteStream('failed.log', { flags: 'a' });
// Normalize (trim + lowercase) and SHA-256 the input so that trivially
// different spellings of the same prompt share one exact-match cache key.
// Returns the digest as a 64-character lowercase hex string.
function hashInput(input) {
  const normalized = input.trim().toLowerCase();
  return crypto.createHash('sha256').update(normalized).digest('hex');
}
// Embed one text via the local embedding module (Voyage API wrapper).
// The module is required lazily, inside the call, rather than at file load.
async function getEmbedding(text) {
  const { getVoyageEmbeddings } = require('./embedding');
  const [firstEmbedding] = await getVoyageEmbeddings([text]);
  return firstEmbedding;
}
// Look up a cached response for `input` with a two-stage strategy:
//   1. exact match on the normalized SHA-256 hash of the input;
//   2. otherwise, semantic search: embed the input and scan cached
//      embeddings page by page (100 per page), scoring cosine similarity
//      in a worker thread.
// Returns the cached value, or null when nothing scores strictly above
// `threshold`.
async function getCachedResult(input, threshold = 0.9) {
const inputHash = hashInput(input);
// Exact hash match
const exactMatch = await mongoCache.getByHash(inputHash);
if (exactMatch) {
console.log("[HybridCache] Exact hash match found.");
return exactMatch.value;
}
// Semantic search with embedding & worker thread (paginated)
const inputEmbedding = await getEmbedding(input);
const pageSize = 100;
let page = 0;
let globalBestMatch = null;
// Seeding the running best score with the threshold means only entries
// scoring strictly above it can ever be accepted.
let globalBestScore = threshold;
while (true) {
const cachedEntries = await mongoCache.getEmbeddingsPage(page, pageSize);
if (cachedEntries.length === 0) break;
// Run worker on current page. A fresh worker is spawned per page and the
// current best score is passed as its threshold.
// NOTE(review): the worker is never terminate()d here — presumably
// cosineSimilarityWorker.js exits on its own after replying; confirm,
// otherwise each page leaks a thread.
const { bestMatch, bestScore } = await new Promise((resolve, reject) => {
const worker = new Worker(path.resolve(__dirname, './cosineSimilarityWorker.js'));
worker.postMessage({ inputEmbedding, cachedEntries, threshold: globalBestScore });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
console.warn(`[HybridCache] Worker stopped with exit code ${code}`);
}
});
});
if (bestScore > globalBestScore) {
globalBestScore = bestScore;
globalBestMatch = bestMatch;
}
// Early exit if similarity is very high (e.g. 0.95)
if (globalBestScore >= 0.95) break;
page++;
}
if (globalBestMatch) {
console.log(`[HybridCache] Semantic match found with similarity ${globalBestScore.toFixed(2)}`);
return globalBestMatch.value;
} else {
console.log("[HybridCache] No suitable semantic cache match found.");
return null;
}
}
// Persist an input/response pair along with its normalized hash (for
// exact-match lookups) and its embedding (for semantic lookups).
async function setCache(input, value) {
  const hash = hashInput(input);
  const embedding = await getEmbedding(input);
  await mongoCache.setCache(input, value, embedding, hash);
  console.log("[HybridCache] Stored new cache entry with embedding and hash.");
}
// Read a text file line by line and return its non-empty, trimmed lines.
// Blank and whitespace-only lines are skipped; CRLF endings are handled.
async function readInputs(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity
  });
  const inputs = [];
  for await (const rawLine of rl) {
    const line = rawLine.trim();
    if (line) {
      inputs.push(line);
    }
  }
  return inputs;
}
/**
 * Send `input` as a single user message to the llm7.io chat-completions API
 * using a randomly chosen model from `models`, retrying on HTTP 429 with
 * exponential backoff.
 *
 * @param {string} input - Prompt text for the LLM.
 * @param {number} [retries=5] - Maximum number of attempts.
 * @param {number} [backoff=1000] - Base backoff in ms; doubles per attempt.
 * @returns {Promise<string>} Trimmed content of the first completion choice.
 * @throws Rethrows any non-429 error immediately; throws a "max retries"
 *   Error after `retries` consecutive 429 responses.
 */
async function fetchLLMResponse(input, retries = 5, backoff = 1000) {
  for (let attempt = 0; attempt < retries; attempt++) {
    try {
      // No affinity between input and model: spread load randomly.
      const randomModel = models[Math.floor(Math.random() * models.length)];
      const res = await axios.post(
        'https://api.llm7.io/v1/chat/completions',
        {
          model: randomModel,
          messages: [{ role: 'user', content: input }]
        },
        {
          headers: {
            'Content-Type': 'application/json'
          }
        }
      );
      return res.data.choices[0].message.content.trim();
    } catch (err) {
      if (!err.response || err.response.status !== 429) {
        // Other errors - rethrow immediately (not retryable here)
        throw err;
      }
      // Fix: the original slept the full backoff even after the FINAL failed
      // attempt (up to 16s at defaults) before throwing. Skip the pointless
      // wait when no retry remains.
      if (attempt === retries - 1) break;
      const waitTime = backoff * Math.pow(2, attempt); // Exponential backoff
      console.warn(`⚠️ Rate limit hit. Waiting ${waitTime}ms before retrying... (Attempt ${attempt + 1}/${retries})`);
      await new Promise(r => setTimeout(r, waitTime));
    }
  }
  throw new Error('Max retries reached due to rate limiting.');
}
// Handle one input end-to-end: serve from cache when possible, otherwise
// fetch from the LLM and store the result. Outcomes are appended to
// success.log / failed.log; errors are logged and never propagate.
// NOTE(review): a cached empty-string response is falsy and would trigger a
// re-fetch — confirm whether empty replies are expected to be cacheable.
async function processInput(input) {
  try {
    const cachedValue = await getCachedResult(input);
    if (cachedValue) {
      console.log(`🔁 Cache hit: "${input}"`);
      return;
    }
    const llmReply = await fetchLLMResponse(input);
    await setCache(input, llmReply);
    console.log(`✅ Stored: "${input}"`);
    successLog.write(`${input}\n`);
  } catch (err) {
    console.error(`❌ Failed: "${input}" → ${err.message}`);
    failedLog.write(`${input} | ${err.message}\n`);
  }
}
/**
 * Entry point: read prompts from ./inputs.txt, process them with a
 * concurrency cap of 5 via p-limit, then flush both log streams.
 */
async function main() {
  const pLimit = await getPLimit();
  const CONCURRENCY = 5;
  const limit = pLimit(CONCURRENCY);
  const inputs = await readInputs('./inputs.txt');
  // processInput never rejects, so Promise.all here cannot fail-fast on a
  // single bad input.
  await Promise.all(inputs.map(input => limit(() => processInput(input))));
  successLog.end();
  failedLog.end();
  console.log('📝 Done! Check success.log and failed.log.');
}
// Fix: the original invoked main() as a floating promise, so a rejection
// (e.g. missing inputs.txt) surfaced as an unhandled rejection. Catch it,
// report, and signal failure via the exit code.
main().catch(err => {
  console.error(`💥 Fatal: ${err.message}`);
  process.exitCode = 1;
});