1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 import core.stdc.string;
20 
21 import requests.ssl_adapter : openssl, SSL, SSL_CTX;
22 
23 alias InDataHandler = DataPipeIface!ubyte;
24 
25 public class ConnectError: Exception {
26     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
27         super(message, file, line, next);
28     }
29 }
30 
31 class DecodingException: Exception {
32     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
33         super(message, file, line, next);
34     }
35 }
36 
37 public class TimeoutException: Exception {
38     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
39         super(message, file, line, next);
40     }
41 }
42 
43 public class NetworkException: Exception {
44     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
45         super(message, file, line, next);
46     }
47 }
48 
49 /**
50  * DataPipeIface can accept some data, process, and return processed data.
51  */
52 public interface DataPipeIface(E) {
53     /// Is there any processed data ready for reading?
54     bool empty();
55     /// Put next data portion for processing
56     //void put(E[]);
57     void putNoCopy(E[]);
58     /// Get any ready data
59     E[] get();
60     /// Signal on end of incoming data stream.
61     void flush();
62 }
63 /**
64  * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line.
65  * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked",
66  * and uncompress Content-Encoding "gzip".
67  */
68 public class DataPipe(E) : DataPipeIface!E {
69 
70     DataPipeIface!(E)[]  pipe;
71     Buffer!E             buffer;
72     /// Append data processor to pipeline
73     /// Params:
74     /// p = processor
75     final void insert(DataPipeIface!E p) {
76         pipe ~= p;
77     }
78     final E[][] process(DataPipeIface!E p, E[][] data) {
79         E[][] result;
80         data.each!(e => p.putNoCopy(e));
81         while(!p.empty()) result ~= p.get();
82         return result;
83     }
84     /// Process next data portion. Data passed over pipeline and store result in buffer.
85     /// Params:
86     /// data = input data buffer.
87     /// NoCopy means we do not copy data to buffer, we keep reference
88     final void putNoCopy(E[] data) {
89         if ( pipe.empty ) {
90             buffer.putNoCopy(data);
91             return;
92         }
93         try {
94             auto t = process(pipe.front, [data]);
95             foreach(ref p; pipe[1..$]) {
96                 t = process(p, t);
97             }
98             t.each!(b => buffer.putNoCopy(b));
99         }
100         catch (Exception e) {
101             throw new DecodingException(e.msg);
102         }
103     }
104     /// Get what was collected in internal buffer and clear it.
105     /// Returns:
106     /// data collected.
107     final E[] get() {
108         if ( buffer.empty ) {
109             return E[].init;
110         }
111         auto res = buffer.data;
112         buffer = Buffer!E.init;
113         return res;
114     }
115     ///
116     /// get without datamove. but user receive [][]
117     ///
118     final E[][] getNoCopy()  {
119         if ( buffer.empty ) {
120             return E[][].init;
121         }
122         E[][] res = buffer.__repr.__buffer;
123         buffer = Buffer!E.init;
124         return res;
125     }
126     /// Test if internal buffer is empty
127     /// Returns:
128     /// true if internal buffer is empty (nothing to get())
129     final bool empty() pure const @safe {
130         return buffer.empty;
131     }
132     final void flush() {
133         E[][] product;
134         foreach(ref p; pipe) {
135             product.each!(e => p.putNoCopy(e));
136             p.flush();
137             product.length = 0;
138             while( !p.empty ) product ~= p.get();
139         }
140         product.each!(b => buffer.putNoCopy(b));
141     }
142 }
143 
144 /**
145  * Processor for gzipped/compressed content.
146  * Also support InputRange interface.
147  */
148 public class Decompressor(E) : DataPipeIface!E {
149     private {
150         Buffer!ubyte __buff;
151         UnCompress   __zlib;
152     }
153     this() {
154         __buff = Buffer!ubyte();
155         __zlib = new UnCompress();
156     }
157     final override void putNoCopy(E[] data) {
158         if ( __zlib is null  ) {
159             __zlib = new UnCompress();
160         }
161         __buff.putNoCopy(__zlib.uncompress(data));
162     }
163     final override E[] get() pure {
164         assert(__buff.length);
165         auto r = __buff.__repr.__buffer[0];
166         __buff.popFrontN(r.length);
167         return cast(E[])r;
168     }
169     final override void flush() {
170         if ( __zlib is null  ) {
171             return;
172         }
173         __buff.put(__zlib.flush());
174     }
175     final override @property bool empty() const pure @safe {
176         debug(requests) tracef("empty=%b", __buff.empty);
177         return __buff.empty;
178     }
179     final @property auto ref front() pure const @safe {
180         debug(requests) tracef("front: buff length=%d", __buff.length);
181         return __buff.front;
182     }
183     final @property auto popFront() pure @safe {
184         debug(requests) tracef("popFront: buff length=%d", __buff.length);
185         return __buff.popFront;
186     }
187     final @property void popFrontN(size_t n) pure @safe {
188         __buff.popFrontN(n);
189     }
190 }
191 
192 /**
193  * Unchunk chunked http responce body.
194  */
195 public class DecodeChunked : DataPipeIface!ubyte {
196     //    length := 0
197     //    read chunk-size, chunk-extension (if any) and CRLF
198     //    while (chunk-size > 0) {
199     //        read chunk-data and CRLF
200     //        append chunk-data to entity-body
201     //        length := length + chunk-size
202     //        read chunk-size and CRLF
203     //    }
204     //    read entity-header
205     //    while (entity-header not empty) {
206     //        append entity-header to existing header fields
207     //        read entity-header
208     //    }
209     //    Content-Length := length
210     //    Remove "chunked" from Transfer-Encoding
211     //
212 
213     //    Chunked-Body   = *chunk
214     //                      last-chunk
215     //                      trailer
216     //                      CRLF
217     //
218     //    chunk          = chunk-size [ chunk-extension ] CRLF
219     //                     chunk-data CRLF
220     //                     chunk-size     = 1*HEX
221     //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
222     //
223     //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
224     //    chunk-ext-name = token
225     //    chunk-ext-val  = token | quoted-string
226     //    chunk-data     = chunk-size(OCTET)
227     //    trailer        = *(entity-header CRLF)
228 
229     alias eType = ubyte;
230     immutable eType[] CRLF = ['\r', '\n'];
231     private {
232         enum         States {huntingSize, huntingSeparator, receiving, trailer};
233         char         state = States.huntingSize;
234         size_t       chunk_size, to_receive;
235         Buffer!ubyte buff;
236         ubyte[]      linebuff;
237     }
238     final void putNoCopy(eType[] data) {
239         while ( data.length ) {
240             if ( state == States.trailer ) {
241                 to_receive = to_receive - min(to_receive, data.length);
242                 return;
243             }
244             if ( state == States.huntingSize ) {
245                 import std.ascii;
246                 ubyte[10] digits;
247                 int i;
248                 for(i=0;i<data.length;i++) {
249                     ubyte v = data[i];
250                     digits[i] = v;
251                     if ( v == '\n' ) {
252                         i+=1;
253                         break;
254                     }
255                 }
256                 linebuff ~= digits[0..i];
257                 if ( linebuff.length >= 80 ) {
258                     throw new DecodingException("Can't find chunk size in the body");
259                 }
260                 data = data[i..$];
261                 if (!linebuff.canFind(CRLF)) {
262                     continue;
263                 }
264                 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b";
265                 state = States.receiving;
266                 to_receive = chunk_size;
267                 if ( chunk_size == 0 ) {
268                     to_receive = 2-min(2, data.length); // trailing \r\n
269                     state = States.trailer;
270                     return;
271                 }
272                 continue;
273             }
274             if ( state == States.receiving ) {
275                 if (to_receive > 0 ) {
276                     auto can_store = min(to_receive, data.length);
277                     buff.putNoCopy(data[0..can_store]);
278                     data = data[can_store..$];
279                     to_receive -= can_store;
280                     //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
281                     if ( to_receive == 0 ) {
282                         //tracef("switch to huntig separator");
283                         state = States.huntingSeparator;
284                         continue;
285                     }
286                     continue;
287                 }
288                 assert(false);
289             }
290             if ( state == States.huntingSeparator ) {
291                 if ( data[0] == '\n' || data[0]=='\r') {
292                     data = data[1..$];
293                     continue;
294                 }
295                 state = States.huntingSize;
296                 linebuff.length = 0;
297                 continue;
298             }
299         }
300     }
301     final eType[] get() {
302         auto r = buff.__repr.__buffer[0];
303         buff.popFrontN(r.length);
304         return r;
305     }
306     final void flush() {
307     }
308     final bool empty() {
309         debug(requests) tracef("empty=%b", buff.empty);
310         return buff.empty;
311     }
312     final bool done() {
313         return state==States.trailer && to_receive==0;
314     }
315 }
316 
317 unittest {
318     info("Testing DataPipe");
319     globalLogLevel(LogLevel.info);
320     alias eType = char;
321     eType[] gzipped = [
322         0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
323         0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
324         0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
325         0x08, 0x00, 0x00, 0x00
326     ]; // "abc\ndef\n"
327     auto d = new Decompressor!eType();
328     d.putNoCopy(gzipped[0..2].dup);
329     d.putNoCopy(gzipped[2..10].dup);
330     d.putNoCopy(gzipped[10..$].dup);
331     d.flush();
332     assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));
333 
334     auto e = new Decompressor!eType();
335     e.putNoCopy(gzipped[0..10].dup);
336     e.putNoCopy(gzipped[10..$].dup);
337     e.flush();
338     assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
339     //    writeln(gzipped.decompress.filter!(a => a!='b').array);
340     auto dp = new DataPipe!eType;
341     dp.insert(new Decompressor!eType());
342     dp.putNoCopy(gzipped[0..2].dup);
343     dp.putNoCopy(gzipped[2..$].dup);
344     dp.flush();
345     assert(equal(dp.get(), "abc\ndef\n"));
346     // empty datapipe shoul just pass input to output
347     auto dpu = new DataPipe!ubyte;
348     dpu.putNoCopy("abcd".dup.representation);
349     dpu.putNoCopy("efgh".dup.representation);
350     dpu.flush();
351     assert(equal(dpu.get(), "abcdefgh"));
352     info("Test unchunker properties");
353     ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
354     ubyte[][] result;
355     auto uc = new DecodeChunked();
356     uc.putNoCopy(twoChunks);
357     while(!uc.empty) {
358         result ~= uc.get();
359     }
360     assert(equal(result[0], ['1', '2']));
361     assert(equal(result[1], ['3', '4']));
362     info("unchunker correctness - ok");
363     result[0][0] = '5';
364     assert(twoChunks[3] == '5');
365     info("unchunker zero copy - ok");
366     info("Testing DataPipe - done");
367 }
368 /**
369  * Buffer used to collect and process data from network. It remainds Appender, but support
370  * also Range interface.
371  * $(P To place data in buffer use put() method.)
372  * $(P  To retrieve data from buffer you can use several methods:)
373  * $(UL
374  *  $(LI Range methods: front, back, index [])
375  *  $(LI data method: return collected data (like Appender.data))
376  * )
377  */
378 static this() {
379 }
380 static ~this() {
381 }
382 enum   CACHESIZE = 1024;
383 
384 static long reprAlloc;
385 static long reprCacheHit;
386 static long reprCacheRequests;
387 
388 
389 public struct Buffer(T) {
390 //    static Repr[CACHESIZE]  cache;
391 //    static uint             cacheIndex;
392 
393     private {
394         Repr  cachedOrNew() {
395             return new Repr;
396 //            reprCacheRequests++;
397 //            if ( false && cacheIndex>0 ) {
398 //                reprCacheHit++;
399 //                cacheIndex -= 1;
400 //                return cache[cacheIndex];
401 //            } else {
402 //                return new Repr;
403 //            }
404         }
405         class Repr {
406             size_t         __length;
407             Unqual!T[][]   __buffer;
408             this() {
409                 reprAlloc++;
410                 __length = 0;
411             }
412             this(Repr other) {
413                 reprAlloc++;
414                 if ( other is null )
415                     return;
416                 __length = other.__length;
417                 __buffer = other.__buffer.dup;
418             }
419         }
420         Repr __repr;
421     }
422 
423     alias toString = data!string;
424 
425     this(this) {
426         if ( !__repr ) {
427             return;
428         }
429         __repr = new Repr(__repr);
430     }
431     this(U)(U[] data) {
432         put(data);
433     }
434     ~this() {
435         __repr = null;
436     }
437     /***************
438      * store data. Data copied
439      */
440     auto put(U)(U[] data) {
441         if ( data.length == 0 ) {
442             return;
443         }
444         if ( !__repr ) {
445             __repr = cachedOrNew();
446         }
447         static if (!is(U == T)) {
448             auto d = cast(T[])(data);
449             __repr.__length += d.length;
450             __repr.__buffer ~= d.dup;
451         } else {
452             __repr.__length += data.length;
453             __repr.__buffer ~= data.dup;
454         }
455         return;
456     }
457     auto putNoCopy(U)(U[] data) {
458         if ( data.length == 0 ) {
459             return;
460         }
461         if ( !__repr ) {
462             __repr = cachedOrNew();
463         }
464         static if (!is(U == T)) {
465             auto d = cast(T[])(data);
466             __repr.__length += d.length;
467             __repr.__buffer ~= d;
468         } else {
469             __repr.__length += data.length;
470             __repr.__buffer ~= data;
471         }
472         return;
473     }
474     @property auto opDollar() const pure @safe {
475         return __repr.__length;
476     }
477     @property size_t length() const pure @safe {
478         if ( !__repr ) {
479             return 0;
480         }
481         return __repr.__length;
482     }
483     @property auto empty() const pure @safe {
484         return length == 0;
485     }
486     @property auto ref front() const pure @safe {
487         assert(length);
488         return __repr.__buffer.front.front;
489     }
490     @property auto ref back() const pure @safe {
491         assert(length);
492         return __repr.__buffer.back.back;
493     }
494     @property void popFront() pure @safe {
495         assert(length);
496         with ( __repr ) {
497             __buffer.front.popFront;
498             if ( __buffer.front.length == 0 ) {
499                 __buffer.popFront;
500             }
501             __length--;
502         }
503     }
504     @property void popFrontN(size_t n) pure @safe {
505         assert(n <= length, "lengnt: %d, n=%d".format(length, n));
506         __repr.__length -= n;
507         while( n ) {
508             if ( n <= __repr.__buffer.front.length ) {
509                 __repr.__buffer.front.popFrontN(n);
510                 if ( __repr.__buffer.front.length == 0 ) {
511                     __repr.__buffer.popFront;
512                 }
513                 return;
514             }
515             n -= __repr.__buffer.front.length;
516             __repr.__buffer.popFront;
517         }
518     }
519     @property void popBack() pure @safe {
520         assert(length);
521         __repr.__buffer.back.popBack;
522         if ( __repr.__buffer.back.length == 0 ) {
523             __repr.__buffer.popBack;
524         }
525         __repr.__length--;
526     }
527     @property void popBackN(size_t n) pure @safe {
528         assert(n <= length, "n: %d, length: %d".format(n, length));
529         __repr.__length -= n;
530         while( n ) {
531             if ( n <= __repr.__buffer.back.length ) {
532                 __repr.__buffer.back.popBackN(n);
533                 if ( __repr.__buffer.back.length == 0 ) {
534                     __repr.__buffer.popBack;
535                 }
536                 return;
537             }
538             n -= __repr.__buffer.back.length;
539             __repr.__buffer.popBack;
540         }
541     }
542     @property auto save() @safe {
543         auto n = Buffer!T();
544         n.__repr = new Repr(__repr);
545         return n;
546     }
547     @property auto ref opIndex(size_t n) const pure @safe {
548         assert( __repr && n < __repr.__length );
549         foreach(b; __repr.__buffer) {
550             if ( n < b.length ) {
551                 return b[n];
552             }
553             n -= b.length;
554         }
555         assert(false, "Impossible");
556     }
557     Buffer!T opSlice(size_t m, size_t n) {
558         if ( empty || m == n ) {
559             return Buffer!T();
560         }
561         assert( m <= n && n <= __repr.__length);
562         auto res = this.save();
563         res.popBackN(res.__repr.__length-n);
564         res.popFrontN(m);
565         return res;
566     }
567     @property auto data(U=T[])() pure {
568         static if ( is(U==T[]) ) {
569             if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
570                 return __repr.__buffer.front;
571             }
572         }
573         Appender!(T[]) a;
574         if ( __repr && __repr.__buffer ) {
575             foreach(ref b; __repr.__buffer) {
576                 a.put(b);
577             }
578         }
579         static if ( is(U==T[]) ) {
580             return a.data;
581         } else {
582             return cast(U)a.data;
583         }
584     }
585     string opCast(string)() {
586         return this.toString;
587     }
588     bool opEquals(U)(U x) {
589         return cast(U)this == x;
590     }
591 
592 }
593 ///
594 public unittest {
595 
596     static assert(isInputRange!(Buffer!ubyte));
597     static assert(isForwardRange!(Buffer!ubyte));
598     static assert(hasLength!(Buffer!ubyte));
599     static assert(hasSlicing!(Buffer!ubyte));
600     static assert(isBidirectionalRange!(Buffer!ubyte));
601     static assert(isRandomAccessRange!(Buffer!ubyte));
602 
603     auto b = Buffer!ubyte();
604     b.put("abc".representation.dup);
605     b.put("def".representation.dup);
606     assert(b.length == 6);
607     assert(b.toString == "abcdef");
608     assert(b.front == 'a');
609     assert(b.back == 'f');
610     assert(equal(b[0..$], "abcdef"));
611     assert(equal(b[$-2..$], "ef"));
612     assert(b == "abcdef");
613     b.popFront;
614     b.popBack;
615     assert(b.front == 'b');
616     assert(b.back == 'e');
617     assert(b.length == 4);
618     assert(retro(b).front == 'e');
619     assert(countUntil(b, 'e') == 3);
620     assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
621     assert(equal(b, "bcde"));
622     b.popFront; b.popFront;
623     assert(b.front == 'd');
624     assert(b.front == b[0]);
625     assert(b.back == b[$-1]);
626 
627     auto c = Buffer!ubyte();
628     c.put("Header0: value0\n".representation.dup);
629     c.put("Header1: value1\n".representation.dup);
630     c.put("Header2: value2\n\nbody".representation.dup);
631     auto c_length = c.length;
632     auto eoh = countUntil(c, "\n\n");
633     assert(eoh == 47);
634     foreach(header; c[0..eoh].splitter('\n') ) {
635         writeln(cast(string)header.data);
636     }
637     assert(equal(findSplit(c, "\n\n")[2], "body"));
638     assert(c.length == c_length);
639 }
640 
641 public struct SSLOptions {
642     enum filetype {
643         pem,
644         asn1,
645         der = asn1,
646     }
647     private {
648         /**
649          * do we need to veryfy peer?
650          */
651         bool     _verifyPeer = true;
652         /**
653          * path to CA cert
654          */
655         string   _caCert;
656         /**
657          * path to key file (can also contain cert (for pem)
658          */
659         string   _keyFile;
660         /**
661          * path to cert file (can also contain key (for pem)
662          */
663         string   _certFile;
664         filetype _keyType = filetype.pem;
665         filetype _certType = filetype.pem;
666     }
667     ubyte haveFiles() pure nothrow @safe @nogc {
668         ubyte r = 0;
669         if ( _keyFile  ) r|=1;
670         if ( _certFile ) r|=2;
671         return r;
672     }
673     // do we want to verify peer certificates?
674     bool getVerifyPeer() pure nothrow @nogc {
675         return _verifyPeer;
676     }
677     SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
678         _verifyPeer = v;
679         return this;
680     }
681     /// set key file name and type (default - pem)
682     auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
683         _keyFile = f;
684         _keyType = t;
685         return this;
686     }
687     auto getKeyFile() @safe pure nothrow @nogc {
688         return _keyFile;
689     }
690     auto getKeyType() @safe pure nothrow @nogc {
691         return _keyType;
692     }
693     /// set cert file name and type (default - pem)
694     auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
695         _certFile = f;
696         _certType = t;
697         return this;
698     }
699     auto setCaCert(string p) @safe pure nothrow @nogc {
700         _caCert = p;
701         return this;
702     }
703     auto getCaCert() @safe pure nothrow @nogc {
704         return _caCert;
705     }
706     auto getCertFile() @safe pure nothrow @nogc {
707         return _certFile;
708     }
709     auto getCertType() @safe pure nothrow @nogc {
710         return _certType;
711     }
712     /// set key file type
713     void setKeyType(string t) @safe pure nothrow {
714         _keyType = cast(filetype)sslKeyTypes[t];
715     }
716     /// set cert file type
717     void setCertType(string t) @safe pure nothrow {
718         _certType = cast(filetype)sslKeyTypes[t];
719     }
720 }
721 static immutable int[string] sslKeyTypes;
722 shared static this() {
723     sslKeyTypes = [
724         "pem":SSLOptions.filetype.pem,
725         "asn1":SSLOptions.filetype.asn1,
726         "der":SSLOptions.filetype.der,
727     ];
728 }
729 
730 version(vibeD) {
731 }
732 else {
733     extern(C) {
734         int SSL_library_init();
735     }
736 
737     enum SSL_VERIFY_PEER = 0x01;
738     enum SSL_FILETYPE_PEM = 1;
739     enum SSL_FILETYPE_ASN1 = 2;
740 
741     immutable int[SSLOptions.filetype] ft2ssl;
742 
743     shared static this() {
744         ft2ssl = [
745             SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
746             SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
747             SSLOptions.filetype.der: SSL_FILETYPE_ASN1
748         ];
749     }
750 
751     public class OpenSslSocket : Socket {
752         //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
753         private SSL* ssl;
754         private SSL_CTX* ctx;
755 
756         private void initSsl(SSLOptions opts) {
757             //ctx = SSL_CTX_new(SSLv3_client_method());
758             ctx = openssl.SSL_CTX_new(openssl.TLS_method());
759             assert(ctx !is null);
760             if ( opts.getVerifyPeer() ) {
761                 openssl.SSL_CTX_set_default_verify_paths(ctx);
762                 if ( opts.getCaCert() ) {
763                     openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
764                 }
765                 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
766             }
767             immutable keyFile = opts.getKeyFile();
768             immutable keyType = opts.getKeyType();
769             immutable certFile = opts.getCertFile();
770             immutable certType = opts.getCertType();
771             final switch(opts.haveFiles()) {
772                 case 0b11:  // both files
773                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
774                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
775                     break;
776                 case 0b01:  // key only
777                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
778                     openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
779                     break;
780                 case 0b10:  // cert only
781                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
782                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
783                     break;
784                 case 0b00:
785                     break;
786             }
787             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
788             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
789             ssl = openssl.SSL_new(ctx);
790             openssl.SSL_set_fd(ssl, cast(int)this.handle);
791         }
792 
793         @trusted
794         override void connect(Address dest) {
795             super.connect(dest);
796             if(openssl.SSL_connect(ssl) == -1) {
797                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
798             }
799         }
800         auto connectSSL() {
801             if(openssl.SSL_connect(ssl) == -1) {
802                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
803             }
804             debug(requests) tracef("ssl socket connected");
805             return this;
806         }
807         @trusted
808         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope {
809             return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length);
810         }
811         override ptrdiff_t send(const(void)[] buf) scope {
812             return send(buf, SocketFlags.NONE);
813         }
814         @trusted
815         override ptrdiff_t receive(void[] buf, SocketFlags flags) scope {
816             return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length);
817         }
818         override ptrdiff_t receive(void[] buf) scope {
819             return receive(buf, SocketFlags.NONE);
820         }
821         this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
822             super(af, type);
823             initSsl(opts);
824         }
825         this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
826             super(sock, af);
827             initSsl(opts);
828         }
829         override void close() scope {
830             super.close();
831             if ( ssl !is null ) {
832                 openssl.SSL_free(ssl);
833                 ssl = null;
834             }
835             if ( ctx !is null ) {
836                 openssl.SSL_CTX_free(ctx);
837                 ctx = null;
838             }
839         }
840         void SSL_set_tlsext_host_name(string host) {
841 
842         }
843     }
844 
845     public class SSLSocketStream: SocketStream {
846         private SSLOptions _sslOptions;
847         private Socket underlyingSocket;
848         private SSL* ssl;
849         private string host;
850 
851         this(SSLOptions opts) {
852             _sslOptions = opts;
853         }
854         this(NetworkStream ostream, SSLOptions opts, string host = null) {
855             _sslOptions = opts;
856             this.host = host;
857             auto osock = ostream.so();
858             underlyingSocket = osock;
859             osock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
860             auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
861             ssl = ss.ssl;
862             if ( host !is null ) {
863                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
864             }
865             ss.connectSSL();
866             __isOpen = true;
867             __isConnected = true;
868             s = ss;
869             debug(requests) tracef("ssl stream created from another stream: %s", s);
870         }
871         override void close() {
872             ssl = null;
873             host = null;
874             super.close();
875             if ( underlyingSocket ) {
876                 underlyingSocket.close();
877             }
878         }
879         override void open(AddressFamily fa) {
880             if ( s !is null ) {
881                 s.close();
882             }
883             auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
884             assert(ss !is null, "Can't create socket");
885             ssl = ss.ssl;
886             if ( host !is null ) {
887                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
888             }
889             s = ss;
890             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
891             __isOpen = true;
892         }
893         override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
894             host = h;
895             return super.connect(h, p, timeout);
896         }
897         override SSLSocketStream accept() {
898             auto newso = s.accept();
899             if ( s is null ) {
900                 return null;
901             }
902             auto newstream = new SSLSocketStream(_sslOptions);
903             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
904             newstream.s = sslSocket;
905             newstream.__isOpen = true;
906             newstream.__isConnected = true;
907             return newstream;
908         }
909     }
910     public class TCPSocketStream : SocketStream {
911         override void open(AddressFamily fa) {
912             if ( s !is null ) {
913                 s.close();
914             }
915             s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
916             assert(s !is null, "Can't create socket");
917             __isOpen = true;
918             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
919         }
920         override TCPSocketStream accept() {
921             auto newso = s.accept();
922             if ( s is null ) {
923                 return null;
924             }
925             auto newstream = new TCPSocketStream();
926             newstream.s = newso;
927             newstream.__isOpen = true;
928             newstream.__isConnected = true;
929             newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
930             return newstream;
931         }
932     }
933 }
934 
935 public interface NetworkStream {
936     @property bool isConnected() const;
937     @property bool isOpen() const;
938 
939     void close() @trusted;
940 
941     ///
942     /// timeout is the socket write timeout.
943     ///
944     NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);
945 
946     ptrdiff_t send(const(void)[] buff);
947     ptrdiff_t receive(void[] buff);
948 
949     NetworkStream accept();
950     @property void reuseAddr(bool);
951     void bind(string);
952     void bind(Address);
953     void listen(int);
954     version(vibeD) {
955         TCPConnection so();
956     } else {
957         Socket so();
958     }
959     ///
960     /// Set timeout for receive calls. 0 means no timeout.
961     ///
962     @property void readTimeout(Duration timeout);
963 }
964 
965 public abstract class SocketStream : NetworkStream {
966     private {
967         Duration timeout;
968         Socket   s;
969         bool     __isOpen;
970         bool     __isConnected;
971         string   _bind;
972     }
973     void open(AddressFamily fa) {
974     }
975     @property Socket so() @safe pure {
976         return s;
977     }
978     @property bool isOpen() @safe @nogc pure const {
979         return s && __isOpen;
980     }
981     @property bool isConnected() @safe @nogc pure const {
982         return s && __isOpen && __isConnected;
983     }
984     void close() @trusted {
985         debug(requests) tracef("Close socket");
986         if ( isOpen ) {
987             s.close();
988             __isOpen = false;
989             __isConnected = false;
990         }
991         s = null;
992     }
993     /***
994     *  bind() just remember address. We will cal bind() at the time of connect as
995     *  we can have several connection trials.
996     ***/
997     override void bind(string to) {
998         _bind = to;
999     }
1000     /***
1001     *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
1002     ***/
1003     SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1004         debug(requests) tracef(format("Create connection to %s:%d", host, port));
1005         Address[] addresses;
1006         __isConnected = false;
1007         try {
1008             addresses = getAddress(host, port);
1009         } catch (Exception e) {
1010             throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
1011         }
1012         foreach(a; addresses) {
1013             debug(requests) tracef("Trying %s", a);
1014             try {
1015                 open(a.addressFamily);
1016                 if ( _bind !is null ) {
1017                     auto ad = getAddress(_bind);
1018                     debug(requests) tracef("bind to %s", ad[0]);
1019                     s.bind(ad[0]);
1020                 }
1021                 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
1022                 s.connect(a);
1023                 debug(requests) tracef("Connected to %s", a);
1024                 __isConnected = true;
1025                 break;
1026             } catch (SocketException e) {
1027                 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
1028                 s.close();
1029             }
1030         }
1031         if ( !__isConnected ) {
1032             throw new ConnectError("Can't connect to %s:%d".format(host, port));
1033         }
1034         return this;
1035     }
1036 
1037     ptrdiff_t send(const(void)[] buff)
1038     in {assert(isConnected);}
1039     body {
1040         auto rc = s.send(buff);
1041         if (rc < 0) {
1042             close();
1043             throw new NetworkException("sending data: %s".format(to!string(strerror(errno))));
1044         }
1045         return rc;
1046     }
1047 
1048     ptrdiff_t receive(void[] buff) {
1049         while (true) {
1050             auto r = s.receive(buff);
1051             if (r < 0) {
1052                 auto e = errno;
1053                 version(Windows) {
1054                     close();
1055                     if ( e == 0 ) {
1056                         throw new TimeoutException("Timeout receiving data");
1057                     }
1058                     throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno))));
1059                 }
1060                 version(Posix) {
1061                     if ( e == EINTR ) {
1062                         continue;
1063                     }
1064                     close();
1065                     if ( e == EAGAIN ) {
1066                         throw new TimeoutException("Timeout receiving data");
1067                     }
1068                     throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno))));
1069                 }
1070             }
1071             else {
1072                 buff.length = r;
1073             }
1074             return r;
1075         }
1076         assert(false);
1077     }
1078 
1079     @property void readTimeout(Duration timeout) @safe {
1080         if ( __isConnected )
1081         {
1082             s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
1083         }
1084     }
1085     override SocketStream accept() {
1086         assert(false, "Implement before use");
1087     }
1088     @property override void reuseAddr(bool yes){
1089         if (yes) {
1090             s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
1091         }
1092         else {
1093             s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0);
1094         }
1095     }
1096     override void bind(Address addr){
1097         s.bind(addr);
1098     }
1099     override void listen(int n) {
1100         s.listen(n);
1101     };
1102 }
1103 
1104 version (vibeD) {
1105     import vibe.core.net, vibe.stream.tls;
1106 
1107     public class TCPVibeStream : NetworkStream {
1108     private:
1109         TCPConnection _conn;
1110         Duration _readTimeout = Duration.max;
1111         bool _isOpen = true;
1112         string _bind;
1113 
1114     public:
1115         @property bool isConnected() const {
1116             return _conn.connected;
1117         }
1118         @property override bool isOpen() const {
1119             return _conn && _isOpen;
1120         }
1121         void close() @trusted {
1122             _conn.close();
1123             _isOpen = false;
1124         }
1125         override TCPConnection so() {
1126             return _conn;
1127         }
1128         override void bind(string to) {
1129                 _bind = to;
1130         }
1131         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1132             // FIXME: timeout not supported in vibe.d
1133             try {
1134                 _conn = connectTCP(host, port, _bind);
1135             }
1136             catch (Exception e)
1137                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1138 
1139             return this;
1140         }
1141 
1142         ptrdiff_t send(const(void)[] buff) {
1143             _conn.write(cast(const(ubyte)[])buff);
1144             return buff.length;
1145         }
1146 
1147         ptrdiff_t receive(void[] buff) {
1148             if (!_conn.waitForData(_readTimeout)) {
1149                 if (!_conn.connected || _conn.empty ) {
1150                     return 0;
1151                 }
1152                 throw new TimeoutException("Timeout receiving data");
1153             }
1154 
1155             if(_conn.empty) {
1156                 return 0;
1157             }
1158 
1159             auto chunk = min(_conn.leastSize, buff.length);
1160             assert(chunk != 0);
1161             _conn.read(cast(ubyte[])buff[0 .. chunk]);
1162             return chunk;
1163         }
1164 
1165         @property void readTimeout(Duration timeout) {
1166             if (timeout == 0.seconds) {
1167                 _readTimeout = Duration.max;
1168             }
1169             else {
1170                 _readTimeout = timeout;
1171             }
1172         }
1173         override TCPVibeStream accept() {
1174             assert(false, "Must be implemented");
1175         }
1176         override @property void reuseAddr(bool){
1177             assert(false, "Not Implemented");
1178         }
1179         override void bind(Address){
1180             assert(false, "Not Implemented");
1181         }
1182         override void listen(int){
1183             assert(false, "Not Implemented");
1184         }
1185     }
1186 
1187     public class SSLVibeStream : TCPVibeStream {
1188     private:
1189         TLSStream _sslStream;
1190         bool   _isOpen = true;
1191         SSLOptions _sslOptions;
1192         TCPConnection underlyingConnection;
1193 
1194         void connectSSL(string host) {
1195             auto sslctx = createTLSContext(TLSContextKind.client);
1196             if ( _sslOptions.getVerifyPeer() ) {
1197                 if ( _sslOptions.getCaCert() == null ) {
1198                     throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
1199                 }
1200                 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
1201                 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
1202             } else {
1203                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
1204             }
1205             immutable keyFile = _sslOptions.getKeyFile();
1206             immutable certFile = _sslOptions.getCertFile();
1207             final switch(_sslOptions.haveFiles()) {
1208                 case 0b11:  // both files
1209                     sslctx.usePrivateKeyFile(keyFile);
1210                     sslctx.useCertificateChainFile(certFile);
1211                     break;
1212                 case 0b01:  // key only
1213                     sslctx.usePrivateKeyFile(keyFile);
1214                     sslctx.useCertificateChainFile(keyFile);
1215                     break;
1216                 case 0b10:  // cert only
1217                     sslctx.usePrivateKeyFile(certFile);
1218                     sslctx.useCertificateChainFile(certFile);
1219                     break;
1220                 case 0b00:
1221                     break;
1222             }
1223             _sslStream = createTLSStream(_conn, sslctx, host);
1224         }
1225 
1226     public:
1227         this(SSLOptions opts) {
1228             _sslOptions = opts;
1229         }
1230         override TCPConnection so() {
1231             return _conn;
1232         }
1233         this(NetworkStream ostream, SSLOptions opts, string host = null) {
1234             _sslOptions = opts;
1235             auto oconn = ostream.so();
1236             underlyingConnection = oconn;
1237             _conn = oconn;
1238             connectSSL(host);
1239         }
1240         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1241             try {
1242                 _conn = connectTCP(host, port);
1243                 connectSSL(host);
1244             }
1245             catch (ConnectError e) {
1246                 throw e;
1247             }
1248             catch (Exception e) {
1249                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1250             }
1251 
1252             return this;
1253         }
1254 
1255         override ptrdiff_t send(const(void)[] buff) {
1256             _sslStream.write(cast(const(ubyte)[])buff);
1257             return buff.length;
1258         }
1259 
1260         override ptrdiff_t receive(void[] buff) {
1261             if (!_sslStream.dataAvailableForRead) {
1262                 if (!_conn.waitForData(_readTimeout)) {
1263                     if (!_conn.connected) {
1264                         return 0;
1265                     }
1266                     throw new TimeoutException("Timeout receiving data");
1267                 }
1268             }
1269 
1270             if(_sslStream.empty) {
1271                 return 0;
1272             }
1273 
1274             auto chunk = min(_sslStream.leastSize, buff.length);
1275             assert(chunk != 0);
1276             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1277             return chunk;
1278         }
1279 
1280         override void close() @trusted {
1281             _sslStream.finalize();
1282             _conn.close();
1283             _isOpen = false;
1284         }
1285         @property override bool isOpen() const {
1286             return _conn && _isOpen;
1287         }
1288         override SSLVibeStream accept() {
1289             assert(false, "Must be implemented");
1290         }
1291         override @property void reuseAddr(bool){
1292             assert(false, "Not Implemented");
1293         }
1294         override void bind(Address){
1295             assert(false, "Not Implemented");
1296         }
1297         override void listen(int){
1298             assert(false, "Not Implemented");
1299         }
1300     }
1301 }
1302 
1303 version (vibeD) {
1304     public alias TCPStream = TCPVibeStream;
1305     public alias SSLStream = SSLVibeStream;
1306 }
1307 else {
1308     public alias TCPStream = TCPSocketStream;
1309     public alias SSLStream = SSLSocketStream;
1310 }