Streams are a very powerful concept. They are programming constructs which move data between objects and services without having to manually loop through the data, which is one reason why they are so popular. Twitter has streams, NodeJS has streams, Java 8 has streams, and even RSS feeds can be represented as streams. PubNub also has streams, though we call them channels. The great thing about streams is that you can pipe them together to do interesting things.
With just a few Node modules, we can stream a realtime list of all tweets matching a hashtag into a PubNub channel. From there we can do all sorts of things, including archiving for later analysis, distributing the stream to millions of people at once, or data visualization. For example, say you want to draw a map of the US showing which states are tweeting about different candidates in the 2016 election. In this post we will pipe a stream of tweets matching a particular hashtag into a PubNub channel using NodeJS streams.
A Single Line of Code
Our end goal is to make this single line of code work:
new TwitterStream(twcfg,"#twitter").pipe(new PubNubOutStream(pncfg,"awesome-tweets"))
This line creates an ongoing Twitter query for tweets containing the hashtag #twitter, then pipes it into the PubNub channel called awesome-tweets.
Node comes with many stream types built in, such as files and HTTP sockets, but we can build our own by subclassing Readable or Writable and overriding a function or two. Here we will use custom readable (input) and writable (output) streams to pipe tweets from Twitter into a PubNub channel.
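The util.inherits pattern used throughout this post still works, but Node also lets you write the same subclasses with ES2015 class syntax. The sketch below is a minimal illustration, not part of the original project: ArraySource and CollectSink are hypothetical names. ArraySource overrides _read and pushes one object per call (pushing null to end the stream), while CollectSink overrides _write and records what it receives.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical source: emits the given objects one at a time, then ends.
class ArraySource extends Readable {
  constructor(items) {
    super({ objectMode: true });
    this.items = items.slice();
  }
  // Called whenever the consumer is ready for more data.
  _read() {
    if (this.items.length === 0) this.push(null); // null signals end-of-stream
    else this.push(this.items.shift());
  }
}

// Hypothetical sink: remembers every object written to it.
class CollectSink extends Writable {
  constructor() {
    super({ objectMode: true });
    this.items = [];
  }
  _write(obj, encoding, callback) {
    this.items.push(obj);
    callback(); // ready for the next object
  }
}

const sink = new CollectSink();
const done = new Promise((resolve) => sink.on('finish', () => resolve(sink.items)));
new ArraySource([{ n: 1 }, { n: 2 }, { n: 3 }]).pipe(sink);
done.then((items) => console.log(items.length)); // → 3
```

Because pipe() calls end() on the destination when the source finishes, the sink's 'finish' event is a reliable signal that every object has been written.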
Make a LogStream
Here is a simple output stream that logs everything it receives to the console. It overrides the _write method with one that prints each incoming object.
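var Writable = require('stream').Writable;
var util = require('util');

// An output stream that prints every object it receives.
function LogStream() {
  Writable.call(this, {objectMode: true});
  this._write = function(obj, encoding, callback) {
    console.log("LOG", util.inspect(obj, {depth: 0}));
    callback(); // this object is handled; ready for the next one
  };
}
util.inherits(LogStream, Writable);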
We can tell that LogStream is an output stream because it subclasses Writable. An input stream would subclass Readable. Notice that LogStream calls Writable.call (its ‘parent class’ constructor) with objectMode set to true. This tells Node that we are working with a stream of objects rather than a stream of bytes. Also notice that the _write method does its work, printing to the console, and then invokes the callback function that was passed to it. This callback is very important: it is what lets the stream system pass control back and forth between reader and writer without overflowing any internal buffers.
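To see what the callback buys you, here is a small sketch (not from the original post) of a deliberately slow writable that takes 10 ms per object. Until callback() runs, later objects wait in the stream's buffer, and once that buffer reaches highWaterMark (set to 2 here, an arbitrary choice) write() returns false. pipe() uses that false return value to pause the source until the destination emits 'drain'.

```javascript
const { Writable } = require('stream');

const seen = [];

// A deliberately slow sink: each object takes 10 ms to "process".
const slow = new Writable({
  objectMode: true,
  highWaterMark: 2, // write() returns false once 2 objects are pending
  write(obj, encoding, callback) {
    setTimeout(() => {
      seen.push(obj.n);
      callback(); // only now is the next buffered object handed to this function
    }, 10);
  }
});

// Write four objects without waiting and record what write() returns each time.
const accepted = [];
for (let n = 0; n < 4; n++) {
  accepted.push(slow.write({ n }));
}
console.log(accepted); // → [ true, false, false, false ]

slow.end(() => console.log('processed in order:', seen));
```

The writes are not lost when write() returns false; they are buffered and processed in order. The false return value is only a request to stop sending until the buffer empties.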
Make a TwitterStream
Now we need to create a Twitter input stream to pipe into LogStream. We will use the aptly named twitter module, which provides access to the Twitter APIs. Most of the APIs are REST based, meaning you make a request and get back a single response. However, Twitter also has a streaming API for realtime hashtag queries. It opens a connection to Twitter and keeps it open, so that's the one we want.
The code below makes a simple call to the Twitter API.
var Twitter = require('twitter');

// cfg holds your Twitter app keys (see twcfg below) and query is the
// hashtag to track, e.g. "#twitter".
var client = new Twitter(cfg);
client.stream('statuses/filter', {track: query}, function(stream) {
  stream.on('data', function(tweet) {
    console.log("got a tweet", tweet);
  });
  stream.on('error', function(error) {
    console.log("got an error", error);
  });
});
Every time a new tweet comes in from the stream, it invokes the data callback function to print that tweet to the console. This works, but what we really want is to output those tweets as a proper stream so we can hook it up to our LogStream from above. To make a readable stream we need a constructor function that extends stream.Readable, implements the _read method, and calls push() to hand data to the stream. Normally _read is where a stream fetches more data on demand, but tweets arrive on their own schedule, so our _read does nothing and we call push whenever a new tweet comes in. Here's the expanded code:
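var Twitter = require('twitter');
var util = require('util');
var Readable = require('stream').Readable;

function TwitterStream(cfg, query) {
  Readable.call(this, {objectMode: true});
  var client = new Twitter(cfg);
  // Tweets are pushed as they arrive, so there is nothing to do on demand.
  this._read = function() { /* do nothing */ };
  var self = this;
  function connect() {
    client.stream('statuses/filter', {track: query}, function(stream) {
      // Hand each incoming tweet to the Readable's internal buffer.
      stream.on('data', (tweet) => self.push(tweet));
      // On error, wait before reconnecting instead of retrying in a tight loop.
      // (The 5-second delay is an arbitrary choice.)
      stream.on('error', (error) => {
        console.error("Twitter stream error, reconnecting:", error);
        setTimeout(connect, 5000);
      });
    });
  }
  connect();
}
util.inherits(TwitterStream, Readable);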
This contains the same code as before, but wrapped up inside the TwitterStream function. Every time a tweet comes in, the stream calls push on itself. This adds the tweet to the Readable's internal buffer, from which pipe() delivers it to whatever stream is downstream.
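If you want to try the pipeline before you have Twitter keys, you can substitute a fake source that follows the same push-based pattern. This is a hypothetical sketch: makeFakeFeed and FakeTweetStream are illustrative names, and the fake tweets carry only id_str and text. Unlike the real Twitter stream, this one ends by pushing null once its list runs out.

```javascript
const { Readable, Writable } = require('stream');
const { EventEmitter } = require('events');

// Hypothetical stand-in for the Twitter client's stream: emits one 'data'
// event per fake tweet every 5 ms, then an 'end' event.
function makeFakeFeed(texts) {
  const feed = new EventEmitter();
  let i = 0;
  const timer = setInterval(() => {
    if (i < texts.length) {
      feed.emit('data', { id_str: String(i), text: texts[i] });
      i++;
    } else {
      clearInterval(timer);
      feed.emit('end');
    }
  }, 5);
  return feed;
}

// Same pattern as TwitterStream: _read does nothing, data is pushed on arrival.
class FakeTweetStream extends Readable {
  constructor(texts) {
    super({ objectMode: true });
    const feed = makeFakeFeed(texts);
    feed.on('data', (tweet) => this.push(tweet));
    feed.on('end', () => this.push(null)); // signal end-of-stream
  }
  _read() { /* tweets arrive on their own schedule */ }
}

// Collect the text of each tweet that reaches the end of the pipe.
const received = [];
const sink = new Writable({
  objectMode: true,
  write(tweet, encoding, callback) {
    received.push(tweet.text);
    callback();
  }
});

const done = new Promise((resolve) => sink.on('finish', resolve));
new FakeTweetStream(['hello #twitter', 'another #twitter']).pipe(sink);
done.then(() => console.log(received));
```

Note that push() returns false when the internal buffer is full, and both this sketch and TwitterStream ignore that return value. A live Twitter feed can't be paused, so if the consumer falls behind, tweets accumulate in memory rather than being dropped.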
Now we can print the Twitter stream to the console by piping it into a LogStream:
var twcfg = {
  consumer_key: "...",
  consumer_secret: "...",
  access_token_key: "...",
  access_token_secret: "..."
};
new TwitterStream(twcfg, "#twitter").pipe(new LogStream());
Remember to put in your own app keys from Twitter’s Application Management webpage.
Running it produces output like this:
node twitter2pubnub/example.js
LOG { created_at: 'Mon Apr 11 17:36:14 +0000 2016',
id: 719579827968286700,
id_str: '719579827968286720',
text: 'RT @UpdatesTiniPL: [Foto] Nowe zdjęcia #TiniStoessel udostępnione przez serwis #Twitter z fanami 11.04 #3 https://t.co/oB82Z0unqf',
source: 'Twitter for Android',
truncated: false,
in_reply_to_status_id: null,
in_reply_to_status_id_str: null,
in_reply_to_user_id: null,
in_reply_to_user_id_str: null,
in_reply_to_screen_name: null,
user: [Object],
geo: null,
coordinates: null,
place: null,
contributors: null,
retweeted_status: [Object],
is_quote_status: false,
retweet_count: 0,
favorite_count: 0,
entities: [Object],
extended_entities: [Object],
favorited: false,
retweeted: false,
possibly_sensitive: false,
filter_level: 'low',
lang: 'pl',
timestamp_ms: '1460396174791' }
Newline Delimited JSON
That works quite well. Let’s try another output stream. Instead of printing to the console let’s write a log file to disk. We will use a log format called Newline Delimited JSON, which is just what it sounds like. Each line is a complete JSON object, with newlines separating entries. It’s an extremely simple format with reasonable performance.
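Here is a minimal sketch of the format itself, using two hypothetical helpers, toNDJSON and fromNDJSON. It also shows why the format is safe for tweets: JSON.stringify escapes newline characters inside strings, so a multi-line tweet still occupies exactly one line of the log.

```javascript
// Hypothetical helper: one JSON document per line, each terminated by "\n".
function toNDJSON(objects) {
  return objects.map((obj) => JSON.stringify(obj) + '\n').join('');
}

// Hypothetical helper: parse NDJSON text, skipping blank lines such as the
// empty string after the final newline.
function fromNDJSON(text) {
  return text
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line));
}

const log = toNDJSON([{ text: 'first' }, { text: 'second\nline' }]);
console.log(log.split('\n').length - 1); // → 2 (one line per object)
console.log(fromNDJSON(log));
```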
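var Writable = require('stream').Writable;
var util = require('util');
var fs = require('fs');

function NDJSONOutStream(path) {
  Writable.call(this, {objectMode: true});
  // Open in append mode so repeated runs add to the same log file.
  var fout = fs.createWriteStream(path, {flags: 'a'});
  this._write = function(obj, encoding, callback) {
    // One JSON document per line; callback runs once the line is written.
    fout.write(JSON.stringify(obj) + "\n", callback);
  };
  // Close the file when the stream ends (_final requires Node 8 or later).
  this._final = function(callback) {
    fout.end(callback);
  };
}
util.inherits(NDJSONOutStream, Writable);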
The code for NDJSONOutStream looks very similar to LogStream; again we set objectMode to true. Note that we have nested callbacks here. Each time _write is called, we write to the underlying file on disk, and only when that write completes do we call callback(). This ensures that another call to _write doesn't happen until the previous one has finished.
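You can check the one-at-a-time guarantee with a small experiment (a sketch, not part of the original code). The hypothetical sink below finishes each _write after a random delay, standing in for a disk write, and counts how many writes are in progress at once. Because callback() is only invoked when the simulated write completes, the count never exceeds one and objects come out in the order they went in.

```javascript
const { Writable } = require('stream');

let inFlight = 0;    // writes currently in progress
let maxInFlight = 0; // highest concurrency observed
const order = [];

// Hypothetical sink: each "write" completes after a random 0-10 ms delay.
const sink = new Writable({
  objectMode: true,
  write(obj, encoding, callback) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    setTimeout(() => {
      order.push(obj.n);
      inFlight--;
      callback(); // only now may the next _write start
    }, Math.random() * 10);
  }
});

const done = new Promise((resolve) => sink.on('finish', resolve));
for (let n = 0; n < 5; n++) sink.write({ n });
sink.end();
done.then(() => console.log({ maxInFlight, order })); // → { maxInFlight: 1, order: [ 0, 1, 2, 3, 4 ] }
```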
Now we can stream tweets to a disk file like this:
new TwitterStream(twcfg,"#twitter").pipe(new NDJSONOutStream("mylog.ndjson"));
Stream to PubNub
Now that we can store tweets, let's pipe the Twitter stream into a real PubNub channel. Channels are PubNub's equivalent of a NodeJS stream, and they allow both reading and writing. The code below creates a PubNubOutStream, which accepts a standard PubNub config object (remember to put in your own free app keys) plus the name of a channel to publish to.
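var Writable = require('stream').Writable;
var util = require('util');
var pubnub = require("pubnub"); // this code uses the PubNub JavaScript SDK 3.x API

var pncfg = {
  ssl: true, // enable TLS Tunneling over TCP
  publish_key: "Enter-your-publish-key",
  subscribe_key: "Enter-your-subscribe-key"
};

function PubNubOutStream(cfg, channel) {
  Writable.call(this, {objectMode: true});
  var pn = pubnub(cfg);
  this._write = function(obj, encoding, callback) {
    pn.publish({
      channel: channel,
      message: obj,
      // Ask for the next tweet only once this one has been published.
      callback: () => callback(),
      // Log a failed publish and move on, so one failure doesn't stall the pipe.
      error: (err) => { console.error("publish failed", err); callback(); }
    });
  };
}
util.inherits(PubNubOutStream, Writable);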
Now we can pipe a tweet stream into a PubNub channel like this:
new TwitterStream(twcfg,"#twitter").pipe(new PubNubOutStream(pncfg,"awesome-tweets"))
And that’s it. An easy way to view these tweets is through the PubNub Debug Console. It provides a simple way to debug your entire PubNub implementation.
NodeJS streams are a very powerful way to move data in and out of PubNub channels. From here you could add sentiment analysis, data visualization, geo tracking, or all three! Streams will become even more powerful in the future when they are combined with PubNub BLOCKS, coming this summer.
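Each of those ideas fits naturally as an extra stage between the Twitter source and the PubNub sink. As one hedged illustration, the sketch below uses a Transform stream (a stream that is both writable and readable) to keep only a few fields of each tweet, and connects the stages with stream.pipeline() instead of pipe(). Unlike pipe(), pipeline() reports an error from any stage to a single callback and destroys the other streams. slimTweets, the chosen fields, and the in-memory source and sink are assumptions made so the sketch runs offline; Readable.from() needs Node 12.3 or later.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Hypothetical processing stage: reduce each full tweet to a few fields
// before it is published.
function slimTweets() {
  return new Transform({
    objectMode: true,
    transform(tweet, encoding, callback) {
      callback(null, {
        id: tweet.id_str,
        text: tweet.text,
        user: tweet.user ? tweet.user.screen_name : null,
        lang: tweet.lang
      });
    }
  });
}

// In-memory stand-ins for TwitterStream and PubNubOutStream.
const source = Readable.from([
  { id_str: '1', text: 'hello #twitter', user: { screen_name: 'alice' }, lang: 'en', entities: {} },
  { id_str: '2', text: 'hola #twitter', user: { screen_name: 'bob' }, lang: 'es', entities: {} }
]);
const published = [];
const sink = new Writable({
  objectMode: true,
  write(msg, encoding, callback) {
    published.push(msg);
    callback();
  }
});

// pipeline() calls its callback once: with the first error, or with none on success.
const done = new Promise((resolve, reject) => {
  pipeline(source, slimTweets(), sink, (err) => (err ? reject(err) : resolve(published)));
});
done.then((msgs) => console.log(msgs));
```

With the classes from this post, the equivalent call would pass new TwitterStream(twcfg, "#twitter"), slimTweets(), and new PubNubOutStream(pncfg, "awesome-tweets") to pipeline(), followed by an error callback.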
To learn more about NodeJS streams check out the Stream Handbook.