module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;
import core.stdc.errno;
import core.stdc.string;

import requests.ssl_adapter : openssl, SSL, SSL_CTX;

alias InDataHandler = DataPipeIface!ubyte;

/// Thrown by the stream classes in this module when a host name can't be resolved
/// or a connection can't be established.
public class ConnectError: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Thrown when a body decoder (unchunker, decompressor) can't process its input.
/// Declared public, like the other exceptions of this module: it is thrown from
/// public methods, so code in other modules must be able to name it in a `catch`.
public class DecodingException: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Thrown when receiving data from the network times out.
public class TimeoutException: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Thrown on unexpected errors while sending or receiving data.
public class NetworkException: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/**
 * DataPipeIface can accept some data, process, and return processed data.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing
    //void put(E[]);
    void putNoCopy(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors, each accepts some data, processes it, and puts the result to the next element in line.
 * This class is used to combine different Transfer- and Content- encodings.
For example: unchunk transfer-encoding "chunked",
 * and uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[]  pipe;
    Buffer!E             buffer;
    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    final void insert(DataPipeIface!E p) {
        pipe ~= p;
    }
    /// Feed every portion of `data` to processor `p` and collect everything it has ready.
    /// Params:
    /// p = processor
    /// data = input portions
    /// Returns:
    /// portions produced by `p`
    final E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        data.each!(e => p.putNoCopy(e));
        while(!p.empty()) result ~= p.get();
        return result;
    }
    /// Process next data portion. Data passed over pipeline and store result in buffer.
    /// Params:
    /// data = input data buffer.
    /// NoCopy means we do not copy data to buffer, we keep reference
    /// Throws:
    /// DecodingException if any processor fails; the original exception is chained as `next`.
    final void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            buffer.putNoCopy(data);
            return;
        }
        try {
            auto t = process(pipe.front, [data]);
            foreach(ref p; pipe[1..$]) {
                t = process(p, t);
            }
            t.each!(b => buffer.putNoCopy(b));
        }
        catch (Exception e) {
            // keep the root cause reachable for callers and logs
            throw new DecodingException(e.msg, __FILE__, __LINE__, e);
        }
    }
    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    final E[] get() {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }
    ///
    /// get without datamove. but user receive [][]
    ///
    final E[][] getNoCopy() {
        if ( buffer.empty ) {
            return E[][].init;
        }
        E[][] res = buffer.__repr.__buffer;
        buffer = Buffer!E.init;
        return res;
    }
    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    final bool empty() pure const @safe {
        return buffer.empty;
    }
    /// Signal end of input: flush every processor in order, passing what each one
    /// releases to the next, and store the final product in the internal buffer.
    final void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            product.each!(e => p.putNoCopy(e));
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        product.each!(b => buffer.putNoCopy(b));
    }
}

/**
 * Processor for gzipped/compressed content.
 * Also support InputRange interface.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;
        UnCompress   __zlib;
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
    /// Decompress next portion of input and keep the result for get().
    final override void putNoCopy(E[] data) {
        if ( __zlib is null ) {
            __zlib = new UnCompress();
        }
        __buff.putNoCopy(__zlib.uncompress(data));
    }
    /// Returns: next decompressed portion, or an empty slice if nothing is ready
    /// (same convention as DataPipe.get; previously an assert guarded this and a
    /// release build dereferenced null).
    final override E[] get() pure {
        if ( __buff.empty ) {
            return E[].init;
        }
        auto r = __buff.__repr.__buffer[0];
        __buff.popFrontN(r.length);
        return cast(E[])r;
    }
    /// Signal end of compressed stream and collect what zlib still holds.
    /// The zlib object can't be used after flush(), so it is dropped: a repeated
    /// flush() is a no-op and a later putNoCopy() starts a fresh stream.
    final override void flush() {
        if ( __zlib is null ) {
            return;
        }
        __buff.put(__zlib.flush());
        __zlib = null;
    }
    final override @property bool empty() const pure @safe {
        debug(requests) tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    final @property auto ref front() pure const @safe {
        debug(requests) tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    final @property auto popFront() pure @safe {
        debug(requests) tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    final @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}

/**
 * Unchunk chunked http
response body.
 *
 * Raw body bytes are fed with putNoCopy(); payload of every chunk is handed out by get()
 * as slices of the caller's input (nothing is copied).
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //    chunk-size     = 1*HEX
    //    last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer}
        /// longest "chunk-size [chunk-extension] CRLF" line we are ready to accept
        enum         maxSizeLineLength = 80;
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;
        ubyte[]      linebuff;

        /// Parse chunk-size from a complete size line.
        /// Only the leading run of hex digits is the size: everything after it
        /// (chunk-extension, CRLF) is ignored, so hex letters inside an extension
        /// can't corrupt the value.
        /// Throws: DecodingException if the line has no size or the size overflows size_t.
        static size_t parseChunkSize(const(ubyte)[] line) {
            import std.ascii : isHexDigit;
            // be lenient with blanks in front of the size
            while ( line.length && (line[0] == ' ' || line[0] == '\t') ) {
                line = line[1..$];
            }
            size_t size = 0;
            bool   haveDigits = false;
            foreach(c; line) {
                if ( !isHexDigit(c) ) {
                    break;
                }
                if ( size > (size_t.max >> 4) ) {
                    throw new DecodingException("Chunk size is too large");
                }
                immutable uint digit = c <= '9' ? c - '0' : (c | 0x20) - 'a' + 10;
                size = (size << 4) | digit;
                haveDigits = true;
            }
            if ( !haveDigits ) {
                throw new DecodingException("Can't find chunk size in the body");
            }
            return size;
        }
    }
    /// Consume next portion of the chunked body.
    /// Params:
    /// data = raw bytes as received; chunk payload is referenced, not copied
    /// Throws:
    /// DecodingException if the chunk-size line is missing, too long or malformed.
    final void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                to_receive = to_receive - min(to_receive, data.length);
                return;
            }
            if ( state == States.huntingSize ) {
                // take bytes up to and including first '\n' (everything, if there is no '\n' yet)
                size_t taken = 0;
                while ( taken < data.length ) {
                    if ( data[taken++] == '\n' ) {
                        break;
                    }
                }
                // check the limit before accumulating: the line may arrive in any
                // number of pieces of any size
                if ( linebuff.length + taken >= maxSizeLineLength ) {
                    throw new DecodingException("Can't find chunk size in the body");
                }
                linebuff ~= data[0..taken];
                data = data[taken..$];
                if (!linebuff.canFind(CRLF)) {
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    to_receive = 2-min(2, data.length); // trailing \r\n
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    //tracef("Unchunked %d bytes from %d", can_store, chunk_size);
                    if ( to_receive == 0 ) {
                        //tracef("switch to huntig separator");
                        state = States.huntingSeparator;
                        continue;
                    }
                    continue;
                }
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                if ( data[0] == '\n' || data[0]=='\r') {
                    data = data[1..$];
                    continue;
                }
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Returns: next decoded portion, or null if nothing is ready.
    final eType[] get() {
        if ( buff.empty ) {
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    final void flush() {
    }
    final bool empty() {
        debug(requests) tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// Returns: true when last-chunk and the bytes expected after it were seen.
    final bool done() {
        return state==States.trailer && to_receive==0;
    }
}

unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ]; // "abc\ndef\n"
    auto d = new Decompressor!eType();
    d.putNoCopy(gzipped[0..2].dup);
    d.putNoCopy(gzipped[2..10].dup);
    d.putNoCopy(gzipped[10..$].dup);
    d.flush();
    assert(equal(d.filter!(a => a!='b'), "ac\ndef\n"));

    auto e = new Decompressor!eType();
    e.putNoCopy(gzipped[0..10].dup);
    e.putNoCopy(gzipped[10..$].dup);
    e.flush();
    assert(equal(e.filter!(a => a!='b'), "ac\ndef\n"));
    //    writeln(gzipped.decompress.filter!(a => a!='b').array);
    auto dp = new DataPipe!eType;
    dp.insert(new Decompressor!eType());
    dp.putNoCopy(gzipped[0..2].dup);
    dp.putNoCopy(gzipped[2..$].dup);
    dp.flush();
    assert(equal(dp.get(), "abc\ndef\n"));
    // empty datapipe should just pass input to output
    auto dpu = new DataPipe!ubyte;
    dpu.putNoCopy("abcd".dup.representation);
    dpu.putNoCopy("efgh".dup.representation);
    dpu.flush();
    assert(equal(dpu.get(), "abcdefgh"));
    info("Test unchunker properties");
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
    ubyte[][] result;
    auto uc = new DecodeChunked();
    uc.putNoCopy(twoChunks);
    while(!uc.empty) {
        result ~= uc.get();
    }
    assert(equal(result[0], ['1', '2']));
    assert(equal(result[1], ['3', '4']));
    info("unchunker correctness - ok");
    result[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    // size line longer than 10 bytes, with a chunk extension made of hex letters
    auto ext = new DecodeChunked();
    ext.putNoCopy("0000000000004;name=abcdef\r\n1234\r\n0\r\n\r\n".dup.representation);
    assert(equal(ext.get(), "1234"));
    assert(ext.empty);
    assert(ext.done);
    // size line delivered in pieces
    auto parts = new DecodeChunked();
    parts.putNoCopy("00000000000".dup.representation);
    parts.putNoCopy("2\r".dup.representation);
    parts.putNoCopy("\nab\r\n0\r\n\r\n".dup.representation);
    assert(equal(parts.get(), "ab"));
    assert(parts.done);
    // a line without any size is a decoding error
    assertThrown!DecodingException(new DecodeChunked().putNoCopy(";x\r\n".dup.representation));
    info("unchunker long size line and extensions - ok");
    info("Testing DataPipe - done");
}
/**
 * Buffer used to collect and process data from network. It resembles Appender, but supports
 * also Range interface.
 * $(P To place data in buffer use put() method.)
* $(P To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
static this() {
}
static ~this() {
}
enum   CACHESIZE = 1024;

static long reprAlloc;
static long reprCacheHit;
static long reprCacheRequests;


public struct Buffer(T) {
    private {
        /// Allocate storage for a buffer. (A cache of Repr objects was planned here,
        /// see CACHESIZE and the repr* counters, but currently every call allocates.)
        Repr cachedOrNew() {
            return new Repr;
        }
        /// Shared storage: list of data portions and their total length.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                // only the list of portions is duplicated, portions themselves are shared
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;
    }

    alias toString = data!string;

    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * store data. Data is not copied: the buffer keeps a reference to the caller's array.
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    @property auto opDollar() const pure @safe {
        // go through length(): it is safe for a buffer that never received data
        return length;
    }
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to do; also keeps us away from a null __repr on empty buffer
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // nothing to do; also keeps us away from a null __repr on empty buffer
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// Collected data as one array. A buffer holding a single portion returns that
    /// portion itself (no copy); otherwise portions are concatenated into a new array.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return cast(U)a.data;
        }
    }
    /// cast(string)buffer. The template parameter used to be named `string`, which
    /// shadowed the type; it accepts exactly the targets a string converts to.
    U opCast(U)() if ( is(string : U) ) {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
///
public unittest {

    static assert(isInputRange!(Buffer!ubyte));
    static assert(isForwardRange!(Buffer!ubyte));
    static assert(hasLength!(Buffer!ubyte));
    static assert(hasSlicing!(Buffer!ubyte));
    static assert(isBidirectionalRange!(Buffer!ubyte));
    static assert(isRandomAccessRange!(Buffer!ubyte));

    auto b = Buffer!ubyte();
    b.put("abc".representation.dup);
    b.put("def".representation.dup);
    assert(b.length == 6);
    assert(b.toString == "abcdef");
    assert(b.front == 'a');
    assert(b.back == 'f');
    assert(equal(b[0..$], "abcdef"));
    assert(equal(b[$-2..$], "ef"));
    assert(b == "abcdef");
    b.popFront;
    b.popBack;
    assert(b.front == 'b');
    assert(b.back == 'e');
    assert(b.length == 4);
    assert(retro(b).front == 'e');
    assert(countUntil(b, 'e') == 3);
    assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(b, "bcde"));
    b.popFront; b.popFront;
    assert(b.front == 'd');
    assert(b.front == b[0]);
    assert(b.back == b[$-1]);

    auto c = Buffer!ubyte();
    c.put("Header0: value0\n".representation.dup);
    c.put("Header1: value1\n".representation.dup);
    c.put("Header2: value2\n\nbody".representation.dup);
    auto c_length = c.length;
    auto eoh = countUntil(c, "\n\n");
    assert(eoh == 47);
    foreach(header; c[0..eoh].splitter('\n') ) {
        writeln(cast(string)header.data);
    }
    assert(equal(findSplit(c, "\n\n")[2], "body"));
    assert(c.length == c_length);

    // operations on a buffer that never received any data
    auto z = Buffer!ubyte();
    z.popFrontN(0);
    z.popBackN(0);
    assert(z.empty);
    assert(z[0..$].empty);
}

/// TLS settings: peer verification, CA certificate, client key and certificate files.
public struct SSLOptions {
    enum filetype {
        pem,
        asn1,
        der = asn1,
    }
    private {
        /**
         * do we need to verify peer?
         */
        bool     _verifyPeer = true;
        /**
         * path to CA cert
         */
        string   _caCert;
        /**
         * path to key file (can also contain cert (for pem)
         */
        string   _keyFile;
        /**
         * path to cert file (can also contain key (for pem)
         */
        string   _certFile;
        filetype _keyType = filetype.pem;
        filetype _certType = filetype.pem;
    }
    /// Returns: bit mask of configured files: 1 - key file, 2 - cert file.
    /// A file counts as configured only if its name is not empty.
    ubyte haveFiles() pure nothrow @safe @nogc {
        ubyte r = 0;
        if ( _keyFile.length )  r|=1;
        if ( _certFile.length ) r|=2;
        return r;
    }
    // do we want to verify peer certificates?
    bool getVerifyPeer() pure nothrow @nogc @safe {
        return _verifyPeer;
    }
    SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
        _verifyPeer = v;
        return this;
    }
    /// set key file name and type (default - pem)
    auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _keyFile = f;
        _keyType = t;
        return this;
    }
    auto getKeyFile() @safe pure nothrow @nogc {
        return _keyFile;
    }
    auto getKeyType() @safe pure nothrow @nogc {
        return _keyType;
    }
    /// set cert file name and type (default - pem)
    auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _certFile = f;
        _certType = t;
        return this;
    }
    auto setCaCert(string p) @safe pure nothrow @nogc {
        _caCert = p;
        return this;
    }
    auto getCaCert() @safe pure nothrow @nogc {
        return _caCert;
    }
    auto getCertFile() @safe pure nothrow @nogc {
        return _certFile;
    }
    auto getCertType() @safe pure nothrow @nogc {
        return _certType;
    }
    /// set key file type; `t` must be one of the keys of sslKeyTypes ("pem", "asn1", "der")
    void setKeyType(string t) @safe pure nothrow {
        _keyType = cast(filetype)sslKeyTypes[t];
    }
    /// set cert file type; `t` must be one of the keys of sslKeyTypes ("pem", "asn1", "der")
    void setCertType(string t) @safe pure nothrow {
        _certType = cast(filetype)sslKeyTypes[t];
    }
}
static immutable int[string] sslKeyTypes;
shared static this() {
    sslKeyTypes = [
        "pem":SSLOptions.filetype.pem,
        "asn1":SSLOptions.filetype.asn1,
        "der":SSLOptions.filetype.der,
    ];
}

