diff --git a/src/table.ts b/src/table.ts index 8fbbe47f6..5eac0f790 100644 --- a/src/table.ts +++ b/src/table.ts @@ -45,7 +45,9 @@ import {Duplex} from 'stream'; // See protos/google/rpc/code.proto // (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE) -const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]); +const UNAVAILABLE_STATUS_CODE = 14; +const INTERNAL_STATUS_CODE = 13; +const RETRYABLE_STATUS_CODES = new Set([4, 10, UNAVAILABLE_STATUS_CODE]); // (1=CANCELLED) const IGNORED_STATUS_CODES = new Set([1]); @@ -929,9 +931,23 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return; } rowStream.unpipe(userStream); + + // Under normal conditions RST_STREAM errors will get wrapped as UNAVAILABLE. + // However if the RST_STREAM occurs further upstream, it can come down + // as an INTERNAL. This is a workaround to enable retrying those errors. + let effectiveCode = error.code; + if (effectiveCode === INTERNAL_STATUS_CODE) { + const errMsg = (error.message || '').toLowerCase(); + if ( + errMsg.indexOf('rst stream') > -1 || + errMsg.indexOf('rst_stream') > -1 + ) { + effectiveCode = UNAVAILABLE_STATUS_CODE; + } + } if ( numRequestsMade <= maxRetries && - RETRYABLE_STATUS_CODES.has(error.code) + RETRYABLE_STATUS_CODES.has(effectiveCode) ) { makeNewRequest(); } else { diff --git a/test/table.ts b/test/table.ts index 88c5f7eb5..c4a9e326b 100644 --- a/test/table.ts +++ b/test/table.ts @@ -1217,6 +1217,27 @@ describe('Bigtable/Table', () => { }); }); + it('should retry the stream on internal rst_stream errors', done => { + emitters = [ + ((stream: Writable) => { + const error = new Error( + 'Received RST_STREAM with code 2 (Internal server error)' + ) as ServiceError; + error.code = 13; // INTERNAL + stream.emit('error', error); + stream.end(); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.end(); + }) as {} as EventEmitter, + ]; + + callCreateReadStream(null, () => { + assert.strictEqual(reqOptsCalls.length, 2); + done(); + }); + }); + it('should not retry CANCELLED errors', done => { emitters = [ ((stream: Writable) => {