PubSub not receiving Events/Messages for Topics


#1

I’ve tried each of the forms of OAuth token generation, the bot connects, but neither of the Topics I’m listening to do I receive Messages for. Just Ping & Pong. Not even Whispers. I feel like I’m really close to getting this working but I’ve exhausted everything I can think of.

Scripted for Node.js, uses “websocket” and “request” Node.js packages. API Ext is a custom AJAX wrapper I wrote for Helix/Kraken Twitch APIs. The design of the PubSub class object is to take in the User/Channel names from the config and resolve them into IDs using https://api.twitch.tv/helix/users?login={0} and https://api.twitch.tv/api/channels/{0} APIs.

Please tell me what I’m doing wrong.


var WebSocket = require('websocket').w3cwebsocket;
var apiExt = require('./apiext.js');
var request = require('request');

var apiExtInstance = new apiExt();

var PubSubBot = (function PubSubBot(state, $event) {
    var $url = state.url || 'wss://pubsub-edge.twitch.tv';

    var $scopes = [
        'bits:read',
        'chat:edit',
        'chat:read',
        'whispers:edit',
        'whispers:read',
        'channel:moderate',
        'channel:read:subscriptions',
        'channel_subscriptions',
        'moderation:read',
        'channel_read',
        'user_read',
    ];

    var $pingTimeout = null;
    var $pongTimeout = null;
    var $socket = null;

    var $outboundQueue = null;

    var $sendInterval = setInterval(function () {
        if ($socket && ($socket.readyState === $socket.CLOSED || $socket.readyState === $socket.CLOSING)) {
            setTimeout(connect, 1);

            return;
        }
        else if (!$outboundQueue || $outboundQueue.length < 1 || !$socket || $socket.readyState !== $socket.OPEN) {
            return;
        }

        var message = $outboundQueue.pop();

        if (message) {
            $socket.send(message);
        }
    }, 550);

    newSession(function () {
        if (state.user_id === 0) {
            apiExtInstance.getUserInfo(state.user_name, state.client_id, function (error, response, body) {
                state.user_id = body.data[0].id;

                if (state.channel_id !== 0) {
                    setTimeout(connect, 1);
                }
            });
        }

        if (state.channel_id === 0) {
            apiExtInstance.getChannelInfo(state.channel_name, state.client_id, function (error, response, body) {
                state.channel_id = body._id;

                if (state.user_id !== 0) {
                    setTimeout(connect, 1);
                }
            });
        }
    });

    function htmlDecode(input) {
        return input.replace(/&#(\d+);/g, function (match, dec) {
            return String.fromCharCode(dec);
        });
    }

    function newSession(callback) {
        if (!state.token) {
            var parameters = [
                'response_type=code',
                'redirect_uri=' + encodeURIComponent('http://localhost'),
                'client_id=' + state.client_id,
                'scope=' + $scopes.join('+'),
            ];

            console.log('https://id.twitch.tv/oauth2/authorize?' + parameters.join('&'));

            return;
        }

        //var parameters = [
        //    'response_type=token',
        //    'redirect_uri=' + encodeURIComponent('http://localhost'),
        //    'client_id=' + state.client_id,
        //    'scope=' + $scopes.join('+'),
        //];

        //console.log('https://id.twitch.tv/oauth2/authorize?' + parameters.join('&'));

        //request.get(
        //    'https://id.twitch.tv/oauth2/authorize?' + parameters.join('&'),
        //    {},
        //    function (error, response, body) {
        //        if (!error && response.statusCode === 200) {
        //            var match, authenticity_token, redirect_path;

        //            if (match = body.match(/id=\"authenticity_token\" type=\"hidden\" value=\"([^\"]+)\"/)) {
        //                authenticity_token = htmlDecode(match[1]);
        //            }

        //            if (match = body.match(/name=\"redirect_path\" value=\"([^\"]+)\"/)) {
        //                redirect_path = htmlDecode(match[1]);
        //            }

        //            request.post(
        //                'https://id.twitch.tv/oauth2/authorize',
        //                {
        //                    authenticity_token: authenticity_token,
        //                    redirect_path: redirect_path,
        //                    client_id: state.client_id,
        //                    origin_uri: '',
        //                    embed: false,
        //                    username: state.user_name,
        //                    password: state.password,
        //                },
        //                function (error, response, body) {
        //                    console.log(body);

        //                    if (!error && response.statusCode === 200) {
        //                    }
        //                });
        //        }
        //    });

        //var parameters = [
        //    'grant_type=authorization_code',
        //    'code=' + state.token,
        //    'client_id=' + state.client_id,
        //    'client_secret=' + state.secret,
        //    'redirect_uri=' + encodeURIComponent('http://localhost'),
        //];

        //request.post(
        //    'https://id.twitch.tv/oauth2/token?' + parameters.join('&'),
        //    {},
        //    function (error, response, body) {
        //        if (!error && response.statusCode === 200) {
        //            state.oauth = body.access_token;
        //            state.refresh_token = body.refresh_token;

        //            var t = new Date();
        //            t.setSeconds(t.getSeconds() + body.expires_in);

        //            state.expires = t;

        //            if (callback && typeof callback === 'function') {
        //                callback();
        //            }
        //        }
        //    });

        //var parameters = [
        //	'grant_type=client_credentials',
        //	'client_id=' + state.client_id,
        //	'client_secret=' + state.secret,
        //  'scope=' + $scopes.join('+'),
        //];

        //request.post(
        //    'https://id.twitch.tv/oauth2/token?' + parameters.join('&'),
        //    {},
        //    function (error, response, body) {
        //        if (!error && response.statusCode === 200) {
        //            body = JSON.parse(body);

        //            state.oauth = body.access_token;

        //            var t = new Date();
        //            t.setSeconds(t.getSeconds() + body.expires_in);

        //            state.expires = t;

        //            if (callback && typeof callback === 'function') {
        //                callback();
        //            }
        //        }
        //    });

        if (callback && typeof callback === 'function') {
            callback();
        }
    }

    function init() {
        if ($socket !== null) {
            if ($socket.readyState === $socket.CONNECTING || $socket.readyState === $socket.OPEN) close();

            delete $socket;
        }

        $outboundQueue = [];

        $socket = new WebSocket($url);

        $socket.onopen = socketOpen;
        $socket.onclose = socketClose;
        $socket.onmessage = socketMessage;
        $socket.onerror = socketError;
    }

    function connect() {
        if (state.user_id === 0 || state.channel_id === 0) return;

        if (state.expires === null || state.expires.getTime() < (new Date()).getTime()) {
            newSession(function () {
                init();
            });
        } else {
            init();
        }
    }

    function ping() {
        send({ type: 'PING' });
        $pingTimeout = setTimeout(ping, 3 * 60 * 1000);
        $pongTimeout = setTimeout(function () {
            console.log('$pongTimeout');

            setTimeout(connect, 1);
        }, 10000); // if pong isn't received within 10 seconds, reconnect
    }

    function send(message) {
        if (typeof message === 'object' || Array.isArray(message)) {
            message = JSON.stringify(message);
        }

        switch ($socket.readyState) {
            case $socket.CLOSING:
            case $socket.CLOSED:
                console.log('$socket.CLOSED');

                setTimeout(connect, 1);

                $outboundQueue.push(message);

                break;
            case $socket.OPEN:
            case $socket.CONNECTING:
                $outboundQueue.push(message);

                break;
            default:
                break;
        }
    }

    function close() {
        $socket.close();

        $outboundQueue = [];
    }

    function socketOpen() {
        console.log('socketOpen');

        var frame = {
            type: 'LISTEN',
            nonce: 'listenToTopics-' + (Math.random() * 100),
            data: {
                topics: [
                    'channel-bits-events-v1.' + state.channel_id,
                    'channel-subscribe-events-v1.' + state.channel_id,
                    'chat_moderator_actions.' + state.user_id + '.' + state.channel_id,
                    'whispers.' + state.user_id,
                ],
                auth_token: state.oauth,
            },
        };

        console.log(frame);

        send(frame);

        ping();
    }

    function socketMessage(event) {
        var message = JSON.parse(event.data);

        switch (message.type) {
            case 'PONG':
                clearTimeout($pongTimeout);

                $pongTimeout = null;

                break;
            case 'RESPONSE':
                break;
            case 'RECONNECT':
                console.log('RECONNECT');

                setTimeout(connect, 1);

                break;
            case 'MESSAGE':
                console.log('MESSAGE');
                console.log(message);

                parseMessage(message.data);

                break;
            default:
                break;
        }
    }

    function socketClose() {
        console.log('socketClose');

        setTimeout(connect, 1);
    }

    function socketError(error) {
        console.error('socketError');
        //console.error(error);
    }

    function parseMessage(data) {
        var message = JSON.parse(data.message);

        switch (data.topic) {
            // https://dev.twitch.tv/docs/v5/guides/PubSub/
            case 'channel-bits-events-v1.' + state.channel_id:
                $event('bits', message);

                break;
            // https://discuss.dev.twitch.tv/t/in-line-broadcaster-chat-mod-logs/7281/12
            case 'chat_moderator_actions.' + state.user_id + '.' + state.channel_id:
                $event('moderation', message);

                break;
            case 'whispers.' + state.user_id:
                $event('whisper', message);

                break;
            case 'channel-subscribe-events-v1.' + state.channel_id:
                $event('sub', message);

                break;
            default:
                break;
        }
    }
});

