Node.js,Socket.io,Redis pub / sub高容量,低延迟困难

当将socket.io/node.js和redis pub / sub结合起来试图创建一个由可处理多个传输的服务器事件驱动的实时Web广播系统时,似乎有三种方法:

  • 'createClient'redis连接并订阅频道。 在socket.io客户端连接上,将客户端连接到socket.io房间。 在redis.on(“message”,...)事件中,调用io.sockets.in(room).emit(“event”,data)分发给相关房间中的所有客户端。 像如何在socket.io中重用redis连接?

  • 'createClient'是一个redis连接。 在socket.io客户端连接上,将客户端加入到socket.io房间并订阅相关的redis通道。 在客户端连接关闭和收到消息调用client.emit(“event”,data)时引入redis.on(“message”,...)以引发特定客户端上的事件。 就像在使用socket.io中的RedisStore的示例中的答案一样

  • 按照socketio-spec协议,使用RedisStore烘焙到socket.io中,并在Redis中的单个“调度”通道中“广播”。

  • Number 1允许为所有客户端处理Redis sub和相关事件一次。 Number 2为Redis pub / sub提供了更直接的钩子。 数字3更简单,但对消息传递事件的控制很少。

    但是,在我的测试中,所有连接的客户端都超过1个,性能出乎意料地低。 有问题的服务器事件是尽可能快地发布到redis频道的1,000条消息,以便尽快分发。 性能通过连接客户端的时间进行度量(基于socket.io-client的日志时间戳进入Redis列表进行分析)。

    我猜测的是,在选项1中,服务器接收到消息,然后将其依次写入所有连接的客户端。 在选项2中,服务器多次接收每条消息(每个客户端订阅一次)并将其写入相关客户端。 在任何情况下,服务器都不会收到第二个消息事件,直到它传递给所有连接的客户端。 情况明显加剧,并发性增加。

    这似乎与堆栈功能的智慧不一致。 我想相信,但我很挣扎。

    这种情况(大量消息的低延迟分布)不是这些工具的选项(还是?),还是我错过了一个技巧?


    我认为这是一个合理的问题,并在短时间内对其进行了研究。 我花了一点时间寻找可以从中获得一些有用技巧的示例。

    例子

    我喜欢以直截了当的例子开始:

  • 光即时示例代码
  • Node.js + Redis Pub / Sub + socket.io演示
  • light sample是一个单独的页面(注意你想用Matt Ranney的node_redis来代替redis-node-client:

    /*
     * Mclarens Bar: Redis based Instant Messaging
     * Nikhil Marathe - 22/04/2010
    
     * A simple example of an IM client implemented using
     * Redis PUB/SUB commands so that all the communication
     * is offloaded to Redis, and the node.js code only
     * handles command interpretation,presentation and subscribing.
     * 
     * Requires redis-node-client and a recent version of Redis
     *    http://code.google.com/p/redis
     *    http://github.com/fictorial/redis-node-client
     *
     * Start the server then telnet to port 8000
     * Register with NICK <nick>, use WHO to see others
     * Use TALKTO <nick> to initiate a chat. Send a message
     * using MSG <nick> <msg>. Note its important to do a
     * TALKTO so that both sides are listening. Use STOP <nick>
     * to stop talking to someone, and QUIT to exit.
     *
     * This code is in the public domain.
     */
    var redis = require('./redis-node-client/lib/redis-client');
    
    var sys = require('sys');
    var net = require('net');
    
    var server = net.createServer(function(stream) {
        var sub; // redis connection
        var pub;
        var registered = false;
        var nick = "";
    
        function channel(a,b) {
        return [a,b].sort().join(':');
        }
    
        function shareTable(other) {
        sys.debug(nick + ": Subscribing to "+channel(nick,other));
        sub.subscribeTo(channel(nick,other), function(channel, message) {
            var str = message.toString();
            var sender = str.slice(0, str.indexOf(':'));
            if( sender != nick )
            stream.write("[" + sender + "] " + str.substr(str.indexOf(':')+1) + "n");
        });
        }
    
        function leaveTable(other) {
        sub.unsubscribeFrom(channel(nick,other), function(err) {
            stream.write("Stopped talking to " + other+ "n");
        });
        }
    
        stream.addListener("connect", function() {
        sub = redis.createClient();
        pub = redis.createClient();
        });
    
        stream.addListener("data", function(data) {
        if( !registered ) {
            var msg = data.toString().match(/^NICK (w*)/);
            if(msg) {
            stream.write("SERVER: Hi " + msg[1] + "n");
            pub.sadd('mclarens:inside', msg[1], function(err) {
                if(err) {
                stream.end();
                }
                registered = true;
                nick = msg[1];
    // server messages
                sub.subscribeTo( nick + ":info", function(nick, message) {
                var m = message.toString().split(' ');
                var cmd = m[0];
                var who = m[1];
                if( cmd == "start" ) {
                    stream.write( who + " is now talking to youn");
                    shareTable(who);
                }
                else if( cmd == "stop" ) {
                    stream.write( who + " stopped talking to youn");
                    leaveTable(who);
                }
                });
            });
            }
            else {
            stream.write("Please register with NICK <nickname>n");
            }
            return;
        }
    
        var fragments = data.toString().replace('rn', '').split(' ');
        switch(fragments[0]) {
        case 'TALKTO':
            pub.publish(fragments[1]+":info", "start " + nick, function(a,b) {
            });
            shareTable(fragments[1]);
            break;
        case 'MSG':
            pub.publish(channel(nick, fragments[1]),
                nick + ':' +fragments.slice(2).join(' '),
                  function(err, reply) {
                  if(err) {
                      stream.write("ERROR!");
                  }
                  });
            break;
        case 'WHO':
            pub.smembers('mclarens:inside', function(err, users) {
            stream.write("Online:n" + users.join('n') + "n");
            });
            break;
        case 'STOP':
            leaveTable(fragments[1]);
            pub.publish(fragments[1]+":info", "stop " + nick, function() {});
            break;
        case 'QUIT':
            stream.end();
            break;
        }
        });
    
        stream.addListener("end", function() {
        pub.publish(nick, nick + " is offline");
        pub.srem('mclarens:inside', nick, function(err) {
            if(err) {
            sys.debug("Could not remove client");
            }
        });
        });
    });
    
    server.listen(8000, "localhost");
    

    文件

    这里有大量的文档,并且这种类型的堆栈上的apis正在迅速改变,因此您必须权衡每个文档的时间相关性。

  • 节点活动流
  • 云代工厂的例子
  • 如何节点redis pubsub
  • redis延迟
  • redis cookbook使用Pub / Sub进行异步通信
  • linkedin的通用提示
  • 节点redis绑定
  • 谷歌组nodejs的问题
  • 相关问题

    只是一些相关的问题,这是堆栈上的热门话题:

  • node.js中的聊天服务器的Redis pub / sub
  • 如何设计即时消息系统的redis pub / sub?
  • 值得注意的技巧(ymmv)

    关闭或优化套接字池,使用高效的绑定,监视延迟,并确保你没有复制工作(即不需要两次向所有听众发布)。

    链接地址: http://www.djcxy.com/p/34157.html

    上一篇: Node.js, Socket.io, Redis pub/sub high volume, low latency difficulties

    下一篇: Using Redis for Pub Sub . Advantages / Disadvantages over RabbitMQ