1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 import core.stdc.string;
20 
21 import requests.ssl_adapter : openssl, SSL, SSL_CTX;
22 
/// Shorthand for a data processor working on `ubyte` elements (`DataPipeIface!ubyte`).
alias InDataHandler = DataPipeIface!ubyte;
24 
/// Exception type that, judging by its name, reports a failed connection attempt.
/// NOTE(review): nothing in the visible part of this module throws it — confirm against callers.
public class ConnectError : Exception {
    /// Forwards every argument unchanged to the `Exception` constructor.
    /// Params:
    ///   message = human readable description
    ///   file    = source file reported as origin (defaults to the call site)
    ///   line    = source line reported as origin (defaults to the call site)
    ///   next    = optional chained throwable
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
30 
/// Exception raised by the data processors of this module when the incoming
/// body cannot be decoded (see `DataPipe.putNoCopy` and `DecodeChunked.putNoCopy`).
class DecodingException : Exception {
    /// Forwards every argument unchanged to the `Exception` constructor.
    /// Params:
    ///   message = human readable description
    ///   file    = source file reported as origin (defaults to the call site)
    ///   line    = source line reported as origin (defaults to the call site)
    ///   next    = optional chained throwable
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
36 
/// Exception type that, judging by its name, reports an expired timeout.
/// NOTE(review): nothing in the visible part of this module throws it — confirm against callers.
public class TimeoutException : Exception {
    /// Forwards every argument unchanged to the `Exception` constructor.
    /// Params:
    ///   message = human readable description
    ///   file    = source file reported as origin (defaults to the call site)
    ///   line    = source line reported as origin (defaults to the call site)
    ///   next    = optional chained throwable
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
42 
/// Exception type that, judging by its name, reports a network-level failure.
/// NOTE(review): nothing in the visible part of this module throws it — confirm against callers.
public class NetworkException : Exception {
    /// Forwards every argument unchanged to the `Exception` constructor.
    /// Params:
    ///   message = human readable description
    ///   file    = source file reported as origin (defaults to the call site)
    ///   line    = source line reported as origin (defaults to the call site)
    ///   next    = optional chained throwable
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
48 
49 /**
50  * DataPipeIface can accept some data, process, and return processed data.
51  */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    /// Returns: true when there is nothing to `get()`.
    bool empty();
    /// Put next data portion for processing.
    /// The implementations in this module keep references to the passed
    /// slice where they can, instead of copying it.
    //void put(E[]);
    void putNoCopy(E[]);
    /// Get any ready data.
    /// Callers in this module check `empty()` first and call `get()` only
    /// while it returns false.
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors; each accepts some data, processes it, and passes the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example: unchunk Transfer-Encoding "chunked",
 * and uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    /// Processors, applied in insertion order.
    DataPipeIface!(E)[]  pipe;
    /// Output of the last processor (or the raw input when `pipe` is empty).
    Buffer!E             buffer;

    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    final void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every slice of `data` to processor `p` and drain whatever it has ready.
    /// Params:
    /// p = processor to run
    /// data = input slices, passed to `p.putNoCopy` one by one
    /// Returns:
    /// the slices obtained from `p.get()` until `p.empty()` becomes true.
    final E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.putNoCopy(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data passed over pipeline and store result in buffer.
    /// Params:
    /// data = input data buffer.
    /// NoCopy means we do not copy data to buffer, we keep reference
    /// Throws:
    /// DecodingException when any processor throws an `Exception`; the
    /// original exception stays reachable through `next`.
    final void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            // no processors installed: input goes straight to the output buffer
            buffer.putNoCopy(data);
            return;
        }
        try {
            auto t = process(pipe.front, [data]);
            foreach(ref p; pipe[1..$]) {
                t = process(p, t);
            }
            t.each!(b => buffer.putNoCopy(b));
        }
        catch (Exception e) {
            // keep the message callers already see, but chain the cause so the
            // original exception type and stack trace are not lost
            throw new DecodingException(e.msg, __FILE__, __LINE__, e);
        }
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    final E[] get() {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    ///
    /// get without datamove. but user receive [][]
    ///
    /// Returns:
    /// the list of slices held by the internal buffer; the buffer is reset.
    final E[][] getNoCopy()  {
        if ( buffer.empty ) {
            return E[][].init;
        }
        E[][] res = buffer.__repr.__buffer;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    final bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, handing the output
    /// of each one to the next, and store the final output in the buffer.
    final void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            // what the previous processor released on flush is input for this one
            product.each!(e => p.putNoCopy(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.putNoCopy(b));
    }
}
143 
144 /**
145  * Processor for gzipped/compressed content.
146  * Also support InputRange interface.
147  */
public class Decompressor(E) : DataPipeIface!E {
    private {
        /// Decompressed output waiting to be read.
        Buffer!ubyte __buff;
        /// zlib decompression state.
        UnCompress   __zlib;
    }

    /// Create a processor with an empty output buffer and a fresh zlib stream.
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }

    /// Decompress `data` and keep the result (by reference) in the output buffer.
    final override void putNoCopy(E[] data) {
        if ( __zlib is null ) __zlib = new UnCompress();
        auto inflated = __zlib.uncompress(data);
        __buff.putNoCopy(inflated);
    }

    /// Remove the first stored slice from the output buffer and return it.
    /// The buffer must not be empty (asserted).
    final override E[] get() pure {
        assert(__buff.length);
        auto head = __buff.__repr.__buffer[0];
        __buff.popFrontN(head.length);
        return cast(E[])head;
    }

    /// Finish the zlib stream and store (a copy of) whatever it still returns.
    final override void flush() {
        if ( __zlib !is null ) {
            __buff.put(__zlib.flush());
        }
    }

    /// True when no decompressed data is waiting.
    final override @property bool empty() const pure @safe {
        debug(requests) tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }

    /// InputRange primitive: first decompressed element.
    final @property auto ref front() pure const @safe {
        debug(requests) tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }

    /// InputRange primitive: drop the first decompressed element.
    final @property auto popFront() pure @safe {
        debug(requests) tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }

    /// Drop the first `n` decompressed elements.
    final @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}
191 
/**
 * Unchunk a chunked HTTP response body.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer};
        char         state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;      // decoded chunk data, stored as slices of the input
        ubyte[]      linebuff;  // bytes of the chunk-size line collected so far

        /// A chunk-size line (size, optional extension, CRLF) must be shorter than this.
        enum maxSizeLine = 80;

        /// Parse the hexadecimal chunk size at the start of a chunk-size line.
        /// Leading ASCII whitespace is skipped; parsing stops at the first
        /// non-hex character, so a chunk extension never contributes digits.
        /// Throws: DecodingException if there are no digits or the value
        /// does not fit in `size_t`.
        static size_t parseChunkSize(const(ubyte)[] line) {
            import std.ascii : isHexDigit, isWhite;
            size_t pos = 0;
            while ( pos < line.length && isWhite(line[pos]) ) {
                pos++;
            }
            size_t value  = 0;
            size_t digits = 0;
            for ( ; pos < line.length && isHexDigit(line[pos]); pos++, digits++ ) {
                if ( value > (size_t.max >> 4) ) {
                    throw new DecodingException("Chunk size is too large");
                }
                immutable c = line[pos];
                // (c | 0x20) folds 'A'..'F' onto 'a'..'f'
                immutable size_t digit = c <= '9' ? c - '0' : (c | 0x20) - 'a' + 10;
                value = value * 16 + digit;
            }
            if ( digits == 0 ) {
                throw new DecodingException("Can't find chunk size in the body");
            }
            return value;
        }
    }
    /// Feed the next portion of the chunked body.
    /// Chunk data is stored as slices of `data` (no copy); size lines and
    /// separators are consumed. After the zero-size chunk is seen, the rest
    /// of the input is only counted against the final CRLF.
    /// Throws: DecodingException if a chunk-size line is too long or has no size.
    final void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                to_receive = to_receive - min(to_receive, data.length);
                return;
            }
            if ( state == States.huntingSize ) {
                // Take bytes up to and including the first '\n'. Never look
                // further than the line limit: a longer line is an error anyway.
                immutable scanLimit = min(data.length, cast(size_t)maxSizeLine);
                size_t taken = scanLimit;
                foreach ( idx, v; data[0 .. scanLimit] ) {
                    if ( v == '\n' ) {
                        taken = idx + 1;
                        break;
                    }
                }
                linebuff ~= data[0 .. taken];
                if ( linebuff.length >= maxSizeLine ) {
                    throw new DecodingException("Can't find chunk size in the body");
                }
                data = data[taken .. $];
                if (!linebuff.canFind(CRLF)) {
                    // size line is not complete yet
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    to_receive = 2-min(2, data.length); // trailing \r\n
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        //tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        continue;
                    }
                    continue;
                }
                // a zero-size chunk switches to the trailer state, never to receiving
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                if ( data[0] == '\n' || data[0]=='\r') {
                    data = data[1..$];
                    continue;
                }
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Remove the first stored slice of decoded data and return it.
    /// Returns: the slice, or `null` when nothing is stored.
    final eType[] get() {
        if ( buff.empty ) {
            // an empty buffer may have no representation object at all
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to do: decoded data becomes available as soon as it is received.
    final void flush() {
    }
    /// True when no decoded data is waiting.
    final bool empty() {
        debug(requests) tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// True once the zero-size chunk and the bytes counted for the final CRLF were seen.
    final bool done() {
        return state==States.trailer && to_receive==0;
    }
}
316 
// Exercises Decompressor, DataPipe and DecodeChunked.
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    // Decompressor fed in three pieces, then read through its range interface.
    auto d = new Decompressor!eType();
    d.putNoCopy(gzipped[0..2].dup);
    d.putNoCopy(gzipped[2..10].dup);
    d.putNoCopy(gzipped[10..$].dup);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    // Same data split at a different position.
    auto e = new Decompressor!eType();
    e.putNoCopy(gzipped[0..10].dup);
    e.putNoCopy(gzipped[10..$].dup);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    // Pipeline with a single Decompressor stage.
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.putNoCopy(gzipped[0..2].dup);
    dp.putNoCopy(gzipped[2..$].dup);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // An empty DataPipe should just pass input to output.
    auto dpu = new DataPipe!ubyte;
    dpu.putNoCopy("abcd".dup.representation);
    dpu.putNoCopy("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Test unchunker properties");
    // Two 2-byte chunks followed by the terminating zero-size chunk.
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
    ubyte[][] result;
    auto uc = new DecodeChunked();
    uc.putNoCopy(twoChunks);
    while(!uc.empty) {
        result ~= uc.get();
    }
    assert(equal(result[0], ['1', '2']));
    assert(equal(result[1], ['3', '4']));
    info("unchunker correctness - ok");
    // Writing through the returned slice must change the input array:
    // the unchunker returns slices of its input, not copies.
    result[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    info("Testing DataPipe - done");
}
/**
 * Buffer is used to collect and process data from the network. It resembles Appender, but also
 * supports the Range interface.
 * $(P To place data in the buffer use the put() method.)
 * $(P To retrieve data from the buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: returns the collected data (like Appender.data))
 * )
 */
// Module constructor (runs once per thread in D); currently has nothing to do.
static this() {
}
// Module destructor counterpart; currently has nothing to do.
static ~this() {
}
/// Size intended for a cache of `Repr` objects.
/// NOTE(review): the cache is disabled; nothing in this block reads the value.
enum   CACHESIZE = 1024;

/// Number of `Repr` objects constructed so far (incremented by both `Repr` constructors).
static long reprAlloc;
/// Statistics for the (disabled) `Repr` cache; not updated by any code in this block.
static long reprCacheHit;
static long reprCacheRequests;


/**
 * Collects data as a list of slices and exposes it both Appender-like
 * (`put`, `putNoCopy`, `data`) and as a random access range.
 *
 * The slice list lives in a heap allocated `Repr`; it is created on the first
 * non-empty `put`/`putNoCopy`, so a buffer that never received data has no
 * `Repr` at all. Copying a Buffer duplicates the slice list but not the
 * elements the slices refer to.
 */
public struct Buffer(T) {