if (typeof module !== "undefined" && module.exports) {
    module.exports = PubSubBot;
}
if (typeof window !== "undefined") {
    window.PubSubBot = PubSubBot;
}

var pubSub = require('./pubsub.js');

var $config = {
	user_id: 0,
	user_name: 'Bot's Username',
	channel_id: 0,
	channel_name: 'ChannelName',
	client_id: 'ClientID Token',
    secret: 'Secret Token',
    expires: null,
    oauth: 'OAuth Token',
    token: 'Code Token',
};

var $event = function(type, message) {
	switch (type) {
		case 'bits':

			break;
		case 'moderation':

			break;
		case 'whisper':

			break;
		case 'sub':

			break;
	}

	console.log(type);
	console.log(message);
};

var pubSubInstance = new pubSub($config, $event);

#2

When you got your RESPONSE did you check of the “error” was “”

An empty error means “ok”, but a non empty error will describe an error if one is present:

No error:

{ type: 'RESPONSE',
  error: '',
  nonce: 'BCarlyon-asdasdasd-1568630369877' }

Has Error

{ type: 'RESPONSE',
  error: 'ERR_BADAUTH',
  nonce: 'BCarlyon-adasdasd-1568630369993' }

Given most of your oAuth logic is commented out I’m assuming you have something else handling the oAuth side (step one of an oAuth is redirect the user offsite can’t be done progmatically)

