1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 import core.stdc.string;
20 
21 import requests.ssl_adapter : openssl, SSL, SSL_CTX;
22 
/// Shorthand for a data-pipe stage that works on raw bytes (`DataPipeIface!ubyte`).
alias InDataHandler = DataPipeIface!ubyte;
24 
/// Exception type for connection errors. It adds nothing to `Exception`
/// except its own type, so callers can catch it selectively.
public class ConnectError : Exception {
    /// Params:
    ///   message = human readable description of the failure
    ///   file    = source file of the throw site (defaults to the caller's file)
    ///   line    = source line of the throw site (defaults to the caller's line)
    ///   next    = optional chained exception
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
30 
/// Exception type raised when body decoding (unchunking, decompression) fails.
class DecodingException : Exception {
    /// Params:
    ///   message = human readable description of the failure
    ///   file    = source file of the throw site (defaults to the caller's file)
    ///   line    = source line of the throw site (defaults to the caller's line)
    ///   next    = optional chained exception
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
36 
/// Exception type for timeouts. It adds nothing to `Exception` except its
/// own type, so callers can catch it selectively.
public class TimeoutException : Exception {
    /// Params:
    ///   message = human readable description of the failure
    ///   file    = source file of the throw site (defaults to the caller's file)
    ///   line    = source line of the throw site (defaults to the caller's line)
    ///   next    = optional chained exception
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
42 
/// Exception type for network-level failures. It adds nothing to `Exception`
/// except its own type, so callers can catch it selectively.
public class NetworkException : Exception {
    /// Params:
    ///   message = human readable description of the failure
    ///   file    = source file of the throw site (defaults to the caller's file)
    ///   line    = source line of the throw site (defaults to the caller's line)
    ///   next    = optional chained exception
    this(
        string message,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
48 
/**
 * DataPipeIface accepts some data, processes it, and hands back the processed data.
 *
 * Implementations in this module (DataPipe, Decompressor, DecodeChunked) are fed with
 * putNoCopy() and drained with get() while empty() is false.
 */
public interface DataPipeIface(E) {
    /// Returns true when there is NO processed data ready for reading
    /// (i.e. get() has nothing to return).
    bool empty();
    /// Put the next data portion for processing. "NoCopy": the implementation
    /// may keep a reference to `data` instead of copying it.
    //void put(E[]);
    void putNoCopy(E[]);
    /// Get any ready data
    E[] get();
    /// Signal the end of the incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors: each one accepts some data, processes it, and
 * hands the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example:
 * unchunk Transfer-Encoding "chunked", then uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    /// Processors, applied in insertion order.
    DataPipeIface!(E)[]  pipe;
    /// Output of the last processor (or the raw input when the pipeline is empty).
    Buffer!E             buffer;

    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    final void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every portion of `data` to processor `p` and collect everything it has ready.
    /// Params:
    /// p = processor to run
    /// data = input portions
    /// Returns:
    /// output portions produced by `p`
    final E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.putNoCopy(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in buffer.
    /// Params:
    /// data = input data buffer.
    /// NoCopy means we do not copy data to buffer, we keep reference
    /// Throws:
    /// DecodingException if any processor throws; the original exception is
    /// available through `next`.
    final void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            // no processors configured: input goes straight to the output buffer
            buffer.putNoCopy(data);
            return;
        }
        try {
            auto t = process(pipe.front, [data]);
            foreach(ref p; pipe[1..$]) {
                t = process(p, t);
            }
            t.each!(b => buffer.putNoCopy(b));
        }
        catch (Exception e) {
            // keep the root cause chained instead of dropping it
            throw new DecodingException(e.msg, __FILE__, __LINE__, e);
        }
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    final E[] get() {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    ///
    /// Get without moving data: the caller receives the collected portions
    /// as an array of slices ([][]) and the internal buffer is cleared.
    ///
    final E[][] getNoCopy()  {
        if ( buffer.empty ) {
            return E[][].init;
        }
        E[][] res = buffer.__repr.__buffer;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    final bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, passing whatever the
    /// previous one still produced to the next, and store the final output in buffer.
    final void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.putNoCopy(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.putNoCopy(b));
    }
}
143 
/**
 * Pipeline stage that inflates gzipped/compressed content using std.zlib.UnCompress.
 * Besides the DataPipeIface methods it exposes front/popFront/empty, so it can be
 * consumed as an input range of decompressed elements.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;    // decompressed output waiting to be read
        UnCompress   __zlib;    // zlib state
    }

    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }

    /// Inflate `data` and queue the result for reading.
    final override void putNoCopy(E[] data) {
        if ( __zlib is null ) {
            __zlib = new UnCompress();
        }
        auto inflated = __zlib.uncompress(data);
        __buff.putNoCopy(inflated);
    }

    /// Remove and return the first queued portion of decompressed data.
    /// The buffer must not be empty.
    final override E[] get() pure {
        assert(__buff.length);
        auto portion = __buff.__repr.__buffer[0];
        __buff.popFrontN(portion.length);
        return cast(E[])portion;
    }

    /// Queue whatever zlib still holds (the result of UnCompress.flush).
    final override void flush() {
        if ( __zlib !is null ) {
            __buff.put(__zlib.flush());
        }
    }

    /// True when no decompressed data is waiting.
    final override @property bool empty() const pure @safe {
        debug(requests) tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }

    /// First decompressed element (range interface).
    final @property auto ref front() pure const @safe {
        debug(requests) tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }

    /// Drop the first decompressed element (range interface).
    final @property auto popFront() pure @safe {
        debug(requests) tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }

    /// Drop the first `n` decompressed elements.
    final @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}
191 
/**
 * Unchunk a chunked HTTP response body (Transfer-Encoding: chunked).
 *
 * Feed raw body bytes with putNoCopy(); decoded chunk payloads become available
 * through get() as slices of the input (no copy). done() tells when the terminating
 * zero-size chunk and its trailing CRLF have been consumed.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer}
        /// A chunk-size line (size, extensions, CRLF) must be shorter than this.
        enum         maxSizeLine = 80;
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;      // decoded payload slices
        ubyte[]      linebuff;  // chunk-size line collected so far (may span several inputs)
    }

    /// Parse the hexadecimal chunk size at the start of a chunk-size line.
    /// Only the leading run of hex digits is used, so chunk extensions
    /// (";name=value") never leak into the size.
    /// Throws: DecodingException if the line has no leading hex digit or the
    /// value does not fit into size_t.
    private static size_t parseChunkSize(const(ubyte)[] line) {
        import std.ascii : isHexDigit;
        // tolerate blanks in front of the size
        while ( line.length && (line[0] == ' ' || line[0] == '\t') ) {
            line = line[1..$];
        }
        size_t size = 0;
        size_t ndigits = 0;
        foreach(c; line) {
            if ( !isHexDigit(c) ) {
                break;
            }
            if ( size > (size_t.max >> 4) ) {
                throw new DecodingException("Chunk size is too large");
            }
            // c is a hex digit here: '0'..'9', or a letter folded to lower case
            immutable digit = c <= '9' ? c - '0' : (c | 0x20) - 'a' + 10;
            size = (size << 4) | digit;
            ndigits++;
        }
        if ( ndigits == 0 ) {
            throw new DecodingException("Can't find chunk size in the body");
        }
        return size;
    }

    /// Consume the next portion of the chunked body.
    /// Payload bytes are stored as slices of `data` (not copied).
    /// Throws: DecodingException when no valid chunk-size line can be found.
    final void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                // only count down the CRLF that follows the last chunk
                to_receive = to_receive - min(to_receive, data.length);
                return;
            }
            if ( state == States.huntingSize ) {
                // take everything up to and including the first '\n'
                // (or all of data when the line is not complete yet)
                size_t taken = 0;
                while ( taken < data.length ) {
                    if ( data[taken++] == '\n' ) {
                        break;
                    }
                }
                // refuse over-long lines before buffering them
                if ( linebuff.length + taken >= maxSizeLine ) {
                    throw new DecodingException("Can't find chunk size in the body");
                }
                linebuff ~= data[0..taken];
                data = data[taken..$];
                if ( !linebuff.canFind(CRLF) ) {
                    // size line is still incomplete, wait for more input
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    to_receive = 2-min(2, data.length); // trailing \r\n
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        //tracef("switch to hunting separator");
                        state = States.huntingSeparator;
                    }
                    continue;
                }
                // receiving is only entered with a non-zero chunk size
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                // skip the CRLF that terminates chunk data
                if ( data[0] == '\n' || data[0]=='\r') {
                    data = data[1..$];
                    continue;
                }
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Remove and return the first decoded payload slice,
    /// or null when nothing is buffered.
    final eType[] get() {
        if ( buff.empty ) {
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to flush: payload is made available as soon as it is received.
    final void flush() {
    }
    /// True when no decoded payload is waiting.
    final bool empty() {
        debug(requests) tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// True once the zero-size chunk and its trailing CRLF have been seen.
    final bool done() {
        return state==States.trailer && to_receive==0;
    }
}
316 
// Exercises Decompressor (as pipe stage and as range), DataPipe (with and without
// processors) and DecodeChunked (correctness and zero-copy behaviour).
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    // feed the gzip stream in three pieces, then read the decompressor as a range
    auto d = new Decompressor!eType();
    d.putNoCopy(gzipped[0..2].dup);
    d.putNoCopy(gzipped[2..10].dup);
    d.putNoCopy(gzipped[10..$].dup);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    // same stream, split in two pieces
    auto e = new Decompressor!eType();
    e.putNoCopy(gzipped[0..10].dup);
    e.putNoCopy(gzipped[10..$].dup);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    // decompressor used as the only stage of a DataPipe
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.putNoCopy(gzipped[0..2].dup);
    dp.putNoCopy(gzipped[2..$].dup);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // an empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.putNoCopy("abcd".dup.representation);
    dpu.putNoCopy("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Test unchunker properties");
    // two chunks of two bytes each, followed by the terminating zero-size chunk
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
    ubyte[][] result;
    auto uc = new DecodeChunked();
    uc.putNoCopy(twoChunks);
    while(!uc.empty) {
        result ~= uc.get();
    }
    assert(equal(result[0], ['1', '2']));
    assert(equal(result[1], ['3', '4']));
    info("unchunker correctness - ok");
    // writing through the result must be visible in the input: payload is not copied
    result[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    info("Testing DataPipe - done");
}
/**
 * Buffer is used to collect and process data received from the network. It resembles Appender,
 * but also supports the Range interface.
 * $(P To place data in the buffer use the put() method.)
 * $(P To retrieve data from the buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: returns the collected data (like Appender.data))
 * )
 */
/// Size of the (currently disabled) Repr cache; only referenced from commented-out code.
enum   CACHESIZE = 1024;

/// Number of Repr objects constructed so far (incremented in Repr constructors).
static long reprAlloc;
/// Cache statistics; only referenced from the commented-out cache code.
static long reprCacheHit;
static long reprCacheRequests;


/**
 * Chunked buffer: a list of array slices that behaves as one contiguous
 * random-access range with length and slicing.
 *
 * put() stores a copy of the data, putNoCopy() stores a reference to it.
 * Copying a Buffer (postblit, save, slicing) duplicates the list of slices,
 * not the bytes they refer to.
 */
public struct Buffer(T) {
//    static Repr[CACHESIZE]  cache;
//    static uint             cacheIndex;

    private {
        /// Allocate the shared representation (the cache is disabled, always a fresh object).
        Repr  cachedOrNew() {
            return new Repr;
//            reprCacheRequests++;
//            if ( false && cacheIndex>0 ) {
//                reprCacheHit++;
//                cacheIndex -= 1;
//                return cache[cacheIndex];
//            } else {
//                return new Repr;
//            }
        }
        /// Heap representation: total element count plus the list of chunks.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            /// Copy the chunk list of `other` (chunks themselves are shared).
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        /// null until the first non-empty put/putNoCopy.
        Repr __repr;
    }

    alias toString = data!string;

    /// Postblit: give the copy its own chunk list.
    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    /// Construct a buffer holding a copy of `data`.
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * store data. Data is NOT copied, the buffer keeps a reference to it
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    /// `$` inside index/slice expressions; 0 for a buffer that never received data.
    @property auto opDollar() const pure @safe {
        // go through length: __repr is null until the first put
        return length;
    }
    /// Total number of elements in all chunks.
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    /// First element. The buffer must not be empty.
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Last element. The buffer must not be empty.
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element, releasing the first chunk once it is exhausted.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first `n` elements; `n` must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to do; also keeps us away from a null __repr
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element, releasing the last chunk once it is exhausted.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last `n` elements; `n` must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // nothing to do; also keeps us away from a null __repr
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Forward-range save: a buffer with its own copy of the chunk list.
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Element at position `n`, found by walking the chunks.
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Sub-buffer [m, n); shares the underlying chunks with this buffer.
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// Collected data as one array (cast to `U` when `U` is not `T[]`).
    /// With exactly one chunk and `U == T[]` the chunk itself is returned, not a copy.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return cast(U)a.data;
        }
    }
    string opCast(string)() {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
/// Buffer satisfies the range concepts up to random access and works with
/// std.algorithm / std.range functions.
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    // two chunks behave as one contiguous sequence
    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    // popping across the chunk boundary ("bc" | "de")
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    // header/body split on a buffer made of several chunks
    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(cast(string)header.data);
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    // searching and slicing must not have consumed the original buffer
    assert(c.length == c_length);
}
636 
/**
 * TLS settings for a connection: peer verification, CA certificate,
 * client key and certificate files and their formats.
 *
 * Setters that return a value return a copy of the updated struct,
 * so calls can be chained.
 */
public struct SSLOptions {
    /// On-disk format of a key or certificate file.
    enum filetype {
        pem,
        asn1,
        der = asn1,
    }
    private {
        /**
         * do we need to verify peer?
         */
        bool     _verifyPeer = true;
        /**
         * path to CA cert
         */
        string   _caCert;
        /**
         * path to key file (can also contain cert (for pem))
         */
        string   _keyFile;
        /**
         * path to cert file (can also contain key (for pem))
         */
        string   _certFile;
        filetype _keyType = filetype.pem;
        filetype _certType = filetype.pem;
    }
    /// Which files are configured, as a bit mask: bit 0 - key file, bit 1 - cert file.
    /// An empty file name counts as "not configured".
    ubyte haveFiles() pure nothrow @safe @nogc {
        ubyte r = 0;
        // test length, not the string itself: `if (str)` is true for ""
        if ( _keyFile.length  ) r|=1;
        if ( _certFile.length ) r|=2;
        return r;
    }
    /// do we want to verify peer certificates?
    bool getVerifyPeer() pure nothrow @nogc @safe {
        return _verifyPeer;
    }
    /// enable or disable peer certificate verification
    SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
        _verifyPeer = v;
        return this;
    }
    /// set key file name and type (default - pem)
    auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _keyFile = f;
        _keyType = t;
        return this;
    }
    /// key file name
    auto getKeyFile() @safe pure nothrow @nogc {
        return _keyFile;
    }
    /// key file type
    auto getKeyType() @safe pure nothrow @nogc {
        return _keyType;
    }
    /// set cert file name and type (default - pem)
    auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _certFile = f;
        _certType = t;
        return this;
    }
    /// set path to CA cert
    auto setCaCert(string p) @safe pure nothrow @nogc {
        _caCert = p;
        return this;
    }
    /// path to CA cert
    auto getCaCert() @safe pure nothrow @nogc {
        return _caCert;
    }
    /// cert file name
    auto getCertFile() @safe pure nothrow @nogc {
        return _certFile;
    }
    /// cert file type
    auto getCertType() @safe pure nothrow @nogc {
        return _certType;
    }
    /// set key file type by name; `t` must be a key of sslKeyTypes ("pem", "asn1", "der")
    void setKeyType(string t) @safe pure nothrow {
        _keyType = cast(filetype)sslKeyTypes[t];
    }
    /// set cert file type by name; `t` must be a key of sslKeyTypes ("pem", "asn1", "der")
    void setCertType(string t) @safe pure nothrow {
        _certType = cast(filetype)sslKeyTypes[t];
    }
}
/// File type names accepted by SSLOptions.setKeyType / setCertType.
static immutable int[string] sslKeyTypes;
shared static this() {
    sslKeyTypes = [
        "pem":SSLOptions.filetype.pem,
        "asn1":SSLOptions.filetype.asn1,
        "der":SSLOptions.filetype.der,
    ];
}
725 
726 version(vibeD) {
727 }
728 else {
    // C prototype for SSL_library_init; it is declared here but not called
    // anywhere in the visible part of this module.
    extern(C) {
        int SSL_library_init();
    }

    // Mode passed to SSL_CTX_set_verify when peer verification is requested.
    enum SSL_VERIFY_PEER = 0x01;
    // File format codes passed to the SSL_CTX_use_*_file calls.
    enum SSL_FILETYPE_PEM = 1;
    enum SSL_FILETYPE_ASN1 = 2;

    // Maps SSLOptions.filetype to the matching SSL_FILETYPE_* code.
    immutable int[SSLOptions.filetype] ft2ssl;

    // Fill the table once at program start (immutable AA cannot be a literal initializer here).
    shared static this() {
        ft2ssl = [
            SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
            SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
            SSLOptions.filetype.der: SSL_FILETYPE_ASN1
        ];
    }
746 
747     public class OpenSslSocket : Socket {
748         //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
749         private SSL* ssl;
750         private SSL_CTX* ctx;
751 
752         private void initSsl(SSLOptions opts) {
753             //ctx = SSL_CTX_new(SSLv3_client_method());
754             ctx = openssl.SSL_CTX_new(openssl.TLS_method());
755             assert(ctx !is null);
756             if ( opts.getVerifyPeer() ) {
757                 openssl.SSL_CTX_set_default_verify_paths(ctx);
758                 if ( opts.getCaCert() ) {
759                     openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
760                 }
761                 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
762             }
763             immutable keyFile = opts.getKeyFile();
764             immutable keyType = opts.getKeyType();
765             immutable certFile = opts.getCertFile();
766             immutable certType = opts.getCertType();
767             final switch(opts.haveFiles()) {
768                 case 0b11:  // both files
769                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
770                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
771                     break;
772                 case 0b01:  // key only
773                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
774                     openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
775                     break;
776                 case 0b10:  // cert only
777                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
778                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
779                     break;
780                 case 0b00:
781                     break;
782             }
783             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
784             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
785             ssl = openssl.SSL_new(ctx);
786             openssl.SSL_set_fd(ssl, cast(int)this.handle);
787         }
788 
789         @trusted
790         override void connect(Address dest) {
791             super.connect(dest);
792             if(openssl.SSL_connect(ssl) == -1) {
793                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
794             }
795         }
796         auto connectSSL() {
797             if(openssl.SSL_connect(ssl) == -1) {
798                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
799             }
800             debug(requests) tracef("ssl socket connected");
801             return this;
802         }
803         @trusted
804         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope {
805             return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length);
806         }
807         override ptrdiff_t send(const(void)[] buf) scope {
808             return send(buf, SocketFlags.NONE);
809         }
810         @trusted
811         override ptrdiff_t receive(void[] buf, SocketFlags flags) scope {
812             return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length);
813         }
814         override ptrdiff_t receive(void[] buf) scope {
815             return receive(buf, SocketFlags.NONE);
816         }
817         this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
818             super(af, type);
819             initSsl(opts);
820         }
821         this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
822             super(sock, af);
823             initSsl(opts);
824         }
825         override void close() scope {
826             super.close();
827             if ( ssl !is null ) {
828                 openssl.SSL_free(ssl);
829                 ssl = null;
830             }
831             if ( ctx !is null ) {
832                 openssl.SSL_CTX_free(ctx);
833                 ctx = null;
834             }
835         }
836         void SSL_set_tlsext_host_name(string host) {
837 
838         }
839     }
840 
841     public class SSLSocketStream: SocketStream {
842         private SSLOptions _sslOptions;
843         private Socket underlyingSocket;
844         private SSL* ssl;
845         private string host;
846 
847         this(SSLOptions opts) {
848             _sslOptions = opts;
849         }
850         this(NetworkStream ostream, SSLOptions opts, string host = null) {
851             _sslOptions = opts;
852             this.host = host;
853             auto osock = ostream.so();
854             underlyingSocket = osock;
855             osock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
856             auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
857             ssl = ss.ssl;
858             if ( host !is null ) {
859                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
860             }
861             ss.connectSSL();
862             __isOpen = true;
863             __isConnected = true;
864             s = ss;
865             debug(requests) tracef("ssl stream created from another stream: %s", s);
866         }
867         override void close() {
868             ssl = null;
869             host = null;
870             super.close();
871             if ( underlyingSocket ) {
872                 underlyingSocket.close();
873             }
874         }
875         override void open(AddressFamily fa) {
876             if ( s !is null ) {
877                 s.close();
878             }
879             auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
880             assert(ss !is null, "Can't create socket");
881             ssl = ss.ssl;
882             if ( host !is null ) {
883                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
884             }
885             s = ss;
886             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
887             __isOpen = true;
888         }
889         override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
890             host = h;
891             return super.connect(h, p, timeout);
892         }
893         override SSLSocketStream accept() {
894             auto newso = s.accept();
895             if ( s is null ) {
896                 return null;
897             }
898             auto newstream = new SSLSocketStream(_sslOptions);
899             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
900             newstream.s = sslSocket;
901             newstream.__isOpen = true;
902             newstream.__isConnected = true;
903             return newstream;
904         }
905     }
906     public class TCPSocketStream : SocketStream {
907         override void open(AddressFamily fa) {
908             if ( s !is null ) {
909                 s.close();
910             }
911             s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
912             assert(s !is null, "Can't create socket");
913             __isOpen = true;
914             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
915         }
916         override TCPSocketStream accept() {
917             auto newso = s.accept();
918             if ( s is null ) {
919                 return null;
920             }
921             auto newstream = new TCPSocketStream();
922             newstream.s = newso;
923             newstream.__isOpen = true;
924             newstream.__isConnected = true;
925             newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
926             return newstream;
927         }
928     }
929 }
930 
/**
 * Common interface of the network streams in this module
 * (socket based and, under version vibeD, vibe.d based).
 */
