module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;
import core.stdc.errno;
import core.stdc.string;

import requests.ssl_adapter : openssl, SSL, SSL_CTX;

alias InDataHandler = DataPipeIface!ubyte;

/// Thrown by SocketStream.connect (and the vibe.d streams) when a host name
/// cannot be resolved or no connection can be established.
public class ConnectError: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Thrown when body decoding fails (DataPipe.putNoCopy, DecodeChunked).
/// Declared `public` like the other exception classes of this module so that
/// code outside the module can catch it by type.
public class DecodingException: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Thrown by the receive() implementations when a read times out.
public class TimeoutException: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Thrown by SocketStream.send/receive on unexpected socket errors.
public class NetworkException: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/**
 * DataPipeIface can accept some data, process, and return processed data.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing
    //void put(E[]);
    void putNoCopy(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors; each accepts some data, processes it, and passes the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings.
For example: unchunk transfer-encoding "chunked",
 * and uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[]  pipe;
    Buffer!E             buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    final void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every element of `data` to processor `p` and collect everything
    /// the processor has ready afterwards.
    final E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.putNoCopy(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data is passed over the pipeline and the result is stored in the buffer.
    /// Params:
    /// data = input data buffer.
    /// NoCopy means we do not copy data to buffer, we keep reference
    /// Throws: DecodingException if any processor throws; the original
    ///         exception is attached as `next`.
    final void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            buffer.putNoCopy(data);
            return;
        }
        try {
            auto t = process(pipe.front, [data]);
            foreach(ref p; pipe[1..$]) {
                t = process(p, t);
            }
            t.each!(b => buffer.putNoCopy(b));
        }
        catch (Exception e) {
            // keep the cause in the exception chain instead of only copying its message
            throw new DecodingException(e.msg, __FILE__, __LINE__, e);
        }
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    final E[] get() {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    ///
    /// get without datamove. but user receive [][]
    ///
    final E[][] getNoCopy()  {
        if ( buffer.empty ) {
            return E[][].init;
        }
        E[][] res = buffer.__repr.__buffer;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    final bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Flush every processor in order: whatever one stage still holds is fed
    /// to the next stage before that stage is flushed; the output of the last
    /// stage is stored in the internal buffer.
    final void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.putNoCopy(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.putNoCopy(b));
    }
}

/**
 * Processor for gzipped/compressed content.
 * Also support InputRange interface.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;
        UnCompress   __zlib;
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
    /// Uncompress `data` and keep the result (by reference) in the internal buffer.
    /// A new UnCompress is created if the previous one was released by flush().
    final override void putNoCopy(E[] data) {
        if ( __zlib is null  ) {
            __zlib = new UnCompress();
        }
        __buff.putNoCopy(__zlib.uncompress(data));
    }
    /// Returns: the first stored chunk of uncompressed data and removes it
    ///          from the buffer; an empty slice if nothing is buffered.
    final override E[] get() pure {
        if ( __buff.empty ) {
            // nothing buffered: __repr may be null, so do not touch it
            return E[].init;
        }
        auto r = __buff.__repr.__buffer[0];
        __buff.popFrontN(r.length);
        return cast(E[])r;
    }
    /// Finish the current zlib stream and store whatever it still returns.
    /// The UnCompress object is released afterwards, so a repeated flush() is
    /// a no-op and a later putNoCopy() starts a fresh stream (this is what
    /// makes the `__zlib is null` checks in this class reachable).
    final override void flush() {
        if ( __zlib is null  ) {
            return;
        }
        auto finished = __zlib;
        __zlib = null;
        __buff.put(finished.flush());
    }
    final override @property bool empty() const pure @safe {
        debug(requests) tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    final @property auto ref front() pure const @safe {
        debug(requests) tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    final @property auto popFront() pure @safe {
        debug(requests) tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    final @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}

unittest {
    ubyte[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    auto d = new Decompressor!ubyte();
    // get() on an empty decompressor returns an empty slice
    assert(d.get().length == 0);
    d.putNoCopy(gzipped.dup);
    d.flush();
    d.flush(); // repeated flush is a no-op
    assert(equal(d, "abc\ndef\n"));
    // input after flush() is decoded as a new stream
    d.putNoCopy(gzipped.dup);
    d.flush();
    assert(equal(d, "abc\ndef\n"));
}

/**
 * Unchunk chunked http
response body.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer};
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;
        ubyte[]      linebuff;
    }

    /// Parse the chunk-size at the start of a size line.
    /// Leading blanks are skipped and parsing stops at the first non-hex
    /// character, so a chunk-extension (";name=value") never contributes digits.
    /// Throws: DecodingException if the line has no hex digits or the value
    ///         does not fit into size_t.
    private static size_t parseChunkSize(const(ubyte)[] line) {
        import std.ascii : isHexDigit;
        size_t size;
        bool   seenDigit;
        foreach(c; line) {
            if ( !seenDigit && (c == ' ' || c == '\t') ) {
                continue;
            }
            if ( !isHexDigit(c) ) {
                break;
            }
            immutable size_t digit = c <= '9' ? c - '0' : (c | 0x20) - 'a' + 10;
            if ( size > (size_t.max - digit) / 16 ) {
                throw new DecodingException("Chunk size is too large");
            }
            size = size * 16 + digit;
            seenDigit = true;
        }
        if ( !seenDigit ) {
            throw new DecodingException("Can't find chunk size in the body");
        }
        return size;
    }

    /// Consume the next portion of the chunked body. Chunk payloads are stored
    /// by reference (slices of `data`), only size lines are copied.
    /// Throws: DecodingException if no valid chunk-size line is found within 80 bytes.
    final void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                to_receive = to_receive - min(to_receive, data.length);
                return;
            }
            if ( state == States.huntingSize ) {
                // Take everything up to and including the first LF (or all of
                // `data` if the line is not complete yet). The line is appended
                // straight to linebuff, so its length is limited only by the
                // 80-byte check below.
                size_t taken = 0;
                while ( taken < data.length ) {
                    if ( data[taken++] == '\n' ) {
                        break;
                    }
                }
                linebuff ~= data[0..taken];
                if ( linebuff.length >= 80 ) {
                    throw new DecodingException("Can't find chunk size in the body");
                }
                data = data[taken..$];
                if (!linebuff.canFind(CRLF)) {
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    to_receive = 2-min(2, data.length); // trailing \r\n
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        //tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        continue;
                    }
                    continue;
                }
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                if ( data[0] == '\n' || data[0]=='\r') {
                    data = data[1..$];
                    continue;
                }
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Returns: the first stored piece of chunk data and removes it from the
    ///          buffer; null if nothing is buffered.
    final eType[] get() {
        if ( buff.empty ) {
            // buff.__repr may be null for an empty buffer
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to do: decoded data is made available by putNoCopy().
    final void flush() {
    }
    final bool empty() {
        debug(requests) tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// true when the last-chunk and the bytes counted for the trailing CRLF were seen.
    final bool done() {
        return state==States.trailer && to_receive==0;
    }
}

unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    auto d = new Decompressor!eType();
    d.putNoCopy(gzipped[0..2].dup);
    d.putNoCopy(gzipped[2..10].dup);
    d.putNoCopy(gzipped[10..$].dup);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    auto e = new Decompressor!eType();
    e.putNoCopy(gzipped[0..10].dup);
    e.putNoCopy(gzipped[10..$].dup);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.putNoCopy(gzipped[0..2].dup);
    dp.putNoCopy(gzipped[2..$].dup);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.putNoCopy("abcd".dup.representation);
    dpu.putNoCopy("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Test unchunker properties");
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
    ubyte[][] result;
    auto uc = new DecodeChunked();
    uc.putNoCopy(twoChunks);
    while(!uc.empty) {
        result ~= uc.get();
    }
    assert(equal(result[0], ['1', '2']));
    assert(equal(result[1], ['3', '4']));
    info("unchunker correctness - ok");
    result[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    // size line longer than 10 bytes, with hex digits inside the chunk extension
    auto ucExt = new DecodeChunked();
    ucExt.putNoCopy("3;name=abcdef012345\r\nxyz\r\n0\r\n\r\n".dup.representation);
    assert(equal(ucExt.get(), "xyz"));
    assert(ucExt.empty);
    assert(ucExt.done);
    // get() on an empty unchunker returns an empty slice
    assert(ucExt.get().length == 0);
    info("unchunker chunk extensions - ok");
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but also
 * supports the Range interface.
 * $(P To place data in buffer use put() method.)
* $(P To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
static this() {
}
static ~this() {
}
enum   CACHESIZE = 1024;

static long reprAlloc;
static long reprCacheHit;
static long reprCacheRequests;


/**
 * Chunked data container: keeps a list of slices (`__buffer`) plus their total
 * length. put() stores a copy of the data, putNoCopy() stores the slice itself.
 * Exposes front/back/popFront/popBack/save/opIndex/opSlice/length so it can be
 * used as a range (see the static asserts in the unittest below).
 */
