module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;

alias InDataHandler = DataPipeIface!ubyte;

/// Thrown by SocketStream.connect when the host cannot be resolved or none of
/// its addresses accepts the connection.
public class ConnectError: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}

/// Thrown by DecodeChunked when a chunk-size line cannot be found in the body.
/// (The misspelled identifier is kept as-is so existing references keep compiling.)
class DecodingExceptioin: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}
/**
 * DataPipeIface can accept some data, process it, and return the processed data.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing
    void put(E[]);
    /// Same as put(); implementations may keep a reference to the data instead of copying it.
    void putNoCopy(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors; each accepts some data, processes it, and passes
 * the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example:
 * unchunk Transfer-Encoding "chunked", then uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[]  pipe;
    Buffer!E             buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every chunk of `data` to processor `p` and collect everything it has ready.
    /// Params:
    /// p = processor
    /// data = chunks to feed
    /// Returns:
    /// chunks produced by `p`
    E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.put(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Pass `chunks` through every processor in order; the output of one stage is the
    /// input of the next. Returns what the last processor produced.
    private E[][] runPipeline(E[][] chunks) {
        foreach(ref p; pipe) {
            chunks = process(p, chunks);
        }
        return chunks;
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in buffer.
    /// Params:
    /// data = input data array.
    void put(E[] data) {
        if ( pipe.empty ) {
            buffer.put(data);
            return;
        }
        foreach(chunk; runPipeline([data])) {
            buffer.put(chunk);
        }
    }
    /// Same as put(E[]), but the result is stored in buffer without copying.
    /// Params:
    /// data = input data array.
    void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            buffer.putNoCopy(data);
            return;
        }
        foreach(chunk; runPipeline([data])) {
            buffer.putNoCopy(chunk);
        }
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in buffer.
    /// Params:
    /// buff = input data buffer.
    void put(Buffer!E buff) {
        // A Buffer that never stored anything (e.g. a moved-in rvalue) has a null
        // representation; treat it as "no chunks" instead of dereferencing it.
        E[][] chunks = buff.__repr is null ? null : buff.__repr.__buffer;
        if ( pipe.empty ) {
            if ( buffer.__repr is null ) {
                buffer = buff;
                return;
            }
            buffer.__repr.__buffer ~= chunks;
            buffer.__repr.__length += buff.length;
            return;
        }
        foreach(chunk; runPipeline(chunks)) {
            buffer.put(chunk);
        }
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    E[] get() pure {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, feeding whatever one stage
    /// still had pending into the next, and store the final output in buffer.
    void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.put(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.put(b));
    }
}

/**
 * Processor for gzipped/compressed content.
 * Also supports the InputRange interface.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;
        UnCompress   __zlib;
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
//    this(E[] r) {
//        //__range = r;
//        __buff = Buffer!ubyte();
//        __zlib = new UnCompress();
//        auto l = r.length;
//        if ( l ) {
//            __buff.put(__zlib.uncompress(r.take(l)));
//        }
//    }
    /// Uncompress `data` and store the result. The uncompressed output is always
    /// copied into the internal buffer, exactly as put() does.
    override void putNoCopy(E[] data) {
        if ( __zlib is null ) {
            __zlib = new UnCompress();
        }
        __buff.put(__zlib.uncompress(data));
    }
    /// Uncompress `data` and store the result in the internal buffer.
    override void put(E[] data) {
        if ( __zlib is null ) {
            __zlib = new UnCompress();
        }
        __buff.put(__zlib.uncompress(data));
    }
    /// Return the oldest stored chunk of uncompressed data and remove it from the buffer.
    /// Must not be called when empty.
    override E[] get() pure {
        assert(__buff.length);
        auto r = __buff.__repr.__buffer[0];
        __buff.popFrontN(r.length);
        return cast(E[])r;
    }
    /// Finish the zlib stream and store any remaining uncompressed data.
    override void flush() {
        if ( __zlib is null ) {
            return;
        }
        __buff.put(__zlib.flush());
        // std.zlib documents that an UnCompress object cannot be used after flush().
        // Drop it so a repeated flush() is a no-op and a later put() starts a new stream.
        __zlib = null;
    }
    override @property bool empty() const pure @safe {
        debug tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    @property auto ref front() pure const @safe {
        debug tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    @property auto popFront() pure @safe {
        debug tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}

/**
 * Unchunk chunked http response body.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //    chunk-size     = 1*HEX
    //    last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum States {huntingSize, huntingSeparator, receiving, trailer}
        States          state = States.huntingSize;
        size_t          chunk_size, to_receive;
        Buffer!ubyte    buff;       // decoded chunk payloads, one entry per stored piece
        ubyte[]         linebuff;   // bytes collected while looking for a CRLF
    }
    void putNoCopy(eType[] data) {
        assert(0, "Not implemented");
    }
    /// Consume the next portion of the chunked body. Chunk payload is stored for get();
    /// everything after the zero-size chunk is ignored.
    /// Throws: DecodingExceptioin when 80 or more bytes were collected without a CRLF
    ///         while looking for a chunk size.
    void put(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                return;
            }
            if ( state == States.huntingSize ) {
                // Collect input until the CRLF ending the chunk-size line shows up.
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( !s[1].length ) {
                    if ( linebuff.length >= 80 ) {
                        throw new DecodingExceptioin("Can't find chunk size in the body");
                    }
                    continue;
                }
                string x = castFrom!(ubyte[]).to!string(s[0]);
                formattedRead(x, "%x", &chunk_size);
                tracef("Got chunk size %s", chunk_size);
                state = States.receiving;
                to_receive = chunk_size;
                // Whatever followed the size line is processed on the next iteration.
                data = s[2];
                if ( chunk_size == 0 ) {
                    state = States.trailer;
                    tracef("Unchunk completed");
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.put(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        to_receive = 2;
                        linebuff.length = 0;
                        continue;
                    }
                    continue;
                }
                // The receiving state is only entered with a non-zero chunk size.
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                // Collect input until the CRLF that terminates the chunk data shows up.
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( s[1].length ) {
                    data = s[2];
                    chunk_size = 0;
                    linebuff.length = 0;
                    state = States.huntingSize;
                    tracef("switch to huntig size");
                    continue;
                }
            }
        }
    }
    /// Return the oldest stored piece of decoded data and remove it from the buffer.
    /// Must not be called when empty.
    eType[] get() {
        assert(buff.length);
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    void flush() {
    }
    bool empty() {
        debug tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// True once the zero-size (last) chunk has been seen.
    bool done() {
        return state==States.trailer;
    }
}

unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    auto d = new Decompressor!eType();
    d.put(gzipped[0..2]);
    d.put(gzipped[2..10]);
    d.put(gzipped[10..$]);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    auto e = new Decompressor!eType();
    e.put(gzipped[0..10]);
    e.put(gzipped[10..$]);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.put(gzipped[0..2]);
    dp.put(gzipped[2..$]);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.put("abcd".dup.representation);
    dpu.put("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but also
 * supports the Range interface.
 * $(P To place data in buffer use put() method.)
 * $(P To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
public struct Buffer(T) {
    private {
        // Shared representation: the stored chunks plus their total length.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                __length = 0;
            }
            // Copies the list of chunks; the chunk contents themselves are not duplicated.
            this(Repr other) {
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;   // null until something is stored (or the struct is copied)
    }

    alias toString = data!string;

    this(this) {
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) pure {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return this;
    }
    /***************
     * store data. Data is not copied: the buffer keeps a slice of the caller's array.
     */
    auto putNoCopy(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return this;
    }
    @property auto opDollar() const pure @safe {
        // Go through length so a buffer without representation yields 0
        // instead of dereferencing null.
        return length;
    }
    @property auto length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            // Drop the first chunk once it is exhausted.
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            // The whole first chunk is consumed; continue with the next one.
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length);
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            // The whole last chunk is consumed; continue with the previous one.
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    @property auto save() pure @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    Buffer!T opSlice(size_t m, size_t n) {
        // length is null-safe, so slicing a never-filled buffer as [0..0] works.
        assert( m <= n && n <= length);
        auto res = Buffer!T();
        if ( m == n ) {
            res.__repr = new Repr;
            return res;
        }
        res.__repr = new Repr(this.__repr);
        res.popBackN(res.length-n);
        res.popFrontN(m);
        return res;
    }
