重构 scan: 实现多线程并发爬虫功能
- 新增 Semaphore 信号量类控制并发数 - 新增 BatchProcessor 批量处理器带进度回调 - 重构 initializeClubCache 和 updateStaleClubs 为并发模式 - 修复 Cookie 4xx 判断逻辑(仅 401/403 触发重新登录) - 添加环境变量配置:CONCURRENT_API_CALLS 等 - 新增并发功能测试脚本 test-concurrency.ts 性能提升:从串行处理提升至可配置的并发处理(默认 8 线程) 修复问题:404 错误不再误判为认证失败
This commit is contained in:
@@ -14,6 +14,7 @@ import {
|
||||
import { uploadImageFromBase64, listS3Objects, deleteS3Objects, constructS3Url } from './s3-service';
|
||||
import { extractBase64Image } from '../utils/image-processor';
|
||||
import { logger } from '../utils/logger';
|
||||
import { BatchProcessor, executeWithConcurrencyAndProgress } from '../utils/semaphore';
|
||||
|
||||
import type { ActivityData } from '../models/activity';
|
||||
|
||||
@@ -24,13 +25,17 @@ const USERNAME = process.env.API_USERNAME;
|
||||
const PASSWORD = process.env.API_PASSWORD;
|
||||
const MIN_ACTIVITY_ID_SCAN = parseInt(process.env.MIN_ACTIVITY_ID_SCAN || '0', 10);
|
||||
const MAX_ACTIVITY_ID_SCAN = parseInt(process.env.MAX_ACTIVITY_ID_SCAN || '9999', 10);
|
||||
// Default 5 concurrent calls (was 10) - Sharp AVIF processing is CPU-intensive and can block event loop
|
||||
const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '5', 10);
|
||||
const CLUB_UPDATE_INTERVAL_MINS = parseInt(process.env.CLUB_UPDATE_INTERVAL_MINS || '60', 10);
|
||||
const STAFF_UPDATE_INTERVAL_MINS = parseInt(process.env.STAFF_UPDATE_INTERVAL_MINS || '60', 10);
|
||||
const FIXED_STAFF_ACTIVITY_ID = process.env.FIXED_STAFF_ACTIVITY_ID;
|
||||
const S3_IMAGE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, '');
|
||||
|
||||
// Crawler concurrency configuration
|
||||
const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '8', 10);
|
||||
const CRAWLER_REQUEST_TIMEOUT_MS = parseInt(process.env.CRAWLER_REQUEST_TIMEOUT_MS || '25000', 10);
|
||||
const CRAWLER_MAX_RETRIES = parseInt(process.env.CRAWLER_MAX_RETRIES || '3', 10);
|
||||
const CRAWLER_RETRY_DELAY_MS = parseInt(process.env.CRAWLER_RETRY_DELAY_MS || '1000', 10);
|
||||
|
||||
// Module-level counter for skipped activities (reset at start of each scan)
|
||||
let skippedCount = 0;
|
||||
|
||||
@@ -46,7 +51,14 @@ async function processAndCacheActivity(activityId: string): Promise<ActivityData
|
||||
throw new Error('API username or password not configured');
|
||||
}
|
||||
|
||||
const activityJson = await fetchActivityData(activityId, USERNAME, PASSWORD);
|
||||
// Add timeout protection for the entire fetch operation
|
||||
logger.debug(`Fetching activity data for ID: ${activityId}`);
|
||||
const activityJson = await Promise.race([
|
||||
fetchActivityData(activityId, USERNAME, PASSWORD, false),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error(`Timeout fetching activity ${activityId} after ${CRAWLER_REQUEST_TIMEOUT_MS}ms`)), CRAWLER_REQUEST_TIMEOUT_MS + 5000)
|
||||
)
|
||||
]);
|
||||
let structuredActivity: ActivityData;
|
||||
|
||||
if (!activityJson) {
|
||||
@@ -92,7 +104,7 @@ async function processAndCacheActivity(activityId: string): Promise<ActivityData
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single activity for initialization (extracted for Promise.allSettled)
|
||||
* Process a single activity for initialization
|
||||
* @param activityId - The activity ID to process
|
||||
*/
|
||||
async function processSingleActivity(activityId: string): Promise<void> {
|
||||
@@ -112,66 +124,112 @@ async function processSingleActivity(activityId: string): Promise<void> {
|
||||
|
||||
/**
|
||||
* Initialize the club cache by scanning through all activity IDs
|
||||
* Processed sequentially in single-threaded mode
|
||||
* Processed concurrently with controlled parallelism
|
||||
*/
|
||||
export async function initializeClubCache(): Promise<void> {
|
||||
logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN} (single-threaded mode)`);
|
||||
logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN}`);
|
||||
logger.info(`Concurrency: ${CONCURRENT_API_CALLS} parallel requests`);
|
||||
|
||||
const totalIds = MAX_ACTIVITY_ID_SCAN - MIN_ACTIVITY_ID_SCAN + 1;
|
||||
let processedCount = 0;
|
||||
let successCount = 0;
|
||||
let errorCount = 0;
|
||||
skippedCount = 0; // Reset for this run
|
||||
|
||||
for (let i = MIN_ACTIVITY_ID_SCAN; i <= MAX_ACTIVITY_ID_SCAN; i++) {
|
||||
const activityId = String(i);
|
||||
|
||||
try {
|
||||
// Generate array of activity IDs
|
||||
const activityIds = Array.from(
|
||||
{ length: totalIds },
|
||||
(_, i) => String(MIN_ACTIVITY_ID_SCAN + i)
|
||||
);
|
||||
|
||||
// Create batch processor with concurrency control
|
||||
const processor = new BatchProcessor(
|
||||
async (activityId: string) => {
|
||||
await processSingleActivity(activityId);
|
||||
successCount++;
|
||||
} catch (error) {
|
||||
errorCount++;
|
||||
logger.error(`Error processing activity ID ${activityId}:`, error);
|
||||
return activityId;
|
||||
},
|
||||
CONCURRENT_API_CALLS,
|
||||
{
|
||||
onError: (error, activityId) => {
|
||||
errorCount++;
|
||||
logger.error(`Error processing activity ID ${activityId}:`, error);
|
||||
},
|
||||
onProgress: (completed, total) => {
|
||||
if (completed % 100 === 0 || completed === total) {
|
||||
const mem = process.memoryUsage();
|
||||
logger.info(`Progress: ${completed}/${total} (${Math.round(completed/total*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB | Concurrent: ${CONCURRENT_API_CALLS}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
processedCount++;
|
||||
|
||||
if (processedCount % 100 === 0) {
|
||||
const mem = process.memoryUsage();
|
||||
logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Process all activities concurrently
|
||||
const results = await processor.process(activityIds);
|
||||
successCount = results.length;
|
||||
|
||||
logger.info(`Initial club cache population finished.`);
|
||||
logger.info(`Summary: Total: ${totalIds}, Processed: ${processedCount}, Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount}`);
|
||||
logger.info(`Summary: Total: ${totalIds}, Processed: ${activityIds.length}, Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update stale clubs in the cache
|
||||
* Processed sequentially in single-threaded mode
|
||||
* Processed concurrently with controlled parallelism
|
||||
*/
|
||||
export async function updateStaleClubs(): Promise<void> {
|
||||
logger.info('Starting stale club check (single-threaded mode)...');
|
||||
logger.info('Starting stale club check...');
|
||||
logger.info(`Concurrency: ${CONCURRENT_API_CALLS} parallel requests`);
|
||||
const now = Date.now();
|
||||
const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
|
||||
const activityKeys = await getAllActivityKeys();
|
||||
|
||||
|
||||
// Identify stale activities
|
||||
const staleActivityIds: string[] = [];
|
||||
for (const key of activityKeys) {
|
||||
const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
|
||||
const cachedData = await getActivityData(activityId);
|
||||
|
||||
if (cachedData && cachedData.lastCheck) {
|
||||
const lastCheckTime = new Date(cachedData.lastCheck).getTime();
|
||||
if ((now - lastCheckTime) > updateIntervalMs || cachedData.error) {
|
||||
logger.info(`Activity ${activityId} is stale or had error. Updating...`);
|
||||
await processAndCacheActivity(activityId);
|
||||
}
|
||||
} else if (!cachedData || Object.keys(cachedData).length === 0) {
|
||||
logger.info(`Activity ${activityId} not in cache or is empty object. Attempting to fetch...`);
|
||||
await processAndCacheActivity(activityId);
|
||||
const needsUpdate = !cachedData ||
|
||||
Object.keys(cachedData).length === 0 ||
|
||||
(!cachedData.lastCheck && !cachedData.error) ||
|
||||
(cachedData.lastCheck && (now - new Date(cachedData.lastCheck).getTime()) > updateIntervalMs) ||
|
||||
cachedData.error;
|
||||
|
||||
if (needsUpdate) {
|
||||
staleActivityIds.push(activityId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (staleActivityIds.length === 0) {
|
||||
logger.info('No stale activities found. Skipping update.');
|
||||
await cleanupOrphanedS3Images();
|
||||
logger.info('Stale club check finished.');
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(`Found ${staleActivityIds.length} stale activities to update.`);
|
||||
|
||||
// Create batch processor for concurrent updates
|
||||
const processor = new BatchProcessor(
|
||||
async (activityId: string) => {
|
||||
logger.debug(`Updating stale activity ${activityId}`);
|
||||
await processAndCacheActivity(activityId);
|
||||
return activityId;
|
||||
},
|
||||
CONCURRENT_API_CALLS,
|
||||
{
|
||||
onError: (error, activityId) => {
|
||||
logger.error(`Error updating stale activity ${activityId}:`, error);
|
||||
},
|
||||
onProgress: (completed, total) => {
|
||||
if (completed % 10 === 0 || completed === total) {
|
||||
logger.info(`Update progress: ${completed}/${total} (${Math.round(completed/total*100)}%)`);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Process stale activities concurrently
|
||||
await processor.process(staleActivityIds);
|
||||
|
||||
await cleanupOrphanedS3Images();
|
||||
|
||||
logger.info('Stale club check finished.');
|
||||
|
||||
Reference in New Issue
Block a user