public struct Buffer(T) {
//    static Repr[CACHESIZE]  cache;
//    static uint             cacheIndex;

    private {
        Repr  cachedOrNew() {
            return new Repr;
//            reprCacheRequests++;
//            if ( false && cacheIndex>0 ) {
//                reprCacheHit++;
//                cacheIndex -= 1;
//                return cache[cacheIndex];
//            } else {
//                return new Repr;
//            }
        }
        /// Shared representation: total element count and the list of chunks.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            /// Copy the chunk list (not the chunk contents) of `other`; `other` may be null.
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;
    }

    alias toString = data!string;

    /// Postblit: give the copy its own Repr so popFront/popBack on one copy
    /// do not change the chunk list of the other.
    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * store data. Data is NOT copied: the buffer keeps a reference to `data`
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    /// `$` inside index/slice expressions; 0 for a buffer that never received data.
    @property auto opDollar() const pure @safe {
        // go through length(): __repr is null until the first put
        return length;
    }
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop the first `n` elements; `n` must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to drop; also keeps an empty buffer (null __repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop the last `n` elements; `n` must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // nothing to drop; also keeps an empty buffer (null __repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Returns: a new Buffer covering elements [m, n); chunks are shared, not copied.
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// Returns: all collected data as one array. When the buffer holds exactly
    ///          one chunk and U is T[], that chunk itself is returned (no copy);
    ///          otherwise the chunks are joined into a new array.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return cast(U)a.data;
        }
    }
    // NOTE(review): the template parameter here is literally named `string`;
    // the body returns toString (data!string), so this is only usable for
    // casts to the real string type - confirm before generalizing.
    string opCast(string)() {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
///
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(cast(string)header.data);
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);

    // a buffer that never received data: `$`, popFrontN(0) and popBackN(0) are safe
    auto nothing = Buffer!ubyte();
    assert(nothing[0..$].empty);
    nothing.popFrontN(0);
    nothing.popBackN(0);
    assert(nothing.length == 0);
}