//    ptrdiff_t countUntil(in T[] needle) const pure @safe {
//        ptrdiff_t haystackpos, needlepos;
//        while(haystackpos < length) {
//            if ( opIndex(haystackpos) == needle[needlepos] ) {
//
//                return haystackpos;
//            } else {
//                needlepos = 0;
//                haystackpos++;
//            }
//        }
//        return -1;
//    }
    /// Return everything collected so far as one newly allocated array
    /// (converted to U when U is not T[]).
    @property auto data(U=T[])() const pure {
        static if ( is(U==T[]) ) {
            // Single chunk: a plain copy is enough.
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front.dup;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    // Note: the template parameter here is literally named `string`, so inside this
    // function `string` is the cast target type, not the built-in alias.
    string opCast(string)() {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }
}
///
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);
}

// Minimal hand-written bindings for the OpenSSL functions used below.
extern(C) {
    int SSL_library_init();
    void OpenSSL_add_all_ciphers();
    void OpenSSL_add_all_digests();
    void SSL_load_error_strings();

    struct SSL {}
    struct SSL_CTX {}
    struct SSL_METHOD {}

    SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
    SSL* SSL_new(SSL_CTX*);
    int SSL_set_fd(SSL*, int);
    int SSL_connect(SSL*);
    int SSL_write(SSL*, const void*, int);
    int SSL_read(SSL*, void*, int);
    int SSL_shutdown(SSL*) @trusted @nogc nothrow;
    void SSL_free(SSL*);
    void SSL_CTX_free(SSL_CTX*);

    long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);

    long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
    long SSL_set_mode(SSL *ssl, long mode);

    long SSL_CTX_get_mode(SSL_CTX *ctx);
    long SSL_get_mode(SSL *ssl);

    SSL_METHOD* SSLv3_client_method();
    SSL_METHOD* TLSv1_2_client_method();
    SSL_METHOD* TLSv1_client_method();
}

