1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 
20 import requests.ssl_adapter : openssl, SSL, SSL_CTX;
21 
22 alias InDataHandler = DataPipeIface!ubyte;
23 
/// Exception type named for connection errors; it adds nothing to Exception except its own type.
public class ConnectError : Exception {
    /// Forwards every argument, unchanged, to the Exception constructor.
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
29 
/// Exception type used in this module to report failures while decoding a message body.
///
/// Declared `public` explicitly: without it the module-level `private:` label applies and code
/// in other modules cannot name this type in a `catch` clause.
public class DecodingException: Exception {
    /// Forwards every argument, unchanged, to the Exception constructor.
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}
35 
/// Exception type named for timeouts; it adds nothing to Exception except its own type.
public class TimeoutException : Exception {
    /// Forwards every argument, unchanged, to the Exception constructor.
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
41 
/// Exception type named for network errors; it adds nothing to Exception except its own type.
public class NetworkException : Exception {
    /// Forwards every argument, unchanged, to the Exception constructor.
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
47 
/**
 * DataPipeIface is the contract for a single data processor: it accepts some data,
 * processes it, and hands back the processed data.
 *
 * Params:
 *  E = element type of the data slices passed in and out
 */
public interface DataPipeIface(E) {
    /// Returns: true when there is NO processed data ready for reading
    /// (i.e. a call to get() has nothing to return).
    bool empty();
    /// Put the next data portion for processing. Judging by the name, the slice is
    /// referenced rather than copied, so the caller should not reuse it afterwards.
    // (A copying `put` variant is currently disabled:)
    //void put(E[]);
    void putNoCopy(E[]);
    /// Get processed data that is ready.
    E[] get();
    /// Signal the end of the incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors: each one accepts some data, processes it, and
 * passes the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example:
 * unchunk Transfer-Encoding "chunked", then uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    /// Processors, in the order the data passes through them.
    DataPipeIface!(E)[]  pipe;
    /// Output of the last processor (or raw input if the pipeline is empty).
    Buffer!E             buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    final void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every slice of `data` to processor `p` and drain everything `p` has ready.
    /// Params:
    /// p = processor to run
    /// data = input slices, fed in order
    /// Returns:
    /// the slices produced by `p`, in the order it returned them
    final E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.putNoCopy(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data is passed through the pipeline and the result is stored in buffer.
    /// Params:
    /// data = input data buffer.
    /// NoCopy means we do not copy data to buffer, we keep reference
    /// Throws:
    /// DecodingException if any processor throws an Exception; the original exception
    /// is kept as `next` so its type and stack trace are not lost.
    final void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            // no processors: input goes straight to the output buffer
            buffer.putNoCopy(data);
            return;
        }
        try {
            auto t = process(pipe.front, [data]);
            foreach(ref p; pipe[1..$]) {
                t = process(p, t);
            }
            t.each!(b => buffer.putNoCopy(b));
        }
        catch (Exception e) {
            // same message as before, but chain the cause instead of discarding it
            throw new DecodingException(e.msg, __FILE__, __LINE__, e);
        }
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    final E[] get() {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    ///
    /// Get without moving data: the user receives the collected slices as they are ([][]),
    /// and the internal buffer is cleared.
    ///
    final E[][] getNoCopy()  {
        if ( buffer.empty ) {
            return E[][].init;
        }
        E[][] res = buffer.__repr.__buffer;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    final bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, passing what each one still
    /// produces to the next, and store the output of the last one in the buffer.
    /// Unlike putNoCopy, exceptions thrown by processors are not rewrapped here.
    final void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.putNoCopy(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.putNoCopy(b));
    }
}
142 
/**
 * Pipeline processor that inflates gzipped/compressed content using std.zlib.UnCompress.
 * It also exposes InputRange-style primitives (empty/front/popFront) over the inflated bytes.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;    // inflated output waiting to be read
        UnCompress   __zlib;    // streaming decompressor state
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
    /// Inflate the next compressed portion and queue the result (without copying it).
    final override void putNoCopy(E[] data) {
        if ( __zlib is null ) __zlib = new UnCompress();
        auto inflated = __zlib.uncompress(data);
        __buff.putNoCopy(inflated);
    }
    /// Remove and return the first queued slice. The queue must not be empty.
    final override E[] get() pure {
        assert(__buff.length);
        auto head = __buff.__repr.__buffer[0];
        __buff.popFrontN(head.length);
        return cast(E[])head;
    }
    /// Signal end of input: queue (a copy of) whatever the decompressor still holds.
    final override void flush() {
        if ( __zlib !is null ) {
            __buff.put(__zlib.flush());
        }
    }
    /// Returns: true when no inflated data is queued.
    final override @property bool empty() const pure @safe {
        debug(requests) tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    /// First queued element (range primitive).
    final @property auto ref front() pure const @safe {
        debug(requests) tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    /// Drop the first queued element (range primitive).
    final @property auto popFront() pure @safe {
        debug(requests) tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    /// Drop the first n queued elements.
    final @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}
190 
/**
 * Unchunk a chunked http response body.
 *
 * Data is fed with putNoCopy(); decoded chunk payloads are returned by get() as slices of
 * the caller's input (no copy). done() reports when the terminating chunk has been seen.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer};
        char         state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;
        ubyte[]      linebuff;
        /// A chunk-size line that reaches this many bytes is rejected.
        enum         MAX_SIZE_LINE = 80;

        /// Parse the hexadecimal chunk size at the start of a chunk-size line.
        /// Parsing stops at ';' (start of chunk extensions) or at the line terminator, so hex
        /// digits that appear inside an extension are never counted. Other non-hex bytes in
        /// the size field are skipped, as before.
        /// Throws: DecodingException if there is no hex digit or the value overflows size_t.
        static size_t parseChunkSize(const(ubyte)[] line) {
            size_t value = 0;
            bool   seenDigit = false;
            foreach(c; line) {
                if ( c == ';' || c == '\r' || c == '\n' ) {
                    break;
                }
                uint digit;
                if ( c >= '0' && c <= '9' ) {
                    digit = c - '0';
                } else if ( c >= 'a' && c <= 'f' ) {
                    digit = c - 'a' + 10;
                } else if ( c >= 'A' && c <= 'F' ) {
                    digit = c - 'A' + 10;
                } else {
                    continue;
                }
                if ( value > (size_t.max >> 4) ) {
                    throw new DecodingException("Chunk size is too large");
                }
                value = (value << 4) | digit;
                seenDigit = true;
            }
            if ( !seenDigit ) {
                throw new DecodingException("Can't find chunk size in the body");
            }
            return value;
        }
    }
    /// Feed the next portion of the chunked body.
    /// Chunk payloads are stored as slices of `data` (not copied).
    /// Throws: DecodingException if a chunk-size line is malformed or too long.
    final void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                // Only count down the bytes that follow the last chunk.
                // NOTE: this expects just a bare CRLF there; trailer header fields are not parsed.
                to_receive = to_receive - min(to_receive, data.length);
                return;
            }
            if ( state == States.huntingSize ) {
                // Take bytes up to and including the next '\n', but never more than the
                // size-line limit allows: no fixed-size scratch array that could overrun.
                immutable size_t limit = min(data.length, MAX_SIZE_LINE - linebuff.length);
                size_t consumed = 0;
                while ( consumed < limit ) {
                    immutable ubyte v = data[consumed++];
                    if ( v == '\n' ) {
                        break;
                    }
                }
                linebuff ~= data[0..consumed];
                if ( linebuff.length >= MAX_SIZE_LINE ) {
                    throw new DecodingException("Can't find chunk size in the body");
                }
                data = data[consumed..$];
                if ( !linebuff.canFind(CRLF) ) {
                    // size line is not complete yet, wait for more input
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    to_receive = 2-min(2, data.length); // trailing \r\n
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    if ( to_receive == 0 ) {
                        // whole chunk received: skip the CRLF that follows it
                        state = States.huntingSeparator;
                    }
                    continue;
                }
                // receiving state is entered only with a non-zero chunk size
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                if ( data[0] == '\n' || data[0]=='\r') {
                    data = data[1..$];
                    continue;
                }
                // first byte of the next chunk-size line: start collecting a new line
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Remove and return the first decoded slice, or an empty slice if nothing is buffered.
    final eType[] get() {
        if ( buff.empty ) {
            return eType[].init;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to do at end of input.
    final void flush() {
    }
    /// Returns: true when no decoded data is waiting for get().
    final bool empty() {
        debug(requests) tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// Returns: true once the zero-size chunk and the bytes expected after it were received.
    final bool done() {
        return state==States.trailer && to_receive==0;
    }
}
315 
// Tests for Decompressor, DataPipe and DecodeChunked.
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    // Decompressor fed in three pieces, then read through its range interface
    auto d = new Decompressor!eType();
    d.putNoCopy(gzipped[0..2].dup);
    d.putNoCopy(gzipped[2..10].dup);
    d.putNoCopy(gzipped[10..$].dup);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    // same input split in two pieces must give the same result
    auto e = new Decompressor!eType();
    e.putNoCopy(gzipped[0..10].dup);
    e.putNoCopy(gzipped[10..$].dup);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    // DataPipe with a single Decompressor stage
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.putNoCopy(gzipped[0..2].dup);
    dp.putNoCopy(gzipped[2..$].dup);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // an empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.putNoCopy("abcd".dup.representation);
    dpu.putNoCopy("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Test unchunker properties");
    // two chunks of two bytes each, followed by the terminating zero-size chunk
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
    ubyte[][] result;
    auto uc = new DecodeChunked();
    uc.putNoCopy(twoChunks);
    while(!uc.empty) {
        result ~= uc.get();
    }
    assert(equal(result[0], ['1', '2']));
    assert(equal(result[1], ['3', '4']));
    info("unchunker correctness - ok");
    // the decoded slices alias the input: writing through one changes the input array
    result[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from the network. It resembles Appender, but also
 * supports the Range interface.
 * $(P To place data in the buffer use the put() method.)
 * $(P To retrieve data from the buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
// NOTE(review): the comment above describes the Buffer struct declared further below, but in
// this position it attaches to the module constructor. Consider moving it next to Buffer.
// Per-thread module constructor and destructor; both bodies are empty.
static this() {
}
static ~this() {
}
// Size of the Repr cache; only referenced from the commented-out cache code in Buffer.
enum   CACHESIZE = 1024;

// Thread-local counters. reprAlloc is incremented by every Buffer.Repr constructor call;
// the other two are only touched by the commented-out cache code in Buffer.
static long reprAlloc;
static long reprCacheHit;
static long reprCacheRequests;
386 
387 
/// Collects data as a list of slices and exposes it both as a whole (data) and through
/// range primitives (front/back/popFront/popBack/save/length/indexing/slicing).
///
/// The storage lives in a lazily created `Repr` object; a Buffer that never received data
/// has no Repr at all, and every operation that makes sense on an empty buffer
/// (length, empty, `$`, popFrontN(0), popBackN(0), data) works in that state.
public struct Buffer(T) {
//    static Repr[CACHESIZE]  cache;
//    static uint             cacheIndex;

    private {
        /// Create the storage object (the cache variant is disabled).
        Repr  cachedOrNew() {
            return new Repr;
//            reprCacheRequests++;
//            if ( false && cacheIndex>0 ) {
//                reprCacheHit++;
//                cacheIndex -= 1;
//                return cache[cacheIndex];
//            } else {
//                return new Repr;
//            }
        }
        /// Storage: the list of slices and the total number of elements in them.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            /// Copy the slice list (not the slice contents) of `other`; `other` may be null.
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;
    }

    alias toString = data!string;

    /// Postblit: a copy gets its own slice list, so popping from the copy does not
    /// affect the original. Slice contents stay shared.
    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    /// Construct a buffer holding a copy of `data`.
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * store data. Data is NOT copied: the buffer keeps a reference to the caller's slice.
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    /// Value of `$` in index/slice expressions; 0 for a buffer that never received data.
    @property auto opDollar() const pure @safe {
        // go through length, which handles the missing-storage case
        return length;
    }
    /// Total number of elements stored.
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    /// First element. The buffer must not be empty.
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Last element. The buffer must not be empty.
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element. The buffer must not be empty.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                // first slice exhausted, drop it
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first n elements; n must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to do; also keeps us away from a missing storage object
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element. The buffer must not be empty.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last n elements; n must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // nothing to do; also keeps us away from a missing storage object
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Forward-range save: a buffer with its own slice list over the same contents.
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Element at position n, counted across all stored slices.
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Sub-range [m, n) as a new Buffer that shares slice contents with this one.
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// All stored elements as one array (cast to U when U is not T[]).
    /// When exactly one slice is stored and U is T[], that slice itself is returned
    /// (no copy); otherwise the slices are concatenated into a new array.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return cast(U)a.data;
        }
    }
    string opCast(string)() {
        return this.toString;
    }
    /// Compare by casting this buffer to the type of `x`.
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
///
public unittest {

    // Buffer must satisfy the whole range hierarchy up to random access
    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    // two separately stored pieces behave like one sequence
    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    // the algorithms above worked on copies: b itself is unchanged
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    // header/body split of an HTTP-like message spread over several pieces
    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(cast(string)header.data);
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    // searching and slicing must not consume the original buffer
    assert(c.length == c_length);
}
639 
/// Options for SSL/TLS connections: peer verification, CA certificate and the
/// key/certificate files (with their file types).
public struct SSLOptions {
    /// Supported key/certificate file formats; `der` is an alias for `asn1`.
    enum filetype {
        pem,
        asn1,
        der = asn1,
    }
    private {
        /**
         * do we need to verify peer?
         */
        bool     _verifyPeer = true;
        /**
         * path to CA cert
         */
        string   _caCert;
        /**
         * path to key file (can also contain cert (for pem))
         */
        string   _keyFile;
        /**
         * path to cert file (can also contain key (for pem))
         */
        string   _certFile;
        filetype _keyType = filetype.pem;
        filetype _certType = filetype.pem;
    }
    /// Which files are configured.
    /// Returns: bit 0 (value 1) set if a key file name is set, bit 1 (value 2) set if a
    /// cert file name is set. An empty name counts as "not set".
    ubyte haveFiles() pure nothrow @safe @nogc {
        ubyte r = 0;
        // test the length: `if (str)` would only test the pointer, so an empty but
        // non-null string would be reported as a configured file
        if ( _keyFile.length  ) r|=1;
        if ( _certFile.length ) r|=2;
        return r;
    }
    // do we want to verify peer certificates?
    bool getVerifyPeer() pure nothrow @nogc @safe {
        return _verifyPeer;
    }
    /// Turn peer verification on or off.
    /// Returns: a copy of the updated options.
    SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
        _verifyPeer = v;
        return this;
    }
    /// set key file name and type (default - pem)
    auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _keyFile = f;
        _keyType = t;
        return this;
    }
    auto getKeyFile() @safe pure nothrow @nogc {
        return _keyFile;
    }
    auto getKeyType() @safe pure nothrow @nogc {
        return _keyType;
    }
    /// set cert file name and type (default - pem)
    auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _certFile = f;
        _certType = t;
        return this;
    }
    /// set path to CA certificate
    auto setCaCert(string p) @safe pure nothrow @nogc {
        _caCert = p;
        return this;
    }
    auto getCaCert() @safe pure nothrow @nogc {
        return _caCert;
    }
    auto getCertFile() @safe pure nothrow @nogc {
        return _certFile;
    }
    auto getCertType() @safe pure nothrow @nogc {
        return _certType;
    }
    /// set key file type by name; the name must be a key of sslKeyTypes
    void setKeyType(string t) @safe pure nothrow {
        _keyType = cast(filetype)sslKeyTypes[t];
    }
    /// set cert file type by name; the name must be a key of sslKeyTypes
    void setCertType(string t) @safe pure nothrow {
        _certType = cast(filetype)sslKeyTypes[t];
    }
}
// Maps file type names ("pem", "asn1", "der") to SSLOptions.filetype values;
// used by SSLOptions.setKeyType and SSLOptions.setCertType.
static immutable int[string] sslKeyTypes;
// Filled once, in the shared module constructor.
shared static this() {
    sslKeyTypes = [
        "pem":SSLOptions.filetype.pem,
        "asn1":SSLOptions.filetype.asn1,
        "der":SSLOptions.filetype.der,
    ];
}
728 
729 version(vibeD) {
730 }
731 else {
    // C declaration of the library initialisation function (not called in the visible code).
    extern(C) {
        int SSL_library_init();
    }

    // Numeric constants passed to the openssl calls below.
    // NOTE(review): presumably copied from the OpenSSL headers — confirm against the headers in use.
    enum SSL_VERIFY_PEER = 0x01;
    enum SSL_FILETYPE_PEM = 1;
    enum SSL_FILETYPE_ASN1 = 2;

    // Maps SSLOptions.filetype to the SSL_FILETYPE_* value passed to the *_file calls.
    immutable int[SSLOptions.filetype] ft2ssl;

    // Filled once, in the shared module constructor. `der` is an alias of `asn1`,
    // so the last two entries use the same key.
    shared static this() {
        ft2ssl = [
            SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
            SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
            SSLOptions.filetype.der: SSL_FILETYPE_ASN1
        ];
    }
749 
750     public class OpenSslSocket : Socket {
751         //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
752         private SSL* ssl;
753         private SSL_CTX* ctx;
754 
755         private void initSsl(SSLOptions opts) {
756             //ctx = SSL_CTX_new(SSLv3_client_method());
757             ctx = openssl.SSL_CTX_new(openssl.TLS_method());
758             assert(ctx !is null);
759             if ( opts.getVerifyPeer() ) {
760                 openssl.SSL_CTX_set_default_verify_paths(ctx);
761                 if ( opts.getCaCert() ) {
762                     openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
763                 }
764                 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
765             }
766             immutable keyFile = opts.getKeyFile();
767             immutable keyType = opts.getKeyType();
768             immutable certFile = opts.getCertFile();
769             immutable certType = opts.getCertType();
770             final switch(opts.haveFiles()) {
771                 case 0b11:  // both files
772                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
773                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
774                     break;
775                 case 0b01:  // key only
776                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
777                     openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
778                     break;
779                 case 0b10:  // cert only
780                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
781                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
782                     break;
783                 case 0b00:
784                     break;
785             }
786             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
787             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
788             ssl = openssl.SSL_new(ctx);
789             openssl.SSL_set_fd(ssl, cast(int)this.handle);
790         }
791 
792         @trusted
793         override void connect(Address dest) {
794             super.connect(dest);
795             if(openssl.SSL_connect(ssl) == -1) {
796                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
797             }
798         }
799         auto connectSSL() {
800             if(openssl.SSL_connect(ssl) == -1) {
801                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
802             }
803             debug(requests) tracef("ssl socket connected");
804             return this;
805         }
806         @trusted
807         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope {
808             return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length);
809         }
810         override ptrdiff_t send(const(void)[] buf) scope {
811             return send(buf, SocketFlags.NONE);
812         }
813         @trusted
814         override ptrdiff_t receive(void[] buf, SocketFlags flags) scope {
815             return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length);
816         }
817         override ptrdiff_t receive(void[] buf) scope {
818             return receive(buf, SocketFlags.NONE);
819         }
820         this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
821             super(af, type);
822             initSsl(opts);
823         }
824         this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
825             super(sock, af);
826             initSsl(opts);
827         }
828         override void close() scope {
829             super.close();
830             if ( ssl !is null ) {
831                 openssl.SSL_free(ssl);
832                 ssl = null;
833             }
834             if ( ctx !is null ) {
835                 openssl.SSL_CTX_free(ctx);
836                 ctx = null;
837             }
838         }
839         void SSL_set_tlsext_host_name(string host) {
840 
841         }
842     }
843 
844     public class SSLSocketStream: SocketStream {
845         private SSLOptions _sslOptions;
846         private Socket underlyingSocket;
847         private SSL* ssl;
848         private string host;
849 
850         this(SSLOptions opts) {
851             _sslOptions = opts;
852         }
853         this(NetworkStream ostream, SSLOptions opts, string host = null) {
854             _sslOptions = opts;
855             this.host = host;
856             auto osock = ostream.so();
857             underlyingSocket = osock;
858             auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
859             ssl = ss.ssl;
860             if ( host !is null ) {
861                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
862             }
863             ss.connectSSL();
864             __isOpen = true;
865             __isConnected = true;
866             s = ss;
867             debug(requests) tracef("ssl stream created from another stream: %s", s);
868         }
869         override void close() {
870             ssl = null;
871             host = null;
872             super.close();
873             if ( underlyingSocket ) {
874                 underlyingSocket.close();
875             }
876         }
877         override void open(AddressFamily fa) {
878             if ( s !is null ) {
879                 s.close();
880             }
881             auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
882             assert(ss !is null, "Can't create socket");
883             ssl = ss.ssl;
884             if ( host !is null ) {
885                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
886             }
887             s = ss;
888             __isOpen = true;
889         }
890         override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
891             host = h;
892             return super.connect(h, p, timeout);
893         }
894         override SSLSocketStream accept() {
895             auto newso = s.accept();
896             if ( s is null ) {
897                 return null;
898             }
899             auto newstream = new SSLSocketStream(_sslOptions);
900             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
901             newstream.s = sslSocket;
902             newstream.__isOpen = true;
903             newstream.__isConnected = true;
904             return newstream;
905         }
906     }
907     public class TCPSocketStream : SocketStream {
908         override void open(AddressFamily fa) {
909             if ( s !is null ) {
910                 s.close();
911             }
912             s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
913             assert(s !is null, "Can't create socket");
914             __isOpen = true;
915             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
916         }
917         override TCPSocketStream accept() {
918             auto newso = s.accept();
919             if ( s is null ) {
920                 return null;
921             }
922             auto newstream = new TCPSocketStream();
923             newstream.s = newso;
924             newstream.__isOpen = true;
925             newstream.__isConnected = true;
926             newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
927             return newstream;
928         }
929     }
930 }
931 
/**
 * Operations every network stream of this module provides.
 */
