1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 
20 alias InDataHandler = DataPipeIface!ubyte;
21 
22 public class ConnectError: Exception {
23     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
24         super(message, file, line, next);
25     }
26 }
27 
28 class DecodingException: Exception {
29     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
30         super(message, file, line, next);
31     }
32 }
33 
34 public class TimeoutException: Exception {
35     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
36         super(message, file, line, next);
37     }
38 }
39 
40 /**
41  * DataPipeIface can accept some data, process, and return processed data.
42  */
43 public interface DataPipeIface(E) {
44     /// Is there any processed data ready for reading?
45     bool empty();
46     /// Put next data portion for processing
47     //void put(E[]);
48     void putNoCopy(E[]);
49     /// Get any ready data
50     E[] get();
51     /// Signal on end of incoming data stream.
52     void flush();
53 }
54 /**
55  * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line.
56  * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked",
57  * and uncompress Content-Encoding "gzip".
58  */
59 public class DataPipe(E) : DataPipeIface!E {
60 
61     DataPipeIface!(E)[]  pipe;
62     Buffer!E             buffer;
63     /// Append data processor to pipeline
64     /// Params:
65     /// p = processor
66     void insert(DataPipeIface!E p) {
67         pipe ~= p;
68     }
69     E[][] process(DataPipeIface!E p, E[][] data) {
70         E[][] result;
71         data.each!(e => p.putNoCopy(e));
72         while(!p.empty()) result ~= p.get();
73         return result;
74     }
75     /// Process next data portion. Data passed over pipeline and store result in buffer.
76     /// Params:
77     /// data = input data buffer.
78     /// NoCopy means we do not copy data to buffer, we keep reference
79     void putNoCopy(E[] data) {
80         if ( pipe.empty ) {
81             buffer.putNoCopy(data);
82             return;
83         }
84         try {
85             auto t = process(pipe.front, [data]);
86             foreach(ref p; pipe[1..$]) {
87                 t = process(p, t);
88             }
89             t.each!(b => buffer.putNoCopy(b));
90         }
91         catch (Exception e) {
92             throw new DecodingException(e.msg);
93         }
94     }
95     /// Get what was collected in internal buffer and clear it. 
96     /// Returns:
97     /// data collected.
98     E[] get() {
99         if ( buffer.empty ) {
100             return E[].init;
101         }
102         auto res = buffer.data;
103         buffer = Buffer!E.init;
104         return res;
105     }
106     ///
107     /// get without datamove. but user receive [][]
108     /// 
109     E[][] getNoCopy()  {
110         if ( buffer.empty ) {
111             return E[][].init;
112         }
113         E[][] res = buffer.__repr.__buffer;
114         buffer = Buffer!E.init;
115         return res;
116     }
117     /// Test if internal buffer is empty
118     /// Returns:
119     /// true if internal buffer is empty (nothing to get())
120     bool empty() pure const @safe {
121         return buffer.empty;
122     }
123     void flush() {
124         E[][] product;
125         foreach(ref p; pipe) {
126             product.each!(e => p.putNoCopy(e));
127             p.flush();
128             product.length = 0;
129             while( !p.empty ) product ~= p.get();
130         }
131         product.each!(b => buffer.putNoCopy(b));
132     }
133 }
134 
135 /**
136  * Processor for gzipped/compressed content.
137  * Also support InputRange interface.
138  */
139 public class Decompressor(E) : DataPipeIface!E {
140     private {
141         Buffer!ubyte __buff;
142         UnCompress   __zlib;
143     }
144     this() {
145         __buff = Buffer!ubyte();
146         __zlib = new UnCompress();
147     }
148     override void putNoCopy(E[] data) {
149         if ( __zlib is null  ) {
150             __zlib = new UnCompress();
151         }
152         __buff.putNoCopy(__zlib.uncompress(data));
153     }
154     override E[] get() pure {
155         assert(__buff.length);
156         auto r = __buff.__repr.__buffer[0];
157         __buff.popFrontN(r.length);
158         return cast(E[])r;
159     }
160     override void flush() {
161         if ( __zlib is null  ) {
162             return;
163         }
164         __buff.put(__zlib.flush());
165     }
166     override @property bool empty() const pure @safe {
167         debug(requests) tracef("empty=%b", __buff.empty);
168         return __buff.empty;
169     }
170     @property auto ref front() pure const @safe {
171         debug(requests) tracef("front: buff length=%d", __buff.length);
172         return __buff.front;
173     }
174     @property auto popFront() pure @safe {
175         debug(requests) tracef("popFront: buff length=%d", __buff.length);
176         return __buff.popFront;
177     }
178     @property void popFrontN(size_t n) pure @safe {
179         __buff.popFrontN(n);
180     }
181 }
182 
183 /**
184  * Unchunk chunked http responce body.
185  */
186 public class DecodeChunked : DataPipeIface!ubyte {
187     //    length := 0
188     //    read chunk-size, chunk-extension (if any) and CRLF
189     //    while (chunk-size > 0) {
190     //        read chunk-data and CRLF
191     //        append chunk-data to entity-body
192     //        length := length + chunk-size
193     //        read chunk-size and CRLF
194     //    }
195     //    read entity-header
196     //    while (entity-header not empty) {
197     //        append entity-header to existing header fields
198     //        read entity-header
199     //    }
200     //    Content-Length := length
201     //    Remove "chunked" from Transfer-Encoding
202     //
203 
204     //    Chunked-Body   = *chunk
205     //                      last-chunk
206     //                      trailer
207     //                      CRLF
208     //            
209     //    chunk          = chunk-size [ chunk-extension ] CRLF
210     //                     chunk-data CRLF
211     //                     chunk-size     = 1*HEX
212     //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
213     //        
214     //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
215     //    chunk-ext-name = token
216     //    chunk-ext-val  = token | quoted-string
217     //    chunk-data     = chunk-size(OCTET)
218     //    trailer        = *(entity-header CRLF)
219 
220     alias eType = ubyte;
221     immutable eType[] CRLF = ['\r', '\n'];
222     private {
223         enum         States {huntingSize, huntingSeparator, receiving, trailer};
224         char         state = States.huntingSize;
225         size_t       chunk_size, to_receive;
226         Buffer!ubyte buff;
227         ubyte[]      linebuff;
228     }
229     void putNoCopy(eType[] data) {
230         while ( data.length ) {
231             if ( state == States.trailer ) {
232                 return;
233             }
234             if ( state == States.huntingSize ) {
235                 import std.ascii;
236                 ubyte[10] digits;
237                 int i;
238                 for(i=0;i<data.length;i++) {
239                     ubyte v = data[i];
240                     digits[i] = v;
241                     if ( v == '\n' ) {
242                         i+=1;
243                         break;
244                     }
245                 }
246                 linebuff ~= digits[0..i];
247                 if ( linebuff.length >= 80 ) {
248                     throw new DecodingException("Can't find chunk size in the body");
249                 }
250                 data = data[i..$];
251 
252                 if (!linebuff.canFind(CRLF)) {
253                     continue;
254                 }
255                 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b";
256                 state = States.receiving;
257                 to_receive = chunk_size;
258                 if ( chunk_size == 0 ) {
259                     state = States.trailer;
260                     return;
261                 }
262                 continue;
263             }
264             if ( state == States.receiving ) {
265                 if (to_receive > 0 ) {
266                     auto can_store = min(to_receive, data.length);
267                     buff.putNoCopy(data[0..can_store]);
268                     data = data[can_store..$];
269                     to_receive -= can_store;
270                     //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
271                     if ( to_receive == 0 ) {
272                         //tracef("switch to huntig separator");
273                         state = States.huntingSeparator;
274                         continue;
275                     }
276                     continue;
277                 }
278                 assert(false);
279             }
280             if ( state == States.huntingSeparator ) {
281                 if ( data[0] == '\n' || data[0]=='\r') {
282                     data = data[1..$];
283                     continue;
284                 }
285                 state = States.huntingSize;
286                 linebuff.length = 0;
287                 continue;
288             }
289         }
290     }
291     eType[] get() {
292         auto r = buff.__repr.__buffer[0];
293         buff.popFrontN(r.length);
294         return r;
295     }
296     void flush() {
297     }
298     bool empty() {
299         debug(requests) tracef("empty=%b", buff.empty);
300         return buff.empty;
301     }
302     bool done() {
303         return state==States.trailer;
304     }
305 }
306 
307 unittest {
308     info("Testing DataPipe");
309     globalLogLevel(LogLevel.info);
310     alias eType = char;
311     eType[] gzipped = [
312         0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
313         0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
314         0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
315         0x08, 0x00, 0x00, 0x00
316     ]; // "abc\ndef\n"
317     auto d = new Decompressor!eType();
318     d.putNoCopy(gzipped[0..2].dup);
319     d.putNoCopy(gzipped[2..10].dup);
320     d.putNoCopy(gzipped[10..$].dup);
321     d.flush();
322     assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));
323 
324     auto e = new Decompressor!eType();
325     e.putNoCopy(gzipped[0..10].dup);
326     e.putNoCopy(gzipped[10..$].dup);
327     e.flush();
328     assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
329     //    writeln(gzipped.decompress.filter!(a => a!='b').array);
330     auto dp = new DataPipe!eType;
331     dp.insert(new Decompressor!eType());
332     dp.putNoCopy(gzipped[0..2].dup);
333     dp.putNoCopy(gzipped[2..$].dup);
334     dp.flush();
335     assert(equal(dp.get(), "abc\ndef\n"));
336     // empty datapipe shoul just pass input to output
337     auto dpu = new DataPipe!ubyte;
338     dpu.putNoCopy("abcd".dup.representation);
339     dpu.putNoCopy("efgh".dup.representation);
340     dpu.flush();
341     assert(equal(dpu.get(), "abcdefgh"));
342     info("Test unchunker properties");
343     ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n".dup.representation;
344     ubyte[][] result;
345     auto uc = new DecodeChunked();
346     uc.putNoCopy(twoChunks);
347     while(!uc.empty) {
348         result ~= uc.get();
349     }
350     assert(equal(result[0], ['1', '2']));
351     assert(equal(result[1], ['3', '4']));
352     info("unchunker correctness - ok");
353     result[0][0] = '5';
354     assert(twoChunks[3] == '5');
355     info("unchunker zero copy - ok");
356     info("Testing DataPipe - done");
357 }
358 /**
359  * Buffer used to collect and process data from network. It remainds Appender, but support
360  * also Range interface.
361  * $(P To place data in buffer use put() method.)
362  * $(P  To retrieve data from buffer you can use several methods:)
363  * $(UL
364  *  $(LI Range methods: front, back, index [])
365  *  $(LI data method: return collected data (like Appender.data))
366  * )
367  */
368 static this() {
369 }
370 static ~this() {
371 }
372 enum   CACHESIZE = 1024;
373 
374 static long reprAlloc;
375 static long reprCacheHit;
376 static long reprCacheRequests;
377 
378 
379 public struct Buffer(T) {
380 //    static Repr[CACHESIZE]  cache;
381 //    static uint             cacheIndex;
382 
383     private {
384         Repr  cachedOrNew() {
385             return new Repr;
386 //            reprCacheRequests++;
387 //            if ( false && cacheIndex>0 ) {
388 //                reprCacheHit++;
389 //                cacheIndex -= 1;
390 //                return cache[cacheIndex];
391 //            } else {
392 //                return new Repr;
393 //            }
394         }
395         class Repr {
396             size_t         __length;
397             Unqual!T[][]   __buffer;
398             this() {
399                 reprAlloc++;
400                 __length = 0;
401             }
402             this(Repr other) {
403                 reprAlloc++;
404                 if ( other is null )
405                     return;
406                 __length = other.__length;
407                 __buffer = other.__buffer.dup;
408             }
409         }
410         Repr __repr;
411     }
412     
413     alias toString = data!string;
414     
415     this(this) {
416         if ( !__repr ) {
417             return;
418         }
419         __repr = new Repr(__repr);
420     }
421     this(U)(U[] data) {
422         put(data);
423     }
424     ~this() {
425         __repr = null;
426 //        if ( cacheIndex >= CACHESIZE ) {
427 //            __repr = null;
428 //            return;
429 //        }
430 //        if ( __repr ) {
431 //            __repr.__length = 0;
432 //            __repr.__buffer = null;
433 //            cache[cacheIndex] = __repr;
434 //            cacheIndex += 1;
435 //            __repr = null;
436 //        }
437     }
438     /***************
439      * store data. Data copied
440      */
441     auto put(U)(U[] data) {
442         if ( data.length == 0 ) {
443             return;
444         }
445         if ( !__repr ) {
446             __repr = cachedOrNew();
447         }
448         static if (!is(U == T)) {
449             auto d = castFrom!(U[]).to!(T[])(data);
450             __repr.__length += d.length;
451             __repr.__buffer ~= d.dup;
452         } else {
453             __repr.__length += data.length;
454             __repr.__buffer ~= data.dup;
455         }
456         return;
457     }
458     auto putNoCopy(U)(U[] data) {
459         if ( data.length == 0 ) {
460             return;
461         }
462         if ( !__repr ) {
463             __repr = cachedOrNew();
464         }
465         static if (!is(U == T)) {
466             auto d = castFrom!(U[]).to!(T[])(data);
467             __repr.__length += d.length;
468             __repr.__buffer ~= d;
469         } else {
470             __repr.__length += data.length;
471             __repr.__buffer ~= data;
472         }
473         return;
474     }
475     @property auto opDollar() const pure @safe {
476         return __repr.__length;
477     }
478     @property auto length() const pure @safe {
479         if ( !__repr ) {
480             return 0;
481         }
482         return __repr.__length;
483     }
484     @property auto empty() const pure @safe {
485         return length == 0;
486     }
487     @property auto ref front() const pure @safe {
488         assert(length);
489         return __repr.__buffer.front.front;
490     }
491     @property auto ref back() const pure @safe {
492         assert(length);
493         return __repr.__buffer.back.back;
494     }
495     @property void popFront() pure @safe {
496         assert(length);
497         with ( __repr ) {
498             __buffer.front.popFront;
499             if ( __buffer.front.length == 0 ) {
500                 __buffer.popFront;
501             }
502             __length--;
503         }
504     }
505     @property void popFrontN(size_t n) pure @safe {
506         assert(n <= length, "lengnt: %d, n=%d".format(length, n));
507         __repr.__length -= n;
508         while( n ) {
509             if ( n <= __repr.__buffer.front.length ) {
510                 __repr.__buffer.front.popFrontN(n);
511                 if ( __repr.__buffer.front.length == 0 ) {
512                     __repr.__buffer.popFront;
513                 }
514                 return;
515             }
516             n -= __repr.__buffer.front.length;
517             __repr.__buffer.popFront;
518         }
519     }
520     @property void popBack() pure @safe {
521         assert(length);
522         __repr.__buffer.back.popBack;
523         if ( __repr.__buffer.back.length == 0 ) {
524             __repr.__buffer.popBack;
525         }
526         __repr.__length--;
527     }
528     @property void popBackN(size_t n) pure @safe {
529         assert(n <= length, "n: %d, length: %d".format(n, length));
530         __repr.__length -= n;
531         while( n ) {
532             if ( n <= __repr.__buffer.back.length ) {
533                 __repr.__buffer.back.popBackN(n);
534                 if ( __repr.__buffer.back.length == 0 ) {
535                     __repr.__buffer.popBack;
536                 }
537                 return;
538             }
539             n -= __repr.__buffer.back.length;
540             __repr.__buffer.popBack;
541         }
542     }
543     @property auto save() @safe {
544         auto n = Buffer!T();
545         n.__repr = new Repr(__repr);
546         return n;
547     }
548     @property auto ref opIndex(size_t n) const pure @safe {
549         assert( __repr && n < __repr.__length );
550         foreach(b; __repr.__buffer) {
551             if ( n < b.length ) {
552                 return b[n];
553             }
554             n -= b.length;
555         }
556         assert(false, "Impossible");
557     }
558     Buffer!T opSlice(size_t m, size_t n) {
559         if ( empty || m == n ) {
560             return Buffer!T();
561         }
562         assert( m <= n && n <= __repr.__length);
563         auto res = this.save();
564         res.popBackN(res.__repr.__length-n);
565         res.popFrontN(m);
566         return res;
567     }
568     @property auto data(U=T[])() pure {
569         static if ( is(U==T[]) ) {
570             if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
571                 return __repr.__buffer.front;
572             }
573         }
574         Appender!(T[]) a;
575         if ( __repr && __repr.__buffer ) {
576             foreach(ref b; __repr.__buffer) {
577                 a.put(b);
578             }
579         }
580         static if ( is(U==T[]) ) {
581             return a.data;
582         } else {
583             return castFrom!(T[]).to!U(a.data);
584         }
585     }
586     string opCast(string)() {
587         return this.toString;
588     }
589     bool opEquals(U)(U x) {
590         return cast(U)this == x;
591     }
592 
593 }
594 ///
595 public unittest {
596 
597     static assert(isInputRange!(Buffer!ubyte));
598     static assert(isForwardRange!(Buffer!ubyte));
599     static assert(hasLength!(Buffer!ubyte));
600     static assert(hasSlicing!(Buffer!ubyte));
601     static assert(isBidirectionalRange!(Buffer!ubyte));
602     static assert(isRandomAccessRange!(Buffer!ubyte));
603     
604     auto b = Buffer!ubyte();
605     b.put("abc".representation.dup);
606     b.put("def".representation.dup);
607     assert(b.length == 6);
608     assert(b.toString == "abcdef");
609     assert(b.front == 'a');
610     assert(b.back == 'f');
611     assert(equal(b[0..$], "abcdef"));
612     assert(equal(b[$-2..$], "ef"));
613     assert(b == "abcdef");
614     b.popFront;
615     b.popBack;
616     assert(b.front == 'b');
617     assert(b.back == 'e');
618     assert(b.length == 4);
619     assert(retro(b).front == 'e');
620     assert(countUntil(b, 'e') == 3);
621     assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
622     assert(equal(b, "bcde"));
623     b.popFront; b.popFront;
624     assert(b.front == 'd');
625     assert(b.front == b[0]);
626     assert(b.back == b[$-1]);
627 
628     auto c = Buffer!ubyte();
629     c.put("Header0: value0\n".representation.dup);
630     c.put("Header1: value1\n".representation.dup);
631     c.put("Header2: value2\n\nbody".representation.dup);
632     auto c_length = c.length;
633     auto eoh = countUntil(c, "\n\n");
634     assert(eoh == 47);
635     foreach(header; c[0..eoh].splitter('\n') ) {
636         writeln(castFrom!(ubyte[]).to!(string)(header.data));
637     }
638     assert(equal(findSplit(c, "\n\n")[2], "body"));
639     assert(c.length == c_length);
640 }
641 version(vibeD) {
642 }
643 else {
644     extern(C) {
645         int SSL_library_init();
646         void OpenSSL_add_all_ciphers();
647         void OpenSSL_add_all_digests();
648         void SSL_load_error_strings();
649 
650         struct SSL {}
651         struct SSL_CTX {}
652         struct SSL_METHOD {}
653 
654         SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
655         SSL* SSL_new(SSL_CTX*);
656         int SSL_set_fd(SSL*, int);
657         int SSL_connect(SSL*);
658         int SSL_write(SSL*, const void*, int);
659         int SSL_read(SSL*, void*, int);
660         int SSL_shutdown(SSL*) @trusted @nogc nothrow;
661         void SSL_free(SSL*);
662         void SSL_CTX_free(SSL_CTX*);
663 
664         long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
665 
666         long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
667         long SSL_set_mode(SSL *ssl, long mode);
668 
669         long SSL_CTX_get_mode(SSL_CTX *ctx);
670         long SSL_get_mode(SSL *ssl);
671 
672         SSL_METHOD* SSLv3_client_method();
673         SSL_METHOD* TLSv1_2_client_method();
674         SSL_METHOD* TLSv1_client_method();
675     }
676 
677     //pragma(lib, "crypto");
678     //pragma(lib, "ssl");
679 
680     shared static this() {
681         SSL_library_init();
682         OpenSSL_add_all_ciphers();
683         OpenSSL_add_all_digests();
684         SSL_load_error_strings();
685     }
686 
687     public class OpenSslSocket : Socket {
688         enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
689         private SSL* ssl;
690         private SSL_CTX* ctx;
691         private void initSsl() {
692             //ctx = SSL_CTX_new(SSLv3_client_method());
693             ctx = SSL_CTX_new(TLSv1_client_method());
694             assert(ctx !is null);
695 
696             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
697             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
698             ssl = SSL_new(ctx);
699             SSL_set_fd(ssl, this.handle);
700         }
701 
702         @trusted
703         override void connect(Address to) {
704             super.connect(to);
705             if(SSL_connect(ssl) == -1)
706                 throw new Exception("ssl connect failed");
707         }
708 
709         @trusted
710         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
711             return SSL_write(ssl, buf.ptr, cast(uint) buf.length);
712         }
713         override ptrdiff_t send(const(void)[] buf) {
714             return send(buf, SocketFlags.NONE);
715         }
716         @trusted
717         override ptrdiff_t receive(void[] buf, SocketFlags flags) {
718             return SSL_read(ssl, buf.ptr, cast(int)buf.length);
719         }
720         override ptrdiff_t receive(void[] buf) {
721             return receive(buf, SocketFlags.NONE);
722         }
723         this(AddressFamily af, SocketType type = SocketType.STREAM) {
724             super(af, type);
725             initSsl();
726         }
727 
728         this(socket_t sock, AddressFamily af) {
729             super(sock, af);
730             initSsl();
731         }
732         override void close() {
733             //SSL_shutdown(ssl);
734             super.close();
735         }
736         ~this() {
737             SSL_free(ssl);
738             SSL_CTX_free(ctx);
739         }
740     }
741 
742     public class SSLSocketStream: SocketStream {
743         override void open(AddressFamily fa) {
744             if ( s !is null ) {
745                 s.close();
746             }
747             s = new OpenSslSocket(fa);
748             assert(s !is null, "Can't create socket");
749             __isOpen = true;
750         }
751         override SSLSocketStream accept() {
752             auto newso = s.accept();
753             if ( s is null ) {
754                 return null;
755             }
756             auto newstream = new SSLSocketStream();
757             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
758             newstream.s = sslSocket;
759             newstream.__isOpen = true;
760             newstream.__isConnected = true;
761             return newstream;
762         }
763     }
764 }
765 
766 public interface NetworkStream {
767     @property bool isConnected() const;
768     @property bool isOpen() const;
769 
770     void close() @trusted;
771 
772     ///
773     /// timeout is the socket write timeout.
774     ///
775     NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);
776 
777     ptrdiff_t send(const(void)[] buff);
778     ptrdiff_t receive(void[] buff);
779 
780     NetworkStream accept();
781     @property void reuseAddr(bool);
782     void bind(Address);
783     void listen(int);
784 
785     ///
786     /// Set timeout for receive calls. 0 means no timeout.
787     ///
788     @property void readTimeout(Duration timeout);
789 }
790 
791 public abstract class SocketStream : NetworkStream {
792     private {
793         Duration timeout;
794         Socket   s;
795         bool     __isOpen;
796         bool     __isConnected;
797     }
798     void open(AddressFamily fa) {
799     }
800     @property ref Socket so() @safe pure {
801         return s;
802     }
803     @property bool isOpen() @safe @nogc pure const {
804         return s && __isOpen;
805     }
806     @property bool isConnected() @safe @nogc pure const {
807         return s && __isOpen && __isConnected;
808     }
809     void close() @trusted {
810         debug(requests) tracef("Close socket");
811         if ( isOpen ) {
812             s.close();
813             __isOpen = false;
814             __isConnected = false;
815         }
816         s = null;
817     }
818     
819     SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
820         debug(requests) tracef(format("Create connection to %s:%d", host, port));
821         Address[] addresses;
822         __isConnected = false;
823         try {
824             addresses = getAddress(host, port);
825         } catch (Exception e) {
826             throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
827         }
828         foreach(a; addresses) {
829             debug(requests) tracef("Trying %s", a);
830             try {
831                 open(a.addressFamily);
832                 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
833                 s.connect(a);
834                 debug(requests) tracef("Connected to %s", a);
835                 __isConnected = true;
836                 break;
837             } catch (SocketException e) {
838                 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
839                 s.close();
840             }
841         }
842         if ( !__isConnected ) {
843             throw new ConnectError("Can't connect to %s:%d".format(host, port));
844         }
845         return this;
846     }
847     
848     ptrdiff_t send(const(void)[] buff) @safe
849     in {assert(isConnected);}
850     body {
851         auto rc = s.send(buff);
852         if (rc < 0) {
853             close();
854             throw new ErrnoException("sending data");
855         }
856         return rc;
857     }
858     
859     ptrdiff_t receive(void[] buff) @safe {
860         while (true) {
861             auto r = s.receive(buff);
862             if (r < 0) {
863                 version(Windows) {
864                     close();
865                     if ( errno == 0 ) {
866                         throw new TimeoutException("Timeout receiving data");
867                     }
868                     throw new ErrnoException();
869                 }
870                 version(Posix) {
871                     if ( errno == EINTR ) {
872                         continue;
873                     }
874                     close();
875                     if ( errno == EAGAIN ) {
876                         throw new TimeoutException("Timeout receiving data");
877                     }
878                     throw new ErrnoException("receiving data");
879                 }
880             }
881             else {
882                 buff.length = r;
883             }
884             return r;
885         }
886         assert(false);
887     }
888 
889     @property void readTimeout(Duration timeout) @safe {
890         s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
891     }
892     override SocketStream accept() {
893         assert(false, "Implement before use");
894     }
895     @property override void reuseAddr(bool yes){
896         if (yes) {
897             s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
898         }
899         else {
900             s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0);
901         }
902     }
903     override void bind(Address addr){
904         s.bind(addr);
905     }
906     override void listen(int n) {
907         s.listen(n);
908     };
909 }
910 
911 public class TCPSocketStream : SocketStream {
912     override void open(AddressFamily fa) {
913         if ( s !is null ) {
914             s.close();
915         }
916         s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
917         assert(s !is null, "Can't create socket");
918         __isOpen = true;
919     }
920     override TCPSocketStream accept() {
921         auto newso = s.accept();
922         if ( s is null ) {
923             return null;
924         }
925         auto newstream = new TCPSocketStream();
926         newstream.s = newso;
927         newstream.__isOpen = true;
928         newstream.__isConnected = true;
929         return newstream;
930     }
931 }
932 
933 version (vibeD) {
934     import vibe.core.net, vibe.stream.tls;
935 
936     public class TCPVibeStream : NetworkStream {
937     private:
938         TCPConnection _conn;
939         Duration _readTimeout = Duration.max;
940         bool _isOpen = true;
941 
942     public:
943         @property bool isConnected() const {
944             return _conn.connected;
945         }
946         @property override bool isOpen() const {
947             return _conn && _isOpen;
948         }
949         void close() @trusted {
950             _conn.close();
951             _isOpen = false;
952         }
953 
954         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
955             // FIXME: timeout not supported in vibe.d
956             try {
957                 _conn = connectTCP(host, port);
958             }
959             catch (Exception e)
960                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
961 
962             return this;
963         }
964 
965         ptrdiff_t send(const(void)[] buff) {
966             _conn.write(cast(const(ubyte)[])buff);
967             return buff.length;
968         }
969 
970         ptrdiff_t receive(void[] buff) {
971             if (!_conn.waitForData(_readTimeout)) {
972                 if (!_conn.connected) {
973                     return 0;
974                 }
975                 throw new TimeoutException("Timeout receiving data");
976             }
977 
978             if(_conn.empty) {
979                 return 0;
980             }
981 
982             auto chunk = min(_conn.leastSize, buff.length);
983             assert(chunk != 0);
984             _conn.read(cast(ubyte[])buff[0 .. chunk]);
985             return chunk;
986         }
987 
988         @property void readTimeout(Duration timeout) {
989             if (timeout == 0.seconds) {
990                 _readTimeout = Duration.max;
991             }
992             else {
993                 _readTimeout = timeout;
994             }
995         }
996         override TCPVibeStream accept() {
997             assert(false, "Must be implemented");
998         }
999         override @property void reuseAddr(bool){
1000             assert(false, "Not Implemented");
1001         }
1002         override void bind(Address){
1003             assert(false, "Not Implemented");
1004         }
1005         override void listen(int){
1006             assert(false, "Not Implemented");
1007         }
1008     }
1009 
1010     public class SSLVibeStream : TCPVibeStream {
1011     private:
1012         Stream _sslStream;
1013         bool   _isOpen = true;
1014     public:
1015         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1016             try {
1017                 _conn = connectTCP(host, port);
1018                 auto sslctx = createTLSContext(TLSContextKind.client);
1019                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
1020                 _sslStream = createTLSStream(_conn, sslctx);
1021             }
1022             catch (Exception e) {
1023                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1024             }
1025 
1026             return this;
1027         }
1028 
1029         override ptrdiff_t send(const(void)[] buff) {
1030             _sslStream.write(cast(const(ubyte)[])buff);
1031             return buff.length;
1032         }
1033 
1034         override ptrdiff_t receive(void[] buff) {
1035             if (!_sslStream.dataAvailableForRead) {
1036                 if (!_conn.waitForData(_readTimeout)) {
1037                     if (!_conn.connected) {
1038                         return 0;
1039                     }
1040                     throw new TimeoutException("Timeout receiving data");
1041                 }
1042             }
1043 
1044             if(_sslStream.empty) {
1045                 return 0;
1046             }
1047 
1048             auto chunk = min(_sslStream.leastSize, buff.length);
1049             assert(chunk != 0);
1050             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1051             return chunk;
1052         }
1053 
1054         override void close() @trusted {
1055             _sslStream.finalize();
1056             _conn.close();
1057             _isOpen = false;
1058         }
1059         @property override bool isOpen() const {
1060             return _conn && _isOpen;
1061         }
1062         override SSLVibeStream accept() {
1063             assert(false, "Must be implemented");
1064         }
1065         override @property void reuseAddr(bool){
1066             assert(false, "Not Implemented");
1067         }
1068         override void bind(Address){
1069             assert(false, "Not Implemented");
1070         }
1071         override void listen(int){
1072             assert(false, "Not Implemented");
1073         }
1074     }
1075 }
1076 
1077 version (vibeD) {
1078     public alias TCPStream = TCPVibeStream;
1079     public alias SSLStream = SSLVibeStream;
1080 }
1081 else {
1082     public alias TCPStream = TCPSocketStream;
1083     public alias SSLStream = SSLSocketStream;
1084 }