diff --git a/libs/proxy.js b/libs/proxy.js index 0508112..4b16447 100644 --- a/libs/proxy.js +++ b/libs/proxy.js @@ -354,8 +354,13 @@ module.exports = { }, { saveFilesToDir, parseFields: false}); } ], err => { - if (err) json(res, {error: err.message}); - else json(res, {success: true}); + if (err) { + logger.error(`Upload failed for ${taskId}: ${err.message}`); + json(res, {error: err.message}); + } else { + logger.debug(`Upload successful for ${taskId}`); + json(res, {success: true}); + } }); }else json(res, { error: `No uuid found in ${pathname}`}); }else if (req.method === 'POST' && pathname.indexOf('/task/new/commit') === 0){ diff --git a/libs/taskNew.js b/libs/taskNew.js index 76a6ab2..93f9a35 100644 --- a/libs/taskNew.js +++ b/libs/taskNew.js @@ -21,7 +21,10 @@ const netutils = require('./netutils'); const path = require('path'); const fs = require('fs'); const config = require('../config'); -const Curl = require('node-libcurl').Curl; +const FormData = require('form-data'); +const http = require('http'); +const https = require('https'); +const { URL } = require('url'); const tasktable = require('./tasktable'); const routetable = require('./routetable'); const nodes = require('./nodes'); @@ -98,6 +101,7 @@ module.exports = { if (options.limits === undefined) options.limits = {}; const busboy = new Busboy({ headers: req.headers }); + logger.debug(`Busboy created with headers: ${JSON.stringify(req.headers)}`); const params = { options: null, @@ -113,6 +117,7 @@ module.exports = { if (options.parseFields){ busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) { + logger.debug(`Busboy field received: ${fieldname} = ${val}`); // Save options if (fieldname === 'options'){ params.options = val; @@ -144,7 +149,19 @@ module.exports = { }); } if (options.saveFilesToDir){ - busboy.on('file', async function(fieldname, file, filename, encoding, mimetype) { + let pendingFiles = 0; + let allFilesProcessed = false; + + const checkCompletion = () => { + logger.debug(`Checking completion: pendingFiles=${pendingFiles}, allFilesProcessed=${allFilesProcessed}`); + if (pendingFiles === 0 && allFilesProcessed) { + logger.debug(`All files completed, calling onFinish`); + onFinish(params); + } + }; + + busboy.on('file', function(fieldname, file, filename, encoding, mimetype) { + logger.debug(`Busboy file received: fieldname=${fieldname}, filename=${filename}, encoding=${encoding}, mimetype=${mimetype}`); if (fieldname === 'images'){ if (options.limits.maxImages && params.imagesCount > options.limits.maxImages){ params.error = "Max images count exceeded."; @@ -152,55 +169,114 @@ module.exports = { return; } + pendingFiles++; + logger.debug(`Pending files increased to: ${pendingFiles}`); + filename = utils.sanitize(filename); // Special case if (filename === 'body.json') filename = '_body.json'; - filename = await assureUniqueFilename(options.saveFilesToDir, filename); + // Use async handling for unique filename + assureUniqueFilename(options.saveFilesToDir, filename).then(uniqueFilename => { + logger.debug(`Unique filename resolved: ${uniqueFilename}`); + + const name = path.basename(uniqueFilename); + params.fileNames.push(name); + + const saveTo = path.join(options.saveFilesToDir, name); + let saveStream = null; + + // Detect if a connection is aborted/interrupted + // and cleanup any open streams to avoid fd leaks + const handleClose = () => { + if (saveStream){ + saveStream.close(); + saveStream = null; + } + fs.exists(saveTo, exists => { + if (exists){ + fs.unlink(saveTo, err => { + if (err) logger.error(err); + }); + } + }); + }; + req.on('close', handleClose); + req.on('abort', handleClose); - const name = path.basename(filename); - params.fileNames.push(name); - - const saveTo = path.join(options.saveFilesToDir, name); - let saveStream = null; - - // Detect if a connection is aborted/interrupted - // and cleanup any open streams to avoid fd leaks - const handleClose = () => { - if (saveStream){ - saveStream.close(); + saveStream = fs.createWriteStream(saveTo); + + saveStream.on('finish', () => { + logger.debug(`File stream finished for: ${uniqueFilename}`); + req.removeListener('close', handleClose); + req.removeListener('abort', handleClose); saveStream = null; - } - fs.exists(saveTo, exists => { - if (exists){ - fs.unlink(saveTo, err => { - if (err) logger.error(err); - }); + params.imagesCount++; + pendingFiles--; + logger.debug(`File completed: ${uniqueFilename}, pendingFiles now: ${pendingFiles}`); + + if (options.limits.maxImages && params.imagesCount > options.limits.maxImages){ + params.error = "Max images count exceeded."; } + + checkCompletion(); + }); + + saveStream.on('error', (err) => { + logger.error(`File stream error for ${uniqueFilename}: ${err.message}`); + pendingFiles--; + params.error = `File save error: ${err.message}`; + checkCompletion(); }); - }; - req.on('close', handleClose); - req.on('abort', handleClose); - - file.on('end', () => { - req.removeListener('close', handleClose); - req.removeListener('abort', handleClose); - saveStream = null; - params.imagesCount++; - if (options.limits.maxImages && params.imagesCount > options.limits.maxImages){ - params.error = "Max images count exceeded."; - } - }); - saveStream = fs.createWriteStream(saveTo) - file.pipe(saveStream); + file.pipe(saveStream); + }).catch(err => { + logger.error(`Error getting unique filename: ${err.message}`); + pendingFiles--; + params.error = `Filename error: ${err.message}`; + file.resume(); // Skip this file + checkCompletion(); + }); } }); + + busboy.on('finish', function(){ + logger.debug(`Busboy finished parsing, setting allFilesProcessed=true`); + allFilesProcessed = true; + checkCompletion(); + }); } - busboy.on('finish', function(){ - onFinish(params); + + busboy.on('error', function(err) { + logger.error(`Busboy error: ${err.message}`); + }); + + if (!options.saveFilesToDir) { + busboy.on('finish', function(){ + logger.debug(`Busboy finished parsing`); + onFinish(params); + }); + } + + req.on('close', function() { + logger.debug(`Request connection closed`); }); + + req.on('end', function() { + logger.debug(`Request ended`); + if (options.saveFilesToDir && !allFilesProcessed) { + logger.debug(`Request ended but busboy finish not called, manually setting allFilesProcessed=true`); + allFilesProcessed = true; + checkCompletion(); + } + }); + + req.on('error', function(err) { + logger.error(`Request error: ${err.message}`); + }); + + logger.debug(`Piping request to busboy`); req.pipe(busboy); }, @@ -339,149 +415,219 @@ module.exports = { const eventEmitter = new events.EventEmitter(); eventEmitter.setMaxListeners(2 * (2 + PARALLEL_UPLOADS + 1)); - const curlInstance = (done, onError, url, body, validate) => { - // We use CURL, because NodeJS libraries are buggy - const curl = new Curl(), - close = curl.close.bind(curl); - - const tryClose = () => { - try{ - close(); - }catch(e){ - logger.warn(`Cannot close cURL: ${e.message}`); - } - eventEmitter.removeListener('abort', tryClose); - eventEmitter.removeListener('close', tryClose); - }; - - eventEmitter.on('abort', tryClose); - eventEmitter.on('close', tryClose); - - curl.on('end', async (statusCode, body, headers) => { - try{ - if (statusCode === 200){ - body = JSON.parse(body); - if (body.error) throw new Error(body.error); - if (validate !== undefined) validate(body); - - done(); - }else{ - throw new Error(`POST ${url} statusCode is ${statusCode}, expected 200`); + const httpRequest = async (url, formData, headers = {}, validate) => { + return new Promise((resolve, reject) => { + logger.debug(`Making HTTP request to: ${url}`); + + const parsedUrl = new URL(url); + const isHttps = parsedUrl.protocol === 'https:'; + const httpModule = isHttps ? https : http; + + const requestOptions = { + hostname: parsedUrl.hostname, + port: parsedUrl.port || (isHttps ? 443 : 80), + path: parsedUrl.pathname + parsedUrl.search, + method: 'POST', + headers: { + ...headers } - }catch(e){ - onError(e); - } - }); - - curl.on('error', onError); + }; - // logger.info(`Curl URL: ${url}`); - // logger.info(`Curl Body: ${JSON.stringify(body)}`); + if (formData) { + // Add form-data headers + const formHeaders = formData.getHeaders(); + requestOptions.headers = { + ...requestOptions.headers, + ...formHeaders + }; + logger.debug(`FormData headers: ${JSON.stringify(formHeaders)}`); + } - curl.setOpt(Curl.option.URL, url); - curl.setOpt(Curl.option.HTTPPOST, body || []); - if (config.upload_max_speed) curl.setOpt(Curl.option.MAX_SEND_SPEED_LARGE, config.upload_max_speed); - // abort if slower than 30 bytes/sec during 1600 seconds */ - curl.setOpt(Curl.option.LOW_SPEED_TIME, 1600); - curl.setOpt(Curl.option.LOW_SPEED_LIMIT, 30); - curl.setOpt(Curl.option.HTTPHEADER, [ - 'Content-Type: multipart/form-data' - ]); + logger.debug(`Request options: ${JSON.stringify(requestOptions, null, 2)}`); - return curl; - }; + const req = httpModule.request(requestOptions, (res) => { + logger.debug(`Response status: ${res.statusCode}`); + + let body = ''; + res.on('data', (chunk) => { + body += chunk; + }); + + res.on('end', () => { + logger.debug(`Response body length: ${body.length}`); + + if (res.statusCode !== 200) { + logger.error(`Non-200 status: ${res.statusCode}, body: ${body}`); + reject(new Error(`POST ${url} status is ${res.statusCode}, expected 200. Response: ${body}`)); + return; + } + + // Handle empty response + if (!body || body.trim() === '') { + logger.error(`Empty response received from ${url}`); + reject(new Error(`Empty response received from ${url}`)); + return; + } + + let parsedBody; + try { + parsedBody = JSON.parse(body); + logger.debug(`Successfully parsed JSON response`); + } catch (parseError) { + logger.error(`JSON parse error: ${parseError.message}, body: ${body}`); + reject(new Error(`Invalid JSON response from ${url}: ${body}`)); + return; + } + + if (parsedBody.error) { + logger.error(`Server returned error: ${parsedBody.error}`); + reject(new Error(parsedBody.error)); + return; + } + + if (validate !== undefined) { + try { + validate(parsedBody); + logger.debug(`Validation passed`); + } catch (validationError) { + logger.error(`Validation error: ${validationError.message}`); + reject(validationError); + return; + } + } - const taskNewInit = async () => { - return new Promise((resolve, reject) => { - const body = []; - body.push({ - name: 'name', - contents: name + resolve(parsedBody); + }); }); - body.push({ - name: 'options', - contents: JSON.stringify(taskOptions) + + req.on('error', (error) => { + logger.error(`HTTP request failed for ${url}: ${error.message}`); + reject(error); }); - body.push({ - name: 'dateCreated', - contents: dateC.getTime().toString() + + req.on('timeout', () => { + logger.error(`HTTP request timeout for ${url}`); + req.destroy(); + reject(new Error(`Request timeout for ${url}`)); }); - if (skipPostProcessing){ - body.push({ - name: 'skipPostProcessing', - contents: "true" - }); - } - if (webhook){ - body.push({ - name: 'webhook', - contents: webhook - }); - } - if (outputs){ - body.push({ - name: 'outputs', - contents: outputs - }); - } - - const curl = curlInstance(resolve, reject, - `${node.proxyTargetUrl()}/task/new/init?token=${node.getToken()}`, - body, - (res) => { - if (res.uuid !== uuid) throw new Error(`set-uuid did not match, ${res.uuid} !== ${uuid}`); - }); - curl.setOpt(Curl.option.HTTPHEADER, [ - 'Content-Type: multipart/form-data', - `set-uuid: ${uuid}` - ]); - curl.perform(); + if (formData) { + logger.debug(`Piping FormData to request`); + formData.pipe(req); + } else { + logger.debug(`Ending request without FormData`); + req.end(); + } }); }; + const taskNewInit = async () => { + const formData = new FormData(); + + formData.append('name', name); + formData.append('options', JSON.stringify(taskOptions)); + formData.append('dateCreated', dateC.getTime().toString()); + + if (skipPostProcessing) { + formData.append('skipPostProcessing', "true"); + } + if (webhook) { + formData.append('webhook', webhook); + } + if (outputs) { + formData.append('outputs', outputs); + } + + const headers = { + 'set-uuid': uuid + }; + + return await httpRequest( + `${node.proxyTargetUrl()}/task/new/init?token=${node.getToken()}`, + formData, + headers, + (res) => { + if (res.uuid !== uuid) throw new Error(`set-uuid did not match, ${res.uuid} !== ${uuid}`); + } + ); + }; + const taskNewUpload = async () => { - return new Promise((resolve, reject) => { - const MAX_RETRIES = 5; + const MAX_RETRIES = 5; - const chunks = utils.chunkArray(fileNames, Math.ceil(fileNames.length / PARALLEL_UPLOADS)); - let completed = 0; - const done = () => { - if (++completed >= chunks.length) resolve(); - }; - - chunks.forEach(fileNames => { + const chunks = utils.chunkArray(fileNames, Math.ceil(fileNames.length / PARALLEL_UPLOADS)); + + const uploadPromises = chunks.map(fileNames => { + return new Promise(async (resolve, reject) => { let retries = 0; - const body = fileNames.map(f => { return { name: 'images', file: path.join(tmpPath, f) } }); - const curl = curlInstance(done, async (err) => { - if (status.aborted) return; // Ignore if this was aborted by other code + const performUpload = async () => { + try { + if (status.aborted) return resolve(); // Ignore if this was aborted by other code + + const formData = new FormData(); + + for (const fileName of fileNames) { + const filePath = path.join(tmpPath, fileName); + + // Check if file exists before reading + if (!fs.existsSync(filePath)) { + throw new Error(`File not found: ${filePath}`); + } + + // Get file stats + const stats = fs.statSync(filePath); + const fileSize = stats.size; + logger.debug(`Adding file ${fileName} (${fileSize} bytes) to FormData`); + + // Use file stream with form-data (same as original node-libcurl approach) + const fileStream = fs.createReadStream(filePath); + formData.append('images', fileStream, { + filename: fileName, + contentType: 'application/octet-stream', + knownLength: fileSize + }); + logger.debug(`Successfully added ${fileName} as stream to FormData`); + } - if (retries < MAX_RETRIES){ + const uploadUrl = `${node.proxyTargetUrl()}/task/new/upload/${uuid}?token=${node.getToken()}`; + logger.debug(`Uploading to: ${uploadUrl}`); + + const result = await httpRequest( + uploadUrl, + formData, + {}, + (res) => { + if (!res.success) throw new Error(`no success flag in task upload response`); + } + ); + + logger.debug(`Upload successful for files: ${fileNames.join(', ')}`); + + resolve(result); + } catch (err) { + if (status.aborted) return resolve(); // Ignore if this was aborted by other code + + if (retries < MAX_RETRIES) { retries++; - logger.warn(`File upload to ${node} failed, retrying... (${retries})`); + logger.warn(`File upload to ${node} failed, retrying... (${retries}): ${err.message}`); await utils.sleep(2000); - curl.perform(); - }else{ + await performUpload(); + } else { reject(new Error(`${err.message}: maximum upload retries (${MAX_RETRIES}) exceeded`)); } - }, - `${node.proxyTargetUrl()}/task/new/upload/${uuid}?token=${node.getToken()}`, - body, - (res) => { - if (!res.success) throw new Error(`no success flag in task upload response`); - }); + } + }; - curl.perform(); + await performUpload(); }); }); + + await Promise.all(uploadPromises); }; const taskNewCommit = async () => { - return new Promise((resolve, reject) => { - const curl = curlInstance(resolve, reject, `${node.proxyTargetUrl()}/task/new/commit/${uuid}?token=${node.getToken()}`); - curl.perform(); - }); + return await httpRequest(`${node.proxyTargetUrl()}/task/new/commit/${uuid}?token=${node.getToken()}`); }; let retries = 0; diff --git a/package.json b/package.json index ea7e6d4..d7b04d2 100644 --- a/package.json +++ b/package.json @@ -21,9 +21,9 @@ "cors": "^2.8.5", "express": "^4.16.4", "express-basic-auth": "^1.2.0", + "form-data": "^4.0.0", "http-proxy": "^1.17.0", "minimist": "^1.2.0", - "node-libcurl": "^2.0.3", "rimraf": "^2.6.2", "short-uuid": "^3.1.1", "tree-kill": "^1.2.1",