refactor(scan): remove multi-thread scan logic, use sequential processing

This commit is contained in:
JamesFlare1212
2026-04-08 12:04:27 -04:00
parent 78c050a6fa
commit fb68c1ad5d
4 changed files with 31 additions and 410 deletions

View File

@@ -14,7 +14,7 @@ import {
import { uploadImageFromBase64, listS3Objects, constructS3Url } from './s3-service';
import { extractBase64Image } from '../utils/image-processor';
import { logger } from '../utils/logger';
import { BatchProcessor, executeWithConcurrencyAndProgress } from '../utils/semaphore';
import type { ActivityData } from '../models/activity';
@@ -61,8 +61,7 @@ function extractObjectKeyFromUrl(url: string): string | null {
}
}
// Crawler concurrency configuration
const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '8', 10);
// Crawler configuration
const CRAWLER_REQUEST_TIMEOUT_MS = parseInt(process.env.CRAWLER_REQUEST_TIMEOUT_MS || '25000', 10);
const CRAWLER_MAX_RETRIES = parseInt(process.env.CRAWLER_MAX_RETRIES || '3', 10);
const CRAWLER_RETRY_DELAY_MS = parseInt(process.env.CRAWLER_RETRY_DELAY_MS || '1000', 10);
@@ -155,11 +154,10 @@ async function processSingleActivity(activityId: string): Promise<void> {
/**
* Initialize the club cache by scanning through all activity IDs
* Processed concurrently with controlled parallelism
* Processed sequentially
*/
export async function initializeClubCache(): Promise<void> {
logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN}`);
logger.info(`Concurrency: ${CONCURRENT_API_CALLS} parallel requests`);
const totalIds = MAX_ACTIVITY_ID_SCAN - MIN_ACTIVITY_ID_SCAN + 1;
let successCount = 0;
@@ -172,30 +170,23 @@ export async function initializeClubCache(): Promise<void> {
(_, i) => String(MIN_ACTIVITY_ID_SCAN + i)
);
// Create batch processor with concurrency control
const processor = new BatchProcessor(
async (activityId: string) => {
// Process all activities sequentially
for (let i = 0; i < activityIds.length; i++) {
const activityId = activityIds[i]!;
try {
await processSingleActivity(activityId);
return activityId;
},
CONCURRENT_API_CALLS,
{
onError: (error, activityId) => {
errorCount++;
logger.error(`Error processing activity ID ${activityId}:`, error);
},
onProgress: (completed, total) => {
if (completed % 100 === 0 || completed === total) {
const mem = process.memoryUsage();
logger.info(`Progress: ${completed}/${total} (${Math.round(completed/total*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB | Concurrent: ${CONCURRENT_API_CALLS}`);
}
}
successCount++;
} catch (error) {
errorCount++;
logger.error(`Error processing activity ID ${activityId}:`, error);
}
);
// Process all activities concurrently
const results = await processor.process(activityIds);
successCount = results.length;
// Log progress every 100 activities or at completion
if ((i + 1) % 100 === 0 || i === activityIds.length - 1) {
const mem = process.memoryUsage();
logger.info(`Progress: ${i + 1}/${totalIds} (${Math.round((i + 1) / totalIds * 100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed / 1024 / 1024)}MB`);
}
}
logger.info(`Initial club cache population finished.`);
logger.info(`Summary: Total: ${totalIds}, Processed: ${activityIds.length}, Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount}`);
@@ -203,11 +194,10 @@ export async function initializeClubCache(): Promise<void> {
/**
 * Update stale clubs in the cache
 * Processed sequentially
 */
export async function updateStaleClubs(): Promise<void> {
logger.info('Starting stale club check...');
logger.info(`Concurrency: ${CONCURRENT_API_CALLS} parallel requests`);
const now = Date.now();
const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
const activityKeys = await getAllActivityKeys();
@@ -237,28 +227,21 @@ export async function updateStaleClubs(): Promise<void> {
logger.info(`Found ${staleActivityIds.length} stale activities to update.`);
// Create batch processor for concurrent updates
const processor = new BatchProcessor(
async (activityId: string) => {
// Process stale activities sequentially
for (let i = 0; i < staleActivityIds.length; i++) {
const activityId = staleActivityIds[i]!;
try {
logger.debug(`Updating stale activity ${activityId}`);
await processAndCacheActivity(activityId);
return activityId;
},
CONCURRENT_API_CALLS,
{
onError: (error, activityId) => {
logger.error(`Error updating stale activity ${activityId}:`, error);
},
onProgress: (completed, total) => {
if (completed % 10 === 0 || completed === total) {
logger.info(`Update progress: ${completed}/${total} (${Math.round(completed/total*100)}%)`);
}
}
} catch (error) {
logger.error(`Error updating stale activity ${activityId}:`, error);
}
);
// Process stale activities concurrently
await processor.process(staleActivityIds);
// Log progress every 10 activities or at completion
if ((i + 1) % 10 === 0 || i === staleActivityIds.length - 1) {
logger.info(`Update progress: ${i + 1}/${staleActivityIds.length} (${Math.round((i + 1) / staleActivityIds.length * 100)}%)`);
}
}
logger.info('Stale club check finished.');
}