mirror of https://github.com/grpc/grpc-node.git
286 lines
8.5 KiB
JavaScript
286 lines
8.5 KiB
JavaScript
/*
|
|
*
|
|
* Copyright 2015, Google Inc.
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are
|
|
* met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above
|
|
* copyright notice, this list of conditions and the following disclaimer
|
|
* in the documentation and/or other materials provided with the
|
|
* distribution.
|
|
* * Neither the name of Google Inc. nor the names of its
|
|
* contributors may be used to endorse or promote products derived from
|
|
* this software without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*
|
|
*/
|
|
|
|
'use strict';
|
|
|
|
var async = require('async');
|
|
var fs = require('fs');
|
|
var GoogleAuth = require('google-auth-library');
|
|
var parseArgs = require('minimist');
|
|
var strftime = require('strftime');
|
|
var _ = require('underscore');
|
|
var grpc = require('../..');
|
|
var PROTO_PATH = __dirname + '/pubsub.proto';
|
|
var pubsub = grpc.load(PROTO_PATH).tech.pubsub;
|
|
|
|
function PubsubRunner(pub, sub, args) {
|
|
this.pub = pub;
|
|
this.sub = sub;
|
|
this.args = args;
|
|
}
|
|
|
|
PubsubRunner.prototype.getTestTopicName = function() {
|
|
var base_name = '/topics/' + this.args.project_id + '/';
|
|
if (this.args.topic_name) {
|
|
return base_name + this.args.topic_name;
|
|
}
|
|
var now_text = strftime('%Y%m%d%H%M%S%L');
|
|
return base_name + process.env.USER + '-' + now_text;
|
|
};
|
|
|
|
PubsubRunner.prototype.getTestSubName = function() {
|
|
var base_name = '/subscriptions/' + this.args.project_id + '/';
|
|
if (this.args.sub_name) {
|
|
return base_name + this.args.sub_name;
|
|
}
|
|
var now_text = strftime('%Y%m%d%H%M%S%L');
|
|
return base_name + process.env.USER + '-' + now_text;
|
|
};
|
|
|
|
PubsubRunner.prototype.listProjectTopics = function(callback) {
|
|
var q = ('cloud.googleapis.com/project in (/projects/' +
|
|
this.args.project_id + ')');
|
|
this.pub.listTopics({query: q}, callback);
|
|
};
|
|
|
|
PubsubRunner.prototype.topicExists = function(name, callback) {
|
|
this.listProjectTopics(function(err, response) {
|
|
if (err) {
|
|
callback(err);
|
|
} else {
|
|
callback(null, _.some(response.topic, function(t) {
|
|
return t.name === name;
|
|
}));
|
|
}
|
|
});
|
|
};
|
|
|
|
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
|
|
var self = this;
|
|
this.topicExists(name, function(err, exists) {
|
|
if (err) {
|
|
callback(err);
|
|
} else{
|
|
if (exists) {
|
|
callback(null);
|
|
} else {
|
|
self.pub.createTopic({name: name}, callback);
|
|
}
|
|
}
|
|
});
|
|
};
|
|
|
|
PubsubRunner.prototype.removeTopic = function(callback) {
|
|
var name = this.getTestTopicName();
|
|
console.log('... removing Topic', name);
|
|
this.pub.deleteTopic({topic: name}, function(err, value) {
|
|
if (err) {
|
|
console.log('Could not delete a topic: rpc failed with', err);
|
|
callback(err);
|
|
} else {
|
|
console.log('removed Topic', name, 'OK');
|
|
callback(null);
|
|
}
|
|
});
|
|
};
|
|
|
|
PubsubRunner.prototype.createTopic = function(callback) {
|
|
var name = this.getTestTopicName();
|
|
console.log('... creating Topic', name);
|
|
this.pub.createTopic({name: name}, function(err, value) {
|
|
if (err) {
|
|
console.log('Could not create a topic: rpc failed with', err);
|
|
callback(err);
|
|
} else {
|
|
console.log('created Topic', name, 'OK');
|
|
callback(null);
|
|
}
|
|
});
|
|
};
|
|
|
|
PubsubRunner.prototype.listSomeTopics = function(callback) {
|
|
console.log('Listing topics');
|
|
console.log('-------------_');
|
|
this.listProjectTopics(function(err, response) {
|
|
if (err) {
|
|
console.log('Could not list topic: rpc failed with', err);
|
|
callback(err);
|
|
} else {
|
|
_.each(response.topic, function(t) {
|
|
console.log(t.name);
|
|
});
|
|
callback(null);
|
|
}
|
|
});
|
|
};
|
|
|
|
PubsubRunner.prototype.checkExists = function(callback) {
|
|
var name = this.getTestTopicName();
|
|
console.log('... checking for topic', name);
|
|
this.topicExists(name, function(err, exists) {
|
|
if (err) {
|
|
console.log('Could not check for a topics: rpc failed with', err);
|
|
callback(err);
|
|
} else {
|
|
if (exists) {
|
|
console.log(name, 'is a topic');
|
|
} else {
|
|
console.log(name, 'is not a topic');
|
|
}
|
|
callback(null);
|
|
}
|
|
});
|
|
};
|
|
|
|
PubsubRunner.prototype.randomPubSub = function(callback) {
|
|
var self = this;
|
|
var topic_name = this.getTestTopicName();
|
|
var sub_name = this.getTestSubName();
|
|
var subscription = {name: sub_name, topic: topic_name};
|
|
async.waterfall([
|
|
_.bind(this.createTopicIfNeeded, this, topic_name),
|
|
_.bind(this.sub.createSubscription, this.sub, subscription),
|
|
function(resp, cb) {
|
|
var msg_count = _.random(10, 30);
|
|
// Set up msg_count messages to publish
|
|
var message_senders = _.times(msg_count, function(n) {
|
|
return _.bind(self.pub.publish, self.pub, {
|
|
topic: topic_name,
|
|
message: {data: new Buffer('message ' + n)}
|
|
});
|
|
});
|
|
async.parallel(message_senders, function(err, result) {
|
|
cb(err, result, msg_count);
|
|
});
|
|
},
|
|
function(result, msg_count, cb) {
|
|
console.log('Sent', msg_count, 'messages to', topic_name + ',',
|
|
'checking for them now.');
|
|
var batch_request = {
|
|
subscription: sub_name,
|
|
max_events: msg_count
|
|
};
|
|
self.sub.pullBatch(batch_request, cb);
|
|
},
|
|
function(batch, cb) {
|
|
var ack_id = _.pluck(batch.pull_responses, 'ack_id');
|
|
console.log('Got', ack_id.length, 'messages, acknowledging them...');
|
|
var ack_request = {
|
|
subscription: sub_name,
|
|
ack_id: ack_id
|
|
};
|
|
self.sub.acknowledge(ack_request, cb);
|
|
},
|
|
function(result, cb) {
|
|
console.log(
|
|
'Test messages were acknowledged OK, deleting the subscription');
|
|
self.sub.deleteSubscription({subscription: sub_name}, cb);
|
|
}
|
|
], function (err, result) {
|
|
if (err) {
|
|
console.log('Could not do random pub sub: rpc failed with', err);
|
|
}
|
|
callback(err, result);
|
|
});
|
|
};
|
|
|
|
function main(callback) {
|
|
var argv = parseArgs(process.argv, {
|
|
string: [
|
|
'host',
|
|
'oauth_scope',
|
|
'port',
|
|
'action',
|
|
'project_id',
|
|
'topic_name',
|
|
'sub_name'
|
|
],
|
|
default: {
|
|
host: 'pubsub-staging.googleapis.com',
|
|
oauth_scope: 'https://www.googleapis.com/auth/pubsub',
|
|
port: 443,
|
|
action: 'listSomeTopics',
|
|
project_id: 'stoked-keyword-656'
|
|
}
|
|
});
|
|
var valid_actions = [
|
|
'createTopic',
|
|
'removeTopic',
|
|
'listSomeTopics',
|
|
'checkExists',
|
|
'randomPubSub'
|
|
];
|
|
if (_.some(valid_actions, function(action) {
|
|
return action === argv.action;
|
|
})) {
|
|
callback(new Error('Action was not valid'));
|
|
}
|
|
var address = argv.host + ':' + argv.port;
|
|
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
|
|
if (err) {
|
|
callback(err);
|
|
return;
|
|
}
|
|
if (credential.createScopedRequired()) {
|
|
credential = credential.createScoped(argv.oauth_scope);
|
|
}
|
|
var updateMetadata = grpc.getGoogleAuthDelegate(credential);
|
|
var ca_path = process.env.SSL_CERT_FILE;
|
|
fs.readFile(ca_path, function(err, ca_data) {
|
|
if (err) {
|
|
callback(err);
|
|
return;
|
|
}
|
|
var ssl_creds = grpc.Credentials.createSsl(ca_data);
|
|
var options = {
|
|
credentials: ssl_creds,
|
|
'grpc.ssl_target_name_override': argv.host
|
|
};
|
|
var pub = new pubsub.PublisherService(address, options, updateMetadata);
|
|
var sub = new pubsub.SubscriberService(address, options, updateMetadata);
|
|
var runner = new PubsubRunner(pub, sub, argv);
|
|
runner[argv.action](callback);
|
|
});
|
|
});
|
|
}
|
|
|
|
if (require.main === module) {
|
|
main(function(err) {
|
|
if (err) {
|
|
throw err;
|
|
}
|
|
});
|
|
}
|
|
|
|
module.exports = PubsubRunner;
|