JavaScript 自定义限流队列 fetch

JavaScript 自定义限流队列 fetch

为什么需要它?

有些时候不得不需要限制并发 fetch 的请求数量,避免请求过快导致 IP 封禁

需要做到什么?

  • 允许限制 fetch 请求同时存在的数量
  • 时间过久便认为是超时了

如何实现?

暂停请求

该方法的请求是无序的!

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前判断当前请求的数量,添加请求等待数量
    1. 如果请求数量已满,则进行等待
    2. 如果请求数量未满,则删除一个请求等待数量
  3. 请求完成,删除当前请求数量

等待队列:循环监听

该方法需要使用回调函数

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 使用 setInterval 函数持续监听队列和当前执行的请求数
    • 发现请求数量没有到达最大值,且等待队列中还有值,那么就执行一次请求

等待队列:触发钩子

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 添加完成,等待当前请求数量未满
  4. 尝试启动等待队列(钩子)

实现代码

暂停请求实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise(resolve => {
if (typeof param === 'number') {
setTimeout(resolve, param)
} else if (typeof param === 'function') {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer)
resolve()
}
}, 100)
} else {
resolve()
}
})
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null
//这是一个可以被reject的promise
var abortPromise = new Promise(function(resolve, reject) {
abortFn = function() {
reject('abort promise')
}
})
var abortablePromise = Promise.race([fetchPromise, abortPromise])
setTimeout(function() {
abortFn()
}, timeout)

return abortablePromise
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout
this.limit = limit
this.execCount = 0
this.waitCount = 0
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init) {
this.waitCount++
await wait(() => this.execCount < this.limit)
this.waitCount--
this.execCount++
try {
return await promiseTimeout(fetch(url, init), this.timeout)
} finally {
this.execCount--
}
}
}

使用示例

1
2
3
4
5
6
7
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 })
new Array(100).fill(0).forEach(i =>
requestLimiting
._fetch('/')
.then(res => console.log(res))
.catch(err => console.log(err)),
)

等待队列:循环监听实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise(resolve => {
if (typeof param === 'number') {
setTimeout(resolve, param)
} else if (typeof param === 'function') {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer)
resolve()
}
}, 100)
} else {
resolve()
}
})
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null

//这是一个可以被reject的promise
var abortPromise = new Promise(function(resolve, reject) {
abortFn = function() {
reject('abort promise')
}
})

var abortablePromise = Promise.race([fetchPromise, abortPromise])

setTimeout(function() {
abortFn()
}, timeout)

return abortablePromise
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout
this.limit = limit
this.execCount = 0
// 等待队列
this.waitArr = []

// 监视 execCount 的值
setInterval(async () => {
if (this.execCount >= this.limit) {
return
}
console.debug(
`执行 execCount: ${this.execCount}, waitArr length: ${
this.waitArr.length
}, index: ${JSON.stringify(this.waitArr[0])}`,
)
const args = this.waitArr.shift(0)
if (!args) {
return
}
this.execCount++
const callback = args[2]
try {
// 如果没有错误就返回 res
callback({ res: await promiseTimeout(fetch(...args), this.timeout) })
} catch (err) {
// 否则返回 err
callback({
err: err,
})
} finally {
this.execCount--
}
}, 100)
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @param {Function} callback 回调函数
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init, callback) {
this.waitArr.push(arguments)
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 })
new Array(100).fill(0).forEach((item, i) =>
requestLimiting._fetch(
'/',
{
// 这里设置添加时的 index,用于验证是否真的顺序执行了
headers: {
index: i,
},
},
// 这里使用了回调函数,参数使用解构得到
({ res, err }) => {
console.log(`res: ${res}, err: ${err}`)
},
),
)

等待队列:触发钩子实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise(resolve => {
if (typeof param === 'number') {
setTimeout(resolve, param)
} else if (typeof param === 'function') {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer)
resolve()
}
}, 100)
} else {
resolve()
}
})
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null
//这是一个可以被 reject 的 Promise
var abortPromise = new Promise(function(resolve, reject) {
abortFn = function() {
reject('abort promise')
}
})
// 有一个 Promise 完成就立刻结束
var abortablePromise = Promise.race([fetchPromise, abortPromise])
setTimeout(function() {
abortFn()
}, timeout)
return abortablePromise
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout
this.limit = limit
this.execCount = 0
// 等待队列
this.waitArr = []
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init) {
const _innerFetch = async () => {
console.log(
`执行 execCount: ${this.execCount}, waitArr length: ${
this.waitArr.length
}, index: ${JSON.stringify(this.waitArr[0])}`,
)
this.execCount++
const args = this.waitArr.shift(0)
try {
return await promiseTimeout(fetch(...args), this.timeout)
} finally {
this.execCount--
}
}
this.waitArr.push(arguments)
await wait(() => this.execCount < this.limit)
// 尝试启动等待队列
return _innerFetch()
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 })
new Array(100).fill(0).forEach((item, i) =>
requestLimiting
._fetch('/', {
// 这里设置添加时的 index,用于验证是否真的顺序执行了
headers: {
index: i,
},
})
.then(res => console.log(res))
.catch(err => console.log(err)),
)

总结

目前而言,最后一种实现是最好的,同时实现了两种规范

  • 返回 Promise,避免使用回调函数
  • 请求执行与添加顺序相同