    private {
        /// Returns a new `Repr`. (A cache of `Repr` objects was planned here; it is disabled.)
        Repr  cachedOrNew() {
            return new Repr;
        }
        /// Shared state of a buffer: total element count and the list of slices.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            /// Copy of `other`: the slice list is duplicated, the data is shared.
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;
    }

    alias toString = data!string;

    /// Postblit: give the copy its own slice list.
    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    /// Create a buffer holding a copy of `data`.
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * store data. Only the slice is stored, the data is not copied
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    /// Value of `$` inside index and slice expressions: the number of elements.
    @property auto opDollar() const pure @safe {
        // go through length: __repr is null until data was stored
        return length;
    }
    /// Number of stored elements; 0 for a buffer that never received data.
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    /// First element. The buffer must not be empty (asserted).
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Last element. The buffer must not be empty (asserted).
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element; a slice that becomes empty is removed from the list.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first `n` elements; `n` must not exceed `length` (asserted).
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to drop; also avoids touching a missing __repr
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element; a slice that becomes empty is removed from the list.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last `n` elements; `n` must not exceed `length` (asserted).
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // nothing to drop; also avoids touching a missing __repr
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// ForwardRange primitive: a buffer with its own copy of the slice list.
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Element at position `n`, found by walking the slice list.
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Buffer over elements `m .. n`; the element data is shared with this buffer.
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// All stored elements as one array, cast to `U` when `U` is not `T[]`.
    /// For `U == T[]` and exactly one stored slice that slice itself is
    /// returned; otherwise the slices are joined into a new array.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return cast(U)a.data;
        }
    }
    // NOTE: `string` below is the name of the template parameter, not the string type.
    string opCast(string)() {
        return this.toString;
    }
    /// Compare with `x` after casting this buffer to the type of `x`.
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
593 ///
// Checks the range interface of Buffer and its use with std.algorithm.
public unittest {

    // Buffer must satisfy every range category up to random access.
    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    // Two separately stored pieces must behave like one sequence.
    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    // Header/body split across several stored pieces.
    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(cast(string)header.data);
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    // The algorithms above must not have consumed the buffer itself.
    assert(c.length == c_length);
}
640 
/// TLS settings: peer verification, CA certificate path and client key/cert files.
public struct SSLOptions {
    /// Encoding of a key or certificate file.
    enum filetype {
        pem,
        asn1,
        der = asn1,
    }
    private {
        /**
         * do we need to verify peer?
         */
        bool     _verifyPeer = true;
        /**
         * path to CA cert
         */
        string   _caCert;
        /**
         * path to key file (can also contain cert (for pem)
         */
        string   _keyFile;
        /**
         * path to cert file (can also contain key (for pem)
         */
        string   _certFile;
        filetype _keyType = filetype.pem;
        filetype _certType = filetype.pem;
    }
    /// Which files are configured.
    /// Returns: bit 0 (value 1) is set when a key file name is present,
    /// bit 1 (value 2) when a cert file name is present.
    ubyte haveFiles() pure nothrow @safe @nogc {
        ubyte r = 0;
        // test the length: an empty (but non-null) name is not a usable file
        if ( _keyFile.length  ) r|=1;
        if ( _certFile.length ) r|=2;
        return r;
    }
    // do we want to verify peer certificates?
    bool getVerifyPeer() pure nothrow @nogc {
        return _verifyPeer;
    }
    /// Enable or disable peer verification.
    /// Returns: a copy of the updated options.
    SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
        _verifyPeer = v;
        return this;
    }
    /// set key file name and type (default - pem)
    auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _keyFile = f;
        _keyType = t;
        return this;
    }
    auto getKeyFile() @safe pure nothrow @nogc {
        return _keyFile;
    }
    auto getKeyType() @safe pure nothrow @nogc {
        return _keyType;
    }
    /// set cert file name and type (default - pem)
    auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _certFile = f;
        _certType = t;
        return this;
    }
    /// set path to CA cert
    auto setCaCert(string p) @safe pure nothrow @nogc {
        _caCert = p;
        return this;
    }
    auto getCaCert() @safe pure nothrow @nogc {
        return _caCert;
    }
    auto getCertFile() @safe pure nothrow @nogc {
        return _certFile;
    }
    auto getCertType() @safe pure nothrow @nogc {
        return _certType;
    }
    /// set key file type
    /// `t` must be one of the keys of `sslKeyTypes` ("pem", "asn1", "der").
    void setKeyType(string t) @safe pure nothrow {
        _keyType = cast(filetype)sslKeyTypes[t];
    }
    /// set cert file type
    /// `t` must be one of the keys of `sslKeyTypes` ("pem", "asn1", "der").
    void setCertType(string t) @safe pure nothrow {
        _certType = cast(filetype)sslKeyTypes[t];
    }
}
/// Maps a file type name to the numeric value of `SSLOptions.filetype`.
static immutable int[string] sslKeyTypes;
shared static this() {
    sslKeyTypes = [
        "pem":SSLOptions.filetype.pem,
        "asn1":SSLOptions.filetype.asn1,
        "der":SSLOptions.filetype.der,
    ];
}
729 
730 version(vibeD) {
731 }
732 else {
    // C declaration of the OpenSSL library initializer.
    // NOTE(review): not called anywhere in the visible part of this module.
    extern(C) {
        int SSL_library_init();
    }

    // Numeric constants passed to the OpenSSL calls in OpenSslSocket.initSsl.
    enum SSL_VERIFY_PEER = 0x01;
    enum SSL_FILETYPE_PEM = 1;
    enum SSL_FILETYPE_ASN1 = 2;

    // Translates SSLOptions.filetype into the matching SSL_FILETYPE_* constant.
    immutable int[SSLOptions.filetype] ft2ssl;

    shared static this() {
        // filetype.der is declared equal to filetype.asn1, so the last two
        // entries use the same key and the same value
        ft2ssl = [
            SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
            SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
            SSLOptions.filetype.der: SSL_FILETYPE_ASN1
        ];
    }
750 
751     public class OpenSslSocket : Socket {
752         //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
753         private SSL* ssl;
754         private SSL_CTX* ctx;
755 
756         private void initSsl(SSLOptions opts) {
757             //ctx = SSL_CTX_new(SSLv3_client_method());
758             ctx = openssl.SSL_CTX_new(openssl.TLS_method());
759             assert(ctx !is null);
760             if ( opts.getVerifyPeer() ) {
761                 openssl.SSL_CTX_set_default_verify_paths(ctx);
762                 if ( opts.getCaCert() ) {
763                     openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
764                 }
765                 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
766             }
767             immutable keyFile = opts.getKeyFile();
768             immutable keyType = opts.getKeyType();
769             immutable certFile = opts.getCertFile();
770             immutable certType = opts.getCertType();
771             final switch(opts.haveFiles()) {
772                 case 0b11:  // both files
773                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
774                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
775                     break;
776                 case 0b01:  // key only
777                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
778                     openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
779                     break;
780                 case 0b10:  // cert only
781                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
782                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
783                     break;
784                 case 0b00:
785                     break;
786             }
787             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
788             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
789             ssl = openssl.SSL_new(ctx);
790             openssl.SSL_set_fd(ssl, cast(int)this.handle);
791         }
792 
793         @trusted
794         override void connect(Address dest) {
795             super.connect(dest);
796             if(openssl.SSL_connect(ssl) == -1) {
797                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
798             }
799         }
800         auto connectSSL() {
801             if(openssl.SSL_connect(ssl) == -1) {
802                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
803             }
804             debug(requests) tracef("ssl socket connected");
805             return this;
806         }
807         @trusted
808         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope {
809             return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length);
810         }
811         override ptrdiff_t send(const(void)[] buf) scope {
812             return send(buf, SocketFlags.NONE);
813         }
814         @trusted
815         override ptrdiff_t receive(void[] buf, SocketFlags flags) scope {
816             return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length);
817         }
818         override ptrdiff_t receive(void[] buf) scope {
819             return receive(buf, SocketFlags.NONE);
820         }
821         this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
822             super(af, type);
823             initSsl(opts);
824         }
825         this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
826             super(sock, af);
827             initSsl(opts);
828         }
829         override void close() scope {
830             super.close();
831             if ( ssl !is null ) {
832                 openssl.SSL_free(ssl);
833                 ssl = null;
834             }
835             if ( ctx !is null ) {
836                 openssl.SSL_CTX_free(ctx);
837                 ctx = null;
838             }
839         }
840         void SSL_set_tlsext_host_name(string host) {
841 
842         }
843     }
844 
845     public class SSLSocketStream: SocketStream {
846         private SSLOptions _sslOptions;
847         private Socket underlyingSocket;
848         private SSL* ssl;
849         private string host;
850 
851         this(SSLOptions opts) {
852             _sslOptions = opts;
853         }
854         this(NetworkStream ostream, SSLOptions opts, string host = null) {
855             _sslOptions = opts;
856             this.host = host;
857             auto osock = ostream.so();
858             underlyingSocket = osock;
859             auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
860             ssl = ss.ssl;
861             if ( host !is null ) {
862                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
863             }
864             ss.connectSSL();
865             __isOpen = true;
866             __isConnected = true;
867             s = ss;
868             debug(requests) tracef("ssl stream created from another stream: %s", s);
869         }
870         override void close() {
871             ssl = null;
872             host = null;
873             super.close();
874             if ( underlyingSocket ) {
875                 underlyingSocket.close();
876             }
877         }
878         override void open(AddressFamily fa) {
879             if ( s !is null ) {
880                 s.close();
881             }
882             auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
883             assert(ss !is null, "Can't create socket");
884             ssl = ss.ssl;
885             if ( host !is null ) {
886                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
887             }
888             s = ss;
889             __isOpen = true;
890         }
891         override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
892             host = h;
893             return super.connect(h, p, timeout);
894         }
895         override SSLSocketStream accept() {
896             auto newso = s.accept();
897             if ( s is null ) {
898                 return null;
899             }
900             auto newstream = new SSLSocketStream(_sslOptions);
901             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
902             newstream.s = sslSocket;
903             newstream.__isOpen = true;
904             newstream.__isConnected = true;
905             return newstream;
906         }
907     }
908     public class TCPSocketStream : SocketStream {
909         override void open(AddressFamily fa) {
910             if ( s !is null ) {
911                 s.close();
912             }
913             s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
914             assert(s !is null, "Can't create socket");
915             __isOpen = true;
916             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
917         }
918         override TCPSocketStream accept() {
919             auto newso = s.accept();
920             if ( s is null ) {
921                 return null;
922             }
923             auto newstream = new TCPSocketStream();
924             newstream.s = newso;
925             newstream.__isOpen = true;
926             newstream.__isConnected = true;
927             newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
928             return newstream;
929         }
930     }
931 }
932 
/**
 * Common interface of the network streams defined in this module
 * (std.socket based streams and, under version(vibeD), vibe.d based streams).
 */
