1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 
20 import requests.ssl_adapter : openssl, SSL, SSL_CTX;
21 
/// Module-private shorthand for a pipeline stage that works on `ubyte` data.
alias InDataHandler = DataPipeIface!ubyte;
23 
/**
 * Exception type named for connection failures (raise sites are not visible
 * in this part of the module).
 *
 * The constructor forwards every argument unchanged to `Exception`.
 */
public class ConnectError : Exception
{
    this(string message,
         string file =__FILE__,
         size_t line = __LINE__,
         Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
29 
/**
 * Thrown when a body decoder (e.g. a `DataPipe` stage) fails to process its input.
 *
 * Declared `public` explicitly: the module opens with `private:`, and this
 * exception escapes from the public `DataPipe.putNoCopy`, so callers need to
 * be able to name it in a `catch`.
 *
 * Params:
 *   message = human readable description of the failure
 *   file    = source file of the throw site
 *   line    = source line of the throw site
 *   next    = underlying cause, if any
 */
public class DecodingException: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}
35 
/**
 * Exception type named for timeouts (raise sites are not visible in this
 * part of the module).
 *
 * The constructor forwards every argument unchanged to `Exception`.
 */
public class TimeoutException : Exception
{
    this(string message,
         string file = __FILE__,
         size_t line = __LINE__,
         Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
41 
/**
 * Exception type named for network-level failures (raise sites are not
 * visible in this part of the module).
 *
 * The constructor forwards every argument unchanged to `Exception`.
 */
public class NetworkException : Exception
{
    this(string message,
         string file = __FILE__,
         size_t line = __LINE__,
         Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
47 
/**
 * A single processing stage: it is fed raw data, transforms it, and hands
 * the transformed data back on request.
 */
public interface DataPipeIface(E)
{
    /// Returns: `true` when no processed data is waiting to be fetched with `get()`.
    bool empty();

    /// Feed the next portion of input. The stage may keep a reference to
    /// `data` instead of copying it.
    //void put(E[]);
    void putNoCopy(E[]);

    /// Fetch processed data that is ready.
    E[] get();

    /// Tell the stage that the incoming data stream has ended.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors; each accepts some data, processes it, and
 * passes the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example:
 * unchunk Transfer-Encoding "chunked", then uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    /// Processors, applied in insertion order.
    DataPipeIface!(E)[]  pipe;
    /// Output of the last processor (or raw input when `pipe` is empty).
    Buffer!E             buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    final void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every chunk of `data` to processor `p` and drain everything it has ready.
    /// Params:
    /// p = processor to run
    /// data = input chunks
    /// Returns:
    /// chunks produced by `p`.
    final E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.putNoCopy(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in buffer.
    /// Params:
    /// data = input data buffer.
    /// NoCopy means we do not copy data to buffer, we keep reference
    /// Throws:
    /// DecodingException if any processor throws; the original exception is
    /// kept reachable through `next`.
    final void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            buffer.putNoCopy(data);
            return;
        }
        try {
            auto t = process(pipe.front, [data]);
            foreach(ref p; pipe[1..$]) {
                t = process(p, t);
            }
            t.each!(b => buffer.putNoCopy(b));
        }
        catch (Exception e) {
            // Chain the cause so the original type and throw site are not lost.
            throw new DecodingException(e.msg, __FILE__, __LINE__, e);
        }
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    final E[] get() {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    ///
    /// get without datamove. but user receive [][]
    ///
    final E[][] getNoCopy()  {
        if ( buffer.empty ) {
            return E[][].init;
        }
        E[][] res = buffer.__repr.__buffer;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    final bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush each processor in order, feeding what the
    /// previous one released into the next, and store the final output in buffer.
    final void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.putNoCopy(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.putNoCopy(b));
    }
}
142 
/**
 * Pipeline stage that inflates gzipped/compressed content via std.zlib.
 * It additionally exposes input-range primitives over the inflated bytes.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;
        UnCompress   __zlib;
    }

    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }

    /// Inflate `data` and queue the output; no copy of the output is made.
    final override void putNoCopy(E[] data) {
        if ( __zlib is null ) {
            __zlib = new UnCompress();
        }
        auto inflated = __zlib.uncompress(data);
        __buff.putNoCopy(inflated);
    }

    /// Remove and return the first queued chunk. The buffer must not be empty.
    final override E[] get() pure {
        assert(__buff.length);
        auto head = __buff.__repr.__buffer[0];
        __buff.popFrontN(head.length);
        return cast(E[])head;
    }

    /// Finish the zlib stream and queue whatever it still holds.
    final override void flush() {
        if ( __zlib !is null ) {
            __buff.put(__zlib.flush());
        }
    }

    /// Returns: true when no inflated data is queued.
    final override @property bool empty() const pure @safe {
        debug(requests) tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }

    /// First queued byte.
    final @property auto ref front() pure const @safe {
        debug(requests) tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }

    /// Drop the first queued byte.
    final @property auto popFront() pure @safe {
        debug(requests) tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }

    /// Drop the first `n` queued bytes.
    final @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}
190 
/**
 * Unchunk chunked http response body.
 *
 * Input is fed with putNoCopy(); decoded chunk payloads are queued as slices
 * of the input (no copy) and fetched with get().
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer}
        char         state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;
        ubyte[]      linebuff;   // bytes of the chunk-size line collected so far
    }

    /// Parse the leading hexadecimal chunk-size of a size line.
    /// Only the leading hex digits count, so hex letters inside a
    /// chunk-extension (";ext=abc") cannot leak into the size.
    private static size_t parseChunkSize(const(ubyte)[] line) {
        import std.ascii : isHexDigit, isWhite;
        size_t pos = 0;
        while ( pos < line.length && isWhite(line[pos]) ) {
            pos++;
        }
        size_t size = 0;
        size_t ndigits = 0;
        for ( ; pos < line.length && isHexDigit(line[pos]); pos++, ndigits++ ) {
            if ( size > (size_t.max >> 4) ) {
                throw new DecodingException("Chunk size is too large");
            }
            immutable c = line[pos];
            immutable v = c <= '9' ? c - '0' : (c | 0x20) - 'a' + 10;
            size = (size << 4) | cast(size_t)v;
        }
        if ( ndigits == 0 ) {
            throw new DecodingException("Can't find chunk size in the body");
        }
        return size;
    }

    /// Feed the next portion of the chunked body.
    /// Params:
    /// data = raw bytes; decoded payload is stored as slices of this array.
    /// Throws:
    /// DecodingException when no valid chunk-size line is found within 80 bytes.
    final void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                to_receive = to_receive - min(to_receive, data.length);
                return;
            }
            if ( state == States.huntingSize ) {
                // Take bytes up to and including the first '\n'; if there is
                // none yet, take everything and wait for more input.
                immutable nl = data.countUntil('\n');
                immutable consumed = nl < 0 ? data.length : cast(size_t)nl + 1;
                if ( linebuff.length + consumed >= 80 ) {
                    throw new DecodingException("Can't find chunk size in the body");
                }
                linebuff ~= data[0..consumed];
                data = data[consumed..$];
                if (!linebuff.canFind(CRLF)) {
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    to_receive = 2-min(2, data.length); // trailing \r\n
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        //tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        continue;
                    }
                    continue;
                }
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                // Skip the CRLF that terminates chunk-data.
                if ( data[0] == '\n' || data[0]=='\r') {
                    data = data[1..$];
                    continue;
                }
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Remove and return the first decoded chunk. Must not be called when empty.
    final eType[] get() {
        assert(!buff.empty);
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing is held back, so there is nothing to flush.
    final void flush() {
    }
    /// Returns: true when no decoded data is queued.
    final bool empty() {
        debug(requests) tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// Returns: true once the last chunk and its trailing CRLF have been seen.
    final bool done() {
        return state==States.trailer && to_receive==0;
    }
}
315 
// Exercises Decompressor (as pipeline stage and as range), DataPipe with and
// without stages, and DecodeChunked including its zero-copy property.
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    // Feed the gzip stream in three pieces, then read it back through the range API.
    auto d = new Decompressor!eType();
    d.putNoCopy(gzipped[0..2].dup);
    d.putNoCopy(gzipped[2..10].dup);
    d.putNoCopy(gzipped[10..$].dup);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    // Same data split in two pieces.
    auto e = new Decompressor!eType();
    e.putNoCopy(gzipped[0..10].dup);
    e.putNoCopy(gzipped[10..$].dup);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    // Decompressor used as the only stage of a DataPipe.
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.putNoCopy(gzipped[0..2].dup);
    dp.putNoCopy(gzipped[2..$].dup);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // an empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.putNoCopy("abcd".dup.representation);
    dpu.putNoCopy("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Test unchunker properties");
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
    ubyte[][] result;
    auto uc = new DecodeChunked();
    uc.putNoCopy(twoChunks);
    while(!uc.empty) {
        result ~= uc.get();
    }
    assert(equal(result[0], ['1', '2']));
    assert(equal(result[1], ['3', '4']));
    info("unchunker correctness - ok");
    // Writing through the result must show up in the input: the decoder
    // returns slices of the input rather than copies.
    result[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from the network. It resembles Appender, but
 * also supports the Range interface.
 * $(P To place data in the buffer use the put() method.)
 * $(P To retrieve data from the buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: returns the collected data (like Appender.data))
 * )
 */
// Per-thread module constructor; intentionally empty.
static this() {
}
// Per-thread module destructor; intentionally empty.
static ~this() {
}
// Size of the Repr cache; only referenced by commented-out code in Buffer.
enum   CACHESIZE = 1024;

// Counts Buffer.Repr objects constructed (incremented in both Repr constructors).
static long reprAlloc;
// Cache statistics; only referenced by commented-out code in Buffer.
static long reprCacheHit;
static long reprCacheRequests;
386 
387 
/**
 * Chunked buffer: a list of array slices that behaves like one contiguous
 * random-access range of `T`.
 *
 * put() stores a copy of the data, putNoCopy() stores the slice itself.
 * Copying a Buffer (postblit, save, slicing) duplicates the list of chunks
 * but not the chunk contents.
 */
public struct Buffer(T) {
//    static Repr[CACHESIZE]  cache;
//    static uint             cacheIndex;

    private {
        /// Allocate a fresh representation (the cache experiment is disabled).
        Repr  cachedOrNew() {
            return new Repr;
//            reprCacheRequests++;
//            if ( false && cacheIndex>0 ) {
//                reprCacheHit++;
//                cacheIndex -= 1;
//                return cache[cacheIndex];
//            } else {
//                return new Repr;
//            }
        }
        /// Heap-allocated state: total element count plus the list of chunks.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            /// Copy the chunk list of `other` (chunk contents stay shared).
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;   // null until the first non-empty put
    }

    alias toString = data!string;

    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * store data. Only the slice is stored, the data is not copied.
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    /// `$` inside an index or slice expression. Goes through `length` so
    /// that a never-filled buffer (null __repr) yields 0.
    @property auto opDollar() const pure @safe {
        return length;
    }
    /// Total number of elements over all chunks.
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop `n` elements from the front; `n` must not exceed `length`.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // Nothing to drop; also keeps a never-filled buffer (null __repr) safe.
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop `n` elements from the back; `n` must not exceed `length`.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // Nothing to drop; also keeps a never-filled buffer (null __repr) safe.
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Forward-range save: a new Buffer with its own copy of the chunk list.
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Element at position `n`, found by walking the chunks.
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Slice [m, n): a saved copy trimmed from both ends.
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// Collected data as one array. With exactly one chunk and U == T[] the
    /// chunk itself is returned (no copy); otherwise chunks are concatenated.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return cast(U)a.data;
        }
    }
    // NOTE: `string` below is the name of the template parameter, not the
    // built-in alias; the body only compiles when it is instantiated with string.
    string opCast(string)() {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
/// Buffer satisfies the range concepts up to random access and works with
/// std.algorithm on data spread over several chunks.
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    // Two chunks, "abc" and "def", seen as one range.
    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    // Header/body split across three chunks; each header line is 16 bytes,
    // so the "\n\n" separator starts at offset 47.
    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(cast(string)header.data);
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    // The algorithms above worked on copies; the buffer itself is unchanged.
    assert(c.length == c_length);
}
639 
/**
 * TLS settings: peer verification, CA certificate path, and client key /
 * certificate files with their encodings.
 *
 * Setters that return `this` return a copy of the updated struct.
 */
public struct SSLOptions {
    /// Encoding of a key or certificate file (`der` is an alias of `asn1`).
    enum filetype {
        pem,
        asn1,
        der = asn1,
    }
    private {
        /**
         * do we need to verify peer?
         */
        bool     _verifyPeer = false;
        /**
         * path to CA cert
         */
        string   _caCert;
        /**
         * path to key file (can also contain cert (for pem)
         */
        string   _keyFile;
        /**
         * path to cert file (can also contain key (for pem)
         */
        string   _certFile;
        filetype _keyType = filetype.pem;
        filetype _certType = filetype.pem;
    }
    /// Bit mask of configured files: bit 0 - key file, bit 1 - cert file.
    /// An empty path counts as "not configured" (the length is tested, not
    /// the pointer, which is non-null for a `""` literal).
    ubyte haveFiles() pure nothrow @safe @nogc {
        ubyte r = 0;
        if ( _keyFile.length  ) r|=1;
        if ( _certFile.length ) r|=2;
        return r;
    }
    // do we want to verify peer certificates?
    bool getVerifyPeer() pure nothrow @nogc @safe {
        return _verifyPeer;
    }
    /// enable or disable peer certificate verification
    SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
        _verifyPeer = v;
        return this;
    }
    /// set key file name and type (default - pem)
    auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _keyFile = f;
        _keyType = t;
        return this;
    }
    auto getKeyFile() @safe pure nothrow @nogc {
        return _keyFile;
    }
    auto getKeyType() @safe pure nothrow @nogc {
        return _keyType;
    }
    /// set cert file name and type (default - pem)
    auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _certFile = f;
        _certType = t;
        return this;
    }
    /// set path to CA certificate
    auto setCaCert(string p) @safe pure nothrow @nogc {
        _caCert = p;
        return this;
    }
    auto getCaCert() @safe pure nothrow @nogc {
        return _caCert;
    }
    auto getCertFile() @safe pure nothrow @nogc {
        return _certFile;
    }
    auto getCertType() @safe pure nothrow @nogc {
        return _certType;
    }
    /// set key file type by name; `t` must be a key of `sslKeyTypes`
    /// ("pem", "asn1" or "der"), any other value is a range violation.
    void setKeyType(string t) @safe pure nothrow {
        _keyType = cast(filetype)sslKeyTypes[t];
    }
    /// set cert file type by name; `t` must be a key of `sslKeyTypes`
    /// ("pem", "asn1" or "der"), any other value is a range violation.
    void setCertType(string t) @safe pure nothrow {
        _certType = cast(filetype)sslKeyTypes[t];
    }
}
/// Maps file type names to SSLOptions.filetype values; filled once at startup.
static immutable int[string] sslKeyTypes;
shared static this() {
    sslKeyTypes = [
        "pem":SSLOptions.filetype.pem,
        "asn1":SSLOptions.filetype.asn1,
        "der":SSLOptions.filetype.der,
    ];
}
728 
729 version(vibeD) {
730 }
731 else {
732     extern(C) {
733         int SSL_library_init();
734     }
735 
736     enum SSL_VERIFY_PEER = 0x01;
737     enum SSL_FILETYPE_PEM = 1;
738     enum SSL_FILETYPE_ASN1 = 2;
739 
740     immutable int[SSLOptions.filetype] ft2ssl;
741 
742     shared static this() {
743         ft2ssl = [
744             SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
745             SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
746             SSLOptions.filetype.der: SSL_FILETYPE_ASN1
747         ];
748     }
749 
750     public class OpenSslSocket : Socket {
751         //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
752         private SSL* ssl;
753         private SSL_CTX* ctx;
754 
755         private void initSsl(SSLOptions opts) {
756             //ctx = SSL_CTX_new(SSLv3_client_method());
757             ctx = openssl.SSL_CTX_new(openssl.TLS_method());
758             assert(ctx !is null);
759             if ( opts.getVerifyPeer() ) {
760                 openssl.SSL_CTX_set_default_verify_paths(ctx);
761                 if ( opts.getCaCert() ) {
762                     openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
763                 }
764                 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
765             }
766             immutable keyFile = opts.getKeyFile();
767             immutable keyType = opts.getKeyType();
768             immutable certFile = opts.getCertFile();
769             immutable certType = opts.getCertType();
770             final switch(opts.haveFiles()) {
771                 case 0b11:  // both files
772                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
773                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
774                     break;
775                 case 0b01:  // key only
776                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
777                     openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
778                     break;
779                 case 0b10:  // cert only
780                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
781                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
782                     break;
783                 case 0b00:
784                     break;
785             }
786             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
787             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
788             ssl = openssl.SSL_new(ctx);
789             openssl.SSL_set_fd(ssl, cast(int)this.handle);
790         }
791 
792         @trusted
793         override void connect(Address dest) {
794             super.connect(dest);
795             if(openssl.SSL_connect(ssl) == -1) {
796                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
797             }
798         }
799         auto connectSSL() {
800             if(openssl.SSL_connect(ssl) == -1) {
801                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
802             }
803             debug(requests) tracef("ssl socket connected");
804             return this;
805         }
806         @trusted
807         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
808             return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length);
809         }
810         override ptrdiff_t send(const(void)[] buf) {
811             return send(buf, SocketFlags.NONE);
812         }
813         @trusted
814         override ptrdiff_t receive(void[] buf, SocketFlags flags) {
815             return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length);
816         }
817         override ptrdiff_t receive(void[] buf) {
818             return receive(buf, SocketFlags.NONE);
819         }
820         this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
821             super(af, type);
822             initSsl(opts);
823         }
824         this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
825             super(sock, af);
826             initSsl(opts);
827         }
828         override void close() {
829             //SSL_shutdown(ssl);
830             super.close();
831         }
832         ~this() {
833             openssl.SSL_free(ssl);
834             openssl.SSL_CTX_free(ctx);
835         }
836         void SSL_set_tlsext_host_name(string host) {
837 
838         }
839     }
840 
841     public class SSLSocketStream: SocketStream {
842         private SSLOptions _sslOptions;
843         private Socket underlyingSocket;
844         private SSL* ssl;
845         private string host;
846 
847         this(SSLOptions opts) {
848             _sslOptions = opts;
849         }
850         this(NetworkStream ostream, SSLOptions opts, string host = null) {
851             _sslOptions = opts;
852             this.host = host;
853             auto osock = ostream.so();
854             underlyingSocket = osock;
855             auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
856             ssl = ss.ssl;
857             if ( host !is null ) {
858                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
859             }
860             ss.connectSSL();
861             __isOpen = true;
862             __isConnected = true;
863             s = ss;
864             debug(requests) tracef("ssl stream created from another stream: %s", s);
865         }
866         override void close() {
867             ssl = null;
868             host = null;
869             super.close();
870             if ( underlyingSocket ) {
871                 underlyingSocket.close();
872             }
873         }
874         override void open(AddressFamily fa) {
875             if ( s !is null ) {
876                 s.close();
877             }
878             auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
879             assert(ss !is null, "Can't create socket");
880             ssl = ss.ssl;
881             if ( host !is null ) {
882                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
883             }
884             s = ss;
885             __isOpen = true;
886         }
887         override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
888             host = h;
889             return super.connect(h, p, timeout);
890         }
891         override SSLSocketStream accept() {
892             auto newso = s.accept();
893             if ( s is null ) {
894                 return null;
895             }
896             auto newstream = new SSLSocketStream(_sslOptions);
897             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
898             newstream.s = sslSocket;
899             newstream.__isOpen = true;
900             newstream.__isConnected = true;
901             return newstream;
902         }
903     }
904     public class TCPSocketStream : SocketStream {
905         override void open(AddressFamily fa) {
906             if ( s !is null ) {
907                 s.close();
908             }
909             s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
910             assert(s !is null, "Can't create socket");
911             __isOpen = true;
912             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
913         }
914         override TCPSocketStream accept() {
915             auto newso = s.accept();
916             if ( s is null ) {
917                 return null;
918             }
919             auto newstream = new TCPSocketStream();
920             newstream.s = newso;
921             newstream.__isOpen = true;
922             newstream.__isConnected = true;
923             newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
924             return newstream;
925         }
926     }
927 }
928 
/**
 * Common interface of the network streams defined in this module.
 *
 * It is implemented both by the std.socket based classes and, when compiled
 * with version=vibeD, by the vibe.d based classes.
 */