public interface NetworkStream {
    /// True when the stream currently has an established connection.
    @property bool isConnected() const;
    /// True when the stream is open.
    @property bool isOpen() const;

    /// Close the stream.
    void close() @trusted;

    ///
    /// Connect to host:port and return the connected stream.
    /// timeout is the socket write timeout.
    ///
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);

    /// Send the bytes in buff; returns a byte count.
    ptrdiff_t send(const(void)[] buff);
    /// Receive data into buff; returns a byte count.
    ptrdiff_t receive(void[] buff);

    /// Accept an incoming connection and return it as a new stream.
    NetworkStream accept();
    /// Enable or disable local address reuse.
    @property void reuseAddr(bool);
    /// Set the local address, given as a string.
    void bind(string);
    /// Set the local address, given as an Address.
    void bind(Address);
    /// Start listening for incoming connections; the argument is the backlog.
    void listen(int);
    // Access to the underlying transport object: a vibe.d TCPConnection
    // under version vibeD, a std.socket Socket otherwise.
    version(vibeD) {
        TCPConnection so();
    } else {
        Socket so();
    }
    ///
    /// Set timeout for receive calls. 0 means no timeout.
    ///
    @property void readTimeout(Duration timeout);
}
960 
/**
 * Base class for NetworkStream implementations built on std.socket.Socket.
 *
 * Subclasses override open() to create the concrete socket and accept() to
 * produce streams for incoming connections.
 */
