module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;
import core.stdc.errno;

/// Shorthand for a data processor working on `ubyte` elements.
alias InDataHandler = DataPipeIface!ubyte;

/// Thrown when a connection to the remote host can not be established.
public class ConnectError: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Thrown when an encoded body can not be decoded.
/// The identifier is misspelled; it is kept for compatibility, see `DecodingException`.
class DecodingExceptioin: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Correctly spelled, publicly visible name for `DecodingExceptioin`, so that
/// code outside of this module can catch decoding failures by type.
public alias DecodingException = DecodingExceptioin;

/// Thrown when a receive operation times out.
public class TimeoutException: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/**
 * DataPipeIface can accept some data, process it, and return the processed data.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing.
    /// The processor may keep a reference to `data` instead of copying it.
    //void put(E[]);
    void putNoCopy(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors: each one accepts some data, processes it, and passes the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunked",
 * and uncompress Content-Encoding "gzip".
58 */ 59 public class DataPipe(E) : DataPipeIface!E { 60 61 DataPipeIface!(E)[] pipe; 62 Buffer!E buffer; 63 /// Append data processor to pipeline 64 /// Params: 65 /// p = processor 66 void insert(DataPipeIface!E p) { 67 pipe ~= p; 68 } 69 E[][] process(DataPipeIface!E p, E[][] data) { 70 E[][] result; 71 data.each!(e => p.putNoCopy(e)); 72 while(!p.empty()) result ~= p.get(); 73 return result; 74 } 75 /// Process next data portion. Data passed over pipeline and store result in buffer. 76 /// Params: 77 /// data = input data buffer. 78 /// NoCopy means we do not copy data to buffer, we keep reference 79 void putNoCopy(E[] data) { 80 if ( pipe.empty ) { 81 buffer.putNoCopy(data); 82 return; 83 } 84 auto t = process(pipe.front, [data]); 85 foreach(ref p; pipe[1..$]) { 86 t = process(p, t); 87 } 88 t.each!(b => buffer.putNoCopy(b)); 89 } 90 /// Get what was collected in internal buffer and clear it. 91 /// Returns: 92 /// data collected. 93 E[] get() { 94 if ( buffer.empty ) { 95 return E[].init; 96 } 97 auto res = buffer.data; 98 buffer = Buffer!E.init; 99 return res; 100 } 101 /// 102 /// get without datamove. but user receive [][] 103 /// 104 E[][] getNoCopy() { 105 if ( buffer.empty ) { 106 return E[][].init; 107 } 108 E[][] res = buffer.__repr.__buffer; 109 buffer = Buffer!E.init; 110 return res; 111 } 112 /// Test if internal buffer is empty 113 /// Returns: 114 /// true if internal buffer is empty (nothing to get()) 115 bool empty() pure const @safe { 116 return buffer.empty; 117 } 118 void flush() { 119 E[][] product; 120 foreach(ref p; pipe) { 121 product.each!(e => p.putNoCopy(e)); 122 p.flush(); 123 product.length = 0; 124 while( !p.empty ) product ~= p.get(); 125 } 126 product.each!(b => buffer.putNoCopy(b)); 127 } 128 } 129 130 /** 131 * Processor for gzipped/compressed content. 132 * Also support InputRange interface. 
133 */ 134 public class Decompressor(E) : DataPipeIface!E { 135 private { 136 Buffer!ubyte __buff; 137 UnCompress __zlib; 138 } 139 this() { 140 __buff = Buffer!ubyte(); 141 __zlib = new UnCompress(); 142 } 143 override void putNoCopy(E[] data) { 144 if ( __zlib is null ) { 145 __zlib = new UnCompress(); 146 } 147 __buff.putNoCopy(__zlib.uncompress(data)); 148 } 149 override E[] get() pure { 150 assert(__buff.length); 151 auto r = __buff.__repr.__buffer[0]; 152 __buff.popFrontN(r.length); 153 return cast(E[])r; 154 } 155 override void flush() { 156 if ( __zlib is null ) { 157 return; 158 } 159 __buff.put(__zlib.flush()); 160 } 161 override @property bool empty() const pure @safe { 162 debug tracef("empty=%b", __buff.empty); 163 return __buff.empty; 164 } 165 @property auto ref front() pure const @safe { 166 debug tracef("front: buff length=%d", __buff.length); 167 return __buff.front; 168 } 169 @property auto popFront() pure @safe { 170 debug tracef("popFront: buff length=%d", __buff.length); 171 return __buff.popFront; 172 } 173 @property void popFrontN(size_t n) pure @safe { 174 __buff.popFrontN(n); 175 } 176 } 177 178 /** 179 * Unchunk chunked http responce body. 
180 */ 181 public class DecodeChunked : DataPipeIface!ubyte { 182 // length := 0 183 // read chunk-size, chunk-extension (if any) and CRLF 184 // while (chunk-size > 0) { 185 // read chunk-data and CRLF 186 // append chunk-data to entity-body 187 // length := length + chunk-size 188 // read chunk-size and CRLF 189 // } 190 // read entity-header 191 // while (entity-header not empty) { 192 // append entity-header to existing header fields 193 // read entity-header 194 // } 195 // Content-Length := length 196 // Remove "chunked" from Transfer-Encoding 197 // 198 199 // Chunked-Body = *chunk 200 // last-chunk 201 // trailer 202 // CRLF 203 // 204 // chunk = chunk-size [ chunk-extension ] CRLF 205 // chunk-data CRLF 206 // chunk-size = 1*HEX 207 // last-chunk = 1*("0") [ chunk-extension ] CRLF 208 // 209 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 210 // chunk-ext-name = token 211 // chunk-ext-val = token | quoted-string 212 // chunk-data = chunk-size(OCTET) 213 // trailer = *(entity-header CRLF) 214 215 alias eType = ubyte; 216 immutable eType[] CRLF = ['\r', '\n']; 217 private { 218 enum States {huntingSize, huntingSeparator, receiving, trailer}; 219 char state = States.huntingSize; 220 size_t chunk_size, to_receive; 221 Buffer!ubyte buff; 222 ubyte[] linebuff; 223 } 224 void putNoCopy(eType[] data) { 225 while ( data.length ) { 226 if ( state == States.trailer ) { 227 return; 228 } 229 if ( state == States.huntingSize ) { 230 import std.ascii; 231 ubyte[10] digits; 232 int i; 233 for(i=0;i<data.length;i++) { 234 ubyte v = data[i]; 235 digits[i] = v; 236 if ( v == '\n' ) { 237 i+=1; 238 break; 239 } 240 } 241 linebuff ~= digits[0..i]; 242 if ( linebuff.length >= 80 ) { 243 throw new DecodingExceptioin("Can't find chunk size in the body"); 244 } 245 data = data[i..$]; 246 247 if (!linebuff.canFind(CRLF)) { 248 continue; 249 } 250 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 251 state = 
States.receiving; 252 to_receive = chunk_size; 253 if ( chunk_size == 0 ) { 254 state = States.trailer; 255 return; 256 } 257 continue; 258 } 259 if ( state == States.receiving ) { 260 if (to_receive > 0 ) { 261 auto can_store = min(to_receive, data.length); 262 buff.putNoCopy(data[0..can_store]); 263 data = data[can_store..$]; 264 to_receive -= can_store; 265 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 266 if ( to_receive == 0 ) { 267 //tracef("switch to huntig separator"); 268 state = States.huntingSeparator; 269 continue; 270 } 271 continue; 272 } 273 assert(false); 274 } 275 if ( state == States.huntingSeparator ) { 276 if ( data[0] == '\n' || data[0]=='\r') { 277 data = data[1..$]; 278 continue; 279 } 280 state = States.huntingSize; 281 linebuff.length = 0; 282 continue; 283 } 284 } 285 } 286 eType[] get() { 287 auto r = buff.__repr.__buffer[0]; 288 buff.popFrontN(r.length); 289 return r; 290 } 291 void flush() { 292 } 293 bool empty() { 294 debug tracef("empty=%b", buff.empty); 295 return buff.empty; 296 } 297 bool done() { 298 return state==States.trailer; 299 } 300 } 301 302 unittest { 303 info("Testing DataPipe"); 304 globalLogLevel(LogLevel.info); 305 alias eType = char; 306 eType[] gzipped = [ 307 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 308 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 309 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 310 0x08, 0x00, 0x00, 0x00 311 ]; // "abc\ndef\n" 312 auto d = new Decompressor!eType(); 313 d.putNoCopy(gzipped[0..2].dup); 314 d.putNoCopy(gzipped[2..10].dup); 315 d.putNoCopy(gzipped[10..$].dup); 316 d.flush(); 317 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 318 319 auto e = new Decompressor!eType(); 320 e.putNoCopy(gzipped[0..10].dup); 321 e.putNoCopy(gzipped[10..$].dup); 322 e.flush(); 323 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 324 // writeln(gzipped.decompress.filter!(a => a!='b').array); 325 auto dp = new DataPipe!eType; 326 dp.insert(new Decompressor!eType()); 327 
dp.putNoCopy(gzipped[0..2].dup); 328 dp.putNoCopy(gzipped[2..$].dup); 329 dp.flush(); 330 assert(equal(dp.get(), "abc\ndef\n")); 331 // empty datapipe shoul just pass input to output 332 auto dpu = new DataPipe!ubyte; 333 dpu.putNoCopy("abcd".dup.representation); 334 dpu.putNoCopy("efgh".dup.representation); 335 dpu.flush(); 336 assert(equal(dpu.get(), "abcdefgh")); 337 info("Test unchunker properties"); 338 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n".dup.representation; 339 ubyte[][] result; 340 auto uc = new DecodeChunked(); 341 uc.putNoCopy(twoChunks); 342 while(!uc.empty) { 343 result ~= uc.get(); 344 } 345 assert(equal(result[0], ['1', '2'])); 346 assert(equal(result[1], ['3', '4'])); 347 info("unchunker correctness - ok"); 348 result[0][0] = '5'; 349 assert(twoChunks[3] == '5'); 350 info("unchunker zero copy - ok"); 351 info("Testing DataPipe - done"); 352 } 353 /** 354 * Buffer used to collect and process data from network. It remainds Appender, but support 355 * also Range interface. 356 * $(P To place data in buffer use put() method.) 
357 * $(P To retrieve data from buffer you can use several methods:) 358 * $(UL 359 * $(LI Range methods: front, back, index []) 360 * $(LI data method: return collected data (like Appender.data)) 361 * ) 362 */ 363 static this() { 364 } 365 static ~this() { 366 } 367 enum CACHESIZE = 1024; 368 369 static long reprAlloc; 370 static long reprCacheHit; 371 static long reprCacheRequests; 372 373 374 public struct Buffer(T) { 375 // static Repr[CACHESIZE] cache; 376 // static uint cacheIndex; 377 378 private { 379 Repr cachedOrNew() { 380 return new Repr; 381 // reprCacheRequests++; 382 // if ( false && cacheIndex>0 ) { 383 // reprCacheHit++; 384 // cacheIndex -= 1; 385 // return cache[cacheIndex]; 386 // } else { 387 // return new Repr; 388 // } 389 } 390 class Repr { 391 size_t __length; 392 Unqual!T[][] __buffer; 393 this() { 394 reprAlloc++; 395 __length = 0; 396 } 397 this(Repr other) { 398 reprAlloc++; 399 if ( other is null ) 400 return; 401 __length = other.__length; 402 __buffer = other.__buffer.dup; 403 } 404 } 405 Repr __repr; 406 } 407 408 alias toString = data!string; 409 410 this(this) { 411 if ( !__repr ) { 412 return; 413 } 414 __repr = new Repr(__repr); 415 } 416 this(U)(U[] data) { 417 put(data); 418 } 419 ~this() { 420 __repr = null; 421 // if ( cacheIndex >= CACHESIZE ) { 422 // __repr = null; 423 // return; 424 // } 425 // if ( __repr ) { 426 // __repr.__length = 0; 427 // __repr.__buffer = null; 428 // cache[cacheIndex] = __repr; 429 // cacheIndex += 1; 430 // __repr = null; 431 // } 432 } 433 /*************** 434 * store data. 
Data copied 435 */ 436 auto put(U)(U[] data) { 437 if ( data.length == 0 ) { 438 return; 439 } 440 if ( !__repr ) { 441 __repr = cachedOrNew(); 442 } 443 static if (!is(U == T)) { 444 auto d = castFrom!(U[]).to!(T[])(data); 445 __repr.__length += d.length; 446 __repr.__buffer ~= d.dup; 447 } else { 448 __repr.__length += data.length; 449 __repr.__buffer ~= data.dup; 450 } 451 return; 452 } 453 auto putNoCopy(U)(U[] data) { 454 if ( data.length == 0 ) { 455 return; 456 } 457 if ( !__repr ) { 458 __repr = cachedOrNew(); 459 } 460 static if (!is(U == T)) { 461 auto d = castFrom!(U[]).to!(T[])(data); 462 __repr.__length += d.length; 463 __repr.__buffer ~= d; 464 } else { 465 __repr.__length += data.length; 466 __repr.__buffer ~= data; 467 } 468 return; 469 } 470 @property auto opDollar() const pure @safe { 471 return __repr.__length; 472 } 473 @property auto length() const pure @safe { 474 if ( !__repr ) { 475 return 0; 476 } 477 return __repr.__length; 478 } 479 @property auto empty() const pure @safe { 480 return length == 0; 481 } 482 @property auto ref front() const pure @safe { 483 assert(length); 484 return __repr.__buffer.front.front; 485 } 486 @property auto ref back() const pure @safe { 487 assert(length); 488 return __repr.__buffer.back.back; 489 } 490 @property void popFront() pure @safe { 491 assert(length); 492 with ( __repr ) { 493 __buffer.front.popFront; 494 if ( __buffer.front.length == 0 ) { 495 __buffer.popFront; 496 } 497 __length--; 498 } 499 } 500 @property void popFrontN(size_t n) pure @safe { 501 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 502 __repr.__length -= n; 503 while( n ) { 504 if ( n <= __repr.__buffer.front.length ) { 505 __repr.__buffer.front.popFrontN(n); 506 if ( __repr.__buffer.front.length == 0 ) { 507 __repr.__buffer.popFront; 508 } 509 return; 510 } 511 n -= __repr.__buffer.front.length; 512 __repr.__buffer.popFront; 513 } 514 } 515 @property void popBack() pure @safe { 516 assert(length); 517 
__repr.__buffer.back.popBack; 518 if ( __repr.__buffer.back.length == 0 ) { 519 __repr.__buffer.popBack; 520 } 521 __repr.__length--; 522 } 523 @property void popBackN(size_t n) pure @safe { 524 assert(n <= length, "n: %d, length: %d".format(n, length)); 525 __repr.__length -= n; 526 while( n ) { 527 if ( n <= __repr.__buffer.back.length ) { 528 __repr.__buffer.back.popBackN(n); 529 if ( __repr.__buffer.back.length == 0 ) { 530 __repr.__buffer.popBack; 531 } 532 return; 533 } 534 n -= __repr.__buffer.back.length; 535 __repr.__buffer.popBack; 536 } 537 } 538 @property auto save() @safe { 539 auto n = Buffer!T(); 540 n.__repr = new Repr(__repr); 541 return n; 542 } 543 @property auto ref opIndex(size_t n) const pure @safe { 544 assert( __repr && n < __repr.__length ); 545 foreach(b; __repr.__buffer) { 546 if ( n < b.length ) { 547 return b[n]; 548 } 549 n -= b.length; 550 } 551 assert(false, "Impossible"); 552 } 553 Buffer!T opSlice(size_t m, size_t n) { 554 if ( empty || m == n ) { 555 return Buffer!T(); 556 } 557 assert( m <= n && n <= __repr.__length); 558 auto res = this.save(); 559 res.popBackN(res.__repr.__length-n); 560 res.popFrontN(m); 561 return res; 562 } 563 @property auto data(U=T[])() pure { 564 static if ( is(U==T[]) ) { 565 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 566 return __repr.__buffer.front; 567 } 568 } 569 Appender!(T[]) a; 570 if ( __repr && __repr.__buffer ) { 571 foreach(ref b; __repr.__buffer) { 572 a.put(b); 573 } 574 } 575 static if ( is(U==T[]) ) { 576 return a.data; 577 } else { 578 return castFrom!(T[]).to!U(a.data); 579 } 580 } 581 string opCast(string)() { 582 return this.toString; 583 } 584 bool opEquals(U)(U x) { 585 return cast(U)this == x; 586 } 587 588 } 589 /// 590 public unittest { 591 592 static assert(isInputRange!(Buffer!ubyte)); 593 static assert(isForwardRange!(Buffer!ubyte)); 594 static assert(hasLength!(Buffer!ubyte)); 595 static assert(hasSlicing!(Buffer!ubyte)); 596 static 
assert(isBidirectionalRange!(Buffer!ubyte)); 597 static assert(isRandomAccessRange!(Buffer!ubyte)); 598 599 auto b = Buffer!ubyte(); 600 b.put("abc".representation.dup); 601 b.put("def".representation.dup); 602 assert(b.length == 6); 603 assert(b.toString == "abcdef"); 604 assert(b.front == 'a'); 605 assert(b.back == 'f'); 606 assert(equal(b[0..$], "abcdef")); 607 assert(equal(b[$-2..$], "ef")); 608 assert(b == "abcdef"); 609 b.popFront; 610 b.popBack; 611 assert(b.front == 'b'); 612 assert(b.back == 'e'); 613 assert(b.length == 4); 614 assert(retro(b).front == 'e'); 615 assert(countUntil(b, 'e') == 3); 616 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 617 assert(equal(b, "bcde")); 618 b.popFront; b.popFront; 619 assert(b.front == 'd'); 620 assert(b.front == b[0]); 621 assert(b.back == b[$-1]); 622 623 auto c = Buffer!ubyte(); 624 c.put("Header0: value0\n".representation.dup); 625 c.put("Header1: value1\n".representation.dup); 626 c.put("Header2: value2\n\nbody".representation.dup); 627 auto c_length = c.length; 628 auto eoh = countUntil(c, "\n\n"); 629 assert(eoh == 47); 630 foreach(header; c[0..eoh].splitter('\n') ) { 631 writeln(castFrom!(ubyte[]).to!(string)(header.data)); 632 } 633 assert(equal(findSplit(c, "\n\n")[2], "body")); 634 assert(c.length == c_length); 635 } 636 637 version (sslLibs) { 638 extern(C) { 639 int SSL_library_init(); 640 void OpenSSL_add_all_ciphers(); 641 void OpenSSL_add_all_digests(); 642 void SSL_load_error_strings(); 643 644 struct SSL {} 645 struct SSL_CTX {} 646 struct SSL_METHOD {} 647 648 SSL_CTX* SSL_CTX_new(const SSL_METHOD* method); 649 SSL* SSL_new(SSL_CTX*); 650 int SSL_set_fd(SSL*, int); 651 int SSL_connect(SSL*); 652 int SSL_write(SSL*, const void*, int); 653 int SSL_read(SSL*, void*, int); 654 int SSL_shutdown(SSL*) @trusted @nogc nothrow; 655 void SSL_free(SSL*); 656 void SSL_CTX_free(SSL_CTX*); 657 658 long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg); 659 660 long 
SSL_CTX_set_mode(SSL_CTX *ctx, long mode); 661 long SSL_set_mode(SSL *ssl, long mode); 662 663 long SSL_CTX_get_mode(SSL_CTX *ctx); 664 long SSL_get_mode(SSL *ssl); 665 666 SSL_METHOD* SSLv3_client_method(); 667 SSL_METHOD* TLSv1_2_client_method(); 668 SSL_METHOD* TLSv1_client_method(); 669 } 670 671 //pragma(lib, "crypto"); 672 //pragma(lib, "ssl"); 673 674 shared static this() { 675 SSL_library_init(); 676 OpenSSL_add_all_ciphers(); 677 OpenSSL_add_all_digests(); 678 SSL_load_error_strings(); 679 } 680 681 public class OpenSslSocket : Socket { 682 enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 683 private SSL* ssl; 684 private SSL_CTX* ctx; 685 private void initSsl() { 686 //ctx = SSL_CTX_new(SSLv3_client_method()); 687 ctx = SSL_CTX_new(TLSv1_client_method()); 688 assert(ctx !is null); 689 690 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 691 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 692 ssl = SSL_new(ctx); 693 SSL_set_fd(ssl, this.handle); 694 } 695 696 @trusted 697 override void connect(Address to) { 698 super.connect(to); 699 if(SSL_connect(ssl) == -1) 700 throw new Exception("ssl connect failed"); 701 } 702 703 @trusted 704 override ptrdiff_t send(const(void)[] buf, SocketFlags flags) { 705 return SSL_write(ssl, buf.ptr, cast(uint) buf.length); 706 } 707 override ptrdiff_t send(const(void)[] buf) { 708 return send(buf, SocketFlags.NONE); 709 } 710 @trusted 711 override ptrdiff_t receive(void[] buf, SocketFlags flags) { 712 return SSL_read(ssl, buf.ptr, cast(int)buf.length); 713 } 714 override ptrdiff_t receive(void[] buf) { 715 return receive(buf, SocketFlags.NONE); 716 } 717 this(AddressFamily af, SocketType type = SocketType.STREAM) { 718 super(af, type); 719 initSsl(); 720 } 721 722 this(socket_t sock, AddressFamily af) { 723 super(sock, af); 724 initSsl(); 725 } 726 override void close() { 727 //SSL_shutdown(ssl); 728 super.close(); 729 } 730 ~this() { 731 SSL_free(ssl); 732 SSL_CTX_free(ctx); 733 } 734 } 735 736 public class 
SSLSocketStream: SocketStream { 737 override void open(AddressFamily fa) { 738 if ( s !is null ) { 739 s.close(); 740 } 741 s = new OpenSslSocket(fa); 742 assert(s !is null, "Can't create socket"); 743 __isOpen = true; 744 } 745 } 746 } 747 748 public interface NetworkStream { 749 @property bool isConnected() const; 750 void close() @trusted; 751 752 /// 753 /// timeout is the socket write timeout. 754 /// 755 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 756 757 ptrdiff_t send(const(void)[] buff); 758 ptrdiff_t receive(void[] buff); 759 760 /// 761 /// Set timeout for receive calls. 0 means no timeout. 762 /// 763 @property void readTimeout(Duration timeout); 764 } 765 766 public abstract class SocketStream : NetworkStream { 767 private { 768 Duration timeout; 769 Socket s; 770 bool __isOpen; 771 bool __isConnected; 772 } 773 void open(AddressFamily fa) { 774 } 775 @property ref Socket so() @safe pure { 776 return s; 777 } 778 @property bool isOpen() @safe @nogc pure const { 779 return s && __isOpen; 780 } 781 @property bool isConnected() @safe @nogc pure const { 782 return s && __isConnected; 783 } 784 void close() @trusted { 785 tracef("Close socket"); 786 if ( isOpen ) { 787 s.close(); 788 __isOpen = false; 789 __isConnected = false; 790 } 791 s = null; 792 } 793 794 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 795 tracef(format("Create connection to %s:%d", host, port)); 796 Address[] addresses; 797 __isConnected = false; 798 try { 799 addresses = getAddress(host, port); 800 } catch (Exception e) { 801 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 802 } 803 foreach(a; addresses) { 804 tracef("Trying %s", a); 805 try { 806 open(a.addressFamily); 807 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 808 s.connect(a); 809 tracef("Connected to %s", a); 810 __isConnected = true; 811 break; 812 } catch (SocketException e) { 
813 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 814 s.close(); 815 } 816 } 817 if ( !__isConnected ) { 818 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 819 } 820 return this; 821 } 822 823 ptrdiff_t send(const(void)[] buff) @safe 824 in {assert(isConnected);} 825 body { 826 auto rc = s.send(buff); 827 if (rc < 0) { 828 throw new ErrnoException("sending data"); 829 } 830 return rc; 831 } 832 833 ptrdiff_t receive(void[] buff) @safe { 834 while (true) { 835 auto r = s.receive(buff); 836 if (r < 0) { 837 version(Windows) { 838 if ( errno == 0 ) { 839 throw new TimeoutException("Timeout receiving data"); 840 } 841 } 842 version(Posix) { 843 if ( errno == EINTR ) { 844 continue; 845 } 846 if ( errno == EAGAIN ) { 847 throw new TimeoutException("Timeout receiving data"); 848 } 849 throw new ErrnoException("receiving data"); 850 } 851 } 852 else { 853 buff.length = r; 854 } 855 return r; 856 } 857 assert(false); 858 } 859 860 @property void readTimeout(Duration timeout) @safe { 861 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 862 } 863 } 864 865 public class TCPSocketStream : SocketStream { 866 override void open(AddressFamily fa) { 867 if ( s !is null ) { 868 s.close(); 869 } 870 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 871 assert(s !is null, "Can't create socket"); 872 __isOpen = true; 873 } 874 } 875 876 version (vibeD) { 877 import vibe.core.net, vibe.stream.tls; 878 879 public class TCPVibeStream : NetworkStream { 880 private: 881 TCPConnection _conn; 882 Duration _readTimeout = Duration.max; 883 884 public: 885 @property bool isConnected() const { 886 return _conn.connected; 887 } 888 889 void close() @trusted { 890 _conn.close(); 891 } 892 893 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 894 // FIXME: timeout not supported in vibe.d 895 try { 896 _conn = connectTCP(host, port); 897 } 898 catch (Exception e) 899 throw new 
ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 900 901 return this; 902 } 903 904 ptrdiff_t send(const(void)[] buff) { 905 _conn.write(cast(const(ubyte)[])buff); 906 return buff.length; 907 } 908 909 ptrdiff_t receive(void[] buff) { 910 if (!_conn.waitForData(_readTimeout)) { 911 if (!_conn.connected) { 912 return 0; 913 } 914 throw new TimeoutException("Timeout receiving data"); 915 } 916 917 if(_conn.empty) { 918 return 0; 919 } 920 921 auto chunk = min(_conn.leastSize, buff.length); 922 assert(chunk != 0); 923 _conn.read(cast(ubyte[])buff[0 .. chunk]); 924 return chunk; 925 } 926 927 @property void readTimeout(Duration timeout) { 928 if (timeout == 0.seconds) { 929 _readTimeout = Duration.max; 930 } 931 else { 932 _readTimeout = timeout; 933 } 934 } 935 } 936 937 public class SSLVibeStream : TCPVibeStream { 938 private: 939 Stream _sslStream; 940 941 public: 942 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 943 try { 944 _conn = connectTCP(host, port); 945 auto sslctx = createTLSContext(TLSContextKind.client); 946 sslctx.peerValidationMode = TLSPeerValidationMode.none; 947 _sslStream = createTLSStream(_conn, sslctx); 948 } 949 catch (Exception e) { 950 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 951 } 952 953 return this; 954 } 955 956 override ptrdiff_t send(const(void)[] buff) { 957 _sslStream.write(cast(const(ubyte)[])buff); 958 return buff.length; 959 } 960 961 override ptrdiff_t receive(void[] buff) { 962 if (!_sslStream.dataAvailableForRead) { 963 if (!_conn.waitForData(_readTimeout)) { 964 if (!_conn.connected) { 965 return 0; 966 } 967 throw new TimeoutException("Timeout receiving data"); 968 } 969 } 970 971 if(_sslStream.empty) { 972 return 0; 973 } 974 975 auto chunk = min(_sslStream.leastSize, buff.length); 976 assert(chunk != 0); 977 _sslStream.read(cast(ubyte[])buff[0 .. 
chunk]); 978 return chunk; 979 } 980 981 override void close() @trusted { 982 _sslStream.finalize(); 983 _conn.close(); 984 } 985 } 986 } 987 988 version (vibeD) { 989 public alias TCPStream = TCPVibeStream; 990 public alias SSLStream = SSLVibeStream; 991 } 992 else { 993 public alias TCPStream = TCPSocketStream; 994 public alias SSLStream = SSLSocketStream; 995 }