module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;

/// Thrown by SocketStream.connect when the host cannot be resolved or
/// none of the resolved addresses accepts the connection.
public class ConnectError: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}

/// Thrown by DecodeChunked.put when no chunk-size line can be found in the input.
/// (The misspelled name is kept so that existing references keep compiling.)
public class DecodingExceptioin: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}
/**
 * DataPipeIface can accept some data, process it, and return the processed data.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing
    void put(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors; each accepts some data, processes it, and passes the result
 * to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example: unchunk chunked
 * transfer-encoding, and uncompress compressed Content-Encoding.
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[] pipe;
    Buffer!E            buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every portion of `data` to processor `p` and collect everything `p` has ready afterwards.
    /// Params:
    /// p = processor to run
    /// data = input portions
    /// Returns:
    /// output portions produced by `p`
    E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.put(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in the buffer.
    /// Params:
    /// data = input data array.
    void put(E[] data) {
        if ( pipe.empty ) {
            // no processors: input goes straight to the output buffer
            buffer.put(data);
            return;
        }
        auto t = process(pipe.front, [data]);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in the buffer.
    /// Params:
    /// buff = input data buffer.
    void put(Buffer!E buff) {
        if ( buff.empty ) {
            // An empty Buffer may have no representation object at all;
            // there is nothing to process and touching buff.__repr would dereference null.
            return;
        }
        if ( pipe.empty ) {
            if ( buffer.__repr is null ) {
                buffer = buff;
                return;
            }
            // append the incoming chunks to what is already collected
            buffer.__repr.__buffer ~= buff.__repr.__buffer;
            buffer.__repr.__length += buff.length;
            return;
        }
        auto t = process(pipe.front, buff.__repr.__buffer);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    E[] get() pure {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, feeding what each one
    /// releases to the next one, and store the final output in the buffer.
    void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.put(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.put(b));
    }
}

/**
 * Processor for gzipped/compressed content (wraps std.zlib.UnCompress).
 * Also supports the InputRange interface.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;
        UnCompress   __zlib;
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
//    this(E[] r) {
//        //__range = r;
//        __buff = Buffer!ubyte();
//        __zlib = new UnCompress();
//        auto l = r.length;
//        if ( l ) {
//            __buff.put(__zlib.uncompress(r.take(l)));
//        }
//    }
    /// Uncompress next portion of input and store the result in the internal buffer.
    override void put(E[] data) {
        if ( __zlib is null  ) {
            __zlib = new UnCompress();
        }
        __buff.put(__zlib.uncompress(data));
    }
    /// Return the first stored chunk of uncompressed data and remove it from the buffer.
    /// Must not be called when empty.
    override E[] get() pure {
        assert(__buff.length);
        auto r = __buff.__repr.__buffer[0];
        __buff.popFrontN(r.length);
        return cast(E[])r;
    }
    /// Store whatever the decompressor returns from its own flush().
    override void flush() {
        if ( __zlib is null  ) {
            return;
        }
        __buff.put(__zlib.flush());
    }
    override @property bool empty() const pure @safe {
        debug tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    @property auto ref front() pure const @safe {
        debug tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    @property auto popFront() pure @safe {
        debug tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}

/**
 * Unchunk chunked http response body.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum States {huntingSize, huntingSeparator, receiving, trailer}
        // typed as the enum itself (was `char`), so only valid states can be stored
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;
        ubyte[]      linebuff;
    }
    /// Consume next portion of chunked input; decoded chunk data is stored in the internal buffer.
    /// Input received after the last (zero-sized) chunk is ignored.
    /// Throws:
    /// DecodingExceptioin if 80 or more bytes were collected without finding the CRLF
    /// that terminates a chunk-size line.
    void put(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                return;
            }
            if ( state == States.huntingSize ) {
                // collect input until the whole "chunk-size [extension] CRLF" line is available
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( !s[1].length ) {
                    if ( linebuff.length >= 80 ) {
                        throw new DecodingExceptioin("Can't find chunk size in the body");
                    }
                    continue;
                }
                string x = castFrom!(ubyte[]).to!string(s[0]);
                formattedRead(x, "%x", &chunk_size);
                tracef("Got chunk size %s", chunk_size);
                state = States.receiving;
                to_receive = chunk_size;
                // whatever follows the size line is processed on the next iteration
                data = s[2];
                if ( chunk_size == 0 ) {
                    state = States.trailer;
                    tracef("Unchunk completed");
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.put(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        to_receive = 2;
                        linebuff.length = 0;
                        continue;
                    }
                    continue;
                }
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                // skip the CRLF that terminates chunk data
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( s[1].length ) {
                    data = s[2];
                    chunk_size = 0;
                    linebuff.length = 0;
                    state = States.huntingSize;
                    tracef("switch to huntig size");
                    continue;
                }
            }
        }
    }
    /// Return the first stored portion of decoded data and remove it from the buffer.
    /// Returns an empty array when nothing is buffered.
    eType[] get() {
        if ( buff.empty ) {
            // nothing stored yet: the buffer may have no representation object to index
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to do: decoded data is made available by put().
    void flush() {
    }
    bool empty() {
        debug tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// True once the zero-sized last chunk was seen.
    bool done() {
        return state==States.trailer;
    }
}

unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    auto d = new Decompressor!eType();
    d.put(gzipped[0..2]);
    d.put(gzipped[2..10]);
    d.put(gzipped[10..$]);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    auto e = new Decompressor!eType();
    e.put(gzipped[0..10]);
    e.put(gzipped[10..$]);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.put(gzipped[0..2]);
    dp.put(gzipped[2..$]);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.put("abcd".dup.representation);
    dpu.put("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but also supports
 * the Range interface.
 * $(P To place data in buffer use put() method.)
 * $(P To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
public struct Buffer(T) {
    private {
        // Shared representation: list of stored chunks plus their total length.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                __length = 0;
            }
            this(Repr other) {
                if ( other is null )
                    return;
                __length = other.__length;
                // copy the list of chunks; chunk contents are not duplicated
                __buffer = other.__buffer.dup;
            }
        }
        Repr  __repr;
    }

    alias toString = data!string;

    this(this) {
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) pure {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return this;
    }
    /// Value of `$` in index and slice expressions; 0 for a buffer that never stored anything.
    @property auto opDollar() const pure @safe {
        // go through length: __repr is null until the first put()
        return length;
    }
    /// Total number of stored elements.
    @property size_t length() const pure @safe {
        return __repr is null ? 0 : __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                // first chunk is exhausted, drop it
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Remove n elements from the front. n must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to remove; also keeps an empty buffer (null __repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            // drop the whole first chunk and continue with the next one
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Remove n elements from the back. n must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length);
        if ( n == 0 ) {
            // nothing to remove; also keeps an empty buffer (null __repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            // drop the whole last chunk and continue with the previous one
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    @property auto save() pure @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Return a new Buffer holding elements [m, n) of this one.
    Buffer!T opSlice(size_t m, size_t n) {
        // compare against length, not __repr.__length: __repr is null for an empty buffer
        assert( m <= n && n <= length);
        auto res = Buffer!T();
        if ( m == n ) {
            res.__repr = new Repr;
            return res;
        }
        res.__repr = new Repr(this.__repr);
        res.popBackN(res.length-n);
        res.popFrontN(m);
        return res;
    }
//    ptrdiff_t countUntil(in T[] needle) const pure @safe {
//        ptrdiff_t haystackpos, needlepos;
//        while(haystackpos < length) {
//            if ( opIndex(haystackpos) == needle[needlepos] ) {
//
//                return haystackpos;
//            } else {
//                needlepos = 0;
//                haystackpos++;
//            }
//        }
//        return -1;
//    }
    /// Return all stored elements joined into one newly built array (cast to U when U is not T[]).
    @property auto data(U=T[])() const pure {
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    string opCast(string)() {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }
}
///
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);
}

// Declarations of the OpenSSL C functions and opaque types used by OpenSslSocket.
extern(C) {
    int SSL_library_init();
    void OpenSSL_add_all_ciphers();
    void OpenSSL_add_all_digests();
    void SSL_load_error_strings();

    struct SSL {}
    struct SSL_CTX {}
    struct SSL_METHOD {}

    SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
    SSL* SSL_new(SSL_CTX*);
    int SSL_set_fd(SSL*, int);
    int SSL_connect(SSL*);
    int SSL_write(SSL*, const void*, int);
    int SSL_read(SSL*, void*, int);
    int SSL_shutdown(SSL*) @trusted @nogc nothrow;
    void SSL_free(SSL*);
    void SSL_CTX_free(SSL_CTX*);

    long    SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);

    long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
    long SSL_set_mode(SSL *ssl, long mode);

    long SSL_CTX_get_mode(SSL_CTX *ctx);
    long SSL_get_mode(SSL *ssl);

    SSL_METHOD* SSLv3_client_method();
    SSL_METHOD* TLSv1_2_client_method();
    SSL_METHOD* TLSv1_client_method();
}

//pragma(lib, "crypto");
//pragma(lib, "ssl");

// One-time OpenSSL library initialisation.
shared static this() {
    SSL_library_init();
    OpenSSL_add_all_ciphers();
    OpenSSL_add_all_digests();
    SSL_load_error_strings();
}

/**
 * Socket that sends and receives its data through an OpenSSL session
 * created with TLSv1_client_method().
 */