I don’t really see anything else wrong with your code.

The only difference between your code and mine is that I’m using require('ws') rather than require('websocket').w3cwebsocket;

And my oAuth logic is no where near the PubSub handler


#3

I switched the WebSocket packages over to ws, same behavior. For awhile I was generating the user redirect string, copying and pasting it out of CMD/Console, pasting it into the browser, going through the motions, then pasting the OAuth or code from the hash portion of the Url in the Addressbar.

If I go through all of that, the bot connects but I never receive any events/messages for the topics I’m listening to. Nothing I change affects the behavior of the outcome. No errors, it just doesn’t work.


#4

Then if you are getting a message of type RESPONSE with a blank/empty error then the problem must be in your code somewhere else. Either it’s stalling or unable to process the incoming messages correctly.

My own pubsub handler is working as expected. The only difference we have is that you are using code that matches W3C WebSocket API and I am not.

You have

$socket.onmessage = socketMessage;

Where I have instead

ws.on(‘message’, function(msg) { // do stuff with msg });

I’ve not tried using the W3C method with the Twitch PubSub to confirm if that works.


#5

Now I’m getting a RESPONSE BAD_AUTH error message. Thanks this got me to the next step. Sorry still in the tinkering phase.

Many thanks!


#6

The response_type=token OAuth method is working for me now (Profit):

MESSAGE
{ type: ‘MESSAGE’,
data:
{ topic: ‘whispers.123’,
message:
'{“type”:“whisper_sent”,“data”:"{\“message_id\”…


#7

Sounds like you forgot to take the code and exchange it for an access token in your oAuth loop (step 3).

You have requested an implicit code with will die and cannot be renewed automatically. So you should “upgrade” to the normal oAuth token so you can refresh it automatically.

Rechecking your commented code from earlier, your step 3 doesn’t wait for step 2 to complete.

Step 1) Link out (redirect)
Step 2) Receive ?code (incoming GET)
Step 3) Exchange code for access_token and refresh_token (outgoing POST)


#8

Just keep in mind that tokens gained through the Implicit auth flow can not be refreshed, meaning every time the token expires you’ll have to actually go through the auth process again.

If you use the Authorization Code Flow you’ll be able to programmatically refresh your access token to get a new one, so whenever you need to reconnect to pubsub you’ll have a valid access token rather than an expired one.


closed #9

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.