mirror of
https://github.com/titanscouting/tra-analysis.git
synced 2025-01-27 15:15:54 +00:00
340 lines
12 KiB
JavaScript
340 lines
12 KiB
JavaScript
|
"use strict";
|
||
|
Object.defineProperty(exports, "__esModule", { value: true });
|
||
|
var ConfigStore = require("configstore");
|
||
|
var crypto = require("crypto");
|
||
|
var r = require("request");
|
||
|
var stream_1 = require("stream");
|
||
|
var util = require("util");
|
||
|
var streamEvents = require('stream-events');
|
||
|
var googleAuth = require('google-auto-auth');
|
||
|
var pumpify = require('pumpify');
|
||
|
var request = r.defaults({ json: true, pool: { maxSockets: Infinity } });
|
||
|
var BASE_URI = 'https://www.googleapis.com/upload/storage/v1/b';
|
||
|
var TERMINATED_UPLOAD_STATUS_CODE = 410;
|
||
|
var RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
|
||
|
var RETRY_LIMIT = 5;
|
||
|
var wrapError = function (message, err) {
|
||
|
return new Error([message, err.message].join('\n'));
|
||
|
};
|
||
|
function Upload(cfg) {
|
||
|
var _this = this;
|
||
|
pumpify.call(this);
|
||
|
streamEvents.call(this);
|
||
|
cfg = cfg || {};
|
||
|
if (!cfg.bucket || !cfg.file) {
|
||
|
throw new Error('A bucket and file name are required');
|
||
|
}
|
||
|
cfg.authConfig = cfg.authConfig || {};
|
||
|
cfg.authConfig.scopes =
|
||
|
['https://www.googleapis.com/auth/devstorage.full_control'];
|
||
|
this.authClient = cfg.authClient || googleAuth(cfg.authConfig);
|
||
|
this.bucket = cfg.bucket;
|
||
|
this.file = cfg.file;
|
||
|
this.generation = cfg.generation;
|
||
|
this.kmsKeyName = cfg.kmsKeyName;
|
||
|
this.metadata = cfg.metadata || {};
|
||
|
this.offset = cfg.offset;
|
||
|
this.origin = cfg.origin;
|
||
|
this.userProject = cfg.userProject;
|
||
|
if (cfg.key) {
|
||
|
/**
|
||
|
* NOTE: This is `as string` because there appears to be some weird kind
|
||
|
* of TypeScript bug as 2.8. Tracking the issue here:
|
||
|
* https://github.com/Microsoft/TypeScript/issues/23155
|
||
|
*/
|
||
|
var base64Key = Buffer.from(cfg.key).toString('base64');
|
||
|
this.encryption = {
|
||
|
key: base64Key,
|
||
|
hash: crypto.createHash('sha256').update(cfg.key).digest('base64')
|
||
|
};
|
||
|
}
|
||
|
this.predefinedAcl = cfg.predefinedAcl;
|
||
|
if (cfg.private)
|
||
|
this.predefinedAcl = 'private';
|
||
|
if (cfg.public)
|
||
|
this.predefinedAcl = 'publicRead';
|
||
|
this.configStore = new ConfigStore('gcs-resumable-upload');
|
||
|
this.uriProvidedManually = !!cfg.uri;
|
||
|
this.uri = cfg.uri || this.get('uri');
|
||
|
this.numBytesWritten = 0;
|
||
|
this.numRetries = 0;
|
||
|
var contentLength = cfg.metadata ? Number(cfg.metadata.contentLength) : NaN;
|
||
|
this.contentLength = isNaN(contentLength) ? '*' : contentLength;
|
||
|
this.once('writing', function () {
|
||
|
if (_this.uri) {
|
||
|
_this.continueUploading();
|
||
|
}
|
||
|
else {
|
||
|
_this.createURI(function (err) {
|
||
|
if (err) {
|
||
|
return _this.destroy(err);
|
||
|
}
|
||
|
_this.startUploading();
|
||
|
});
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
util.inherits(Upload, pumpify);
|
||
|
Upload.prototype.createURI = function (callback) {
|
||
|
var _this = this;
|
||
|
var metadata = this.metadata;
|
||
|
var reqOpts = {
|
||
|
method: 'POST',
|
||
|
uri: [BASE_URI, this.bucket, 'o'].join('/'),
|
||
|
qs: { name: this.file, uploadType: 'resumable' },
|
||
|
json: metadata,
|
||
|
headers: {}
|
||
|
};
|
||
|
if (metadata.contentLength) {
|
||
|
reqOpts.headers['X-Upload-Content-Length'] = metadata.contentLength;
|
||
|
}
|
||
|
if (metadata.contentType) {
|
||
|
reqOpts.headers['X-Upload-Content-Type'] = metadata.contentType;
|
||
|
}
|
||
|
if (typeof this.generation !== 'undefined') {
|
||
|
reqOpts.qs.ifGenerationMatch = this.generation;
|
||
|
}
|
||
|
if (this.kmsKeyName) {
|
||
|
reqOpts.qs.kmsKeyName = this.kmsKeyName;
|
||
|
}
|
||
|
if (this.predefinedAcl) {
|
||
|
reqOpts.qs.predefinedAcl = this.predefinedAcl;
|
||
|
}
|
||
|
if (this.origin) {
|
||
|
reqOpts.headers.Origin = this.origin;
|
||
|
}
|
||
|
this.makeRequest(reqOpts, function (err, resp) {
|
||
|
if (err) {
|
||
|
return callback(err);
|
||
|
}
|
||
|
var uri = resp.headers.location;
|
||
|
_this.uri = uri;
|
||
|
_this.set({ uri: uri });
|
||
|
_this.offset = 0;
|
||
|
callback(null, uri);
|
||
|
});
|
||
|
};
|
||
|
Upload.prototype.continueUploading = function () {
|
||
|
if (typeof this.offset === 'number') {
|
||
|
return this.startUploading();
|
||
|
}
|
||
|
this.getAndSetOffset(this.startUploading.bind(this));
|
||
|
};
|
||
|
Upload.prototype.startUploading = function () {
|
||
|
var _this = this;
|
||
|
var reqOpts = {
|
||
|
method: 'PUT',
|
||
|
uri: this.uri,
|
||
|
headers: { 'Content-Range': 'bytes ' + this.offset + '-*/' + this.contentLength }
|
||
|
};
|
||
|
var bufferStream = this.bufferStream = new stream_1.PassThrough();
|
||
|
var offsetStream = this.offsetStream =
|
||
|
new stream_1.PassThrough({ transform: this.onChunk.bind(this) });
|
||
|
var delayStream = new stream_1.PassThrough();
|
||
|
this.getRequestStream(reqOpts, function (requestStream) {
|
||
|
_this.setPipeline(bufferStream, offsetStream, requestStream, delayStream);
|
||
|
// wait for "complete" from request before letting the stream finish
|
||
|
delayStream.on('prefinish', function () {
|
||
|
_this.cork();
|
||
|
});
|
||
|
requestStream.on('complete', function (resp) {
|
||
|
if (resp.statusCode < 200 || resp.statusCode > 299) {
|
||
|
_this.destroy(new Error('Upload failed'));
|
||
|
return;
|
||
|
}
|
||
|
_this.emit('metadata', resp.body);
|
||
|
_this.deleteConfig();
|
||
|
_this.uncork();
|
||
|
});
|
||
|
});
|
||
|
};
|
||
|
Upload.prototype.onChunk = function (chunk, enc, next) {
|
||
|
var offset = this.offset;
|
||
|
var numBytesWritten = this.numBytesWritten;
|
||
|
// check if this is the same content uploaded previously. this caches a slice
|
||
|
// of the first chunk, then compares it with the first byte of incoming data
|
||
|
if (numBytesWritten === 0) {
|
||
|
var cachedFirstChunk = this.get('firstChunk');
|
||
|
var firstChunk = chunk.slice(0, 16).valueOf();
|
||
|
if (!cachedFirstChunk) {
|
||
|
// This is a new upload. Cache the first chunk.
|
||
|
this.set({ uri: this.uri, firstChunk: firstChunk });
|
||
|
}
|
||
|
else {
|
||
|
// this continues an upload in progress. check if the bytes are the same
|
||
|
cachedFirstChunk = Buffer.from(cachedFirstChunk);
|
||
|
var nextChunk = Buffer.from(firstChunk);
|
||
|
if (Buffer.compare(cachedFirstChunk, nextChunk) !== 0) {
|
||
|
// this data is not the same. start a new upload
|
||
|
this.bufferStream.unshift(chunk);
|
||
|
this.bufferStream.unpipe(this.offsetStream);
|
||
|
this.restart();
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
var length = chunk.length;
|
||
|
if (typeof chunk === 'string')
|
||
|
length = Buffer.byteLength(chunk, enc);
|
||
|
if (numBytesWritten < offset)
|
||
|
chunk = chunk.slice(offset - numBytesWritten);
|
||
|
this.numBytesWritten += length;
|
||
|
// only push data from the byte after the one we left off on
|
||
|
next(null, this.numBytesWritten > offset ? chunk : undefined);
|
||
|
};
|
||
|
Upload.prototype.getAndSetOffset = function (callback) {
|
||
|
var _this = this;
|
||
|
this.makeRequest({
|
||
|
method: 'PUT',
|
||
|
uri: this.uri,
|
||
|
headers: { 'Content-Length': 0, 'Content-Range': 'bytes */*' }
|
||
|
}, function (err, resp) {
|
||
|
if (err) {
|
||
|
// we don't return a 404 to the user if they provided the resumable
|
||
|
// URI. if we're just using the configstore file to tell us that this
|
||
|
// file exists, and it turns out that it doesn't (the 404), that's
|
||
|
// probably stale config data.
|
||
|
if (resp && resp.statusCode === 404 && !_this.uriProvidedManually) {
|
||
|
return _this.restart();
|
||
|
}
|
||
|
// this resumable upload is unrecoverable (bad data or service error).
|
||
|
// -
|
||
|
// https://github.com/stephenplusplus/gcs-resumable-upload/issues/15
|
||
|
// -
|
||
|
// https://github.com/stephenplusplus/gcs-resumable-upload/pull/16#discussion_r80363774
|
||
|
if (resp && resp.statusCode === TERMINATED_UPLOAD_STATUS_CODE) {
|
||
|
return _this.restart();
|
||
|
}
|
||
|
return _this.destroy(err);
|
||
|
}
|
||
|
if (resp.statusCode === RESUMABLE_INCOMPLETE_STATUS_CODE) {
|
||
|
if (resp.headers.range) {
|
||
|
var range = resp.headers.range;
|
||
|
_this.offset = Number(range.split('-')[1]) + 1;
|
||
|
callback();
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
_this.offset = 0;
|
||
|
callback();
|
||
|
});
|
||
|
};
|
||
|
Upload.prototype.makeRequest = function (reqOpts, callback) {
|
||
|
if (this.encryption) {
|
||
|
reqOpts.headers = reqOpts.headers || {};
|
||
|
reqOpts.headers['x-goog-encryption-algorithm'] = 'AES256';
|
||
|
reqOpts.headers['x-goog-encryption-key'] = this.encryption.key;
|
||
|
reqOpts.headers['x-goog-encryption-key-sha256'] = this.encryption.hash;
|
||
|
}
|
||
|
if (this.userProject) {
|
||
|
reqOpts.qs = reqOpts.qs || {};
|
||
|
reqOpts.qs.userProject = this.userProject;
|
||
|
}
|
||
|
this.authClient.authorizeRequest(reqOpts, function (err, authorizedReqOpts) {
|
||
|
if (err) {
|
||
|
err = wrapError('Could not authenticate request', err);
|
||
|
return callback(err, null, null);
|
||
|
}
|
||
|
request(authorizedReqOpts, function (err, resp, body) {
|
||
|
if (err) {
|
||
|
return callback(err, resp, body);
|
||
|
}
|
||
|
if (body && body.error) {
|
||
|
return callback(body.error, resp, body);
|
||
|
}
|
||
|
var nonSuccess = Math.floor(resp.statusCode / 100) !== 2; // 200-299 status code
|
||
|
if (nonSuccess &&
|
||
|
resp.statusCode !== RESUMABLE_INCOMPLETE_STATUS_CODE) {
|
||
|
return callback(new Error(body), resp, body);
|
||
|
}
|
||
|
callback(null, resp, body);
|
||
|
});
|
||
|
});
|
||
|
};
|
||
|
Upload.prototype.getRequestStream = function (reqOpts, callback) {
|
||
|
var _this = this;
|
||
|
if (this.userProject) {
|
||
|
reqOpts.qs = reqOpts.qs || {};
|
||
|
reqOpts.qs.userProject = this.userProject;
|
||
|
}
|
||
|
this.authClient.authorizeRequest(reqOpts, function (err, authorizedReqOpts) {
|
||
|
if (err) {
|
||
|
return _this.destroy(wrapError('Could not authenticate request', err));
|
||
|
}
|
||
|
var requestStream = request(authorizedReqOpts);
|
||
|
requestStream.on('error', _this.destroy.bind(_this));
|
||
|
requestStream.on('response', _this.onResponse.bind(_this));
|
||
|
requestStream.on('complete', function (resp) {
|
||
|
var body = resp.body;
|
||
|
if (body && body.error)
|
||
|
_this.destroy(body.error);
|
||
|
});
|
||
|
// this makes the response body come back in the response (weird?)
|
||
|
requestStream.callback = function () { };
|
||
|
callback(requestStream);
|
||
|
});
|
||
|
};
|
||
|
Upload.prototype.restart = function () {
|
||
|
var _this = this;
|
||
|
this.numBytesWritten = 0;
|
||
|
this.deleteConfig();
|
||
|
this.createURI(function (err) {
|
||
|
if (err) {
|
||
|
return _this.destroy(err);
|
||
|
}
|
||
|
_this.startUploading();
|
||
|
});
|
||
|
};
|
||
|
Upload.prototype.get = function (prop) {
|
||
|
var store = this.configStore.get([this.bucket, this.file].join('/'));
|
||
|
return store && store[prop];
|
||
|
};
|
||
|
// tslint:disable-next-line no-any
|
||
|
Upload.prototype.set = function (props) {
|
||
|
this.configStore.set([this.bucket, this.file].join('/'), props);
|
||
|
};
|
||
|
Upload.prototype.deleteConfig = function () {
|
||
|
this.configStore.delete([this.bucket, this.file].join('/'));
|
||
|
};
|
||
|
/**
|
||
|
* @return {bool} is the request good?
|
||
|
*/
|
||
|
Upload.prototype.onResponse = function (resp) {
|
||
|
if (resp.statusCode === 404) {
|
||
|
if (this.numRetries < RETRY_LIMIT) {
|
||
|
this.numRetries++;
|
||
|
this.startUploading();
|
||
|
}
|
||
|
else {
|
||
|
this.destroy(new Error('Retry limit exceeded'));
|
||
|
}
|
||
|
return false;
|
||
|
}
|
||
|
if (resp.statusCode > 499 && resp.statusCode < 600) {
|
||
|
if (this.numRetries < RETRY_LIMIT) {
|
||
|
var randomMs = Math.round(Math.random() * 1000);
|
||
|
var waitTime = Math.pow(2, this.numRetries) * 1000 + randomMs;
|
||
|
this.numRetries++;
|
||
|
setTimeout(this.continueUploading.bind(this), waitTime);
|
||
|
}
|
||
|
else {
|
||
|
this.destroy(new Error('Retry limit exceeded'));
|
||
|
}
|
||
|
return false;
|
||
|
}
|
||
|
this.emit('response', resp);
|
||
|
return true;
|
||
|
};
|
||
|
function upload(cfg) {
|
||
|
// tslint:disable-next-line no-any
|
||
|
return new Upload(cfg);
|
||
|
}
|
||
|
// tslint:disable-next-line no-any
|
||
|
upload.createURI =
|
||
|
function (cfg, callback) {
|
||
|
// tslint:disable-next-line no-any
|
||
|
var up = new Upload(cfg);
|
||
|
up.createURI(callback);
|
||
|
};
|
||
|
module.exports = upload;
|
||
|
//# sourceMappingURL=index.js.map
|