version(vibeD) {
}
else {
    extern(C) {
        int SSL_library_init();
    }

    enum SSL_VERIFY_PEER = 0x01;
    enum SSL_FILETYPE_PEM = 1;
    enum SSL_FILETYPE_ASN1 = 2;

    immutable int[SSLOptions.filetype] ft2ssl;

    shared static this() {
        ft2ssl = [
            SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
            SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
            SSLOptions.filetype.der: SSL_FILETYPE_ASN1
        ];
    }

    /// std.socket.Socket which sends and receives through an OpenSSL session.
    public class OpenSslSocket : Socket {
        //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
        private SSL* ssl;
        private SSL_CTX* ctx;

        /// Create SSL context and session for this socket according to `opts`.
        private void initSsl(SSLOptions opts) {
            //ctx = SSL_CTX_new(SSLv3_client_method());
            ctx = openssl.SSL_CTX_new(openssl.TLS_method());
            if ( ctx is null ) {
                // was an assert: must not vanish in release builds
                throw new Exception("Can't create SSL context");
            }
            if ( opts.getVerifyPeer() ) {
                openssl.SSL_CTX_set_default_verify_paths(ctx);
                if ( opts.getCaCert().length ) {
                    openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
                }
                openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
            }
            immutable keyFile = opts.getKeyFile();
            immutable keyType = opts.getKeyType();
            immutable certFile = opts.getCertFile();
            immutable certType = opts.getCertType();
            // NOTE(review): results of the *_file calls are not checked, a bad file
            // shows up only as a handshake failure later - confirm this is intended.
            final switch(opts.haveFiles()) {
                case 0b11:  // both files
                    openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
                    openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
                    break;
                case 0b01:  // key only
                    openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
                    openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
                    break;
                case 0b10:  // cert only
                    openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
                    openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
                    break;
                case 0b00:
                    break;
            }
            //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
            //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
            ssl = openssl.SSL_new(ctx);
            openssl.SSL_set_fd(ssl, cast(int)this.handle);
        }

        /// Run TLS handshake. SSL_connect reports success with 1 only: 0 is a
        /// controlled handshake failure and negative values are fatal errors.
        private void handshake() {
            if ( openssl.SSL_connect(ssl) != 1 ) {
                throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
            }
        }

        /// SSL I/O functions take an int length: clamp instead of truncating.
        private static int ioLength(size_t length) pure nothrow @nogc @safe {
            return length > int.max ? int.max : cast(int)length;
        }

        @trusted
        override void connect(Address dest) {
            super.connect(dest);
            handshake();
        }
        /// Run TLS handshake on already connected socket.
        auto connectSSL() {
            handshake();
            debug(requests) tracef("ssl socket connected");
            return this;
        }
        @trusted
        override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope {
            return openssl.SSL_write(ssl, buf.ptr, ioLength(buf.length));
        }
        override ptrdiff_t send(const(void)[] buf) scope {
            return send(buf, SocketFlags.NONE);
        }
        @trusted
        override ptrdiff_t receive(void[] buf, SocketFlags flags) scope {
            return openssl.SSL_read(ssl, buf.ptr, ioLength(buf.length));
        }
        override ptrdiff_t receive(void[] buf) scope {
            return receive(buf, SocketFlags.NONE);
        }
        this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
            super(af, type);
            initSsl(opts);
        }
        this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
            super(sock, af);
            initSsl(opts);
        }
        override void close() scope {
            super.close();
            if ( ssl !is null ) {
                openssl.SSL_free(ssl);
                ssl = null;
            }
            if ( ctx !is null ) {
                openssl.SSL_CTX_free(ctx);
                ctx = null;
            }
        }
        void SSL_set_tlsext_host_name(string host) {

        }
    }

    /// Stream over OpenSslSocket.
    public class SSLSocketStream: SocketStream {
        private SSLOptions _sslOptions;
        private Socket underlyingSocket;
        private SSL* ssl;
        private string host;

        this(SSLOptions opts) {
            _sslOptions = opts;
        }
        /// Start TLS over already connected stream `ostream`.
        /// `host`, if given, is sent to the server as TLS host name extension.
        this(NetworkStream ostream, SSLOptions opts, string host = null) {
            _sslOptions = opts;
            this.host = host;
            auto osock = ostream.so();
            // keep the reference: both sockets use the same handle
            underlyingSocket = osock;
            osock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
            auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
            ssl = ss.ssl;
            if ( host !is null ) {
                openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
            }
            ss.connectSSL();
            __isOpen = true;
            __isConnected = true;
            s = ss;
            debug(requests) tracef("ssl stream created from another stream: %s", s);
        }
        override void close() {
            ssl = null;
            host = null;
            super.close();
            // NOTE(review): when underlyingSocket is set it shares the handle with `s`,
            // which super.close() has just closed - confirm closing it again is intended.
            if ( underlyingSocket ) {
                underlyingSocket.close();
            }
        }
        override void open(AddressFamily fa) {
            if ( s !is null ) {
                s.close();
            }
            auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
            assert(ss !is null, "Can't create socket");
            ssl = ss.ssl;
            if ( host !is null ) {
                openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
            }
            s = ss;
            s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
            __isOpen = true;
        }
        override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
            host = h;
            return super.connect(h, p, timeout);
        }
        override SSLSocketStream accept() {
            auto newso = s.accept();
            if ( newso is null ) {
                return null;
            }
            auto newstream = new SSLSocketStream(_sslOptions);
            // same options for the accepted socket as for its stream
            auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily, _sslOptions);
            // both sockets use the same handle: keep the accepted one referenced
            // (as the stream-wrapping constructor does) so it is not collected while
            // the handle is in use
            newstream.underlyingSocket = newso;
            newstream.ssl = sslSocket.ssl;
            newstream.s = sslSocket;
            newstream.__isOpen = true;
            newstream.__isConnected = true;
            return newstream;
        }
    }
    /// Stream over plain TCP socket.
    public class TCPSocketStream : SocketStream {
        override void open(AddressFamily fa) {
            if ( s !is null ) {
                s.close();
            }
            s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
            assert(s !is null, "Can't create socket");
            __isOpen = true;
            s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
        }
        override TCPSocketStream accept() {
            auto newso = s.accept();
            if ( newso is null ) {
                return null;
            }
            auto newstream = new TCPSocketStream();
            newstream.s = newso;
            newstream.__isOpen = true;
            newstream.__isConnected = true;
            newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
            return newstream;
        }
    }
}

