Added pubsub demo client

This commit is contained in:
murgatroid99 2015-02-23 10:26:01 -08:00
parent fb0f4ddb00
commit ddce31ab43
3 changed files with 38 additions and 33 deletions

View File

@ -34,8 +34,8 @@
syntax = "proto2"; syntax = "proto2";
import "examples/pubsub/empty.proto"; import "empty.proto";
import "examples/pubsub/label.proto"; import "label.proto";
package tech.pubsub; package tech.pubsub;

View File

@ -28,7 +28,10 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
var async = require('async'); var async = require('async');
var fs = require('fs');
var GoogleAuth = require('googleauth');
var parseArgs = require('minimist'); var parseArgs = require('minimist');
var strftime = require('strftime');
var _ = require('underscore'); var _ = require('underscore');
var grpc = require('../..'); var grpc = require('../..');
var PROTO_PATH = __dirname + '/pubsub.proto'; var PROTO_PATH = __dirname + '/pubsub.proto';
@ -45,7 +48,7 @@ PubsubRunner.prototype.getTestTopicName = function() {
if (this.args.topic_name) { if (this.args.topic_name) {
return base_name + this.args.topic_name; return base_name + this.args.topic_name;
} }
var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L'); var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text; return base_name + process.env.USER + '-' + now_text;
}; };
@ -54,7 +57,7 @@ PubsubRunner.prototype.getTestSubName = function() {
if (this.args.sub_name) { if (this.args.sub_name) {
return base_name + this.args.sub_name; return base_name + this.args.sub_name;
} }
var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L'); var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text; return base_name + process.env.USER + '-' + now_text;
}; };
@ -77,6 +80,7 @@ PubsubRunner.prototype.topicExists = function(name, callback) {
}; };
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) { PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
var self = this;
this.topicExists(name, function(err, exists) { this.topicExists(name, function(err, exists) {
if (err) { if (err) {
callback(err); callback(err);
@ -84,7 +88,7 @@ PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
if (exists) { if (exists) {
callback(null); callback(null);
} else { } else {
this.pub.createTopic({name: name}, callback); self.pub.createTopic({name: name}, callback);
} }
} }
}); });
@ -153,45 +157,48 @@ PubsubRunner.prototype.checkExists = function(callback) {
}; };
PubsubRunner.prototype.randomPubSub = function(callback) { PubsubRunner.prototype.randomPubSub = function(callback) {
var self = this;
var topic_name = this.getTestTopicName(); var topic_name = this.getTestTopicName();
var sub_name = this.getTestSubName(); var sub_name = this.getTestSubName();
var subscription = {name: sub_name, topic: topic_name}; var subscription = {name: sub_name, topic: topic_name};
async.waterfall([ async.waterfall([
_.bind(this.createTopicIfNeeded, this, topic_name), _.bind(this.createTopicIfNeeded, this, topic_name),
_.bind(this.sub.createSubscription, this, subscription), _.bind(this.sub.createSubscription, this.sub, subscription),
function(resp, cb) { function(resp, cb) {
var msg_count = _.random(10, 30); var msg_count = _.random(10, 30);
// Set up msg_count messages to publish // Set up msg_count messages to publish
var message_senders = _.times(msg_count, function(n) { var message_senders = _.times(msg_count, function(n) {
return _.bind(this.pub.publish, this.pub, { return _.bind(self.pub.publish, self.pub, {
topic: topic_name, topic: topic_name,
message: {data: 'message ' + n} message: {data: new Buffer('message ' + n)}
}); });
}); });
async.parallel(message_senders, cb); async.parallel(message_senders, function(err, result) {
cb(err, result, msg_count);
});
}, },
function(result, cb) { function(result, msg_count, cb) {
console.log('Sent', msg_count, 'messages to', topic_name + ',', console.log('Sent', msg_count, 'messages to', topic_name + ',',
'checking for them now.'); 'checking for them now.');
var batch_request = { var batch_request = {
subscription: sub_name, subscription: sub_name,
max_events: msg_count max_events: msg_count
}; };
this.sub.pull_batch(batch_request, cb); self.sub.pullBatch(batch_request, cb);
}, },
function(batch, cb) { function(batch, cb) {
var ack_ids = _.pluck(batch.pull_responses, 'ack_id'); var ack_id = _.pluck(batch.pull_responses, 'ack_id');
console.log('Got', ack_ids.length, 'messages, acknowledging them...'); console.log('Got', ack_id.length, 'messages, acknowledging them...');
var ack_request = { var ack_request = {
subscription: sub_name, subscription: sub_name,
ack_ids: ack_ids ack_id: ack_id
}; };
this.sub.acknowledge(ack_request, cb); self.sub.acknowledge(ack_request, cb);
}, },
function(result, cb) { function(result, cb) {
console.log( console.log(
'Test messages were acknowledged OK, deleting the subscription'); 'Test messages were acknowledged OK, deleting the subscription');
this.sub.delete({subscription: sub_name}, cb); self.sub.deleteSubscription({subscription: sub_name}, cb);
} }
], function (err, result) { ], function (err, result) {
if (err) { if (err) {
@ -213,23 +220,23 @@ function main(callback) {
'sub_name' 'sub_name'
], ],
default: { default: {
host: 'pubsub-testing.googleapis.com', host: 'pubsub-staging.googleapis.com',
oauth_scope: 'https://www.googleapis.com/auth/pubsub', oauth_scope: 'https://www.googleapis.com/auth/pubsub',
port: 443, port: 443,
action: 'all', action: 'listSomeTopics',
project: 'stoked-keyword-656' project_id: 'stoked-keyword-656'
} }
}); });
var valid_actions = [ var valid_actions = [
'removeTopic',
'createTopic', 'createTopic',
'listSomeTopic', 'removeTopic',
'listSomeTopics',
'checkExists', 'checkExists',
'randomPubSub' 'randomPubSub'
]; ];
if (!(argv.action === 'all' || _.some(valid_actions, function(action) { if (_.some(valid_actions, function(action) {
return action === argv.action; return action === argv.action;
}))) { })) {
callback(new Error('Action was not valid')); callback(new Error('Action was not valid'));
} }
var address = argv.host + ':' + argv.port; var address = argv.host + ':' + argv.port;
@ -249,17 +256,14 @@ function main(callback) {
return; return;
} }
var ssl_creds = grpc.Credentials.createSsl(ca_data); var ssl_creds = grpc.Credentials.createSsl(ca_data);
var options = {credentials: ssl_creds}; var options = {
credentials: ssl_creds,
'grpc.ssl_target_name_override': argv.host
};
var pub = new pubsub.PublisherService(address, options, updateMetadata); var pub = new pubsub.PublisherService(address, options, updateMetadata);
var sub = new pubsub.SubscriberService(address, options, updateMetadata); var sub = new pubsub.SubscriberService(address, options, updateMetadata);
var runner = new PubsubRunner(pub, sub, argv); var runner = new PubsubRunner(pub, sub, argv);
if (argv.action === 'all') { runner[argv.action](callback);
async.series(_.map(valid_actions, function(name) {
_.bind(runner[name], runner);
}), callback);
} else {
runner[argv.action](callback);
}
}); });
}); });
} }

View File

@ -15,9 +15,10 @@
"underscore.string": "^3.0.0" "underscore.string": "^3.0.0"
}, },
"devDependencies": { "devDependencies": {
"mocha": "~1.21.0", "googleauth": "google/google-auth-library-nodejs",
"minimist": "^1.1.0", "minimist": "^1.1.0",
"googleauth": "google/google-auth-library-nodejs" "mocha": "~1.21.0",
"strftime": "^0.8.2"
}, },
"files": [ "files": [
"README.md", "README.md",