public interface NetworkStream {
    /// True when the stream reports an established connection.
    @property bool isConnected() const;
    /// True when the stream is open; the exact criteria are up to the implementation.
    @property bool isOpen() const;

    /// Close the stream.
    void close() @trusted;

    ///
    /// Connect to host:port and return the connected stream.
    /// timeout is the socket write timeout.
    /// NOTE: the vibe.d based implementations in this file do not apply the timeout.
    ///
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);

    /// Send the content of buff; returns the byte count reported by the implementation.
    ptrdiff_t send(const(void)[] buff);
    /// Receive data into buff; returns the byte count reported by the implementation.
    ptrdiff_t receive(void[] buff);

    /// Accept an incoming connection and return it as a new stream.
    NetworkStream accept();
    /// Enable or disable address reuse on the underlying socket.
    @property void reuseAddr(bool);
    /// Set the local address (given as string) to bind to; the implementations
    /// in this file only remember it and use it when connect() is called.
    void bind(string);
    /// Bind to the given local address.
    void bind(Address);
    /// Start listening; the argument is passed to the underlying listen call.
    void listen(int);
    /// Access to the underlying transport object: a vibe.d TCPConnection when
    /// compiled with version=vibeD, a std.socket Socket otherwise.
    version(vibeD) {
        TCPConnection so();
    } else {
        Socket so();
    }
    ///
    /// Set timeout for receive calls. 0 means no timeout.
    ///
    @property void readTimeout(Duration timeout);
}
958 
/**
 * Base class for NetworkStream implementations built on std.socket.Socket.
 *
 * Subclasses provide open() (to create the concrete socket and store it in `s`)
 * and accept(). This class implements name resolution, connecting with
 * fall-back over all resolved addresses, and blocking send/receive with
 * translation of socket errors into TimeoutException / NetworkException.
 */
