1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 
19 alias InDataHandler = DataPipeIface!ubyte;
20 
21 public class ConnectError: Exception {
22     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
23         super(msg, file, line);
24     }
25 }
26 
27 class DecodingExceptioin: Exception {
28     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
29         super(msg, file, line);
30     }
31 }
32 /**
33  * DataPipeIface can accept some data, process, and return processed data.
34  */
35 public interface DataPipeIface(E) {
36     /// Is there any processed data ready for reading?
37     bool empty();
38     /// Put next data portion for processing
39     void put(E[]);
40     void putNoCopy(E[]);
41     /// Get any ready data
42     E[] get();
43     /// Signal on end of incoming data stream.
44     void flush();
45 }
46 /**
47  * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line.
48  * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked",
49  * and uncompress Content-Encoding "gzip".
50  */
51 public class DataPipe(E) : DataPipeIface!E {
52 
53     DataPipeIface!(E)[]  pipe;
54     Buffer!E             buffer;
55     /// Append data processor to pipeline
56     /// Params:
57     /// p = processor
58     void insert(DataPipeIface!E p) {
59         pipe ~= p;
60     }
61     E[][] process(DataPipeIface!E p, E[][] data) {
62         E[][] result;
63         data.each!(e => p.put(e));
64         while(!p.empty()) result ~= p.get();
65         return result;
66     }
67     /// Process next data portion. Data passed over pipeline and result stored in buffer.
68     /// Params:
69     /// data = input data array.
70     void put(E[] data) {
71         if ( pipe.empty ) {
72             buffer.put(data);
73             return;
74         }
75         auto t = process(pipe.front, [data]);
76         foreach(ref p; pipe[1..$]) {
77             t = process(p, t);
78         }
79         t.each!(b => buffer.put(b));
80     }
81     void putNoCopy(E[] data) {
82         if ( pipe.empty ) {
83             buffer.putNoCopy(data);
84             return;
85         }
86         auto t = process(pipe.front, [data]);
87         foreach(ref p; pipe[1..$]) {
88             t = process(p, t);
89         }
90         t.each!(b => buffer.putNoCopy(b));
91     }
92     /// Process next data portion. Data passed over pipeline and store result in buffer.
93     /// Params:
94     /// buff = input data buffer.
95     void put(Buffer!E buff) {
96         if ( pipe.empty ) {
97             if ( buffer.__repr is null ) {
98                 buffer = buff;
99                 return;
100             }
101             buffer.__repr.__buffer ~= buff.__repr.__buffer;
102             buffer.__repr.__length += buff.length;
103             return;
104         }
105         auto t = process(pipe.front, buff.__repr.__buffer);
106         foreach(ref p; pipe[1..$]) {
107             t = process(p, t);
108         }
109         t.each!(b => buffer.put(b));
110     }
111     /// Get what was collected in internal buffer and clear it. 
112     /// Returns:
113     /// data collected.
114     E[] get() pure {
115         if ( buffer.empty ) {
116             return E[].init;
117         }
118         auto res = buffer.data;
119         buffer = Buffer!E.init;
120         return res;
121     }
122     /// Test if internal buffer is empty
123     /// Returns:
124     /// true if internal buffer is empty (nothing to get())
125     bool empty() pure const @safe {
126         return buffer.empty;
127     }
128     void flush() {
129         E[][] product;
130         foreach(ref p; pipe) {
131             product.each!(e => p.put(e));
132             p.flush();
133             product.length = 0;
134             while( !p.empty ) product ~= p.get();
135         }
136         product.each!(b => buffer.put(b));
137     }
138 }
139 
140 /**
141  * Processor for gzipped/compressed content.
142  * Also support InputRange interface.
143  */
144 public class Decompressor(E) : DataPipeIface!E {
145     private {
146         Buffer!ubyte __buff;
147         UnCompress   __zlib;
148     }
149     this() {
150         __buff = Buffer!ubyte();
151         __zlib = new UnCompress();
152     }
153 //    this(E[] r) {
154 //        //__range = r;
155 //        __buff = Buffer!ubyte();
156 //        __zlib = new UnCompress();
157 //        auto l = r.length;
158 //        if ( l ) {
159 //            __buff.put(__zlib.uncompress(r.take(l)));
160 //        }
161 //    }
162     override void putNoCopy(E[] data) {
163         if ( __zlib is null  ) {
164             __zlib = new UnCompress();
165         }
166         __buff.put(__zlib.uncompress(data));
167     }
168     override void put(E[] data) {
169         if ( __zlib is null  ) {
170             __zlib = new UnCompress();
171         }
172         __buff.put(__zlib.uncompress(data));
173     }
174     override E[] get() pure {
175         assert(__buff.length);
176         auto r = __buff.__repr.__buffer[0];
177         __buff.popFrontN(r.length);
178         return cast(E[])r;
179     }
180     override void flush() {
181         if ( __zlib is null  ) {
182             return;
183         }
184         __buff.put(__zlib.flush());
185     }
186     override @property bool empty() const pure @safe {
187         debug tracef("empty=%b", __buff.empty);
188         return __buff.empty;
189     }
190     @property auto ref front() pure const @safe {
191         debug tracef("front: buff length=%d", __buff.length);
192         return __buff.front;
193     }
194     @property auto popFront() pure @safe {
195         debug tracef("popFront: buff length=%d", __buff.length);
196         return __buff.popFront;
197     }
198     @property void popFrontN(size_t n) pure @safe {
199         __buff.popFrontN(n);
200     }
201 }
202 
203 /**
204  * Unchunk chunked http responce body.
205  */
206 public class DecodeChunked : DataPipeIface!ubyte {
207     //    length := 0
208     //    read chunk-size, chunk-extension (if any) and CRLF
209     //    while (chunk-size > 0) {
210     //        read chunk-data and CRLF
211     //        append chunk-data to entity-body
212     //        length := length + chunk-size
213     //        read chunk-size and CRLF
214     //    }
215     //    read entity-header
216     //    while (entity-header not empty) {
217     //        append entity-header to existing header fields
218     //        read entity-header
219     //    }
220     //    Content-Length := length
221     //    Remove "chunked" from Transfer-Encoding
222     //
223 
224     //    Chunked-Body   = *chunk
225     //                      last-chunk
226     //                      trailer
227     //                      CRLF
228     //            
229     //    chunk          = chunk-size [ chunk-extension ] CRLF
230     //                     chunk-data CRLF
231     //                     chunk-size     = 1*HEX
232     //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
233     //        
234     //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
235     //    chunk-ext-name = token
236     //    chunk-ext-val  = token | quoted-string
237     //    chunk-data     = chunk-size(OCTET)
238     //    trailer        = *(entity-header CRLF)
239 
240     alias eType = ubyte;
241     immutable eType[] CRLF = ['\r', '\n'];
242     private {
243         enum         States {huntingSize, huntingSeparator, receiving, trailer};
244         char         state = States.huntingSize;
245         size_t       chunk_size, to_receive;
246         Buffer!ubyte buff;
247         ubyte[]      linebuff;
248     }
249     void putNoCopy(eType[] data) {
250         assert(0, "Not implemented");
251     }
252     void put(eType[] data) {
253         while ( data.length ) {
254             if ( state == States.trailer ) {
255                 return;
256             }
257             if ( state == States.huntingSize ) {
258                 linebuff ~= data;
259                 data.length = 0;
260                 auto s = linebuff.findSplit(CRLF);
261                 if ( !s[1].length ) {
262                     if ( linebuff.length >= 80 ) {
263                         throw new DecodingExceptioin("Can't find chunk size in the body");
264                     }
265                     continue;
266                 }
267                 string x = castFrom!(ubyte[]).to!string(s[0]);
268                 formattedRead(x, "%x", &chunk_size);
269                 tracef("Got chunk size %s", chunk_size);
270                 state = States.receiving;
271                 to_receive = chunk_size;
272                 data = s[2];
273                 if ( chunk_size == 0 ) {
274                     state = States.trailer;
275                     tracef("Unchunk completed");
276                     return;
277                 }
278                 continue;
279             }
280             if ( state == States.receiving ) {
281                 if (to_receive > 0 ) {
282                     auto can_store = min(to_receive, data.length);
283                     buff.put(data[0..can_store]);
284                     data = data[can_store..$];
285                     to_receive -= can_store;
286                     tracef("Unchunked %d bytes from %d", can_store, chunk_size);
287                     if ( to_receive == 0 ) {
288                         tracef("switch to huntig separator");
289                         state = States.huntingSeparator;
290                         to_receive = 2;
291                         linebuff.length = 0;
292                         continue;
293                     }
294                     continue;
295                 }
296                 assert(false);
297             }
298             if ( state == States.huntingSeparator ) {
299                 linebuff ~= data;
300                 data.length = 0;
301                 auto s = linebuff.findSplit(CRLF);
302                 if ( s[1].length ) {
303                     data = s[2];
304                     chunk_size = 0;
305                     linebuff.length = 0;
306                     state = States.huntingSize;
307                     tracef("switch to huntig size");
308                     continue;
309                 }
310             }
311         }
312     }
313     eType[] get() {
314         auto r = buff.__repr.__buffer[0];
315         buff.popFrontN(r.length);
316         return r;
317     }
318     void flush() {
319     }
320     bool empty() {
321         debug tracef("empty=%b", buff.empty);
322         return buff.empty;
323     }
324     bool done() {
325         return state==States.trailer;
326     }
327 }
328 
329 unittest {
330     info("Testing DataPipe");
331     globalLogLevel(LogLevel.info);
332     alias eType = char;
333     eType[] gzipped = [
334         0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
335         0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
336         0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
337         0x08, 0x00, 0x00, 0x00
338     ]; // "abc\ndef\n"
339     auto d = new Decompressor!eType();
340     d.put(gzipped[0..2]);
341     d.put(gzipped[2..10]);
342     d.put(gzipped[10..$]);
343     d.flush();
344     assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));
345 
346     auto e = new Decompressor!eType();
347     e.put(gzipped[0..10]);
348     e.put(gzipped[10..$]);
349     e.flush();
350     assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
351     //    writeln(gzipped.decompress.filter!(a => a!='b').array);
352     auto dp = new DataPipe!eType;
353     dp.insert(new Decompressor!eType());
354     dp.put(gzipped[0..2]);
355     dp.put(gzipped[2..$]);
356     dp.flush();
357     assert(equal(dp.get(), "abc\ndef\n"));
358     // empty datapipe shoul just pass input to output
359     auto dpu = new DataPipe!ubyte;
360     dpu.put("abcd".dup.representation);
361     dpu.put("efgh".dup.representation);
362     dpu.flush();
363     assert(equal(dpu.get(), "abcdefgh"));
364     info("Testing DataPipe - done");
365 }
366 /**
367  * Buffer used to collect and process data from network. It remainds Appender, but support
368  * also Range interface.
369  * $(P To place data in buffer use put() method.)
370  * $(P  To retrieve data from buffer you can use several methods:)
371  * $(UL
372  *  $(LI Range methods: front, back, index [])
373  *  $(LI data method: return collected data (like Appender.data))
374  * )
375  */
376 public struct Buffer(T) {
377     private {
378         class Repr {
379             size_t         __length;
380             Unqual!T[][]   __buffer;
381             this() {
382                 __length = 0;
383             }
384             this(Repr other) {
385                 if ( other is null )
386                     return;
387                 __length = other.__length;
388                 __buffer = other.__buffer.dup;
389             }
390         }
391         Repr __repr;
392     }
393 
394     alias toString = data!string;
395 
396     this(this) {
397         __repr = new Repr(__repr);
398     }
399     this(U)(U[] data) pure {
400         put(data);
401     }
402    ~this() {
403         __repr = null;
404     }
405     /***************
406      * store data. Data copied
407      */
408     auto put(U)(U[] data) pure {
409         if ( data.length == 0 ) {
410             return this;
411         }
412         if ( !__repr ) {
413             __repr = new Repr;
414         }
415         debug tracef("Append %d bytes", data.length);
416         static if (!is(U == T)) {
417             auto d = castFrom!(U[]).to!(T[])(data);
418             __repr.__length += d.length;
419             __repr.__buffer ~= d.dup;
420         } else {
421             __repr.__length += data.length;
422             __repr.__buffer ~= data.dup;
423         }
424         return this;
425     }
426     auto putNoCopy(U)(U[] data) pure {
427         if ( data.length == 0 ) {
428             return this;
429         }
430         if ( !__repr ) {
431             __repr = new Repr;
432         }
433         debug tracef("Append %d bytes", data.length);
434         static if (!is(U == T)) {
435             auto d = castFrom!(U[]).to!(T[])(data);
436             __repr.__length += d.length;
437             __repr.__buffer ~= d;
438         } else {
439             __repr.__length += data.length;
440             __repr.__buffer ~= data;
441         }
442         return this;
443     }
444     @property auto opDollar() const pure @safe {
445         return __repr.__length;
446     }
447     @property auto length() const pure @safe {
448         if ( !__repr ) {
449             return 0;
450         }
451         return __repr.__length;
452     }
453     @property auto empty() const pure @safe {
454         return length == 0;
455     }
456     @property auto ref front() const pure @safe {
457         assert(length);
458         return __repr.__buffer.front.front;
459     }
460     @property auto ref back() const pure @safe {
461         assert(length);
462         return __repr.__buffer.back.back;
463     }
464     @property void popFront() pure @safe {
465         assert(length);
466         with ( __repr ) {
467             __buffer.front.popFront;
468             if ( __buffer.front.length == 0 ) {
469                 __buffer.popFront;
470             }
471             __length--;
472         }
473     }
474     @property void popFrontN(size_t n) pure @safe {
475         assert(n <= length, "lengnt: %d, n=%d".format(length, n));
476         __repr.__length -= n;
477         while( n ) {
478             if ( n <= __repr.__buffer.front.length ) {
479                 __repr.__buffer.front.popFrontN(n);
480                 if ( __repr.__buffer.front.length == 0 ) {
481                     __repr.__buffer.popFront;
482                 }
483                 return;
484             }
485             n -= __repr.__buffer.front.length;
486             __repr.__buffer.popFront;
487         }
488     }
489     @property void popBack() pure @safe {
490         assert(length);
491         __repr.__buffer.back.popBack;
492         if ( __repr.__buffer.back.length == 0 ) {
493             __repr.__buffer.popBack;
494         }
495         __repr.__length--;
496     }
497     @property void popBackN(size_t n) pure @safe {
498         assert(n <= length);
499         __repr.__length -= n;
500         while( n ) {
501             if ( n <= __repr.__buffer.back.length ) {
502                 __repr.__buffer.back.popBackN(n);
503                 if ( __repr.__buffer.back.length == 0 ) {
504                     __repr.__buffer.popBack;
505                 }
506                 return;
507             }
508             n -= __repr.__buffer.back.length;
509             __repr.__buffer.popBack;
510         }
511     }
512     @property auto save() pure @safe {
513         auto n = Buffer!T();
514         n.__repr = new Repr(__repr);
515         return n;
516     }
517     @property auto ref opIndex(size_t n) const pure @safe {
518         assert( __repr && n < __repr.__length );
519         foreach(b; __repr.__buffer) {
520             if ( n < b.length ) {
521                 return b[n];
522             }
523             n -= b.length;
524         }
525         assert(false, "Impossible");
526     }
527     Buffer!T opSlice(size_t m, size_t n) {
528         assert( m <= n && n <= __repr.__length);
529         auto res = Buffer!T();
530         if ( m == n ) {
531             res.__repr = new Repr;
532             return res;
533         }
534         res.__repr = new Repr(this.__repr);
535         res.popBackN(res.length-n);
536         res.popFrontN(m);
537         return res;
538     }
539 //    ptrdiff_t countUntil(in T[] needle) const pure @safe {
540 //        ptrdiff_t haystackpos, needlepos;
541 //        while(haystackpos < length) {
542 //            if ( opIndex(haystackpos) == needle[needlepos] ) {
543 //
544 //                return haystackpos;
545 //            } else {
546 //                needlepos = 0;
547 //                haystackpos++;
548 //            }
549 //        }
550 //        return -1;
551 //    }
552     @property auto data(U=T[])() const pure {
553         static if ( is(U==T[]) ) {
554             if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
555                 return __repr.__buffer.front.dup;
556             }
557         }
558         Appender!(T[]) a;
559         if ( __repr && __repr.__buffer ) {
560             foreach(ref b; __repr.__buffer) {
561                 a.put(b);
562             }
563         }
564         static if ( is(U==T[]) ) {
565             return a.data;
566         } else {
567             return castFrom!(T[]).to!U(a.data);
568         }
569     }
570     string opCast(string)() {
571         return this.toString;
572     }
573     bool opEquals(U)(U x) {
574         return cast(U)this == x;
575     }
576 }
577 ///
578 public unittest {
579 
580     static assert(isInputRange!(Buffer!ubyte));
581     static assert(isForwardRange!(Buffer!ubyte));
582     static assert(hasLength!(Buffer!ubyte));
583     static assert(hasSlicing!(Buffer!ubyte));
584     static assert(isBidirectionalRange!(Buffer!ubyte));
585     static assert(isRandomAccessRange!(Buffer!ubyte));
586     
587     auto b = Buffer!ubyte();
588     b.put("abc".representation.dup);
589     b.put("def".representation.dup);
590     assert(b.length == 6);
591     assert(b.toString == "abcdef");
592     assert(b.front == 'a');
593     assert(b.back == 'f');
594     assert(equal(b[0..$], "abcdef"));
595     assert(equal(b[$-2..$], "ef"));
596     assert(b == "abcdef");
597     b.popFront;
598     b.popBack;
599     assert(b.front == 'b');
600     assert(b.back == 'e');
601     assert(b.length == 4);
602     assert(retro(b).front == 'e');
603     assert(countUntil(b, 'e') == 3);
604     assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
605     assert(equal(b, "bcde"));
606     b.popFront; b.popFront;
607     assert(b.front == 'd');
608     assert(b.front == b[0]);
609     assert(b.back == b[$-1]);
610 
611     auto c = Buffer!ubyte();
612     c.put("Header0: value0\n".representation.dup);
613     c.put("Header1: value1\n".representation.dup);
614     c.put("Header2: value2\n\nbody".representation.dup);
615     auto c_length = c.length;
616     auto eoh = countUntil(c, "\n\n");
617     assert(eoh == 47);
618     foreach(header; c[0..eoh].splitter('\n') ) {
619         writeln(castFrom!(ubyte[]).to!(string)(header.data));
620     }
621     assert(equal(findSplit(c, "\n\n")[2], "body"));
622     assert(c.length == c_length);
623 }
624 
625 extern(C) {
626     int SSL_library_init();
627     void OpenSSL_add_all_ciphers();
628     void OpenSSL_add_all_digests();
629     void SSL_load_error_strings();
630     
631     struct SSL {}
632     struct SSL_CTX {}
633     struct SSL_METHOD {}
634     
635     SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
636     SSL* SSL_new(SSL_CTX*);
637     int SSL_set_fd(SSL*, int);
638     int SSL_connect(SSL*);
639     int SSL_write(SSL*, const void*, int);
640     int SSL_read(SSL*, void*, int);
641     int SSL_shutdown(SSL*) @trusted @nogc nothrow;
642     void SSL_free(SSL*);
643     void SSL_CTX_free(SSL_CTX*);
644     
645     long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
646     
647     long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
648     long SSL_set_mode(SSL *ssl, long mode);
649     
650     long SSL_CTX_get_mode(SSL_CTX *ctx);
651     long SSL_get_mode(SSL *ssl);
652     
653     SSL_METHOD* SSLv3_client_method();
654     SSL_METHOD* TLSv1_2_client_method();
655     SSL_METHOD* TLSv1_client_method();
656 }
657 
658 //pragma(lib, "crypto");
659 //pragma(lib, "ssl");
660 
661 shared static this() {
662     SSL_library_init();
663     OpenSSL_add_all_ciphers();
664     OpenSSL_add_all_digests();
665     SSL_load_error_strings();
666 }
667 
668 public class OpenSslSocket : Socket {
669     enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
670     private SSL* ssl;
671     private SSL_CTX* ctx;
672     private void initSsl() {
673         //ctx = SSL_CTX_new(SSLv3_client_method());
674         ctx = SSL_CTX_new(TLSv1_client_method());
675         assert(ctx !is null);
676         
677         //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
678         //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
679         ssl = SSL_new(ctx);
680         SSL_set_fd(ssl, this.handle);
681     }
682     
683     @trusted
684     override void connect(Address to) {
685         super.connect(to);
686         if(SSL_connect(ssl) == -1)
687             throw new Exception("ssl connect failed");
688     }
689     
690     @trusted
691     override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
692         return SSL_write(ssl, buf.ptr, cast(uint) buf.length);
693     }
694     override ptrdiff_t send(const(void)[] buf) {
695         return send(buf, SocketFlags.NONE);
696     }
697     @trusted
698     override ptrdiff_t receive(void[] buf, SocketFlags flags) {
699         return SSL_read(ssl, buf.ptr, cast(int)buf.length);
700     }
701     override ptrdiff_t receive(void[] buf) {
702         return receive(buf, SocketFlags.NONE);
703     }
704     this(AddressFamily af, SocketType type = SocketType.STREAM) {
705         super(af, type);
706         initSsl();
707     }
708     
709     this(socket_t sock, AddressFamily af) {
710         super(sock, af);
711         initSsl();
712     }
713     override void close() {
714         //SSL_shutdown(ssl);
715         super.close();
716     }
717     ~this() {
718         SSL_free(ssl);
719         SSL_CTX_free(ctx);
720     }
721 }
722 
723 public abstract class SocketStream {
724     private {
725         Duration timeout;
726         Socket   s;
727         bool     __isOpen;
728         bool     __isConnected;
729     }
730     void open(AddressFamily fa) {
731     }
732     @property ref Socket so() @safe pure {
733         return s;
734     }
735     @property bool isOpen() @safe @nogc pure const {
736         return s && __isOpen;
737     }
738     @property bool isConnected() @safe @nogc pure const {
739         return s && __isConnected;
740     }
741     void close() @trusted {
742         tracef("Close socket");
743         if ( isOpen ) {
744             s.close();
745             __isOpen = false;
746             __isConnected = false;
747         }
748         s = null;
749     }
750     
751     auto connect(string host, ushort port, Duration timeout = 10.seconds) {
752         tracef(format("Create connection to %s:%d", host, port));
753         Address[] addresses;
754         __isConnected = false;
755         try {
756             addresses = getAddress(host, port);
757         } catch (Exception e) {
758             errorf("Failed to connect: can't resolve %s - %s", host, e.msg);
759             throw new ConnectError("Can't connect to %s:%d: %s".format(host, port, e.msg));
760         }
761         foreach(a; addresses) {
762             tracef("Trying %s", a);
763             try {
764                 open(a.addressFamily);
765                 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
766                 s.connect(a);
767                 tracef("Connected to %s", a);
768                 __isConnected = true;
769                 break;
770             } catch (SocketException e) {
771                 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
772                 s.close();
773             }
774         }
775         if ( !__isConnected ) {
776             throw new ConnectError("Can't connect to %s:%d".format(host, port));
777         }
778         return this;
779     }
780     
781     ptrdiff_t send(const(void)[] buff) @safe
782     in {assert(isConnected);}
783     body {
784         return s.send(buff);
785     }
786     
787     ptrdiff_t receive(void[] buff) @safe {
788         auto r = s.receive(buff);
789         if ( r > 0 ) {
790             buff.length = r;
791         }
792         return r;
793     }
794 }
795 
796 public class SSLSocketStream: SocketStream {
797     override void open(AddressFamily fa) {
798         if ( s !is null ) {
799             s.close();
800         }
801         s = new OpenSslSocket(fa);
802         assert(s !is null, "Can't create socket");
803         __isOpen = true;
804     }
805 }
806 
807 public class TCPSocketStream : SocketStream {
808     override void open(AddressFamily fa) {
809         if ( s !is null ) {
810             s.close();
811         }
812         s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
813         assert(s !is null, "Can't create socket");
814         __isOpen = true;
815     }
816 }
817