/// Interface of all network streams of this module.
public interface NetworkStream {
    @property bool isConnected() const;
    @property bool isOpen() const;

    void close() @trusted;

    ///
    /// timeout is the socket write timeout.
    ///
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);

    ptrdiff_t send(const(void)[] buff);
    ptrdiff_t receive(void[] buff);

    NetworkStream accept();
    @property void reuseAddr(bool);
    void bind(string);
    void bind(Address);
    void listen(int);
    version(vibeD) {
        TCPConnection so();
    } else {
        Socket so();
    }
    ///
    /// Set timeout for receive calls. 0 means no timeout.
    ///
    @property void readTimeout(Duration timeout);
}

/// Common part of streams built on std.socket.Socket.
public abstract class SocketStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
        string   _bind;
    }
    /// Create socket of given family. Does nothing here, subclasses create the socket.
    void open(AddressFamily fa) {
    }
    @property Socket so() @safe pure {
        return s;
    }
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    void close() @trusted {
        debug(requests) tracef("Close socket");
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remember address. We will call bind() at the time of connect as
    *  we can have several connection trials.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
    *
    *  Throws: ConnectError if the name can't be resolved or none of the addresses accepted connection.
    ***/
    SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        debug(requests) tracef(format("Create connection to %s:%d", host, port));
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            debug(requests) tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                if ( _bind !is null ) {
                    auto ad = getAddress(_bind);
                    debug(requests) tracef("bind to %s", ad[0]);
                    s.bind(ad[0]);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                debug(requests) tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() itself could fail, then there is nothing to close
                if ( s !is null ) {
                    s.close();
                }
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /// Send data. Returns: number of bytes sent.
    /// Throws: NetworkException (after closing the stream) if socket reports an error.
    ptrdiff_t send(const(void)[] buff) @safe
    in {assert(isConnected);}
    do {
        auto rc = s.send(buff);
        if (rc < 0) {
            close();
            throw new NetworkException("sending data");
        }
        return rc;
    }

    /// Receive data into `buff`. Returns: number of bytes received.
    /// Throws: TimeoutException or NetworkException (after closing the stream)
    /// when socket reports an error.
    ptrdiff_t receive(void[] buff) {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // remember errno now: close() below may change it
                auto e = errno;
                version(Windows) {
                    close();
                    if ( e == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
                version(Posix) {
                    if ( e == EINTR ) {
                        continue;
                    }
                    close();
                    if ( e == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
            }
            return r;
        }
        assert(false);
    }

    @property void readTimeout(Duration timeout) @safe {
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
    }
    override SocketStream accept() {
        assert(false, "Implement before use");
    }
    @property override void reuseAddr(bool yes){
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, yes ? 1 : 0);
    }
    override void bind(Address addr){
        s.bind(addr);
    }
    override void listen(int n) {
        s.listen(n);
    }
}

version (vibeD) {
    import vibe.core.net, vibe.stream.tls;

    /// Stream over vibe.d TCPConnection.
    public class TCPVibeStream : NetworkStream {
    private:
        TCPConnection _conn;
        Duration _readTimeout = Duration.max;
        bool _isOpen = true;
        string _bind;

    public:
        @property bool isConnected() const {
            return _conn.connected;
        }
        @property override bool isOpen() const {
            return _conn && _isOpen;
        }
        void close() @trusted {
            _conn.close();
            _isOpen = false;
        }
        override TCPConnection so() {
            return _conn;
        }
        /// Remember address to bind to at the time of connect.
        override void bind(string to) {
            _bind = to;
        }
        NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
            // FIXME: timeout not supported in vibe.d
            try {
                _conn = connectTCP(host, port, _bind);
            }
            catch (Exception e)
                throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);

            return this;
        }

        ptrdiff_t send(const(void)[] buff) {
            _conn.write(cast(const(ubyte)[])buff);
            return buff.length;
        }

        /// Returns: number of bytes received, 0 if connection is closed or has no more data.
        /// Throws: TimeoutException if no data arrived within read timeout.
        ptrdiff_t receive(void[] buff) {
            if (!_conn.waitForData(_readTimeout)) {
                if (!_conn.connected || _conn.empty ) {
                    return 0;
                }
                throw new TimeoutException("Timeout receiving data");
            }

            if(_conn.empty) {
                return 0;
            }

            auto chunk = min(_conn.leastSize, buff.length);
            assert(chunk != 0);
            _conn.read(cast(ubyte[])buff[0 .. chunk]);
            return chunk;
        }

        @property void readTimeout(Duration timeout) {
            if (timeout == 0.seconds) {
                _readTimeout = Duration.max;
            }
            else {
                _readTimeout = timeout;
            }
        }
        override TCPVibeStream accept() {
            assert(false, "Must be implemented");
        }
        override @property void reuseAddr(bool){
            assert(false, "Not Implemented");
        }
        override void bind(Address){
            assert(false, "Not Implemented");
        }
        override void listen(int){
            assert(false, "Not Implemented");
        }
    }

    /// Stream over vibe.d TLSStream.
    public class SSLVibeStream : TCPVibeStream {
    private:
        TLSStream _sslStream;
        bool _isOpen = true;
        SSLOptions _sslOptions;
        TCPConnection underlyingConnection;

        /// Create TLS context from _sslOptions and start TLS over `_conn`.
        void connectSSL(string host) {
            auto sslctx = createTLSContext(TLSContextKind.client);
            if ( _sslOptions.getVerifyPeer() ) {
                if ( _sslOptions.getCaCert() == null ) {
                    throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
                }
                sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
                sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
            } else {
                sslctx.peerValidationMode = TLSPeerValidationMode.none;
            }
            immutable keyFile = _sslOptions.getKeyFile();
            immutable certFile = _sslOptions.getCertFile();
            final switch(_sslOptions.haveFiles()) {
                case 0b11:  // both files
                    sslctx.usePrivateKeyFile(keyFile);
                    sslctx.useCertificateChainFile(certFile);
                    break;
                case 0b01:  // key only
                    sslctx.usePrivateKeyFile(keyFile);
                    sslctx.useCertificateChainFile(keyFile);
                    break;
                case 0b10:  // cert only
                    sslctx.usePrivateKeyFile(certFile);
                    sslctx.useCertificateChainFile(certFile);
                    break;
                case 0b00:
                    break;
            }
            _sslStream = createTLSStream(_conn, sslctx, host);
        }

    public:
        this(SSLOptions opts) {
            _sslOptions = opts;
        }
        override TCPConnection so() {
            return _conn;
        }
        /// Start TLS over already connected stream `ostream`.
        this(NetworkStream ostream, SSLOptions opts, string host = null) {
            _sslOptions = opts;
            auto oconn = ostream.so();
            underlyingConnection = oconn;
            _conn = oconn;
            connectSSL(host);
        }
        override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
            try {
                // honour bind(string), as the plain TCP stream does
                _conn = connectTCP(host, port, _bind);
                connectSSL(host);
            }
            catch (ConnectError e) {
                throw e;
            }
            catch (Exception e) {
                throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
            }

            return this;
        }

        override ptrdiff_t send(const(void)[] buff) {
            _sslStream.write(cast(const(ubyte)[])buff);
            return buff.length;
        }

        /// Returns: number of bytes received, 0 if connection is closed or has no more data.
        /// Throws: TimeoutException if no data arrived within read timeout.
        override ptrdiff_t receive(void[] buff) {
            if (!_sslStream.dataAvailableForRead) {
                if (!_conn.waitForData(_readTimeout)) {
                    if (!_conn.connected) {
                        return 0;
                    }
                    throw new TimeoutException("Timeout receiving data");
                }
            }

            if(_sslStream.empty) {
                return 0;
            }

            auto chunk = min(_sslStream.leastSize, buff.length);
            assert(chunk != 0);
            _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
            return chunk;
        }

        override void close() @trusted {
            // TLS stream exists only after successful connect
            if ( _sslStream !is null ) {
                _sslStream.finalize();
            }
            _conn.close();
            _isOpen = false;
        }
        @property override bool isOpen() const {
            return _conn && _isOpen;
        }
        override SSLVibeStream accept() {
            assert(false, "Must be implemented");
        }
        override @property void reuseAddr(bool){
            assert(false, "Not Implemented");
        }
        override void bind(Address){
            assert(false, "Not Implemented");
        }
        override void listen(int){
            assert(false, "Not Implemented");
        }
    }
}

version (vibeD) {
    public alias TCPStream = TCPVibeStream;
    public alias SSLStream = SSLVibeStream;
}
else {
    public alias TCPStream = TCPSocketStream;
    public alias SSLStream = SSLSocketStream;
}