public abstract class SocketStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;              // underlying socket, null when closed / not yet opened
        bool     __isOpen;       // set by open(), cleared by close()
        bool     __isConnected;  // set after a successful connect()/accept()
        string   _bind;          // local address remembered by bind(string)
    }
    /// Create the underlying socket for address family `fa`.
    /// The base implementation does nothing; subclasses override it.
    void open(AddressFamily fa) {
    }
    /// The underlying socket (may be null).
    @property Socket so() @safe pure {
        return s;
    }
    /// True when a socket exists and the stream was opened.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// True when the stream is open and a connection was established.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    /// Close the underlying socket (if open) and drop the reference to it.
    void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remembers the address. We will call bind() on the socket at the
    *  time of connect, as we can have several connection trials.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
    *
    *  Params:
    *    host    = remote host name or address
    *    port    = remote port
    *    timeout = value set as the socket send timeout (SNDTIMEO) before connecting
    *  Returns: this stream, connected.
    *  Throws: ConnectError when the name can't be resolved or none of the
    *          resolved addresses could be connected.
    ***/
    SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        // Pass the arguments to tracef directly: pre-formatting the message and
        // using it as the format string breaks when host contains a '%'.
        debug(requests) tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            debug(requests) tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                if ( _bind !is null ) {
                    auto ad = getAddress(_bind);
                    debug(requests) tracef("bind to %s", ad[0]);
                    s.bind(ad[0]);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                debug(requests) tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket was stored in `s`.
                if ( s !is null ) {
                    s.close();
                }
                // The socket (if any) is closed now; don't report the stream as open.
                // The virtual close() is deliberately not used here, subclasses may
                // reset state in it that is still needed for the next address.
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /**
     * Send `buff` over the connected socket.
     *
     * Returns: the number of bytes sent as reported by the socket.
     * Throws: NetworkException (after closing the stream) when the socket
     *         reports an error.
     */
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        auto rc = s.send(buff);
        if (rc < 0) {
            close();
            throw new NetworkException("sending data");
        }
        return rc;
    }

    /**
     * Receive data into `buff`.
     *
     * Returns: the non-negative value returned by the socket receive call.
     * Throws: TimeoutException or NetworkException (after closing the stream)
     *         when the socket reports an error. On Posix an interrupted call
     *         (EINTR) is retried.
     */
    ptrdiff_t receive(void[] buff) @safe {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                version(Windows) {
                    // Read the error code before close(): closing the socket
                    // may overwrite errno.
                    immutable err = errno;
                    close();
                    if ( err == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("receiving data");
                }
                version(Posix) {
                    // Read the error code before close(): closing the socket
                    // may overwrite errno.
                    immutable err = errno;
                    if ( err == EINTR ) {
                        continue;
                    }
                    close();
                    if ( err == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("receiving data");
                }
            }
            return r;
        }
        assert(false);
    }

    /// Set the socket receive timeout (RCVTIMEO).
    @property void readTimeout(Duration timeout) @safe {
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
    }
    /// Not implemented in the base class; subclasses must override before use.
    override SocketStream accept() {
        assert(false, "Implement before use");
    }
    /// Switch the REUSEADDR socket option on or off.
    @property override void reuseAddr(bool yes){
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, yes ? 1 : 0);
    }
    /// Bind the underlying socket to `addr` immediately.
    override void bind(Address addr){
        s.bind(addr);
    }
    /// Put the underlying socket into listening state; `n` is passed on as the backlog.
    override void listen(int n) {
        s.listen(n);
    }
}
1093 
1094 version (vibeD) {
1095     import vibe.core.net, vibe.stream.tls;
1096 
1097     public class TCPVibeStream : NetworkStream {
1098     private:
1099         TCPConnection _conn;
1100         Duration _readTimeout = Duration.max;
1101         bool _isOpen = true;
1102         string _bind;
1103 
1104     public:
1105         @property bool isConnected() const {
1106             return _conn.connected;
1107         }
1108         @property override bool isOpen() const {
1109             return _conn && _isOpen;
1110         }
1111         void close() @trusted {
1112             _conn.close();
1113             _isOpen = false;
1114         }
1115         override TCPConnection so() {
1116             return _conn;
1117         }
1118         override void bind(string to) {
1119                 _bind = to;
1120         }
1121         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1122             // FIXME: timeout not supported in vibe.d
1123             try {
1124                 _conn = connectTCP(host, port, _bind);
1125             }
1126             catch (Exception e)
1127                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1128 
1129             return this;
1130         }
1131 
1132         ptrdiff_t send(const(void)[] buff) {
1133             _conn.write(cast(const(ubyte)[])buff);
1134             return buff.length;
1135         }
1136 
1137         ptrdiff_t receive(void[] buff) {
1138             if (!_conn.waitForData(_readTimeout)) {
1139                 if (!_conn.connected) {
1140                     return 0;
1141                 }
1142                 throw new TimeoutException("Timeout receiving data");
1143             }
1144 
1145             if(_conn.empty) {
1146                 return 0;
1147             }
1148 
1149             auto chunk = min(_conn.leastSize, buff.length);
1150             assert(chunk != 0);
1151             _conn.read(cast(ubyte[])buff[0 .. chunk]);
1152             return chunk;
1153         }
1154 
1155         @property void readTimeout(Duration timeout) {
1156             if (timeout == 0.seconds) {
1157                 _readTimeout = Duration.max;
1158             }
1159             else {
1160                 _readTimeout = timeout;
1161             }
1162         }
1163         override TCPVibeStream accept() {
1164             assert(false, "Must be implemented");
1165         }
1166         override @property void reuseAddr(bool){
1167             assert(false, "Not Implemented");
1168         }
1169         override void bind(Address){
1170             assert(false, "Not Implemented");
1171         }
1172         override void listen(int){
1173             assert(false, "Not Implemented");
1174         }
1175     }
1176 
1177     public class SSLVibeStream : TCPVibeStream {
1178     private:
1179         Stream _sslStream;
1180         bool   _isOpen = true;
1181         SSLOptions _sslOptions;
1182         TCPConnection underlyingConnection;
1183 
1184         void connectSSL(string host) {
1185             auto sslctx = createTLSContext(TLSContextKind.client);
1186             if ( _sslOptions.getVerifyPeer() ) {
1187                 if ( _sslOptions.getCaCert() == null ) {
1188                     throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
1189                 }
1190                 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
1191                 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
1192             } else {
1193                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
1194             }
1195             immutable keyFile = _sslOptions.getKeyFile();
1196             immutable certFile = _sslOptions.getCertFile();
1197             final switch(_sslOptions.haveFiles()) {
1198                 case 0b11:  // both files
1199                     sslctx.usePrivateKeyFile(keyFile);
1200                     sslctx.useCertificateChainFile(certFile);
1201                     break;
1202                 case 0b01:  // key only
1203                     sslctx.usePrivateKeyFile(keyFile);
1204                     sslctx.useCertificateChainFile(keyFile);
1205                     break;
1206                 case 0b10:  // cert only
1207                     sslctx.usePrivateKeyFile(certFile);
1208                     sslctx.useCertificateChainFile(certFile);
1209                     break;
1210                 case 0b00:
1211                     break;
1212             }
1213             _sslStream = createTLSStream(_conn, sslctx, host);
1214         }
1215 
1216     public:
1217         this(SSLOptions opts) {
1218             _sslOptions = opts;
1219         }
1220         override TCPConnection so() {
1221             return _conn;
1222         }
1223         this(NetworkStream ostream, SSLOptions opts, string host = null) {
1224             _sslOptions = opts;
1225             auto oconn = ostream.so();
1226             underlyingConnection = oconn;
1227             _conn = oconn;
1228             connectSSL(host);
1229         }
1230         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1231             try {
1232                 _conn = connectTCP(host, port);
1233                 connectSSL(host);
1234             }
1235             catch (ConnectError e) {
1236                 throw e;
1237             }
1238             catch (Exception e) {
1239                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1240             }
1241 
1242             return this;
1243         }
1244 
1245         override ptrdiff_t send(const(void)[] buff) {
1246             _sslStream.write(cast(const(ubyte)[])buff);
1247             return buff.length;
1248         }
1249 
1250         override ptrdiff_t receive(void[] buff) {
1251             if (!_sslStream.dataAvailableForRead) {
1252                 if (!_conn.waitForData(_readTimeout)) {
1253                     if (!_conn.connected) {
1254                         return 0;
1255                     }
1256                     throw new TimeoutException("Timeout receiving data");
1257                 }
1258             }
1259 
1260             if(_sslStream.empty) {
1261                 return 0;
1262             }
1263 
1264             auto chunk = min(_sslStream.leastSize, buff.length);
1265             assert(chunk != 0);
1266             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1267             return chunk;
1268         }
1269 
1270         override void close() @trusted {
1271             _sslStream.finalize();
1272             _conn.close();
1273             _isOpen = false;
1274         }
1275         @property override bool isOpen() const {
1276             return _conn && _isOpen;
1277         }
1278         override SSLVibeStream accept() {
1279             assert(false, "Must be implemented");
1280         }
1281         override @property void reuseAddr(bool){
1282             assert(false, "Not Implemented");
1283         }
1284         override void bind(Address){
1285             assert(false, "Not Implemented");
1286         }
1287         override void listen(int){
1288             assert(false, "Not Implemented");
1289         }
1290     }
1291 }
1292 
// Public names for the stream implementations selected at compile time:
// the vibe.d based classes when built with version=vibeD, the std.socket
// based classes otherwise.
version (vibeD) {
    public alias TCPStream = TCPVibeStream;
    public alias SSLStream = SSLVibeStream;
}
else {
    public alias TCPStream = TCPSocketStream;
    public alias SSLStream = SSLSocketStream;
}