1 module requests.streams; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.conv; 7 import std.experimental.logger; 8 import std.exception; 9 import std.format; 10 import std.range; 11 import std.range.primitives; 12 import std.string; 13 import std.stdio; 14 import std.traits; 15 import std.zlib; 16 import std.datetime; 17 import std.socket; 18 import core.stdc.errno; 19 20 alias InDataHandler = DataPipeIface!ubyte; 21 22 public class ConnectError: Exception { 23 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 24 super(message, file, line, next); 25 } 26 } 27 28 class DecodingException: Exception { 29 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 30 super(message, file, line, next); 31 } 32 } 33 34 public class TimeoutException: Exception { 35 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 36 super(message, file, line, next); 37 } 38 } 39 40 /** 41 * DataPipeIface can accept some data, process, and return processed data. 42 */ 43 public interface DataPipeIface(E) { 44 /// Is there any processed data ready for reading? 45 bool empty(); 46 /// Put next data portion for processing 47 //void put(E[]); 48 void putNoCopy(E[]); 49 /// Get any ready data 50 E[] get(); 51 /// Signal on end of incoming data stream. 52 void flush(); 53 } 54 /** 55 * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line. 56 * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked", 57 * and uncompress Content-Encoding "gzip". 58 */ 59 public class DataPipe(E) : DataPipeIface!E { 60 61 DataPipeIface!(E)[] pipe; 62 Buffer!E buffer; 63 /// Append data processor to pipeline 64 /// Params: 65 /// p = processor 66 void insert(DataPipeIface!E p) { 67 pipe ~= p; 68 } 69 E[][] process(DataPipeIface!E p, E[][] data) { 70 E[][] result; 71 data.each!(e => p.putNoCopy(e)); 72 while(!p.empty()) result ~= p.get(); 73 return result; 74 } 75 /// Process next data portion. Data passed over pipeline and store result in buffer. 76 /// Params: 77 /// data = input data buffer. 78 /// NoCopy means we do not copy data to buffer, we keep reference 79 void putNoCopy(E[] data) { 80 if ( pipe.empty ) { 81 buffer.putNoCopy(data); 82 return; 83 } 84 try { 85 auto t = process(pipe.front, [data]); 86 foreach(ref p; pipe[1..$]) { 87 t = process(p, t); 88 } 89 t.each!(b => buffer.putNoCopy(b)); 90 } 91 catch (Exception e) { 92 throw new DecodingException(e.msg); 93 } 94 } 95 /// Get what was collected in internal buffer and clear it. 96 /// Returns: 97 /// data collected. 98 E[] get() { 99 if ( buffer.empty ) { 100 return E[].init; 101 } 102 auto res = buffer.data; 103 buffer = Buffer!E.init; 104 return res; 105 } 106 /// 107 /// get without datamove. but user receive [][] 108 /// 109 E[][] getNoCopy() { 110 if ( buffer.empty ) { 111 return E[][].init; 112 } 113 E[][] res = buffer.__repr.__buffer; 114 buffer = Buffer!E.init; 115 return res; 116 } 117 /// Test if internal buffer is empty 118 /// Returns: 119 /// true if internal buffer is empty (nothing to get()) 120 bool empty() pure const @safe { 121 return buffer.empty; 122 } 123 void flush() { 124 E[][] product; 125 foreach(ref p; pipe) { 126 product.each!(e => p.putNoCopy(e)); 127 p.flush(); 128 product.length = 0; 129 while( !p.empty ) product ~= p.get(); 130 } 131 product.each!(b => buffer.putNoCopy(b)); 132 } 133 } 134 135 /** 136 * Processor for gzipped/compressed content. 137 * Also support InputRange interface. 138 */ 139 public class Decompressor(E) : DataPipeIface!E { 140 private { 141 Buffer!ubyte __buff; 142 UnCompress __zlib; 143 } 144 this() { 145 __buff = Buffer!ubyte(); 146 __zlib = new UnCompress(); 147 } 148 override void putNoCopy(E[] data) { 149 if ( __zlib is null ) { 150 __zlib = new UnCompress(); 151 } 152 __buff.putNoCopy(__zlib.uncompress(data)); 153 } 154 override E[] get() pure { 155 assert(__buff.length); 156 auto r = __buff.__repr.__buffer[0]; 157 __buff.popFrontN(r.length); 158 return cast(E[])r; 159 } 160 override void flush() { 161 if ( __zlib is null ) { 162 return; 163 } 164 __buff.put(__zlib.flush()); 165 } 166 override @property bool empty() const pure @safe { 167 debug(requests) tracef("empty=%b", __buff.empty); 168 return __buff.empty; 169 } 170 @property auto ref front() pure const @safe { 171 debug(requests) tracef("front: buff length=%d", __buff.length); 172 return __buff.front; 173 } 174 @property auto popFront() pure @safe { 175 debug(requests) tracef("popFront: buff length=%d", __buff.length); 176 return __buff.popFront; 177 } 178 @property void popFrontN(size_t n) pure @safe { 179 __buff.popFrontN(n); 180 } 181 } 182 183 /** 184 * Unchunk chunked http responce body. 185 */ 186 public class DecodeChunked : DataPipeIface!ubyte { 187 // length := 0 188 // read chunk-size, chunk-extension (if any) and CRLF 189 // while (chunk-size > 0) { 190 // read chunk-data and CRLF 191 // append chunk-data to entity-body 192 // length := length + chunk-size 193 // read chunk-size and CRLF 194 // } 195 // read entity-header 196 // while (entity-header not empty) { 197 // append entity-header to existing header fields 198 // read entity-header 199 // } 200 // Content-Length := length 201 // Remove "chunked" from Transfer-Encoding 202 // 203 204 // Chunked-Body = *chunk 205 // last-chunk 206 // trailer 207 // CRLF 208 // 209 // chunk = chunk-size [ chunk-extension ] CRLF 210 // chunk-data CRLF 211 // chunk-size = 1*HEX 212 // last-chunk = 1*("0") [ chunk-extension ] CRLF 213 // 214 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 215 // chunk-ext-name = token 216 // chunk-ext-val = token | quoted-string 217 // chunk-data = chunk-size(OCTET) 218 // trailer = *(entity-header CRLF) 219 220 alias eType = ubyte; 221 immutable eType[] CRLF = ['\r', '\n']; 222 private { 223 enum States {huntingSize, huntingSeparator, receiving, trailer}; 224 char state = States.huntingSize; 225 size_t chunk_size, to_receive; 226 Buffer!ubyte buff; 227 ubyte[] linebuff; 228 } 229 void putNoCopy(eType[] data) { 230 while ( data.length ) { 231 if ( state == States.trailer ) { 232 return; 233 } 234 if ( state == States.huntingSize ) { 235 import std.ascii; 236 ubyte[10] digits; 237 int i; 238 for(i=0;i<data.length;i++) { 239 ubyte v = data[i]; 240 digits[i] = v; 241 if ( v == '\n' ) { 242 i+=1; 243 break; 244 } 245 } 246 linebuff ~= digits[0..i]; 247 if ( linebuff.length >= 80 ) { 248 throw new DecodingException("Can't find chunk size in the body"); 249 } 250 data = data[i..$]; 251 252 if (!linebuff.canFind(CRLF)) { 253 continue; 254 } 255 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 256 state = States.receiving; 257 to_receive = chunk_size; 258 if ( chunk_size == 0 ) { 259 state = States.trailer; 260 return; 261 } 262 continue; 263 } 264 if ( state == States.receiving ) { 265 if (to_receive > 0 ) { 266 auto can_store = min(to_receive, data.length); 267 buff.putNoCopy(data[0..can_store]); 268 data = data[can_store..$]; 269 to_receive -= can_store; 270 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 271 if ( to_receive == 0 ) { 272 //tracef("switch to huntig separator"); 273 state = States.huntingSeparator; 274 continue; 275 } 276 continue; 277 } 278 assert(false); 279 } 280 if ( state == States.huntingSeparator ) { 281 if ( data[0] == '\n' || data[0]=='\r') { 282 data = data[1..$]; 283 continue; 284 } 285 state = States.huntingSize; 286 linebuff.length = 0; 287 continue; 288 } 289 } 290 } 291 eType[] get() { 292 auto r = buff.__repr.__buffer[0]; 293 buff.popFrontN(r.length); 294 return r; 295 } 296 void flush() { 297 } 298 bool empty() { 299 debug(requests) tracef("empty=%b", buff.empty); 300 return buff.empty; 301 } 302 bool done() { 303 return state==States.trailer; 304 } 305 } 306 307 unittest { 308 info("Testing DataPipe"); 309 globalLogLevel(LogLevel.info); 310 alias eType = char; 311 eType[] gzipped = [ 312 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 313 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 314 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 315 0x08, 0x00, 0x00, 0x00 316 ]; // "abc\ndef\n" 317 auto d = new Decompressor!eType(); 318 d.putNoCopy(gzipped[0..2].dup); 319 d.putNoCopy(gzipped[2..10].dup); 320 d.putNoCopy(gzipped[10..$].dup); 321 d.flush(); 322 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 323 324 auto e = new Decompressor!eType(); 325 e.putNoCopy(gzipped[0..10].dup); 326 e.putNoCopy(gzipped[10..$].dup); 327 e.flush(); 328 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 329 // writeln(gzipped.decompress.filter!(a => a!='b').array); 330 auto dp = new DataPipe!eType; 331 dp.insert(new Decompressor!eType()); 332 dp.putNoCopy(gzipped[0..2].dup); 333 dp.putNoCopy(gzipped[2..$].dup); 334 dp.flush(); 335 assert(equal(dp.get(), "abc\ndef\n")); 336 // empty datapipe shoul just pass input to output 337 auto dpu = new DataPipe!ubyte; 338 dpu.putNoCopy("abcd".dup.representation); 339 dpu.putNoCopy("efgh".dup.representation); 340 dpu.flush(); 341 assert(equal(dpu.get(), "abcdefgh")); 342 info("Test unchunker properties"); 343 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n".dup.representation; 344 ubyte[][] result; 345 auto uc = new DecodeChunked(); 346 uc.putNoCopy(twoChunks); 347 while(!uc.empty) { 348 result ~= uc.get(); 349 } 350 assert(equal(result[0], ['1', '2'])); 351 assert(equal(result[1], ['3', '4'])); 352 info("unchunker correctness - ok"); 353 result[0][0] = '5'; 354 assert(twoChunks[3] == '5'); 355 info("unchunker zero copy - ok"); 356 info("Testing DataPipe - done"); 357 } 358 /** 359 * Buffer used to collect and process data from network. It remainds Appender, but support 360 * also Range interface. 361 * $(P To place data in buffer use put() method.) 362 * $(P To retrieve data from buffer you can use several methods:) 363 * $(UL 364 * $(LI Range methods: front, back, index []) 365 * $(LI data method: return collected data (like Appender.data)) 366 * ) 367 */ 368 static this() { 369 } 370 static ~this() { 371 } 372 enum CACHESIZE = 1024; 373 374 static long reprAlloc; 375 static long reprCacheHit; 376 static long reprCacheRequests; 377 378 379 public struct Buffer(T) { 380 // static Repr[CACHESIZE] cache; 381 // static uint cacheIndex; 382 383 private { 384 Repr cachedOrNew() { 385 return new Repr; 386 // reprCacheRequests++; 387 // if ( false && cacheIndex>0 ) { 388 // reprCacheHit++; 389 // cacheIndex -= 1; 390 // return cache[cacheIndex]; 391 // } else { 392 // return new Repr; 393 // } 394 } 395 class Repr { 396 size_t __length; 397 Unqual!T[][] __buffer; 398 this() { 399 reprAlloc++; 400 __length = 0; 401 } 402 this(Repr other) { 403 reprAlloc++; 404 if ( other is null ) 405 return; 406 __length = other.__length; 407 __buffer = other.__buffer.dup; 408 } 409 } 410 Repr __repr; 411 } 412 413 alias toString = data!string; 414 415 this(this) { 416 if ( !__repr ) { 417 return; 418 } 419 __repr = new Repr(__repr); 420 } 421 this(U)(U[] data) { 422 put(data); 423 } 424 ~this() { 425 __repr = null; 426 // if ( cacheIndex >= CACHESIZE ) { 427 // __repr = null; 428 // return; 429 // } 430 // if ( __repr ) { 431 // __repr.__length = 0; 432 // __repr.__buffer = null; 433 // cache[cacheIndex] = __repr; 434 // cacheIndex += 1; 435 // __repr = null; 436 // } 437 } 438 /*************** 439 * store data. Data copied 440 */ 441 auto put(U)(U[] data) { 442 if ( data.length == 0 ) { 443 return; 444 } 445 if ( !__repr ) { 446 __repr = cachedOrNew(); 447 } 448 static if (!is(U == T)) { 449 auto d = castFrom!(U[]).to!(T[])(data); 450 __repr.__length += d.length; 451 __repr.__buffer ~= d.dup; 452 } else { 453 __repr.__length += data.length; 454 __repr.__buffer ~= data.dup; 455 } 456 return; 457 } 458 auto putNoCopy(U)(U[] data) { 459 if ( data.length == 0 ) { 460 return; 461 } 462 if ( !__repr ) { 463 __repr = cachedOrNew(); 464 } 465 static if (!is(U == T)) { 466 auto d = castFrom!(U[]).to!(T[])(data); 467 __repr.__length += d.length; 468 __repr.__buffer ~= d; 469 } else { 470 __repr.__length += data.length; 471 __repr.__buffer ~= data; 472 } 473 return; 474 } 475 @property auto opDollar() const pure @safe { 476 return __repr.__length; 477 } 478 @property auto length() const pure @safe { 479 if ( !__repr ) { 480 return 0; 481 } 482 return __repr.__length; 483 } 484 @property auto empty() const pure @safe { 485 return length == 0; 486 } 487 @property auto ref front() const pure @safe { 488 assert(length); 489 return __repr.__buffer.front.front; 490 } 491 @property auto ref back() const pure @safe { 492 assert(length); 493 return __repr.__buffer.back.back; 494 } 495 @property void popFront() pure @safe { 496 assert(length); 497 with ( __repr ) { 498 __buffer.front.popFront; 499 if ( __buffer.front.length == 0 ) { 500 __buffer.popFront; 501 } 502 __length--; 503 } 504 } 505 @property void popFrontN(size_t n) pure @safe { 506 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 507 __repr.__length -= n; 508 while( n ) { 509 if ( n <= __repr.__buffer.front.length ) { 510 __repr.__buffer.front.popFrontN(n); 511 if ( __repr.__buffer.front.length == 0 ) { 512 __repr.__buffer.popFront; 513 } 514 return; 515 } 516 n -= __repr.__buffer.front.length; 517 __repr.__buffer.popFront; 518 } 519 } 520 @property void popBack() pure @safe { 521 assert(length); 522 __repr.__buffer.back.popBack; 523 if ( __repr.__buffer.back.length == 0 ) { 524 __repr.__buffer.popBack; 525 } 526 __repr.__length--; 527 } 528 @property void popBackN(size_t n) pure @safe { 529 assert(n <= length, "n: %d, length: %d".format(n, length)); 530 __repr.__length -= n; 531 while( n ) { 532 if ( n <= __repr.__buffer.back.length ) { 533 __repr.__buffer.back.popBackN(n); 534 if ( __repr.__buffer.back.length == 0 ) { 535 __repr.__buffer.popBack; 536 } 537 return; 538 } 539 n -= __repr.__buffer.back.length; 540 __repr.__buffer.popBack; 541 } 542 } 543 @property auto save() @safe { 544 auto n = Buffer!T(); 545 n.__repr = new Repr(__repr); 546 return n; 547 } 548 @property auto ref opIndex(size_t n) const pure @safe { 549 assert( __repr && n < __repr.__length ); 550 foreach(b; __repr.__buffer) { 551 if ( n < b.length ) { 552 return b[n]; 553 } 554 n -= b.length; 555 } 556 assert(false, "Impossible"); 557 } 558 Buffer!T opSlice(size_t m, size_t n) { 559 if ( empty || m == n ) { 560 return Buffer!T(); 561 } 562 assert( m <= n && n <= __repr.__length); 563 auto res = this.save(); 564 res.popBackN(res.__repr.__length-n); 565 res.popFrontN(m); 566 return res; 567 } 568 @property auto data(U=T[])() pure { 569 static if ( is(U==T[]) ) { 570 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 571 return __repr.__buffer.front; 572 } 573 } 574 Appender!(T[]) a; 575 if ( __repr && __repr.__buffer ) { 576 foreach(ref b; __repr.__buffer) { 577 a.put(b); 578 } 579 } 580 static if ( is(U==T[]) ) { 581 return a.data; 582 } else { 583 return castFrom!(T[]).to!U(a.data); 584 } 585 } 586 string opCast(string)() { 587 return this.toString; 588 } 589 bool opEquals(U)(U x) { 590 return cast(U)this == x; 591 } 592 593 } 594 /// 595 public unittest { 596 597 static assert(isInputRange!(Buffer!ubyte)); 598 static assert(isForwardRange!(Buffer!ubyte)); 599 static assert(hasLength!(Buffer!ubyte)); 600 static assert(hasSlicing!(Buffer!ubyte)); 601 static assert(isBidirectionalRange!(Buffer!ubyte)); 602 static assert(isRandomAccessRange!(Buffer!ubyte)); 603 604 auto b = Buffer!ubyte(); 605 b.put("abc".representation.dup); 606 b.put("def".representation.dup); 607 assert(b.length == 6); 608 assert(b.toString == "abcdef"); 609 assert(b.front == 'a'); 610 assert(b.back == 'f'); 611 assert(equal(b[0..$], "abcdef")); 612 assert(equal(b[$-2..$], "ef")); 613 assert(b == "abcdef"); 614 b.popFront; 615 b.popBack; 616 assert(b.front == 'b'); 617 assert(b.back == 'e'); 618 assert(b.length == 4); 619 assert(retro(b).front == 'e'); 620 assert(countUntil(b, 'e') == 3); 621 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 622 assert(equal(b, "bcde")); 623 b.popFront; b.popFront; 624 assert(b.front == 'd'); 625 assert(b.front == b[0]); 626 assert(b.back == b[$-1]); 627 628 auto c = Buffer!ubyte(); 629 c.put("Header0: value0\n".representation.dup); 630 c.put("Header1: value1\n".representation.dup); 631 c.put("Header2: value2\n\nbody".representation.dup); 632 auto c_length = c.length; 633 auto eoh = countUntil(c, "\n\n"); 634 assert(eoh == 47); 635 foreach(header; c[0..eoh].splitter('\n') ) { 636 writeln(castFrom!(ubyte[]).to!(string)(header.data)); 637 } 638 assert(equal(findSplit(c, "\n\n")[2], "body")); 639 assert(c.length == c_length); 640 } 641 version(vibeD) { 642 } 643 else { 644 extern(C) { 645 int SSL_library_init(); 646 void OpenSSL_add_all_ciphers(); 647 void OpenSSL_add_all_digests(); 648 void SSL_load_error_strings(); 649 650 struct SSL {} 651 struct SSL_CTX {} 652 struct SSL_METHOD {} 653 654 SSL_CTX* SSL_CTX_new(const SSL_METHOD* method); 655 SSL* SSL_new(SSL_CTX*); 656 int SSL_set_fd(SSL*, int); 657 int SSL_connect(SSL*); 658 int SSL_write(SSL*, const void*, int); 659 int SSL_read(SSL*, void*, int); 660 int SSL_shutdown(SSL*) @trusted @nogc nothrow; 661 void SSL_free(SSL*); 662 void SSL_CTX_free(SSL_CTX*); 663 664 long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg); 665 666 long SSL_CTX_set_mode(SSL_CTX *ctx, long mode); 667 long SSL_set_mode(SSL *ssl, long mode); 668 669 long SSL_CTX_get_mode(SSL_CTX *ctx); 670 long SSL_get_mode(SSL *ssl); 671 672 SSL_METHOD* SSLv3_client_method(); 673 SSL_METHOD* TLSv1_2_client_method(); 674 SSL_METHOD* TLSv1_client_method(); 675 } 676 677 //pragma(lib, "crypto"); 678 //pragma(lib, "ssl"); 679 680 shared static this() { 681 SSL_library_init(); 682 OpenSSL_add_all_ciphers(); 683 OpenSSL_add_all_digests(); 684 SSL_load_error_strings(); 685 } 686 687 public class OpenSslSocket : Socket { 688 enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 689 private SSL* ssl; 690 private SSL_CTX* ctx; 691 private void initSsl() { 692 //ctx = SSL_CTX_new(SSLv3_client_method()); 693 ctx = SSL_CTX_new(TLSv1_client_method()); 694 assert(ctx !is null); 695 696 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 697 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 698 ssl = SSL_new(ctx); 699 SSL_set_fd(ssl, this.handle); 700 } 701 702 @trusted 703 override void connect(Address to) { 704 super.connect(to); 705 if(SSL_connect(ssl) == -1) 706 throw new Exception("ssl connect failed"); 707 } 708 709 @trusted 710 override ptrdiff_t send(const(void)[] buf, SocketFlags flags) { 711 return SSL_write(ssl, buf.ptr, cast(uint) buf.length); 712 } 713 override ptrdiff_t send(const(void)[] buf) { 714 return send(buf, SocketFlags.NONE); 715 } 716 @trusted 717 override ptrdiff_t receive(void[] buf, SocketFlags flags) { 718 return SSL_read(ssl, buf.ptr, cast(int)buf.length); 719 } 720 override ptrdiff_t receive(void[] buf) { 721 return receive(buf, SocketFlags.NONE); 722 } 723 this(AddressFamily af, SocketType type = SocketType.STREAM) { 724 super(af, type); 725 initSsl(); 726 } 727 728 this(socket_t sock, AddressFamily af) { 729 super(sock, af); 730 initSsl(); 731 } 732 override void close() { 733 //SSL_shutdown(ssl); 734 super.close(); 735 } 736 ~this() { 737 SSL_free(ssl); 738 SSL_CTX_free(ctx); 739 } 740 } 741 742 public class SSLSocketStream: SocketStream { 743 override void open(AddressFamily fa) { 744 if ( s !is null ) { 745 s.close(); 746 } 747 s = new OpenSslSocket(fa); 748 assert(s !is null, "Can't create socket"); 749 __isOpen = true; 750 } 751 override SSLSocketStream accept() { 752 auto newso = s.accept(); 753 if ( s is null ) { 754 return null; 755 } 756 auto newstream = new SSLSocketStream(); 757 auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily); 758 newstream.s = sslSocket; 759 newstream.__isOpen = true; 760 newstream.__isConnected = true; 761 return newstream; 762 } 763 } 764 } 765 766 public interface NetworkStream { 767 @property bool isConnected() const; 768 @property bool isOpen() const; 769 770 void close() @trusted; 771 772 /// 773 /// timeout is the socket write timeout. 774 /// 775 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 776 777 ptrdiff_t send(const(void)[] buff); 778 ptrdiff_t receive(void[] buff); 779 780 NetworkStream accept(); 781 @property void reuseAddr(bool); 782 void bind(Address); 783 void listen(int); 784 785 /// 786 /// Set timeout for receive calls. 0 means no timeout. 787 /// 788 @property void readTimeout(Duration timeout); 789 } 790 791 public abstract class SocketStream : NetworkStream { 792 private { 793 Duration timeout; 794 Socket s; 795 bool __isOpen; 796 bool __isConnected; 797 } 798 void open(AddressFamily fa) { 799 } 800 @property ref Socket so() @safe pure { 801 return s; 802 } 803 @property bool isOpen() @safe @nogc pure const { 804 return s && __isOpen; 805 } 806 @property bool isConnected() @safe @nogc pure const { 807 return s && __isOpen && __isConnected; 808 } 809 void close() @trusted { 810 debug(requests) tracef("Close socket"); 811 if ( isOpen ) { 812 s.close(); 813 __isOpen = false; 814 __isConnected = false; 815 } 816 s = null; 817 } 818 819 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 820 debug(requests) tracef(format("Create connection to %s:%d", host, port)); 821 Address[] addresses; 822 __isConnected = false; 823 try { 824 addresses = getAddress(host, port); 825 } catch (Exception e) { 826 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 827 } 828 foreach(a; addresses) { 829 debug(requests) tracef("Trying %s", a); 830 try { 831 open(a.addressFamily); 832 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 833 s.connect(a); 834 debug(requests) tracef("Connected to %s", a); 835 __isConnected = true; 836 break; 837 } catch (SocketException e) { 838 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 839 s.close(); 840 } 841 } 842 if ( !__isConnected ) { 843 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 844 } 845 return this; 846 } 847 848 ptrdiff_t send(const(void)[] buff) @safe 849 in {assert(isConnected);} 850 body { 851 auto rc = s.send(buff); 852 if (rc < 0) { 853 close(); 854 throw new ErrnoException("sending data"); 855 } 856 return rc; 857 } 858 859 ptrdiff_t receive(void[] buff) @safe { 860 while (true) { 861 auto r = s.receive(buff); 862 if (r < 0) { 863 version(Windows) { 864 close(); 865 if ( errno == 0 ) { 866 throw new TimeoutException("Timeout receiving data"); 867 } 868 throw new ErrnoException(); 869 } 870 version(Posix) { 871 if ( errno == EINTR ) { 872 continue; 873 } 874 close(); 875 if ( errno == EAGAIN ) { 876 throw new TimeoutException("Timeout receiving data"); 877 } 878 throw new ErrnoException("receiving data"); 879 } 880 } 881 else { 882 buff.length = r; 883 } 884 return r; 885 } 886 assert(false); 887 } 888 889 @property void readTimeout(Duration timeout) @safe { 890 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 891 } 892 override SocketStream accept() { 893 assert(false, "Implement before use"); 894 } 895 @property override void reuseAddr(bool yes){ 896 if (yes) { 897 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); 898 } 899 else { 900 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0); 901 } 902 } 903 override void bind(Address addr){ 904 s.bind(addr); 905 } 906 override void listen(int n) { 907 s.listen(n); 908 }; 909 } 910 911 public class TCPSocketStream : SocketStream { 912 override void open(AddressFamily fa) { 913 if ( s !is null ) { 914 s.close(); 915 } 916 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 917 assert(s !is null, "Can't create socket"); 918 __isOpen = true; 919 } 920 override TCPSocketStream accept() { 921 auto newso = s.accept(); 922 if ( s is null ) { 923 return null; 924 } 925 auto newstream = new TCPSocketStream(); 926 newstream.s = newso; 927 newstream.__isOpen = true; 928 newstream.__isConnected = true; 929 return newstream; 930 } 931 } 932 933 version (vibeD) { 934 import vibe.core.net, vibe.stream.tls; 935 936 public class TCPVibeStream : NetworkStream { 937 private: 938 TCPConnection _conn; 939 Duration _readTimeout = Duration.max; 940 bool _isOpen = true; 941 942 public: 943 @property bool isConnected() const { 944 return _conn.connected; 945 } 946 @property override bool isOpen() const { 947 return _conn && _isOpen; 948 } 949 void close() @trusted { 950 _conn.close(); 951 _isOpen = false; 952 } 953 954 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 955 // FIXME: timeout not supported in vibe.d 956 try { 957 _conn = connectTCP(host, port); 958 } 959 catch (Exception e) 960 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 961 962 return this; 963 } 964 965 ptrdiff_t send(const(void)[] buff) { 966 _conn.write(cast(const(ubyte)[])buff); 967 return buff.length; 968 } 969 970 ptrdiff_t receive(void[] buff) { 971 if (!_conn.waitForData(_readTimeout)) { 972 if (!_conn.connected) { 973 return 0; 974 } 975 throw new TimeoutException("Timeout receiving data"); 976 } 977 978 if(_conn.empty) { 979 return 0; 980 } 981 982 auto chunk = min(_conn.leastSize, buff.length); 983 assert(chunk != 0); 984 _conn.read(cast(ubyte[])buff[0 .. chunk]); 985 return chunk; 986 } 987 988 @property void readTimeout(Duration timeout) { 989 if (timeout == 0.seconds) { 990 _readTimeout = Duration.max; 991 } 992 else { 993 _readTimeout = timeout; 994 } 995 } 996 override TCPVibeStream accept() { 997 assert(false, "Must be implemented"); 998 } 999 override @property void reuseAddr(bool){ 1000 assert(false, "Not Implemented"); 1001 } 1002 override void bind(Address){ 1003 assert(false, "Not Implemented"); 1004 } 1005 override void listen(int){ 1006 assert(false, "Not Implemented"); 1007 } 1008 } 1009 1010 public class SSLVibeStream : TCPVibeStream { 1011 private: 1012 Stream _sslStream; 1013 bool _isOpen = true; 1014 public: 1015 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1016 try { 1017 _conn = connectTCP(host, port); 1018 auto sslctx = createTLSContext(TLSContextKind.client); 1019 sslctx.peerValidationMode = TLSPeerValidationMode.none; 1020 _sslStream = createTLSStream(_conn, sslctx); 1021 } 1022 catch (Exception e) { 1023 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1024 } 1025 1026 return this; 1027 } 1028 1029 override ptrdiff_t send(const(void)[] buff) { 1030 _sslStream.write(cast(const(ubyte)[])buff); 1031 return buff.length; 1032 } 1033 1034 override ptrdiff_t receive(void[] buff) { 1035 if (!_sslStream.dataAvailableForRead) { 1036 if (!_conn.waitForData(_readTimeout)) { 1037 if (!_conn.connected) { 1038 return 0; 1039 } 1040 throw new TimeoutException("Timeout receiving data"); 1041 } 1042 } 1043 1044 if(_sslStream.empty) { 1045 return 0; 1046 } 1047 1048 auto chunk = min(_sslStream.leastSize, buff.length); 1049 assert(chunk != 0); 1050 _sslStream.read(cast(ubyte[])buff[0 .. chunk]); 1051 return chunk; 1052 } 1053 1054 override void close() @trusted { 1055 _sslStream.finalize(); 1056 _conn.close(); 1057 _isOpen = false; 1058 } 1059 @property override bool isOpen() const { 1060 return _conn && _isOpen; 1061 } 1062 override SSLVibeStream accept() { 1063 assert(false, "Must be implemented"); 1064 } 1065 override @property void reuseAddr(bool){ 1066 assert(false, "Not Implemented"); 1067 } 1068 override void bind(Address){ 1069 assert(false, "Not Implemented"); 1070 } 1071 override void listen(int){ 1072 assert(false, "Not Implemented"); 1073 } 1074 } 1075 } 1076 1077 version (vibeD) { 1078 public alias TCPStream = TCPVibeStream; 1079 public alias SSLStream = SSLVibeStream; 1080 } 1081 else { 1082 public alias TCPStream = TCPSocketStream; 1083 public alias SSLStream = SSLSocketStream; 1084 }