/**
 * TLS related settings: peer verification, CA certificate path, and client
 * key/certificate files with their encodings.
 */
public struct SSLOptions {
    enum filetype {
        pem,
        asn1,
        der = asn1,
    }
    private {
        /**
         * do we need to verify peer?
         */
        bool     _verifyPeer = true;
        /**
         * path to CA cert
         */
        string   _caCert;
        /**
         * path to key file (can also contain cert (for pem)
         */
        string   _keyFile;
        /**
         * path to cert file (can also contain key (for pem)
         */
        string   _certFile;
        filetype _keyType = filetype.pem;
        filetype _certType = filetype.pem;
    }
    /// Returns: bit mask of configured files: bit 0 - key file, bit 1 - cert file.
    ///          An empty file name counts as "not configured".
    ubyte haveFiles() pure nothrow @safe @nogc {
        ubyte r = 0;
        // test the length, not the pointer: "" has a non-null pointer
        if ( _keyFile.length  ) r|=1;
        if ( _certFile.length ) r|=2;
        return r;
    }
    // do we want to verify peer certificates?
    bool getVerifyPeer() pure nothrow @nogc @safe {
        return _verifyPeer;
    }
    SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
        _verifyPeer = v;
        return this;
    }
    /// set key file name and type (default - pem)
    auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _keyFile = f;
        _keyType = t;
        return this;
    }
    auto getKeyFile() @safe pure nothrow @nogc {
        return _keyFile;
    }
    auto getKeyType() @safe pure nothrow @nogc {
        return _keyType;
    }
    /// set cert file name and type (default - pem)
    auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _certFile = f;
        _certType = t;
        return this;
    }
    auto setCaCert(string p) @safe pure nothrow @nogc {
        _caCert = p;
        return this;
    }
    auto getCaCert() @safe pure nothrow @nogc {
        return _caCert;
    }
    auto getCertFile() @safe pure nothrow @nogc {
        return _certFile;
    }
    auto getCertType() @safe pure nothrow @nogc {
        return _certType;
    }
    /// set key file type; `t` must be a key of sslKeyTypes ("pem", "asn1", "der")
    void setKeyType(string t) @safe pure nothrow {
        _keyType = cast(filetype)sslKeyTypes[t];
    }
    /// set cert file type; `t` must be a key of sslKeyTypes ("pem", "asn1", "der")
    void setCertType(string t) @safe pure nothrow {
        _certType = cast(filetype)sslKeyTypes[t];
    }
}
static immutable int[string] sslKeyTypes;
shared static this() {
    sslKeyTypes = [
        "pem":SSLOptions.filetype.pem,
        "asn1":SSLOptions.filetype.asn1,
        "der":SSLOptions.filetype.der,
    ];
}

