request/lib/index.js

380 lines
8.8 KiB
JavaScript

import crypto from 'node:crypto'
import fs from 'node:fs'
import { join } from 'node:path'
import { EventEmitter } from 'node:events'
import { Stream } from 'node:stream'
import { StringDecoder } from 'node:string_decoder'
import File from './file.js'
import { MultipartParser } from './multipart_parser.js'
import { UrlencodedParser } from './urlencoded_parser.js'
import { OctetParser, EmptyParser } from './octet_parser.js'
import { JSONParser } from './json_parser.js'
function randomPath(uploadDir) {
var name = 'upload_' + crypto.randomBytes(16).toString('hex')
return join(uploadDir, name)
}
function parseFilename(headerValue) {
let matches = headerValue.match(/\bfilename="(.*?)"($|; )/i)
if (!matches) {
return
}
let filename = matches[1].slice(matches[1].lastIndexOf('\\') + 1)
filename = filename.replace(/%22/g, '"')
filename = filename.replace(/&#([\d]{4});/g, function (m, code) {
return String.fromCharCode(code)
})
return filename
}
/* ------------------------------------- */
export default class IncomingForm extends EventEmitter {
#req = null
#error = false
#ended = false
ended = false
headers = null
bytesReceived = null
bytesExpected = null
#parser = null
#pending = 0
#openedFiles = []
constructor(req, opts = {}) {
super()
this.#req = req
this.uploadDir = opts.uploadDir
this.encoding = opts.encoding || 'utf-8'
// Parse headers and setup the parser, ready to start listening for data.
this.writeHeaders(req.headers)
req
.on('error', err => {
this.#handleError(err)
this.#clearUploads()
})
.on('aborted', () => {
this.emit('aborted')
this.#clearUploads()
})
.on('data', buffer => this.write(buffer))
.on('end', () => {
if (this.#error) {
return
}
let err = this.#parser.end()
if (err) {
this.#handleError(err)
}
})
}
writeHeaders(headers) {
this.headers = headers
this.#parseContentLength()
this.#parseContentType()
}
write(buffer) {
if (this.#error) {
return
}
if (!this.#parser) {
return this.#handleError(new Error('uninitialized parser'))
}
this.bytesReceived += buffer.length
this.emit('progress', this.bytesReceived, this.bytesExpected)
this.#parser.write(buffer)
}
pause() {
try {
this.#req.pause()
} catch (err) {
if (!this.#ended) {
this.#handleError(err)
}
return false
}
return true
}
resume() {
try {
this.#req.resume()
} catch (err) {
if (!this.#ended) {
this.#handleError(err)
}
return false
}
return true
}
#handlePart(part) {
if (part.filename === undefined) {
let value = ''
let decoder = new StringDecoder(this.encoding)
part
.on('data', buffer => {
value += decoder.write(buffer)
})
.on('end', () => {
this.emit('field', part.name, value)
})
} else {
let file = new File({
path: randomPath(this.uploadDir),
name: part.filename,
type: part.mime
})
file.open()
this.#openedFiles.push(file)
// 表单解析完的时候文件写入不一定完成了, 所以需要加入pending计数
this.#pending++
part
.on('data', buffer => {
if (buffer.length == 0) {
return
}
file.write(buffer)
})
.on('end', () => {
if (part.ended) {
return
}
part.ended = true
file.end(() => {
this.emit('file', part.name, file)
this.#pending--
})
})
}
}
#parseContentType() {
let contentType = this.headers['content-type']
let lower = contentType.toLowerCase()
if (this.bytesExpected === 0) {
return (this.#parser = new EmptyParser())
}
if (lower.includes('octet-stream')) {
return this.#createStreamParser()
}
if (lower.includes('urlencoded')) {
return this.#createUrlencodedParser()
}
if (lower.includes('multipart')) {
let matches = contentType.match(/boundary=(?:"([^"]+)"|([^;]+))/)
if (matches) {
this.#createMultipartParser(matches[1] || matches[2])
} else {
this.#handleError(new TypeError('unknow multipart boundary'))
}
return
}
if (lower.match(/json|appliation|plain|text/)) {
return this.#createJsonParser()
}
this.#handleError(new TypeError('unknown content-type: ' + contentType))
}
#parseContentLength() {
this.bytesReceived = 0
if (this.headers['content-length']) {
this.bytesExpected = +this.headers['content-length']
} else if (this.headers['transfer-encoding'] === undefined) {
this.bytesExpected = 0
}
}
#createMultipartParser(boundary) {
let headerField, headerValue, part
this.#parser = new MultipartParser(boundary)
this.#parser.$partBegin = function () {
part = new Stream()
part.readable = true
part.headers = {}
part.name = null
part.filename = null
part.mime = null
part.transferEncoding = 'binary'
part.transferBuffer = ''
headerField = ''
headerValue = ''
}
this.#parser.$headerField = b => {
headerField += b.toString(this.encoding)
}
this.#parser.$headerValue = b => {
headerValue += b.toString(this.encoding)
}
this.#parser.$headerEnd = () => {
headerField = headerField.toLowerCase()
part.headers[headerField] = headerValue
let matches = headerValue.match(/\bname="([^"]+)"/i)
if (headerField == 'content-disposition') {
if (matches) {
part.name = matches[1]
}
part.filename = parseFilename(headerValue)
} else if (headerField == 'content-type') {
part.mime = headerValue
} else if (headerField == 'content-transfer-encoding') {
part.transferEncoding = headerValue.toLowerCase()
}
headerField = ''
headerValue = ''
}
this.#parser.$headersEnd = () => {
switch (part.transferEncoding) {
case 'binary':
case '7bit':
case '8bit':
this.#parser.$partData = function (b) {
part.emit('data', b)
}
this.#parser.$partEnd = function () {
part.emit('end')
}
break
case 'base64':
this.#parser.$partData = function (b) {
part.transferBuffer += b.toString('ascii')
// 确保offset的值能被4整除
let offset = ~~(part.transferBuffer.length / 4) * 4
part.emit(
'data',
Buffer.from(part.transferBuffer.slice(0, offset), 'base64')
)
part.transferBuffer = part.transferBuffer.slice(offset)
}
this.#parser.$partEnd = function () {
part.emit('data', Buffer.from(part.transferBuffer, 'base64'))
part.emit('end')
}
break
default:
return this.#handleError(new Error('unknown transfer-encoding'))
}
this.#handlePart(part)
}
this.#parser.$end = () => {
if (this.#pending > 0) {
setTimeout(_ => this.#parser.$end())
} else {
this.#handleEnd()
}
}
}
#createUrlencodedParser() {
this.#parser = new UrlencodedParser()
this.#parser
.on('field', fields => this.emit('field', false, fields))
.on('end', () => this.#handleEnd())
}
#createStreamParser() {
let filename = this.headers['x-file-name']
let mime = this.headers['x-file-type']
this.#parser = new OctetParser(filename, mime, randomPath(this.uploadDir))
if (this.bytesExpected) {
this.#parser.initLength(this.bytesExpected)
}
this.#parser
.on('file', file => {
this.emit('file', false, file)
})
.on('end', () => this.#handleEnd())
.on('error', err => this.#handleError(err))
}
#createJsonParser() {
this.#parser = new JSONParser()
if (this.bytesExpected) {
this.#parser.initLength(this.bytesExpected)
}
this.#parser
.on('field', (key, val) => {
this.emit('field', key, val)
})
.on('end', () => this.#handleEnd())
.on('error', err => this.#handleError(err))
}
#clearUploads() {
while (this.#openedFiles.length) {
let file = this.#openedFiles.pop()
file._writeStream.destroy()
setTimeout(_ => {
try {
fs.unlink(file.path)
} catch (e) {}
})
}
}
#handleError(err) {
if (this.#error || this.#ended) {
return
}
this.error = true
this.emit('error', err)
}
#handleEnd() {
if (this.#ended || this.#error) {
return
}
this.#ended = true
this.emit('end')
}
}
对Http的request进一步封装, 提供常用的API
JavaScript 100%