1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 
20 alias InDataHandler = DataPipeIface!ubyte;
21 
22 public class ConnectError: Exception {
23     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
24         super(message, file, line, next);
25     }
26 }
27 
28 class DecodingException: Exception {
29     this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
30         super(message, file, line, next);
31     }
32 }
33 
34 public class TimeoutException: Exception {
35     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
36         super(message, file, line, next);
37     }
38 }
39 
40 public class NetworkException: Exception {
41     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
42         super(message, file, line, next);
43     }
44 }
45 
46 /**
47  * DataPipeIface can accept some data, process, and return processed data.
48  */
49 public interface DataPipeIface(E) {
50     /// Is there any processed data ready for reading?
51     bool empty();
52     /// Put next data portion for processing
53     //void put(E[]);
54     void putNoCopy(E[]);
55     /// Get any ready data
56     E[] get();
57     /// Signal on end of incoming data stream.
58     void flush();
59 }
60 /**
61  * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line.
62  * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked",
63  * and uncompress Content-Encoding "gzip".
64  */
65 public class DataPipe(E) : DataPipeIface!E {
66 
67     DataPipeIface!(E)[]  pipe;
68     Buffer!E             buffer;
69     /// Append data processor to pipeline
70     /// Params:
71     /// p = processor
72     void insert(DataPipeIface!E p) {
73         pipe ~= p;
74     }
75     E[][] process(DataPipeIface!E p, E[][] data) {
76         E[][] result;
77         data.each!(e => p.putNoCopy(e));
78         while(!p.empty()) result ~= p.get();
79         return result;
80     }
81     /// Process next data portion. Data passed over pipeline and store result in buffer.
82     /// Params:
83     /// data = input data buffer.
84     /// NoCopy means we do not copy data to buffer, we keep reference
85     void putNoCopy(E[] data) {
86         if ( pipe.empty ) {
87             buffer.putNoCopy(data);
88             return;
89         }
90         try {
91             auto t = process(pipe.front, [data]);
92             foreach(ref p; pipe[1..$]) {
93                 t = process(p, t);
94             }
95             t.each!(b => buffer.putNoCopy(b));
96         }
97         catch (Exception e) {
98             throw new DecodingException(e.msg);
99         }
100     }
101     /// Get what was collected in internal buffer and clear it. 
102     /// Returns:
103     /// data collected.
104     E[] get() {
105         if ( buffer.empty ) {
106             return E[].init;
107         }
108         auto res = buffer.data;
109         buffer = Buffer!E.init;
110         return res;
111     }
112     ///
113     /// get without datamove. but user receive [][]
114     /// 
115     E[][] getNoCopy()  {
116         if ( buffer.empty ) {
117             return E[][].init;
118         }
119         E[][] res = buffer.__repr.__buffer;
120         buffer = Buffer!E.init;
121         return res;
122     }
123     /// Test if internal buffer is empty
124     /// Returns:
125     /// true if internal buffer is empty (nothing to get())
126     bool empty() pure const @safe {
127         return buffer.empty;
128     }
129     void flush() {
130         E[][] product;
131         foreach(ref p; pipe) {
132             product.each!(e => p.putNoCopy(e));
133             p.flush();
134             product.length = 0;
135             while( !p.empty ) product ~= p.get();
136         }
137         product.each!(b => buffer.putNoCopy(b));
138     }
139 }
140 
141 /**
142  * Processor for gzipped/compressed content.
143  * Also support InputRange interface.
144  */
145 public class Decompressor(E) : DataPipeIface!E {
146     private {
147         Buffer!ubyte __buff;
148         UnCompress   __zlib;
149     }
150     this() {
151         __buff = Buffer!ubyte();
152         __zlib = new UnCompress();
153     }
154     override void putNoCopy(E[] data) {
155         if ( __zlib is null  ) {
156             __zlib = new UnCompress();
157         }
158         __buff.putNoCopy(__zlib.uncompress(data));
159     }
160     override E[] get() pure {
161         assert(__buff.length);
162         auto r = __buff.__repr.__buffer[0];
163         __buff.popFrontN(r.length);
164         return cast(E[])r;
165     }
166     override void flush() {
167         if ( __zlib is null  ) {
168             return;
169         }
170         __buff.put(__zlib.flush());
171     }
172     override @property bool empty() const pure @safe {
173         debug(requests) tracef("empty=%b", __buff.empty);
174         return __buff.empty;
175     }
176     @property auto ref front() pure const @safe {
177         debug(requests) tracef("front: buff length=%d", __buff.length);
178         return __buff.front;
179     }
180     @property auto popFront() pure @safe {
181         debug(requests) tracef("popFront: buff length=%d", __buff.length);
182         return __buff.popFront;
183     }
184     @property void popFrontN(size_t n) pure @safe {
185         __buff.popFrontN(n);
186     }
187 }
188 
189 /**
190  * Unchunk chunked http responce body.
191  */
192 public class DecodeChunked : DataPipeIface!ubyte {
193     //    length := 0
194     //    read chunk-size, chunk-extension (if any) and CRLF
195     //    while (chunk-size > 0) {
196     //        read chunk-data and CRLF
197     //        append chunk-data to entity-body
198     //        length := length + chunk-size
199     //        read chunk-size and CRLF
200     //    }
201     //    read entity-header
202     //    while (entity-header not empty) {
203     //        append entity-header to existing header fields
204     //        read entity-header
205     //    }
206     //    Content-Length := length
207     //    Remove "chunked" from Transfer-Encoding
208     //
209 
210     //    Chunked-Body   = *chunk
211     //                      last-chunk
212     //                      trailer
213     //                      CRLF
214     //            
215     //    chunk          = chunk-size [ chunk-extension ] CRLF
216     //                     chunk-data CRLF
217     //                     chunk-size     = 1*HEX
218     //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
219     //        
220     //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
221     //    chunk-ext-name = token
222     //    chunk-ext-val  = token | quoted-string
223     //    chunk-data     = chunk-size(OCTET)
224     //    trailer        = *(entity-header CRLF)
225 
226     alias eType = ubyte;
227     immutable eType[] CRLF = ['\r', '\n'];
228     private {
229         enum         States {huntingSize, huntingSeparator, receiving, trailer};
230         char         state = States.huntingSize;
231         size_t       chunk_size, to_receive;
232         Buffer!ubyte buff;
233         ubyte[]      linebuff;
234     }
235     void putNoCopy(eType[] data) {
236         while ( data.length ) {
237             if ( state == States.trailer ) {
238                 to_receive = to_receive - min(to_receive, data.length);
239                 return;
240             }
241             if ( state == States.huntingSize ) {
242                 import std.ascii;
243                 ubyte[10] digits;
244                 int i;
245                 for(i=0;i<data.length;i++) {
246                     ubyte v = data[i];
247                     digits[i] = v;
248                     if ( v == '\n' ) {
249                         i+=1;
250                         break;
251                     }
252                 }
253                 linebuff ~= digits[0..i];
254                 if ( linebuff.length >= 80 ) {
255                     throw new DecodingException("Can't find chunk size in the body");
256                 }
257                 data = data[i..$];
258                 if (!linebuff.canFind(CRLF)) {
259                     continue;
260                 }
261                 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b";
262                 state = States.receiving;
263                 to_receive = chunk_size;
264                 if ( chunk_size == 0 ) {
265                     to_receive = 2-min(2, data.length); // trailing \r\n
266                     state = States.trailer;
267                     return;
268                 }
269                 continue;
270             }
271             if ( state == States.receiving ) {
272                 if (to_receive > 0 ) {
273                     auto can_store = min(to_receive, data.length);
274                     buff.putNoCopy(data[0..can_store]);
275                     data = data[can_store..$];
276                     to_receive -= can_store;
277                     //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
278                     if ( to_receive == 0 ) {
279                         //tracef("switch to huntig separator");
280                         state = States.huntingSeparator;
281                         continue;
282                     }
283                     continue;
284                 }
285                 assert(false);
286             }
287             if ( state == States.huntingSeparator ) {
288                 if ( data[0] == '\n' || data[0]=='\r') {
289                     data = data[1..$];
290                     continue;
291                 }
292                 state = States.huntingSize;
293                 linebuff.length = 0;
294                 continue;
295             }
296         }
297     }
298     eType[] get() {
299         auto r = buff.__repr.__buffer[0];
300         buff.popFrontN(r.length);
301         return r;
302     }
303     void flush() {
304     }
305     bool empty() {
306         debug(requests) tracef("empty=%b", buff.empty);
307         return buff.empty;
308     }
309     bool done() {
310         return state==States.trailer && to_receive==0;
311     }
312 }
313 
314 unittest {
315     info("Testing DataPipe");
316     globalLogLevel(LogLevel.info);
317     alias eType = char;
318     eType[] gzipped = [
319         0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
320         0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
321         0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
322         0x08, 0x00, 0x00, 0x00
323     ]; // "abc\ndef\n"
324     auto d = new Decompressor!eType();
325     d.putNoCopy(gzipped[0..2].dup);
326     d.putNoCopy(gzipped[2..10].dup);
327     d.putNoCopy(gzipped[10..$].dup);
328     d.flush();
329     assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));
330 
331     auto e = new Decompressor!eType();
332     e.putNoCopy(gzipped[0..10].dup);
333     e.putNoCopy(gzipped[10..$].dup);
334     e.flush();
335     assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
336     //    writeln(gzipped.decompress.filter!(a => a!='b').array);
337     auto dp = new DataPipe!eType;
338     dp.insert(new Decompressor!eType());
339     dp.putNoCopy(gzipped[0..2].dup);
340     dp.putNoCopy(gzipped[2..$].dup);
341     dp.flush();
342     assert(equal(dp.get(), "abc\ndef\n"));
343     // empty datapipe shoul just pass input to output
344     auto dpu = new DataPipe!ubyte;
345     dpu.putNoCopy("abcd".dup.representation);
346     dpu.putNoCopy("efgh".dup.representation);
347     dpu.flush();
348     assert(equal(dpu.get(), "abcdefgh"));
349     info("Test unchunker properties");
350     ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
351     ubyte[][] result;
352     auto uc = new DecodeChunked();
353     uc.putNoCopy(twoChunks);
354     while(!uc.empty) {
355         result ~= uc.get();
356     }
357     assert(equal(result[0], ['1', '2']));
358     assert(equal(result[1], ['3', '4']));
359     info("unchunker correctness - ok");
360     result[0][0] = '5';
361     assert(twoChunks[3] == '5');
362     info("unchunker zero copy - ok");
363     info("Testing DataPipe - done");
364 }
365 /**
366  * Buffer used to collect and process data from network. It remainds Appender, but support
367  * also Range interface.
368  * $(P To place data in buffer use put() method.)
369  * $(P  To retrieve data from buffer you can use several methods:)
370  * $(UL
371  *  $(LI Range methods: front, back, index [])
372  *  $(LI data method: return collected data (like Appender.data))
373  * )
374  */
375 static this() {
376 }
377 static ~this() {
378 }
379 enum   CACHESIZE = 1024;
380 
381 static long reprAlloc;
382 static long reprCacheHit;
383 static long reprCacheRequests;
384 
385 
386 public struct Buffer(T) {
387 //    static Repr[CACHESIZE]  cache;
388 //    static uint             cacheIndex;
389 
390     private {
391         Repr  cachedOrNew() {
392             return new Repr;
393 //            reprCacheRequests++;
394 //            if ( false && cacheIndex>0 ) {
395 //                reprCacheHit++;
396 //                cacheIndex -= 1;
397 //                return cache[cacheIndex];
398 //            } else {
399 //                return new Repr;
400 //            }
401         }
402         class Repr {
403             size_t         __length;
404             Unqual!T[][]   __buffer;
405             this() {
406                 reprAlloc++;
407                 __length = 0;
408             }
409             this(Repr other) {
410                 reprAlloc++;
411                 if ( other is null )
412                     return;
413                 __length = other.__length;
414                 __buffer = other.__buffer.dup;
415             }
416         }
417         Repr __repr;
418     }
419     
420     alias toString = data!string;
421     
422     this(this) {
423         if ( !__repr ) {
424             return;
425         }
426         __repr = new Repr(__repr);
427     }
428     this(U)(U[] data) {
429         put(data);
430     }
431     ~this() {
432         __repr = null;
433     }
434     /***************
435      * store data. Data copied
436      */
437     auto put(U)(U[] data) {
438         if ( data.length == 0 ) {
439             return;
440         }
441         if ( !__repr ) {
442             __repr = cachedOrNew();
443         }
444         static if (!is(U == T)) {
445             auto d = cast(T[])(data);
446             __repr.__length += d.length;
447             __repr.__buffer ~= d.dup;
448         } else {
449             __repr.__length += data.length;
450             __repr.__buffer ~= data.dup;
451         }
452         return;
453     }
454     auto putNoCopy(U)(U[] data) {
455         if ( data.length == 0 ) {
456             return;
457         }
458         if ( !__repr ) {
459             __repr = cachedOrNew();
460         }
461         static if (!is(U == T)) {
462             auto d = cast(T[])(data);
463             __repr.__length += d.length;
464             __repr.__buffer ~= d;
465         } else {
466             __repr.__length += data.length;
467             __repr.__buffer ~= data;
468         }
469         return;
470     }
471     @property auto opDollar() const pure @safe {
472         return __repr.__length;
473     }
474     @property auto length() const pure @safe {
475         if ( !__repr ) {
476             return 0;
477         }
478         return __repr.__length;
479     }
480     @property auto empty() const pure @safe {
481         return length == 0;
482     }
483     @property auto ref front() const pure @safe {
484         assert(length);
485         return __repr.__buffer.front.front;
486     }
487     @property auto ref back() const pure @safe {
488         assert(length);
489         return __repr.__buffer.back.back;
490     }
491     @property void popFront() pure @safe {
492         assert(length);
493         with ( __repr ) {
494             __buffer.front.popFront;
495             if ( __buffer.front.length == 0 ) {
496                 __buffer.popFront;
497             }
498             __length--;
499         }
500     }
501     @property void popFrontN(size_t n) pure @safe {
502         assert(n <= length, "lengnt: %d, n=%d".format(length, n));
503         __repr.__length -= n;
504         while( n ) {
505             if ( n <= __repr.__buffer.front.length ) {
506                 __repr.__buffer.front.popFrontN(n);
507                 if ( __repr.__buffer.front.length == 0 ) {
508                     __repr.__buffer.popFront;
509                 }
510                 return;
511             }
512             n -= __repr.__buffer.front.length;
513             __repr.__buffer.popFront;
514         }
515     }
516     @property void popBack() pure @safe {
517         assert(length);
518         __repr.__buffer.back.popBack;
519         if ( __repr.__buffer.back.length == 0 ) {
520             __repr.__buffer.popBack;
521         }
522         __repr.__length--;
523     }
524     @property void popBackN(size_t n) pure @safe {
525         assert(n <= length, "n: %d, length: %d".format(n, length));
526         __repr.__length -= n;
527         while( n ) {
528             if ( n <= __repr.__buffer.back.length ) {
529                 __repr.__buffer.back.popBackN(n);
530                 if ( __repr.__buffer.back.length == 0 ) {
531                     __repr.__buffer.popBack;
532                 }
533                 return;
534             }
535             n -= __repr.__buffer.back.length;
536             __repr.__buffer.popBack;
537         }
538     }
539     @property auto save() @safe {
540         auto n = Buffer!T();
541         n.__repr = new Repr(__repr);
542         return n;
543     }
544     @property auto ref opIndex(size_t n) const pure @safe {
545         assert( __repr && n < __repr.__length );
546         foreach(b; __repr.__buffer) {
547             if ( n < b.length ) {
548                 return b[n];
549             }
550             n -= b.length;
551         }
552         assert(false, "Impossible");
553     }
554     Buffer!T opSlice(size_t m, size_t n) {
555         if ( empty || m == n ) {
556             return Buffer!T();
557         }
558         assert( m <= n && n <= __repr.__length);
559         auto res = this.save();
560         res.popBackN(res.__repr.__length-n);
561         res.popFrontN(m);
562         return res;
563     }
564     @property auto data(U=T[])() pure {
565         static if ( is(U==T[]) ) {
566             if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
567                 return __repr.__buffer.front;
568             }
569         }
570         Appender!(T[]) a;
571         if ( __repr && __repr.__buffer ) {
572             foreach(ref b; __repr.__buffer) {
573                 a.put(b);
574             }
575         }
576         static if ( is(U==T[]) ) {
577             return a.data;
578         } else {
579             return cast(U)a.data;
580         }
581     }
582     string opCast(string)() {
583         return this.toString;
584     }
585     bool opEquals(U)(U x) {
586         return cast(U)this == x;
587     }
588 
589 }
590 ///
591 public unittest {
592 
593     static assert(isInputRange!(Buffer!ubyte));
594     static assert(isForwardRange!(Buffer!ubyte));
595     static assert(hasLength!(Buffer!ubyte));
596     static assert(hasSlicing!(Buffer!ubyte));
597     static assert(isBidirectionalRange!(Buffer!ubyte));
598     static assert(isRandomAccessRange!(Buffer!ubyte));
599     
600     auto b = Buffer!ubyte();
601     b.put("abc".representation.dup);
602     b.put("def".representation.dup);
603     assert(b.length == 6);
604     assert(b.toString == "abcdef");
605     assert(b.front == 'a');
606     assert(b.back == 'f');
607     assert(equal(b[0..$], "abcdef"));
608     assert(equal(b[$-2..$], "ef"));
609     assert(b == "abcdef");
610     b.popFront;
611     b.popBack;
612     assert(b.front == 'b');
613     assert(b.back == 'e');
614     assert(b.length == 4);
615     assert(retro(b).front == 'e');
616     assert(countUntil(b, 'e') == 3);
617     assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
618     assert(equal(b, "bcde"));
619     b.popFront; b.popFront;
620     assert(b.front == 'd');
621     assert(b.front == b[0]);
622     assert(b.back == b[$-1]);
623 
624     auto c = Buffer!ubyte();
625     c.put("Header0: value0\n".representation.dup);
626     c.put("Header1: value1\n".representation.dup);
627     c.put("Header2: value2\n\nbody".representation.dup);
628     auto c_length = c.length;
629     auto eoh = countUntil(c, "\n\n");
630     assert(eoh == 47);
631     foreach(header; c[0..eoh].splitter('\n') ) {
632         writeln(cast(string)header.data);
633     }
634     assert(equal(findSplit(c, "\n\n")[2], "body"));
635     assert(c.length == c_length);
636 }
637 
638 public struct SSLOptions {
639     enum filetype {
640         pem,
641         asn1,
642         der = asn1,
643     }
644     private {
645         /**
646          * do we need to veryfy peer?
647          */
648         bool     _verifyPeer = false;
649         /**
650          * path to CA cert
651          */
652         string   _caCert;
653         /**
654          * path to key file (can also contain cert (for pem)
655          */
656         string   _keyFile;
657         /**
658          * path to cert file (can also contain key (for pem)
659          */
660         string   _certFile;
661         filetype _keyType = filetype.pem;
662         filetype _certType = filetype.pem;
663     }
664     ubyte haveFiles() pure nothrow @safe @nogc {
665         ubyte r = 0;
666         if ( _keyFile  ) r|=1;
667         if ( _certFile ) r|=2;
668         return r;
669     }
670     // do we want to verify peer certificates?
671     bool getVerifyPeer() pure nothrow @nogc {
672         return _verifyPeer;
673     }
674     SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
675         _verifyPeer = v;
676         return this;
677     }
678     /// set key file name and type (default - pem)
679     auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
680         _keyFile = f;
681         _keyType = t;
682         return this;
683     }
684     auto getKeyFile() @safe pure nothrow @nogc {
685         return _keyFile;
686     }
687     auto getKeyType() @safe pure nothrow @nogc {
688         return _keyType;
689     }
690     /// set cert file name and type (default - pem)
691     auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
692         _certFile = f;
693         _certType = t;
694         return this;
695     }
696     auto setCaCert(string p) @safe pure nothrow @nogc {
697         _caCert = p;
698         return this;
699     }
700     auto getCaCert() @safe pure nothrow @nogc {
701         return _caCert;
702     }
703     auto getCertFile() @safe pure nothrow @nogc {
704         return _certFile;
705     }
706     auto getCertType() @safe pure nothrow @nogc {
707         return _certType;
708     }
709     /// set key file type
710     void setKeyType(string t) @safe pure nothrow {
711         _keyType = cast(filetype)sslKeyTypes[t];
712     }
713     /// set cert file type
714     void setCertType(string t) @safe pure nothrow {
715         _certType = cast(filetype)sslKeyTypes[t];
716     }
717 }
718 static immutable int[string] sslKeyTypes;
719 shared static this() {
720     sslKeyTypes = [
721         "pem":SSLOptions.filetype.pem,
722         "asn1":SSLOptions.filetype.asn1,
723         "der":SSLOptions.filetype.der,
724     ];
725 }
726 
727 version(vibeD) {
728 }
729 else {
730     extern(C) {
731         int SSL_library_init();
732         void OpenSSL_add_all_ciphers();
733         void OpenSSL_add_all_digests();
734         void SSL_load_error_strings();
735 
736         struct SSL {}
737         struct SSL_CTX {}
738         struct SSL_METHOD {}
739 
740         SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
741         SSL* SSL_new(SSL_CTX*);
742         int SSL_set_fd(SSL*, int);
743         int SSL_connect(SSL*);
744         int SSL_write(SSL*, const void*, int);
745         int SSL_read(SSL*, void*, int);
746         int SSL_shutdown(SSL*) @trusted @nogc nothrow;
747         void SSL_free(SSL*);
748         void SSL_CTX_free(SSL_CTX*);
749 
750         long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
751 
752         long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
753         int  SSL_CTX_set_default_verify_paths(SSL_CTX *ctx);
754         int SSL_CTX_load_verify_locations(SSL_CTX *ctx, const char *CAfile, const char *CApath);
755         void SSL_CTX_set_verify(SSL_CTX *ctx, int mode, void *);
756         long SSL_set_mode(SSL *ssl, long mode);
757         int  SSL_CTX_use_PrivateKey_file(SSL_CTX *ctx, const char *file, int type);
758         int  SSL_CTX_use_certificate_file(SSL_CTX *ctx, const char *file, int type);
759 
760         long SSL_CTX_get_mode(SSL_CTX *ctx);
761         long SSL_get_mode(SSL *ssl);
762 
763         long ERR_get_error();
764         char* ERR_reason_error_string(ulong e);
765 
766         SSL_METHOD* SSLv3_client_method();
767         SSL_METHOD* TLSv1_2_client_method();
768         SSL_METHOD* TLSv1_client_method();
769     }
770 
771     enum SSL_VERIFY_PEER = 0x01;
772     enum SSL_FILETYPE_PEM = 1;
773     enum SSL_FILETYPE_ASN1 = 2;
774 
775     immutable int[SSLOptions.filetype] ft2ssl;
776 
777     shared static this() {
778         SSL_library_init();
779         OpenSSL_add_all_ciphers();
780         OpenSSL_add_all_digests();
781         SSL_load_error_strings();
782         ft2ssl = [
783             SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
784             SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
785             SSLOptions.filetype.der: SSL_FILETYPE_ASN1
786         ];
787     }
788 
789     public class OpenSslSocket : Socket {
790         //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
791         private SSL* ssl;
792         private SSL_CTX* ctx;
793         private void initSsl(SSLOptions opts) {
794             //ctx = SSL_CTX_new(SSLv3_client_method());
795             ctx = SSL_CTX_new(TLSv1_client_method());
796             assert(ctx !is null);
797             if ( opts.getVerifyPeer() ) {
798                 SSL_CTX_set_default_verify_paths(ctx);
799                 if ( opts.getCaCert() ) {
800                     SSL_CTX_load_verify_locations(ctx, opts.getCaCert().toStringz(), null);
801                 }
802                 SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
803             }
804             immutable keyFile = opts.getKeyFile();
805             immutable keyType = opts.getKeyType();
806             immutable certFile = opts.getCertFile();
807             immutable certType = opts.getCertType();
808             final switch(opts.haveFiles()) {
809                 case 0b11:  // both files
810                     SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
811                     SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
812                     break;
813                 case 0b01:  // key only
814                     SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
815                     SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
816                     break;
817                 case 0b10:  // cert only
818                     SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
819                     SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
820                     break;
821                 case 0b00:
822                     break;
823             }
824             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
825             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
826             ssl = SSL_new(ctx);
827             SSL_set_fd(ssl, this.handle);
828         }
829 
830         @trusted
831         override void connect(Address dest) {
832             super.connect(dest);
833             if(SSL_connect(ssl) == -1) {
834                 throw new Exception("ssl connect failed: %s".format(to!string(ERR_reason_error_string(ERR_get_error()))));
835             }
836         }
837 
838         @trusted
839         override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
840             return SSL_write(ssl, buf.ptr, cast(uint) buf.length);
841         }
842         override ptrdiff_t send(const(void)[] buf) {
843             return send(buf, SocketFlags.NONE);
844         }
845         @trusted
846         override ptrdiff_t receive(void[] buf, SocketFlags flags) {
847             return SSL_read(ssl, buf.ptr, cast(int)buf.length);
848         }
849         override ptrdiff_t receive(void[] buf) {
850             return receive(buf, SocketFlags.NONE);
851         }
852         this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
853             super(af, type);
854             initSsl(opts);
855         }
856 
857         this(socket_t sock, AddressFamily af) {
858             super(sock, af);
859             initSsl(SSLOptions());
860         }
861         override void close() {
862             //SSL_shutdown(ssl);
863             super.close();
864         }
865         ~this() {
866             SSL_free(ssl);
867             SSL_CTX_free(ctx);
868         }
869     }
870 
871     public class SSLSocketStream: SocketStream {
872         SSLOptions _sslOptions;
873 
874         this(SSLOptions opts) {
875             _sslOptions = opts;
876         }
877 
878         override void open(AddressFamily fa) {
879             if ( s !is null ) {
880                 s.close();
881             }
882             s = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
883             assert(s !is null, "Can't create socket");
884             __isOpen = true;
885         }
886         override SSLSocketStream accept() {
887             auto newso = s.accept();
888             if ( s is null ) {
889                 return null;
890             }
891             auto newstream = new SSLSocketStream(_sslOptions);
892             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
893             newstream.s = sslSocket;
894             newstream.__isOpen = true;
895             newstream.__isConnected = true;
896             return newstream;
897         }
898     }
899 }
900 
901 public interface NetworkStream {
902     @property bool isConnected() const;
903     @property bool isOpen() const;
904 
905     void close() @trusted;
906 
907     ///
908     /// timeout is the socket write timeout.
909     ///
910     NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);
911 
912     ptrdiff_t send(const(void)[] buff);
913     ptrdiff_t receive(void[] buff);
914 
915     NetworkStream accept();
916     @property void reuseAddr(bool);
917     void bind(Address);
918     void listen(int);
919 
920     ///
921     /// Set timeout for receive calls. 0 means no timeout.
922     ///
923     @property void readTimeout(Duration timeout);
924 }
925 
926 public abstract class SocketStream : NetworkStream {
927     private {
928         Duration timeout;
929         Socket   s;
930         bool     __isOpen;
931         bool     __isConnected;
932     }
933     void open(AddressFamily fa) {
934     }
935     @property ref Socket so() @safe pure {
936         return s;
937     }
938     @property bool isOpen() @safe @nogc pure const {
939         return s && __isOpen;
940     }
941     @property bool isConnected() @safe @nogc pure const {
942         return s && __isOpen && __isConnected;
943     }
944     void close() @trusted {
945         debug(requests) tracef("Close socket");
946         if ( isOpen ) {
947             s.close();
948             __isOpen = false;
949             __isConnected = false;
950         }
951         s = null;
952     }
953     
954     SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
955         debug(requests) tracef(format("Create connection to %s:%d", host, port));
956         Address[] addresses;
957         __isConnected = false;
958         try {
959             addresses = getAddress(host, port);
960         } catch (Exception e) {
961             throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
962         }
963         foreach(a; addresses) {
964             debug(requests) tracef("Trying %s", a);
965             try {
966                 open(a.addressFamily);
967                 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
968                 s.connect(a);
969                 debug(requests) tracef("Connected to %s", a);
970                 __isConnected = true;
971                 break;
972             } catch (SocketException e) {
973                 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
974                 s.close();
975             }
976         }
977         if ( !__isConnected ) {
978             throw new ConnectError("Can't connect to %s:%d".format(host, port));
979         }
980         return this;
981     }
982     
983     ptrdiff_t send(const(void)[] buff) @safe
984     in {assert(isConnected);}
985     body {
986         auto rc = s.send(buff);
987         if (rc < 0) {
988             close();
989             throw new NetworkException("sending data");
990         }
991         return rc;
992     }
993     
994     ptrdiff_t receive(void[] buff) @safe {
995         while (true) {
996             auto r = s.receive(buff);
997             if (r < 0) {
998                 version(Windows) {
999                     close();
1000                     if ( errno == 0 ) {
1001                         throw new TimeoutException("Timeout receiving data");
1002                     }
1003                     throw new NetworkException("receiving data");
1004                 }
1005                 version(Posix) {
1006                     if ( errno == EINTR ) {
1007                         continue;
1008                     }
1009                     close();
1010                     if ( errno == EAGAIN ) {
1011                         throw new TimeoutException("Timeout receiving data");
1012                     }
1013                     throw new NetworkException("receiving data");
1014                 }
1015             }
1016             else {
1017                 buff.length = r;
1018             }
1019             return r;
1020         }
1021         assert(false);
1022     }
1023 
1024     @property void readTimeout(Duration timeout) @safe {
1025         s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
1026     }
1027     override SocketStream accept() {
1028         assert(false, "Implement before use");
1029     }
1030     @property override void reuseAddr(bool yes){
1031         if (yes) {
1032             s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
1033         }
1034         else {
1035             s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0);
1036         }
1037     }
1038     override void bind(Address addr){
1039         s.bind(addr);
1040     }
1041     override void listen(int n) {
1042         s.listen(n);
1043     };
1044 }
1045 
1046 public class TCPSocketStream : SocketStream {
1047     override void open(AddressFamily fa) {
1048         if ( s !is null ) {
1049             s.close();
1050         }
1051         s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
1052         assert(s !is null, "Can't create socket");
1053         __isOpen = true;
1054         s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
1055     }
1056     override TCPSocketStream accept() {
1057         auto newso = s.accept();
1058         if ( s is null ) {
1059             return null;
1060         }
1061         auto newstream = new TCPSocketStream();
1062         newstream.s = newso;
1063         newstream.__isOpen = true;
1064         newstream.__isConnected = true;
1065         newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
1066         return newstream;
1067     }
1068 }
1069 
1070 version (vibeD) {
1071     import vibe.core.net, vibe.stream.tls;
1072 
1073     public class TCPVibeStream : NetworkStream {
1074     private:
1075         TCPConnection _conn;
1076         Duration _readTimeout = Duration.max;
1077         bool _isOpen = true;
1078 
1079     public:
1080         @property bool isConnected() const {
1081             return _conn.connected;
1082         }
1083         @property override bool isOpen() const {
1084             return _conn && _isOpen;
1085         }
1086         void close() @trusted {
1087             _conn.close();
1088             _isOpen = false;
1089         }
1090 
1091         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1092             // FIXME: timeout not supported in vibe.d
1093             try {
1094                 _conn = connectTCP(host, port);
1095             }
1096             catch (Exception e)
1097                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1098 
1099             return this;
1100         }
1101 
1102         ptrdiff_t send(const(void)[] buff) {
1103             _conn.write(cast(const(ubyte)[])buff);
1104             return buff.length;
1105         }
1106 
1107         ptrdiff_t receive(void[] buff) {
1108             if (!_conn.waitForData(_readTimeout)) {
1109                 if (!_conn.connected) {
1110                     return 0;
1111                 }
1112                 throw new TimeoutException("Timeout receiving data");
1113             }
1114 
1115             if(_conn.empty) {
1116                 return 0;
1117             }
1118 
1119             auto chunk = min(_conn.leastSize, buff.length);
1120             assert(chunk != 0);
1121             _conn.read(cast(ubyte[])buff[0 .. chunk]);
1122             return chunk;
1123         }
1124 
1125         @property void readTimeout(Duration timeout) {
1126             if (timeout == 0.seconds) {
1127                 _readTimeout = Duration.max;
1128             }
1129             else {
1130                 _readTimeout = timeout;
1131             }
1132         }
1133         override TCPVibeStream accept() {
1134             assert(false, "Must be implemented");
1135         }
1136         override @property void reuseAddr(bool){
1137             assert(false, "Not Implemented");
1138         }
1139         override void bind(Address){
1140             assert(false, "Not Implemented");
1141         }
1142         override void listen(int){
1143             assert(false, "Not Implemented");
1144         }
1145     }
1146 
1147     public class SSLVibeStream : TCPVibeStream {
1148     private:
1149         Stream _sslStream;
1150         bool   _isOpen = true;
1151         SSLOptions _sslOptions;
1152 
1153     public:
1154         this(SSLOptions opts) {
1155             _sslOptions = opts;
1156         }
1157         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1158             try {
1159                 _conn = connectTCP(host, port);
1160                 auto sslctx = createTLSContext(TLSContextKind.client);
1161                 if ( _sslOptions.getVerifyPeer() ) {
1162                     if ( _sslOptions.getCaCert() == null ) {
1163                         throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
1164                     }
1165                     sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
1166                     sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
1167                 } else {
1168                     sslctx.peerValidationMode = TLSPeerValidationMode.none;
1169                 }
1170                 immutable keyFile = _sslOptions.getKeyFile();
1171                 immutable certFile = _sslOptions.getCertFile();
1172                 final switch(_sslOptions.haveFiles()) {
1173                     case 0b11:  // both files
1174                         sslctx.usePrivateKeyFile(keyFile);
1175                         sslctx.useCertificateChainFile(certFile);
1176                         break;
1177                     case 0b01:  // key only
1178                         sslctx.usePrivateKeyFile(keyFile);
1179                         sslctx.useCertificateChainFile(keyFile);
1180                         break;
1181                     case 0b10:  // cert only
1182                         sslctx.usePrivateKeyFile(certFile);
1183                         sslctx.useCertificateChainFile(certFile);
1184                         break;
1185                     case 0b00:
1186                         break;
1187                 }
1188                 _sslStream = createTLSStream(_conn, sslctx, host);
1189             }
1190             catch (ConnectError e) {
1191                 throw e;
1192             }
1193             catch (Exception e) {
1194                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1195             }
1196 
1197             return this;
1198         }
1199 
1200         override ptrdiff_t send(const(void)[] buff) {
1201             _sslStream.write(cast(const(ubyte)[])buff);
1202             return buff.length;
1203         }
1204 
1205         override ptrdiff_t receive(void[] buff) {
1206             if (!_sslStream.dataAvailableForRead) {
1207                 if (!_conn.waitForData(_readTimeout)) {
1208                     if (!_conn.connected) {
1209                         return 0;
1210                     }
1211                     throw new TimeoutException("Timeout receiving data");
1212                 }
1213             }
1214 
1215             if(_sslStream.empty) {
1216                 return 0;
1217             }
1218 
1219             auto chunk = min(_sslStream.leastSize, buff.length);
1220             assert(chunk != 0);
1221             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1222             return chunk;
1223         }
1224 
1225         override void close() @trusted {
1226             _sslStream.finalize();
1227             _conn.close();
1228             _isOpen = false;
1229         }
1230         @property override bool isOpen() const {
1231             return _conn && _isOpen;
1232         }
1233         override SSLVibeStream accept() {
1234             assert(false, "Must be implemented");
1235         }
1236         override @property void reuseAddr(bool){
1237             assert(false, "Not Implemented");
1238         }
1239         override void bind(Address){
1240             assert(false, "Not Implemented");
1241         }
1242         override void listen(int){
1243             assert(false, "Not Implemented");
1244         }
1245     }
1246 }
1247 
1248 version (vibeD) {
1249     public alias TCPStream = TCPVibeStream;
1250     public alias SSLStream = SSLVibeStream;
1251 }
1252 else {
1253     public alias TCPStream = TCPSocketStream;
1254     public alias SSLStream = SSLSocketStream;
1255 }