1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 
20 alias InDataHandler = DataPipeIface!ubyte;
21 
22 public class ConnectError: Exception {
23     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
24         super(message, file, line, next);
25     }
26 }
27 
28 class DecodingExceptioin: Exception {
29     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
30         super(message, file, line, next);
31     }
32 }
33 
34 public class TimeoutException: Exception {
35     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
36         super(message, file, line, next);
37     }
38 }
39 
40 /**
41  * DataPipeIface can accept some data, process, and return processed data.
42  */
43 public interface DataPipeIface(E) {
44     /// Is there any processed data ready for reading?
45     bool empty();
46     /// Put next data portion for processing
47     //void put(E[]);
48     void putNoCopy(E[]);
49     /// Get any ready data
50     E[] get();
51     /// Signal on end of incoming data stream.
52     void flush();
53 }
54 /**
55  * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line.
56  * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked",
57  * and uncompress Content-Encoding "gzip".
58  */
59 public class DataPipe(E) : DataPipeIface!E {
60 
61     DataPipeIface!(E)[]  pipe;
62     Buffer!E             buffer;
63     /// Append data processor to pipeline
64     /// Params:
65     /// p = processor
66     void insert(DataPipeIface!E p) {
67         pipe ~= p;
68     }
69     E[][] process(DataPipeIface!E p, E[][] data) {
70         E[][] result;
71         data.each!(e => p.putNoCopy(e));
72         while(!p.empty()) result ~= p.get();
73         return result;
74     }
75     /// Process next data portion. Data passed over pipeline and store result in buffer.
76     /// Params:
77     /// data = input data buffer.
78     /// NoCopy means we do not copy data to buffer, we keep reference
79     void putNoCopy(E[] data) {
80         if ( pipe.empty ) {
81             buffer.putNoCopy(data);
82             return;
83         }
84         auto t = process(pipe.front, [data]);
85         foreach(ref p; pipe[1..$]) {
86             t = process(p, t);
87         }
88         t.each!(b => buffer.putNoCopy(b));
89     }
90     /// Get what was collected in internal buffer and clear it. 
91     /// Returns:
92     /// data collected.
93     E[] get() {
94         if ( buffer.empty ) {
95             return E[].init;
96         }
97         auto res = buffer.data;
98         buffer = Buffer!E.init;
99         return res;
100     }
101     ///
102     /// get without datamove. but user receive [][]
103     /// 
104     E[][] getNoCopy()  {
105         if ( buffer.empty ) {
106             return E[][].init;
107         }
108         E[][] res = buffer.__repr.__buffer;
109         buffer = Buffer!E.init;
110         return res;
111     }
112     /// Test if internal buffer is empty
113     /// Returns:
114     /// true if internal buffer is empty (nothing to get())
115     bool empty() pure const @safe {
116         return buffer.empty;
117     }
118     void flush() {
119         E[][] product;
120         foreach(ref p; pipe) {
121             product.each!(e => p.putNoCopy(e));
122             p.flush();
123             product.length = 0;
124             while( !p.empty ) product ~= p.get();
125         }
126         product.each!(b => buffer.putNoCopy(b));
127     }
128 }
129 
130 /**
131  * Processor for gzipped/compressed content.
132  * Also support InputRange interface.
133  */
134 public class Decompressor(E) : DataPipeIface!E {
135     private {
136         Buffer!ubyte __buff;
137         UnCompress   __zlib;
138     }
139     this() {
140         __buff = Buffer!ubyte();
141         __zlib = new UnCompress();
142     }
143     override void putNoCopy(E[] data) {
144         if ( __zlib is null  ) {
145             __zlib = new UnCompress();
146         }
147         __buff.putNoCopy(__zlib.uncompress(data));
148     }
149     override E[] get() pure {
150         assert(__buff.length);
151         auto r = __buff.__repr.__buffer[0];
152         __buff.popFrontN(r.length);
153         return cast(E[])r;
154     }
155     override void flush() {
156         if ( __zlib is null  ) {
157             return;
158         }
159         __buff.put(__zlib.flush());
160     }
161     override @property bool empty() const pure @safe {
162         debug tracef("empty=%b", __buff.empty);
163         return __buff.empty;
164     }
165     @property auto ref front() pure const @safe {
166         debug tracef("front: buff length=%d", __buff.length);
167         return __buff.front;
168     }
169     @property auto popFront() pure @safe {
170         debug tracef("popFront: buff length=%d", __buff.length);
171         return __buff.popFront;
172     }
173     @property void popFrontN(size_t n) pure @safe {
174         __buff.popFrontN(n);
175     }
176 }
177 
178 /**
179  * Unchunk chunked http responce body.
180  */
181 public class DecodeChunked : DataPipeIface!ubyte {
182     //    length := 0
183     //    read chunk-size, chunk-extension (if any) and CRLF
184     //    while (chunk-size > 0) {
185     //        read chunk-data and CRLF
186     //        append chunk-data to entity-body
187     //        length := length + chunk-size
188     //        read chunk-size and CRLF
189     //    }
190     //    read entity-header
191     //    while (entity-header not empty) {
192     //        append entity-header to existing header fields
193     //        read entity-header
194     //    }
195     //    Content-Length := length
196     //    Remove "chunked" from Transfer-Encoding
197     //
198 
199     //    Chunked-Body   = *chunk
200     //                      last-chunk
201     //                      trailer
202     //                      CRLF
203     //            
204     //    chunk          = chunk-size [ chunk-extension ] CRLF
205     //                     chunk-data CRLF
206     //                     chunk-size     = 1*HEX
207     //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
208     //        
209     //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
210     //    chunk-ext-name = token
211     //    chunk-ext-val  = token | quoted-string
212     //    chunk-data     = chunk-size(OCTET)
213     //    trailer        = *(entity-header CRLF)
214 
215     alias eType = ubyte;
216     immutable eType[] CRLF = ['\r', '\n'];
217     private {
218         enum         States {huntingSize, huntingSeparator, receiving, trailer};
219         char         state = States.huntingSize;
220         size_t       chunk_size, to_receive;
221         Buffer!ubyte buff;
222         ubyte[]      linebuff;
223     }
224     void putNoCopy(eType[] data) {
225         while ( data.length ) {
226             if ( state == States.trailer ) {
227                 return;
228             }
229             if ( state == States.huntingSize ) {
230                 import std.ascii;
231                 ubyte[10] digits;
232                 int i;
233                 for(i=0;i<data.length;i++) {
234                     ubyte v = data[i];
235                     digits[i] = v;
236                     if ( v == '\n' ) {
237                         i+=1;
238                         break;
239                     }
240                 }
241                 linebuff ~= digits[0..i];
242                 if ( linebuff.length >= 80 ) {
243                     throw new DecodingExceptioin("Can't find chunk size in the body");
244                 }
245                 data = data[i..$];
246 
247                 if (!linebuff.canFind(CRLF)) {
248                     continue;
249                 }
250                 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b";
251                 state = States.receiving;
252                 to_receive = chunk_size;
253                 if ( chunk_size == 0 ) {
254                     state = States.trailer;
255                     return;
256                 }
257                 continue;
258             }
259             if ( state == States.receiving ) {
260                 if (to_receive > 0 ) {
261                     auto can_store = min(to_receive, data.length);
262                     buff.putNoCopy(data[0..can_store]);
263                     data = data[can_store..$];
264                     to_receive -= can_store;
265                     //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
266                     if ( to_receive == 0 ) {
267                         //tracef("switch to huntig separator");
268                         state = States.huntingSeparator;
269                         continue;
270                     }
271                     continue;
272                 }
273                 assert(false);
274             }
275             if ( state == States.huntingSeparator ) {
276                 if ( data[0] == '\n' || data[0]=='\r') {
277                     data = data[1..$];
278                     continue;
279                 }
280                 state = States.huntingSize;
281                 linebuff.length = 0;
282                 continue;
283             }
284         }
285     }
286     eType[] get() {
287         auto r = buff.__repr.__buffer[0];
288         buff.popFrontN(r.length);
289         return r;
290     }
291     void flush() {
292     }
293     bool empty() {
294         debug tracef("empty=%b", buff.empty);
295         return buff.empty;
296     }
297     bool done() {
298         return state==States.trailer;
299     }
300 }
301 
302 unittest {
303     info("Testing DataPipe");
304     globalLogLevel(LogLevel.info);
305     alias eType = char;
306     eType[] gzipped = [
307         0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
308         0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
309         0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
310         0x08, 0x00, 0x00, 0x00
311     ]; // "abc\ndef\n"
312     auto d = new Decompressor!eType();
313     d.putNoCopy(gzipped[0..2].dup);
314     d.putNoCopy(gzipped[2..10].dup);
315     d.putNoCopy(gzipped[10..$].dup);
316     d.flush();
317     assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));
318 
319     auto e = new Decompressor!eType();
320     e.putNoCopy(gzipped[0..10].dup);
321     e.putNoCopy(gzipped[10..$].dup);
322     e.flush();
323     assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
324     //    writeln(gzipped.decompress.filter!(a => a!='b').array);
325     auto dp = new DataPipe!eType;
326     dp.insert(new Decompressor!eType());
327     dp.putNoCopy(gzipped[0..2].dup);
328     dp.putNoCopy(gzipped[2..$].dup);
329     dp.flush();
330     assert(equal(dp.get(), "abc\ndef\n"));
331     // empty datapipe shoul just pass input to output
332     auto dpu = new DataPipe!ubyte;
333     dpu.putNoCopy("abcd".dup.representation);
334     dpu.putNoCopy("efgh".dup.representation);
335     dpu.flush();
336     assert(equal(dpu.get(), "abcdefgh"));
337     info("Test unchunker properties");
338     ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n".dup.representation;
339     ubyte[][] result;
340     auto uc = new DecodeChunked();
341     uc.putNoCopy(twoChunks);
342     while(!uc.empty) {
343         result ~= uc.get();
344     }
345     assert(equal(result[0], ['1', '2']));
346     assert(equal(result[1], ['3', '4']));
347     info("unchunker correctness - ok");
348     result[0][0] = '5';
349     assert(twoChunks[3] == '5');
350     info("unchunker zero copy - ok");
351     info("Testing DataPipe - done");
352 }
353 /**
354  * Buffer used to collect and process data from network. It remainds Appender, but support
355  * also Range interface.
356  * $(P To place data in buffer use put() method.)
357  * $(P  To retrieve data from buffer you can use several methods:)
358  * $(UL
359  *  $(LI Range methods: front, back, index [])
360  *  $(LI data method: return collected data (like Appender.data))
361  * )
362  */
363 static this() {
364 }
365 static ~this() {
366 }
367 enum   CACHESIZE = 1024;
368 
369 static long reprAlloc;
370 static long reprCacheHit;
371 static long reprCacheRequests;
372 
373 
374 public struct Buffer(T) {
375 //    static Repr[CACHESIZE]  cache;
376 //    static uint             cacheIndex;
377 
378     private {
379         Repr  cachedOrNew() {
380             return new Repr;
381 //            reprCacheRequests++;
382 //            if ( false && cacheIndex>0 ) {
383 //                reprCacheHit++;
384 //                cacheIndex -= 1;
385 //                return cache[cacheIndex];
386 //            } else {
387 //                return new Repr;
388 //            }
389         }
390         class Repr {
391             size_t         __length;
392             Unqual!T[][]   __buffer;
393             this() {
394                 reprAlloc++;
395                 __length = 0;
396             }
397             this(Repr other) {
398                 reprAlloc++;
399                 if ( other is null )
400                     return;
401                 __length = other.__length;
402                 __buffer = other.__buffer.dup;
403             }
404         }
405         Repr __repr;
406     }
407     
408     alias toString = data!string;
409     
410     this(this) {
411         if ( !__repr ) {
412             return;
413         }
414         __repr = new Repr(__repr);
415     }
416     this(U)(U[] data) {
417         put(data);
418     }
419     ~this() {
420         __repr = null;
421 //        if ( cacheIndex >= CACHESIZE ) {
422 //            __repr = null;
423 //            return;
424 //        }
425 //        if ( __repr ) {
426 //            __repr.__length = 0;
427 //            __repr.__buffer = null;
428 //            cache[cacheIndex] = __repr;
429 //            cacheIndex += 1;
430 //            __repr = null;
431 //        }
432     }
433     /***************
434      * store data. Data copied
435      */
436     auto put(U)(U[] data) {
437         if ( data.length == 0 ) {
438             return;
439         }
440         if ( !__repr ) {
441             __repr = cachedOrNew();
442         }
443         static if (!is(U == T)) {
444             auto d = castFrom!(U[]).to!(T[])(data);
445             __repr.__length += d.length;
446             __repr.__buffer ~= d.dup;
447         } else {
448             __repr.__length += data.length;
449             __repr.__buffer ~= data.dup;
450         }
451         return;
452     }
453     auto putNoCopy(U)(U[] data) {
454         if ( data.length == 0 ) {
455             return;
456         }
457         if ( !__repr ) {
458             __repr = cachedOrNew();
459         }
460         static if (!is(U == T)) {
461             auto d = castFrom!(U[]).to!(T[])(data);
462             __repr.__length += d.length;
463             __repr.__buffer ~= d;
464         } else {
465             __repr.__length += data.length;
466             __repr.__buffer ~= data;
467         }
468         return;
469     }
470     @property auto opDollar() const pure @safe {
471         return __repr.__length;
472     }
473     @property auto length() const pure @safe {
474         if ( !__repr ) {
475             return 0;
476         }
477         return __repr.__length;
478     }
479     @property auto empty() const pure @safe {
480         return length == 0;
481     }
482     @property auto ref front() const pure @safe {
483         assert(length);
484         return __repr.__buffer.front.front;
485     }
486     @property auto ref back() const pure @safe {
487         assert(length);
488         return __repr.__buffer.back.back;
489     }
490     @property void popFront() pure @safe {
491         assert(length);
492         with ( __repr ) {
493             __buffer.front.popFront;
494             if ( __buffer.front.length == 0 ) {
495                 __buffer.popFront;
496             }
497             __length--;
498         }
499     }
500     @property void popFrontN(size_t n) pure @safe {
501         assert(n <= length, "lengnt: %d, n=%d".format(length, n));
502         __repr.__length -= n;
503         while( n ) {
504             if ( n <= __repr.__buffer.front.length ) {
505                 __repr.__buffer.front.popFrontN(n);
506                 if ( __repr.__buffer.front.length == 0 ) {
507                     __repr.__buffer.popFront;
508                 }
509                 return;
510             }
511             n -= __repr.__buffer.front.length;
512             __repr.__buffer.popFront;
513         }
514     }
515     @property void popBack() pure @safe {
516         assert(length);
517         __repr.__buffer.back.popBack;
518         if ( __repr.__buffer.back.length == 0 ) {
519             __repr.__buffer.popBack;
520         }
521         __repr.__length--;
522     }
523     @property void popBackN(size_t n) pure @safe {
524         assert(n <= length, "n: %d, length: %d".format(n, length));
525         __repr.__length -= n;
526         while( n ) {
527             if ( n <= __repr.__buffer.back.length ) {
528                 __repr.__buffer.back.popBackN(n);
529                 if ( __repr.__buffer.back.length == 0 ) {
530                     __repr.__buffer.popBack;
531                 }
532                 return;
533             }
534             n -= __repr.__buffer.back.length;
535             __repr.__buffer.popBack;
536         }
537     }
538     @property auto save() @safe {
539         auto n = Buffer!T();
540         n.__repr = new Repr(__repr);
541         return n;
542     }
543     @property auto ref opIndex(size_t n) const pure @safe {
544         assert( __repr && n < __repr.__length );
545         foreach(b; __repr.__buffer) {
546             if ( n < b.length ) {
547                 return b[n];
548             }
549             n -= b.length;
550         }
551         assert(false, "Impossible");
552     }
553     Buffer!T opSlice(size_t m, size_t n) {
554         if ( empty || m == n ) {
555             return Buffer!T();
556         }
557         assert( m <= n && n <= __repr.__length);
558         auto res = this.save();
559         res.popBackN(res.__repr.__length-n);
560         res.popFrontN(m);
561         return res;
562     }
563     @property auto data(U=T[])() pure {
564         static if ( is(U==T[]) ) {
565             if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
566                 return __repr.__buffer.front;
567             }
568         }
569         Appender!(T[]) a;
570         if ( __repr && __repr.__buffer ) {
571             foreach(ref b; __repr.__buffer) {
572                 a.put(b);
573             }
574         }
575         static if ( is(U==T[]) ) {
576             return a.data;
577         } else {
578             return castFrom!(T[]).to!U(a.data);
579         }
580     }
581     string opCast(string)() {
582         return this.toString;
583     }
584     bool opEquals(U)(U x) {
585         return cast(U)this == x;
586     }
587 
588 }
589 ///
590 public unittest {
591 
592     static assert(isInputRange!(Buffer!ubyte));
593     static assert(isForwardRange!(Buffer!ubyte));
594     static assert(hasLength!(Buffer!ubyte));
595     static assert(hasSlicing!(Buffer!ubyte));
596     static assert(isBidirectionalRange!(Buffer!ubyte));
597     static assert(isRandomAccessRange!(Buffer!ubyte));
598     
599     auto b = Buffer!ubyte();
600     b.put("abc".representation.dup);
601     b.put("def".representation.dup);
602     assert(b.length == 6);
603     assert(b.toString == "abcdef");
604     assert(b.front == 'a');
605     assert(b.back == 'f');
606     assert(equal(b[0..$], "abcdef"));
607     assert(equal(b[$-2..$], "ef"));
608     assert(b == "abcdef");
609     b.popFront;
610     b.popBack;
611     assert(b.front == 'b');
612     assert(b.back == 'e');
613     assert(b.length == 4);
614     assert(retro(b).front == 'e');
615     assert(countUntil(b, 'e') == 3);
616     assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
617     assert(equal(b, "bcde"));
618     b.popFront; b.popFront;
619     assert(b.front == 'd');
620     assert(b.front == b[0]);
621     assert(b.back == b[$-1]);
622 
623     auto c = Buffer!ubyte();
624     c.put("Header0: value0\n".representation.dup);
625     c.put("Header1: value1\n".representation.dup);
626     c.put("Header2: value2\n\nbody".representation.dup);
627     auto c_length = c.length;
628     auto eoh = countUntil(c, "\n\n");
629     assert(eoh == 47);
630     foreach(header; c[0..eoh].splitter('\n') ) {
631         writeln(castFrom!(ubyte[]).to!(string)(header.data));
632     }
633     assert(equal(findSplit(c, "\n\n")[2], "body"));
634     assert(c.length == c_length);
635 }
636 
637 version (sslLibs) {
638     extern(C) {
639         int SSL_library_init();
640         void OpenSSL_add_all_ciphers();
641         void OpenSSL_add_all_digests();
642         void SSL_load_error_strings();
643 
644         struct SSL {}
645         struct SSL_CTX {}
646         struct SSL_METHOD {}
647 
648         SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
649         SSL* SSL_new(SSL_CTX*);
650         int SSL_set_fd(SSL*, int);
651         int SSL_connect(SSL*);
652         int SSL_write(SSL*, const void*, int);
653         int SSL_read(SSL*, void*, int);
654         int SSL_shutdown(SSL*) @trusted @nogc nothrow;
655         void SSL_free(SSL*);
656         void SSL_CTX_free(SSL_CTX*);
657 
658         long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
659 
660         long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
661         long SSL_set_mode(SSL *ssl, long mode);
662 
663         long SSL_CTX_get_mode(SSL_CTX *ctx);
664         long SSL_get_mode(SSL *ssl);
665 
666         SSL_METHOD* SSLv3_client_method();
667         SSL_METHOD* TLSv1_2_client_method();
668         SSL_METHOD* TLSv1_client_method();
669     }
670 
671     //pragma(lib, "crypto");
672     //pragma(lib, "ssl");
673 
674     shared static this() {
675         SSL_library_init();
676         OpenSSL_add_all_ciphers();
677         OpenSSL_add_all_digests();
678         SSL_load_error_strings();
679     }
680 
681     public class OpenSslSocket : Socket {
682         enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
683         private SSL* ssl;
684         private SSL_CTX* ctx;
685         private void initSsl() {
686             //ctx = SSL_CTX_new(SSLv3_client_method());
687             ctx = SSL_CTX_new(TLSv1_client_method());
688             assert(ctx !is null);
689 
690             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
691             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
692             ssl = SSL_new(ctx);
693             SSL_set_fd(ssl, this.handle);
694         }
695 
696         @trusted
697         override void connect(Address to) {
698             super.connect(to);
699             if(SSL_connect(ssl) == -1)
700                 throw new Exception("ssl connect failed");
701         }
702 
703         @trusted
704         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
705             return SSL_write(ssl, buf.ptr, cast(uint) buf.length);
706         }
707         override ptrdiff_t send(const(void)[] buf) {
708             return send(buf, SocketFlags.NONE);
709         }
710         @trusted
711         override ptrdiff_t receive(void[] buf, SocketFlags flags) {
712             return SSL_read(ssl, buf.ptr, cast(int)buf.length);
713         }
714         override ptrdiff_t receive(void[] buf) {
715             return receive(buf, SocketFlags.NONE);
716         }
717         this(AddressFamily af, SocketType type = SocketType.STREAM) {
718             super(af, type);
719             initSsl();
720         }
721 
722         this(socket_t sock, AddressFamily af) {
723             super(sock, af);
724             initSsl();
725         }
726         override void close() {
727             //SSL_shutdown(ssl);
728             super.close();
729         }
730         ~this() {
731             SSL_free(ssl);
732             SSL_CTX_free(ctx);
733         }
734     }
735 
736     public class SSLSocketStream: SocketStream {
737         override void open(AddressFamily fa) {
738             if ( s !is null ) {
739                 s.close();
740             }
741             s = new OpenSslSocket(fa);
742             assert(s !is null, "Can't create socket");
743             __isOpen = true;
744         }
745     }
746 }
747 
748 public interface NetworkStream {
749     @property bool isConnected() const;
750     void close() @trusted;
751 
752     ///
753     /// timeout is the socket write timeout.
754     ///
755     NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);
756 
757     ptrdiff_t send(const(void)[] buff);
758     ptrdiff_t receive(void[] buff);
759 
760     ///
761     /// Set timeout for receive calls. 0 means no timeout.
762     ///
763     @property void readTimeout(Duration timeout);
764 }
765 
766 public abstract class SocketStream : NetworkStream {
767     private {
768         Duration timeout;
769         Socket   s;
770         bool     __isOpen;
771         bool     __isConnected;
772     }
773     void open(AddressFamily fa) {
774     }
775     @property ref Socket so() @safe pure {
776         return s;
777     }
778     @property bool isOpen() @safe @nogc pure const {
779         return s && __isOpen;
780     }
781     @property bool isConnected() @safe @nogc pure const {
782         return s && __isConnected;
783     }
784     void close() @trusted {
785         tracef("Close socket");
786         if ( isOpen ) {
787             s.close();
788             __isOpen = false;
789             __isConnected = false;
790         }
791         s = null;
792     }
793     
794     SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
795         tracef(format("Create connection to %s:%d", host, port));
796         Address[] addresses;
797         __isConnected = false;
798         try {
799             addresses = getAddress(host, port);
800         } catch (Exception e) {
801             throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
802         }
803         foreach(a; addresses) {
804             tracef("Trying %s", a);
805             try {
806                 open(a.addressFamily);
807                 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
808                 s.connect(a);
809                 tracef("Connected to %s", a);
810                 __isConnected = true;
811                 break;
812             } catch (SocketException e) {
813                 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
814                 s.close();
815             }
816         }
817         if ( !__isConnected ) {
818             throw new ConnectError("Can't connect to %s:%d".format(host, port));
819         }
820         return this;
821     }
822     
823     ptrdiff_t send(const(void)[] buff) @safe
824     in {assert(isConnected);}
825     body {
826         auto rc = s.send(buff);
827         if (rc < 0) {
828             throw new ErrnoException("sending data");
829         }
830         return rc;
831     }
832     
833     ptrdiff_t receive(void[] buff) @safe {
834         while (true) {
835             auto r = s.receive(buff);
836             if (r < 0) {
837                 version(Windows) {
838                     if ( errno == 0 ) {
839                         throw new TimeoutException("Timeout receiving data");
840                     }
841                 }
842                 version(Posix) {
843                     if ( errno == EINTR ) {
844                         continue;
845                     }
846                     if ( errno == EAGAIN ) {
847                         throw new TimeoutException("Timeout receiving data");
848                     }
849                     throw new ErrnoException("receiving data");
850                 }
851             }
852             else {
853                 buff.length = r;
854             }
855             return r;
856         }
857         assert(false);
858     }
859 
860     @property void readTimeout(Duration timeout) @safe {
861         s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
862     }
863 }
864 
865 public class TCPSocketStream : SocketStream {
866     override void open(AddressFamily fa) {
867         if ( s !is null ) {
868             s.close();
869         }
870         s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
871         assert(s !is null, "Can't create socket");
872         __isOpen = true;
873     }
874 }
875 
876 version (vibeD) {
877     import vibe.core.net, vibe.stream.tls;
878 
879     public class TCPVibeStream : NetworkStream {
880     private:
881         TCPConnection _conn;
882         Duration _readTimeout = Duration.max;
883 
884     public:
885         @property bool isConnected() const {
886             return _conn.connected;
887         }
888 
889         void close() @trusted {
890             _conn.close();
891         }
892 
893         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
894             // FIXME: timeout not supported in vibe.d
895             try {
896                 _conn = connectTCP(host, port);
897             }
898             catch (Exception e)
899                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
900 
901             return this;
902         }
903 
904         ptrdiff_t send(const(void)[] buff) {
905             _conn.write(cast(const(ubyte)[])buff);
906             return buff.length;
907         }
908 
909         ptrdiff_t receive(void[] buff) {
910             if (!_conn.waitForData(_readTimeout)) {
911                 if (!_conn.connected) {
912                     return 0;
913                 }
914                 throw new TimeoutException("Timeout receiving data");
915             }
916 
917             if(_conn.empty) {
918                 return 0;
919             }
920 
921             auto chunk = min(_conn.leastSize, buff.length);
922             assert(chunk != 0);
923             _conn.read(cast(ubyte[])buff[0 .. chunk]);
924             return chunk;
925         }
926 
927         @property void readTimeout(Duration timeout) {
928             if (timeout == 0.seconds) {
929                 _readTimeout = Duration.max;
930             }
931             else {
932                 _readTimeout = timeout;
933             }
934         }
935     }
936 
937     public class SSLVibeStream : TCPVibeStream {
938     private:
939         Stream _sslStream;
940 
941     public:
942         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
943             try {
944                 _conn = connectTCP(host, port);
945                 auto sslctx = createTLSContext(TLSContextKind.client);
946                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
947                 _sslStream = createTLSStream(_conn, sslctx);
948             }
949             catch (Exception e) {
950                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
951             }
952 
953             return this;
954         }
955 
956         override ptrdiff_t send(const(void)[] buff) {
957             _sslStream.write(cast(const(ubyte)[])buff);
958             return buff.length;
959         }
960 
961         override ptrdiff_t receive(void[] buff) {
962             if (!_sslStream.dataAvailableForRead) {
963                 if (!_conn.waitForData(_readTimeout)) {
964                     if (!_conn.connected) {
965                         return 0;
966                     }
967                     throw new TimeoutException("Timeout receiving data");
968                 }
969             }
970 
971             if(_sslStream.empty) {
972                 return 0;
973             }
974 
975             auto chunk = min(_sslStream.leastSize, buff.length);
976             assert(chunk != 0);
977             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
978             return chunk;
979         }
980 
981         override void close() @trusted {
982             _sslStream.finalize();
983             _conn.close();
984         }
985     }
986 }
987 
988 version (vibeD) {
989     public alias TCPStream = TCPVibeStream;
990     public alias SSLStream = SSLVibeStream;
991 }
992 else {
993     public alias TCPStream = TCPSocketStream;
994     public alias SSLStream = SSLSocketStream;
995 }