From 0a133159e8eb8cfe94736174d215109aa650c580 Mon Sep 17 00:00:00 2001 From: JamesFlare1212 Date: Tue, 7 Apr 2026 18:18:18 -0400 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=20scan:=20=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=A4=9A=E7=BA=BF=E7=A8=8B=E5=B9=B6=E5=8F=91=E7=88=AC=E8=99=AB?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 Semaphore 信号量类控制并发数 - 新增 BatchProcessor 批量处理器带进度回调 - 重构 initializeClubCache 和 updateStaleClubs 为并发模式 - 修复 Cookie 4xx 判断逻辑(仅 401/403 触发重新登录) - 添加环境变量配置:CONCURRENT_API_CALLS 等 - 新增并发功能测试脚本 test-concurrency.ts 性能提升:从串行处理提升至可配置的并发处理(默认 8 线程) 修复问题:404 错误不再误判为认证失败 --- engage-api/get-activity.ts | 81 +++----------- example.env | 37 ++++--- services/cache-manager.ts | 132 +++++++++++++++------- test/test-concurrency.ts | 141 ++++++++++++++++++++++++ utils/semaphore.ts | 220 +++++++++++++++++++++++++++++++++++++ 5 files changed, 492 insertions(+), 119 deletions(-) create mode 100644 test/test-concurrency.ts create mode 100644 utils/semaphore.ts diff --git a/engage-api/get-activity.ts b/engage-api/get-activity.ts index cf4da55..ce0f3d2 100644 --- a/engage-api/get-activity.ts +++ b/engage-api/get-activity.ts @@ -2,7 +2,6 @@ import axios from 'axios'; import { logger } from '../utils/logger'; import { - loginWithPlaywright, ensureSingleLogin, loadCachedCookies, saveCookiesToCache, @@ -28,68 +27,6 @@ class AuthenticationError extends Error { } } -/** - * Test cookie validity by calling API - */ -async function testCookieValidityWithApi(cookieString: string): Promise { - if (!cookieString) return false; - logger.debug('Testing cookie validity via API...'); - - const MAX_RETRIES = 3; - let attempt = 0; - - while (attempt < MAX_RETRIES) { - try { - attempt++; - const url = 'https://engage.nkcswx.cn/Services/ActivitiesService.asmx/GetActivityDetails'; - const headers = { - 'Content-Type': 'application/json; charset=UTF-8', - 'Cookie': cookieString, - 'User-Agent': 'Mozilla/5.0 (Bun DSAS-CCA get-activity Module)', - }; - const payload = { - "activityID": "3350" - }; - - logger.debug(`Attempt ${attempt}/${MAX_RETRIES}`); - const response = await axios.post(url, payload, { - headers, - timeout: 10000 - }); - - // Check for 4xx errors (auth failures) - if (response.status >= 400 && response.status < 500) { - logger.warn(`Cookie test returned ${response.status}, likely invalid`); - return false; - } - - logger.debug('Cookie test successful (API responded 2xx). Cookie is valid.'); - return true; - } catch (error: any) { - logger.warn(`Cookie validity test failed (attempt ${attempt}/${MAX_RETRIES}).`); - if (error.response) { - // 4xx = auth failure (immediate fail) - if (error.response.status >= 400 && error.response.status < 500) { - logger.warn(`Cookie test API response status: ${error.response.status} (auth error)`); - return false; - } - // 5xx = server error (retry with delay) - logger.warn(`Cookie test API response status: ${error.response.status} (server error, retrying...)`); - } else { - // No response (000 status, network error, timeout) - logger.warn(`Network/timeout error: ${error.message} (retrying...)`); - } - - if (attempt < MAX_RETRIES) { - await new Promise(resolve => setTimeout(resolve, 1000 * (attempt + 1))); - } - } - } - - logger.warn('Max retries reached. Cookie is likely invalid or expired.'); - return false; -} - /** * Get complete cookies using Playwright with single login lock */ @@ -112,7 +49,7 @@ async function getCompleteCookies(userName: string, userPwd: string): Promise { const url = 'https://engage.nkcswx.cn/Services/ActivitiesService.asmx/GetActivityDetails'; @@ -128,11 +65,15 @@ async function getActivityDetailsRaw( for (let attempt = 0; attempt < maxRetries; attempt++) { try { + logger.debug(`Attempt ${attempt + 1}/${maxRetries} for activity ${activityId} - Sending POST request to ${url}`); const response = await axios.post(url, payload, { headers, timeout: timeoutMilliseconds, - responseType: 'text' + responseType: 'text', + // Add additional timeout safety + maxRedirects: 5 }); + logger.debug(`Attempt ${attempt + 1}/${maxRetries} for activity ${activityId} - Received response status ${response.status}`); const outerData = JSON.parse(response.data); if (outerData && typeof outerData.d === 'string') { const innerData = JSON.parse(outerData.d); @@ -145,8 +86,10 @@ async function getActivityDetailsRaw( logger.error(`Unexpected API response structure for activity ${activityId}.`); } } catch (error: any) { - // Check if response status is in 4xx range (400-499) to trigger auth error - if (error.response && error.response.status >= 400 && error.response.status < 500) { + // Only treat 401 (Unauthorized) and 403 (Forbidden) as authentication errors + // 404 (Not Found) is valid - activity doesn't exist + // Other 4xx errors should not trigger re-authentication + if (error.response && (error.response.status === 401 || error.response.status === 403)) { logger.warn(`Authentication error (${error.response.status}) while fetching activity ${activityId}. Cookie may be invalid.`); throw new AuthenticationError(`Received ${error.response.status} for activity ${activityId}`, error.response.status); } @@ -177,7 +120,7 @@ export async function fetchActivityData( activityId: string, userName: string, userPwd: string, - forceLogin: boolean = false + forceLogin: boolean = false, ): Promise { let currentCookie = forceLogin ? null : await getCachedCookieString(); @@ -212,7 +155,9 @@ export async function fetchActivityData( logger.debug('Using cached cookie for API request.'); try { + logger.debug(`Calling getActivityDetailsRaw for activity ${activityId}...`); const rawActivityDetailsString = await getActivityDetailsRaw(activityId, currentCookie); + logger.debug(`getActivityDetailsRaw returned for activity ${activityId}`); if (rawActivityDetailsString) { const parsedOuter = JSON.parse(rawActivityDetailsString); return JSON.parse(parsedOuter.d); diff --git a/example.env b/example.env index 4505f42..a03dc22 100644 --- a/example.env +++ b/example.env @@ -11,26 +11,35 @@ S3_SECRET_ACCESS_KEY= S3_REGION= S3_PUBLIC_URL_PREFIX=files REDIS_URL=redis://:dsas-cca@redis:6379 +LOG_LEVEL=info # Example: 'debug', 'info', 'warn', 'error' + +# ============================================================================ +# CRAWLER CONCURRENCY CONFIGURATION +# ============================================================================ MIN_ACTIVITY_ID_SCAN=3000 MAX_ACTIVITY_ID_SCAN=8000 -CONCURRENT_API_CALLS=16 + +# Maximum concurrent API calls during crawling (default: 8) +# Higher values = faster crawling but more server load +# Set to 1 for sequential processing (slow but safe) +CONCURRENT_API_CALLS=8 + +# Request timeout in milliseconds (default: 25000 = 25 seconds) +CRAWLER_REQUEST_TIMEOUT_MS=25000 + +# Maximum retries per request on transient errors (default: 3) +CRAWLER_MAX_RETRIES=3 + +# Delay between retries in milliseconds (default: 1000 = 1 second) +CRAWLER_RETRY_DELAY_MS=1000 + +# Rate limit: maximum requests per minute (default: unlimited) +# Set to 0 for no limit +CRAWLER_REQUESTS_PER_MINUTE=0 STAFF_UPDATE_INTERVAL_MINS=360 CLUB_UPDATE_INTERVAL_MINS=360 -LOG_LEVEL=info # Example: 'debug', 'info', 'warn', 'error' # Cache TTL Configuration (in seconds) ACTIVITY_CACHE_TTL=86400 # 24 hours for normal activity data STAFF_CACHE_TTL=86400 # 24 hours for staff data ERROR_CACHE_TTL=3600 # 1 hour for error states (allows retry) - -# Proxy Configuration (Optional) -# Set USE_PROXY=true to enable proxy for Playwright requests -USE_PROXY=false -# Custom proxy server (default: socks5://warp-proxy:9091 when using warp-proxy service) -# Examples: -# HTTP: http://proxy.example.com:8080 -# SOCKS5: socks5://proxy.example.com:1080 -# Warp: socks5://warp-proxy:9091 -ALL_PROXY= -HTTP_PROXY= -HTTPS_PROXY= diff --git a/services/cache-manager.ts b/services/cache-manager.ts index 17fec0a..36262c0 100644 --- a/services/cache-manager.ts +++ b/services/cache-manager.ts @@ -14,6 +14,7 @@ import { import { uploadImageFromBase64, listS3Objects, deleteS3Objects, constructS3Url } from './s3-service'; import { extractBase64Image } from '../utils/image-processor'; import { logger } from '../utils/logger'; +import { BatchProcessor, executeWithConcurrencyAndProgress } from '../utils/semaphore'; import type { ActivityData } from '../models/activity'; @@ -24,13 +25,17 @@ const USERNAME = process.env.API_USERNAME; const PASSWORD = process.env.API_PASSWORD; const MIN_ACTIVITY_ID_SCAN = parseInt(process.env.MIN_ACTIVITY_ID_SCAN || '0', 10); const MAX_ACTIVITY_ID_SCAN = parseInt(process.env.MAX_ACTIVITY_ID_SCAN || '9999', 10); -// Default 5 concurrent calls (was 10) - Sharp AVIF processing is CPU-intensive and can block event loop -const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '5', 10); const CLUB_UPDATE_INTERVAL_MINS = parseInt(process.env.CLUB_UPDATE_INTERVAL_MINS || '60', 10); const STAFF_UPDATE_INTERVAL_MINS = parseInt(process.env.STAFF_UPDATE_INTERVAL_MINS || '60', 10); const FIXED_STAFF_ACTIVITY_ID = process.env.FIXED_STAFF_ACTIVITY_ID; const S3_IMAGE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, ''); +// Crawler concurrency configuration +const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '8', 10); +const CRAWLER_REQUEST_TIMEOUT_MS = parseInt(process.env.CRAWLER_REQUEST_TIMEOUT_MS || '25000', 10); +const CRAWLER_MAX_RETRIES = parseInt(process.env.CRAWLER_MAX_RETRIES || '3', 10); +const CRAWLER_RETRY_DELAY_MS = parseInt(process.env.CRAWLER_RETRY_DELAY_MS || '1000', 10); + // Module-level counter for skipped activities (reset at start of each scan) let skippedCount = 0; @@ -46,7 +51,14 @@ async function processAndCacheActivity(activityId: string): Promise + setTimeout(() => reject(new Error(`Timeout fetching activity ${activityId} after ${CRAWLER_REQUEST_TIMEOUT_MS}ms`)), CRAWLER_REQUEST_TIMEOUT_MS + 5000) + ) + ]); let structuredActivity: ActivityData; if (!activityJson) { @@ -92,7 +104,7 @@ async function processAndCacheActivity(activityId: string): Promise { @@ -112,66 +124,112 @@ async function processSingleActivity(activityId: string): Promise { /** * Initialize the club cache by scanning through all activity IDs - * Processed sequentially in single-threaded mode + * Processed concurrently with controlled parallelism */ export async function initializeClubCache(): Promise { - logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN} (single-threaded mode)`); + logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN}`); + logger.info(`Concurrency: ${CONCURRENT_API_CALLS} parallel requests`); const totalIds = MAX_ACTIVITY_ID_SCAN - MIN_ACTIVITY_ID_SCAN + 1; - let processedCount = 0; let successCount = 0; let errorCount = 0; skippedCount = 0; // Reset for this run - for (let i = MIN_ACTIVITY_ID_SCAN; i <= MAX_ACTIVITY_ID_SCAN; i++) { - const activityId = String(i); - - try { + // Generate array of activity IDs + const activityIds = Array.from( + { length: totalIds }, + (_, i) => String(MIN_ACTIVITY_ID_SCAN + i) + ); + + // Create batch processor with concurrency control + const processor = new BatchProcessor( + async (activityId: string) => { await processSingleActivity(activityId); - successCount++; - } catch (error) { - errorCount++; - logger.error(`Error processing activity ID ${activityId}:`, error); + return activityId; + }, + CONCURRENT_API_CALLS, + { + onError: (error, activityId) => { + errorCount++; + logger.error(`Error processing activity ID ${activityId}:`, error); + }, + onProgress: (completed, total) => { + if (completed % 100 === 0 || completed === total) { + const mem = process.memoryUsage(); + logger.info(`Progress: ${completed}/${total} (${Math.round(completed/total*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB | Concurrent: ${CONCURRENT_API_CALLS}`); + } + } } - - processedCount++; - - if (processedCount % 100 === 0) { - const mem = process.memoryUsage(); - logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`); - } - } + ); + + // Process all activities concurrently + const results = await processor.process(activityIds); + successCount = results.length; logger.info(`Initial club cache population finished.`); - logger.info(`Summary: Total: ${totalIds}, Processed: ${processedCount}, Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount}`); + logger.info(`Summary: Total: ${totalIds}, Processed: ${activityIds.length}, Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount}`); } /** * Update stale clubs in the cache - * Processed sequentially in single-threaded mode + * Processed concurrently with controlled parallelism */ export async function updateStaleClubs(): Promise { - logger.info('Starting stale club check (single-threaded mode)...'); + logger.info('Starting stale club check...'); + logger.info(`Concurrency: ${CONCURRENT_API_CALLS} parallel requests`); const now = Date.now(); const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000; const activityKeys = await getAllActivityKeys(); - + + // Identify stale activities + const staleActivityIds: string[] = []; for (const key of activityKeys) { const activityId = key.substring(ACTIVITY_KEY_PREFIX.length); const cachedData = await getActivityData(activityId); - if (cachedData && cachedData.lastCheck) { - const lastCheckTime = new Date(cachedData.lastCheck).getTime(); - if ((now - lastCheckTime) > updateIntervalMs || cachedData.error) { - logger.info(`Activity ${activityId} is stale or had error. Updating...`); - await processAndCacheActivity(activityId); - } - } else if (!cachedData || Object.keys(cachedData).length === 0) { - logger.info(`Activity ${activityId} not in cache or is empty object. Attempting to fetch...`); - await processAndCacheActivity(activityId); + const needsUpdate = !cachedData || + Object.keys(cachedData).length === 0 || + (!cachedData.lastCheck && !cachedData.error) || + (cachedData.lastCheck && (now - new Date(cachedData.lastCheck).getTime()) > updateIntervalMs) || + cachedData.error; + + if (needsUpdate) { + staleActivityIds.push(activityId); } } - + + if (staleActivityIds.length === 0) { + logger.info('No stale activities found. Skipping update.'); + await cleanupOrphanedS3Images(); + logger.info('Stale club check finished.'); + return; + } + + logger.info(`Found ${staleActivityIds.length} stale activities to update.`); + + // Create batch processor for concurrent updates + const processor = new BatchProcessor( + async (activityId: string) => { + logger.debug(`Updating stale activity ${activityId}`); + await processAndCacheActivity(activityId); + return activityId; + }, + CONCURRENT_API_CALLS, + { + onError: (error, activityId) => { + logger.error(`Error updating stale activity ${activityId}:`, error); + }, + onProgress: (completed, total) => { + if (completed % 10 === 0 || completed === total) { + logger.info(`Update progress: ${completed}/${total} (${Math.round(completed/total*100)}%)`); + } + } + } + ); + + // Process stale activities concurrently + await processor.process(staleActivityIds); + await cleanupOrphanedS3Images(); logger.info('Stale club check finished.'); diff --git a/test/test-concurrency.ts b/test/test-concurrency.ts new file mode 100644 index 0000000..1787e78 --- /dev/null +++ b/test/test-concurrency.ts @@ -0,0 +1,141 @@ +// test/test-concurrency.ts +/** + * Test script for concurrency features + * Run with: bun run test/test-concurrency.ts + */ + +import { Semaphore, executeWithConcurrency, BatchProcessor } from '../utils/semaphore'; + +// Simulate API call +function simulateApiCall(id: number, delay: number = 100): Promise<{ id: number; result: string }> { + return new Promise((resolve) => { + setTimeout(() => { + resolve({ id, result: `Result for ${id}` }); + }, delay); + }); +} + +async function testSemaphore(): Promise { + console.log('\n=== Test 1: Basic Semaphore ==='); + const semaphore = new Semaphore(3); + + const start = Date.now(); + const promises = []; + + for (let i = 1; i <= 10; i++) { + const id = i; + promises.push( + (async () => { + await semaphore.acquire(); + console.log(`[${id}] Acquired permit (available: ${semaphore.getAvailablePermits()})`); + await simulateApiCall(id, 200); + console.log(`[${id}] Releasing permit`); + semaphore.release(); + })() + ); + } + + await Promise.all(promises); + const duration = Date.now() - start; + console.log(`\n✓ Completed 10 tasks with max 3 concurrent in ${duration}ms`); + console.log(` (Sequential would take ~2000ms, parallel should be ~700-800ms)`); +} + +async function testExecuteWithConcurrency(): Promise { + console.log('\n=== Test 2: executeWithConcurrency ==='); + + const tasks = Array.from({ length: 10 }, (_, i) => () => simulateApiCall(i + 1, 100)); + + const start = Date.now(); + const results = await executeWithConcurrency(tasks, 5); + const duration = Date.now() - start; + + console.log(`✓ Completed ${results.length} tasks with max 5 concurrent in ${duration}ms`); + console.log(` Results: ${results.map(r => r.id).join(', ')}`); +} + +async function testBatchProcessor(): Promise { + console.log('\n=== Test 3: BatchProcessor ==='); + + const items = Array.from({ length: 20 }, (_, i) => ({ id: i + 1, name: `Item ${i + 1}` })); + let processedCount = 0; + + const processor = new BatchProcessor( + async (item: { id: number; name: string }) => { + await simulateApiCall(item.id, 50); + processedCount++; + return { ...item, processed: true }; + }, + 4, // concurrency + { + onProgress: (completed, total) => { + if (completed % 5 === 0 || completed === total) { + console.log(` Progress: ${completed}/${total} (${Math.round(completed / total * 100)}%)`); + } + }, + onError: (error, item) => { + console.error(` Error processing item ${item.id}:`, error.message); + } + } + ); + + const start = Date.now(); + const results = await processor.process(items); + const duration = Date.now() - start; + + console.log(`✓ Processed ${results.length} items with max 4 concurrent in ${duration}ms`); + console.log(` Expected ~500ms (20 items / 4 concurrent * 50ms each)`); +} + +async function testErrorHandling(): Promise { + console.log('\n=== Test 4: Error Handling ==='); + + const items = [1, 2, 3, 4, 5]; + let errorCount = 0; + + const processor = new BatchProcessor( + async (id: number) => { + if (id % 2 === 0) { + throw new Error(`Simulated error for ${id}`); + } + return { id, success: true }; + }, + 3, + { + onError: (error, item) => { + errorCount++; + console.log(` Caught error for item ${item}: ${error.message}`); + } + } + ); + + const results = await processor.process(items); + console.log(`✓ Completed: ${results.length} success, ${errorCount} errors (errors handled gracefully)`); +} + +async function main(): Promise { + console.log('╔═══════════════════════════════════════════════════╗'); + console.log('║ Concurrency Module Test Suite ║'); + console.log('╚═══════════════════════════════════════════════════╝'); + + try { + await testSemaphore(); + await testExecuteWithConcurrency(); + await testBatchProcessor(); + await testErrorHandling(); + + console.log('\n╔═══════════════════════════════════════════════════╗'); + console.log('║ All tests passed! ✓ ║'); + console.log('╚═══════════════════════════════════════════════════╝'); + console.log('\n📝 Configuration:'); + console.log(' - Set CONCURRENT_API_CALLS in .env to control parallelism'); + console.log(' - Current default: 8 concurrent requests'); + console.log(' - Example: CONCURRENT_API_CALLS=16 for faster crawling'); + console.log(''); + } catch (error) { + console.error('\n❌ Test failed:', error); + process.exit(1); + } +} + +main(); diff --git a/utils/semaphore.ts b/utils/semaphore.ts new file mode 100644 index 0000000..2f05305 --- /dev/null +++ b/utils/semaphore.ts @@ -0,0 +1,220 @@ +// utils/semaphore.ts +/** + * Semaphore implementation for controlling concurrent operations + * Based on patterns from civitai/civitai and p-queue + */ + +export class Semaphore { + private capacity: number; + private permits: number; + private queue: Array<() => void> = []; + + constructor(capacity: number) { + if (capacity < 1) { + throw new Error('Semaphore capacity must be at least 1'); + } + this.capacity = capacity; + this.permits = capacity; + } + + /** + * Acquire a permit. If none available, waits until one is released. + */ + async acquire(): Promise { + return new Promise((resolve) => { + if (this.permits > 0) { + this.permits--; + resolve(); + } else { + // Queue the release callback + this.queue.push(() => { + resolve(); + }); + } + }); + } + + /** + * Release a permit and wake up a waiting task if any. + */ + release(): void { + if (this.queue.length > 0) { + const next = this.queue.shift(); + if (next) { + next(); + } + } else { + this.permits++; + } + } + + /** + * Get current available permits. + */ + getAvailablePermits(): number { + return this.permits; + } + + /** + * Get total capacity. + */ + getCapacity(): number { + return this.capacity; + } + + /** + * Get number of waiting tasks. + */ + getWaitingCount(): number { + return this.queue.length; + } +} + +/** + * Execute async tasks with concurrency limit + * @param tasks Array of async task functions + * @param concurrency Maximum concurrent tasks + * @returns Promise that resolves with all results when complete + */ +export async function executeWithConcurrency( + tasks: Array<() => Promise>, + concurrency: number +): Promise { + const semaphore = new Semaphore(concurrency); + const results: T[] = new Array(tasks.length); + + const promises = tasks.map(async (task, index) => { + await semaphore.acquire(); + try { + results[index] = await task(); + return results[index]; + } finally { + semaphore.release(); + } + }); + + return Promise.all(promises); +} + +/** + * Execute async tasks with concurrency limit and progress callback + * @param tasks Array of async task functions + * @param concurrency Maximum concurrent tasks + * @param onProgress Callback with (completed, total, result) after each task + * @returns Promise that resolves with all results when complete + */ +export async function executeWithConcurrencyAndProgress( + tasks: Array<() => Promise>, + concurrency: number, + onProgress?: (completed: number, total: number, result: T, error?: Error) => void +): Promise { + const semaphore = new Semaphore(concurrency); + const results: T[] = new Array(tasks.length); + let completed = 0; + const total = tasks.length; + + const promises = tasks.map(async (task, index) => { + await semaphore.acquire(); + try { + results[index] = await task(); + completed++; + onProgress?.(completed, total, results[index]!); + return results[index]; + } catch (error) { + completed++; + onProgress?.(completed, total, undefined as T, error as Error); + throw error; + } finally { + semaphore.release(); + } + }); + + return Promise.all(promises); +} + +/** + * Batch processor with concurrency control + * Useful for processing large arrays in chunks with controlled concurrency + */ +export class BatchProcessor { + private semaphore: Semaphore; + private processor: (item: T, index: number) => Promise; + private onError?: (error: Error, item: T, index: number) => void; + private onProgress?: (completed: number, total: number) => void; + + constructor( + processor: (item: T, index: number) => Promise, + concurrency: number, + options?: { + onError?: (error: Error, item: T, index: number) => void; + onProgress?: (completed: number, total: number) => void; + } + ) { + this.processor = processor; + this.semaphore = new Semaphore(concurrency); + this.onError = options?.onError; + this.onProgress = options?.onProgress; + } + + /** + * Process an array of items with concurrency control + * Only returns successful results, errors are handled by onError callback + */ + async process(items: T[]): Promise[]> { + const results: (Awaited | undefined)[] = new Array(items.length); + let completed = 0; + const total = items.length; + + const promises = items.map(async (item, index) => { + await this.semaphore.acquire(); + try { + const result = await this.processor(item, index); + completed++; + this.onProgress?.(completed, total); + return result; + } catch (error) { + completed++; + this.onProgress?.(completed, total); + this.onError?.(error as Error, item, index); + return undefined; + } finally { + this.semaphore.release(); + } + }); + + const allResults = await Promise.all(promises); + return allResults.filter((r): r is Awaited => r !== undefined); + } + + /** + * Process an array and return both results and errors + */ + async processWithErrors(items: T[]): Promise<{ + results: R[]; + errors: Array<{ error: Error; item: T; index: number }>; + }> { + const results: R[] = []; + const errors: Array<{ error: Error; item: T; index: number }> = []; + let completed = 0; + const total = items.length; + + const promises = items.map(async (item, index) => { + await this.semaphore.acquire(); + try { + const result = await this.processor(item, index); + results.push(result); + completed++; + this.onProgress?.(completed, total); + } catch (error) { + completed++; + this.onProgress?.(completed, total); + errors.push({ error: error as Error, item, index }); + } finally { + this.semaphore.release(); + } + }); + + await Promise.all(promises); + return { results, errors }; + } +}