public abstract class SocketStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
        string   _bind;         // local address remembered by bind(string)
    }
    /// Create the socket for address family `fa`.
    /// Does nothing here; subclasses override it.
    void open(AddressFamily fa) {
    }
    /// The socket currently held by the stream (null when there is none).
    @property Socket so() @safe pure {
        return s;
    }
    /// True when a socket is held and it is marked open.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// True when a socket is held and it is marked open and connected.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    /// Close the socket if the stream is open, reset the open/connected
    /// flags and drop the socket reference.
    void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remembers the address. We will call bind() at the time of
    *  connect as we can have several connection trials.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
    *
    *  Params:
    *    host    = name or address to resolve and connect to
    *    port    = remote port
    *    timeout = applied to the socket as SNDTIMEO before connecting
    *  Returns: this stream, connected.
    *  Throws: ConnectError when the name can't be resolved or none of the
    *          resolved addresses could be connected.
    ***/
    SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        // Pass the arguments to tracef directly: a pre-formatted message
        // must not be reused as a format string (host may contain '%').
        debug(requests) tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            debug(requests) tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                if ( _bind !is null ) {
                    auto ad = getAddress(_bind);
                    debug(requests) tracef("bind to %s", ad[0]);
                    s.bind(ad[0]);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                debug(requests) tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                debug(requests) warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket existed,
                // so `s` can still be null here.
                if ( s !is null ) {
                    s.close();
                    s = null;
                }
                // The failed socket is gone: do not keep reporting the
                // stream as open.
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send `buff` over the connected socket.
    ///
    /// Returns: the value returned by the socket's send().
    /// Throws: NetworkException (after closing the stream) when send()
    ///         reports an error.
    ptrdiff_t send(const(void)[] buff)
    in {assert(isConnected);}
    do {
        auto rc = s.send(buff);
        if (rc < 0) {
            // Capture errno before close(), which may overwrite it.
            auto e = errno;
            close();
            throw new NetworkException("sending data: %s".format(to!string(strerror(e))));
        }
        return rc;
    }

    /// Receive data into `buff`.
    ///
    /// On Posix a call interrupted by a signal (EINTR) is retried. Any other
    /// error closes the stream before the exception is thrown.
    ///
    /// Returns: the value returned by the socket's receive().
    /// Throws: TimeoutException when the error code indicates a timeout,
    ///         NetworkException on any other error.
    ptrdiff_t receive(void[] buff) {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // Capture errno once; close() below may overwrite it, so all
                // later checks and messages use the saved value.
                auto e = errno;
                version(Windows) {
                    close();
                    if ( e == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
                version(Posix) {
                    if ( e == EINTR ) {
                        continue;
                    }
                    close();
                    if ( e == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
            }
            return r;
        }
        assert(false);
    }

    /// Set the receive timeout (RCVTIMEO) on the socket.
    /// Has no effect unless the stream is marked connected.
    @property void readTimeout(Duration timeout) @safe {
        if ( __isConnected )
        {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
        }
    }
    /// Not implemented at this level; subclasses provide it.
    override SocketStream accept() {
        assert(false, "Implement before use");
    }
    /// Switch the REUSEADDR socket option on or off.
    @property override void reuseAddr(bool yes){
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, yes ? 1 : 0);
    }
    /// Bind the socket to `addr` immediately.
    override void bind(Address addr){
        s.bind(addr);
    }
    /// Put the socket into listening state with backlog `n`.
    override void listen(int n) {
        s.listen(n);
    }
}
1099 
1100 version (vibeD) {
1101     import vibe.core.net, vibe.stream.tls;
1102 
1103     public class TCPVibeStream : NetworkStream {
1104     private:
1105         TCPConnection _conn;
1106         Duration _readTimeout = Duration.max;
1107         bool _isOpen = true;
1108         string _bind;
1109 
1110     public:
1111         @property bool isConnected() const {
1112             return _conn.connected;
1113         }
1114         @property override bool isOpen() const {
1115             return _conn && _isOpen;
1116         }
1117         void close() @trusted {
1118             _conn.close();
1119             _isOpen = false;
1120         }
1121         override TCPConnection so() {
1122             return _conn;
1123         }
1124         override void bind(string to) {
1125                 _bind = to;
1126         }
1127         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1128             // FIXME: timeout not supported in vibe.d
1129             try {
1130                 _conn = connectTCP(host, port, _bind);
1131             }
1132             catch (Exception e)
1133                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1134 
1135             return this;
1136         }
1137 
1138         ptrdiff_t send(const(void)[] buff) {
1139             _conn.write(cast(const(ubyte)[])buff);
1140             return buff.length;
1141         }
1142 
1143         ptrdiff_t receive(void[] buff) {
1144             if (!_conn.waitForData(_readTimeout)) {
1145                 if (!_conn.connected || _conn.empty ) {
1146                     return 0;
1147                 }
1148                 throw new TimeoutException("Timeout receiving data");
1149             }
1150 
1151             if(_conn.empty) {
1152                 return 0;
1153             }
1154 
1155             auto chunk = min(_conn.leastSize, buff.length);
1156             assert(chunk != 0);
1157             _conn.read(cast(ubyte[])buff[0 .. chunk]);
1158             return chunk;
1159         }
1160 
1161         @property void readTimeout(Duration timeout) {
1162             if (timeout == 0.seconds) {
1163                 _readTimeout = Duration.max;
1164             }
1165             else {
1166                 _readTimeout = timeout;
1167             }
1168         }
1169         override TCPVibeStream accept() {
1170             assert(false, "Must be implemented");
1171         }
1172         override @property void reuseAddr(bool){
1173             assert(false, "Not Implemented");
1174         }
1175         override void bind(Address){
1176             assert(false, "Not Implemented");
1177         }
1178         override void listen(int){
1179             assert(false, "Not Implemented");
1180         }
1181     }
1182 
1183     public class SSLVibeStream : TCPVibeStream {
1184     private:
1185         TLSStream _sslStream;
1186         bool   _isOpen = true;
1187         SSLOptions _sslOptions;
1188         TCPConnection underlyingConnection;
1189 
1190         void connectSSL(string host) {
1191             auto sslctx = createTLSContext(TLSContextKind.client);
1192             if ( _sslOptions.getVerifyPeer() ) {
1193                 if ( _sslOptions.getCaCert() == null ) {
1194                     throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
1195                 }
1196                 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
1197                 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
1198             } else {
1199                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
1200             }
1201             immutable keyFile = _sslOptions.getKeyFile();
1202             immutable certFile = _sslOptions.getCertFile();
1203             final switch(_sslOptions.haveFiles()) {
1204                 case 0b11:  // both files
1205                     sslctx.usePrivateKeyFile(keyFile);
1206                     sslctx.useCertificateChainFile(certFile);
1207                     break;
1208                 case 0b01:  // key only
1209                     sslctx.usePrivateKeyFile(keyFile);
1210                     sslctx.useCertificateChainFile(keyFile);
1211                     break;
1212                 case 0b10:  // cert only
1213                     sslctx.usePrivateKeyFile(certFile);
1214                     sslctx.useCertificateChainFile(certFile);
1215                     break;
1216                 case 0b00:
1217                     break;
1218             }
1219             _sslStream = createTLSStream(_conn, sslctx, host);
1220         }
1221 
1222     public:
1223         this(SSLOptions opts) {
1224             _sslOptions = opts;
1225         }
1226         override TCPConnection so() {
1227             return _conn;
1228         }
1229         this(NetworkStream ostream, SSLOptions opts, string host = null) {
1230             _sslOptions = opts;
1231             auto oconn = ostream.so();
1232             underlyingConnection = oconn;
1233             _conn = oconn;
1234             connectSSL(host);
1235         }
1236         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1237             try {
1238                 _conn = connectTCP(host, port);
1239                 connectSSL(host);
1240             }
1241             catch (ConnectError e) {
1242                 throw e;
1243             }
1244             catch (Exception e) {
1245                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1246             }
1247 
1248             return this;
1249         }
1250 
1251         override ptrdiff_t send(const(void)[] buff) {
1252             _sslStream.write(cast(const(ubyte)[])buff);
1253             return buff.length;
1254         }
1255 
1256         override ptrdiff_t receive(void[] buff) {
1257             if (!_sslStream.dataAvailableForRead) {
1258                 if (!_conn.waitForData(_readTimeout)) {
1259                     if (!_conn.connected) {
1260                         return 0;
1261                     }
1262                     throw new TimeoutException("Timeout receiving data");
1263                 }
1264             }
1265 
1266             if(_sslStream.empty) {
1267                 return 0;
1268             }
1269 
1270             auto chunk = min(_sslStream.leastSize, buff.length);
1271             assert(chunk != 0);
1272             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1273             return chunk;
1274         }
1275 
1276         override void close() @trusted {
1277             if ( _sslStream )
1278             {
1279                 _sslStream.finalize();
1280             }
1281             _conn.close();
1282             _isOpen = false;
1283         }
1284         @property override bool isOpen() const {
1285             return _conn && _isOpen;
1286         }
1287         override SSLVibeStream accept() {
1288             assert(false, "Must be implemented");
1289         }
1290         override @property void reuseAddr(bool){
1291             assert(false, "Not Implemented");
1292         }
1293         override void bind(Address){
1294             assert(false, "Not Implemented");
1295         }
1296         override void listen(int){
1297             assert(false, "Not Implemented");
1298         }
1299     }
1300 }
1301 
// Public names for the stream implementations selected at compile time:
// the vibe.d based classes when built with version vibeD, the
// std.socket based classes otherwise.
version (vibeD) {
    public alias TCPStream = TCPVibeStream;
    public alias SSLStream = SSLVibeStream;
}
else {
    public alias TCPStream = TCPSocketStream;
    public alias SSLStream = SSLSocketStream;
}