public interface NetworkStream {
    /// True when the stream currently has an established connection.
    @property bool isConnected() const;
    /// True when the stream holds an open transport.
    @property bool isOpen() const;

    /// Close the stream.
    void close() @trusted;

    ///
    /// Connect to host:port and return the connected stream.
    /// timeout is the socket write timeout.
    ///
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);

    /// Send the content of buff; returns a byte count.
    ptrdiff_t send(const(void)[] buff);
    /// Receive data into buff; returns a byte count.
    ptrdiff_t receive(void[] buff);

    /// Accept an incoming connection and return it as a new stream.
    NetworkStream accept();
    /// Enable or disable address reuse on the stream's socket.
    @property void reuseAddr(bool);
    /// Bind using a textual address.
    /// SocketStream and TCPVibeStream only store it and use it in connect().
    void bind(string);
    /// Bind to an already constructed Address.
    void bind(Address);
    /// Start listening; the argument is passed as the backlog by SocketStream.
    void listen(int);
    // Access to the underlying transport object; its type depends on the backend.
    version(vibeD) {
        TCPConnection so();
    } else {
        Socket so();
    }
    ///
    /// Set timeout for receive calls. 0 means no timeout.
    ///
    @property void readTimeout(Duration timeout);
}
962 
/**
 * Base class for NetworkStream implementations backed by a std.socket Socket.
 *
 * Subclasses provide open() (how the socket is created) and accept().
 */