public class OpenSslSocket : Socket {
    enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
    private SSL* ssl;
    private SSL_CTX* ctx;
    // Create the SSL context and session and bind the session to this socket's handle.
    private void initSsl() {
        //ctx = SSL_CTX_new(SSLv3_client_method());
        ctx = SSL_CTX_new(TLSv1_client_method());
        assert(ctx !is null);

        //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
        //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
        ssl = SSL_new(ctx);
        SSL_set_fd(ssl, this.handle);
    }

    /// Connect the underlying socket, then run the TLS handshake.
    /// Throws:
    /// Exception if the handshake did not complete successfully.
    @trusted
    override void connect(Address to) {
        super.connect(to);
        // SSL_connect returns 1 only when the handshake succeeded;
        // both 0 and negative values report a failure.
        if(SSL_connect(ssl) != 1)
            throw new Exception("ssl connect failed");
    }

    /// Write buf through the SSL session; flags are ignored.
    @trusted
    override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
        // SSL_write takes the length as int (same cast as in receive)
        return SSL_write(ssl, buf.ptr, cast(int) buf.length);
    }
    override ptrdiff_t send(const(void)[] buf) {
        return send(buf, SocketFlags.NONE);
    }
    /// Read into buf through the SSL session; flags are ignored.
    @trusted
    override ptrdiff_t receive(void[] buf, SocketFlags flags) {
        return SSL_read(ssl, buf.ptr, cast(int)buf.length);
    }
    override ptrdiff_t receive(void[] buf) {
        return receive(buf, SocketFlags.NONE);
    }
    this(AddressFamily af, SocketType type = SocketType.STREAM) {
        super(af, type);
        initSsl();
    }

    this(socket_t sock, AddressFamily af) {
        super(sock, af);
        initSsl();
    }
    override void close() {
        //SSL_shutdown(ssl);
        super.close();
    }
    ~this() {
        SSL_free(ssl);
        SSL_CTX_free(ctx);
    }
}

