You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

agent.js 18KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const tls = require('tls');
  4. const http2 = require('http2');
  5. const QuickLRU = require('quick-lru');
  // Symbol keys used to attach agent bookkeeping to `http2` session objects
  // without colliding with the session's own properties.
  // Number of streams currently open on the session.
  const kCurrentStreamsCount = Symbol('currentStreamsCount');
  // Holds the session's original `request()` once the agent has shimmed it.
  const kRequest = Symbol('request');
  // Cached copy of `session.originSet`, refreshed on `remoteSettings` and `origin` events.
  const kOriginSet = Symbol('cachedOriginSet');
  // `true` once the session must not accept new streams and closes when it becomes idle.
  const kGracefullyClosing = Symbol('gracefullyClosing');
  // Option names that take part in the session cache key built by
  // `Agent#normalizeOptions()`. The order is significant: values are
  // concatenated in exactly this order.
  const nameKeys = [
    // `http2.connect()` options
    'maxDeflateDynamicTableSize',
    'maxSessionMemory',
    'maxHeaderListPairs',
    'maxOutstandingPings',
    'maxReservedRemoteStreams',
    'maxSendHeaderBlockLength',
    'paddingStrategy',

    // `tls.connect()` options
    'localAddress',
    'path',
    'rejectUnauthorized',
    'minDHSize',

    // `tls.createSecureContext()` options
    'ca',
    'cert',
    'clientCertEngine',
    'ciphers',
    'key',
    'pfx',
    'servername',
    'minVersion',
    'maxVersion',
    'secureProtocol',
    'crl',
    'honorCipherOrder',
    'ecdhCurve',
    'dhparam',
    'secureOptions',
    'sessionIdContext'
  ];
  /**
   * Binary search for the position at which `value` should be inserted into
   * an already ordered `array`.
   *
   * `compare(element, value)` must return a truthy value for every element
   * that has to stay in front of `value`. The returned index is the first
   * position whose element does not satisfy `compare`, so among "equal"
   * elements the new value is placed first.
   *
   * @param {Array} array - Array ordered consistently with `compare`.
   * @param {*} value - The value about to be inserted.
   * @param {(element: *, value: *) => boolean} compare - Ordering predicate.
   * @returns {number} Insertion index in the range `0..array.length`.
   */
  const getSortedIndex = (array, value, compare) => {
    let low = 0;
    let high = array.length;

    while (low < high) {
      // Unsigned shift floors the midpoint.
      const mid = (low + high) >>> 1;

      if (compare(array[mid], value)) {
        // `array[mid]` stays in front of `value`: search the right half.
        low = mid + 1;
      } else {
        high = mid;
      }
    }

    return low;
  };
  /**
   * Ordering predicate for `getSortedIndex()`: tells whether session `a`
   * advertises a strictly larger `maxConcurrentStreams` than session `b`.
   *
   * @param {{remoteSettings: {maxConcurrentStreams: number}}} a
   * @param {{remoteSettings: {maxConcurrentStreams: number}}} b
   * @returns {boolean} `true` when `a` allows more concurrent streams than `b`.
   */
  const compareSessions = (a, b) => {
    return a.remoteSettings.maxConcurrentStreams > b.remoteSettings.maxConcurrentStreams;
  };
  // See https://tools.ietf.org/html/rfc8336
  //
  // Clients SHOULD NOT emit new requests on any connection whose Origin
  // Set is a proper subset of another connection's Origin Set, and they
  // SHOULD close it once all outstanding requests are satisfied.

  /**
   * Tells whether `coveredSession` can be replaced by `session`.
   *
   * @param {object} coveredSession - Candidate for closing.
   * @param {object} session - Session that may take over the candidate's work.
   * @returns {boolean} `true` when all three conditions below hold.
   */
  const isCoveredBy = (coveredSession, session) => {
    const coveredOrigins = coveredSession[kOriginSet];
    const origins = session[kOriginSet];

    return (
      // The set is a proper subset when its length is less than the other set.
      coveredOrigins.length < origins.length &&
      // And the other set includes all elements of the subset.
      coveredOrigins.every(origin => origins.includes(origin)) &&
      // Makes sure that the session can handle all requests from the covered session.
      coveredSession[kCurrentStreamsCount] + session[kCurrentStreamsCount] <= session.remoteSettings.maxConcurrentStreams
    );
  };

  /**
   * Gracefully closes every session in `where` that is covered by `session`.
   *
   * @param {Iterable<object>} where - Sessions to inspect (may contain `session` itself).
   * @param {object} session - The covering session.
   */
  const closeCoveredSessions = (where, session) => {
    for (const coveredSession of where) {
      if (isCoveredBy(coveredSession, session)) {
        // This allows pending requests to finish and prevents making new requests.
        gracefullyClose(coveredSession);
      }
    }
  };

  /**
   * Inverse of `closeCoveredSessions(...)`: gracefully closes `coveredSession`
   * when at least one session in `where` covers it.
   *
   * @param {Iterable<object>} where - Sessions that may cover `coveredSession`.
   * @param {object} coveredSession - Candidate for closing.
   */
  const closeSessionIfCovered = (where, coveredSession) => {
    for (const session of where) {
      if (isCoveredBy(coveredSession, session)) {
        gracefullyClose(coveredSession);

        // One covering session is enough; closing again would be redundant.
        return;
      }
    }
  };
  /**
   * Builds a view of `agent.sessions` containing only free or only busy sessions.
   *
   * A session is free while its current streams count is below its
   * `remoteSettings.maxConcurrentStreams`; otherwise it is busy.
   * Option groups with no matching session are omitted from the result.
   *
   * @param {object} parameters
   * @param {{sessions: Object<string, object[]>}} parameters.agent - The agent to inspect.
   * @param {boolean} parameters.isFree - `true` selects free sessions, a falsy value busy ones.
   * @returns {Object<string, object[]>} Matching sessions grouped by normalized options.
   */
  const getSessions = ({agent, isFree}) => {
    const result = {};

    for (const [normalizedOptions, sessions] of Object.entries(agent.sessions)) {
      const filtered = sessions.filter(session => {
        const sessionIsFree = session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams;

        return isFree ? sessionIsFree : !sessionIsFree;
      });

      if (filtered.length !== 0) {
        result[normalizedOptions] = filtered;
      }
    }

    return result;
  };
  /**
   * Marks the session as gracefully closing, so that no new streams are
   * started on it, and closes it right away when it has no open streams.
   * A session with open streams is left open here.
   *
   * @param {object} session - Session carrying the agent's symbol-keyed bookkeeping.
   */
  const gracefullyClose = session => {
    session[kGracefullyClosing] = true;

    if (session[kCurrentStreamsCount] === 0) {
      session.close();
    }
  };
  /**
   * Pool of HTTP/2 client sessions.
   *
   * Sessions are grouped by a key derived from the connection options
   * (see `normalizeOptions()`), reused across requests whose origin is in the
   * session's Origin Set, and created on demand through a queue.
   *
   * Emits `session` with every session that received its first remote SETTINGS.
   */
  class Agent extends EventEmitter {
    /**
     * @param {object} [options]
     * @param {number} [options.timeout=60000] - Timeout passed to `session.setTimeout()`; the session is destroyed when it fires.
     * @param {number} [options.maxSessions=Infinity] - A new session is only started while the number of established sessions is below this value.
     * @param {number} [options.maxFreeSessions=10] - Sessions without streams are closed while the free sessions count exceeds this value.
     * @param {number} [options.maxCachedTlsSessions=100] - Size of the TLS session LRU cache.
     */
    constructor({timeout = 60000, maxSessions = Infinity, maxFreeSessions = 10, maxCachedTlsSessions = 100} = {}) {
      super();

      // A session is considered busy when its current streams count
      // is equal to or greater than the `maxConcurrentStreams` value.
      // A session is considered free when its current streams count
      // is less than the `maxConcurrentStreams` value.

      // SESSIONS[NORMALIZED_OPTIONS] = [];
      this.sessions = {};

      // The queue for creating new sessions. It looks like this:
      // QUEUE[NORMALIZED_OPTIONS][NORMALIZED_ORIGIN] = ENTRY_FUNCTION
      //
      // The entry function has `listeners`, `completed` and `destroyed` properties.
      // `listeners` is an array of objects containing `resolve` and `reject` functions.
      // `completed` is a boolean. It's set to true after ENTRY_FUNCTION is executed.
      // `destroyed` is a boolean. If it's set to true, the session will be destroyed if hasn't connected yet.
      this.queue = {};

      // Each session will use this timeout value.
      this.timeout = timeout;

      // Max sessions in total
      this.maxSessions = maxSessions;

      // Max free sessions in total
      // TODO: decreasing `maxFreeSessions` should close some sessions
      this.maxFreeSessions = maxFreeSessions;

      this._freeSessionsCount = 0;
      this._sessionsCount = 0;

      // We don't support push streams by default.
      this.settings = {
        enablePush: false
      };

      // Reusing TLS sessions increases performance.
      this.tlsSessionCache = new QuickLRU({maxSize: maxCachedTlsSessions});
    }

    /**
     * Returns the origin (`scheme://host[:port]`) used as a queue / Origin Set key.
     * When `servername` is given it replaces the hostname.
     *
     * Strings and `URL` instances are parsed into a private copy, so a `URL`
     * passed by the caller is never modified. Any other value is used as-is;
     * the result is `undefined` when it has no `origin` property.
     *
     * @param {string|URL} url
     * @param {string} [servername]
     * @returns {string|undefined}
     * @throws {TypeError} When a string cannot be parsed as a URL.
     */
    static normalizeOrigin(url, servername) {
      if (typeof url === 'string' || url instanceof URL) {
        // Work on a copy: the caller's object is later handed to `http2.connect()`
        // and must keep pointing at the host it was created for.
        url = new URL(url);
      }

      if (servername && url.hostname !== servername) {
        url.hostname = servername;
      }

      return url.origin;
    }

    /**
     * Serializes the options listed in `nameKeys` into the key under which
     * sessions and queue entries are stored.
     *
     * Every option that is set (neither `undefined` nor `null`) contributes
     * `:<value>`. Falsy values such as `rejectUnauthorized: false` are included
     * on purpose, so such sessions never share a key with default options.
     *
     * @param {object} [options]
     * @returns {string} Empty string when no relevant option is set.
     */
    normalizeOptions(options) {
      let normalized = '';

      if (options) {
        for (const key of nameKeys) {
          const value = options[key];

          if (value !== undefined && value !== null) {
            normalized += `:${value}`;
          }
        }
      }

      return normalized;
    }

    /**
     * Runs the queued entry for the given key, unless it already ran or the
     * number of established sessions reached `maxSessions`.
     *
     * @param {string} normalizedOptions
     * @param {string} normalizedOrigin
     */
    _tryToCreateNewSession(normalizedOptions, normalizedOrigin) {
      if (!(normalizedOptions in this.queue) || !(normalizedOrigin in this.queue[normalizedOptions])) {
        return;
      }

      const item = this.queue[normalizedOptions][normalizedOrigin];

      // The entry function can be run only once.
      // Known limitation: the session may be never created when:
      // - the first condition is false AND
      // - this function is never called with the same arguments in the future.
      if (this._sessionsCount < this.maxSessions && !item.completed) {
        item.completed = true;

        item();
      }
    }

    /**
     * Finds a free session for `origin` or queues the creation of a new one.
     *
     * @param {string|URL} origin
     * @param {object} [options] - `http2.connect()` / TLS options.
     * @param {Array<{resolve: Function, reject: Function}>} [listeners] - When an
     *   array is given, the returned promise resolves immediately with `undefined`
     *   and the outcome is delivered through the listeners instead.
     * @returns {Promise<object|undefined>} The session, unless `listeners` was given.
     */
    getSession(origin, options, listeners) {
      return new Promise((resolve, reject) => {
        if (Array.isArray(listeners)) {
          listeners = [...listeners];

          // Resolve the current promise ASAP, we're just moving the listeners.
          // They will be executed at a different time.
          resolve();
        } else {
          listeners = [{resolve, reject}];
        }

        const rejectListeners = error => {
          for (const listener of listeners) {
            listener.reject(error);
          }
        };

        let normalizedOptions;
        let normalizedOrigin;

        try {
          normalizedOptions = this.normalizeOptions(options);
          normalizedOrigin = Agent.normalizeOrigin(origin, options && options.servername);
        } catch (error) {
          // The promise may be resolved already (see above), so a throw would be
          // lost and the listeners would never settle. Reject them explicitly.
          rejectListeners(error);
          return;
        }

        if (normalizedOrigin === undefined) {
          rejectListeners(new TypeError('The `origin` argument needs to be a string or an URL object'));
          return;
        }

        if (normalizedOptions in this.sessions) {
          const sessions = this.sessions[normalizedOptions];

          let maxConcurrentStreams = -1;
          let currentStreamsCount = -1;
          let optimalSession;

          // We could just do this.sessions[normalizedOptions].find(...) but that isn't optimal.
          // Additionally, we are looking for session which has biggest current pending streams count.
          for (const session of sessions) {
            const sessionMaxConcurrentStreams = session.remoteSettings.maxConcurrentStreams;

            // Sessions are inserted in descending `maxConcurrentStreams` order.
            if (sessionMaxConcurrentStreams < maxConcurrentStreams) {
              break;
            }

            if (session[kOriginSet].includes(normalizedOrigin)) {
              const sessionCurrentStreamsCount = session[kCurrentStreamsCount];

              if (
                sessionCurrentStreamsCount >= sessionMaxConcurrentStreams ||
                session[kGracefullyClosing] ||
                // Unfortunately the `close` event isn't called immediately,
                // so `session.destroyed` is `true`, but `session.closed` is `false`.
                session.destroyed
              ) {
                continue;
              }

              // We only need set this once.
              if (!optimalSession) {
                maxConcurrentStreams = sessionMaxConcurrentStreams;
              }

              // We're looking for the session which has biggest current pending stream count,
              // in order to minimalize the amount of active sessions.
              if (sessionCurrentStreamsCount > currentStreamsCount) {
                optimalSession = session;
                currentStreamsCount = sessionCurrentStreamsCount;
              }
            }
          }

          if (optimalSession) {
            /* istanbul ignore next: safety check */
            if (listeners.length !== 1) {
              rejectListeners(new Error(
                `Expected the length of listeners to be 1, got ${listeners.length}.\n` +
                'Please report this to https://github.com/szmarczak/http2-wrapper/'
              ));
              return;
            }

            listeners[0].resolve(optimalSession);
            return;
          }
        }

        if (normalizedOptions in this.queue) {
          if (normalizedOrigin in this.queue[normalizedOptions]) {
            // There's already an item in the queue, just attach ourselves to it.
            this.queue[normalizedOptions][normalizedOrigin].listeners.push(...listeners);

            // This shouldn't be executed here.
            // See the comment inside _tryToCreateNewSession.
            this._tryToCreateNewSession(normalizedOptions, normalizedOrigin);
            return;
          }
        } else {
          this.queue[normalizedOptions] = {};
        }

        // The entry must be removed from the queue IMMEDIATELY when:
        // 1. the session connects successfully,
        // 2. an error occurs.
        const removeFromQueue = () => {
          // Our entry can be replaced. We cannot remove the new one.
          if (normalizedOptions in this.queue && this.queue[normalizedOptions][normalizedOrigin] === entry) {
            delete this.queue[normalizedOptions][normalizedOrigin];

            if (Object.keys(this.queue[normalizedOptions]).length === 0) {
              delete this.queue[normalizedOptions];
            }
          }
        };

        // The main logic is here
        const entry = () => {
          const name = `${normalizedOrigin}:${normalizedOptions}`;
          let receivedSettings = false;

          try {
            const session = http2.connect(origin, {
              createConnection: this.createConnection,
              settings: this.settings,
              session: this.tlsSessionCache.get(name),
              ...options
            });
            session[kCurrentStreamsCount] = 0;
            session[kGracefullyClosing] = false;

            const isFree = () => session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams;
            let wasFree = true;

            session.socket.once('session', tlsSession => {
              this.tlsSessionCache.set(name, tlsSession);
            });

            session.once('error', error => {
              // Listeners are empty when the session successfully connected.
              rejectListeners(error);

              // The connection got broken, purge the cache.
              this.tlsSessionCache.delete(name);
            });

            session.setTimeout(this.timeout, () => {
              // Terminates all streams owned by this session.
              // TODO: Maybe the streams should have a "Session timed out" error?
              session.destroy();
            });

            session.once('close', () => {
              if (receivedSettings) {
                // 1. If it wasn't free then no need to decrease because
                //    it has been decreased already in session.request().
                // 2. `stream.once('close')` won't increment the count
                //    because the session is already closed.
                if (wasFree) {
                  this._freeSessionsCount--;
                }

                this._sessionsCount--;

                // This cannot be moved to the stream logic,
                // because there may be a session that hadn't made a single request.
                const where = this.sessions[normalizedOptions];
                where.splice(where.indexOf(session), 1);

                if (where.length === 0) {
                  delete this.sessions[normalizedOptions];
                }
              } else {
                // Broken connection
                const error = new Error('Session closed without receiving a SETTINGS frame');
                error.code = 'HTTP2WRAPPER_NOSETTINGS';

                rejectListeners(error);

                removeFromQueue();
              }

              // There may be another session awaiting.
              this._tryToCreateNewSession(normalizedOptions, normalizedOrigin);
            });

            // Iterates over the queue and processes listeners.
            const processListeners = () => {
              if (!(normalizedOptions in this.queue) || !isFree()) {
                return;
              }

              for (const queuedOrigin of session[kOriginSet]) {
                if (queuedOrigin in this.queue[normalizedOptions]) {
                  const queuedListeners = this.queue[normalizedOptions][queuedOrigin].listeners;

                  // Prevents session overloading.
                  while (queuedListeners.length !== 0 && isFree()) {
                    // We assume `resolve(...)` calls `request(...)` *directly*,
                    // otherwise the session will get overloaded.
                    queuedListeners.shift().resolve(session);
                  }

                  const where = this.queue[normalizedOptions];
                  if (where[queuedOrigin].listeners.length === 0) {
                    delete where[queuedOrigin];

                    if (Object.keys(where).length === 0) {
                      delete this.queue[normalizedOptions];
                      break;
                    }
                  }

                  // We're no longer free, no point in continuing.
                  if (!isFree()) {
                    break;
                  }
                }
              }
            };

            // The Origin Set cannot shrink. No need to check if it suddenly became covered by another one.
            session.on('origin', () => {
              session[kOriginSet] = session.originSet;

              if (!isFree()) {
                // The session is full.
                return;
              }

              processListeners();

              // Close covered sessions (if possible).
              closeCoveredSessions(this.sessions[normalizedOptions], session);
            });

            session.once('remoteSettings', () => {
              // Fix Node.js bug preventing the process from exiting
              session.ref();
              session.unref();

              // The Agent could have been destroyed already.
              if (entry.destroyed) {
                rejectListeners(new Error('Agent has been destroyed'));

                session.destroy();
                return;
              }

              // Counted only now: the `close` handler decrements the counter only for
              // sessions that got past this point (`receivedSettings === true`).
              this._sessionsCount++;

              session[kOriginSet] = session.originSet;

              if (normalizedOptions in this.sessions) {
                const sessions = this.sessions[normalizedOptions];
                sessions.splice(getSortedIndex(sessions, session, compareSessions), 0, session);
              } else {
                this.sessions[normalizedOptions] = [session];
              }

              this._freeSessionsCount += 1;
              receivedSettings = true;

              this.emit('session', session);

              processListeners();
              removeFromQueue();

              // TODO: Close last recently used (or least used?) session
              if (session[kCurrentStreamsCount] === 0 && this._freeSessionsCount > this.maxFreeSessions) {
                session.close();
              }

              // Check if we haven't managed to execute all listeners.
              if (listeners.length !== 0) {
                // Request for a new session with predefined listeners.
                // The original `origin` is passed so the new session connects to the
                // same host; the normalized origin may carry `servername` instead.
                this.getSession(origin, options, listeners);
                listeners.length = 0;
              }

              // `session.remoteSettings.maxConcurrentStreams` might get increased
              session.on('remoteSettings', () => {
                processListeners();

                // In case the Origin Set changes
                closeCoveredSessions(this.sessions[normalizedOptions], session);
              });
            });

            // Shim `session.request()` in order to catch all streams
            session[kRequest] = session.request;
            session.request = (headers, streamOptions) => {
              if (session[kGracefullyClosing]) {
                throw new Error('The session is gracefully closing. No new streams are allowed.');
              }

              const stream = session[kRequest](headers, streamOptions);

              // The process won't exit until the session is closed or all requests are gone.
              session.ref();

              ++session[kCurrentStreamsCount];

              if (session[kCurrentStreamsCount] === session.remoteSettings.maxConcurrentStreams) {
                this._freeSessionsCount--;
              }

              stream.once('close', () => {
                wasFree = isFree();

                --session[kCurrentStreamsCount];

                if (!session.destroyed && !session.closed) {
                  closeSessionIfCovered(this.sessions[normalizedOptions], session);

                  if (isFree() && !session.closed) {
                    if (!wasFree) {
                      this._freeSessionsCount++;

                      wasFree = true;
                    }

                    const isEmpty = session[kCurrentStreamsCount] === 0;

                    if (isEmpty) {
                      session.unref();
                    }

                    if (
                      isEmpty &&
                      (
                        this._freeSessionsCount > this.maxFreeSessions ||
                        session[kGracefullyClosing]
                      )
                    ) {
                      session.close();
                    } else {
                      closeCoveredSessions(this.sessions[normalizedOptions], session);
                      processListeners();
                    }
                  }
                }
              });

              return stream;
            };
          } catch (error) {
            rejectListeners(error);

            removeFromQueue();
          }
        };

        entry.listeners = listeners;
        entry.completed = false;
        entry.destroyed = false;

        this.queue[normalizedOptions][normalizedOrigin] = entry;
        this._tryToCreateNewSession(normalizedOptions, normalizedOrigin);
      });
    }

    /**
     * Obtains a session for `origin` and starts a request on it.
     *
     * @param {string|URL} origin
     * @param {object} [options] - Passed to `getSession()`.
     * @param {object} [headers] - Passed to `session.request()`.
     * @param {object} [streamOptions] - Passed to `session.request()`.
     * @returns {Promise<object>} Resolves with the stream returned by `session.request()`.
     */
    request(origin, options, headers, streamOptions) {
      return new Promise((resolve, reject) => {
        this.getSession(origin, options, [{
          reject,
          resolve: session => {
            try {
              // `session.request()` throws when the session is gracefully closing.
              resolve(session.request(headers, streamOptions));
            } catch (error) {
              reject(error);
            }
          }
        }]);
      });
    }

    /**
     * Default `createConnection` handed to `http2.connect()`.
     *
     * @param {URL} origin
     * @param {object} options
     * @returns {object} The socket returned by `Agent.connect()`.
     */
    createConnection(origin, options) {
      return Agent.connect(origin, options);
    }

    /**
     * Opens a TLS connection that offers only the `h2` ALPN protocol.
     * Sets `options.ALPNProtocols` and, when missing, `options.servername`
     * on the passed object.
     *
     * @param {URL} origin - Port defaults to 443 when `origin.port` is empty.
     * @param {object} options - `tls.connect()` options.
     * @returns {object} The socket returned by `tls.connect()`.
     */
    static connect(origin, options) {
      options.ALPNProtocols = ['h2'];

      const port = origin.port || 443;
      const host = origin.hostname || origin.host;

      if (typeof options.servername === 'undefined') {
        options.servername = host;
      }

      return tls.connect(port, host, options);
    }

    /**
     * Closes every registered session that currently has no open streams.
     */
    closeFreeSessions() {
      for (const sessions of Object.values(this.sessions)) {
        for (const session of sessions) {
          if (session[kCurrentStreamsCount] === 0) {
            session.close();
          }
        }
      }
    }

    /**
     * Destroys all registered sessions, marks every queued entry as destroyed
     * and resets the queue.
     *
     * @param {*} [reason] - Forwarded to `session.destroy()`.
     */
    destroy(reason) {
      for (const sessions of Object.values(this.sessions)) {
        for (const session of sessions) {
          session.destroy(reason);
        }
      }

      for (const entriesOfAuthority of Object.values(this.queue)) {
        for (const entry of Object.values(entriesOfAuthority)) {
          entry.destroyed = true;
        }
      }

      // New requests should NOT attach to destroyed sessions
      this.queue = {};
    }

    /**
     * Sessions that can accept more streams, grouped by normalized options.
     */
    get freeSessions() {
      return getSessions({agent: this, isFree: true});
    }

    /**
     * Sessions that reached their `maxConcurrentStreams`, grouped by normalized options.
     */
    get busySessions() {
      return getSessions({agent: this, isFree: false});
    }
  }
  532. Agent.kCurrentStreamsCount = kCurrentStreamsCount;
  533. Agent.kGracefullyClosing = kGracefullyClosing;
  534. module.exports = {
  535. Agent,
  536. globalAgent: new Agent()
  537. };