diff --git a/batchpull.js b/batchpull.js index 152122e..17fb121 100644 --- a/batchpull.js +++ b/batchpull.js @@ -12,7 +12,7 @@ const MongoCacheManager = require('./mongoCacheManager'); const mongoCache = new MongoCacheManager( process.env.MONGO_URL, process.env.MONGO_DB_NAME, - 'cache' + process.env.MONGO_COLLECTION_NAME ); const models = [ @@ -45,7 +45,7 @@ async function getEmbedding(text) { return embeddings[0]; } -// Check cache: exact hash match or semantic similarity +// Check cache: exact hash match or semantic similarity with pagination async function getCachedResult(input, threshold = 0.9) { const inputHash = hashInput(input); @@ -56,62 +56,51 @@ async function getCachedResult(input, threshold = 0.9) { return exactMatch.value; } - // Semantic search with embedding & worker thread + // Semantic search with embedding & worker thread (paginated) const inputEmbedding = await getEmbedding(input); - const cachedEntries = await mongoCache.getAllEmbeddings(); - if (cachedEntries.length === 0) { - console.log("[HybridCache] No cache entries with embeddings found."); - return null; + const pageSize = 100; + let page = 0; + let globalBestMatch = null; + let globalBestScore = threshold; + + while (true) { + const cachedEntries = await mongoCache.getEmbeddingsPage(page, pageSize); + if (cachedEntries.length === 0) break; + + // Run worker on current page + const { bestMatch, bestScore } = await new Promise((resolve, reject) => { + const worker = new Worker(path.resolve(__dirname, './cosineSimilarityWorker.js')); + + worker.postMessage({ inputEmbedding, cachedEntries, threshold: globalBestScore }); + + worker.on('message', resolve); + worker.on('error', reject); + worker.on('exit', (code) => { + if (code !== 0) { + console.warn(`[HybridCache] Worker stopped with exit code ${code}`); + } + }); + }); + + if (bestScore > globalBestScore) { + globalBestScore = bestScore; + globalBestMatch = bestMatch; + } + + // Early exit if similarity is very high (e.g. 0.95) + if (globalBestScore >= 0.95) break; + + page++; } - return new Promise((resolve, reject) => { - const worker = new Worker(path.resolve(__dirname, './cosineSimilarityWorker.js')); - - let resolved = false; - - worker.postMessage({ inputEmbedding, cachedEntries, threshold }); - - worker.on('message', (msg) => { - if (msg.error) { - console.error('[HybridCache] Worker error:', msg.error); - if (!resolved) { - resolved = true; - resolve(null); - worker.terminate(); - } - } else { - const { bestMatch, bestScore } = msg; - if (!resolved) { - resolved = true; - if (bestMatch) { - console.log(`[HybridCache] Semantic match found with similarity ${bestScore.toFixed(2)}`); - resolve(bestMatch.value); - } else { - console.log("[HybridCache] No suitable semantic cache match found."); - resolve(null); - } - // Terminate after a short delay to ensure message flush - setTimeout(() => worker.terminate(), 50); - } - } - }); - - worker.on('error', (err) => { - console.error("[HybridCache] Worker thread error:", err); - if (!resolved) { - resolved = true; - reject(err); - } - }); - - worker.on('exit', (code) => { - if (code !== 0) { - console.warn(`[HybridCache] Worker stopped with exit code ${code}`); - } - }); - }); - + if (globalBestMatch) { + console.log(`[HybridCache] Semantic match found with similarity ${globalBestScore.toFixed(2)}`); + return globalBestMatch.value; + } else { + console.log("[HybridCache] No suitable semantic cache match found."); + return null; + } } // Store input + response in cache with embedding and hash @@ -169,7 +158,6 @@ async function fetchLLMResponse(input, retries = 5, backoff = 1000) { throw new Error('Max retries reached due to rate limiting.'); } - // Process a single input line: check cache, fetch if needed, store result async function processInput(input) { try { diff --git a/hybridCacheManager.js b/hybridCacheManager.js index cc0a240..ed31cc9 100644 --- a/hybridCacheManager.js +++ b/hybridCacheManager.js @@ -31,42 +31,47 @@ class HybridCacheManager { return exactMatch.value; } - // 🤖 Embedding-based semantic search + // 🤖 Embedding-based semantic search with pagination const inputEmbedding = await getEmbedding(input); - const cachedEntries = await this.mongoCache.getAllEmbeddings(); - if (cachedEntries.length === 0) { - console.log("[HybridCache] No cache entries with embeddings found."); - return null; + let page = 0; + const pageSize = 1000; + let globalBestMatch = null; + let globalBestScore = threshold; + + while (true) { + const cachedEntries = await this.mongoCache.getEmbeddingsPage(page, pageSize); + if (cachedEntries.length === 0) break; + + // Run worker on this page + const result = await new Promise((resolve, reject) => { + const worker = new Worker(path.resolve(__dirname, './cosineSimilarityWorker.js')); + worker.postMessage({ inputEmbedding, cachedEntries, threshold: globalBestScore }); + + worker.on('message', resolve); + worker.on('error', reject); + worker.on('exit', (code) => { + if (code !== 0) console.warn(`[HybridCache] Worker stopped with exit code ${code}`); + }); + }); + + if (result.bestScore > globalBestScore) { + globalBestScore = result.bestScore; + globalBestMatch = result.bestMatch; + } + + if (globalBestScore >= 0.95) break; + + page++; } - // Return a Promise that resolves with the worker's result - return new Promise((resolve, reject) => { - const worker = new Worker(path.resolve(__dirname, './cosineSimilarityWorker.js')); - - worker.postMessage({ inputEmbedding, cachedEntries, threshold }); - - worker.on('message', ({ bestMatch, bestScore }) => { - if (bestMatch) { - console.log(`[HybridCache] Semantic match found with similarity ${bestScore.toFixed(2)}`); - resolve(bestMatch.value); - } else { - console.log("[HybridCache] No suitable semantic cache match found."); - resolve(null); - } - worker.terminate(); - }); - - worker.on('error', (err) => { - console.error("[HybridCache] Worker thread error:", err); - reject(err); - }); - - worker.on('exit', (code) => { - if (code !== 0) - console.warn(`[HybridCache] Worker stopped with exit code ${code}`); - }); - }); + if (globalBestMatch) { + console.log(`[HybridCache] Semantic match found with similarity ${globalBestScore.toFixed(2)}`); + return globalBestMatch.value; + } else { + console.log("[HybridCache] No suitable semantic cache match found."); + return null; + } } async setCache(input, value) { diff --git a/mongoCacheManager.js b/mongoCacheManager.js index d3ce50f..0a68360 100644 --- a/mongoCacheManager.js +++ b/mongoCacheManager.js @@ -23,7 +23,8 @@ class MongoCacheManager { this.collection = this.client.db(this.dbName).collection(this.collectionName); this.connected = true; console.log("[MongoCache] Connected to MongoDB for caching."); - + await this.collection.createIndex({ hash: 1 }, { unique: true }); + await this.collection.createIndex({ key: 1 }, { unique: true }); // Try a dry-run write to detect read-only access try { await this.collection.insertOne({ _test: true }); @@ -74,18 +75,21 @@ class MongoCacheManager { return await this.collection.find({ embedding: { $exists: true } }).toArray(); } + async getEmbeddingsPage(page = 0, pageSize = 100) { + await this.connect(); + return await this.collection + .find({ embedding: { $exists: true } }) + .skip(page * pageSize) + .limit(pageSize) + .toArray(); + } + async setCache(input, value, embedding, hash) { await this.connect(); if (!this.connected || !this.collection || this.readOnly) return; const key = this.normalize(input); - const count = await this.collection.estimatedDocumentCount(); - const maxSize = 5000; - if (count >= maxSize) { - await this.collection.deleteOne({}, { sort: { updatedAt: 1 } }); - } - await this.collection.updateOne( { key }, { diff --git a/package-lock.json b/package-lock.json index 68a7800..db66e66 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "mongodb": "^5.7.0", "mysql2": "^3.2.0", "openai": "^4.104.0", + "p-limit": "^4.0.0", "qs": "^6.11.2", "redis": "^4.6.7", "uuid": "^9.0.0" @@ -1399,6 +1400,20 @@ "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==" }, + "node_modules/p-limit": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-4.0.0.tgz", + "integrity": "sha512-5b0R4txpzjPWVw/cXXUResoD4hb6U/x9BH08L7nw+GN1sezDzPdxeRvpc9c433fZhBan/wusjbCsqwqm4EIBIQ==", + "dependencies": { + "yocto-queue": "^1.0.0" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/parseurl": { "version": "1.3.3", "resolved": "https://registry.npmjs.org/parseurl/-/parseurl-1.3.3.tgz", @@ -1888,6 +1903,17 @@ "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==", "license": "ISC" + }, + "node_modules/yocto-queue": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.2.1.tgz", + "integrity": "sha512-AyeEbWOu/TAXdxlV9wmGcR0+yh2j3vYPGOECcIj2S7MkrLyC7ne+oye2BKTItt0ii2PHk4cDy+95+LshzbXnGg==", + "engines": { + "node": ">=12.20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } } } }