1 module requests.streams; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.conv; 7 import std.experimental.logger; 8 import std.exception; 9 import std.format; 10 import std.range; 11 import std.range.primitives; 12 import std.string; 13 import std.stdio; 14 import std.traits; 15 import std.zlib; 16 import std.datetime; 17 import std.socket; 18 import core.stdc.errno; 19 import core.stdc.string; 20 21 import requests.ssl_adapter : openssl, SSL, SSL_CTX; 22 23 alias InDataHandler = DataPipeIface!ubyte; 24 25 public class ConnectError: Exception { 26 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 27 super(message, file, line, next); 28 } 29 } 30 31 class DecodingException: Exception { 32 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 33 super(message, file, line, next); 34 } 35 } 36 37 public class TimeoutException: Exception { 38 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 39 super(message, file, line, next); 40 } 41 } 42 43 public class NetworkException: Exception { 44 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 45 super(message, file, line, next); 46 } 47 } 48 49 /** 50 * DataPipeIface can accept some data, process, and return processed data. 51 */ 52 public interface DataPipeIface(E) { 53 /// Is there any processed data ready for reading? 54 bool empty(); 55 /// Put next data portion for processing 56 //void put(E[]); 57 void putNoCopy(E[]); 58 /// Get any ready data 59 E[] get(); 60 /// Signal on end of incoming data stream. 61 void flush(); 62 } 63 /** 64 * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line. 65 * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked", 66 * and uncompress Content-Encoding "gzip". 67 */ 68 public class DataPipe(E) : DataPipeIface!E { 69 70 DataPipeIface!(E)[] pipe; 71 Buffer!E buffer; 72 /// Append data processor to pipeline 73 /// Params: 74 /// p = processor 75 final void insert(DataPipeIface!E p) { 76 pipe ~= p; 77 } 78 final E[][] process(DataPipeIface!E p, E[][] data) { 79 E[][] result; 80 data.each!(e => p.putNoCopy(e)); 81 while(!p.empty()) result ~= p.get(); 82 return result; 83 } 84 /// Process next data portion. Data passed over pipeline and store result in buffer. 85 /// Params: 86 /// data = input data buffer. 87 /// NoCopy means we do not copy data to buffer, we keep reference 88 final void putNoCopy(E[] data) { 89 if ( pipe.empty ) { 90 buffer.putNoCopy(data); 91 return; 92 } 93 try { 94 auto t = process(pipe.front, [data]); 95 foreach(ref p; pipe[1..$]) { 96 t = process(p, t); 97 } 98 t.each!(b => buffer.putNoCopy(b)); 99 } 100 catch (Exception e) { 101 throw new DecodingException(e.msg); 102 } 103 } 104 /// Get what was collected in internal buffer and clear it. 105 /// Returns: 106 /// data collected. 107 final E[] get() { 108 if ( buffer.empty ) { 109 return E[].init; 110 } 111 auto res = buffer.data; 112 buffer = Buffer!E.init; 113 return res; 114 } 115 /// 116 /// get without datamove. but user receive [][] 117 /// 118 final E[][] getNoCopy() { 119 if ( buffer.empty ) { 120 return E[][].init; 121 } 122 E[][] res = buffer.__repr.__buffer; 123 buffer = Buffer!E.init; 124 return res; 125 } 126 /// Test if internal buffer is empty 127 /// Returns: 128 /// true if internal buffer is empty (nothing to get()) 129 final bool empty() pure const @safe { 130 return buffer.empty; 131 } 132 final void flush() { 133 E[][] product; 134 foreach(ref p; pipe) { 135 product.each!(e => p.putNoCopy(e)); 136 p.flush(); 137 product.length = 0; 138 while( !p.empty ) product ~= p.get(); 139 } 140 product.each!(b => buffer.putNoCopy(b)); 141 } 142 } 143 144 /** 145 * Processor for gzipped/compressed content. 146 * Also support InputRange interface. 147 */ 148 public class Decompressor(E) : DataPipeIface!E { 149 private { 150 Buffer!ubyte __buff; 151 UnCompress __zlib; 152 } 153 this() { 154 __buff = Buffer!ubyte(); 155 __zlib = new UnCompress(); 156 } 157 final override void putNoCopy(E[] data) { 158 if ( __zlib is null ) { 159 __zlib = new UnCompress(); 160 } 161 __buff.putNoCopy(__zlib.uncompress(data)); 162 } 163 final override E[] get() pure { 164 assert(__buff.length); 165 auto r = __buff.__repr.__buffer[0]; 166 __buff.popFrontN(r.length); 167 return cast(E[])r; 168 } 169 final override void flush() { 170 if ( __zlib is null ) { 171 return; 172 } 173 __buff.put(__zlib.flush()); 174 } 175 final override @property bool empty() const pure @safe { 176 debug(requests) tracef("empty=%b", __buff.empty); 177 return __buff.empty; 178 } 179 final @property auto ref front() pure const @safe { 180 debug(requests) tracef("front: buff length=%d", __buff.length); 181 return __buff.front; 182 } 183 final @property auto popFront() pure @safe { 184 debug(requests) tracef("popFront: buff length=%d", __buff.length); 185 return __buff.popFront; 186 } 187 final @property void popFrontN(size_t n) pure @safe { 188 __buff.popFrontN(n); 189 } 190 } 191 192 /** 193 * Unchunk chunked http responce body. 194 */ 195 public class DecodeChunked : DataPipeIface!ubyte { 196 // length := 0 197 // read chunk-size, chunk-extension (if any) and CRLF 198 // while (chunk-size > 0) { 199 // read chunk-data and CRLF 200 // append chunk-data to entity-body 201 // length := length + chunk-size 202 // read chunk-size and CRLF 203 // } 204 // read entity-header 205 // while (entity-header not empty) { 206 // append entity-header to existing header fields 207 // read entity-header 208 // } 209 // Content-Length := length 210 // Remove "chunked" from Transfer-Encoding 211 // 212 213 // Chunked-Body = *chunk 214 // last-chunk 215 // trailer 216 // CRLF 217 // 218 // chunk = chunk-size [ chunk-extension ] CRLF 219 // chunk-data CRLF 220 // chunk-size = 1*HEX 221 // last-chunk = 1*("0") [ chunk-extension ] CRLF 222 // 223 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 224 // chunk-ext-name = token 225 // chunk-ext-val = token | quoted-string 226 // chunk-data = chunk-size(OCTET) 227 // trailer = *(entity-header CRLF) 228 229 alias eType = ubyte; 230 immutable eType[] CRLF = ['\r', '\n']; 231 private { 232 enum States {huntingSize, huntingSeparator, receiving, trailer}; 233 char state = States.huntingSize; 234 size_t chunk_size, to_receive; 235 Buffer!ubyte buff; 236 ubyte[] linebuff; 237 } 238 final void putNoCopy(eType[] data) { 239 while ( data.length ) { 240 if ( state == States.trailer ) { 241 to_receive = to_receive - min(to_receive, data.length); 242 return; 243 } 244 if ( state == States.huntingSize ) { 245 import std.ascii; 246 ubyte[10] digits; 247 int i; 248 for(i=0;i<data.length;i++) { 249 ubyte v = data[i]; 250 digits[i] = v; 251 if ( v == '\n' ) { 252 i+=1; 253 break; 254 } 255 } 256 linebuff ~= digits[0..i]; 257 if ( linebuff.length >= 80 ) { 258 throw new DecodingException("Can't find chunk size in the body"); 259 } 260 data = data[i..$]; 261 if (!linebuff.canFind(CRLF)) { 262 continue; 263 } 264 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 265 state = States.receiving; 266 to_receive = chunk_size; 267 if ( chunk_size == 0 ) { 268 to_receive = 2-min(2, data.length); // trailing \r\n 269 state = States.trailer; 270 return; 271 } 272 continue; 273 } 274 if ( state == States.receiving ) { 275 if (to_receive > 0 ) { 276 auto can_store = min(to_receive, data.length); 277 buff.putNoCopy(data[0..can_store]); 278 data = data[can_store..$]; 279 to_receive -= can_store; 280 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 281 if ( to_receive == 0 ) { 282 //tracef("switch to huntig separator"); 283 state = States.huntingSeparator; 284 continue; 285 } 286 continue; 287 } 288 assert(false); 289 } 290 if ( state == States.huntingSeparator ) { 291 if ( data[0] == '\n' || data[0]=='\r') { 292 data = data[1..$]; 293 continue; 294 } 295 state = States.huntingSize; 296 linebuff.length = 0; 297 continue; 298 } 299 } 300 } 301 final eType[] get() { 302 auto r = buff.__repr.__buffer[0]; 303 buff.popFrontN(r.length); 304 return r; 305 } 306 final void flush() { 307 } 308 final bool empty() { 309 debug(requests) tracef("empty=%b", buff.empty); 310 return buff.empty; 311 } 312 final bool done() { 313 return state==States.trailer && to_receive==0; 314 } 315 } 316 317 unittest { 318 info("Testing DataPipe"); 319 globalLogLevel(LogLevel.info); 320 alias eType = char; 321 eType[] gzipped = [ 322 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 323 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 324 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 325 0x08, 0x00, 0x00, 0x00 326 ]; // "abc\ndef\n" 327 auto d = new Decompressor!eType(); 328 d.putNoCopy(gzipped[0..2].dup); 329 d.putNoCopy(gzipped[2..10].dup); 330 d.putNoCopy(gzipped[10..$].dup); 331 d.flush(); 332 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 333 334 auto e = new Decompressor!eType(); 335 e.putNoCopy(gzipped[0..10].dup); 336 e.putNoCopy(gzipped[10..$].dup); 337 e.flush(); 338 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 339 // writeln(gzipped.decompress.filter!(a => a!='b').array); 340 auto dp = new DataPipe!eType; 341 dp.insert(new Decompressor!eType()); 342 dp.putNoCopy(gzipped[0..2].dup); 343 dp.putNoCopy(gzipped[2..$].dup); 344 dp.flush(); 345 assert(equal(dp.get(), "abc\ndef\n")); 346 // empty datapipe shoul just pass input to output 347 auto dpu = new DataPipe!ubyte; 348 dpu.putNoCopy("abcd".dup.representation); 349 dpu.putNoCopy("efgh".dup.representation); 350 dpu.flush(); 351 assert(equal(dpu.get(), "abcdefgh")); 352 info("Test unchunker properties"); 353 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation; 354 ubyte[][] result; 355 auto uc = new DecodeChunked(); 356 uc.putNoCopy(twoChunks); 357 while(!uc.empty) { 358 result ~= uc.get(); 359 } 360 assert(equal(result[0], ['1', '2'])); 361 assert(equal(result[1], ['3', '4'])); 362 info("unchunker correctness - ok"); 363 result[0][0] = '5'; 364 assert(twoChunks[3] == '5'); 365 info("unchunker zero copy - ok"); 366 info("Testing DataPipe - done"); 367 } 368 /** 369 * Buffer used to collect and process data from network. It remainds Appender, but support 370 * also Range interface. 371 * $(P To place data in buffer use put() method.) 372 * $(P To retrieve data from buffer you can use several methods:) 373 * $(UL 374 * $(LI Range methods: front, back, index []) 375 * $(LI data method: return collected data (like Appender.data)) 376 * ) 377 */ 378 static this() { 379 } 380 static ~this() { 381 } 382 enum CACHESIZE = 1024; 383 384 static long reprAlloc; 385 static long reprCacheHit; 386 static long reprCacheRequests; 387 388 389 public struct Buffer(T) { 390 // static Repr[CACHESIZE] cache; 391 // static uint cacheIndex; 392 393 private { 394 Repr cachedOrNew() { 395 return new Repr; 396 // reprCacheRequests++; 397 // if ( false && cacheIndex>0 ) { 398 // reprCacheHit++; 399 // cacheIndex -= 1; 400 // return cache[cacheIndex]; 401 // } else { 402 // return new Repr; 403 // } 404 } 405 class Repr { 406 size_t __length; 407 Unqual!T[][] __buffer; 408 this() { 409 reprAlloc++; 410 __length = 0; 411 } 412 this(Repr other) { 413 reprAlloc++; 414 if ( other is null ) 415 return; 416 __length = other.__length; 417 __buffer = other.__buffer.dup; 418 } 419 } 420 Repr __repr; 421 } 422 423 alias toString = data!string; 424 425 this(this) { 426 if ( !__repr ) { 427 return; 428 } 429 __repr = new Repr(__repr); 430 } 431 this(U)(U[] data) { 432 put(data); 433 } 434 ~this() { 435 __repr = null; 436 } 437 /*************** 438 * store data. Data copied 439 */ 440 auto put(U)(U[] data) { 441 if ( data.length == 0 ) { 442 return; 443 } 444 if ( !__repr ) { 445 __repr = cachedOrNew(); 446 } 447 static if (!is(U == T)) { 448 auto d = cast(T[])(data); 449 __repr.__length += d.length; 450 __repr.__buffer ~= d.dup; 451 } else { 452 __repr.__length += data.length; 453 __repr.__buffer ~= data.dup; 454 } 455 return; 456 } 457 auto putNoCopy(U)(U[] data) { 458 if ( data.length == 0 ) { 459 return; 460 } 461 if ( !__repr ) { 462 __repr = cachedOrNew(); 463 } 464 static if (!is(U == T)) { 465 auto d = cast(T[])(data); 466 __repr.__length += d.length; 467 __repr.__buffer ~= d; 468 } else { 469 __repr.__length += data.length; 470 __repr.__buffer ~= data; 471 } 472 return; 473 } 474 @property auto opDollar() const pure @safe { 475 return __repr.__length; 476 } 477 @property size_t length() const pure @safe { 478 if ( !__repr ) { 479 return 0; 480 } 481 return __repr.__length; 482 } 483 @property auto empty() const pure @safe { 484 return length == 0; 485 } 486 @property auto ref front() const pure @safe { 487 assert(length); 488 return __repr.__buffer.front.front; 489 } 490 @property auto ref back() const pure @safe { 491 assert(length); 492 return __repr.__buffer.back.back; 493 } 494 @property void popFront() pure @safe { 495 assert(length); 496 with ( __repr ) { 497 __buffer.front.popFront; 498 if ( __buffer.front.length == 0 ) { 499 __buffer.popFront; 500 } 501 __length--; 502 } 503 } 504 @property void popFrontN(size_t n) pure @safe { 505 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 506 __repr.__length -= n; 507 while( n ) { 508 if ( n <= __repr.__buffer.front.length ) { 509 __repr.__buffer.front.popFrontN(n); 510 if ( __repr.__buffer.front.length == 0 ) { 511 __repr.__buffer.popFront; 512 } 513 return; 514 } 515 n -= __repr.__buffer.front.length; 516 __repr.__buffer.popFront; 517 } 518 } 519 @property void popBack() pure @safe { 520 assert(length); 521 __repr.__buffer.back.popBack; 522 if ( __repr.__buffer.back.length == 0 ) { 523 __repr.__buffer.popBack; 524 } 525 __repr.__length--; 526 } 527 @property void popBackN(size_t n) pure @safe { 528 assert(n <= length, "n: %d, length: %d".format(n, length)); 529 __repr.__length -= n; 530 while( n ) { 531 if ( n <= __repr.__buffer.back.length ) { 532 __repr.__buffer.back.popBackN(n); 533 if ( __repr.__buffer.back.length == 0 ) { 534 __repr.__buffer.popBack; 535 } 536 return; 537 } 538 n -= __repr.__buffer.back.length; 539 __repr.__buffer.popBack; 540 } 541 } 542 @property auto save() @safe { 543 auto n = Buffer!T(); 544 n.__repr = new Repr(__repr); 545 return n; 546 } 547 @property auto ref opIndex(size_t n) const pure @safe { 548 assert( __repr && n < __repr.__length ); 549 foreach(b; __repr.__buffer) { 550 if ( n < b.length ) { 551 return b[n]; 552 } 553 n -= b.length; 554 } 555 assert(false, "Impossible"); 556 } 557 Buffer!T opSlice(size_t m, size_t n) { 558 if ( empty || m == n ) { 559 return Buffer!T(); 560 } 561 assert( m <= n && n <= __repr.__length); 562 auto res = this.save(); 563 res.popBackN(res.__repr.__length-n); 564 res.popFrontN(m); 565 return res; 566 } 567 @property auto data(U=T[])() pure { 568 static if ( is(U==T[]) ) { 569 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 570 return __repr.__buffer.front; 571 } 572 } 573 Appender!(T[]) a; 574 if ( __repr && __repr.__buffer ) { 575 foreach(ref b; __repr.__buffer) { 576 a.put(b); 577 } 578 } 579 static if ( is(U==T[]) ) { 580 return a.data; 581 } else { 582 return cast(U)a.data; 583 } 584 } 585 string opCast(string)() { 586 return this.toString; 587 } 588 bool opEquals(U)(U x) { 589 return cast(U)this == x; 590 } 591 592 } 593 /// 594 public unittest { 595 596 static assert(isInputRange!(Buffer!ubyte)); 597 static assert(isForwardRange!(Buffer!ubyte)); 598 static assert(hasLength!(Buffer!ubyte)); 599 static assert(hasSlicing!(Buffer!ubyte)); 600 static assert(isBidirectionalRange!(Buffer!ubyte)); 601 static assert(isRandomAccessRange!(Buffer!ubyte)); 602 603 auto b = Buffer!ubyte(); 604 b.put("abc".representation.dup); 605 b.put("def".representation.dup); 606 assert(b.length == 6); 607 assert(b.toString == "abcdef"); 608 assert(b.front == 'a'); 609 assert(b.back == 'f'); 610 assert(equal(b[0..$], "abcdef")); 611 assert(equal(b[$-2..$], "ef")); 612 assert(b == "abcdef"); 613 b.popFront; 614 b.popBack; 615 assert(b.front == 'b'); 616 assert(b.back == 'e'); 617 assert(b.length == 4); 618 assert(retro(b).front == 'e'); 619 assert(countUntil(b, 'e') == 3); 620 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 621 assert(equal(b, "bcde")); 622 b.popFront; b.popFront; 623 assert(b.front == 'd'); 624 assert(b.front == b[0]); 625 assert(b.back == b[$-1]); 626 627 auto c = Buffer!ubyte(); 628 c.put("Header0: value0\n".representation.dup); 629 c.put("Header1: value1\n".representation.dup); 630 c.put("Header2: value2\n\nbody".representation.dup); 631 auto c_length = c.length; 632 auto eoh = countUntil(c, "\n\n"); 633 assert(eoh == 47); 634 foreach(header; c[0..eoh].splitter('\n') ) { 635 writeln(cast(string)header.data); 636 } 637 assert(equal(findSplit(c, "\n\n")[2], "body")); 638 assert(c.length == c_length); 639 } 640 641 public struct SSLOptions { 642 enum filetype { 643 pem, 644 asn1, 645 der = asn1, 646 } 647 private { 648 /** 649 * do we need to veryfy peer? 650 */ 651 bool _verifyPeer = true; 652 /** 653 * path to CA cert 654 */ 655 string _caCert; 656 /** 657 * path to key file (can also contain cert (for pem) 658 */ 659 string _keyFile; 660 /** 661 * path to cert file (can also contain key (for pem) 662 */ 663 string _certFile; 664 filetype _keyType = filetype.pem; 665 filetype _certType = filetype.pem; 666 } 667 ubyte haveFiles() pure nothrow @safe @nogc { 668 ubyte r = 0; 669 if ( _keyFile ) r|=1; 670 if ( _certFile ) r|=2; 671 return r; 672 } 673 // do we want to verify peer certificates? 674 bool getVerifyPeer() pure nothrow @nogc { 675 return _verifyPeer; 676 } 677 SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe { 678 _verifyPeer = v; 679 return this; 680 } 681 /// set key file name and type (default - pem) 682 auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 683 _keyFile = f; 684 _keyType = t; 685 return this; 686 } 687 auto getKeyFile() @safe pure nothrow @nogc { 688 return _keyFile; 689 } 690 auto getKeyType() @safe pure nothrow @nogc { 691 return _keyType; 692 } 693 /// set cert file name and type (default - pem) 694 auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 695 _certFile = f; 696 _certType = t; 697 return this; 698 } 699 auto setCaCert(string p) @safe pure nothrow @nogc { 700 _caCert = p; 701 return this; 702 } 703 auto getCaCert() @safe pure nothrow @nogc { 704 return _caCert; 705 } 706 auto getCertFile() @safe pure nothrow @nogc { 707 return _certFile; 708 } 709 auto getCertType() @safe pure nothrow @nogc { 710 return _certType; 711 } 712 /// set key file type 713 void setKeyType(string t) @safe pure nothrow { 714 _keyType = cast(filetype)sslKeyTypes[t]; 715 } 716 /// set cert file type 717 void setCertType(string t) @safe pure nothrow { 718 _certType = cast(filetype)sslKeyTypes[t]; 719 } 720 } 721 static immutable int[string] sslKeyTypes; 722 shared static this() { 723 sslKeyTypes = [ 724 "pem":SSLOptions.filetype.pem, 725 "asn1":SSLOptions.filetype.asn1, 726 "der":SSLOptions.filetype.der, 727 ]; 728 } 729 730 version(vibeD) { 731 } 732 else { 733 extern(C) { 734 int SSL_library_init(); 735 } 736 737 enum SSL_VERIFY_PEER = 0x01; 738 enum SSL_FILETYPE_PEM = 1; 739 enum SSL_FILETYPE_ASN1 = 2; 740 741 immutable int[SSLOptions.filetype] ft2ssl; 742 743 shared static this() { 744 ft2ssl = [ 745 SSLOptions.filetype.pem: SSL_FILETYPE_PEM, 746 SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1, 747 SSLOptions.filetype.der: SSL_FILETYPE_ASN1 748 ]; 749 } 750 751 public class OpenSslSocket : Socket { 752 //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 753 private SSL* ssl; 754 private SSL_CTX* ctx; 755 756 private void initSsl(SSLOptions opts) { 757 //ctx = SSL_CTX_new(SSLv3_client_method()); 758 ctx = openssl.SSL_CTX_new(openssl.TLS_method()); 759 assert(ctx !is null); 760 if ( opts.getVerifyPeer() ) { 761 openssl.SSL_CTX_set_default_verify_paths(ctx); 762 if ( opts.getCaCert() ) { 763 openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null); 764 } 765 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null); 766 } 767 immutable keyFile = opts.getKeyFile(); 768 immutable keyType = opts.getKeyType(); 769 immutable certFile = opts.getCertFile(); 770 immutable certType = opts.getCertType(); 771 final switch(opts.haveFiles()) { 772 case 0b11: // both files 773 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 774 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]); 775 break; 776 case 0b01: // key only 777 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 778 openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 779 break; 780 case 0b10: // cert only 781 openssl.SSL_CTX_use_PrivateKey_file(ctx, certFile.toStringz(), ft2ssl[certType]); 782 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]); 783 break; 784 case 0b00: 785 break; 786 } 787 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 788 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 789 ssl = openssl.SSL_new(ctx); 790 openssl.SSL_set_fd(ssl, cast(int)this.handle); 791 } 792 793 @trusted 794 override void connect(Address dest) { 795 super.connect(dest); 796 if(openssl.SSL_connect(ssl) == -1) { 797 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 798 } 799 } 800 auto connectSSL() { 801 if(openssl.SSL_connect(ssl) == -1) { 802 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 803 } 804 debug(requests) tracef("ssl socket connected"); 805 return this; 806 } 807 @trusted 808 override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope { 809 return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length); 810 } 811 override ptrdiff_t send(const(void)[] buf) scope { 812 return send(buf, SocketFlags.NONE); 813 } 814 @trusted 815 override ptrdiff_t receive(void[] buf, SocketFlags flags) scope { 816 return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length); 817 } 818 override ptrdiff_t receive(void[] buf) scope { 819 return receive(buf, SocketFlags.NONE); 820 } 821 this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) { 822 super(af, type); 823 initSsl(opts); 824 } 825 this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) { 826 super(sock, af); 827 initSsl(opts); 828 } 829 override void close() scope { 830 super.close(); 831 if ( ssl !is null ) { 832 openssl.SSL_free(ssl); 833 ssl = null; 834 } 835 if ( ctx !is null ) { 836 openssl.SSL_CTX_free(ctx); 837 ctx = null; 838 } 839 } 840 void SSL_set_tlsext_host_name(string host) { 841 842 } 843 } 844 845 public class SSLSocketStream: SocketStream { 846 private SSLOptions _sslOptions; 847 private Socket underlyingSocket; 848 private SSL* ssl; 849 private string host; 850 851 this(SSLOptions opts) { 852 _sslOptions = opts; 853 } 854 this(NetworkStream ostream, SSLOptions opts, string host = null) { 855 _sslOptions = opts; 856 this.host = host; 857 auto osock = ostream.so(); 858 underlyingSocket = osock; 859 osock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 860 auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions); 861 ssl = ss.ssl; 862 if ( host !is null ) { 863 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 864 } 865 ss.connectSSL(); 866 __isOpen = true; 867 __isConnected = true; 868 s = ss; 869 debug(requests) tracef("ssl stream created from another stream: %s", s); 870 } 871 override void close() { 872 ssl = null; 873 host = null; 874 super.close(); 875 if ( underlyingSocket ) { 876 underlyingSocket.close(); 877 } 878 } 879 override void open(AddressFamily fa) { 880 if ( s !is null ) { 881 s.close(); 882 } 883 auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions); 884 assert(ss !is null, "Can't create socket"); 885 ssl = ss.ssl; 886 if ( host !is null ) { 887 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 888 } 889 s = ss; 890 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 891 __isOpen = true; 892 } 893 override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) { 894 host = h; 895 return super.connect(h, p, timeout); 896 } 897 override SSLSocketStream accept() { 898 auto newso = s.accept(); 899 if ( s is null ) { 900 return null; 901 } 902 auto newstream = new SSLSocketStream(_sslOptions); 903 auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily); 904 newstream.s = sslSocket; 905 newstream.__isOpen = true; 906 newstream.__isConnected = true; 907 return newstream; 908 } 909 } 910 public class TCPSocketStream : SocketStream { 911 override void open(AddressFamily fa) { 912 if ( s !is null ) { 913 s.close(); 914 } 915 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 916 assert(s !is null, "Can't create socket"); 917 __isOpen = true; 918 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 919 } 920 override TCPSocketStream accept() { 921 auto newso = s.accept(); 922 if ( s is null ) { 923 return null; 924 } 925 auto newstream = new TCPSocketStream(); 926 newstream.s = newso; 927 newstream.__isOpen = true; 928 newstream.__isConnected = true; 929 newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 930 return newstream; 931 } 932 } 933 } 934 935 public interface NetworkStream { 936 @property bool isConnected() const; 937 @property bool isOpen() const; 938 939 void close() @trusted; 940 941 /// 942 /// timeout is the socket write timeout. 943 /// 944 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 945 946 ptrdiff_t send(const(void)[] buff); 947 ptrdiff_t receive(void[] buff); 948 949 NetworkStream accept(); 950 @property void reuseAddr(bool); 951 void bind(string); 952 void bind(Address); 953 void listen(int); 954 version(vibeD) { 955 TCPConnection so(); 956 } else { 957 Socket so(); 958 } 959 /// 960 /// Set timeout for receive calls. 0 means no timeout. 961 /// 962 @property void readTimeout(Duration timeout); 963 } 964 965 public abstract class SocketStream : NetworkStream { 966 private { 967 Duration timeout; 968 Socket s; 969 bool __isOpen; 970 bool __isConnected; 971 string _bind; 972 } 973 void open(AddressFamily fa) { 974 } 975 @property Socket so() @safe pure { 976 return s; 977 } 978 @property bool isOpen() @safe @nogc pure const { 979 return s && __isOpen; 980 } 981 @property bool isConnected() @safe @nogc pure const { 982 return s && __isOpen && __isConnected; 983 } 984 void close() @trusted { 985 debug(requests) tracef("Close socket"); 986 if ( isOpen ) { 987 s.close(); 988 __isOpen = false; 989 __isConnected = false; 990 } 991 s = null; 992 } 993 /*** 994 * bind() just remember address. We will cal bind() at the time of connect as 995 * we can have several connection trials. 996 ***/ 997 override void bind(string to) { 998 _bind = to; 999 } 1000 /*** 1001 * Make connection to remote site. Bind, handle connection error, try several addresses, etc 1002 ***/ 1003 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1004 debug(requests) tracef(format("Create connection to %s:%d", host, port)); 1005 Address[] addresses; 1006 __isConnected = false; 1007 try { 1008 addresses = getAddress(host, port); 1009 } catch (Exception e) { 1010 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 1011 } 1012 foreach(a; addresses) { 1013 debug(requests) tracef("Trying %s", a); 1014 try { 1015 open(a.addressFamily); 1016 if ( _bind !is null ) { 1017 auto ad = getAddress(_bind); 1018 debug(requests) tracef("bind to %s", ad[0]); 1019 s.bind(ad[0]); 1020 } 1021 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 1022 s.connect(a); 1023 debug(requests) tracef("Connected to %s", a); 1024 __isConnected = true; 1025 break; 1026 } catch (SocketException e) { 1027 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 1028 s.close(); 1029 } 1030 } 1031 if ( !__isConnected ) { 1032 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 1033 } 1034 return this; 1035 } 1036 1037 ptrdiff_t send(const(void)[] buff) 1038 in {assert(isConnected);} 1039 body { 1040 auto rc = s.send(buff); 1041 if (rc < 0) { 1042 close(); 1043 throw new NetworkException("sending data: %s".format(to!string(strerror(errno)))); 1044 } 1045 return rc; 1046 } 1047 1048 ptrdiff_t receive(void[] buff) { 1049 while (true) { 1050 auto r = s.receive(buff); 1051 if (r < 0) { 1052 auto e = errno; 1053 version(Windows) { 1054 close(); 1055 if ( e == 0 ) { 1056 throw new TimeoutException("Timeout receiving data"); 1057 } 1058 throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno)))); 1059 } 1060 version(Posix) { 1061 if ( e == EINTR ) { 1062 continue; 1063 } 1064 close(); 1065 if ( e == EAGAIN ) { 1066 throw new TimeoutException("Timeout receiving data"); 1067 } 1068 throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno)))); 1069 } 1070 } 1071 else { 1072 buff.length = r; 1073 } 1074 return r; 1075 } 1076 assert(false); 1077 } 1078 1079 @property void readTimeout(Duration timeout) @safe { 1080 if ( __isConnected ) 1081 { 1082 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 1083 } 1084 } 1085 override SocketStream accept() { 1086 assert(false, "Implement before use"); 1087 } 1088 @property override void reuseAddr(bool yes){ 1089 if (yes) { 1090 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); 1091 } 1092 else { 1093 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0); 1094 } 1095 } 1096 override void bind(Address addr){ 1097 s.bind(addr); 1098 } 1099 override void listen(int n) { 1100 s.listen(n); 1101 }; 1102 } 1103 1104 version (vibeD) { 1105 import vibe.core.net, vibe.stream.tls; 1106 1107 public class TCPVibeStream : NetworkStream { 1108 private: 1109 TCPConnection _conn; 1110 Duration _readTimeout = Duration.max; 1111 bool _isOpen = true; 1112 string _bind; 1113 1114 public: 1115 @property bool isConnected() const { 1116 return _conn.connected; 1117 } 1118 @property override bool isOpen() const { 1119 return _conn && _isOpen; 1120 } 1121 void close() @trusted { 1122 _conn.close(); 1123 _isOpen = false; 1124 } 1125 override TCPConnection so() { 1126 return _conn; 1127 } 1128 override void bind(string to) { 1129 _bind = to; 1130 } 1131 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1132 // FIXME: timeout not supported in vibe.d 1133 try { 1134 _conn = connectTCP(host, port, _bind); 1135 } 1136 catch (Exception e) 1137 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1138 1139 return this; 1140 } 1141 1142 ptrdiff_t send(const(void)[] buff) { 1143 _conn.write(cast(const(ubyte)[])buff); 1144 return buff.length; 1145 } 1146 1147 ptrdiff_t receive(void[] buff) { 1148 if (!_conn.waitForData(_readTimeout)) { 1149 if (!_conn.connected || _conn.empty ) { 1150 return 0; 1151 } 1152 throw new TimeoutException("Timeout receiving data"); 1153 } 1154 1155 if(_conn.empty) { 1156 return 0; 1157 } 1158 1159 auto chunk = min(_conn.leastSize, buff.length); 1160 assert(chunk != 0); 1161 _conn.read(cast(ubyte[])buff[0 .. chunk]); 1162 return chunk; 1163 } 1164 1165 @property void readTimeout(Duration timeout) { 1166 if (timeout == 0.seconds) { 1167 _readTimeout = Duration.max; 1168 } 1169 else { 1170 _readTimeout = timeout; 1171 } 1172 } 1173 override TCPVibeStream accept() { 1174 assert(false, "Must be implemented"); 1175 } 1176 override @property void reuseAddr(bool){ 1177 assert(false, "Not Implemented"); 1178 } 1179 override void bind(Address){ 1180 assert(false, "Not Implemented"); 1181 } 1182 override void listen(int){ 1183 assert(false, "Not Implemented"); 1184 } 1185 } 1186 1187 public class SSLVibeStream : TCPVibeStream { 1188 private: 1189 TLSStream _sslStream; 1190 bool _isOpen = true; 1191 SSLOptions _sslOptions; 1192 TCPConnection underlyingConnection; 1193 1194 void connectSSL(string host) { 1195 auto sslctx = createTLSContext(TLSContextKind.client); 1196 if ( _sslOptions.getVerifyPeer() ) { 1197 if ( _sslOptions.getCaCert() == null ) { 1198 throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate."); 1199 } 1200 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert()); 1201 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert; 1202 } else { 1203 sslctx.peerValidationMode = TLSPeerValidationMode.none; 1204 } 1205 immutable keyFile = _sslOptions.getKeyFile(); 1206 immutable certFile = _sslOptions.getCertFile(); 1207 final switch(_sslOptions.haveFiles()) { 1208 case 0b11: // both files 1209 sslctx.usePrivateKeyFile(keyFile); 1210 sslctx.useCertificateChainFile(certFile); 1211 break; 1212 case 0b01: // key only 1213 sslctx.usePrivateKeyFile(keyFile); 1214 sslctx.useCertificateChainFile(keyFile); 1215 break; 1216 case 0b10: // cert only 1217 sslctx.usePrivateKeyFile(certFile); 1218 sslctx.useCertificateChainFile(certFile); 1219 break; 1220 case 0b00: 1221 break; 1222 } 1223 _sslStream = createTLSStream(_conn, sslctx, host); 1224 } 1225 1226 public: 1227 this(SSLOptions opts) { 1228 _sslOptions = opts; 1229 } 1230 override TCPConnection so() { 1231 return _conn; 1232 } 1233 this(NetworkStream ostream, SSLOptions opts, string host = null) { 1234 _sslOptions = opts; 1235 auto oconn = ostream.so(); 1236 underlyingConnection = oconn; 1237 _conn = oconn; 1238 connectSSL(host); 1239 } 1240 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1241 try { 1242 _conn = connectTCP(host, port); 1243 connectSSL(host); 1244 } 1245 catch (ConnectError e) { 1246 throw e; 1247 } 1248 catch (Exception e) { 1249 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1250 } 1251 1252 return this; 1253 } 1254 1255 override ptrdiff_t send(const(void)[] buff) { 1256 _sslStream.write(cast(const(ubyte)[])buff); 1257 return buff.length; 1258 } 1259 1260 override ptrdiff_t receive(void[] buff) { 1261 if (!_sslStream.dataAvailableForRead) { 1262 if (!_conn.waitForData(_readTimeout)) { 1263 if (!_conn.connected) { 1264 return 0; 1265 } 1266 throw new TimeoutException("Timeout receiving data"); 1267 } 1268 } 1269 1270 if(_sslStream.empty) { 1271 return 0; 1272 } 1273 1274 auto chunk = min(_sslStream.leastSize, buff.length); 1275 assert(chunk != 0); 1276 _sslStream.read(cast(ubyte[])buff[0 .. chunk]); 1277 return chunk; 1278 } 1279 1280 override void close() @trusted { 1281 _sslStream.finalize(); 1282 _conn.close(); 1283 _isOpen = false; 1284 } 1285 @property override bool isOpen() const { 1286 return _conn && _isOpen; 1287 } 1288 override SSLVibeStream accept() { 1289 assert(false, "Must be implemented"); 1290 } 1291 override @property void reuseAddr(bool){ 1292 assert(false, "Not Implemented"); 1293 } 1294 override void bind(Address){ 1295 assert(false, "Not Implemented"); 1296 } 1297 override void listen(int){ 1298 assert(false, "Not Implemented"); 1299 } 1300 } 1301 } 1302 1303 version (vibeD) { 1304 public alias TCPStream = TCPVibeStream; 1305 public alias SSLStream = SSLVibeStream; 1306 } 1307 else { 1308 public alias TCPStream = TCPSocketStream; 1309 public alias SSLStream = SSLSocketStream; 1310 }