version(vibeD) {
}
else {
    extern(C) {
        int SSL_library_init();
    }

    enum SSL_VERIFY_PEER = 0x01;
    enum SSL_FILETYPE_PEM = 1;
    enum SSL_FILETYPE_ASN1 = 2;

    immutable int[SSLOptions.filetype] ft2ssl;

    shared static this() {
        ft2ssl = [
            SSLOptions.filetype.pem:  SSL_FILETYPE_PEM,
            SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
            SSLOptions.filetype.der:  SSL_FILETYPE_ASN1
        ];
    }

    /// std.socket.Socket whose connect/send/receive go through an OpenSSL session.
    public class OpenSslSocket : Socket {
        //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
        private SSL* ssl;
        private SSL_CTX* ctx;

        /// Create the SSL context and session for this socket according to `opts`.
        /// Throws: Exception if the context or the session can't be created.
        private void initSsl(SSLOptions opts) {
            //ctx = SSL_CTX_new(SSLv3_client_method());
            ctx = openssl.SSL_CTX_new(openssl.TLS_method());
            // checked in release builds too (an assert would be compiled out)
            enforce(ctx !is null, "Can't create SSL context");
            if ( opts.getVerifyPeer() ) {
                openssl.SSL_CTX_set_default_verify_paths(ctx);
                if ( opts.getCaCert().length ) {
                    openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
                }
                openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
            }
            immutable keyFile = opts.getKeyFile();
            immutable keyType = opts.getKeyType();
            immutable certFile = opts.getCertFile();
            immutable certType = opts.getCertType();
            final switch(opts.haveFiles()) {
                case 0b11:  // both files
                    openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
                    openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
                    break;
                case 0b01:  // key only
                    openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
                    openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
                    break;
                case 0b10:  // cert only
                    openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
                    openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
                    break;
                case 0b00:
                    break;
            }
            //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
            //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
            ssl = openssl.SSL_new(ctx);
            enforce(ssl !is null, "Can't create SSL session");
            openssl.SSL_set_fd(ssl, cast(int)this.handle);
        }

        /// Run the client-side TLS handshake.
        /// SSL_connect reports success with 1; 0 and negative values are both
        /// failures, so anything <= 0 is turned into an exception.
        private void handshake() {
            if ( openssl.SSL_connect(ssl) <= 0 ) {
                throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
            }
        }

        /// Connect the TCP socket and then perform the TLS handshake.
        @trusted
        override void connect(Address dest) {
            super.connect(dest);
            handshake();
        }
        /// Perform the TLS handshake on an already connected socket.
        auto connectSSL() {
            handshake();
            debug(requests) tracef("ssl socket connected");
            return this;
        }
        @trusted
        override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope {
            return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length);
        }
        override ptrdiff_t send(const(void)[] buf) scope {
            return send(buf, SocketFlags.NONE);
        }
        @trusted
        override ptrdiff_t receive(void[] buf, SocketFlags flags) scope {
            return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length);
        }
        override ptrdiff_t receive(void[] buf) scope {
            return receive(buf, SocketFlags.NONE);
        }
        this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
            super(af, type);
            initSsl(opts);
        }
        this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
            super(sock, af);
            initSsl(opts);
        }
        /// Close the socket and free the SSL session and context (each only once).
        override void close() scope {
            super.close();
            if ( ssl !is null ) {
                openssl.SSL_free(ssl);
                ssl = null;
            }
            if ( ctx !is null ) {
                openssl.SSL_CTX_free(ctx);
                ctx = null;
            }
        }
        // Intentionally empty in this file; SSLSocketStream sets the SNI host
        // name through openssl.SSL_set_tlsext_host_name directly.
        void SSL_set_tlsext_host_name(string host) {

        }
    }

    /// NetworkStream over an OpenSslSocket.
    public class SSLSocketStream: SocketStream {
        private SSLOptions _sslOptions;
        private Socket underlyingSocket;
        private SSL* ssl;
        private string host;

        this(SSLOptions opts) {
            _sslOptions = opts;
        }
        /// Start TLS on top of an already connected stream (its socket handle is reused).
        this(NetworkStream ostream, SSLOptions opts, string host = null) {
            _sslOptions = opts;
            this.host = host;
            auto osock = ostream.so();
            underlyingSocket = osock;
            auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
            ssl = ss.ssl;
            if ( host !is null ) {
                openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
            }
            ss.connectSSL();
            __isOpen = true;
            __isConnected = true;
            s = ss;
            debug(requests) tracef("ssl stream created from another stream: %s", s);
        }
        override void close() {
            ssl = null;
            host = null;
            super.close();
            if ( underlyingSocket ) {
                underlyingSocket.close();
                // forget it so a repeated close() does not close it again
                underlyingSocket = null;
            }
        }
        override void open(AddressFamily fa) {
            if ( s !is null ) {
                s.close();
            }
            auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
            assert(ss !is null, "Can't create socket");
            ssl = ss.ssl;
            if ( host !is null ) {
                openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
            }
            s = ss;
            __isOpen = true;
        }
        /// Remember the host name (open() uses it for SNI) and connect.
        override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
            host = h;
            return super.connect(h, p, timeout);
        }
        /// Accept a connection and wrap it into a new SSLSocketStream that
        /// uses the same SSL options as this stream.
        override SSLSocketStream accept() {
            auto newso = s.accept();
            // check the accepted socket (not `s`, which was already dereferenced above)
            if ( newso is null ) {
                return null;
            }
            auto newstream = new SSLSocketStream(_sslOptions);
            auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily, _sslOptions);
            newstream.s = sslSocket;
            newstream.ssl = sslSocket.ssl;
            newstream.__isOpen = true;
            newstream.__isConnected = true;
            return newstream;
        }
    }
    /// NetworkStream over a plain TCP socket (TCP_NODELAY is enabled).
    public class TCPSocketStream : SocketStream {
        override void open(AddressFamily fa) {
            if ( s !is null ) {
                s.close();
            }
            s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
            assert(s !is null, "Can't create socket");
            __isOpen = true;
            s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
        }
        override TCPSocketStream accept() {
            auto newso = s.accept();
            // check the accepted socket (not `s`, which was already dereferenced above)
            if ( newso is null ) {
                return null;
            }
            auto newstream = new TCPSocketStream();
            newstream.s = newso;
            newstream.__isOpen = true;
            newstream.__isConnected = true;
            newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
            return newstream;
        }
    }
}