//pragma(lib, "crypto");
//pragma(lib, "ssl");

// Initialize the OpenSSL library once per process.
shared static this() {
    SSL_library_init();
    OpenSSL_add_all_ciphers();
    OpenSSL_add_all_digests();
    SSL_load_error_strings();
}

/// Socket that sends and receives through an OpenSSL session bound to its handle.
public class OpenSslSocket : Socket {
    enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
    private SSL* ssl;
    private SSL_CTX* ctx;
    /// Create the SSL context and session and attach the session to this socket's handle.
    private void initSsl() {
        //ctx = SSL_CTX_new(SSLv3_client_method());
        // NOTE(review): the context is created with TLSv1_client_method(); presumably this
        // limits the connection to TLS 1.0 - confirm against the OpenSSL version in use.
        ctx = SSL_CTX_new(TLSv1_client_method());
        // enforce (not assert) so the check also runs in release builds.
        enforce(ctx !is null, "Can't create SSL context");

        //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
        //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
        ssl = SSL_new(ctx);
        enforce(ssl !is null, "Can't create SSL session");
        SSL_set_fd(ssl, this.handle);
    }

    /// Connect the underlying socket, then perform the TLS handshake.
    /// Throws: Exception when the handshake does not complete successfully.
    @trusted
    override void connect(Address to) {
        super.connect(to);
        // SSL_connect reports success only with 1; 0 and negative values are both failures.
        if(SSL_connect(ssl) != 1)
            throw new Exception("ssl connect failed");
    }

    /// Write `buf` through the SSL session. `flags` is not used.
    @trusted
    override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
        return SSL_write(ssl, buf.ptr, cast(int) buf.length);
    }
    override ptrdiff_t send(const(void)[] buf) {
        return send(buf, SocketFlags.NONE);
    }
    /// Read into `buf` through the SSL session. `flags` is not used.
    @trusted
    override ptrdiff_t receive(void[] buf, SocketFlags flags) {
        return SSL_read(ssl, buf.ptr, cast(int)buf.length);
    }
    override ptrdiff_t receive(void[] buf) {
        return receive(buf, SocketFlags.NONE);
    }
    this(AddressFamily af, SocketType type = SocketType.STREAM) {
        super(af, type);
        initSsl();
    }

    this(socket_t sock, AddressFamily af) {
        super(sock, af);
        initSsl();
    }
    override void close() {
        //SSL_shutdown(ssl);
        super.close();
    }
    ~this() {
        SSL_free(ssl);
        SSL_CTX_free(ctx);
    }
}

/// Base class for connected streams; subclasses create the concrete socket in open().
public abstract class SocketStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
    }
    /// Create the socket for the given address family. The base implementation does nothing.
    void open(AddressFamily fa) {
    }
    @property ref Socket so() @safe pure {
        return s;
    }
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    @property bool isConnected() @safe @nogc pure const {
        return s && __isConnected;
    }
    /// Close the socket (if open) and forget it.
    void close() @trusted {
        tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }

    /// Resolve `host` and connect to the first address that accepts the connection.
    /// Params:
    /// host = host name or address
    /// port = port number
    /// timeout = value set as the socket's SNDTIMEO option before connecting
    /// Returns: this stream
    /// Throws: ConnectError when the host can't be resolved or no address could be connected.
    auto connect(string host, ushort port, Duration timeout = 10.seconds) {
        // Pass the arguments to tracef directly: pre-formatting the message and using it
        // as the format string would misinterpret any '%' contained in host.
        tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            errorf("Failed to connect: can't resolve %s - %s", host, e.msg);
            throw new ConnectError("Can't connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket was created.
                if ( s !is null ) {
                    s.close();
                }
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send `buff` over the connected socket; returns what the socket's send() returns.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        return s.send(buff);
    }

    /// Receive into `buff`; returns what the socket's receive() returns.
    /// The caller must use the returned count: `buff` is passed by value, so its
    /// length cannot be adjusted here.
    ptrdiff_t receive(void[] buff) @safe {
        return s.receive(buff);
    }
}

/// SocketStream over an OpenSslSocket.
public class SSLSocketStream: SocketStream {
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new OpenSslSocket(fa);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}

/// SocketStream over a plain TCP socket.
public class TCPSocketStream : SocketStream {
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}