diff --git a/engage-api/get-activity.ts b/engage-api/get-activity.ts index 1a6b982..ea17b52 100644 --- a/engage-api/get-activity.ts +++ b/engage-api/get-activity.ts @@ -78,9 +78,15 @@ async function getActivityDetailsRaw( // CRITICAL: Only accept HTTP 200. Reject all other status codes including 5xx if (response.status !== 200) { logger.error(`Non-200 status ${response.status} for activity ${activityId}. NOT updating cache to preserve local data.`); - if (attempt === maxRetries - 1) { - logger.error(`All ${maxRetries} retries failed with non-200 status for activity ${activityId}.`); + + // On 5xx errors, set flag to validate cookie on next request + // Backend may be in degraded state and invalidated sessions + if (response.status >= 500 && response.status < 600) { + logger.warn(`Server error ${response.status} - will validate cookie on next request.`); + shouldValidateCookieOnNextRequest = true; } + + // Return null immediately on non-200, don't retry return null; } @@ -97,6 +103,7 @@ async function getActivityDetailsRaw( logger.error(`Unexpected API response structure for activity ${activityId}.`); } } catch (error: any) { + // Only treat 401 (Unauthorized) and 403 (Forbidden) as authentication errors // 404 (Not Found) is valid - activity doesn't exist // Other 4xx/5xx errors should not trigger re-authentication @@ -108,14 +115,18 @@ async function getActivityDetailsRaw( if (error.response) { logger.error(`Status: ${error.response.status}, Data (getActivityDetailsRaw): ${ String(error.response.data).slice(0,100)}...`); - // CRITICAL: 5xx errors should NOT update cache + // CRITICAL: 5xx errors should NOT update cache, return null immediately if (error.response.status >= 500 && error.response.status < 600) { logger.error(`Server error ${error.response.status} - preserving local cache, not updating.`); + // Set flag to validate cookie on next request + shouldValidateCookieOnNextRequest = true; + return null; } } if (attempt === maxRetries - 1) { logger.error(`All 
${maxRetries} retries failed for activity ${activityId}.`); - throw error; + // Don't throw on network/timeout errors, just return null to preserve cache + return null; } await new Promise(resolve => setTimeout(resolve, 1000 * (attempt + 1))); } @@ -123,6 +134,9 @@ async function getActivityDetailsRaw( return null; } +// Flag to track if we need to validate cookies after server errors +let shouldValidateCookieOnNextRequest = false; + /** * Main exported function. Handles cookie caching, validation, re-authentication, and fetches activity details. * @param activityId - The ID of the activity to fetch. @@ -146,7 +160,7 @@ export async function fetchActivityData( } // Optimization: Skip pre-validation, directly request data - // Only validate/re-login when we get 4xx error (fail-fast strategy) + // Only validate/re-login when we get 4xx error OR after 5xx (backend may be in degraded state) if (!currentCookie) { logger.info('No cached cookie found. Attempting login...'); try { @@ -167,6 +181,31 @@ export async function fetchActivityData( return null; } + // Validate cookie after previous 5xx error (backend may have invalidated sessions) + if (shouldValidateCookieOnNextRequest) { + logger.info('Validating cookie after previous server error...'); + shouldValidateCookieOnNextRequest = false; + // Simple validation: try to fetch a known activity + const testResponse = await getActivityDetailsRaw('1', currentCookie, 1, 5000); + if (!testResponse) { + logger.warn('Cookie validation failed after server error. 
Re-login required.'); + await clearCookieCache(); + try { + currentCookie = await getCompleteCookies(userName, userPwd); + const cookies = await loadCachedCookies(); + if (cookies) { + await saveCookiesToCache(cookies); + } + logger.info('Re-login completed due to cookie validation failure.'); + } catch (loginError) { + logger.error(`Re-login failed: ${(loginError as Error).message}`); + return null; + } + } else { + logger.info('Cookie validation successful after server error.'); + } + } + logger.debug('Using cached cookie for API request.'); try { @@ -177,6 +216,7 @@ const parsedOuter = JSON.parse(rawActivityDetailsString); return JSON.parse(parsedOuter.d); } + // Note: on a 5xx, getActivityDetailsRaw has already set shouldValidateCookieOnNextRequest; here we only log logger.warn(`No data returned from getActivityDetailsRaw for activity ${activityId}, but no authentication error was thrown.`); return null; } catch (error) { diff --git a/services/cache-manager.ts b/services/cache-manager.ts index feb3f89..5239f9b 100644 --- a/services/cache-manager.ts +++ b/services/cache-manager.ts @@ -14,7 +14,7 @@ import { import { uploadImageFromBase64, listS3Objects, constructS3Url } from './s3-service'; import { extractBase64Image } from '../utils/image-processor'; import { logger } from '../utils/logger'; - +import { BatchProcessor } from '../utils/semaphore'; import type { ActivityData } from '../models/activity'; @@ -30,38 +30,8 @@ const STAFF_UPDATE_INTERVAL_MINS = parseInt(process.env.STAFF_UPDATE_INTERVAL_MI const FIXED_STAFF_ACTIVITY_ID = process.env.FIXED_STAFF_ACTIVITY_ID; const S3_IMAGE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, ''); -/** - * Extracts S3 object key from a public URL. - * Handles both S3_PUBLIC_URL and S3_ENDPOINT formats. 
- * - * Examples: - * - https://cdn.example.com/bucket/files/abc123.avif → files/abc123.avif - * - https://s3.amazonaws.com/bucket/files/abc123.avif → files/abc123.avif - * - * @param url - The full S3 public URL - * @returns The object key (e.g., "files/abc123.avif") or null if parsing fails - */ -function extractObjectKeyFromUrl(url: string): string | null { - try { - const urlObj = new URL(url); - const path = urlObj.pathname; - - const parts = path.replace(/^\//, '').split('/').filter(p => p); - - if (parts.length >= 2) { - const key = parts.slice(1).join('/'); - return key; - } - - logger.warn(`Failed to extract object key from URL: ${url} - insufficient path parts`); - return null; - } catch (error) { - logger.warn(`Failed to parse URL: ${url}`, error); - return null; - } -} - -// Crawler configuration +// Crawler concurrency configuration +const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '8', 10); const CRAWLER_REQUEST_TIMEOUT_MS = parseInt(process.env.CRAWLER_REQUEST_TIMEOUT_MS || '25000', 10); const CRAWLER_MAX_RETRIES = parseInt(process.env.CRAWLER_MAX_RETRIES || '3', 10); const CRAWLER_RETRY_DELAY_MS = parseInt(process.env.CRAWLER_RETRY_DELAY_MS || '1000', 10); @@ -172,10 +142,11 @@ async function processSingleActivity(activityId: string): Promise { /** * Initialize the club cache by scanning through all activity IDs - * Processed sequentially + * Processed concurrently with controlled parallelism */ export async function initializeClubCache(): Promise { logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN}`); + logger.info(`Concurrency: ${CONCURRENT_API_CALLS} parallel requests`); const totalIds = MAX_ACTIVITY_ID_SCAN - MIN_ACTIVITY_ID_SCAN + 1; let successCount = 0; @@ -188,23 +159,30 @@ export async function initializeClubCache(): Promise { (_, i) => String(MIN_ACTIVITY_ID_SCAN + i) ); - // Process all activities sequentially - for (let i = 0; i < activityIds.length; 
i++) { - const activityId = activityIds[i]!; - try { + // Create batch processor with concurrency control + const processor = new BatchProcessor( + async (activityId: string) => { await processSingleActivity(activityId); - successCount++; - } catch (error) { - errorCount++; - logger.error(`Error processing activity ID ${activityId}:`, error); + return activityId; + }, + CONCURRENT_API_CALLS, + { + onError: (error, activityId) => { + errorCount++; + logger.error(`Error processing activity ID ${activityId}:`, error); + }, + onProgress: (completed, total) => { + if (completed % 100 === 0 || completed === total) { + const mem = process.memoryUsage(); + logger.info(`Progress: ${completed}/${total} (${Math.round(completed/total*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB | Concurrent: ${CONCURRENT_API_CALLS}`); + } + } } - - // Log progress every 100 activities or at completion - if ((i + 1) % 100 === 0 || i === activityIds.length - 1) { - const mem = process.memoryUsage(); - logger.info(`Progress: ${i + 1}/${totalIds} (${Math.round((i + 1) / totalIds * 100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed / 1024 / 1024)}MB`); - } - } + ); + + // Process all activities concurrently + const results = await processor.process(activityIds); + successCount = results.length; logger.info(`Initial club cache population finished.`); logger.info(`Summary: Total: ${totalIds}, Processed: ${activityIds.length}, Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount}`); @@ -212,10 +190,11 @@ export async function initializeClubCache(): Promise { /** * Update stale clubs in the cache - * Processed sequentially + * Processed concurrently with controlled parallelism */ export async function updateStaleClubs(): Promise { logger.info('Starting stale club check...'); + logger.info(`Concurrency: ${CONCURRENT_API_CALLS} 
parallel requests`); const now = Date.now(); const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000; const activityKeys = await getAllActivityKeys(); @@ -245,21 +224,28 @@ export async function updateStaleClubs(): Promise { logger.info(`Found ${staleActivityIds.length} stale activities to update.`); - // Process stale activities sequentially - for (let i = 0; i < staleActivityIds.length; i++) { - const activityId = staleActivityIds[i]!; - try { + // Create batch processor for concurrent updates + const processor = new BatchProcessor( + async (activityId: string) => { logger.debug(`Updating stale activity ${activityId}`); await processAndCacheActivity(activityId); - } catch (error) { - logger.error(`Error updating stale activity ${activityId}:`, error); + return activityId; + }, + CONCURRENT_API_CALLS, + { + onError: (error, activityId) => { + logger.error(`Error updating stale activity ${activityId}:`, error); + }, + onProgress: (completed, total) => { + if (completed % 10 === 0 || completed === total) { + logger.info(`Update progress: ${completed}/${total} (${Math.round(completed/total*100)}%)`); + } + } } - - // Log progress every 10 activities or at completion - if ((i + 1) % 10 === 0 || i === staleActivityIds.length - 1) { - logger.info(`Update progress: ${i + 1}/${staleActivityIds.length} (${Math.round((i + 1) / staleActivityIds.length * 100)}%)`); - } - } + ); + + // Process stale activities concurrently + await processor.process(staleActivityIds); logger.info('Stale club check finished.'); } diff --git a/test/test-concurrency.ts b/test/test-concurrency.ts new file mode 100644 index 0000000..1787e78 --- /dev/null +++ b/test/test-concurrency.ts @@ -0,0 +1,141 @@ +// test/test-concurrency.ts +/** + * Test script for concurrency features + * Run with: bun run test/test-concurrency.ts + */ + +import { Semaphore, executeWithConcurrency, BatchProcessor } from '../utils/semaphore'; + +// Simulate API call +function simulateApiCall(id: number, delay: number = 
100): Promise<{ id: number; result: string }> { + return new Promise((resolve) => { + setTimeout(() => { + resolve({ id, result: `Result for ${id}` }); + }, delay); + }); +} + +async function testSemaphore(): Promise { + console.log('\n=== Test 1: Basic Semaphore ==='); + const semaphore = new Semaphore(3); + + const start = Date.now(); + const promises = []; + + for (let i = 1; i <= 10; i++) { + const id = i; + promises.push( + (async () => { + await semaphore.acquire(); + console.log(`[${id}] Acquired permit (available: ${semaphore.getAvailablePermits()})`); + await simulateApiCall(id, 200); + console.log(`[${id}] Releasing permit`); + semaphore.release(); + })() + ); + } + + await Promise.all(promises); + const duration = Date.now() - start; + console.log(`\n✓ Completed 10 tasks with max 3 concurrent in ${duration}ms`); + console.log(` (Sequential would take ~2000ms, parallel should be ~700-800ms)`); +} + +async function testExecuteWithConcurrency(): Promise { + console.log('\n=== Test 2: executeWithConcurrency ==='); + + const tasks = Array.from({ length: 10 }, (_, i) => () => simulateApiCall(i + 1, 100)); + + const start = Date.now(); + const results = await executeWithConcurrency(tasks, 5); + const duration = Date.now() - start; + + console.log(`✓ Completed ${results.length} tasks with max 5 concurrent in ${duration}ms`); + console.log(` Results: ${results.map(r => r.id).join(', ')}`); +} + +async function testBatchProcessor(): Promise { + console.log('\n=== Test 3: BatchProcessor ==='); + + const items = Array.from({ length: 20 }, (_, i) => ({ id: i + 1, name: `Item ${i + 1}` })); + let processedCount = 0; + + const processor = new BatchProcessor( + async (item: { id: number; name: string }) => { + await simulateApiCall(item.id, 50); + processedCount++; + return { ...item, processed: true }; + }, + 4, // concurrency + { + onProgress: (completed, total) => { + if (completed % 5 === 0 || completed === total) { + console.log(` Progress: ${completed}/${total} 
(${Math.round(completed / total * 100)}%)`); + } + }, + onError: (error, item) => { + console.error(` Error processing item ${item.id}:`, error.message); + } + } + ); + + const start = Date.now(); + const results = await processor.process(items); + const duration = Date.now() - start; + + console.log(`✓ Processed ${results.length} items with max 4 concurrent in ${duration}ms`); + console.log(` Expected ~250ms (20 items / 4 concurrent * 50ms each)`); +} + +async function testErrorHandling(): Promise { + console.log('\n=== Test 4: Error Handling ==='); + + const items = [1, 2, 3, 4, 5]; + let errorCount = 0; + + const processor = new BatchProcessor( + async (id: number) => { + if (id % 2 === 0) { + throw new Error(`Simulated error for ${id}`); + } + return { id, success: true }; + }, + 3, + { + onError: (error, item) => { + errorCount++; + console.log(` Caught error for item ${item}: ${error.message}`); + } + } + ); + + const results = await processor.process(items); + console.log(`✓ Completed: ${results.length} success, ${errorCount} errors (errors handled gracefully)`); +} + +async function main(): Promise { + console.log('╔═══════════════════════════════════════════════════╗'); + console.log('║ Concurrency Module Test Suite ║'); + console.log('╚═══════════════════════════════════════════════════╝'); + + try { + await testSemaphore(); + await testExecuteWithConcurrency(); + await testBatchProcessor(); + await testErrorHandling(); + + console.log('\n╔═══════════════════════════════════════════════════╗'); + console.log('║ All tests passed! 
✓ ║'); + console.log('╚═══════════════════════════════════════════════════╝'); + console.log('\n📝 Configuration:'); + console.log(' - Set CONCURRENT_API_CALLS in .env to control parallelism'); + console.log(' - Current default: 8 concurrent requests'); + console.log(' - Example: CONCURRENT_API_CALLS=16 for faster crawling'); + console.log(''); + } catch (error) { + console.error('\n❌ Test failed:', error); + process.exit(1); + } +} + +main(); diff --git a/utils/semaphore.ts b/utils/semaphore.ts new file mode 100644 index 0000000..2f05305 --- /dev/null +++ b/utils/semaphore.ts @@ -0,0 +1,220 @@ +// utils/semaphore.ts +/** + * Semaphore implementation for controlling concurrent operations + * Based on patterns from civitai/civitai and p-queue + */ + +export class Semaphore { + private capacity: number; + private permits: number; + private queue: Array<() => void> = []; + + constructor(capacity: number) { + if (capacity < 1) { + throw new Error('Semaphore capacity must be at least 1'); + } + this.capacity = capacity; + this.permits = capacity; + } + + /** + * Acquire a permit. If none available, waits until one is released. + */ + async acquire(): Promise { + return new Promise((resolve) => { + if (this.permits > 0) { + this.permits--; + resolve(); + } else { + // Queue the release callback + this.queue.push(() => { + resolve(); + }); + } + }); + } + + /** + * Release a permit and wake up a waiting task if any. + */ + release(): void { + if (this.queue.length > 0) { + const next = this.queue.shift(); + if (next) { + next(); + } + } else { + this.permits++; + } + } + + /** + * Get current available permits. + */ + getAvailablePermits(): number { + return this.permits; + } + + /** + * Get total capacity. + */ + getCapacity(): number { + return this.capacity; + } + + /** + * Get number of waiting tasks. 
+ */ + getWaitingCount(): number { + return this.queue.length; + } +} + +/** + * Execute async tasks with concurrency limit + * @param tasks Array of async task functions + * @param concurrency Maximum concurrent tasks + * @returns Promise that resolves with all results when complete + */ +export async function executeWithConcurrency( + tasks: Array<() => Promise>, + concurrency: number +): Promise { + const semaphore = new Semaphore(concurrency); + const results: T[] = new Array(tasks.length); + + const promises = tasks.map(async (task, index) => { + await semaphore.acquire(); + try { + results[index] = await task(); + return results[index]; + } finally { + semaphore.release(); + } + }); + + return Promise.all(promises); +} + +/** + * Execute async tasks with concurrency limit and progress callback + * @param tasks Array of async task functions + * @param concurrency Maximum concurrent tasks + * @param onProgress Callback with (completed, total, result) after each task + * @returns Promise that resolves with all results when complete + */ +export async function executeWithConcurrencyAndProgress( + tasks: Array<() => Promise>, + concurrency: number, + onProgress?: (completed: number, total: number, result: T, error?: Error) => void +): Promise { + const semaphore = new Semaphore(concurrency); + const results: T[] = new Array(tasks.length); + let completed = 0; + const total = tasks.length; + + const promises = tasks.map(async (task, index) => { + await semaphore.acquire(); + try { + results[index] = await task(); + completed++; + onProgress?.(completed, total, results[index]!); + return results[index]; + } catch (error) { + completed++; + onProgress?.(completed, total, undefined as T, error as Error); + throw error; + } finally { + semaphore.release(); + } + }); + + return Promise.all(promises); +} + +/** + * Batch processor with concurrency control + * Useful for processing large arrays in chunks with controlled concurrency + */ +export class BatchProcessor { + 
private semaphore: Semaphore; + private processor: (item: T, index: number) => Promise; + private onError?: (error: Error, item: T, index: number) => void; + private onProgress?: (completed: number, total: number) => void; + + constructor( + processor: (item: T, index: number) => Promise, + concurrency: number, + options?: { + onError?: (error: Error, item: T, index: number) => void; + onProgress?: (completed: number, total: number) => void; + } + ) { + this.processor = processor; + this.semaphore = new Semaphore(concurrency); + this.onError = options?.onError; + this.onProgress = options?.onProgress; + } + + /** + * Process an array of items with concurrency control + * Only returns successful results, errors are handled by onError callback + */ + async process(items: T[]): Promise[]> { + const results: (Awaited | undefined)[] = new Array(items.length); + let completed = 0; + const total = items.length; + + const promises = items.map(async (item, index) => { + await this.semaphore.acquire(); + try { + const result = await this.processor(item, index); + completed++; + this.onProgress?.(completed, total); + return result; + } catch (error) { + completed++; + this.onProgress?.(completed, total); + this.onError?.(error as Error, item, index); + return undefined; + } finally { + this.semaphore.release(); + } + }); + + const allResults = await Promise.all(promises); + return allResults.filter((r): r is Awaited => r !== undefined); + } + + /** + * Process an array and return both results and errors + */ + async processWithErrors(items: T[]): Promise<{ + results: R[]; + errors: Array<{ error: Error; item: T; index: number }>; + }> { + const results: R[] = []; + const errors: Array<{ error: Error; item: T; index: number }> = []; + let completed = 0; + const total = items.length; + + const promises = items.map(async (item, index) => { + await this.semaphore.acquire(); + try { + const result = await this.processor(item, index); + results.push(result); + completed++; + 
this.onProgress?.(completed, total); + } catch (error) { + completed++; + this.onProgress?.(completed, total); + errors.push({ error: error as Error, item, index }); + } finally { + this.semaphore.release(); + } + }); + + await Promise.all(promises); + return { results, errors }; + } +}