/**
 * Base class for client connections. Subclasses override open() to create
 * the concrete socket; connect() resolves the host and tries every address.
 */
public abstract class SocketStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
    }
    /// Create the socket for the given address family. Does nothing here; overridden by subclasses.
    void open(AddressFamily fa) {
    }
    @property ref Socket so() @safe pure {
        return s;
    }
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    @property bool isConnected() @safe @nogc pure const {
        return s && __isConnected;
    }
    /// Close the socket (if open) and forget it.
    void close() {
        tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }

    /// Resolve host and connect to the first address that accepts the connection.
    /// Params:
    /// host = host name or address
    /// port = port number
    /// timeout = value for the socket's SNDTIMEO option
    /// Returns:
    /// this stream
    /// Throws:
    /// ConnectError if the host can't be resolved or no address could be connected.
    auto connect(string host, ushort port, Duration timeout = 10.seconds) {
        // pass the arguments to tracef directly: pre-formatted text must not be used as a format string
        tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            errorf("Failed to connect: can't resolve %s - %s", host, e.msg);
            throw new ConnectError("Can't connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() may have failed before any socket was created
                if ( s !is null ) {
                    s.close();
                }
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send buff over the connected socket; returns what the socket's send() returns.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        return s.send(buff);
    }

    /// Receive into buff; returns what the socket's receive() returns.
    /// The caller's slice is not resized, use the returned value.
    ptrdiff_t receive(void[] buff) @safe {
        return s.receive(buff);
    }
}

/// SocketStream whose socket is an OpenSslSocket.
public class SSLSocketStream: SocketStream {
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new OpenSslSocket(fa);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}

/// SocketStream whose socket is a plain TCP stream socket.
public class TCPSocketStream : SocketStream {
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}