1 module requests.streams; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.conv; 7 import std.experimental.logger; 8 import std.exception; 9 import std.format; 10 import std.range; 11 import std.range.primitives; 12 import std.string; 13 import std.stdio; 14 import std.traits; 15 import std.zlib; 16 import std.datetime; 17 import std.socket; 18 import core.stdc.errno; 19 20 import requests.ssl_adapter : openssl, SSL, SSL_CTX; 21 22 alias InDataHandler = DataPipeIface!ubyte; 23 24 public class ConnectError: Exception { 25 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 26 super(message, file, line, next); 27 } 28 } 29 30 class DecodingException: Exception { 31 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 32 super(message, file, line, next); 33 } 34 } 35 36 public class TimeoutException: Exception { 37 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 38 super(message, file, line, next); 39 } 40 } 41 42 public class NetworkException: Exception { 43 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 44 super(message, file, line, next); 45 } 46 } 47 48 /** 49 * DataPipeIface can accept some data, process, and return processed data. 50 */ 51 public interface DataPipeIface(E) { 52 /// Is there any processed data ready for reading? 53 bool empty(); 54 /// Put next data portion for processing 55 //void put(E[]); 56 void putNoCopy(E[]); 57 /// Get any ready data 58 E[] get(); 59 /// Signal on end of incoming data stream. 60 void flush(); 61 } 62 /** 63 * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line. 64 * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked", 65 * and uncompress Content-Encoding "gzip". 66 */ 67 public class DataPipe(E) : DataPipeIface!E { 68 69 DataPipeIface!(E)[] pipe; 70 Buffer!E buffer; 71 /// Append data processor to pipeline 72 /// Params: 73 /// p = processor 74 final void insert(DataPipeIface!E p) { 75 pipe ~= p; 76 } 77 final E[][] process(DataPipeIface!E p, E[][] data) { 78 E[][] result; 79 data.each!(e => p.putNoCopy(e)); 80 while(!p.empty()) result ~= p.get(); 81 return result; 82 } 83 /// Process next data portion. Data passed over pipeline and store result in buffer. 84 /// Params: 85 /// data = input data buffer. 86 /// NoCopy means we do not copy data to buffer, we keep reference 87 final void putNoCopy(E[] data) { 88 if ( pipe.empty ) { 89 buffer.putNoCopy(data); 90 return; 91 } 92 try { 93 auto t = process(pipe.front, [data]); 94 foreach(ref p; pipe[1..$]) { 95 t = process(p, t); 96 } 97 t.each!(b => buffer.putNoCopy(b)); 98 } 99 catch (Exception e) { 100 throw new DecodingException(e.msg); 101 } 102 } 103 /// Get what was collected in internal buffer and clear it. 104 /// Returns: 105 /// data collected. 106 final E[] get() { 107 if ( buffer.empty ) { 108 return E[].init; 109 } 110 auto res = buffer.data; 111 buffer = Buffer!E.init; 112 return res; 113 } 114 /// 115 /// get without datamove. but user receive [][] 116 /// 117 final E[][] getNoCopy() { 118 if ( buffer.empty ) { 119 return E[][].init; 120 } 121 E[][] res = buffer.__repr.__buffer; 122 buffer = Buffer!E.init; 123 return res; 124 } 125 /// Test if internal buffer is empty 126 /// Returns: 127 /// true if internal buffer is empty (nothing to get()) 128 final bool empty() pure const @safe { 129 return buffer.empty; 130 } 131 final void flush() { 132 E[][] product; 133 foreach(ref p; pipe) { 134 product.each!(e => p.putNoCopy(e)); 135 p.flush(); 136 product.length = 0; 137 while( !p.empty ) product ~= p.get(); 138 } 139 product.each!(b => buffer.putNoCopy(b)); 140 } 141 } 142 143 /** 144 * Processor for gzipped/compressed content. 145 * Also support InputRange interface. 146 */ 147 public class Decompressor(E) : DataPipeIface!E { 148 private { 149 Buffer!ubyte __buff; 150 UnCompress __zlib; 151 } 152 this() { 153 __buff = Buffer!ubyte(); 154 __zlib = new UnCompress(); 155 } 156 final override void putNoCopy(E[] data) { 157 if ( __zlib is null ) { 158 __zlib = new UnCompress(); 159 } 160 __buff.putNoCopy(__zlib.uncompress(data)); 161 } 162 final override E[] get() pure { 163 assert(__buff.length); 164 auto r = __buff.__repr.__buffer[0]; 165 __buff.popFrontN(r.length); 166 return cast(E[])r; 167 } 168 final override void flush() { 169 if ( __zlib is null ) { 170 return; 171 } 172 __buff.put(__zlib.flush()); 173 } 174 final override @property bool empty() const pure @safe { 175 debug(requests) tracef("empty=%b", __buff.empty); 176 return __buff.empty; 177 } 178 final @property auto ref front() pure const @safe { 179 debug(requests) tracef("front: buff length=%d", __buff.length); 180 return __buff.front; 181 } 182 final @property auto popFront() pure @safe { 183 debug(requests) tracef("popFront: buff length=%d", __buff.length); 184 return __buff.popFront; 185 } 186 final @property void popFrontN(size_t n) pure @safe { 187 __buff.popFrontN(n); 188 } 189 } 190 191 /** 192 * Unchunk chunked http responce body. 193 */ 194 public class DecodeChunked : DataPipeIface!ubyte { 195 // length := 0 196 // read chunk-size, chunk-extension (if any) and CRLF 197 // while (chunk-size > 0) { 198 // read chunk-data and CRLF 199 // append chunk-data to entity-body 200 // length := length + chunk-size 201 // read chunk-size and CRLF 202 // } 203 // read entity-header 204 // while (entity-header not empty) { 205 // append entity-header to existing header fields 206 // read entity-header 207 // } 208 // Content-Length := length 209 // Remove "chunked" from Transfer-Encoding 210 // 211 212 // Chunked-Body = *chunk 213 // last-chunk 214 // trailer 215 // CRLF 216 // 217 // chunk = chunk-size [ chunk-extension ] CRLF 218 // chunk-data CRLF 219 // chunk-size = 1*HEX 220 // last-chunk = 1*("0") [ chunk-extension ] CRLF 221 // 222 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 223 // chunk-ext-name = token 224 // chunk-ext-val = token | quoted-string 225 // chunk-data = chunk-size(OCTET) 226 // trailer = *(entity-header CRLF) 227 228 alias eType = ubyte; 229 immutable eType[] CRLF = ['\r', '\n']; 230 private { 231 enum States {huntingSize, huntingSeparator, receiving, trailer}; 232 char state = States.huntingSize; 233 size_t chunk_size, to_receive; 234 Buffer!ubyte buff; 235 ubyte[] linebuff; 236 } 237 final void putNoCopy(eType[] data) { 238 while ( data.length ) { 239 if ( state == States.trailer ) { 240 to_receive = to_receive - min(to_receive, data.length); 241 return; 242 } 243 if ( state == States.huntingSize ) { 244 import std.ascii; 245 ubyte[10] digits; 246 int i; 247 for(i=0;i<data.length;i++) { 248 ubyte v = data[i]; 249 digits[i] = v; 250 if ( v == '\n' ) { 251 i+=1; 252 break; 253 } 254 } 255 linebuff ~= digits[0..i]; 256 if ( linebuff.length >= 80 ) { 257 throw new DecodingException("Can't find chunk size in the body"); 258 } 259 data = data[i..$]; 260 if (!linebuff.canFind(CRLF)) { 261 continue; 262 } 263 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 264 state = States.receiving; 265 to_receive = chunk_size; 266 if ( chunk_size == 0 ) { 267 to_receive = 2-min(2, data.length); // trailing \r\n 268 state = States.trailer; 269 return; 270 } 271 continue; 272 } 273 if ( state == States.receiving ) { 274 if (to_receive > 0 ) { 275 auto can_store = min(to_receive, data.length); 276 buff.putNoCopy(data[0..can_store]); 277 data = data[can_store..$]; 278 to_receive -= can_store; 279 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 280 if ( to_receive == 0 ) { 281 //tracef("switch to huntig separator"); 282 state = States.huntingSeparator; 283 continue; 284 } 285 continue; 286 } 287 assert(false); 288 } 289 if ( state == States.huntingSeparator ) { 290 if ( data[0] == '\n' || data[0]=='\r') { 291 data = data[1..$]; 292 continue; 293 } 294 state = States.huntingSize; 295 linebuff.length = 0; 296 continue; 297 } 298 } 299 } 300 final eType[] get() { 301 auto r = buff.__repr.__buffer[0]; 302 buff.popFrontN(r.length); 303 return r; 304 } 305 final void flush() { 306 } 307 final bool empty() { 308 debug(requests) tracef("empty=%b", buff.empty); 309 return buff.empty; 310 } 311 final bool done() { 312 return state==States.trailer && to_receive==0; 313 } 314 } 315 316 unittest { 317 info("Testing DataPipe"); 318 globalLogLevel(LogLevel.info); 319 alias eType = char; 320 eType[] gzipped = [ 321 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 322 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 323 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 324 0x08, 0x00, 0x00, 0x00 325 ]; // "abc\ndef\n" 326 auto d = new Decompressor!eType(); 327 d.putNoCopy(gzipped[0..2].dup); 328 d.putNoCopy(gzipped[2..10].dup); 329 d.putNoCopy(gzipped[10..$].dup); 330 d.flush(); 331 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 332 333 auto e = new Decompressor!eType(); 334 e.putNoCopy(gzipped[0..10].dup); 335 e.putNoCopy(gzipped[10..$].dup); 336 e.flush(); 337 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 338 // writeln(gzipped.decompress.filter!(a => a!='b').array); 339 auto dp = new DataPipe!eType; 340 dp.insert(new Decompressor!eType()); 341 dp.putNoCopy(gzipped[0..2].dup); 342 dp.putNoCopy(gzipped[2..$].dup); 343 dp.flush(); 344 assert(equal(dp.get(), "abc\ndef\n")); 345 // empty datapipe shoul just pass input to output 346 auto dpu = new DataPipe!ubyte; 347 dpu.putNoCopy("abcd".dup.representation); 348 dpu.putNoCopy("efgh".dup.representation); 349 dpu.flush(); 350 assert(equal(dpu.get(), "abcdefgh")); 351 info("Test unchunker properties"); 352 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation; 353 ubyte[][] result; 354 auto uc = new DecodeChunked(); 355 uc.putNoCopy(twoChunks); 356 while(!uc.empty) { 357 result ~= uc.get(); 358 } 359 assert(equal(result[0], ['1', '2'])); 360 assert(equal(result[1], ['3', '4'])); 361 info("unchunker correctness - ok"); 362 result[0][0] = '5'; 363 assert(twoChunks[3] == '5'); 364 info("unchunker zero copy - ok"); 365 info("Testing DataPipe - done"); 366 } 367 /** 368 * Buffer used to collect and process data from network. It remainds Appender, but support 369 * also Range interface. 370 * $(P To place data in buffer use put() method.) 371 * $(P To retrieve data from buffer you can use several methods:) 372 * $(UL 373 * $(LI Range methods: front, back, index []) 374 * $(LI data method: return collected data (like Appender.data)) 375 * ) 376 */ 377 static this() { 378 } 379 static ~this() { 380 } 381 enum CACHESIZE = 1024; 382 383 static long reprAlloc; 384 static long reprCacheHit; 385 static long reprCacheRequests; 386 387 388 public struct Buffer(T) { 389 // static Repr[CACHESIZE] cache; 390 // static uint cacheIndex; 391 392 private { 393 Repr cachedOrNew() { 394 return new Repr; 395 // reprCacheRequests++; 396 // if ( false && cacheIndex>0 ) { 397 // reprCacheHit++; 398 // cacheIndex -= 1; 399 // return cache[cacheIndex]; 400 // } else { 401 // return new Repr; 402 // } 403 } 404 class Repr { 405 size_t __length; 406 Unqual!T[][] __buffer; 407 this() { 408 reprAlloc++; 409 __length = 0; 410 } 411 this(Repr other) { 412 reprAlloc++; 413 if ( other is null ) 414 return; 415 __length = other.__length; 416 __buffer = other.__buffer.dup; 417 } 418 } 419 Repr __repr; 420 } 421 422 alias toString = data!string; 423 424 this(this) { 425 if ( !__repr ) { 426 return; 427 } 428 __repr = new Repr(__repr); 429 } 430 this(U)(U[] data) { 431 put(data); 432 } 433 ~this() { 434 __repr = null; 435 } 436 /*************** 437 * store data. Data copied 438 */ 439 auto put(U)(U[] data) { 440 if ( data.length == 0 ) { 441 return; 442 } 443 if ( !__repr ) { 444 __repr = cachedOrNew(); 445 } 446 static if (!is(U == T)) { 447 auto d = cast(T[])(data); 448 __repr.__length += d.length; 449 __repr.__buffer ~= d.dup; 450 } else { 451 __repr.__length += data.length; 452 __repr.__buffer ~= data.dup; 453 } 454 return; 455 } 456 auto putNoCopy(U)(U[] data) { 457 if ( data.length == 0 ) { 458 return; 459 } 460 if ( !__repr ) { 461 __repr = cachedOrNew(); 462 } 463 static if (!is(U == T)) { 464 auto d = cast(T[])(data); 465 __repr.__length += d.length; 466 __repr.__buffer ~= d; 467 } else { 468 __repr.__length += data.length; 469 __repr.__buffer ~= data; 470 } 471 return; 472 } 473 @property auto opDollar() const pure @safe { 474 return __repr.__length; 475 } 476 @property size_t length() const pure @safe { 477 if ( !__repr ) { 478 return 0; 479 } 480 return __repr.__length; 481 } 482 @property auto empty() const pure @safe { 483 return length == 0; 484 } 485 @property auto ref front() const pure @safe { 486 assert(length); 487 return __repr.__buffer.front.front; 488 } 489 @property auto ref back() const pure @safe { 490 assert(length); 491 return __repr.__buffer.back.back; 492 } 493 @property void popFront() pure @safe { 494 assert(length); 495 with ( __repr ) { 496 __buffer.front.popFront; 497 if ( __buffer.front.length == 0 ) { 498 __buffer.popFront; 499 } 500 __length--; 501 } 502 } 503 @property void popFrontN(size_t n) pure @safe { 504 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 505 __repr.__length -= n; 506 while( n ) { 507 if ( n <= __repr.__buffer.front.length ) { 508 __repr.__buffer.front.popFrontN(n); 509 if ( __repr.__buffer.front.length == 0 ) { 510 __repr.__buffer.popFront; 511 } 512 return; 513 } 514 n -= __repr.__buffer.front.length; 515 __repr.__buffer.popFront; 516 } 517 } 518 @property void popBack() pure @safe { 519 assert(length); 520 __repr.__buffer.back.popBack; 521 if ( __repr.__buffer.back.length == 0 ) { 522 __repr.__buffer.popBack; 523 } 524 __repr.__length--; 525 } 526 @property void popBackN(size_t n) pure @safe { 527 assert(n <= length, "n: %d, length: %d".format(n, length)); 528 __repr.__length -= n; 529 while( n ) { 530 if ( n <= __repr.__buffer.back.length ) { 531 __repr.__buffer.back.popBackN(n); 532 if ( __repr.__buffer.back.length == 0 ) { 533 __repr.__buffer.popBack; 534 } 535 return; 536 } 537 n -= __repr.__buffer.back.length; 538 __repr.__buffer.popBack; 539 } 540 } 541 @property auto save() @safe { 542 auto n = Buffer!T(); 543 n.__repr = new Repr(__repr); 544 return n; 545 } 546 @property auto ref opIndex(size_t n) const pure @safe { 547 assert( __repr && n < __repr.__length ); 548 foreach(b; __repr.__buffer) { 549 if ( n < b.length ) { 550 return b[n]; 551 } 552 n -= b.length; 553 } 554 assert(false, "Impossible"); 555 } 556 Buffer!T opSlice(size_t m, size_t n) { 557 if ( empty || m == n ) { 558 return Buffer!T(); 559 } 560 assert( m <= n && n <= __repr.__length); 561 auto res = this.save(); 562 res.popBackN(res.__repr.__length-n); 563 res.popFrontN(m); 564 return res; 565 } 566 @property auto data(U=T[])() pure { 567 static if ( is(U==T[]) ) { 568 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 569 return __repr.__buffer.front; 570 } 571 } 572 Appender!(T[]) a; 573 if ( __repr && __repr.__buffer ) { 574 foreach(ref b; __repr.__buffer) { 575 a.put(b); 576 } 577 } 578 static if ( is(U==T[]) ) { 579 return a.data; 580 } else { 581 return cast(U)a.data; 582 } 583 } 584 string opCast(string)() { 585 return this.toString; 586 } 587 bool opEquals(U)(U x) { 588 return cast(U)this == x; 589 } 590 591 } 592 /// 593 public unittest { 594 595 static assert(isInputRange!(Buffer!ubyte)); 596 static assert(isForwardRange!(Buffer!ubyte)); 597 static assert(hasLength!(Buffer!ubyte)); 598 static assert(hasSlicing!(Buffer!ubyte)); 599 static assert(isBidirectionalRange!(Buffer!ubyte)); 600 static assert(isRandomAccessRange!(Buffer!ubyte)); 601 602 auto b = Buffer!ubyte(); 603 b.put("abc".representation.dup); 604 b.put("def".representation.dup); 605 assert(b.length == 6); 606 assert(b.toString == "abcdef"); 607 assert(b.front == 'a'); 608 assert(b.back == 'f'); 609 assert(equal(b[0..$], "abcdef")); 610 assert(equal(b[$-2..$], "ef")); 611 assert(b == "abcdef"); 612 b.popFront; 613 b.popBack; 614 assert(b.front == 'b'); 615 assert(b.back == 'e'); 616 assert(b.length == 4); 617 assert(retro(b).front == 'e'); 618 assert(countUntil(b, 'e') == 3); 619 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 620 assert(equal(b, "bcde")); 621 b.popFront; b.popFront; 622 assert(b.front == 'd'); 623 assert(b.front == b[0]); 624 assert(b.back == b[$-1]); 625 626 auto c = Buffer!ubyte(); 627 c.put("Header0: value0\n".representation.dup); 628 c.put("Header1: value1\n".representation.dup); 629 c.put("Header2: value2\n\nbody".representation.dup); 630 auto c_length = c.length; 631 auto eoh = countUntil(c, "\n\n"); 632 assert(eoh == 47); 633 foreach(header; c[0..eoh].splitter('\n') ) { 634 writeln(cast(string)header.data); 635 } 636 assert(equal(findSplit(c, "\n\n")[2], "body")); 637 assert(c.length == c_length); 638 } 639 640 public struct SSLOptions { 641 enum filetype { 642 pem, 643 asn1, 644 der = asn1, 645 } 646 private { 647 /** 648 * do we need to veryfy peer? 649 */ 650 bool _verifyPeer = false; 651 /** 652 * path to CA cert 653 */ 654 string _caCert; 655 /** 656 * path to key file (can also contain cert (for pem) 657 */ 658 string _keyFile; 659 /** 660 * path to cert file (can also contain key (for pem) 661 */ 662 string _certFile; 663 filetype _keyType = filetype.pem; 664 filetype _certType = filetype.pem; 665 } 666 ubyte haveFiles() pure nothrow @safe @nogc { 667 ubyte r = 0; 668 if ( _keyFile ) r|=1; 669 if ( _certFile ) r|=2; 670 return r; 671 } 672 // do we want to verify peer certificates? 673 bool getVerifyPeer() pure nothrow @nogc { 674 return _verifyPeer; 675 } 676 SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe { 677 _verifyPeer = v; 678 return this; 679 } 680 /// set key file name and type (default - pem) 681 auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 682 _keyFile = f; 683 _keyType = t; 684 return this; 685 } 686 auto getKeyFile() @safe pure nothrow @nogc { 687 return _keyFile; 688 } 689 auto getKeyType() @safe pure nothrow @nogc { 690 return _keyType; 691 } 692 /// set cert file name and type (default - pem) 693 auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 694 _certFile = f; 695 _certType = t; 696 return this; 697 } 698 auto setCaCert(string p) @safe pure nothrow @nogc { 699 _caCert = p; 700 return this; 701 } 702 auto getCaCert() @safe pure nothrow @nogc { 703 return _caCert; 704 } 705 auto getCertFile() @safe pure nothrow @nogc { 706 return _certFile; 707 } 708 auto getCertType() @safe pure nothrow @nogc { 709 return _certType; 710 } 711 /// set key file type 712 void setKeyType(string t) @safe pure nothrow { 713 _keyType = cast(filetype)sslKeyTypes[t]; 714 } 715 /// set cert file type 716 void setCertType(string t) @safe pure nothrow { 717 _certType = cast(filetype)sslKeyTypes[t]; 718 } 719 } 720 static immutable int[string] sslKeyTypes; 721 shared static this() { 722 sslKeyTypes = [ 723 "pem":SSLOptions.filetype.pem, 724 "asn1":SSLOptions.filetype.asn1, 725 "der":SSLOptions.filetype.der, 726 ]; 727 } 728 729 version(vibeD) { 730 } 731 else { 732 extern(C) { 733 int SSL_library_init(); 734 } 735 736 enum SSL_VERIFY_PEER = 0x01; 737 enum SSL_FILETYPE_PEM = 1; 738 enum SSL_FILETYPE_ASN1 = 2; 739 740 immutable int[SSLOptions.filetype] ft2ssl; 741 742 shared static this() { 743 ft2ssl = [ 744 SSLOptions.filetype.pem: SSL_FILETYPE_PEM, 745 SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1, 746 SSLOptions.filetype.der: SSL_FILETYPE_ASN1 747 ]; 748 } 749 750 public class OpenSslSocket : Socket { 751 //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 752 private SSL* ssl; 753 private SSL_CTX* ctx; 754 755 private void initSsl(SSLOptions opts) { 756 //ctx = SSL_CTX_new(SSLv3_client_method()); 757 ctx = openssl.SSL_CTX_new(openssl.TLS_method()); 758 assert(ctx !is null); 759 if ( opts.getVerifyPeer() ) { 760 openssl.SSL_CTX_set_default_verify_paths(ctx); 761 if ( opts.getCaCert() ) { 762 openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null); 763 } 764 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null); 765 } 766 immutable keyFile = opts.getKeyFile(); 767 immutable keyType = opts.getKeyType(); 768 immutable certFile = opts.getCertFile(); 769 immutable certType = opts.getCertType(); 770 final switch(opts.haveFiles()) { 771 case 0b11: // both files 772 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 773 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]); 774 break; 775 case 0b01: // key only 776 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 777 openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 778 break; 779 case 0b10: // cert only 780 openssl.SSL_CTX_use_PrivateKey_file(ctx, certFile.toStringz(), ft2ssl[certType]); 781 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]); 782 break; 783 case 0b00: 784 break; 785 } 786 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 787 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 788 ssl = openssl.SSL_new(ctx); 789 openssl.SSL_set_fd(ssl, cast(int)this.handle); 790 } 791 792 @trusted 793 override void connect(Address dest) { 794 super.connect(dest); 795 if(openssl.SSL_connect(ssl) == -1) { 796 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 797 } 798 } 799 auto connectSSL() { 800 if(openssl.SSL_connect(ssl) == -1) { 801 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 802 } 803 debug(requests) tracef("ssl socket connected"); 804 return this; 805 } 806 @trusted 807 override ptrdiff_t send(const(void)[] buf, SocketFlags flags) { 808 return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length); 809 } 810 override ptrdiff_t send(const(void)[] buf) { 811 return send(buf, SocketFlags.NONE); 812 } 813 @trusted 814 override ptrdiff_t receive(void[] buf, SocketFlags flags) { 815 return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length); 816 } 817 override ptrdiff_t receive(void[] buf) { 818 return receive(buf, SocketFlags.NONE); 819 } 820 this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) { 821 super(af, type); 822 initSsl(opts); 823 } 824 this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) { 825 super(sock, af); 826 initSsl(opts); 827 } 828 override void close() { 829 //SSL_shutdown(ssl); 830 super.close(); 831 } 832 ~this() { 833 openssl.SSL_free(ssl); 834 openssl.SSL_CTX_free(ctx); 835 } 836 void SSL_set_tlsext_host_name(string host) { 837 838 } 839 } 840 841 public class SSLSocketStream: SocketStream { 842 private SSLOptions _sslOptions; 843 private Socket underlyingSocket; 844 private SSL* ssl; 845 private string host; 846 847 this(SSLOptions opts) { 848 _sslOptions = opts; 849 } 850 this(NetworkStream ostream, SSLOptions opts, string host = null) { 851 _sslOptions = opts; 852 this.host = host; 853 auto osock = ostream.so(); 854 underlyingSocket = osock; 855 auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions); 856 ssl = ss.ssl; 857 if ( host !is null ) { 858 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 859 } 860 ss.connectSSL(); 861 __isOpen = true; 862 __isConnected = true; 863 s = ss; 864 debug(requests) tracef("ssl stream created from another stream: %s", s); 865 } 866 override void close() { 867 ssl = null; 868 host = null; 869 super.close(); 870 if ( underlyingSocket ) { 871 underlyingSocket.close(); 872 } 873 } 874 override void open(AddressFamily fa) { 875 if ( s !is null ) { 876 s.close(); 877 } 878 auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions); 879 assert(ss !is null, "Can't create socket"); 880 ssl = ss.ssl; 881 if ( host !is null ) { 882 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 883 } 884 s = ss; 885 __isOpen = true; 886 } 887 override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) { 888 host = h; 889 return super.connect(h, p, timeout); 890 } 891 override SSLSocketStream accept() { 892 auto newso = s.accept(); 893 if ( s is null ) { 894 return null; 895 } 896 auto newstream = new SSLSocketStream(_sslOptions); 897 auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily); 898 newstream.s = sslSocket; 899 newstream.__isOpen = true; 900 newstream.__isConnected = true; 901 return newstream; 902 } 903 } 904 public class TCPSocketStream : SocketStream { 905 override void open(AddressFamily fa) { 906 if ( s !is null ) { 907 s.close(); 908 } 909 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 910 assert(s !is null, "Can't create socket"); 911 __isOpen = true; 912 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 913 } 914 override TCPSocketStream accept() { 915 auto newso = s.accept(); 916 if ( s is null ) { 917 return null; 918 } 919 auto newstream = new TCPSocketStream(); 920 newstream.s = newso; 921 newstream.__isOpen = true; 922 newstream.__isConnected = true; 923 newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 924 return newstream; 925 } 926 } 927 } 928 929 public interface NetworkStream { 930 @property bool isConnected() const; 931 @property bool isOpen() const; 932 933 void close() @trusted; 934 935 /// 936 /// timeout is the socket write timeout. 937 /// 938 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 939 940 ptrdiff_t send(const(void)[] buff); 941 ptrdiff_t receive(void[] buff); 942 943 NetworkStream accept(); 944 @property void reuseAddr(bool); 945 void bind(string); 946 void bind(Address); 947 void listen(int); 948 version(vibeD) { 949 TCPConnection so(); 950 } else { 951 Socket so(); 952 } 953 /// 954 /// Set timeout for receive calls. 0 means no timeout. 955 /// 956 @property void readTimeout(Duration timeout); 957 } 958 959 public abstract class SocketStream : NetworkStream { 960 private { 961 Duration timeout; 962 Socket s; 963 bool __isOpen; 964 bool __isConnected; 965 string _bind; 966 } 967 void open(AddressFamily fa) { 968 } 969 @property Socket so() @safe pure { 970 return s; 971 } 972 @property bool isOpen() @safe @nogc pure const { 973 return s && __isOpen; 974 } 975 @property bool isConnected() @safe @nogc pure const { 976 return s && __isOpen && __isConnected; 977 } 978 void close() @trusted { 979 debug(requests) tracef("Close socket"); 980 if ( isOpen ) { 981 s.close(); 982 __isOpen = false; 983 __isConnected = false; 984 } 985 s = null; 986 } 987 /*** 988 * bind() just remember address. We will cal bind() at the time of connect as 989 * we can have several connection trials. 990 ***/ 991 override void bind(string to) { 992 _bind = to; 993 } 994 /*** 995 * Make connection to remote site. Bind, handle connection error, try several addresses, etc 996 ***/ 997 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 998 debug(requests) tracef(format("Create connection to %s:%d", host, port)); 999 Address[] addresses; 1000 __isConnected = false; 1001 try { 1002 addresses = getAddress(host, port); 1003 } catch (Exception e) { 1004 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 1005 } 1006 foreach(a; addresses) { 1007 debug(requests) tracef("Trying %s", a); 1008 try { 1009 open(a.addressFamily); 1010 if ( _bind !is null ) { 1011 auto ad = getAddress(_bind); 1012 debug(requests) tracef("bind to %s", ad[0]); 1013 s.bind(ad[0]); 1014 } 1015 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 1016 s.connect(a); 1017 debug(requests) tracef("Connected to %s", a); 1018 __isConnected = true; 1019 break; 1020 } catch (SocketException e) { 1021 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 1022 s.close(); 1023 } 1024 } 1025 if ( !__isConnected ) { 1026 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 1027 } 1028 return this; 1029 } 1030 1031 ptrdiff_t send(const(void)[] buff) @safe 1032 in {assert(isConnected);} 1033 body { 1034 auto rc = s.send(buff); 1035 if (rc < 0) { 1036 close(); 1037 throw new NetworkException("sending data"); 1038 } 1039 return rc; 1040 } 1041 1042 ptrdiff_t receive(void[] buff) @safe { 1043 while (true) { 1044 auto r = s.receive(buff); 1045 if (r < 0) { 1046 version(Windows) { 1047 close(); 1048 if ( errno == 0 ) { 1049 throw new TimeoutException("Timeout receiving data"); 1050 } 1051 throw new NetworkException("receiving data"); 1052 } 1053 version(Posix) { 1054 if ( errno == EINTR ) { 1055 continue; 1056 } 1057 close(); 1058 if ( errno == EAGAIN ) { 1059 throw new TimeoutException("Timeout receiving data"); 1060 } 1061 throw new NetworkException("receiving data"); 1062 } 1063 } 1064 else { 1065 buff.length = r; 1066 } 1067 return r; 1068 } 1069 assert(false); 1070 } 1071 1072 @property void readTimeout(Duration timeout) @safe { 1073 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 1074 } 1075 override SocketStream accept() { 1076 assert(false, "Implement before use"); 1077 } 1078 @property override void reuseAddr(bool yes){ 1079 if (yes) { 1080 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); 1081 } 1082 else { 1083 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0); 1084 } 1085 } 1086 override void bind(Address addr){ 1087 s.bind(addr); 1088 } 1089 override void listen(int n) { 1090 s.listen(n); 1091 }; 1092 } 1093 1094 version (vibeD) { 1095 import vibe.core.net, vibe.stream.tls; 1096 1097 public class TCPVibeStream : NetworkStream { 1098 private: 1099 TCPConnection _conn; 1100 Duration _readTimeout = Duration.max; 1101 bool _isOpen = true; 1102 string _bind; 1103 1104 public: 1105 @property bool isConnected() const { 1106 return _conn.connected; 1107 } 1108 @property override bool isOpen() const { 1109 return _conn && _isOpen; 1110 } 1111 void close() @trusted { 1112 _conn.close(); 1113 _isOpen = false; 1114 } 1115 override TCPConnection so() { 1116 return _conn; 1117 } 1118 override void bind(string to) { 1119 _bind = to; 1120 } 1121 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1122 // FIXME: timeout not supported in vibe.d 1123 try { 1124 _conn = connectTCP(host, port, _bind); 1125 } 1126 catch (Exception e) 1127 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1128 1129 return this; 1130 } 1131 1132 ptrdiff_t send(const(void)[] buff) { 1133 _conn.write(cast(const(ubyte)[])buff); 1134 return buff.length; 1135 } 1136 1137 ptrdiff_t receive(void[] buff) { 1138 if (!_conn.waitForData(_readTimeout)) { 1139 if (!_conn.connected) { 1140 return 0; 1141 } 1142 throw new TimeoutException("Timeout receiving data"); 1143 } 1144 1145 if(_conn.empty) { 1146 return 0; 1147 } 1148 1149 auto chunk = min(_conn.leastSize, buff.length); 1150 assert(chunk != 0); 1151 _conn.read(cast(ubyte[])buff[0 .. chunk]); 1152 return chunk; 1153 } 1154 1155 @property void readTimeout(Duration timeout) { 1156 if (timeout == 0.seconds) { 1157 _readTimeout = Duration.max; 1158 } 1159 else { 1160 _readTimeout = timeout; 1161 } 1162 } 1163 override TCPVibeStream accept() { 1164 assert(false, "Must be implemented"); 1165 } 1166 override @property void reuseAddr(bool){ 1167 assert(false, "Not Implemented"); 1168 } 1169 override void bind(Address){ 1170 assert(false, "Not Implemented"); 1171 } 1172 override void listen(int){ 1173 assert(false, "Not Implemented"); 1174 } 1175 } 1176 1177 public class SSLVibeStream : TCPVibeStream { 1178 private: 1179 Stream _sslStream; 1180 bool _isOpen = true; 1181 SSLOptions _sslOptions; 1182 TCPConnection underlyingConnection; 1183 1184 void connectSSL(string host) { 1185 auto sslctx = createTLSContext(TLSContextKind.client); 1186 if ( _sslOptions.getVerifyPeer() ) { 1187 if ( _sslOptions.getCaCert() == null ) { 1188 throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate."); 1189 } 1190 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert()); 1191 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert; 1192 } else { 1193 sslctx.peerValidationMode = TLSPeerValidationMode.none; 1194 } 1195 immutable keyFile = _sslOptions.getKeyFile(); 1196 immutable certFile = _sslOptions.getCertFile(); 1197 final switch(_sslOptions.haveFiles()) { 1198 case 0b11: // both files 1199 sslctx.usePrivateKeyFile(keyFile); 1200 sslctx.useCertificateChainFile(certFile); 1201 break; 1202 case 0b01: // key only 1203 sslctx.usePrivateKeyFile(keyFile); 1204 sslctx.useCertificateChainFile(keyFile); 1205 break; 1206 case 0b10: // cert only 1207 sslctx.usePrivateKeyFile(certFile); 1208 sslctx.useCertificateChainFile(certFile); 1209 break; 1210 case 0b00: 1211 break; 1212 } 1213 _sslStream = createTLSStream(_conn, sslctx, host); 1214 } 1215 1216 public: 1217 this(SSLOptions opts) { 1218 _sslOptions = opts; 1219 } 1220 override TCPConnection so() { 1221 return _conn; 1222 } 1223 this(NetworkStream ostream, SSLOptions opts, string host = null) { 1224 _sslOptions = opts; 1225 auto oconn = ostream.so(); 1226 underlyingConnection = oconn; 1227 _conn = oconn; 1228 connectSSL(host); 1229 } 1230 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1231 try { 1232 _conn = connectTCP(host, port); 1233 connectSSL(host); 1234 } 1235 catch (ConnectError e) { 1236 throw e; 1237 } 1238 catch (Exception e) { 1239 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1240 } 1241 1242 return this; 1243 } 1244 1245 override ptrdiff_t send(const(void)[] buff) { 1246 _sslStream.write(cast(const(ubyte)[])buff); 1247 return buff.length; 1248 } 1249 1250 override ptrdiff_t receive(void[] buff) { 1251 if (!_sslStream.dataAvailableForRead) { 1252 if (!_conn.waitForData(_readTimeout)) { 1253 if (!_conn.connected) { 1254 return 0; 1255 } 1256 throw new TimeoutException("Timeout receiving data"); 1257 } 1258 } 1259 1260 if(_sslStream.empty) { 1261 return 0; 1262 } 1263 1264 auto chunk = min(_sslStream.leastSize, buff.length); 1265 assert(chunk != 0); 1266 _sslStream.read(cast(ubyte[])buff[0 .. chunk]); 1267 return chunk; 1268 } 1269 1270 override void close() @trusted { 1271 _sslStream.finalize(); 1272 _conn.close(); 1273 _isOpen = false; 1274 } 1275 @property override bool isOpen() const { 1276 return _conn && _isOpen; 1277 } 1278 override SSLVibeStream accept() { 1279 assert(false, "Must be implemented"); 1280 } 1281 override @property void reuseAddr(bool){ 1282 assert(false, "Not Implemented"); 1283 } 1284 override void bind(Address){ 1285 assert(false, "Not Implemented"); 1286 } 1287 override void listen(int){ 1288 assert(false, "Not Implemented"); 1289 } 1290 } 1291 } 1292 1293 version (vibeD) { 1294 public alias TCPStream = TCPVibeStream; 1295 public alias SSLStream = SSLVibeStream; 1296 } 1297 else { 1298 public alias TCPStream = TCPSocketStream; 1299 public alias SSLStream = SSLSocketStream; 1300 }