From fc98dbbbae1820d9e39f28f8fb936804d2be50da Mon Sep 17 00:00:00 2001 From: JamesFlare1212 Date: Tue, 7 Apr 2026 08:38:15 -0400 Subject: [PATCH] fix(scan): remove p-limit --- docker-compose.yaml | 1 - index.ts | 34 +++++++------ services/cache-manager.ts | 103 ++++++++++---------------------------- services/s3-service.ts | 24 +++------ 4 files changed, 52 insertions(+), 110 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index d4b4a43..7597684 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -48,7 +48,6 @@ services: redis: image: "redis:8.0-alpine" container_name: dsas-cca-redis - # Add memory limits to prevent OOM command: redis-server --requirepass "dsas-cca" volumes: - ./redis_data:/data diff --git a/index.ts b/index.ts index 6100039..0d8a25b 100644 --- a/index.ts +++ b/index.ts @@ -191,9 +191,11 @@ app.get('/v1/activity/list', async (req: Request, res: Response) => { return res.json({}); } - const allActivities = await Promise.all( - activityKeys.map(async k => getActivityData(k.substring(ACTIVITY_KEY_PREFIX.length))) - ); + const allActivities: (ActivityData | null)[] = []; + for (const k of activityKeys) { + const activityData = await getActivityData(k.substring(ACTIVITY_KEY_PREFIX.length)); + allActivities.push(activityData); + } /* ---------- gather available filter values for validation ---------- */ const availableCategories = new Set(); @@ -261,13 +263,13 @@ app.get('/v1/activity/category', async (_req: Request, res: Response) => { logger.info('No activity keys found in Redis for categories.'); return res.json({}); } - // Fetch all activity data in parallel - const allActivityDataPromises = activityKeys.map(async (key) => { + // Fetch all activity data sequentially + const allActivities: (ActivityData | null)[] = []; + for (const key of activityKeys) { const activityId = key.substring(ACTIVITY_KEY_PREFIX.length); - return getActivityData(activityId); - }); - - const allActivities = await Promise.all(allActivityDataPromises); + const activityData = await getActivityData(activityId); + allActivities.push(activityData); + } allActivities.forEach((activityData: ActivityData | null) => { if (activityData && @@ -301,13 +303,13 @@ app.get('/v1/activity/academicYear', async (_req: Request, res: Response) => { logger.info('No activity keys found in Redis for academic years.'); return res.json({}); } - // 1. Fetch all activity data in parallel - const allActivities = await Promise.all( - activityKeys.map(async (key) => { - const activityId = key.substring(ACTIVITY_KEY_PREFIX.length); - return getActivityData(activityId); - }) - ); + // 1. Fetch all activity data sequentially + const allActivities: (ActivityData | null)[] = []; + for (const key of activityKeys) { + const activityId = key.substring(ACTIVITY_KEY_PREFIX.length); + const activityData = await getActivityData(activityId); + allActivities.push(activityData); + } // 2. Count activities per academic year allActivities.forEach((activityData: ActivityData | null) => { if ( diff --git a/services/cache-manager.ts b/services/cache-manager.ts index fd2f176..17fec0a 100644 --- a/services/cache-manager.ts +++ b/services/cache-manager.ts @@ -1,6 +1,5 @@ // services/cache-manager.ts import { config } from 'dotenv'; -import pLimit from 'p-limit'; import { fetchActivityData } from '../engage-api/get-activity'; import { structActivityData } from '../engage-api/struct-activity'; import { structStaffData } from '../engage-api/struct-staff'; @@ -113,62 +112,33 @@ async function processSingleActivity(activityId: string): Promise { /** * Initialize the club cache by scanning through all activity IDs - * Processed in batches to prevent memory pressure from accumulating all promises upfront + * Processed sequentially in single-threaded mode */ export async function initializeClubCache(): Promise { - logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN}`); + logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN} (single-threaded mode)`); const totalIds = MAX_ACTIVITY_ID_SCAN - MIN_ACTIVITY_ID_SCAN + 1; - const BATCH_SIZE = 100; let processedCount = 0; let successCount = 0; let errorCount = 0; skippedCount = 0; // Reset for this run - for (let batchStart = MIN_ACTIVITY_ID_SCAN; batchStart <= MAX_ACTIVITY_ID_SCAN; batchStart += BATCH_SIZE) { - const batchEnd = Math.min(batchStart + BATCH_SIZE - 1, MAX_ACTIVITY_ID_SCAN); - const batchPromises: Promise[] = []; - // Create fresh p-limit instance per batch to prevent internal queue accumulation - const limit = pLimit(CONCURRENT_API_CALLS); + for (let i = MIN_ACTIVITY_ID_SCAN; i <= MAX_ACTIVITY_ID_SCAN; i++) { + const activityId = String(i); - logger.info(`Processing batch ${Math.floor(processedCount / BATCH_SIZE) + 1}/${Math.ceil(totalIds / BATCH_SIZE)} (IDs ${batchStart}-${batchEnd})`); - - for (let i = batchStart; i <= batchEnd; i++) { - const activityId = String(i); - batchPromises.push( - limit(() => - processSingleActivity(activityId) - .then(() => { - successCount++; - processedCount++; - if (processedCount % 100 === 0) { - const mem = process.memoryUsage(); - logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`); - } - }) - .catch((error: unknown) => { - errorCount++; - processedCount++; - logger.error(`Error processing activity ID ${activityId}:`, error); - if (processedCount % 100 === 0) { - const mem = process.memoryUsage(); - logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`); - } - }) - ) - ); - } - - await Promise.allSettled(batchPromises); - batchPromises.length = 0; - - // Garbage collection hint and longer delay between batches to allow event loop to drain - if (global.gc) { - global.gc(false); + try { + await processSingleActivity(activityId); + successCount++; + } catch (error) { + errorCount++; + logger.error(`Error processing activity ID ${activityId}:`, error); } - if (batchEnd < MAX_ACTIVITY_ID_SCAN) { - await new Promise(resolve => setTimeout(resolve, 500)); + processedCount++; + + if (processedCount % 100 === 0) { + const mem = process.memoryUsage(); + logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`); } } @@ -178,51 +148,32 @@ export async function initializeClubCache(): Promise { /** * Update stale clubs in the cache + * Processed sequentially in single-threaded mode */ export async function updateStaleClubs(): Promise { - logger.info('Starting stale club check...'); + logger.info('Starting stale club check (single-threaded mode)...'); const now = Date.now(); const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000; - const limit = pLimit(CONCURRENT_API_CALLS); - const promises: Promise[] = []; const activityKeys = await getAllActivityKeys(); for (const key of activityKeys) { const activityId = key.substring(ACTIVITY_KEY_PREFIX.length); - promises.push(limit(async () => { - const cachedData = await getActivityData(activityId); - - if (cachedData && cachedData.lastCheck) { - const lastCheckTime = new Date(cachedData.lastCheck).getTime(); - if ((now - lastCheckTime) > updateIntervalMs || cachedData.error) { - logger.info(`Activity ${activityId} is stale or had error. Updating...`); - await processAndCacheActivity(activityId); - } - } else if (!cachedData || Object.keys(cachedData).length === 0) { - logger.info(`Activity ${activityId} not in cache or is empty object. Attempting to fetch...`); + const cachedData = await getActivityData(activityId); + + if (cachedData && cachedData.lastCheck) { + const lastCheckTime = new Date(cachedData.lastCheck).getTime(); + if ((now - lastCheckTime) > updateIntervalMs || cachedData.error) { + logger.info(`Activity ${activityId} is stale or had error. Updating...`); await processAndCacheActivity(activityId); } - })); + } else if (!cachedData || Object.keys(cachedData).length === 0) { + logger.info(`Activity ${activityId} not in cache or is empty object. Attempting to fetch...`); + await processAndCacheActivity(activityId); + } } await cleanupOrphanedS3Images(); - // Process promises in batches to prevent event loop blocking - const BATCH_SIZE = 50; - for (let i = 0; i < promises.length; i += BATCH_SIZE) { - const batch = promises.slice(i, i + BATCH_SIZE); - const batchNum = Math.floor(i / BATCH_SIZE) + 1; - const totalBatches = Math.ceil(promises.length / BATCH_SIZE); - - await Promise.all(batch); - logger.info(`Stale check batch ${batchNum}/${totalBatches} complete`); - - // Small delay between batches to prevent event loop blocking - if (i + BATCH_SIZE < promises.length) { - await new Promise(resolve => setTimeout(resolve, 100)); - } - } - logger.info('Stale club check finished.'); } diff --git a/services/s3-service.ts b/services/s3-service.ts index aeb573a..24689e5 100644 --- a/services/s3-service.ts +++ b/services/s3-service.ts @@ -164,26 +164,16 @@ export async function deleteS3Objects(objectKeysArray: string[]): Promise s3Client!.delete(key)) - ); - // Count successes and failures - for (const result of results) { - if (result.status === 'fulfilled') { - successCount++; - } else { - errorCount++; - logger.error(`Failed to delete object: ${result.reason}`); - } + for (const key of objectKeysArray) { + try { + await s3Client.delete(key); + successCount++; + } catch (error) { + errorCount++; + logger.error(`Failed to delete object ${key}:`, error); } } logger.info(`Deleted ${successCount} objects from S3. Failed: ${errorCount}`);