1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 
19 alias InDataHandler = DataPipeIface!ubyte;
20 
21 public class ConnectError: Exception {
22     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
23         super(msg, file, line);
24     }
25 }
26 
27 class DecodingExceptioin: Exception {
28     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
29         super(msg, file, line);
30     }
31 }
32 /**
33  * DataPipeIface can accept some data, process, and return processed data.
34  */
35 public interface DataPipeIface(E) {
36     /// Is there any processed data ready for reading?
37     bool empty();
38     /// Put next data portion for processing
39     //void put(E[]);
40     void putNoCopy(E[]);
41     /// Get any ready data
42     E[] get();
43     /// Signal on end of incoming data stream.
44     void flush();
45 }
46 /**
47  * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line.
48  * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked",
49  * and uncompress Content-Encoding "gzip".
50  */
51 public class DataPipe(E) : DataPipeIface!E {
52 
53     DataPipeIface!(E)[]  pipe;
54     Buffer!E             buffer;
55     /// Append data processor to pipeline
56     /// Params:
57     /// p = processor
58     void insert(DataPipeIface!E p) {
59         pipe ~= p;
60     }
61     E[][] process(DataPipeIface!E p, E[][] data) {
62         E[][] result;
63         data.each!(e => p.putNoCopy(e));
64         while(!p.empty()) result ~= p.get();
65         return result;
66     }
67     /// Process next data portion. Data passed over pipeline and store result in buffer.
68     /// Params:
69     /// data = input data buffer.
70     /// NoCopy means we do not copy data to buffer, we keep reference
71     void putNoCopy(E[] data) {
72         if ( pipe.empty ) {
73             buffer.putNoCopy(data);
74             return;
75         }
76         auto t = process(pipe.front, [data]);
77         foreach(ref p; pipe[1..$]) {
78             t = process(p, t);
79         }
80         t.each!(b => buffer.putNoCopy(b));
81     }
82     /// Get what was collected in internal buffer and clear it. 
83     /// Returns:
84     /// data collected.
85     E[] get() {
86         if ( buffer.empty ) {
87             return E[].init;
88         }
89         auto res = buffer.data;
90         buffer = Buffer!E.init;
91         return res;
92     }
93     ///
94     /// get without datamove. but user receive [][]
95     /// 
96     E[][] getNoCopy()  {
97         if ( buffer.empty ) {
98             return E[][].init;
99         }
100         E[][] res = buffer.__repr.__buffer;
101         buffer = Buffer!E.init;
102         return res;
103     }
104     /// Test if internal buffer is empty
105     /// Returns:
106     /// true if internal buffer is empty (nothing to get())
107     bool empty() pure const @safe {
108         return buffer.empty;
109     }
110     void flush() {
111         E[][] product;
112         foreach(ref p; pipe) {
113             product.each!(e => p.putNoCopy(e));
114             p.flush();
115             product.length = 0;
116             while( !p.empty ) product ~= p.get();
117         }
118         product.each!(b => buffer.putNoCopy(b));
119     }
120 }
121 
122 /**
123  * Processor for gzipped/compressed content.
124  * Also support InputRange interface.
125  */
126 public class Decompressor(E) : DataPipeIface!E {
127     private {
128         Buffer!ubyte __buff;
129         UnCompress   __zlib;
130     }
131     this() {
132         __buff = Buffer!ubyte();
133         __zlib = new UnCompress();
134     }
135     override void putNoCopy(E[] data) {
136         if ( __zlib is null  ) {
137             __zlib = new UnCompress();
138         }
139         __buff.putNoCopy(__zlib.uncompress(data));
140     }
141     override E[] get() pure {
142         assert(__buff.length);
143         auto r = __buff.__repr.__buffer[0];
144         __buff.popFrontN(r.length);
145         return cast(E[])r;
146     }
147     override void flush() {
148         if ( __zlib is null  ) {
149             return;
150         }
151         __buff.put(__zlib.flush());
152     }
153     override @property bool empty() const pure @safe {
154         debug tracef("empty=%b", __buff.empty);
155         return __buff.empty;
156     }
157     @property auto ref front() pure const @safe {
158         debug tracef("front: buff length=%d", __buff.length);
159         return __buff.front;
160     }
161     @property auto popFront() pure @safe {
162         debug tracef("popFront: buff length=%d", __buff.length);
163         return __buff.popFront;
164     }
165     @property void popFrontN(size_t n) pure @safe {
166         __buff.popFrontN(n);
167     }
168 }
169 
170 /**
171  * Unchunk chunked http responce body.
172  */
173 public class DecodeChunked : DataPipeIface!ubyte {
174     //    length := 0
175     //    read chunk-size, chunk-extension (if any) and CRLF
176     //    while (chunk-size > 0) {
177     //        read chunk-data and CRLF
178     //        append chunk-data to entity-body
179     //        length := length + chunk-size
180     //        read chunk-size and CRLF
181     //    }
182     //    read entity-header
183     //    while (entity-header not empty) {
184     //        append entity-header to existing header fields
185     //        read entity-header
186     //    }
187     //    Content-Length := length
188     //    Remove "chunked" from Transfer-Encoding
189     //
190 
191     //    Chunked-Body   = *chunk
192     //                      last-chunk
193     //                      trailer
194     //                      CRLF
195     //            
196     //    chunk          = chunk-size [ chunk-extension ] CRLF
197     //                     chunk-data CRLF
198     //                     chunk-size     = 1*HEX
199     //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
200     //        
201     //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
202     //    chunk-ext-name = token
203     //    chunk-ext-val  = token | quoted-string
204     //    chunk-data     = chunk-size(OCTET)
205     //    trailer        = *(entity-header CRLF)
206 
207     alias eType = ubyte;
208     immutable eType[] CRLF = ['\r', '\n'];
209     private {
210         enum         States {huntingSize, huntingSeparator, receiving, trailer};
211         char         state = States.huntingSize;
212         size_t       chunk_size, to_receive;
213         Buffer!ubyte buff;
214         ubyte[]      linebuff;
215     }
216     void putNoCopy(eType[] data) {
217         while ( data.length ) {
218             if ( state == States.trailer ) {
219                 return;
220             }
221             if ( state == States.huntingSize ) {
222                 import std.ascii;
223                 ubyte[10] digits;
224                 int i;
225                 for(i=0;i<data.length;i++) {
226                     ubyte v = data[i];
227                     digits[i] = v;
228                     if ( v == '\n' ) {
229                         i+=1;
230                         break;
231                     }
232                 }
233                 linebuff ~= digits[0..i];
234                 if ( linebuff.length >= 80 ) {
235                     throw new DecodingExceptioin("Can't find chunk size in the body");
236                 }
237                 data = data[i..$];
238 
239                 if (!linebuff.canFind(CRLF)) {
240                     continue;
241                 }
242                 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b";
243                 state = States.receiving;
244                 to_receive = chunk_size;
245                 if ( chunk_size == 0 ) {
246                     state = States.trailer;
247                     return;
248                 }
249                 continue;
250             }
251             if ( state == States.receiving ) {
252                 if (to_receive > 0 ) {
253                     auto can_store = min(to_receive, data.length);
254                     buff.putNoCopy(data[0..can_store]);
255                     data = data[can_store..$];
256                     to_receive -= can_store;
257                     //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
258                     if ( to_receive == 0 ) {
259                         //tracef("switch to huntig separator");
260                         state = States.huntingSeparator;
261                         continue;
262                     }
263                     continue;
264                 }
265                 assert(false);
266             }
267             if ( state == States.huntingSeparator ) {
268                 if ( data[0] == '\n' || data[0]=='\r') {
269                     data = data[1..$];
270                     continue;
271                 }
272                 state = States.huntingSize;
273                 linebuff.length = 0;
274                 continue;
275             }
276         }
277     }
278     eType[] get() {
279         auto r = buff.__repr.__buffer[0];
280         buff.popFrontN(r.length);
281         return r;
282     }
283     void flush() {
284     }
285     bool empty() {
286         debug tracef("empty=%b", buff.empty);
287         return buff.empty;
288     }
289     bool done() {
290         return state==States.trailer;
291     }
292 }
293 
294 unittest {
295     info("Testing DataPipe");
296     globalLogLevel(LogLevel.info);
297     alias eType = char;
298     eType[] gzipped = [
299         0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
300         0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
301         0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
302         0x08, 0x00, 0x00, 0x00
303     ]; // "abc\ndef\n"
304     auto d = new Decompressor!eType();
305     d.putNoCopy(gzipped[0..2].dup);
306     d.putNoCopy(gzipped[2..10].dup);
307     d.putNoCopy(gzipped[10..$].dup);
308     d.flush();
309     assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));
310 
311     auto e = new Decompressor!eType();
312     e.putNoCopy(gzipped[0..10].dup);
313     e.putNoCopy(gzipped[10..$].dup);
314     e.flush();
315     assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
316     //    writeln(gzipped.decompress.filter!(a => a!='b').array);
317     auto dp = new DataPipe!eType;
318     dp.insert(new Decompressor!eType());
319     dp.putNoCopy(gzipped[0..2].dup);
320     dp.putNoCopy(gzipped[2..$].dup);
321     dp.flush();
322     assert(equal(dp.get(), "abc\ndef\n"));
323     // empty datapipe shoul just pass input to output
324     auto dpu = new DataPipe!ubyte;
325     dpu.putNoCopy("abcd".dup.representation);
326     dpu.putNoCopy("efgh".dup.representation);
327     dpu.flush();
328     assert(equal(dpu.get(), "abcdefgh"));
329     info("Test unchunker properties");
330     ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n".dup.representation;
331     ubyte[][] result;
332     auto uc = new DecodeChunked();
333     uc.putNoCopy(twoChunks);
334     while(!uc.empty) {
335         result ~= uc.get();
336     }
337     assert(equal(result[0], ['1', '2']));
338     assert(equal(result[1], ['3', '4']));
339     info("unchunker correctness - ok");
340     result[0][0] = '5';
341     assert(twoChunks[3] == '5');
342     info("unchunker zero copy - ok");
343     info("Testing DataPipe - done");
344 }
345 /**
346  * Buffer used to collect and process data from network. It remainds Appender, but support
347  * also Range interface.
348  * $(P To place data in buffer use put() method.)
349  * $(P  To retrieve data from buffer you can use several methods:)
350  * $(UL
351  *  $(LI Range methods: front, back, index [])
352  *  $(LI data method: return collected data (like Appender.data))
353  * )
354  */
355 static this() {
356 }
357 static ~this() {
358 }
359 enum   CACHESIZE = 1024;
360 
361 static long reprAlloc;
362 static long reprCacheHit;
363 static long reprCacheRequests;
364 
365 
366 public struct Buffer(T) {
367 //    static Repr[CACHESIZE]  cache;
368 //    static uint             cacheIndex;
369 
370     private {
371         Repr  cachedOrNew() {
372             return new Repr;
373 //            reprCacheRequests++;
374 //            if ( false && cacheIndex>0 ) {
375 //                reprCacheHit++;
376 //                cacheIndex -= 1;
377 //                return cache[cacheIndex];
378 //            } else {
379 //                return new Repr;
380 //            }
381         }
382         class Repr {
383             size_t         __length;
384             Unqual!T[][]   __buffer;
385             this() {
386                 reprAlloc++;
387                 __length = 0;
388             }
389             this(Repr other) {
390                 reprAlloc++;
391                 if ( other is null )
392                     return;
393                 __length = other.__length;
394                 __buffer = other.__buffer.dup;
395             }
396         }
397         Repr __repr;
398     }
399     
400     alias toString = data!string;
401     
402     this(this) {
403         if ( !__repr ) {
404             return;
405         }
406         __repr = new Repr(__repr);
407     }
408     this(U)(U[] data) {
409         put(data);
410     }
411     ~this() {
412         __repr = null;
413 //        if ( cacheIndex >= CACHESIZE ) {
414 //            __repr = null;
415 //            return;
416 //        }
417 //        if ( __repr ) {
418 //            __repr.__length = 0;
419 //            __repr.__buffer = null;
420 //            cache[cacheIndex] = __repr;
421 //            cacheIndex += 1;
422 //            __repr = null;
423 //        }
424     }
425     /***************
426      * store data. Data copied
427      */
428     auto put(U)(U[] data) {
429         if ( data.length == 0 ) {
430             return;
431         }
432         if ( !__repr ) {
433             __repr = cachedOrNew();
434         }
435         static if (!is(U == T)) {
436             auto d = castFrom!(U[]).to!(T[])(data);
437             __repr.__length += d.length;
438             __repr.__buffer ~= d.dup;
439         } else {
440             __repr.__length += data.length;
441             __repr.__buffer ~= data.dup;
442         }
443         return;
444     }
445     auto putNoCopy(U)(U[] data) {
446         if ( data.length == 0 ) {
447             return;
448         }
449         if ( !__repr ) {
450             __repr = cachedOrNew();
451         }
452         static if (!is(U == T)) {
453             auto d = castFrom!(U[]).to!(T[])(data);
454             __repr.__length += d.length;
455             __repr.__buffer ~= d;
456         } else {
457             __repr.__length += data.length;
458             __repr.__buffer ~= data;
459         }
460         return;
461     }
462     @property auto opDollar() const pure @safe {
463         return __repr.__length;
464     }
465     @property auto length() const pure @safe {
466         if ( !__repr ) {
467             return 0;
468         }
469         return __repr.__length;
470     }
471     @property auto empty() const pure @safe {
472         return length == 0;
473     }
474     @property auto ref front() const pure @safe {
475         assert(length);
476         return __repr.__buffer.front.front;
477     }
478     @property auto ref back() const pure @safe {
479         assert(length);
480         return __repr.__buffer.back.back;
481     }
482     @property void popFront() pure @safe {
483         assert(length);
484         with ( __repr ) {
485             __buffer.front.popFront;
486             if ( __buffer.front.length == 0 ) {
487                 __buffer.popFront;
488             }
489             __length--;
490         }
491     }
492     @property void popFrontN(size_t n) pure @safe {
493         assert(n <= length, "lengnt: %d, n=%d".format(length, n));
494         __repr.__length -= n;
495         while( n ) {
496             if ( n <= __repr.__buffer.front.length ) {
497                 __repr.__buffer.front.popFrontN(n);
498                 if ( __repr.__buffer.front.length == 0 ) {
499                     __repr.__buffer.popFront;
500                 }
501                 return;
502             }
503             n -= __repr.__buffer.front.length;
504             __repr.__buffer.popFront;
505         }
506     }
507     @property void popBack() pure @safe {
508         assert(length);
509         __repr.__buffer.back.popBack;
510         if ( __repr.__buffer.back.length == 0 ) {
511             __repr.__buffer.popBack;
512         }
513         __repr.__length--;
514     }
515     @property void popBackN(size_t n) pure @safe {
516         assert(n <= length, "n: %d, length: %d".format(n, length));
517         __repr.__length -= n;
518         while( n ) {
519             if ( n <= __repr.__buffer.back.length ) {
520                 __repr.__buffer.back.popBackN(n);
521                 if ( __repr.__buffer.back.length == 0 ) {
522                     __repr.__buffer.popBack;
523                 }
524                 return;
525             }
526             n -= __repr.__buffer.back.length;
527             __repr.__buffer.popBack;
528         }
529     }
530     @property auto save() @safe {
531         auto n = Buffer!T();
532         n.__repr = new Repr(__repr);
533         return n;
534     }
535     @property auto ref opIndex(size_t n) const pure @safe {
536         assert( __repr && n < __repr.__length );
537         foreach(b; __repr.__buffer) {
538             if ( n < b.length ) {
539                 return b[n];
540             }
541             n -= b.length;
542         }
543         assert(false, "Impossible");
544     }
545     Buffer!T opSlice(size_t m, size_t n) {
546         if ( empty || m == n ) {
547             return Buffer!T();
548         }
549         assert( m <= n && n <= __repr.__length);
550         auto res = this.save();
551         res.popBackN(res.__repr.__length-n);
552         res.popFrontN(m);
553         return res;
554     }
555     @property auto data(U=T[])() pure {
556         static if ( is(U==T[]) ) {
557             if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
558                 return __repr.__buffer.front;
559             }
560         }
561         Appender!(T[]) a;
562         if ( __repr && __repr.__buffer ) {
563             foreach(ref b; __repr.__buffer) {
564                 a.put(b);
565             }
566         }
567         static if ( is(U==T[]) ) {
568             return a.data;
569         } else {
570             return castFrom!(T[]).to!U(a.data);
571         }
572     }
573     string opCast(string)() {
574         return this.toString;
575     }
576     bool opEquals(U)(U x) {
577         return cast(U)this == x;
578     }
579 
580 }
581 ///
582 public unittest {
583 
584     static assert(isInputRange!(Buffer!ubyte));
585     static assert(isForwardRange!(Buffer!ubyte));
586     static assert(hasLength!(Buffer!ubyte));
587     static assert(hasSlicing!(Buffer!ubyte));
588     static assert(isBidirectionalRange!(Buffer!ubyte));
589     static assert(isRandomAccessRange!(Buffer!ubyte));
590     
591     auto b = Buffer!ubyte();
592     b.put("abc".representation.dup);
593     b.put("def".representation.dup);
594     assert(b.length == 6);
595     assert(b.toString == "abcdef");
596     assert(b.front == 'a');
597     assert(b.back == 'f');
598     assert(equal(b[0..$], "abcdef"));
599     assert(equal(b[$-2..$], "ef"));
600     assert(b == "abcdef");
601     b.popFront;
602     b.popBack;
603     assert(b.front == 'b');
604     assert(b.back == 'e');
605     assert(b.length == 4);
606     assert(retro(b).front == 'e');
607     assert(countUntil(b, 'e') == 3);
608     assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
609     assert(equal(b, "bcde"));
610     b.popFront; b.popFront;
611     assert(b.front == 'd');
612     assert(b.front == b[0]);
613     assert(b.back == b[$-1]);
614 
615     auto c = Buffer!ubyte();
616     c.put("Header0: value0\n".representation.dup);
617     c.put("Header1: value1\n".representation.dup);
618     c.put("Header2: value2\n\nbody".representation.dup);
619     auto c_length = c.length;
620     auto eoh = countUntil(c, "\n\n");
621     assert(eoh == 47);
622     foreach(header; c[0..eoh].splitter('\n') ) {
623         writeln(castFrom!(ubyte[]).to!(string)(header.data));
624     }
625     assert(equal(findSplit(c, "\n\n")[2], "body"));
626     assert(c.length == c_length);
627 }
628 
629 extern(C) {
630     int SSL_library_init();
631     void OpenSSL_add_all_ciphers();
632     void OpenSSL_add_all_digests();
633     void SSL_load_error_strings();
634     
635     struct SSL {}
636     struct SSL_CTX {}
637     struct SSL_METHOD {}
638     
639     SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
640     SSL* SSL_new(SSL_CTX*);
641     int SSL_set_fd(SSL*, int);
642     int SSL_connect(SSL*);
643     int SSL_write(SSL*, const void*, int);
644     int SSL_read(SSL*, void*, int);
645     int SSL_shutdown(SSL*) @trusted @nogc nothrow;
646     void SSL_free(SSL*);
647     void SSL_CTX_free(SSL_CTX*);
648     
649     long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
650     
651     long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
652     long SSL_set_mode(SSL *ssl, long mode);
653     
654     long SSL_CTX_get_mode(SSL_CTX *ctx);
655     long SSL_get_mode(SSL *ssl);
656     
657     SSL_METHOD* SSLv3_client_method();
658     SSL_METHOD* TLSv1_2_client_method();
659     SSL_METHOD* TLSv1_client_method();
660 }
661 
662 //pragma(lib, "crypto");
663 //pragma(lib, "ssl");
664 
665 shared static this() {
666     SSL_library_init();
667     OpenSSL_add_all_ciphers();
668     OpenSSL_add_all_digests();
669     SSL_load_error_strings();
670 }
671 
672 public class OpenSslSocket : Socket {
673     enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
674     private SSL* ssl;
675     private SSL_CTX* ctx;
676     private void initSsl() {
677         //ctx = SSL_CTX_new(SSLv3_client_method());
678         ctx = SSL_CTX_new(TLSv1_client_method());
679         assert(ctx !is null);
680         
681         //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
682         //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
683         ssl = SSL_new(ctx);
684         SSL_set_fd(ssl, this.handle);
685     }
686     
687     @trusted
688     override void connect(Address to) {
689         super.connect(to);
690         if(SSL_connect(ssl) == -1)
691             throw new Exception("ssl connect failed");
692     }
693     
694     @trusted
695     override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
696         return SSL_write(ssl, buf.ptr, cast(uint) buf.length);
697     }
698     override ptrdiff_t send(const(void)[] buf) {
699         return send(buf, SocketFlags.NONE);
700     }
701     @trusted
702     override ptrdiff_t receive(void[] buf, SocketFlags flags) {
703         return SSL_read(ssl, buf.ptr, cast(int)buf.length);
704     }
705     override ptrdiff_t receive(void[] buf) {
706         return receive(buf, SocketFlags.NONE);
707     }
708     this(AddressFamily af, SocketType type = SocketType.STREAM) {
709         super(af, type);
710         initSsl();
711     }
712     
713     this(socket_t sock, AddressFamily af) {
714         super(sock, af);
715         initSsl();
716     }
717     override void close() {
718         //SSL_shutdown(ssl);
719         super.close();
720     }
721     ~this() {
722         SSL_free(ssl);
723         SSL_CTX_free(ctx);
724     }
725 }
726 
727 public abstract class SocketStream {
728     private {
729         Duration timeout;
730         Socket   s;
731         bool     __isOpen;
732         bool     __isConnected;
733     }
734     void open(AddressFamily fa) {
735     }
736     @property ref Socket so() @safe pure {
737         return s;
738     }
739     @property bool isOpen() @safe @nogc pure const {
740         return s && __isOpen;
741     }
742     @property bool isConnected() @safe @nogc pure const {
743         return s && __isConnected;
744     }
745     void close() @trusted {
746         tracef("Close socket");
747         if ( isOpen ) {
748             s.close();
749             __isOpen = false;
750             __isConnected = false;
751         }
752         s = null;
753     }
754     
755     auto connect(string host, ushort port, Duration timeout = 10.seconds) {
756         tracef(format("Create connection to %s:%d", host, port));
757         Address[] addresses;
758         __isConnected = false;
759         try {
760             addresses = getAddress(host, port);
761         } catch (Exception e) {
762             throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
763         }
764         foreach(a; addresses) {
765             tracef("Trying %s", a);
766             try {
767                 open(a.addressFamily);
768                 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
769                 s.connect(a);
770                 tracef("Connected to %s", a);
771                 __isConnected = true;
772                 break;
773             } catch (SocketException e) {
774                 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
775                 s.close();
776             }
777         }
778         if ( !__isConnected ) {
779             throw new ConnectError("Can't connect to %s:%d".format(host, port));
780         }
781         return this;
782     }
783     
784     ptrdiff_t send(const(void)[] buff) @safe
785     in {assert(isConnected);}
786     body {
787         return s.send(buff);
788     }
789     
790     ptrdiff_t receive(void[] buff) @safe {
791         auto r = s.receive(buff);
792         if ( r > 0 ) {
793             buff.length = r;
794         }
795         return r;
796     }
797 }
798 
799 public class SSLSocketStream: SocketStream {
800     override void open(AddressFamily fa) {
801         if ( s !is null ) {
802             s.close();
803         }
804         s = new OpenSslSocket(fa);
805         assert(s !is null, "Can't create socket");
806         __isOpen = true;
807     }
808 }
809 
810 public class TCPSocketStream : SocketStream {
811     override void open(AddressFamily fa) {
812         if ( s !is null ) {
813             s.close();
814         }
815         s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
816         assert(s !is null, "Can't create socket");
817         __isOpen = true;
818     }
819 }
820