mirror of
https://github.com/titanscouting/tra-analysis.git
synced 2025-02-28 03:45:47 +00:00
998 lines
31 KiB
JavaScript
998 lines
31 KiB
JavaScript
|
/**
|
||
|
* @license
|
||
|
* Copyright 2015 gRPC authors.
|
||
|
*
|
||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
* you may not use this file except in compliance with the License.
|
||
|
* You may obtain a copy of the License at
|
||
|
*
|
||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||
|
*
|
||
|
* Unless required by applicable law or agreed to in writing, software
|
||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
* See the License for the specific language governing permissions and
|
||
|
* limitations under the License.
|
||
|
*
|
||
|
*/
|
||
|
|
||
|
'use strict';
|
||
|
|
||
|
var grpc = require('./grpc_extension');
|
||
|
|
||
|
var common = require('./common');
|
||
|
|
||
|
var Metadata = require('./metadata');
|
||
|
|
||
|
var constants = require('./constants');
|
||
|
|
||
|
var stream = require('stream');
|
||
|
|
||
|
var Readable = stream.Readable;
|
||
|
var Writable = stream.Writable;
|
||
|
var Duplex = stream.Duplex;
|
||
|
var util = require('util');
|
||
|
|
||
|
var EventEmitter = require('events').EventEmitter;
|
||
|
|
||
|
/**
|
||
|
* Handle an error on a call by sending it as a status
|
||
|
* @private
|
||
|
* @param {grpc.internal~Call} call The call to send the error on
|
||
|
* @param {(Object|Error)} error The error object
|
||
|
*/
|
||
|
function handleError(call, error) {
|
||
|
var statusMetadata = new Metadata();
|
||
|
var status = {
|
||
|
code: constants.status.UNKNOWN,
|
||
|
details: 'Unknown Error'
|
||
|
};
|
||
|
if (error.hasOwnProperty('message')) {
|
||
|
status.details = error.message;
|
||
|
}
|
||
|
if (error.hasOwnProperty('code') && Number.isInteger(error.code)) {
|
||
|
status.code = error.code;
|
||
|
if (error.hasOwnProperty('details')) {
|
||
|
status.details = error.details;
|
||
|
}
|
||
|
}
|
||
|
if (error.hasOwnProperty('metadata')) {
|
||
|
statusMetadata = error.metadata;
|
||
|
}
|
||
|
status.metadata = statusMetadata._getCoreRepresentation();
|
||
|
var error_batch = {};
|
||
|
if (!call.metadataSent) {
|
||
|
error_batch[grpc.opType.SEND_INITIAL_METADATA] =
|
||
|
(new Metadata())._getCoreRepresentation();
|
||
|
}
|
||
|
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
|
||
|
call.startBatch(error_batch, function(){});
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Send a response to a unary or client streaming call.
|
||
|
* @private
|
||
|
* @param {grpc.Call} call The call to respond on
|
||
|
* @param {*} value The value to respond with
|
||
|
* @param {grpc~serialize} serialize Serialization function for the
|
||
|
* response
|
||
|
* @param {grpc.Metadata=} metadata Optional trailing metadata to send with
|
||
|
* status
|
||
|
* @param {number=} [flags=0] Flags for modifying how the message is sent.
|
||
|
*/
|
||
|
function sendUnaryResponse(call, value, serialize, metadata, flags) {
|
||
|
var end_batch = {};
|
||
|
var statusMetadata = new Metadata();
|
||
|
var status = {
|
||
|
code: constants.status.OK,
|
||
|
details: 'OK'
|
||
|
};
|
||
|
if (metadata) {
|
||
|
statusMetadata = metadata;
|
||
|
}
|
||
|
var message;
|
||
|
try {
|
||
|
message = serialize(value);
|
||
|
} catch (e) {
|
||
|
e.code = constants.status.INTERNAL;
|
||
|
handleError(call, e);
|
||
|
return;
|
||
|
}
|
||
|
status.metadata = statusMetadata._getCoreRepresentation();
|
||
|
if (!call.metadataSent) {
|
||
|
end_batch[grpc.opType.SEND_INITIAL_METADATA] =
|
||
|
(new Metadata())._getCoreRepresentation();
|
||
|
call.metadataSent = true;
|
||
|
}
|
||
|
message.grpcWriteFlags = flags;
|
||
|
end_batch[grpc.opType.SEND_MESSAGE] = message;
|
||
|
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
|
||
|
call.startBatch(end_batch, function (){});
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Initialize a writable stream. This is used for both the writable and duplex
|
||
|
* stream constructors.
|
||
|
* @private
|
||
|
* @param {Writable} stream The stream to set up
|
||
|
* @param {function(*):Buffer=} Serialization function for responses
|
||
|
*/
|
||
|
function setUpWritable(stream, serialize) {
|
||
|
stream.finished = false;
|
||
|
stream.status = {
|
||
|
code : constants.status.OK,
|
||
|
details : 'OK',
|
||
|
metadata : new Metadata()
|
||
|
};
|
||
|
stream.serialize = common.wrapIgnoreNull(serialize);
|
||
|
function sendStatus() {
|
||
|
var batch = {};
|
||
|
if (!stream.call.metadataSent) {
|
||
|
stream.call.metadataSent = true;
|
||
|
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
||
|
(new Metadata())._getCoreRepresentation();
|
||
|
}
|
||
|
|
||
|
if (stream.status.metadata) {
|
||
|
stream.status.metadata = stream.status.metadata._getCoreRepresentation();
|
||
|
}
|
||
|
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
|
||
|
stream.call.startBatch(batch, function(){});
|
||
|
}
|
||
|
stream.on('finish', sendStatus);
|
||
|
/**
|
||
|
* Set the pending status to a given error status. If the error does not have
|
||
|
* code or details properties, the code will be set to grpc.status.UNKNOWN
|
||
|
* and the details will be set to 'Unknown Error'.
|
||
|
* @param {Error} err The error object
|
||
|
*/
|
||
|
function setStatus(err) {
|
||
|
var code = constants.status.UNKNOWN;
|
||
|
var details = 'Unknown Error';
|
||
|
var metadata = new Metadata();
|
||
|
if (err.hasOwnProperty('message')) {
|
||
|
details = err.message;
|
||
|
}
|
||
|
if (err.hasOwnProperty('code')) {
|
||
|
code = err.code;
|
||
|
if (err.hasOwnProperty('details')) {
|
||
|
details = err.details;
|
||
|
}
|
||
|
}
|
||
|
if (err.hasOwnProperty('metadata')) {
|
||
|
metadata = err.metadata;
|
||
|
}
|
||
|
stream.status = {code: code, details: details, metadata: metadata};
|
||
|
}
|
||
|
/**
|
||
|
* Terminate the call. This includes indicating that reads are done, draining
|
||
|
* all pending writes, and sending the given error as a status
|
||
|
* @param {Error} err The error object
|
||
|
* @this GrpcServerStream
|
||
|
*/
|
||
|
function terminateCall(err) {
|
||
|
// Drain readable data
|
||
|
setStatus(err);
|
||
|
stream.end();
|
||
|
}
|
||
|
stream.on('error', terminateCall);
|
||
|
/**
|
||
|
* Override of Writable#end method that allows for sending metadata with a
|
||
|
* success status.
|
||
|
* @param {Metadata=} metadata Metadata to send with the status
|
||
|
*/
|
||
|
stream.end = function(metadata) {
|
||
|
if (metadata) {
|
||
|
stream.status.metadata = metadata;
|
||
|
}
|
||
|
Writable.prototype.end.call(this);
|
||
|
};
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Initialize a readable stream. This is used for both the readable and duplex
|
||
|
* stream constructors.
|
||
|
* @private
|
||
|
* @param {Readable} stream The stream to initialize
|
||
|
* @param {grpc~deserialize} deserialize Deserialization function for
|
||
|
* incoming data.
|
||
|
*/
|
||
|
function setUpReadable(stream, deserialize) {
|
||
|
stream.deserialize = common.wrapIgnoreNull(deserialize);
|
||
|
stream.finished = false;
|
||
|
stream.reading = false;
|
||
|
|
||
|
stream.terminate = function() {
|
||
|
stream.finished = true;
|
||
|
stream.on('data', function() {});
|
||
|
};
|
||
|
|
||
|
stream.on('cancelled', function() {
|
||
|
stream.terminate();
|
||
|
});
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Emitted when the call has been cancelled. After this has been emitted, the
|
||
|
* call's `cancelled` property will be set to `true`.
|
||
|
* @event grpc~ServerUnaryCall~cancelled
|
||
|
*/
|
||
|
|
||
|
util.inherits(ServerUnaryCall, EventEmitter);
|
||
|
|
||
|
/**
|
||
|
* An EventEmitter. Used for unary calls.
|
||
|
* @constructor grpc~ServerUnaryCall
|
||
|
* @extends external:EventEmitter
|
||
|
* @param {grpc.internal~Call} call The call object associated with the request
|
||
|
* @param {grpc.Metadata} metadata The request metadata from the client
|
||
|
*/
|
||
|
function ServerUnaryCall(call, metadata) {
|
||
|
EventEmitter.call(this);
|
||
|
this.call = call;
|
||
|
/**
|
||
|
* Indicates if the call has been cancelled
|
||
|
* @member {boolean} grpc~ServerUnaryCall#cancelled
|
||
|
*/
|
||
|
this.cancelled = false;
|
||
|
/**
|
||
|
* The request metadata from the client
|
||
|
* @member {grpc.Metadata} grpc~ServerUnaryCall#metadata
|
||
|
*/
|
||
|
this.metadata = metadata;
|
||
|
/**
|
||
|
* The request message from the client
|
||
|
* @member {*} grpc~ServerUnaryCall#request
|
||
|
*/
|
||
|
this.request = undefined;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Emitted when the call has been cancelled. After this has been emitted, the
|
||
|
* call's `cancelled` property will be set to `true`.
|
||
|
* @event grpc~ServerWritableStream~cancelled
|
||
|
*/
|
||
|
|
||
|
util.inherits(ServerWritableStream, Writable);
|
||
|
|
||
|
/**
|
||
|
* A stream that the server can write to. Used for calls that are streaming from
|
||
|
* the server side.
|
||
|
* @constructor grpc~ServerWritableStream
|
||
|
* @extends external:Writable
|
||
|
* @borrows grpc~ServerUnaryCall#sendMetadata as
|
||
|
* grpc~ServerWritableStream#sendMetadata
|
||
|
* @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer
|
||
|
* @param {grpc.internal~Call} call The call object to send data with
|
||
|
* @param {grpc.Metadata} metadata The request metadata from the client
|
||
|
* @param {grpc~serialize} serialize Serialization function for writes
|
||
|
*/
|
||
|
function ServerWritableStream(call, metadata, serialize) {
|
||
|
Writable.call(this, {objectMode: true});
|
||
|
this.call = call;
|
||
|
|
||
|
this.finished = false;
|
||
|
setUpWritable(this, serialize);
|
||
|
/**
|
||
|
* Indicates if the call has been cancelled
|
||
|
* @member {boolean} grpc~ServerWritableStream#cancelled
|
||
|
*/
|
||
|
this.cancelled = false;
|
||
|
/**
|
||
|
* The request metadata from the client
|
||
|
* @member {grpc.Metadata} grpc~ServerWritableStream#metadata
|
||
|
*/
|
||
|
this.metadata = metadata;
|
||
|
/**
|
||
|
* The request message from the client
|
||
|
* @member {*} grpc~ServerWritableStream#request
|
||
|
*/
|
||
|
this.request = undefined;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Start writing a chunk of data. This is an implementation of a method required
|
||
|
* for implementing stream.Writable.
|
||
|
* @private
|
||
|
* @param {Buffer} chunk The chunk of data to write
|
||
|
* @param {string} encoding Used to pass write flags
|
||
|
* @param {function(Error=)} callback Callback to indicate that the write is
|
||
|
* complete
|
||
|
*/
|
||
|
function _write(chunk, encoding, callback) {
|
||
|
/* jshint validthis: true */
|
||
|
var batch = {};
|
||
|
var self = this;
|
||
|
var message;
|
||
|
try {
|
||
|
message = this.serialize(chunk);
|
||
|
} catch (e) {
|
||
|
e.code = constants.status.INTERNAL;
|
||
|
callback(e);
|
||
|
return;
|
||
|
}
|
||
|
if (!this.call.metadataSent) {
|
||
|
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
||
|
(new Metadata())._getCoreRepresentation();
|
||
|
this.call.metadataSent = true;
|
||
|
}
|
||
|
if (Number.isFinite(encoding)) {
|
||
|
/* Attach the encoding if it is a finite number. This is the closest we
|
||
|
* can get to checking that it is valid flags */
|
||
|
message.grpcWriteFlags = encoding;
|
||
|
}
|
||
|
batch[grpc.opType.SEND_MESSAGE] = message;
|
||
|
this.call.startBatch(batch, function(err, value) {
|
||
|
if (err) {
|
||
|
self.emit('error', err);
|
||
|
return;
|
||
|
}
|
||
|
callback();
|
||
|
});
|
||
|
}
|
||
|
|
||
|
ServerWritableStream.prototype._write = _write;
|
||
|
|
||
|
/**
|
||
|
* Emitted when the call has been cancelled. After this has been emitted, the
|
||
|
* call's `cancelled` property will be set to `true`.
|
||
|
* @event grpc~ServerReadableStream~cancelled
|
||
|
*/
|
||
|
|
||
|
util.inherits(ServerReadableStream, Readable);
|
||
|
|
||
|
/**
|
||
|
* A stream that the server can read from. Used for calls that are streaming
|
||
|
* from the client side.
|
||
|
* @constructor grpc~ServerReadableStream
|
||
|
* @extends external:Readable
|
||
|
* @borrows grpc~ServerUnaryCall#sendMetadata as
|
||
|
* grpc~ServerReadableStream#sendMetadata
|
||
|
* @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer
|
||
|
* @param {grpc.internal~Call} call The call object to read data with
|
||
|
* @param {grpc.Metadata} metadata The request metadata from the client
|
||
|
* @param {grpc~deserialize} deserialize Deserialization function for reads
|
||
|
*/
|
||
|
function ServerReadableStream(call, metadata, deserialize) {
|
||
|
Readable.call(this, {objectMode: true});
|
||
|
this.call = call;
|
||
|
setUpReadable(this, deserialize);
|
||
|
/**
|
||
|
* Indicates if the call has been cancelled
|
||
|
* @member {boolean} grpc~ServerReadableStream#cancelled
|
||
|
*/
|
||
|
this.cancelled = false;
|
||
|
/**
|
||
|
* The request metadata from the client
|
||
|
* @member {grpc.Metadata} grpc~ServerReadableStream#metadata
|
||
|
*/
|
||
|
this.metadata = metadata;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Start reading from the gRPC data source. This is an implementation of a
|
||
|
* method required for implementing stream.Readable
|
||
|
* @access private
|
||
|
* @param {number} size Ignored
|
||
|
*/
|
||
|
function _read(size) {
|
||
|
/* jshint validthis: true */
|
||
|
var self = this;
|
||
|
/**
|
||
|
* Callback to be called when a READ event is received. Pushes the data onto
|
||
|
* the read queue and starts reading again if applicable
|
||
|
* @param {grpc.Event} event READ event object
|
||
|
*/
|
||
|
function readCallback(err, event) {
|
||
|
if (err) {
|
||
|
self.terminate();
|
||
|
return;
|
||
|
}
|
||
|
if (self.finished) {
|
||
|
self.push(null);
|
||
|
return;
|
||
|
}
|
||
|
var data = event.read;
|
||
|
var deserialized;
|
||
|
try {
|
||
|
deserialized = self.deserialize(data);
|
||
|
} catch (e) {
|
||
|
e.code = constants.status.INTERNAL;
|
||
|
self.emit('error', e);
|
||
|
return;
|
||
|
}
|
||
|
if (self.push(deserialized) && data !== null) {
|
||
|
var read_batch = {};
|
||
|
read_batch[grpc.opType.RECV_MESSAGE] = true;
|
||
|
self.call.startBatch(read_batch, readCallback);
|
||
|
} else {
|
||
|
self.reading = false;
|
||
|
}
|
||
|
}
|
||
|
if (self.finished) {
|
||
|
self.push(null);
|
||
|
} else {
|
||
|
if (!self.reading) {
|
||
|
self.reading = true;
|
||
|
var batch = {};
|
||
|
batch[grpc.opType.RECV_MESSAGE] = true;
|
||
|
self.call.startBatch(batch, readCallback);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
ServerReadableStream.prototype._read = _read;
|
||
|
|
||
|
/**
|
||
|
* Emitted when the call has been cancelled. After this has been emitted, the
|
||
|
* call's `cancelled` property will be set to `true`.
|
||
|
* @event grpc~ServerDuplexStream~cancelled
|
||
|
*/
|
||
|
|
||
|
util.inherits(ServerDuplexStream, Duplex);
|
||
|
|
||
|
/**
|
||
|
* A stream that the server can read from or write to. Used for calls with
|
||
|
* duplex streaming.
|
||
|
* @constructor grpc~ServerDuplexStream
|
||
|
* @extends external:Duplex
|
||
|
* @borrows grpc~ServerUnaryCall#sendMetadata as
|
||
|
* grpc~ServerDuplexStream#sendMetadata
|
||
|
* @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer
|
||
|
* @param {grpc.internal~Call} call Call object to proxy
|
||
|
* @param {grpc.Metadata} metadata The request metadata from the client
|
||
|
* @param {grpc~serialize} serialize Serialization function for requests
|
||
|
* @param {grpc~deserialize} deserialize Deserialization function for
|
||
|
* responses
|
||
|
*/
|
||
|
function ServerDuplexStream(call, metadata, serialize, deserialize) {
|
||
|
Duplex.call(this, {objectMode: true});
|
||
|
this.call = call;
|
||
|
setUpWritable(this, serialize);
|
||
|
setUpReadable(this, deserialize);
|
||
|
/**
|
||
|
* Indicates if the call has been cancelled
|
||
|
* @member {boolean} grpc~ServerReadableStream#cancelled
|
||
|
*/
|
||
|
this.cancelled = false;
|
||
|
/**
|
||
|
* The request metadata from the client
|
||
|
* @member {grpc.Metadata} grpc~ServerReadableStream#metadata
|
||
|
*/
|
||
|
this.metadata = metadata;
|
||
|
}
|
||
|
|
||
|
ServerDuplexStream.prototype._read = _read;
|
||
|
ServerDuplexStream.prototype._write = _write;
|
||
|
|
||
|
/**
|
||
|
* Send the initial metadata for a writable stream.
|
||
|
* @alias grpc~ServerUnaryCall#sendMetadata
|
||
|
* @param {grpc.Metadata} responseMetadata Metadata to send
|
||
|
*/
|
||
|
function sendMetadata(responseMetadata) {
|
||
|
/* jshint validthis: true */
|
||
|
var self = this;
|
||
|
if (!this.call.metadataSent) {
|
||
|
this.call.metadataSent = true;
|
||
|
var batch = {};
|
||
|
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
||
|
responseMetadata._getCoreRepresentation();
|
||
|
this.call.startBatch(batch, function(err) {
|
||
|
if (err) {
|
||
|
self.emit('error', err);
|
||
|
return;
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
|
||
|
ServerUnaryCall.prototype.sendMetadata = sendMetadata;
|
||
|
ServerWritableStream.prototype.sendMetadata = sendMetadata;
|
||
|
ServerReadableStream.prototype.sendMetadata = sendMetadata;
|
||
|
ServerDuplexStream.prototype.sendMetadata = sendMetadata;
|
||
|
|
||
|
/**
|
||
|
* Get the endpoint this call/stream is connected to.
|
||
|
* @alias grpc~ServerUnaryCall#getPeer
|
||
|
* @return {string} The URI of the endpoint
|
||
|
*/
|
||
|
function getPeer() {
|
||
|
/* jshint validthis: true */
|
||
|
return this.call.getPeer();
|
||
|
}
|
||
|
|
||
|
ServerUnaryCall.prototype.getPeer = getPeer;
|
||
|
ServerReadableStream.prototype.getPeer = getPeer;
|
||
|
ServerWritableStream.prototype.getPeer = getPeer;
|
||
|
ServerDuplexStream.prototype.getPeer = getPeer;
|
||
|
|
||
|
/**
|
||
|
* Wait for the client to close, then emit a cancelled event if the client
|
||
|
* cancelled.
|
||
|
* @private
|
||
|
*/
|
||
|
function waitForCancel() {
|
||
|
/* jshint validthis: true */
|
||
|
var self = this;
|
||
|
var cancel_batch = {};
|
||
|
cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
|
||
|
self.call.startBatch(cancel_batch, function(err, result) {
|
||
|
if (err) {
|
||
|
self.emit('error', err);
|
||
|
}
|
||
|
if (result.cancelled) {
|
||
|
self.cancelled = true;
|
||
|
self.emit('cancelled');
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
|
||
|
ServerUnaryCall.prototype.waitForCancel = waitForCancel;
|
||
|
ServerReadableStream.prototype.waitForCancel = waitForCancel;
|
||
|
ServerWritableStream.prototype.waitForCancel = waitForCancel;
|
||
|
ServerDuplexStream.prototype.waitForCancel = waitForCancel;
|
||
|
|
||
|
/**
|
||
|
* Callback function passed to server handlers that handle methods with unary
|
||
|
* responses.
|
||
|
* @callback grpc.Server~sendUnaryData
|
||
|
* @param {grpc~ServiceError} error An error, if the call failed
|
||
|
* @param {*} value The response value. Must be a valid argument to the
|
||
|
* `responseSerialize` method of the method that is being handled
|
||
|
* @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable
|
||
|
* @param {grpc.writeFlags=} flags Flags to modify writing the response
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* User-provided method to handle unary requests on a server
|
||
|
* @callback grpc.Server~handleUnaryCall
|
||
|
* @param {grpc~ServerUnaryCall} call The call object
|
||
|
* @param {grpc.Server~sendUnaryData} callback The callback to call to respond
|
||
|
* to the request
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* Fully handle a unary call
|
||
|
* @private
|
||
|
* @param {grpc.internal~Call} call The call to handle
|
||
|
* @param {Object} handler Request handler object for the method that was called
|
||
|
* @param {grpc~Server.handleUnaryCall} handler.func The handler function
|
||
|
* @param {grpc~deserialize} handler.deserialize The deserialization function
|
||
|
* for request data
|
||
|
* @param {grpc~serialize} handler.serialize The serialization function for
|
||
|
* response data
|
||
|
* @param {grpc.Metadata} metadata Metadata from the client
|
||
|
*/
|
||
|
function handleUnary(call, handler, metadata) {
|
||
|
var emitter = new ServerUnaryCall(call, metadata);
|
||
|
emitter.on('error', function(error) {
|
||
|
handleError(call, error);
|
||
|
});
|
||
|
emitter.waitForCancel();
|
||
|
var batch = {};
|
||
|
batch[grpc.opType.RECV_MESSAGE] = true;
|
||
|
call.startBatch(batch, function(err, result) {
|
||
|
if (err) {
|
||
|
handleError(call, err);
|
||
|
return;
|
||
|
}
|
||
|
try {
|
||
|
emitter.request = handler.deserialize(result.read);
|
||
|
} catch (e) {
|
||
|
e.code = constants.status.INTERNAL;
|
||
|
handleError(call, e);
|
||
|
return;
|
||
|
}
|
||
|
if (emitter.cancelled) {
|
||
|
return;
|
||
|
}
|
||
|
handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
|
||
|
if (err) {
|
||
|
if (trailer) {
|
||
|
err.metadata = trailer;
|
||
|
}
|
||
|
handleError(call, err);
|
||
|
} else {
|
||
|
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
|
||
|
}
|
||
|
});
|
||
|
});
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* User provided method to handle server streaming methods on the server.
|
||
|
* @callback grpc.Server~handleServerStreamingCall
|
||
|
* @param {grpc~ServerWritableStream} call The call object
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* Fully handle a server streaming call
|
||
|
* @private
|
||
|
* @param {grpc.internal~Call} call The call to handle
|
||
|
* @param {Object} handler Request handler object for the method that was called
|
||
|
* @param {grpc~Server.handleServerStreamingCall} handler.func The handler
|
||
|
* function
|
||
|
* @param {grpc~deserialize} handler.deserialize The deserialization function
|
||
|
* for request data
|
||
|
* @param {grpc~serialize} handler.serialize The serialization function for
|
||
|
* response data
|
||
|
* @param {grpc.Metadata} metadata Metadata from the client
|
||
|
*/
|
||
|
function handleServerStreaming(call, handler, metadata) {
|
||
|
var stream = new ServerWritableStream(call, metadata, handler.serialize);
|
||
|
stream.waitForCancel();
|
||
|
var batch = {};
|
||
|
batch[grpc.opType.RECV_MESSAGE] = true;
|
||
|
call.startBatch(batch, function(err, result) {
|
||
|
if (err) {
|
||
|
stream.emit('error', err);
|
||
|
return;
|
||
|
}
|
||
|
try {
|
||
|
stream.request = handler.deserialize(result.read);
|
||
|
} catch (e) {
|
||
|
e.code = constants.status.INTERNAL;
|
||
|
stream.emit('error', e);
|
||
|
return;
|
||
|
}
|
||
|
handler.func(stream);
|
||
|
});
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* User provided method to handle client streaming methods on the server.
|
||
|
* @callback grpc.Server~handleClientStreamingCall
|
||
|
* @param {grpc~ServerReadableStream} call The call object
|
||
|
* @param {grpc.Server~sendUnaryData} callback The callback to call to respond
|
||
|
* to the request
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* Fully handle a client streaming call
|
||
|
* @access private
|
||
|
* @param {grpc.internal~Call} call The call to handle
|
||
|
* @param {Object} handler Request handler object for the method that was called
|
||
|
* @param {grpc~Server.handleClientStreamingCall} handler.func The handler
|
||
|
* function
|
||
|
* @param {grpc~deserialize} handler.deserialize The deserialization function
|
||
|
* for request data
|
||
|
* @param {grpc~serialize} handler.serialize The serialization function for
|
||
|
* response data
|
||
|
* @param {grpc.Metadata} metadata Metadata from the client
|
||
|
*/
|
||
|
function handleClientStreaming(call, handler, metadata) {
|
||
|
var stream = new ServerReadableStream(call, metadata, handler.deserialize);
|
||
|
stream.on('error', function(error) {
|
||
|
handleError(call, error);
|
||
|
});
|
||
|
stream.waitForCancel();
|
||
|
handler.func(stream, function(err, value, trailer, flags) {
|
||
|
stream.terminate();
|
||
|
if (err) {
|
||
|
if (trailer) {
|
||
|
err.metadata = trailer;
|
||
|
}
|
||
|
handleError(call, err);
|
||
|
} else {
|
||
|
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* User provided method to handle bidirectional streaming calls on the server.
|
||
|
* @callback grpc.Server~handleBidiStreamingCall
|
||
|
* @param {grpc~ServerDuplexStream} call The call object
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* Fully handle a bidirectional streaming call
|
||
|
* @private
|
||
|
* @param {grpc.internal~Call} call The call to handle
|
||
|
* @param {Object} handler Request handler object for the method that was called
|
||
|
* @param {grpc~Server.handleBidiStreamingCall} handler.func The handler
|
||
|
* function
|
||
|
* @param {grpc~deserialize} handler.deserialize The deserialization function
|
||
|
* for request data
|
||
|
* @param {grpc~serialize} handler.serialize The serialization function for
|
||
|
* response data
|
||
|
* @param {Metadata} metadata Metadata from the client
|
||
|
*/
|
||
|
function handleBidiStreaming(call, handler, metadata) {
|
||
|
var stream = new ServerDuplexStream(call, metadata, handler.serialize,
|
||
|
handler.deserialize);
|
||
|
stream.waitForCancel();
|
||
|
handler.func(stream);
|
||
|
}
|
||
|
|
||
|
var streamHandlers = {
|
||
|
unary: handleUnary,
|
||
|
server_stream: handleServerStreaming,
|
||
|
client_stream: handleClientStreaming,
|
||
|
bidi: handleBidiStreaming
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Constructs a server object that stores request handlers and delegates
|
||
|
* incoming requests to those handlers
|
||
|
* @memberof grpc
|
||
|
* @constructor
|
||
|
* @param {Object=} options Options that should be passed to the internal server
|
||
|
* implementation
|
||
|
* @example
|
||
|
* var server = new grpc.Server();
|
||
|
* server.addProtoService(protobuf_service_descriptor, service_implementation);
|
||
|
* server.bind('address:port', server_credential);
|
||
|
* server.start();
|
||
|
*/
|
||
|
function Server(options) {
|
||
|
this.handlers = {};
|
||
|
var server = new grpc.Server(options);
|
||
|
this._server = server;
|
||
|
this.started = false;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Start the server and begin handling requests
|
||
|
*/
|
||
|
Server.prototype.start = function() {
|
||
|
if (this.started) {
|
||
|
throw new Error('Server is already running');
|
||
|
}
|
||
|
var self = this;
|
||
|
this.started = true;
|
||
|
this._server.start();
|
||
|
/**
|
||
|
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
|
||
|
* the requested method, use that handler to respond to the request. Then
|
||
|
* wait for the next request
|
||
|
* @param {grpc.internal~Event} event The event to handle with tag
|
||
|
* SERVER_RPC_NEW
|
||
|
*/
|
||
|
function handleNewCall(err, event) {
|
||
|
if (err) {
|
||
|
return;
|
||
|
}
|
||
|
var details = event.new_call;
|
||
|
var call = details.call;
|
||
|
var method = details.method;
|
||
|
var metadata = Metadata._fromCoreRepresentation(details.metadata);
|
||
|
if (method === null) {
|
||
|
return;
|
||
|
}
|
||
|
self._server.requestCall(handleNewCall);
|
||
|
var handler;
|
||
|
if (self.handlers.hasOwnProperty(method)) {
|
||
|
handler = self.handlers[method];
|
||
|
} else {
|
||
|
var batch = {};
|
||
|
batch[grpc.opType.SEND_INITIAL_METADATA] =
|
||
|
(new Metadata())._getCoreRepresentation();
|
||
|
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
|
||
|
code: constants.status.UNIMPLEMENTED,
|
||
|
details: 'RPC method not implemented ' + method,
|
||
|
metadata: {}
|
||
|
};
|
||
|
batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
|
||
|
call.startBatch(batch, function() {});
|
||
|
return;
|
||
|
}
|
||
|
streamHandlers[handler.type](call, handler, metadata);
|
||
|
}
|
||
|
this._server.requestCall(handleNewCall);
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Unified type for application handlers for all types of calls
|
||
|
* @typedef {(grpc.Server~handleUnaryCall
|
||
|
* |grpc.Server~handleClientStreamingCall
|
||
|
* |grpc.Server~handleServerStreamingCall
|
||
|
* |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* Registers a handler to handle the named method. Fails if there already is
|
||
|
* a handler for the given method. Returns true on success
|
||
|
* @param {string} name The name of the method that the provided function should
|
||
|
* handle/respond to.
|
||
|
* @param {grpc.Server~handleCall} handler Function that takes a stream of
|
||
|
* request values and returns a stream of response values
|
||
|
* @param {grpc~serialize} serialize Serialization function for responses
|
||
|
* @param {grpc~deserialize} deserialize Deserialization function for requests
|
||
|
* @param {('unary'|'client_stream'|'server_stream'|'bidi')} type The streaming type of method that this handles
|
||
|
* @return {boolean} True if the handler was set. False if a handler was already
|
||
|
* set for that name.
|
||
|
*/
|
||
|
Server.prototype.register = function(name, handler, serialize, deserialize,
|
||
|
type) {
|
||
|
if (this.handlers.hasOwnProperty(name)) {
|
||
|
return false;
|
||
|
}
|
||
|
this.handlers[name] = {
|
||
|
func: handler,
|
||
|
serialize: serialize,
|
||
|
deserialize: deserialize,
|
||
|
type: type
|
||
|
};
|
||
|
return true;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Gracefully shuts down the server. The server will stop receiving new calls,
|
||
|
* and any pending calls will complete. The callback will be called when all
|
||
|
* pending calls have completed and the server is fully shut down. This method
|
||
|
* is idempotent with itself and forceShutdown.
|
||
|
* @param {function()} callback The shutdown complete callback
|
||
|
*/
|
||
|
Server.prototype.tryShutdown = function(callback) {
|
||
|
this._server.tryShutdown(callback);
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Forcibly shuts down the server. The server will stop receiving new calls
|
||
|
* and cancel all pending calls. When it returns, the server has shut down.
|
||
|
* This method is idempotent with itself and tryShutdown, and it will trigger
|
||
|
* any outstanding tryShutdown callbacks.
|
||
|
*/
|
||
|
Server.prototype.forceShutdown = function() {
|
||
|
this._server.forceShutdown();
|
||
|
};
|
||
|
|
||
|
var unimplementedStatusResponse = {
|
||
|
code: constants.status.UNIMPLEMENTED,
|
||
|
details: 'The server does not implement this method'
|
||
|
};
|
||
|
|
||
|
var defaultHandler = {
|
||
|
unary: function(call, callback) {
|
||
|
callback(unimplementedStatusResponse);
|
||
|
},
|
||
|
client_stream: function(call, callback) {
|
||
|
callback(unimplementedStatusResponse);
|
||
|
},
|
||
|
server_stream: function(call) {
|
||
|
call.emit('error', unimplementedStatusResponse);
|
||
|
},
|
||
|
bidi: function(call) {
|
||
|
call.emit('error', unimplementedStatusResponse);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
function isObject(thing) {
|
||
|
return (typeof thing === 'object' || typeof thing === 'function') && thing !== null;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Add a service to the server, with a corresponding implementation.
|
||
|
* @param {grpc~ServiceDefinition} service The service descriptor
|
||
|
* @param {Object<String, grpc.Server~handleCall>} implementation Map of method
|
||
|
* names to method implementation for the provided service.
|
||
|
*/
|
||
|
Server.prototype.addService = function(service, implementation) {
|
||
|
if (!isObject(service) || !isObject(implementation)) {
|
||
|
throw new Error('addService requires two objects as arguments');
|
||
|
}
|
||
|
if (Object.keys(service).length === 0) {
|
||
|
throw new Error('Cannot add an empty service to a server');
|
||
|
}
|
||
|
if (this.started) {
|
||
|
throw new Error('Can\'t add a service to a started server.');
|
||
|
}
|
||
|
var self = this;
|
||
|
Object.keys(service).forEach(name => {
|
||
|
const attrs = service[name];
|
||
|
var method_type;
|
||
|
if (attrs.requestStream) {
|
||
|
if (attrs.responseStream) {
|
||
|
method_type = 'bidi';
|
||
|
} else {
|
||
|
method_type = 'client_stream';
|
||
|
}
|
||
|
} else {
|
||
|
if (attrs.responseStream) {
|
||
|
method_type = 'server_stream';
|
||
|
} else {
|
||
|
method_type = 'unary';
|
||
|
}
|
||
|
}
|
||
|
var impl;
|
||
|
if (implementation[name] === undefined) {
|
||
|
/* Handle the case where the method is passed with the name exactly as
|
||
|
written in the proto file, instead of using JavaScript function
|
||
|
naming style */
|
||
|
if (implementation[attrs.originalName] === undefined) {
|
||
|
common.log(constants.logVerbosity.ERROR, 'Method handler ' + name +
|
||
|
' for ' + attrs.path + ' expected but not provided');
|
||
|
impl = defaultHandler[method_type];
|
||
|
} else {
|
||
|
impl = implementation[attrs.originalName].bind(implementation);
|
||
|
}
|
||
|
} else {
|
||
|
impl = implementation[name].bind(implementation);
|
||
|
}
|
||
|
var serialize = attrs.responseSerialize;
|
||
|
var deserialize = attrs.requestDeserialize;
|
||
|
var register_success = self.register(attrs.path, impl, serialize,
|
||
|
deserialize, method_type);
|
||
|
if (!register_success) {
|
||
|
throw new Error('Method handler for ' + attrs.path +
|
||
|
' already provided.');
|
||
|
}
|
||
|
});
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Add a proto service to the server, with a corresponding implementation
|
||
|
* @deprecated Use {@link grpc.Server#addService} instead
|
||
|
* @param {Protobuf.Reflect.Service} service The proto service descriptor
|
||
|
* @param {Object<String, grpc.Server~handleCall>} implementation Map of method
|
||
|
* names to method implementation for the provided service.
|
||
|
*/
|
||
|
Server.prototype.addProtoService = util.deprecate(function(service,
|
||
|
implementation) {
|
||
|
var options;
|
||
|
var protobuf_js_5_common = require('./protobuf_js_5_common');
|
||
|
var protobuf_js_6_common = require('./protobuf_js_6_common');
|
||
|
if (protobuf_js_5_common.isProbablyProtobufJs5(service)) {
|
||
|
options = Object.assign({}, common.defaultGrpcOptions, service.grpc_options);
|
||
|
this.addService(
|
||
|
protobuf_js_5_common.getProtobufServiceAttrs(service, options),
|
||
|
implementation);
|
||
|
} else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) {
|
||
|
options = Object.assign({}, common.defaultGrpcOptions, service.grpc_options);
|
||
|
this.addService(
|
||
|
protobuf_js_6_common.getProtobufServiceAttrs(service, options),
|
||
|
implementation);
|
||
|
} else {
|
||
|
// We assume that this is a service attributes object
|
||
|
this.addService(service, implementation);
|
||
|
}
|
||
|
}, 'Server#addProtoService: Use Server#addService instead');
|
||
|
|
||
|
/**
|
||
|
* Binds the server to the given port, with SSL disabled if creds is an
|
||
|
* insecure credentials object
|
||
|
* @param {string} port The port that the server should bind on, in the format
|
||
|
* "address:port"
|
||
|
* @param {grpc.ServerCredentials} creds Server credential object to be used for
|
||
|
* SSL. Pass an insecure credentials object for an insecure port.
|
||
|
* @return {number} The bound port number. Negative if binding the port failed.
|
||
|
*/
|
||
|
Server.prototype.bind = function(port, creds) {
|
||
|
if (this.started) {
|
||
|
throw new Error('Can\'t bind an already running server to an address');
|
||
|
}
|
||
|
return this._server.addHttp2Port(port, creds);
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Called with the result of attempting to bind a port
|
||
|
* @callback grpc.Server~bindCallback
|
||
|
* @param {Error=} error If non-null, indicates that binding the port failed.
|
||
|
* @param {number} port The bound port number. If binding the port fails, this
|
||
|
* will be negative to match the output of bind.
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* Binds the server to the given port, with SSL disabled if creds is an
|
||
|
* insecure credentials object. Provides the result asynchronously.
|
||
|
* @param {string} port The port that the server should bind on, in the format
|
||
|
* "address:port"
|
||
|
* @param {grpc.ServerCredentials} creds Server credential object to be used for
|
||
|
* SSL. Pass an insecure credentials object for an insecure port.
|
||
|
*/
|
||
|
Server.prototype.bindAsync = function(port, creds, callback) {
|
||
|
/* This can throw. We do not try to catch that error because it indicates an
|
||
|
* incorrect use of the function, which should not be surfaced asynchronously
|
||
|
*/
|
||
|
const result = this.bind(port, creds)
|
||
|
if (result < 0) {
|
||
|
setImmediate(callback, new Error('Failed to bind port'), result);
|
||
|
} else {
|
||
|
setImmediate(callback, null, result);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
exports.Server = Server;
|