module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;

alias InDataHandler = DataPipeIface!ubyte;

/// Thrown by SocketStream.connect when a host cannot be resolved or reached.
public class ConnectError: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}

/// Thrown by DecodeChunked when the chunked body cannot be decoded.
class DecodingExceptioin: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}

/**
 * A DataPipeIface accepts portions of data, processes them, and hands the
 * processed data back on request.
 */
public interface DataPipeIface(E) {
    /// True when there is no processed data waiting to be read.
    bool empty();
    /// Feed the next portion of input; the slice is referenced, not copied.
    //void put(E[]);
    void putNoCopy(E[]);
    /// Fetch processed data that is ready.
    E[] get();
    /// Signal the end of the incoming data stream.
    void flush();
}

/**
 * DataPipe chains several data processors: each stage consumes the output of
 * the previous one and the output of the last stage is collected in `buffer`.
 * It is used to combine Transfer- and Content- encodings, for example to undo
 * Transfer-Encoding "chunked" and then decompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[]  pipe;
    Buffer!E             buffer;

    /// Append a data processor to the end of the pipeline.
    /// Params:
    ///     p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }

    /// Feed every portion of `data` to processor `p`, then drain `p`.
    /// Returns:
    ///     everything `p` had ready after consuming `data`.
    E[][] process(DataPipeIface!E p, E[][] data) {
        foreach (portion; data) {
            p.putNoCopy(portion);
        }
        E[][] drained;
        while ( !p.empty() ) {
            drained ~= p.get();
        }
        return drained;
    }

    /// Push the next portion of data through every stage and keep the result
    /// in the internal buffer.
    /// Params:
    ///     data = input data buffer.
    /// NoCopy means the data is not copied into the buffer, only referenced.
    void putNoCopy(E[] data) {
        if ( pipe.length == 0 ) {
            // no stages configured: input goes straight to the output buffer
            buffer.putNoCopy(data);
            return;
        }
        E[][] portions = [data];
        foreach (stage; pipe) {
            portions = process(stage, portions);
        }
        foreach (portion; portions) {
            buffer.putNoCopy(portion);
        }
    }

    /// Take everything collected in the internal buffer and reset the buffer.
    /// Returns:
    ///     collected data, or an empty slice when nothing is buffered.
    E[] get() {
        if ( !buffer.empty ) {
            auto collected = buffer.data;
            buffer = Buffer!E.init;
            return collected;
        }
        return E[].init;
    }

    ///
    /// Same as get() but without moving data: the caller receives the
    /// buffered portions as an array of slices.
    ///
    E[][] getNoCopy() {
        if ( !buffer.empty ) {
            E[][] portions = buffer.__repr.__buffer;
            buffer = Buffer!E.init;
            return portions;
        }
        return E[][].init;
    }

    /// Test whether the internal buffer is empty.
    /// Returns:
    ///     true if there is nothing to get()
    bool empty() pure const @safe {
        return buffer.empty;
    }

    /// Flush every stage in order; whatever a stage releases is fed to the
    /// next stage before that one is flushed, and the output of the last
    /// stage lands in the internal buffer.
    void flush() {
        E[][] carried;
        foreach (stage; pipe) {
            foreach (portion; carried) {
                stage.putNoCopy(portion);
            }
            stage.flush();
            carried = null;
            while ( !stage.empty ) {
                carried ~= stage.get();
            }
        }
        foreach (portion; carried) {
            buffer.putNoCopy(portion);
        }
    }
}

/**
 * Processor for gzipped/compressed content.
 * Also supports the InputRange interface.
*/
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;   // decompressed output waiting to be read
        UnCompress   __zlib;   // zlib decompression state
    }

    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }

    /// Decompress the next portion of input and queue the result.
    override void putNoCopy(E[] data) {
        if ( __zlib is null ) {
            __zlib = new UnCompress();
        }
        auto inflated = __zlib.uncompress(data);
        __buff.putNoCopy(inflated);
    }

    /// Remove and return the first queued portion of decompressed data.
    /// The buffer must not be empty.
    override E[] get() pure {
        assert(__buff.length);
        auto head = __buff.__repr.__buffer[0];
        __buff.popFrontN(head.length);
        return cast(E[])head;
    }

    /// Queue whatever zlib still holds back at the end of the stream.
    override void flush() {
        if ( __zlib !is null ) {
            __buff.put(__zlib.flush());
        }
    }

    /// True when no decompressed data is queued.
    override @property bool empty() const pure @safe {
        debug tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }

    /// First queued element (range interface).
    @property auto ref front() pure const @safe {
        debug tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }

    /// Drop the first queued element (range interface).
    @property auto popFront() pure @safe {
        debug tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }

    /// Drop the first `n` queued elements.
    @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}

