可观察网络IO解析

我正在尝试使用Rx从TCPClient接收流读取数据,并将数据解析为由换行符“ r n”分隔的字符串IObservable。以下是我从套接字流接收的数据...

var messages = new Subject<string>();

var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };

Observable
    .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

这是我想出来解析字符串。 这目前不起作用...

obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("rn"))
            );

消息主题接收大块的消息,所以我试图连接它们,并测试连接的字符串是否包含换行符,从而指示缓冲区关闭并输出缓冲区块。 不知道为什么它不起作用。 似乎我只能从obsStrings中获得第一个块。

所以我正在寻找两件事。 我想简化io流的阅读并消除消息主题的使用。 其次,我想让我的字符串解析工作。 我一直在对此进行黑客攻击,无法提供可行的解决方案。 我是Rx的初学者。

编辑:这是问题解决后的成品....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
            .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
            .Scan(String.Empty, (a, b) => (a.EndsWith("rn") ? "" : a) + b)
            .Where(x => x.EndsWith("rn"))
            .Select(buffered => String.Join("", buffered))
            .Select(a => a.Replace("n", ""));

“ReceiveUntilCompleted”是RXX项目的扩展。


messages
    .Scan(String.Empty, (a, b) => (a.EndsWith("rn") ? "" : a) + b)
    .Where(x => x.EndsWith("rn"))

而不是Subscribe和使用Subject ,你可以尝试只是Select

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

现在假设这已经进入了一个新的可观察messages ,你的下一个问题就是在这一行

var obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("rn"))
            );

您正在使用BufferScan ,并试图在两个方面做同样的事情! 请注意, Buffer需要关闭选择器。

你真正想要的是:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("rn")))
                         .Select(buffered => String.Join(buffered));

这为Buffered提供了关于什么时候关闭窗口(当它包含 r n)时的可观察性并给出了选择要并置的缓冲量。 这会导致您的拆分字符串的新观察结果。

一个问题是,你仍然可以在一个块的中间有新的线路,这会导致问题。 一个简单的想法是观察字符而不是完整的字符串块,例如:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

然后,您可以执行messages.Where(c => c != 'r')跳过r并将缓冲区更改为:

var obsStrings = messages.Buffer(() => messages.Where(x => x == 'n')))
                         .Select(buffered => String.Join("", buffered));
链接地址: http://www.djcxy.com/p/59111.html

上一篇: Observable Network IO Parsing

下一篇: How to access php variable using concatenation