/// Common interface of all stream implementations in this module.
public interface NetworkStream {
    @property bool isConnected() const;
    @property bool isOpen() const;

    void close() @trusted;

    ///
    /// timeout is the socket write timeout.
    ///
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);

    ptrdiff_t send(const(void)[] buff);
    ptrdiff_t receive(void[] buff);

    NetworkStream accept();
    @property void reuseAddr(bool);
    void bind(string);
    void bind(Address);
    void listen(int);
    version(vibeD) {
        TCPConnection so();
    } else {
        Socket so();
    }
    ///
    /// Set timeout for receive calls. 0 means no timeout.
    ///
    @property void readTimeout(Duration timeout);
}

/// Base class of the std.socket based streams; subclasses provide open() and accept().
public abstract class SocketStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
        string   _bind;
    }
    void open(AddressFamily fa) {
    }
    @property Socket so() @safe pure {
        return s;
    }
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remembers the address. We will call bind() at the time of connect as
    *  we can have several connection trials.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
    *  Throws: ConnectError if the name can't be resolved or none of the
    *          resolved addresses accepts the connection.
    ***/
    SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        debug(requests) tracef(format("Create connection to %s:%d", host, port));
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            debug(requests) tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                if ( _bind !is null ) {
                    auto ad = getAddress(_bind);
                    debug(requests) tracef("bind to %s", ad[0]);
                    s.bind(ad[0]);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                debug(requests) tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself may have thrown before any socket was created
                if ( s !is null ) {
                    s.close();
                }
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send `buff`; closes the stream and throws NetworkException on error.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    do {
        auto rc = s.send(buff);
        if (rc < 0) {
            close();
            throw new NetworkException("sending data");
        }
        return rc;
    }

    /// Receive into `buff`, retrying on EINTR (Posix).
    /// On any other error the stream is closed and TimeoutException or
    /// NetworkException is thrown.
    /// Returns: number of bytes received.
    ptrdiff_t receive(void[] buff) {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // save errno first: close() below may overwrite it
                auto e = errno;
                version(Windows) {
                    close();
                    if ( e == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
                version(Posix) {
                    if ( e == EINTR ) {
                        continue;
                    }
                    close();
                    if ( e == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
            }
            return r;
        }
        assert(false);
    }

    @property void readTimeout(Duration timeout) @safe {
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
    }
    override SocketStream accept() {
        assert(false, "Implement before use");
    }
    @property override void reuseAddr(bool yes){
        if (yes) {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
        }
        else {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0);
        }
    }
    override void bind(Address addr){
        s.bind(addr);
    }
    override void listen(int n) {
        s.listen(n);
    }
}

version (vibeD) {
    import vibe.core.net, vibe.stream.tls;

    /// NetworkStream over a vibe.d TCPConnection.
    public class TCPVibeStream : NetworkStream {
    private:
        TCPConnection _conn;
        Duration _readTimeout = Duration.max;
        bool _isOpen = true;
        string _bind;

    public:
        @property bool isConnected() const {
            return _conn.connected;
        }
        @property override bool isOpen() const {
            return _conn && _isOpen;
        }
        void close() @trusted {
            _conn.close();
            _isOpen = false;
        }
        override TCPConnection so() {
            return _conn;
        }
        override void bind(string to) {
            _bind = to;
        }
        NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
            // FIXME: timeout not supported in vibe.d
            try {
                _conn = connectTCP(host, port, _bind);
            }
            catch (Exception e)
                throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);

            return this;
        }

        ptrdiff_t send(const(void)[] buff) {
            _conn.write(cast(const(ubyte)[])buff);
            return buff.length;
        }

        /// Returns: number of bytes read, 0 when the connection is closed or has no more data.
        /// Throws: TimeoutException if no data arrived within the read timeout.
        ptrdiff_t receive(void[] buff) {
            if (!_conn.waitForData(_readTimeout)) {
                if (!_conn.connected || _conn.empty ) {
                    return 0;
                }
                throw new TimeoutException("Timeout receiving data");
            }

            if(_conn.empty) {
                return 0;
            }

            auto chunk = min(_conn.leastSize, buff.length);
            assert(chunk != 0);
            _conn.read(cast(ubyte[])buff[0 .. chunk]);
            return chunk;
        }

        /// A zero timeout means "wait forever" (stored as Duration.max).
        @property void readTimeout(Duration timeout) {
            if (timeout == 0.seconds) {
                _readTimeout = Duration.max;
            }
            else {
                _readTimeout = timeout;
            }
        }
        override TCPVibeStream accept() {
            assert(false, "Must be implemented");
        }
        override @property void reuseAddr(bool){
            assert(false, "Not Implemented");
        }
        override void bind(Address){
            assert(false, "Not Implemented");
        }
        override void listen(int){
            assert(false, "Not Implemented");
        }
    }

    /// NetworkStream over a vibe.d TLS stream.
    public class SSLVibeStream : TCPVibeStream {
    private:
        TLSStream _sslStream;
        bool _isOpen = true;
        SSLOptions _sslOptions;
        TCPConnection underlyingConnection;

        /// Build a client TLS context from _sslOptions and start TLS on _conn.
        void connectSSL(string host) {
            auto sslctx = createTLSContext(TLSContextKind.client);
            if ( _sslOptions.getVerifyPeer() ) {
                if ( _sslOptions.getCaCert() == null ) {
                    throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
                }
                sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
                sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
            } else {
                sslctx.peerValidationMode = TLSPeerValidationMode.none;
            }
            immutable keyFile = _sslOptions.getKeyFile();
            immutable certFile = _sslOptions.getCertFile();
            final switch(_sslOptions.haveFiles()) {
                case 0b11:  // both files
                    sslctx.usePrivateKeyFile(keyFile);
                    sslctx.useCertificateChainFile(certFile);
                    break;
                case 0b01:  // key only
                    sslctx.usePrivateKeyFile(keyFile);
                    sslctx.useCertificateChainFile(keyFile);
                    break;
                case 0b10:  // cert only
                    sslctx.usePrivateKeyFile(certFile);
                    sslctx.useCertificateChainFile(certFile);
                    break;
                case 0b00:
                    break;
            }
            _sslStream = createTLSStream(_conn, sslctx, host);
        }

    public:
        this(SSLOptions opts) {
            _sslOptions = opts;
        }
        override TCPConnection so() {
            return _conn;
        }
        /// Start TLS on top of an already connected stream.
        this(NetworkStream ostream, SSLOptions opts, string host = null) {
            _sslOptions = opts;
            auto oconn = ostream.so();
            underlyingConnection = oconn;
            _conn = oconn;
            connectSSL(host);
        }
        override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
            try {
                // honour bind(string) the same way TCPVibeStream.connect does
                _conn = connectTCP(host, port, _bind);
                connectSSL(host);
            }
            catch (ConnectError e) {
                throw e;
            }
            catch (Exception e) {
                throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
            }

            return this;
        }

        override ptrdiff_t send(const(void)[] buff) {
            _sslStream.write(cast(const(ubyte)[])buff);
            return buff.length;
        }

        override ptrdiff_t receive(void[] buff) {
            if (!_sslStream.dataAvailableForRead) {
                if (!_conn.waitForData(_readTimeout)) {
                    if (!_conn.connected) {
                        return 0;
                    }
                    throw new TimeoutException("Timeout receiving data");
                }
            }

            if(_sslStream.empty) {
                return 0;
            }

            auto chunk = min(_sslStream.leastSize, buff.length);
            assert(chunk != 0);
            _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
            return chunk;
        }

        override void close() @trusted {
            _sslStream.finalize();
            _conn.close();
            _isOpen = false;
        }
        @property override bool isOpen() const {
            return _conn && _isOpen;
        }
        override SSLVibeStream accept() {
            assert(false, "Must be implemented");
        }
        override @property void reuseAddr(bool){
            assert(false, "Not Implemented");
        }
        override void bind(Address){
            assert(false, "Not Implemented");
        }
        override void listen(int){
            assert(false, "Not Implemented");
        }
    }
}

version (vibeD) {
    public alias TCPStream = TCPVibeStream;
    public alias SSLStream = SSLVibeStream;
}
else {
    public alias TCPStream = TCPSocketStream;
    public alias SSLStream = SSLSocketStream;
}