/**
 * Unchunk chunked http response body.
*/
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //    chunk-size     = 1*HEX
    //    last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum States {huntingSize, huntingSeparator, receiving, trailer}
        /// A chunk-size line (size, extensions, CRLF) must be shorter than this.
        enum maxSizeLine = 80;

        States          state = States.huntingSize;
        size_t          chunk_size, to_receive;
        Buffer!ubyte    buff;
        ubyte[]         linebuff;   // chunk-size line collected so far

        /// Parse the hexadecimal chunk size at the start of a chunk-size line.
        /// Only the leading run of hex digits is used, so a chunk extension
        /// (";name=value") can never leak digits into the size.
        /// Throws: DecodingExceptioin if there is no hex digit or the value
        ///         does not fit into size_t.
        static size_t parseChunkSize(const(ubyte)[] line) {
            // be lenient about blanks in front of the size
            while ( line.length && (line[0] == ' ' || line[0] == '\t') ) {
                line = line[1..$];
            }
            size_t value;
            size_t digits;
            foreach (octet; line) {
                uint nibble;
                if ( octet >= '0' && octet <= '9' ) {
                    nibble = octet - '0';
                } else if ( octet >= 'a' && octet <= 'f' ) {
                    nibble = octet - 'a' + 10;
                } else if ( octet >= 'A' && octet <= 'F' ) {
                    nibble = octet - 'A' + 10;
                } else {
                    break;
                }
                // the top nibble is already occupied: one more shift would overflow
                if ( value >> (size_t.sizeof * 8 - 4) ) {
                    throw new DecodingExceptioin("Chunk size is too large");
                }
                value = (value << 4) | nibble;
                digits++;
            }
            if ( digits == 0 ) {
                throw new DecodingExceptioin("Can't find chunk size in the body");
            }
            return value;
        }
    }

    /// Feed the next portion of the chunked body. Chunk payload is stored as
    /// slices of `data` (no copy); framing bytes are dropped.
    /// Everything after the last (zero-sized) chunk is ignored.
    /// Throws: DecodingExceptioin when no valid chunk-size line can be found.
    void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                return;
            }
            if ( state == States.huntingSize ) {
                // Look for the end of the size line, but never further than the
                // room left in the line buffer, so garbage input stays bounded.
                immutable room = linebuff.length < maxSizeLine ? maxSizeLine - linebuff.length : 0;
                auto window = data[0 .. min(data.length, room)];
                size_t taken = window.length;
                foreach (idx, octet; window) {
                    if ( octet == '\n' ) {
                        taken = idx + 1;
                        break;
                    }
                }
                linebuff ~= window[0..taken];
                if ( linebuff.length >= maxSizeLine ) {
                    throw new DecodingExceptioin("Can't find chunk size in the body");
                }
                data = data[taken..$];

                if ( !linebuff.canFind(CRLF) ) {
                    // size line is still incomplete, wait for more input
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    // last-chunk seen
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if ( to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        //tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                    }
                    continue;
                }
                // receiving is only entered with a non-zero chunk size
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                if ( data[0] == '\n' || data[0] == '\r' ) {
                    data = data[1..$];
                    continue;
                }
                // first byte of the next chunk-size line
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Remove and return the first stored portion of unchunked data.
    /// Must only be called when empty() is false.
    eType[] get() {
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing is held back, so there is nothing to flush.
    void flush() {
    }
    /// True when no unchunked data is waiting.
    bool empty() {
        debug tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// True once the terminating zero-sized chunk has been seen.
    bool done() {
        return state==States.trailer;
    }
}

unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    auto d = new Decompressor!eType();
    d.putNoCopy(gzipped[0..2].dup);
    d.putNoCopy(gzipped[2..10].dup);
    d.putNoCopy(gzipped[10..$].dup);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    auto e = new Decompressor!eType();
    e.putNoCopy(gzipped[0..10].dup);
    e.putNoCopy(gzipped[10..$].dup);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.putNoCopy(gzipped[0..2].dup);
    dp.putNoCopy(gzipped[2..$].dup);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.putNoCopy("abcd".dup.representation);
    dpu.putNoCopy("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Test unchunker properties");
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n".dup.representation;
    ubyte[][] result;
    auto uc = new DecodeChunked();
    uc.putNoCopy(twoChunks);
    while(!uc.empty) {
        result ~= uc.get();
    }
    assert(equal(result[0], ['1', '2']));
    assert(equal(result[1], ['3', '4']));
    info("unchunker correctness - ok");
    result[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    // a size line longer than 10 bytes whose extension contains hex digits,
    // with the chunk payload split over two calls
    auto ext = new DecodeChunked();
    ext.putNoCopy("4;name=0123456789abcdef\r\nda".dup.representation);
    ext.putNoCopy("ta\r\n0\r\n\r\n".dup.representation);
    ubyte[] joined;
    while(!ext.empty) {
        joined ~= ext.get();
    }
    assert(equal(joined, "data"));
    assert(ext.done);
    // a size line without any hex digit is rejected
    assertThrown!DecodingExceptioin((new DecodeChunked()).putNoCopy("zz\r\n".dup.representation));
    info("unchunker chunk extensions - ok");
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but also
 * supports the Range interface.
 * $(P To place data in buffer use put() method.)
* $(P To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
static this() {
}
static ~this() {
}
enum   CACHESIZE = 1024;

// Allocation statistics; only reprAlloc is updated (by the Repr constructors).
static long reprAlloc;
static long reprCacheHit;
static long reprCacheRequests;


/**
 * Buffer keeps data as a list of slices and presents it as one random-access
 * range of T. put() stores a copy of the data, putNoCopy() stores a reference.
 * Copying a Buffer (postblit, save, slicing) duplicates the list of slices,
 * not the stored elements.
 */
public struct Buffer(T) {
//    static Repr[CACHESIZE]  cache;
//    static uint             cacheIndex;

    private {
        /// Allocate a representation object (the Repr cache is disabled).
        Repr cachedOrNew() {
            return new Repr;
//            reprCacheRequests++;
//            if ( false && cacheIndex>0 ) {
//                reprCacheHit++;
//                cacheIndex -= 1;
//                return cache[cacheIndex];
//            } else {
//                return new Repr;
//            }
        }
        /// Shared representation: total element count plus the list of slices.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            /// Copy the slice list of `other` (elements stay shared).
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;   // null until something has been stored
    }

    alias toString = data!string;

    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
//        if ( cacheIndex >= CACHESIZE ) {
//            __repr = null;
//            return;
//        }
//        if ( __repr ) {
//            __repr.__length = 0;
//            __repr.__buffer = null;
//            cache[cacheIndex] = __repr;
//            cacheIndex += 1;
//            __repr = null;
//        }
    }
    /***************
     * Store data. The data is copied.
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * Store data without copying: the buffer keeps a reference to `data`.
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    /// Value of `$` inside index and slice expressions.
    @property auto opDollar() const pure @safe {
        // go through length so an empty buffer (null __repr) yields 0
        return length;
    }
    /// Number of stored elements; 0 for a buffer nothing was put into.
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                // first slice exhausted, drop it
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first `n` elements; `n` must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to do; also keeps an empty buffer (null __repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            // last slice exhausted, drop it
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last `n` elements; `n` must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // nothing to do; also keeps an empty buffer (null __repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Independent copy of the range; the stored elements are shared.
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Sub-range [m, n) as a new Buffer sharing the stored elements.
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// Collected data as one array (or cast to U).
    /// When exactly one slice is stored and U is T[], that slice is returned
    /// as is, without copying.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    // NOTE(review): `string` here is the name of the template parameter, so
    // this template matches a cast to any type - confirm that is intended.
    string opCast(string)() {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
///
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);

    // a buffer nothing was put into must survive `$` and zero-length pops
    auto emptyBuf = Buffer!ubyte();
    assert(emptyBuf[0..$].empty);
    emptyBuf.popFrontN(0);
    emptyBuf.popBackN(0);
    assert(emptyBuf.length == 0);
}

extern(C) {
    int SSL_library_init();
    void OpenSSL_add_all_ciphers();
    void OpenSSL_add_all_digests();
    void SSL_load_error_strings();

    struct SSL {}
    struct SSL_CTX {}
    struct SSL_METHOD {}

    SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
    SSL* SSL_new(SSL_CTX*);
    int SSL_set_fd(SSL*, int);
    int SSL_connect(SSL*);
    int SSL_write(SSL*, const void*, int);
    int SSL_read(SSL*, void*, int);
    int SSL_shutdown(SSL*) @trusted @nogc nothrow;
    void SSL_free(SSL*);
    void SSL_CTX_free(SSL_CTX*);

    long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);

    long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
    long SSL_set_mode(SSL *ssl, long mode);

    long SSL_CTX_get_mode(SSL_CTX *ctx);
    long SSL_get_mode(SSL *ssl);

    SSL_METHOD* SSLv3_client_method();
    SSL_METHOD* TLSv1_2_client_method();
    SSL_METHOD* TLSv1_client_method();
}

//pragma(lib, "crypto");
//pragma(lib, "ssl");

shared static this() {
    SSL_library_init();
    OpenSSL_add_all_ciphers();
    OpenSSL_add_all_digests();
    SSL_load_error_strings();
}

/**
 * Socket whose connect/send/receive go through an OpenSSL session bound to
 * the socket handle.
 */
public class OpenSslSocket : Socket {
    enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
    private SSL* ssl;
    private SSL_CTX* ctx;

    /// Create the SSL context and session and attach them to this socket.
    /// Throws: Exception when OpenSSL cannot allocate the context or session.
    private void initSsl() {
        //ctx = SSL_CTX_new(SSLv3_client_method());
        // NOTE(review): TLSv1_client_method presumably restricts the session
        // to TLS 1.0 - confirm this is still the wanted protocol version.
        ctx = SSL_CTX_new(TLSv1_client_method());
        // enforce instead of assert: the check must survive release builds
        enforce(ctx !is null, "Can't create SSL context");

        //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
        //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
        ssl = SSL_new(ctx);
        enforce(ssl !is null, "Can't create SSL session");
        SSL_set_fd(ssl, this.handle);
    }

    /// Connect the socket, then run the TLS handshake.
    /// Throws: Exception when the handshake does not complete.
    @trusted
    override void connect(Address to) {
        super.connect(to);
        // SSL_connect reports success only with 1; 0 and negative values
        // are both handshake failures.
        if(SSL_connect(ssl) != 1)
            throw new Exception("ssl connect failed");
    }

    @trusted
    override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
        return SSL_write(ssl, buf.ptr, cast(int) buf.length);
    }
    override ptrdiff_t send(const(void)[] buf) {
        return send(buf, SocketFlags.NONE);
    }
    @trusted
    override ptrdiff_t receive(void[] buf, SocketFlags flags) {
        return SSL_read(ssl, buf.ptr, cast(int)buf.length);
    }
    override ptrdiff_t receive(void[] buf) {
        return receive(buf, SocketFlags.NONE);
    }
    this(AddressFamily af, SocketType type = SocketType.STREAM) {
        super(af, type);
        initSsl();
    }

    this(socket_t sock, AddressFamily af) {
        super(sock, af);
        initSsl();
    }
    override void close() {
        //SSL_shutdown(ssl);
        super.close();
    }
    ~this() {
        SSL_free(ssl);
        SSL_CTX_free(ctx);
    }
}

/**
 * Base for client streams: resolves a host, tries each returned address in
 * turn, and forwards send/receive to the connected socket. Subclasses supply
 * the socket by overriding open().
 */
public abstract class SocketStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
    }
    /// Create the underlying socket for the given address family.
    void open(AddressFamily fa) {
    }
    @property ref Socket so() @safe pure {
        return s;
    }
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    @property bool isConnected() @safe @nogc pure const {
        return s && __isConnected;
    }
    /// Close the socket (if open) and forget it.
    void close() @trusted {
        tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }

    /// Resolve `host` and connect to the first address that accepts.
    /// Params:
    ///     host = host name or address
    ///     port = TCP port
    ///     timeout = send timeout set on the socket before connecting
    /// Returns: this stream.
    /// Throws: ConnectError when the name cannot be resolved or no address
    ///         could be connected.
    auto connect(string host, ushort port, Duration timeout = 10.seconds) {
        // pass the arguments to tracef itself: a pre-formatted message used as
        // the format string would misinterpret any '%' in the host name
        tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() may have failed before any socket was created
                if ( s !is null ) {
                    s.close();
                }
                __isOpen = false;
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send `buff` over the connected socket.
    /// Returns: whatever the socket's send() returns.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        return s.send(buff);
    }

    /// Receive into `buff`.
    /// Returns: whatever the socket's receive() returns.
    ptrdiff_t receive(void[] buff) @safe {
        return s.receive(buff);
    }
}

/// SocketStream whose socket is an OpenSslSocket.
public class SSLSocketStream: SocketStream {
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new OpenSslSocket(fa);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}

/// SocketStream whose socket is a plain TCP socket.
public class TCPSocketStream : SocketStream {
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}