1 module requests.streams; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.conv; 7 import std.experimental.logger; 8 import std.exception; 9 import std.format; 10 import std.range; 11 import std.range.primitives; 12 import std.string; 13 import std.stdio; 14 import std.traits; 15 import std.zlib; 16 import std.datetime; 17 import std.socket; 18 import core.stdc.errno; 19 20 import requests.ssl_adapter : openssl, SSL, SSL_CTX; 21 22 alias InDataHandler = DataPipeIface!ubyte; 23 24 public class ConnectError: Exception { 25 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 26 super(message, file, line, next); 27 } 28 } 29 30 class DecodingException: Exception { 31 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 32 super(message, file, line, next); 33 } 34 } 35 36 public class TimeoutException: Exception { 37 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 38 super(message, file, line, next); 39 } 40 } 41 42 public class NetworkException: Exception { 43 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 44 super(message, file, line, next); 45 } 46 } 47 48 /** 49 * DataPipeIface can accept some data, process, and return processed data. 50 */ 51 public interface DataPipeIface(E) { 52 /// Is there any processed data ready for reading? 53 bool empty(); 54 /// Put next data portion for processing 55 //void put(E[]); 56 void putNoCopy(E[]); 57 /// Get any ready data 58 E[] get(); 59 /// Signal on end of incoming data stream. 60 void flush(); 61 } 62 /** 63 * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line. 64 * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked", 65 * and uncompress Content-Encoding "gzip". 66 */ 67 public class DataPipe(E) : DataPipeIface!E { 68 69 DataPipeIface!(E)[] pipe; 70 Buffer!E buffer; 71 /// Append data processor to pipeline 72 /// Params: 73 /// p = processor 74 final void insert(DataPipeIface!E p) { 75 pipe ~= p; 76 } 77 final E[][] process(DataPipeIface!E p, E[][] data) { 78 E[][] result; 79 data.each!(e => p.putNoCopy(e)); 80 while(!p.empty()) result ~= p.get(); 81 return result; 82 } 83 /// Process next data portion. Data passed over pipeline and store result in buffer. 84 /// Params: 85 /// data = input data buffer. 86 /// NoCopy means we do not copy data to buffer, we keep reference 87 final void putNoCopy(E[] data) { 88 if ( pipe.empty ) { 89 buffer.putNoCopy(data); 90 return; 91 } 92 try { 93 auto t = process(pipe.front, [data]); 94 foreach(ref p; pipe[1..$]) { 95 t = process(p, t); 96 } 97 t.each!(b => buffer.putNoCopy(b)); 98 } 99 catch (Exception e) { 100 throw new DecodingException(e.msg); 101 } 102 } 103 /// Get what was collected in internal buffer and clear it. 104 /// Returns: 105 /// data collected. 106 final E[] get() { 107 if ( buffer.empty ) { 108 return E[].init; 109 } 110 auto res = buffer.data; 111 buffer = Buffer!E.init; 112 return res; 113 } 114 /// 115 /// get without datamove. but user receive [][] 116 /// 117 final E[][] getNoCopy() { 118 if ( buffer.empty ) { 119 return E[][].init; 120 } 121 E[][] res = buffer.__repr.__buffer; 122 buffer = Buffer!E.init; 123 return res; 124 } 125 /// Test if internal buffer is empty 126 /// Returns: 127 /// true if internal buffer is empty (nothing to get()) 128 final bool empty() pure const @safe { 129 return buffer.empty; 130 } 131 final void flush() { 132 E[][] product; 133 foreach(ref p; pipe) { 134 product.each!(e => p.putNoCopy(e)); 135 p.flush(); 136 product.length = 0; 137 while( !p.empty ) product ~= p.get(); 138 } 139 product.each!(b => buffer.putNoCopy(b)); 140 } 141 } 142 143 /** 144 * Processor for gzipped/compressed content. 145 * Also support InputRange interface. 146 */ 147 public class Decompressor(E) : DataPipeIface!E { 148 private { 149 Buffer!ubyte __buff; 150 UnCompress __zlib; 151 } 152 this() { 153 __buff = Buffer!ubyte(); 154 __zlib = new UnCompress(); 155 } 156 final override void putNoCopy(E[] data) { 157 if ( __zlib is null ) { 158 __zlib = new UnCompress(); 159 } 160 __buff.putNoCopy(__zlib.uncompress(data)); 161 } 162 final override E[] get() pure { 163 assert(__buff.length); 164 auto r = __buff.__repr.__buffer[0]; 165 __buff.popFrontN(r.length); 166 return cast(E[])r; 167 } 168 final override void flush() { 169 if ( __zlib is null ) { 170 return; 171 } 172 __buff.put(__zlib.flush()); 173 } 174 final override @property bool empty() const pure @safe { 175 debug(requests) tracef("empty=%b", __buff.empty); 176 return __buff.empty; 177 } 178 final @property auto ref front() pure const @safe { 179 debug(requests) tracef("front: buff length=%d", __buff.length); 180 return __buff.front; 181 } 182 final @property auto popFront() pure @safe { 183 debug(requests) tracef("popFront: buff length=%d", __buff.length); 184 return __buff.popFront; 185 } 186 final @property void popFrontN(size_t n) pure @safe { 187 __buff.popFrontN(n); 188 } 189 } 190 191 /** 192 * Unchunk chunked http responce body. 193 */ 194 public class DecodeChunked : DataPipeIface!ubyte { 195 // length := 0 196 // read chunk-size, chunk-extension (if any) and CRLF 197 // while (chunk-size > 0) { 198 // read chunk-data and CRLF 199 // append chunk-data to entity-body 200 // length := length + chunk-size 201 // read chunk-size and CRLF 202 // } 203 // read entity-header 204 // while (entity-header not empty) { 205 // append entity-header to existing header fields 206 // read entity-header 207 // } 208 // Content-Length := length 209 // Remove "chunked" from Transfer-Encoding 210 // 211 212 // Chunked-Body = *chunk 213 // last-chunk 214 // trailer 215 // CRLF 216 // 217 // chunk = chunk-size [ chunk-extension ] CRLF 218 // chunk-data CRLF 219 // chunk-size = 1*HEX 220 // last-chunk = 1*("0") [ chunk-extension ] CRLF 221 // 222 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 223 // chunk-ext-name = token 224 // chunk-ext-val = token | quoted-string 225 // chunk-data = chunk-size(OCTET) 226 // trailer = *(entity-header CRLF) 227 228 alias eType = ubyte; 229 immutable eType[] CRLF = ['\r', '\n']; 230 private { 231 enum States {huntingSize, huntingSeparator, receiving, trailer}; 232 char state = States.huntingSize; 233 size_t chunk_size, to_receive; 234 Buffer!ubyte buff; 235 ubyte[] linebuff; 236 } 237 final void putNoCopy(eType[] data) { 238 while ( data.length ) { 239 if ( state == States.trailer ) { 240 to_receive = to_receive - min(to_receive, data.length); 241 return; 242 } 243 if ( state == States.huntingSize ) { 244 import std.ascii; 245 ubyte[10] digits; 246 int i; 247 for(i=0;i<data.length;i++) { 248 ubyte v = data[i]; 249 digits[i] = v; 250 if ( v == '\n' ) { 251 i+=1; 252 break; 253 } 254 } 255 linebuff ~= digits[0..i]; 256 if ( linebuff.length >= 80 ) { 257 throw new DecodingException("Can't find chunk size in the body"); 258 } 259 data = data[i..$]; 260 if (!linebuff.canFind(CRLF)) { 261 continue; 262 } 263 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 264 state = States.receiving; 265 to_receive = chunk_size; 266 if ( chunk_size == 0 ) { 267 to_receive = 2-min(2, data.length); // trailing \r\n 268 state = States.trailer; 269 return; 270 } 271 continue; 272 } 273 if ( state == States.receiving ) { 274 if (to_receive > 0 ) { 275 auto can_store = min(to_receive, data.length); 276 buff.putNoCopy(data[0..can_store]); 277 data = data[can_store..$]; 278 to_receive -= can_store; 279 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 280 if ( to_receive == 0 ) { 281 //tracef("switch to huntig separator"); 282 state = States.huntingSeparator; 283 continue; 284 } 285 continue; 286 } 287 assert(false); 288 } 289 if ( state == States.huntingSeparator ) { 290 if ( data[0] == '\n' || data[0]=='\r') { 291 data = data[1..$]; 292 continue; 293 } 294 state = States.huntingSize; 295 linebuff.length = 0; 296 continue; 297 } 298 } 299 } 300 final eType[] get() { 301 auto r = buff.__repr.__buffer[0]; 302 buff.popFrontN(r.length); 303 return r; 304 } 305 final void flush() { 306 } 307 final bool empty() { 308 debug(requests) tracef("empty=%b", buff.empty); 309 return buff.empty; 310 } 311 final bool done() { 312 return state==States.trailer && to_receive==0; 313 } 314 } 315 316 unittest { 317 info("Testing DataPipe"); 318 globalLogLevel(LogLevel.info); 319 alias eType = char; 320 eType[] gzipped = [ 321 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 322 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 323 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 324 0x08, 0x00, 0x00, 0x00 325 ]; // "abc\ndef\n" 326 auto d = new Decompressor!eType(); 327 d.putNoCopy(gzipped[0..2].dup); 328 d.putNoCopy(gzipped[2..10].dup); 329 d.putNoCopy(gzipped[10..$].dup); 330 d.flush(); 331 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 332 333 auto e = new Decompressor!eType(); 334 e.putNoCopy(gzipped[0..10].dup); 335 e.putNoCopy(gzipped[10..$].dup); 336 e.flush(); 337 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 338 // writeln(gzipped.decompress.filter!(a => a!='b').array); 339 auto dp = new DataPipe!eType; 340 dp.insert(new Decompressor!eType()); 341 dp.putNoCopy(gzipped[0..2].dup); 342 dp.putNoCopy(gzipped[2..$].dup); 343 dp.flush(); 344 assert(equal(dp.get(), "abc\ndef\n")); 345 // empty datapipe shoul just pass input to output 346 auto dpu = new DataPipe!ubyte; 347 dpu.putNoCopy("abcd".dup.representation); 348 dpu.putNoCopy("efgh".dup.representation); 349 dpu.flush(); 350 assert(equal(dpu.get(), "abcdefgh")); 351 info("Test unchunker properties"); 352 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation; 353 ubyte[][] result; 354 auto uc = new DecodeChunked(); 355 uc.putNoCopy(twoChunks); 356 while(!uc.empty) { 357 result ~= uc.get(); 358 } 359 assert(equal(result[0], ['1', '2'])); 360 assert(equal(result[1], ['3', '4'])); 361 info("unchunker correctness - ok"); 362 result[0][0] = '5'; 363 assert(twoChunks[3] == '5'); 364 info("unchunker zero copy - ok"); 365 info("Testing DataPipe - done"); 366 } 367 /** 368 * Buffer used to collect and process data from network. It remainds Appender, but support 369 * also Range interface. 370 * $(P To place data in buffer use put() method.) 371 * $(P To retrieve data from buffer you can use several methods:) 372 * $(UL 373 * $(LI Range methods: front, back, index []) 374 * $(LI data method: return collected data (like Appender.data)) 375 * ) 376 */ 377 static this() { 378 } 379 static ~this() { 380 } 381 enum CACHESIZE = 1024; 382 383 static long reprAlloc; 384 static long reprCacheHit; 385 static long reprCacheRequests; 386 387 388 public struct Buffer(T) { 389 // static Repr[CACHESIZE] cache; 390 // static uint cacheIndex; 391 392 private { 393 Repr cachedOrNew() { 394 return new Repr; 395 // reprCacheRequests++; 396 // if ( false && cacheIndex>0 ) { 397 // reprCacheHit++; 398 // cacheIndex -= 1; 399 // return cache[cacheIndex]; 400 // } else { 401 // return new Repr; 402 // } 403 } 404 class Repr { 405 size_t __length; 406 Unqual!T[][] __buffer; 407 this() { 408 reprAlloc++; 409 __length = 0; 410 } 411 this(Repr other) { 412 reprAlloc++; 413 if ( other is null ) 414 return; 415 __length = other.__length; 416 __buffer = other.__buffer.dup; 417 } 418 } 419 Repr __repr; 420 } 421 422 alias toString = data!string; 423 424 this(this) { 425 if ( !__repr ) { 426 return; 427 } 428 __repr = new Repr(__repr); 429 } 430 this(U)(U[] data) { 431 put(data); 432 } 433 ~this() { 434 __repr = null; 435 } 436 /*************** 437 * store data. Data copied 438 */ 439 auto put(U)(U[] data) { 440 if ( data.length == 0 ) { 441 return; 442 } 443 if ( !__repr ) { 444 __repr = cachedOrNew(); 445 } 446 static if (!is(U == T)) { 447 auto d = cast(T[])(data); 448 __repr.__length += d.length; 449 __repr.__buffer ~= d.dup; 450 } else { 451 __repr.__length += data.length; 452 __repr.__buffer ~= data.dup; 453 } 454 return; 455 } 456 auto putNoCopy(U)(U[] data) { 457 if ( data.length == 0 ) { 458 return; 459 } 460 if ( !__repr ) { 461 __repr = cachedOrNew(); 462 } 463 static if (!is(U == T)) { 464 auto d = cast(T[])(data); 465 __repr.__length += d.length; 466 __repr.__buffer ~= d; 467 } else { 468 __repr.__length += data.length; 469 __repr.__buffer ~= data; 470 } 471 return; 472 } 473 @property auto opDollar() const pure @safe { 474 return __repr.__length; 475 } 476 @property size_t length() const pure @safe { 477 if ( !__repr ) { 478 return 0; 479 } 480 return __repr.__length; 481 } 482 @property auto empty() const pure @safe { 483 return length == 0; 484 } 485 @property auto ref front() const pure @safe { 486 assert(length); 487 return __repr.__buffer.front.front; 488 } 489 @property auto ref back() const pure @safe { 490 assert(length); 491 return __repr.__buffer.back.back; 492 } 493 @property void popFront() pure @safe { 494 assert(length); 495 with ( __repr ) { 496 __buffer.front.popFront; 497 if ( __buffer.front.length == 0 ) { 498 __buffer.popFront; 499 } 500 __length--; 501 } 502 } 503 @property void popFrontN(size_t n) pure @safe { 504 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 505 __repr.__length -= n; 506 while( n ) { 507 if ( n <= __repr.__buffer.front.length ) { 508 __repr.__buffer.front.popFrontN(n); 509 if ( __repr.__buffer.front.length == 0 ) { 510 __repr.__buffer.popFront; 511 } 512 return; 513 } 514 n -= __repr.__buffer.front.length; 515 __repr.__buffer.popFront; 516 } 517 } 518 @property void popBack() pure @safe { 519 assert(length); 520 __repr.__buffer.back.popBack; 521 if ( __repr.__buffer.back.length == 0 ) { 522 __repr.__buffer.popBack; 523 } 524 __repr.__length--; 525 } 526 @property void popBackN(size_t n) pure @safe { 527 assert(n <= length, "n: %d, length: %d".format(n, length)); 528 __repr.__length -= n; 529 while( n ) { 530 if ( n <= __repr.__buffer.back.length ) { 531 __repr.__buffer.back.popBackN(n); 532 if ( __repr.__buffer.back.length == 0 ) { 533 __repr.__buffer.popBack; 534 } 535 return; 536 } 537 n -= __repr.__buffer.back.length; 538 __repr.__buffer.popBack; 539 } 540 } 541 @property auto save() @safe { 542 auto n = Buffer!T(); 543 n.__repr = new Repr(__repr); 544 return n; 545 } 546 @property auto ref opIndex(size_t n) const pure @safe { 547 assert( __repr && n < __repr.__length ); 548 foreach(b; __repr.__buffer) { 549 if ( n < b.length ) { 550 return b[n]; 551 } 552 n -= b.length; 553 } 554 assert(false, "Impossible"); 555 } 556 Buffer!T opSlice(size_t m, size_t n) { 557 if ( empty || m == n ) { 558 return Buffer!T(); 559 } 560 assert( m <= n && n <= __repr.__length); 561 auto res = this.save(); 562 res.popBackN(res.__repr.__length-n); 563 res.popFrontN(m); 564 return res; 565 } 566 @property auto data(U=T[])() pure { 567 static if ( is(U==T[]) ) { 568 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 569 return __repr.__buffer.front; 570 } 571 } 572 Appender!(T[]) a; 573 if ( __repr && __repr.__buffer ) { 574 foreach(ref b; __repr.__buffer) { 575 a.put(b); 576 } 577 } 578 static if ( is(U==T[]) ) { 579 return a.data; 580 } else { 581 return cast(U)a.data; 582 } 583 } 584 string opCast(string)() { 585 return this.toString; 586 } 587 bool opEquals(U)(U x) { 588 return cast(U)this == x; 589 } 590 591 } 592 /// 593 public unittest { 594 595 static assert(isInputRange!(Buffer!ubyte)); 596 static assert(isForwardRange!(Buffer!ubyte)); 597 static assert(hasLength!(Buffer!ubyte)); 598 static assert(hasSlicing!(Buffer!ubyte)); 599 static assert(isBidirectionalRange!(Buffer!ubyte)); 600 static assert(isRandomAccessRange!(Buffer!ubyte)); 601 602 auto b = Buffer!ubyte(); 603 b.put("abc".representation.dup); 604 b.put("def".representation.dup); 605 assert(b.length == 6); 606 assert(b.toString == "abcdef"); 607 assert(b.front == 'a'); 608 assert(b.back == 'f'); 609 assert(equal(b[0..$], "abcdef")); 610 assert(equal(b[$-2..$], "ef")); 611 assert(b == "abcdef"); 612 b.popFront; 613 b.popBack; 614 assert(b.front == 'b'); 615 assert(b.back == 'e'); 616 assert(b.length == 4); 617 assert(retro(b).front == 'e'); 618 assert(countUntil(b, 'e') == 3); 619 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 620 assert(equal(b, "bcde")); 621 b.popFront; b.popFront; 622 assert(b.front == 'd'); 623 assert(b.front == b[0]); 624 assert(b.back == b[$-1]); 625 626 auto c = Buffer!ubyte(); 627 c.put("Header0: value0\n".representation.dup); 628 c.put("Header1: value1\n".representation.dup); 629 c.put("Header2: value2\n\nbody".representation.dup); 630 auto c_length = c.length; 631 auto eoh = countUntil(c, "\n\n"); 632 assert(eoh == 47); 633 foreach(header; c[0..eoh].splitter('\n') ) { 634 writeln(cast(string)header.data); 635 } 636 assert(equal(findSplit(c, "\n\n")[2], "body")); 637 assert(c.length == c_length); 638 } 639 640 public struct SSLOptions { 641 enum filetype { 642 pem, 643 asn1, 644 der = asn1, 645 } 646 private { 647 /** 648 * do we need to veryfy peer? 649 */ 650 bool _verifyPeer = true; 651 /** 652 * path to CA cert 653 */ 654 string _caCert; 655 /** 656 * path to key file (can also contain cert (for pem) 657 */ 658 string _keyFile; 659 /** 660 * path to cert file (can also contain key (for pem) 661 */ 662 string _certFile; 663 filetype _keyType = filetype.pem; 664 filetype _certType = filetype.pem; 665 } 666 ubyte haveFiles() pure nothrow @safe @nogc { 667 ubyte r = 0; 668 if ( _keyFile ) r|=1; 669 if ( _certFile ) r|=2; 670 return r; 671 } 672 // do we want to verify peer certificates? 673 bool getVerifyPeer() pure nothrow @nogc { 674 return _verifyPeer; 675 } 676 SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe { 677 _verifyPeer = v; 678 return this; 679 } 680 /// set key file name and type (default - pem) 681 auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 682 _keyFile = f; 683 _keyType = t; 684 return this; 685 } 686 auto getKeyFile() @safe pure nothrow @nogc { 687 return _keyFile; 688 } 689 auto getKeyType() @safe pure nothrow @nogc { 690 return _keyType; 691 } 692 /// set cert file name and type (default - pem) 693 auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 694 _certFile = f; 695 _certType = t; 696 return this; 697 } 698 auto setCaCert(string p) @safe pure nothrow @nogc { 699 _caCert = p; 700 return this; 701 } 702 auto getCaCert() @safe pure nothrow @nogc { 703 return _caCert; 704 } 705 auto getCertFile() @safe pure nothrow @nogc { 706 return _certFile; 707 } 708 auto getCertType() @safe pure nothrow @nogc { 709 return _certType; 710 } 711 /// set key file type 712 void setKeyType(string t) @safe pure nothrow { 713 _keyType = cast(filetype)sslKeyTypes[t]; 714 } 715 /// set cert file type 716 void setCertType(string t) @safe pure nothrow { 717 _certType = cast(filetype)sslKeyTypes[t]; 718 } 719 } 720 static immutable int[string] sslKeyTypes; 721 shared static this() { 722 sslKeyTypes = [ 723 "pem":SSLOptions.filetype.pem, 724 "asn1":SSLOptions.filetype.asn1, 725 "der":SSLOptions.filetype.der, 726 ]; 727 } 728 729 version(vibeD) { 730 } 731 else { 732 extern(C) { 733 int SSL_library_init(); 734 } 735 736 enum SSL_VERIFY_PEER = 0x01; 737 enum SSL_FILETYPE_PEM = 1; 738 enum SSL_FILETYPE_ASN1 = 2; 739 740 immutable int[SSLOptions.filetype] ft2ssl; 741 742 shared static this() { 743 ft2ssl = [ 744 SSLOptions.filetype.pem: SSL_FILETYPE_PEM, 745 SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1, 746 SSLOptions.filetype.der: SSL_FILETYPE_ASN1 747 ]; 748 } 749 750 public class OpenSslSocket : Socket { 751 //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 752 private SSL* ssl; 753 private SSL_CTX* ctx; 754 755 private void initSsl(SSLOptions opts) { 756 //ctx = SSL_CTX_new(SSLv3_client_method()); 757 ctx = openssl.SSL_CTX_new(openssl.TLS_method()); 758 assert(ctx !is null); 759 if ( opts.getVerifyPeer() ) { 760 openssl.SSL_CTX_set_default_verify_paths(ctx); 761 if ( opts.getCaCert() ) { 762 openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null); 763 } 764 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null); 765 } 766 immutable keyFile = opts.getKeyFile(); 767 immutable keyType = opts.getKeyType(); 768 immutable certFile = opts.getCertFile(); 769 immutable certType = opts.getCertType(); 770 final switch(opts.haveFiles()) { 771 case 0b11: // both files 772 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 773 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]); 774 break; 775 case 0b01: // key only 776 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 777 openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 778 break; 779 case 0b10: // cert only 780 openssl.SSL_CTX_use_PrivateKey_file(ctx, certFile.toStringz(), ft2ssl[certType]); 781 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]); 782 break; 783 case 0b00: 784 break; 785 } 786 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 787 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 788 ssl = openssl.SSL_new(ctx); 789 openssl.SSL_set_fd(ssl, cast(int)this.handle); 790 } 791 792 @trusted 793 override void connect(Address dest) { 794 super.connect(dest); 795 if(openssl.SSL_connect(ssl) == -1) { 796 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 797 } 798 } 799 auto connectSSL() { 800 if(openssl.SSL_connect(ssl) == -1) { 801 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 802 } 803 debug(requests) tracef("ssl socket connected"); 804 return this; 805 } 806 @trusted 807 override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope { 808 return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length); 809 } 810 override ptrdiff_t send(const(void)[] buf) scope { 811 return send(buf, SocketFlags.NONE); 812 } 813 @trusted 814 override ptrdiff_t receive(void[] buf, SocketFlags flags) scope { 815 return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length); 816 } 817 override ptrdiff_t receive(void[] buf) scope { 818 return receive(buf, SocketFlags.NONE); 819 } 820 this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) { 821 super(af, type); 822 initSsl(opts); 823 } 824 this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) { 825 super(sock, af); 826 initSsl(opts); 827 } 828 override void close() scope { 829 super.close(); 830 if ( ssl !is null ) { 831 openssl.SSL_free(ssl); 832 ssl = null; 833 } 834 if ( ctx !is null ) { 835 openssl.SSL_CTX_free(ctx); 836 ctx = null; 837 } 838 } 839 void SSL_set_tlsext_host_name(string host) { 840 841 } 842 } 843 844 public class SSLSocketStream: SocketStream { 845 private SSLOptions _sslOptions; 846 private Socket underlyingSocket; 847 private SSL* ssl; 848 private string host; 849 850 this(SSLOptions opts) { 851 _sslOptions = opts; 852 } 853 this(NetworkStream ostream, SSLOptions opts, string host = null) { 854 _sslOptions = opts; 855 this.host = host; 856 auto osock = ostream.so(); 857 underlyingSocket = osock; 858 auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions); 859 ssl = ss.ssl; 860 if ( host !is null ) { 861 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 862 } 863 ss.connectSSL(); 864 __isOpen = true; 865 __isConnected = true; 866 s = ss; 867 debug(requests) tracef("ssl stream created from another stream: %s", s); 868 } 869 override void close() { 870 ssl = null; 871 host = null; 872 super.close(); 873 if ( underlyingSocket ) { 874 underlyingSocket.close(); 875 } 876 } 877 override void open(AddressFamily fa) { 878 if ( s !is null ) { 879 s.close(); 880 } 881 auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions); 882 assert(ss !is null, "Can't create socket"); 883 ssl = ss.ssl; 884 if ( host !is null ) { 885 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 886 } 887 s = ss; 888 __isOpen = true; 889 } 890 override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) { 891 host = h; 892 return super.connect(h, p, timeout); 893 } 894 override SSLSocketStream accept() { 895 auto newso = s.accept(); 896 if ( s is null ) { 897 return null; 898 } 899 auto newstream = new SSLSocketStream(_sslOptions); 900 auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily); 901 newstream.s = sslSocket; 902 newstream.__isOpen = true; 903 newstream.__isConnected = true; 904 return newstream; 905 } 906 } 907 public class TCPSocketStream : SocketStream { 908 override void open(AddressFamily fa) { 909 if ( s !is null ) { 910 s.close(); 911 } 912 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 913 assert(s !is null, "Can't create socket"); 914 __isOpen = true; 915 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 916 } 917 override TCPSocketStream accept() { 918 auto newso = s.accept(); 919 if ( s is null ) { 920 return null; 921 } 922 auto newstream = new TCPSocketStream(); 923 newstream.s = newso; 924 newstream.__isOpen = true; 925 newstream.__isConnected = true; 926 newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 927 return newstream; 928 } 929 } 930 } 931 932 public interface NetworkStream { 933 @property bool isConnected() const; 934 @property bool isOpen() const; 935 936 void close() @trusted; 937 938 /// 939 /// timeout is the socket write timeout. 940 /// 941 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 942 943 ptrdiff_t send(const(void)[] buff); 944 ptrdiff_t receive(void[] buff); 945 946 NetworkStream accept(); 947 @property void reuseAddr(bool); 948 void bind(string); 949 void bind(Address); 950 void listen(int); 951 version(vibeD) { 952 TCPConnection so(); 953 } else { 954 Socket so(); 955 } 956 /// 957 /// Set timeout for receive calls. 0 means no timeout. 958 /// 959 @property void readTimeout(Duration timeout); 960 } 961 962 public abstract class SocketStream : NetworkStream { 963 private { 964 Duration timeout; 965 Socket s; 966 bool __isOpen; 967 bool __isConnected; 968 string _bind; 969 } 970 void open(AddressFamily fa) { 971 } 972 @property Socket so() @safe pure { 973 return s; 974 } 975 @property bool isOpen() @safe @nogc pure const { 976 return s && __isOpen; 977 } 978 @property bool isConnected() @safe @nogc pure const { 979 return s && __isOpen && __isConnected; 980 } 981 void close() @trusted { 982 debug(requests) tracef("Close socket"); 983 if ( isOpen ) { 984 s.close(); 985 __isOpen = false; 986 __isConnected = false; 987 } 988 s = null; 989 } 990 /*** 991 * bind() just remember address. We will cal bind() at the time of connect as 992 * we can have several connection trials. 993 ***/ 994 override void bind(string to) { 995 _bind = to; 996 } 997 /*** 998 * Make connection to remote site. Bind, handle connection error, try several addresses, etc 999 ***/ 1000 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1001 debug(requests) tracef(format("Create connection to %s:%d", host, port)); 1002 Address[] addresses; 1003 __isConnected = false; 1004 try { 1005 addresses = getAddress(host, port); 1006 } catch (Exception e) { 1007 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 1008 } 1009 foreach(a; addresses) { 1010 debug(requests) tracef("Trying %s", a); 1011 try { 1012 open(a.addressFamily); 1013 if ( _bind !is null ) { 1014 auto ad = getAddress(_bind); 1015 debug(requests) tracef("bind to %s", ad[0]); 1016 s.bind(ad[0]); 1017 } 1018 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 1019 s.connect(a); 1020 debug(requests) tracef("Connected to %s", a); 1021 __isConnected = true; 1022 break; 1023 } catch (SocketException e) { 1024 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 1025 s.close(); 1026 } 1027 } 1028 if ( !__isConnected ) { 1029 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 1030 } 1031 return this; 1032 } 1033 1034 ptrdiff_t send(const(void)[] buff) @safe 1035 in {assert(isConnected);} 1036 body { 1037 auto rc = s.send(buff); 1038 if (rc < 0) { 1039 close(); 1040 throw new NetworkException("sending data"); 1041 } 1042 return rc; 1043 } 1044 1045 ptrdiff_t receive(void[] buff) @safe { 1046 while (true) { 1047 auto r = s.receive(buff); 1048 if (r < 0) { 1049 version(Windows) { 1050 close(); 1051 if ( errno == 0 ) { 1052 throw new TimeoutException("Timeout receiving data"); 1053 } 1054 throw new NetworkException("receiving data"); 1055 } 1056 version(Posix) { 1057 if ( errno == EINTR ) { 1058 continue; 1059 } 1060 close(); 1061 if ( errno == EAGAIN ) { 1062 throw new TimeoutException("Timeout receiving data"); 1063 } 1064 throw new NetworkException("receiving data"); 1065 } 1066 } 1067 else { 1068 buff.length = r; 1069 } 1070 return r; 1071 } 1072 assert(false); 1073 } 1074 1075 @property void readTimeout(Duration timeout) @safe { 1076 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 1077 } 1078 override SocketStream accept() { 1079 assert(false, "Implement before use"); 1080 } 1081 @property override void reuseAddr(bool yes){ 1082 if (yes) { 1083 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); 1084 } 1085 else { 1086 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0); 1087 } 1088 } 1089 override void bind(Address addr){ 1090 s.bind(addr); 1091 } 1092 override void listen(int n) { 1093 s.listen(n); 1094 }; 1095 } 1096 1097 version (vibeD) { 1098 import vibe.core.net, vibe.stream.tls; 1099 1100 public class TCPVibeStream : NetworkStream { 1101 private: 1102 TCPConnection _conn; 1103 Duration _readTimeout = Duration.max; 1104 bool _isOpen = true; 1105 string _bind; 1106 1107 public: 1108 @property bool isConnected() const { 1109 return _conn.connected; 1110 } 1111 @property override bool isOpen() const { 1112 return _conn && _isOpen; 1113 } 1114 void close() @trusted { 1115 _conn.close(); 1116 _isOpen = false; 1117 } 1118 override TCPConnection so() { 1119 return _conn; 1120 } 1121 override void bind(string to) { 1122 _bind = to; 1123 } 1124 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1125 // FIXME: timeout not supported in vibe.d 1126 try { 1127 _conn = connectTCP(host, port, _bind); 1128 } 1129 catch (Exception e) 1130 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1131 1132 return this; 1133 } 1134 1135 ptrdiff_t send(const(void)[] buff) { 1136 _conn.write(cast(const(ubyte)[])buff); 1137 return buff.length; 1138 } 1139 1140 ptrdiff_t receive(void[] buff) { 1141 if (!_conn.waitForData(_readTimeout)) { 1142 if (!_conn.connected || _conn.empty ) { 1143 return 0; 1144 } 1145 throw new TimeoutException("Timeout receiving data"); 1146 } 1147 1148 if(_conn.empty) { 1149 return 0; 1150 } 1151 1152 auto chunk = min(_conn.leastSize, buff.length); 1153 assert(chunk != 0); 1154 _conn.read(cast(ubyte[])buff[0 .. chunk]); 1155 return chunk; 1156 } 1157 1158 @property void readTimeout(Duration timeout) { 1159 if (timeout == 0.seconds) { 1160 _readTimeout = Duration.max; 1161 } 1162 else { 1163 _readTimeout = timeout; 1164 } 1165 } 1166 override TCPVibeStream accept() { 1167 assert(false, "Must be implemented"); 1168 } 1169 override @property void reuseAddr(bool){ 1170 assert(false, "Not Implemented"); 1171 } 1172 override void bind(Address){ 1173 assert(false, "Not Implemented"); 1174 } 1175 override void listen(int){ 1176 assert(false, "Not Implemented"); 1177 } 1178 } 1179 1180 public class SSLVibeStream : TCPVibeStream { 1181 private: 1182 TLSStream _sslStream; 1183 bool _isOpen = true; 1184 SSLOptions _sslOptions; 1185 TCPConnection underlyingConnection; 1186 1187 void connectSSL(string host) { 1188 auto sslctx = createTLSContext(TLSContextKind.client); 1189 if ( _sslOptions.getVerifyPeer() ) { 1190 if ( _sslOptions.getCaCert() == null ) { 1191 throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate."); 1192 } 1193 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert()); 1194 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert; 1195 } else { 1196 sslctx.peerValidationMode = TLSPeerValidationMode.none; 1197 } 1198 immutable keyFile = _sslOptions.getKeyFile(); 1199 immutable certFile = _sslOptions.getCertFile(); 1200 final switch(_sslOptions.haveFiles()) { 1201 case 0b11: // both files 1202 sslctx.usePrivateKeyFile(keyFile); 1203 sslctx.useCertificateChainFile(certFile); 1204 break; 1205 case 0b01: // key only 1206 sslctx.usePrivateKeyFile(keyFile); 1207 sslctx.useCertificateChainFile(keyFile); 1208 break; 1209 case 0b10: // cert only 1210 sslctx.usePrivateKeyFile(certFile); 1211 sslctx.useCertificateChainFile(certFile); 1212 break; 1213 case 0b00: 1214 break; 1215 } 1216 _sslStream = createTLSStream(_conn, sslctx, host); 1217 } 1218 1219 public: 1220 this(SSLOptions opts) { 1221 _sslOptions = opts; 1222 } 1223 override TCPConnection so() { 1224 return _conn; 1225 } 1226 this(NetworkStream ostream, SSLOptions opts, string host = null) { 1227 _sslOptions = opts; 1228 auto oconn = ostream.so(); 1229 underlyingConnection = oconn; 1230 _conn = oconn; 1231 connectSSL(host); 1232 } 1233 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1234 try { 1235 _conn = connectTCP(host, port); 1236 connectSSL(host); 1237 } 1238 catch (ConnectError e) { 1239 throw e; 1240 } 1241 catch (Exception e) { 1242 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1243 } 1244 1245 return this; 1246 } 1247 1248 override ptrdiff_t send(const(void)[] buff) { 1249 _sslStream.write(cast(const(ubyte)[])buff); 1250 return buff.length; 1251 } 1252 1253 override ptrdiff_t receive(void[] buff) { 1254 if (!_sslStream.dataAvailableForRead) { 1255 if (!_conn.waitForData(_readTimeout)) { 1256 if (!_conn.connected) { 1257 return 0; 1258 } 1259 throw new TimeoutException("Timeout receiving data"); 1260 } 1261 } 1262 1263 if(_sslStream.empty) { 1264 return 0; 1265 } 1266 1267 auto chunk = min(_sslStream.leastSize, buff.length); 1268 assert(chunk != 0); 1269 _sslStream.read(cast(ubyte[])buff[0 .. chunk]); 1270 return chunk; 1271 } 1272 1273 override void close() @trusted { 1274 _sslStream.finalize(); 1275 _conn.close(); 1276 _isOpen = false; 1277 } 1278 @property override bool isOpen() const { 1279 return _conn && _isOpen; 1280 } 1281 override SSLVibeStream accept() { 1282 assert(false, "Must be implemented"); 1283 } 1284 override @property void reuseAddr(bool){ 1285 assert(false, "Not Implemented"); 1286 } 1287 override void bind(Address){ 1288 assert(false, "Not Implemented"); 1289 } 1290 override void listen(int){ 1291 assert(false, "Not Implemented"); 1292 } 1293 } 1294 } 1295 1296 version (vibeD) { 1297 public alias TCPStream = TCPVibeStream; 1298 public alias SSLStream = SSLVibeStream; 1299 } 1300 else { 1301 public alias TCPStream = TCPSocketStream; 1302 public alias SSLStream = SSLSocketStream; 1303 }