public abstract class SocketStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;             // current socket; null when the stream holds none
        bool     __isOpen;
        bool     __isConnected;
        string   _bind;         // local address remembered by bind(string)
    }
    /// Create the socket for address family `fa`. The base version does nothing.
    void open(AddressFamily fa) {
    }
    /// The socket currently held by the stream (may be null).
    @property Socket so() @safe pure {
        return s;
    }
    /// True when a socket is present and marked open.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// True when a socket is present, marked open and marked connected.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    /// Close the socket if it is open and forget it.
    void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remembers the address. We will call bind() at the time of connect as
    *  we can have several connection trials.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
    *
    *  Params:
    *    host    = remote host name, resolved with getAddress
    *    port    = remote port
    *    timeout = applied to each socket as SO_SNDTIMEO before connecting
    *  Returns: this stream.
    *  Throws: ConnectError when the name can't be resolved or no address
    *          could be connected.
    ***/
    SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        // Pass the arguments to tracef directly: pre-formatting and then using the
        // result as a format string breaks when the host contains a '%'.
        debug(requests) tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            debug(requests) tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                if ( _bind !is null ) {
                    auto ad = getAddress(_bind);
                    debug(requests) tracef("bind to %s", ad[0]);
                    s.bind(ad[0]);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                debug(requests) tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket was assigned,
                // so `s` can still be null here.
                if ( s !is null ) {
                    s.close();
                }
                // the socket (if any) is closed now; don't keep reporting it as open
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /**
     * Send `buff` over the connected socket.
     *
     * Returns: the value returned by Socket.send.
     * Throws: NetworkException (after closing the stream) when Socket.send
     *         reports an error.
     */
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        auto rc = s.send(buff);
        if (rc < 0) {
            close();
            throw new NetworkException("sending data");
        }
        return rc;
    }

    /**
     * Receive data into `buff`.
     *
     * On Posix an interrupted call (EINTR) is retried. Any other error closes
     * the stream and is reported as an exception.
     *
     * Returns: the value returned by Socket.receive (never negative on
     *          Windows/Posix builds).
     * Throws: TimeoutException on EAGAIN (Posix) or errno 0 (Windows),
     *         NetworkException on any other error.
     */
    ptrdiff_t receive(void[] buff) {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // Save errno immediately: close() below may overwrite it, so all
                // later decisions and messages must use the saved value.
                auto e = errno;
                version(Windows) {
                    close();
                    if ( e == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
                version(Posix) {
                    if ( e == EINTR ) {
                        continue;
                    }
                    close();
                    if ( e == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
            }
            return r;
        }
        assert(false);
    }

    /// Set SO_RCVTIMEO on the current socket.
    @property void readTimeout(Duration timeout) @safe {
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
    }
    /// Must be overridden by subclasses that support accepting connections.
    override SocketStream accept() {
        assert(false, "Implement before use");
    }
    /// Switch SO_REUSEADDR on or off for the current socket.
    @property override void reuseAddr(bool yes){
        if (yes) {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
        }
        else {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0);
        }
    }
    /// Bind the current socket to `addr` immediately.
    override void bind(Address addr){
        s.bind(addr);
    }
    /// Put the current socket into listening state with backlog `n`.
    override void listen(int n) {
        s.listen(n);
    }
}
1098 
1099 version (vibeD) {
1100     import vibe.core.net, vibe.stream.tls;
1101 
1102     public class TCPVibeStream : NetworkStream {
1103     private:
1104         TCPConnection _conn;
1105         Duration _readTimeout = Duration.max;
1106         bool _isOpen = true;
1107         string _bind;
1108 
1109     public:
1110         @property bool isConnected() const {
1111             return _conn.connected;
1112         }
1113         @property override bool isOpen() const {
1114             return _conn && _isOpen;
1115         }
1116         void close() @trusted {
1117             _conn.close();
1118             _isOpen = false;
1119         }
1120         override TCPConnection so() {
1121             return _conn;
1122         }
1123         override void bind(string to) {
1124                 _bind = to;
1125         }
1126         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1127             // FIXME: timeout not supported in vibe.d
1128             try {
1129                 _conn = connectTCP(host, port, _bind);
1130             }
1131             catch (Exception e)
1132                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1133 
1134             return this;
1135         }
1136 
1137         ptrdiff_t send(const(void)[] buff) {
1138             _conn.write(cast(const(ubyte)[])buff);
1139             return buff.length;
1140         }
1141 
1142         ptrdiff_t receive(void[] buff) {
1143             if (!_conn.waitForData(_readTimeout)) {
1144                 if (!_conn.connected || _conn.empty ) {
1145                     return 0;
1146                 }
1147                 throw new TimeoutException("Timeout receiving data");
1148             }
1149 
1150             if(_conn.empty) {
1151                 return 0;
1152             }
1153 
1154             auto chunk = min(_conn.leastSize, buff.length);
1155             assert(chunk != 0);
1156             _conn.read(cast(ubyte[])buff[0 .. chunk]);
1157             return chunk;
1158         }
1159 
1160         @property void readTimeout(Duration timeout) {
1161             if (timeout == 0.seconds) {
1162                 _readTimeout = Duration.max;
1163             }
1164             else {
1165                 _readTimeout = timeout;
1166             }
1167         }
1168         override TCPVibeStream accept() {
1169             assert(false, "Must be implemented");
1170         }
1171         override @property void reuseAddr(bool){
1172             assert(false, "Not Implemented");
1173         }
1174         override void bind(Address){
1175             assert(false, "Not Implemented");
1176         }
1177         override void listen(int){
1178             assert(false, "Not Implemented");
1179         }
1180     }
1181 
1182     public class SSLVibeStream : TCPVibeStream {
1183     private:
1184         TLSStream _sslStream;
1185         bool   _isOpen = true;
1186         SSLOptions _sslOptions;
1187         TCPConnection underlyingConnection;
1188 
1189         void connectSSL(string host) {
1190             auto sslctx = createTLSContext(TLSContextKind.client);
1191             if ( _sslOptions.getVerifyPeer() ) {
1192                 if ( _sslOptions.getCaCert() == null ) {
1193                     throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
1194                 }
1195                 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
1196                 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
1197             } else {
1198                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
1199             }
1200             immutable keyFile = _sslOptions.getKeyFile();
1201             immutable certFile = _sslOptions.getCertFile();
1202             final switch(_sslOptions.haveFiles()) {
1203                 case 0b11:  // both files
1204                     sslctx.usePrivateKeyFile(keyFile);
1205                     sslctx.useCertificateChainFile(certFile);
1206                     break;
1207                 case 0b01:  // key only
1208                     sslctx.usePrivateKeyFile(keyFile);
1209                     sslctx.useCertificateChainFile(keyFile);
1210                     break;
1211                 case 0b10:  // cert only
1212                     sslctx.usePrivateKeyFile(certFile);
1213                     sslctx.useCertificateChainFile(certFile);
1214                     break;
1215                 case 0b00:
1216                     break;
1217             }
1218             _sslStream = createTLSStream(_conn, sslctx, host);
1219         }
1220 
1221     public:
1222         this(SSLOptions opts) {
1223             _sslOptions = opts;
1224         }
1225         override TCPConnection so() {
1226             return _conn;
1227         }
1228         this(NetworkStream ostream, SSLOptions opts, string host = null) {
1229             _sslOptions = opts;
1230             auto oconn = ostream.so();
1231             underlyingConnection = oconn;
1232             _conn = oconn;
1233             connectSSL(host);
1234         }
1235         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1236             try {
1237                 _conn = connectTCP(host, port);
1238                 connectSSL(host);
1239             }
1240             catch (ConnectError e) {
1241                 throw e;
1242             }
1243             catch (Exception e) {
1244                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1245             }
1246 
1247             return this;
1248         }
1249 
1250         override ptrdiff_t send(const(void)[] buff) {
1251             _sslStream.write(cast(const(ubyte)[])buff);
1252             return buff.length;
1253         }
1254 
1255         override ptrdiff_t receive(void[] buff) {
1256             if (!_sslStream.dataAvailableForRead) {
1257                 if (!_conn.waitForData(_readTimeout)) {
1258                     if (!_conn.connected) {
1259                         return 0;
1260                     }
1261                     throw new TimeoutException("Timeout receiving data");
1262                 }
1263             }
1264 
1265             if(_sslStream.empty) {
1266                 return 0;
1267             }
1268 
1269             auto chunk = min(_sslStream.leastSize, buff.length);
1270             assert(chunk != 0);
1271             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1272             return chunk;
1273         }
1274 
1275         override void close() @trusted {
1276             _sslStream.finalize();
1277             _conn.close();
1278             _isOpen = false;
1279         }
1280         @property override bool isOpen() const {
1281             return _conn && _isOpen;
1282         }
1283         override SSLVibeStream accept() {
1284             assert(false, "Must be implemented");
1285         }
1286         override @property void reuseAddr(bool){
1287             assert(false, "Not Implemented");
1288         }
1289         override void bind(Address){
1290             assert(false, "Not Implemented");
1291         }
1292         override void listen(int){
1293             assert(false, "Not Implemented");
1294         }
1295     }
1296 }
1297 
// Public stream names: they resolve to the vibe.d implementations when the
// module is compiled with version vibeD, and to the std.socket based
// implementations otherwise.
version (vibeD) {
    public alias TCPStream = TCPVibeStream;
    public alias SSLStream = SSLVibeStream;
}
else {
    public alias TCPStream = TCPSocketStream;
    public alias SSLStream = SSLSocketStream;
}