public interface NetworkStream {
    /// Is the stream open?
    @property bool isOpen() const;
    /// Is the stream connected?
    @property bool isConnected() const;

    ///
    /// Connect to host:port.
    /// timeout is the socket write timeout.
    ///
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);

    /// Close the stream.
    void close() @trusted;

    /// Send the content of buff.
    ptrdiff_t send(const(void)[] buff);
    /// Receive data into buff.
    ptrdiff_t receive(void[] buff);

    ///
    /// Set timeout for receive calls. 0 means no timeout.
    ///
    @property void readTimeout(Duration timeout);

    // listening side
    @property void reuseAddr(bool);
    void bind(string);
    void bind(Address);
    void listen(int);
    NetworkStream accept();

    // underlying transport object; its type depends on the build version
    version(vibeD) {
        TCPConnection so();
    } else {
        Socket so();
    }
}
961 
/**
 * Base class of the streams backed by a std.socket Socket.
 *
 * Subclasses supply open() (socket creation) and accept(); everything else
 * (connect, send, receive, options) is implemented here on top of the socket `s`.
 */
public abstract class SocketStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
        string   _bind;
    }
    /// Create the socket for address family `fa`. The base implementation does nothing.
    void open(AddressFamily fa) {
    }
    /// The socket currently held by the stream (null once close() has run).
    @property Socket so() @safe pure {
        return s;
    }
    /// True when a socket is held and it was marked open.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// True when the stream is open and a connection was established.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    /// Close the socket (if open), reset the state flags and drop the socket reference.
    void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remembers the address. We will call bind() at the time of connect as
    *  we can have several connection trials.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
    *
    *  Params:
    *    host    = name or address to resolve and connect to
    *    port    = remote port
    *    timeout = applied to the socket as its send timeout (SNDTIMEO)
    *  Returns: this stream
    *  Throws: ConnectError when the name can't be resolved or no resolved address accepts
    *          the connection.
    ***/
    SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        // pass the arguments straight to tracef: a pre-formatted message must not be
        // reused as a format string
        debug(requests) tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            // keep the original failure reachable through the exception chain
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg),
                    __FILE__, __LINE__, e);
        }
        foreach(a; addresses) {
            debug(requests) tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                if ( _bind !is null ) {
                    auto ad = getAddress(_bind);
                    debug(requests) tracef("bind to %s", ad[0]);
                    s.bind(ad[0]);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                debug(requests) tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have failed, in which case there may be no socket yet
                if ( s !is null ) {
                    s.close();
                }
                // the socket of this attempt is closed; don't keep reporting the stream as open
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /**
     * Send `buff` through the socket.
     *
     * Returns: the value returned by the socket's send().
     * Throws: NetworkException, after closing the stream, when the socket reports an error.
     */
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        auto rc = s.send(buff);
        if (rc < 0) {
            close();
            throw new NetworkException("sending data");
        }
        return rc;
    }

    /**
     * Receive data into `buff`.
     *
     * Returns: the non-negative value returned by the socket's receive().
     * Throws: TimeoutException when the failure looks like a timeout (errno EAGAIN on
     *         Posix, errno 0 on Windows), NetworkException on any other failure. The
     *         stream is closed before either is thrown. On Posix an interrupted call
     *         (EINTR) is retried.
     */
    ptrdiff_t receive(void[] buff) @safe {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // remember errno now: close() below may overwrite it
                immutable err = errno;
                version(Windows) {
                    close();
                    if ( err == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("receiving data");
                }
                version(Posix) {
                    if ( err == EINTR ) {
                        continue;
                    }
                    close();
                    if ( err == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("receiving data");
                }
            }
            return r;
        }
        assert(false);
    }

    /// Set the socket receive timeout (RCVTIMEO).
    @property void readTimeout(Duration timeout) @safe {
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
    }
    /// Not available in the base class; subclasses must override it.
    override SocketStream accept() {
        assert(false, "Implement before use");
    }
    /// Switch the REUSEADDR socket option on or off.
    @property override void reuseAddr(bool yes){
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, yes ? 1 : 0);
    }
    /// Bind the socket to `addr` right away.
    override void bind(Address addr){
        s.bind(addr);
    }
    /// Start listening with a backlog of `n`.
    override void listen(int n) {
        s.listen(n);
    }
}
1096 
1097 version (vibeD) {
1098     import vibe.core.net, vibe.stream.tls;
1099 
1100     public class TCPVibeStream : NetworkStream {
1101     private:
1102         TCPConnection _conn;
1103         Duration _readTimeout = Duration.max;
1104         bool _isOpen = true;
1105         string _bind;
1106 
1107     public:
1108         @property bool isConnected() const {
1109             return _conn.connected;
1110         }
1111         @property override bool isOpen() const {
1112             return _conn && _isOpen;
1113         }
1114         void close() @trusted {
1115             _conn.close();
1116             _isOpen = false;
1117         }
1118         override TCPConnection so() {
1119             return _conn;
1120         }
1121         override void bind(string to) {
1122                 _bind = to;
1123         }
1124         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1125             // FIXME: timeout not supported in vibe.d
1126             try {
1127                 _conn = connectTCP(host, port, _bind);
1128             }
1129             catch (Exception e)
1130                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1131 
1132             return this;
1133         }
1134 
1135         ptrdiff_t send(const(void)[] buff) {
1136             _conn.write(cast(const(ubyte)[])buff);
1137             return buff.length;
1138         }
1139 
1140         ptrdiff_t receive(void[] buff) {
1141             if (!_conn.waitForData(_readTimeout)) {
1142                 if (!_conn.connected || _conn.empty ) {
1143                     return 0;
1144                 }
1145                 throw new TimeoutException("Timeout receiving data");
1146             }
1147 
1148             if(_conn.empty) {
1149                 return 0;
1150             }
1151 
1152             auto chunk = min(_conn.leastSize, buff.length);
1153             assert(chunk != 0);
1154             _conn.read(cast(ubyte[])buff[0 .. chunk]);
1155             return chunk;
1156         }
1157 
1158         @property void readTimeout(Duration timeout) {
1159             if (timeout == 0.seconds) {
1160                 _readTimeout = Duration.max;
1161             }
1162             else {
1163                 _readTimeout = timeout;
1164             }
1165         }
1166         override TCPVibeStream accept() {
1167             assert(false, "Must be implemented");
1168         }
1169         override @property void reuseAddr(bool){
1170             assert(false, "Not Implemented");
1171         }
1172         override void bind(Address){
1173             assert(false, "Not Implemented");
1174         }
1175         override void listen(int){
1176             assert(false, "Not Implemented");
1177         }
1178     }
1179 
1180     public class SSLVibeStream : TCPVibeStream {
1181     private:
1182         TLSStream _sslStream;
1183         bool   _isOpen = true;
1184         SSLOptions _sslOptions;
1185         TCPConnection underlyingConnection;
1186 
1187         void connectSSL(string host) {
1188             auto sslctx = createTLSContext(TLSContextKind.client);
1189             if ( _sslOptions.getVerifyPeer() ) {
1190                 if ( _sslOptions.getCaCert() == null ) {
1191                     throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
1192                 }
1193                 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
1194                 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
1195             } else {
1196                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
1197             }
1198             immutable keyFile = _sslOptions.getKeyFile();
1199             immutable certFile = _sslOptions.getCertFile();
1200             final switch(_sslOptions.haveFiles()) {
1201                 case 0b11:  // both files
1202                     sslctx.usePrivateKeyFile(keyFile);
1203                     sslctx.useCertificateChainFile(certFile);
1204                     break;
1205                 case 0b01:  // key only
1206                     sslctx.usePrivateKeyFile(keyFile);
1207                     sslctx.useCertificateChainFile(keyFile);
1208                     break;
1209                 case 0b10:  // cert only
1210                     sslctx.usePrivateKeyFile(certFile);
1211                     sslctx.useCertificateChainFile(certFile);
1212                     break;
1213                 case 0b00:
1214                     break;
1215             }
1216             _sslStream = createTLSStream(_conn, sslctx, host);
1217         }
1218 
1219     public:
1220         this(SSLOptions opts) {
1221             _sslOptions = opts;
1222         }
1223         override TCPConnection so() {
1224             return _conn;
1225         }
1226         this(NetworkStream ostream, SSLOptions opts, string host = null) {
1227             _sslOptions = opts;
1228             auto oconn = ostream.so();
1229             underlyingConnection = oconn;
1230             _conn = oconn;
1231             connectSSL(host);
1232         }
1233         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1234             try {
1235                 _conn = connectTCP(host, port);
1236                 connectSSL(host);
1237             }
1238             catch (ConnectError e) {
1239                 throw e;
1240             }
1241             catch (Exception e) {
1242                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1243             }
1244 
1245             return this;
1246         }
1247 
1248         override ptrdiff_t send(const(void)[] buff) {
1249             _sslStream.write(cast(const(ubyte)[])buff);
1250             return buff.length;
1251         }
1252 
1253         override ptrdiff_t receive(void[] buff) {
1254             if (!_sslStream.dataAvailableForRead) {
1255                 if (!_conn.waitForData(_readTimeout)) {
1256                     if (!_conn.connected) {
1257                         return 0;
1258                     }
1259                     throw new TimeoutException("Timeout receiving data");
1260                 }
1261             }
1262 
1263             if(_sslStream.empty) {
1264                 return 0;
1265             }
1266 
1267             auto chunk = min(_sslStream.leastSize, buff.length);
1268             assert(chunk != 0);
1269             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1270             return chunk;
1271         }
1272 
1273         override void close() @trusted {
1274             _sslStream.finalize();
1275             _conn.close();
1276             _isOpen = false;
1277         }
1278         @property override bool isOpen() const {
1279             return _conn && _isOpen;
1280         }
1281         override SSLVibeStream accept() {
1282             assert(false, "Must be implemented");
1283         }
1284         override @property void reuseAddr(bool){
1285             assert(false, "Not Implemented");
1286         }
1287         override void bind(Address){
1288             assert(false, "Not Implemented");
1289         }
1290         override void listen(int){
1291             assert(false, "Not Implemented");
1292         }
1293     }
1294 }
1295 
// Public names of the stream implementations selected at build time:
// the vibe.d based classes with version vibeD, the std.socket based ones otherwise.
version (vibeD) {
    public {
        alias TCPStream = TCPVibeStream;
        alias SSLStream = SSLVibeStream;
    }
}
else {
    public {
        alias TCPStream = TCPSocketStream;
        alias SSLStream = SSLSocketStream;
    }
}