module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;

alias InDataHandler = DataPipeIface!ubyte;

/// Thrown by SocketStream.connect when the host cannot be resolved or none of
/// its addresses accepts the connection.
public class ConnectError: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}

/// Thrown by DecodeChunked when the chunked body cannot be parsed.
/// (The spelling of the class name is kept as-is so existing references keep compiling.)
class DecodingExceptioin: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}
/**
 * DataPipeIface can accept some data, process it, and return the processed data.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing
    void put(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors; each accepts some data, processes it, and passes the
 * result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings. For example: unchunk
 * Transfer-Encoding "chunked", and uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[]  pipe;
    Buffer!E             buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every portion of `data` to processor `p` and collect everything it has ready.
    /// Params:
    /// p = processor
    /// data = input portions
    /// Returns:
    /// portions produced by `p` (may be empty)
    E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.put(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in buffer.
    /// Params:
    /// data = input data array.
    void put(E[] data) {
        if ( pipe.empty ) {
            // no processors: plain pass-through
            buffer.put(data);
            return;
        }
        auto t = process(pipe.front, [data]);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in buffer.
    /// Params:
    /// buff = input data buffer.
    void put(Buffer!E buff) {
        if ( buff.__repr is null ) {
            // A Buffer that never received data has no backing storage
            // (e.g. `put(Buffer!E())`): there is nothing to add, and every
            // path below would dereference the null reference.
            return;
        }
        if ( pipe.empty ) {
            if ( buffer.__repr is null ) {
                buffer = buff;
                return;
            }
            // append the incoming portions to the already collected ones
            buffer.__repr.__buffer ~= buff.__repr.__buffer;
            buffer.__repr.__length += buff.length;
            return;
        }
        auto t = process(pipe.front, buff.__repr.__buffer);
        foreach(ref p; pipe[1..$]) {
            t = process(p, t);
        }
        t.each!(b => buffer.put(b));
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    E[] get() pure {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, handing whatever the
    /// previous stage produced to the next one, and store the final output in buffer.
    void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.put(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.put(b));
    }
}

/**
 * Processor for gzipped/compressed content.
 * Also support InputRange interface.
*/
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;   // inflated bytes not yet consumed
        UnCompress   __zlib;   // streaming zlib decoder
    }
    /// Create a decompressor with an empty output buffer and a fresh zlib stream.
    this() {
        __zlib = new UnCompress();
        __buff = Buffer!ubyte();
    }
    //    this(E[] r) {
    //        //__range = r;
    //        __buff = Buffer!ubyte();
    //        __zlib = new UnCompress();
    //        auto l = r.length;
    //        if ( l ) {
    //            __buff.put(__zlib.uncompress(r.take(l)));
    //        }
    //    }
    /// Feed the next compressed portion; whatever zlib can inflate is appended to the buffer.
    /// Params:
    /// data = compressed input
    override void put(E[] data) {
        if ( __zlib is null ) {
            __zlib = new UnCompress();
        }
        auto inflated = __zlib.uncompress(data);
        __buff.put(inflated);
    }
    /// Take the first stored portion out of the buffer.
    /// Must not be called when nothing is buffered (asserted).
    /// Returns:
    /// the first buffered portion, cast to E[]
    override E[] get() pure {
        assert(__buff.length);
        auto portion = __buff.__repr.__buffer[0];
        __buff.popFrontN(portion.length);
        return cast(E[])portion;
    }
    /// Signal end of input: append whatever zlib still holds to the buffer.
    override void flush() {
        if ( __zlib !is null ) {
            auto tail = __zlib.flush();
            __buff.put(tail);
        }
    }
    /// True when there is no decompressed data to read.
    override @property bool empty() const pure @safe {
        debug tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    /// First decompressed byte (range interface).
    @property auto ref front() pure const @safe {
        debug tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    /// Drop the first decompressed byte (range interface).
    @property auto popFront() pure @safe {
        debug tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront();
    }
    /// Drop the first n decompressed bytes.
    @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}

/**
 * Unchunk chunked http response body.
*/
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //    chunk-size     = 1*HEX
    //    last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer}
        States       state = States.huntingSize;  // current parser state
        size_t       chunk_size, to_receive;      // size of current chunk / bytes of it still expected
        Buffer!ubyte buff;                        // decoded payload ready for get()
        ubyte[]      linebuff;                    // accumulates bytes while looking for CRLF
    }
    /// Feed the next portion of the chunked body. Decoded chunk payload is appended
    /// to the internal buffer; once the zero-size chunk is seen the decoder enters
    /// the `trailer` state and ignores any further input.
    /// Params:
    /// data = raw body bytes
    /// Throws:
    /// DecodingExceptioin if 80 or more bytes were collected without finding the chunk-size line.
    void put(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                return;
            }
            if ( state == States.huntingSize ) {
                // collect bytes until a complete "<hex-size>[;ext]CRLF" line is available
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( !s[1].length ) {
                    if ( linebuff.length >= 80 ) {
                        throw new DecodingExceptioin("Can't find chunk size in the body");
                    }
                    // data is empty now, so this leaves the loop and waits for more input
                    continue;
                }
                string x = castFrom!(ubyte[]).to!string(s[0]);
                formattedRead(x, "%x", &chunk_size);
                tracef("Got chunk size %s", chunk_size);
                state = States.receiving;
                to_receive = chunk_size;
                // whatever follows the size line is payload (or the next line)
                data = s[2];
                if ( chunk_size == 0 ) {
                    state = States.trailer;
                    tracef("Unchunk completed");
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.put(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        to_receive = 2;
                        linebuff.length = 0;
                        continue;
                    }
                    continue;
                }
                // receiving state is only entered with to_receive > 0
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                // skip the CRLF that terminates chunk-data
                linebuff ~= data;
                data.length = 0;
                auto s = linebuff.findSplit(CRLF);
                if ( s[1].length ) {
                    data = s[2];
                    chunk_size = 0;
                    linebuff.length = 0;
                    state = States.huntingSize;
                    tracef("switch to huntig size");
                    continue;
                }
            }
        }
    }
    /// Take the first decoded portion out of the buffer.
    /// Callers are expected to check empty() first.
    eType[] get() {
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to flush: decoded data is made available as soon as it is received.
    void flush() {
    }
    /// True when there is no decoded data to read.
    bool empty() {
        debug tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// True once the terminating zero-size chunk was seen.
    bool done() {
        return state==States.trailer;
    }
}

unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    auto d = new Decompressor!eType();
    d.put(gzipped[0..2]);
    d.put(gzipped[2..10]);
    d.put(gzipped[10..$]);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    auto e = new Decompressor!eType();
    e.put(gzipped[0..10]);
    e.put(gzipped[10..$]);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.put(gzipped[0..2]);
    dp.put(gzipped[2..$]);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.put("abcd".dup.representation);
    dpu.put("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but also supports
 * the Range interface.
 * $(P To place data in buffer use put() method.)
 * $(P To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
public struct Buffer(T) {
    private {
        /// Backing storage: a list of stored portions plus their total length.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                __length = 0;
            }
            /// Copy the list of portions of `other` (the portions themselves are not duplicated).
            this(Repr other) {
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;   // null until the first non-empty put()
    }

    alias toString = data!string;

    /// Postblit: every copy of a Buffer gets its own Repr.
    this(this) {
        __repr = new Repr(__repr);
    }
    /// Create a buffer holding a copy of `data`.
    this(U)(U[] data) pure {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * Store data. Data is copied.
     * Params:
     *  data = portion to append; an empty portion is ignored
     * Returns:
     *  this buffer (by value)
     */
    auto put(U)(U[] data) pure {
        if ( data.length == 0 ) {
            return this;
        }
        if ( !__repr ) {
            __repr = new Repr;
        }
        debug tracef("Append %d bytes", data.length);
        static if (!is(U == T)) {
            auto d = castFrom!(U[]).to!(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return this;
    }
    /// Value of `$` inside index/slice expressions: the number of stored elements.
    /// Safe to use on a buffer that never received data.
    @property auto opDollar() const pure @safe {
        return __repr is null ? 0 : __repr.__length;
    }
    /// Number of stored elements.
    @property auto length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    /// True when nothing is stored.
    @property auto empty() const pure @safe {
        return length == 0;
    }
    /// First stored element. Buffer must not be empty.
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    /// Last stored element. Buffer must not be empty.
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    /// Drop the first element. Buffer must not be empty.
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                // first portion exhausted - drop it
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first n elements; n must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to do; also keeps an empty, never-filled buffer (null storage) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            // the whole first portion goes away
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    /// Drop the last element. Buffer must not be empty.
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last n elements; n must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length);
        if ( n == 0 ) {
            // nothing to do; also keeps an empty, never-filled buffer (null storage) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            // the whole last portion goes away
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Forward-range save: a Buffer with its own copy of the portion list.
    @property auto save() pure @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    /// Element at position n (counted over all stored portions).
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Slice [m..n) as a new Buffer. Requires m <= n <= length.
    /// Works on a never-filled buffer as well (only the empty slice [0..0] is valid there).
    Buffer!T opSlice(size_t m, size_t n) {
        assert( m <= n && n <= length);
        auto res = Buffer!T();
        if ( m == n ) {
            res.__repr = new Repr;
            return res;
        }
        res.__repr = new Repr(this.__repr);
        res.popBackN(res.length-n);
        res.popFrontN(m);
        return res;
    }
    //    ptrdiff_t countUntil(in T[] needle) const pure @safe {
    //        ptrdiff_t haystackpos, needlepos;
    //        while(haystackpos < length) {
    //            if ( opIndex(haystackpos) == needle[needlepos] ) {
    //
    //                return haystackpos;
    //            } else {
    //                needlepos = 0;
    //                haystackpos++;
    //            }
    //        }
    //        return -1;
    //    }
    /// All stored portions joined into one array (like Appender.data).
    /// With U other than T[] the joined array is converted to U via castFrom.
    @property auto data(U=T[])() const pure {
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return castFrom!(T[]).to!U(a.data);
        }
    }
    // NOTE(review): `string` here is the name of the template parameter, not the
    // built-in string type, so this template matches a cast to any type and
    // returns toString converted to that type - confirm this is intended.
    string opCast(string)() {
        return this.toString;
    }
    /// Compare with a value of type U by casting this buffer to U first.
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }
}
///
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(castFrom!(ubyte[]).to!(string)(header.data));
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);
}

// Minimal hand-written bindings to the OpenSSL C API used below.
extern(C) {
    int SSL_library_init();
    void OpenSSL_add_all_ciphers();
    void OpenSSL_add_all_digests();
    void SSL_load_error_strings();

    struct SSL {}
    struct SSL_CTX {}
    struct SSL_METHOD {}

    SSL_CTX* SSL_CTX_new(const SSL_METHOD* method);
    SSL* SSL_new(SSL_CTX*);
    int SSL_set_fd(SSL*, int);
    int SSL_connect(SSL*);
    int SSL_write(SSL*, const void*, int);
    int SSL_read(SSL*, void*, int);
    int SSL_shutdown(SSL*) @trusted @nogc nothrow;
    void SSL_free(SSL*);
    void SSL_CTX_free(SSL_CTX*);

    long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);

    long SSL_CTX_set_mode(SSL_CTX *ctx, long mode);
    long SSL_set_mode(SSL *ssl, long mode);

    long SSL_CTX_get_mode(SSL_CTX *ctx);
    long SSL_get_mode(SSL *ssl);

    SSL_METHOD* SSLv3_client_method();
    SSL_METHOD* TLSv1_2_client_method();
    SSL_METHOD* TLSv1_client_method();
}

//pragma(lib, "crypto");
//pragma(lib, "ssl");

// One-time OpenSSL library initialisation at program start.
shared static this() {
    SSL_library_init();
    OpenSSL_add_all_ciphers();
    OpenSSL_add_all_digests();
    SSL_load_error_strings();
}

/// Socket whose connect/send/receive go through an OpenSSL session bound to the socket handle.
public class OpenSslSocket : Socket {
    enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
    private SSL* ssl;
    private SSL_CTX* ctx;
    /// Create the SSL context and session and attach the session to this socket's handle.
    /// Throws: Exception if OpenSSL fails to allocate the context or the session.
    private void initSsl() {
        //ctx = SSL_CTX_new(SSLv3_client_method());
        // NOTE(review): this selects the TLSv1 client method - confirm it is still acceptable.
        ctx = SSL_CTX_new(TLSv1_client_method());
        // enforce (not assert) so the check is also active in release builds
        enforce(ctx !is null, "SSL_CTX_new failed");

        //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
        //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
        ssl = SSL_new(ctx);
        enforce(ssl !is null, "SSL_new failed");
        SSL_set_fd(ssl, this.handle);
    }

    /// Establish the TCP connection, then perform the TLS handshake.
    /// Throws: Exception if the handshake does not complete successfully.
    @trusted
    override void connect(Address to) {
        super.connect(to);
        // SSL_connect reports success with 1 only; 0 and negative values are failures
        if ( SSL_connect(ssl) != 1 )
            throw new Exception("ssl connect failed");
    }

    /// Write `buf` through the SSL session; `flags` is ignored.
    /// Returns: the result of SSL_write.
    @trusted
    override ptrdiff_t send(const(void)[] buf, SocketFlags flags) {
        return SSL_write(ssl, buf.ptr, cast(int) buf.length);
    }
    override ptrdiff_t send(const(void)[] buf) {
        return send(buf, SocketFlags.NONE);
    }
    /// Read into `buf` through the SSL session; `flags` is ignored.
    /// Returns: the result of SSL_read.
    @trusted
    override ptrdiff_t receive(void[] buf, SocketFlags flags) {
        return SSL_read(ssl, buf.ptr, cast(int)buf.length);
    }
    override ptrdiff_t receive(void[] buf) {
        return receive(buf, SocketFlags.NONE);
    }
    this(AddressFamily af, SocketType type = SocketType.STREAM) {
        super(af, type);
        initSsl();
    }

    this(socket_t sock, AddressFamily af) {
        super(sock, af);
        initSsl();
    }
    override void close() {
        //SSL_shutdown(ssl);
        super.close();
    }
    ~this() {
        SSL_free(ssl);
        SSL_CTX_free(ctx);
    }
}

/// Base class for connection streams: owns a Socket and tracks its open/connected state.
/// Subclasses create the concrete socket in open().
public abstract class SocketStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
    }
    /// Create the underlying socket for the given address family (no-op here; overridden by subclasses).
    void open(AddressFamily fa) {
    }
    /// Underlying socket.
    @property ref Socket so() @safe pure {
        return s;
    }
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    @property bool isConnected() @safe @nogc pure const {
        return s && __isConnected;
    }
    /// Close the socket (if open) and drop the reference to it.
    void close() @trusted {
        tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }

    /// Resolve `host` and try its addresses in order until one connects.
    /// Params:
    ///  host = host name or address
    ///  port = port number
    ///  timeout = value for the socket send timeout option (SNDTIMEO)
    /// Returns: this stream
    /// Throws: ConnectError if the host can't be resolved or no address accepted the connection.
    auto connect(string host, ushort port, Duration timeout = 10.seconds) {
        // pass format and arguments directly so '%' inside host can't be re-interpreted
        tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            errorf("Failed to connect: can't resolve %s - %s", host, e.msg);
            throw new ConnectError("Can't connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket was created
                if ( s !is null ) {
                    s.close();
                }
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send `buff` over the connected socket.
    /// Returns: the result of Socket.send.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    body {
        return s.send(buff);
    }

    /// Receive into `buff`.
    /// Returns: the result of Socket.receive; the caller uses it to know how much of `buff` is valid.
    ptrdiff_t receive(void[] buff) @safe {
        // `buff` is a by-value slice: resizing it here would not be visible to the caller
        return s.receive(buff);
    }
}

/// Stream over an OpenSslSocket.
public class SSLSocketStream: SocketStream {
    /// Close any previous socket and create a new OpenSslSocket for the given family.
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new OpenSslSocket(fa);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}

/// Stream over a plain TCP socket.
public class TCPSocketStream : SocketStream {
    /// Close any previous socket and create a new TCP socket for the given family.
    override void open(AddressFamily fa) {
        if ( s !is null ) {
            s.close();
        }
        s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
        assert(s !is null, "Can't create socket");
        __isOpen = true;
    }
}