1 module requests.streams; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.conv; 7 import std.experimental.logger; 8 import std.exception; 9 import std.format; 10 import std.range; 11 import std.range.primitives; 12 import std.string; 13 import std.stdio; 14 import std.traits; 15 import std.zlib; 16 import std.datetime; 17 import std.socket; 18 import core.stdc.errno; 19 import core.stdc.string; 20 21 import requests.ssl_adapter : openssl, SSL, SSL_CTX; 22 23 alias InDataHandler = DataPipeIface!ubyte; 24 25 public class ConnectError: Exception { 26 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 27 super(message, file, line, next); 28 } 29 } 30 31 class DecodingException: Exception { 32 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 33 super(message, file, line, next); 34 } 35 } 36 37 public class TimeoutException: Exception { 38 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 39 super(message, file, line, next); 40 } 41 } 42 43 public class NetworkException: Exception { 44 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 45 super(message, file, line, next); 46 } 47 } 48 49 /** 50 * DataPipeIface can accept some data, process, and return processed data. 51 */ 52 public interface DataPipeIface(E) { 53 /// Is there any processed data ready for reading? 54 bool empty(); 55 /// Put next data portion for processing 56 //void put(E[]); 57 void putNoCopy(E[]); 58 /// Get any ready data 59 E[] get(); 60 /// Signal on end of incoming data stream. 61 void flush(); 62 } 63 /** 64 * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line. 65 * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked", 66 * and uncompress Content-Encoding "gzip". 67 */ 68 public class DataPipe(E) : DataPipeIface!E { 69 70 DataPipeIface!(E)[] pipe; 71 Buffer!E buffer; 72 /// Append data processor to pipeline 73 /// Params: 74 /// p = processor 75 final void insert(DataPipeIface!E p) { 76 pipe ~= p; 77 } 78 final E[][] process(DataPipeIface!E p, E[][] data) { 79 E[][] result; 80 data.each!(e => p.putNoCopy(e)); 81 while(!p.empty()) result ~= p.get(); 82 return result; 83 } 84 /// Process next data portion. Data passed over pipeline and store result in buffer. 85 /// Params: 86 /// data = input data buffer. 87 /// NoCopy means we do not copy data to buffer, we keep reference 88 final void putNoCopy(E[] data) { 89 if ( pipe.empty ) { 90 buffer.putNoCopy(data); 91 return; 92 } 93 try { 94 auto t = process(pipe.front, [data]); 95 foreach(ref p; pipe[1..$]) { 96 t = process(p, t); 97 } 98 t.each!(b => buffer.putNoCopy(b)); 99 } 100 catch (Exception e) { 101 throw new DecodingException(e.msg); 102 } 103 } 104 /// Get what was collected in internal buffer and clear it. 105 /// Returns: 106 /// data collected. 107 final E[] get() { 108 if ( buffer.empty ) { 109 return E[].init; 110 } 111 auto res = buffer.data; 112 buffer = Buffer!E.init; 113 return res; 114 } 115 /// 116 /// get without datamove. but user receive [][] 117 /// 118 final E[][] getNoCopy() { 119 if ( buffer.empty ) { 120 return E[][].init; 121 } 122 E[][] res = buffer.__repr.__buffer; 123 buffer = Buffer!E.init; 124 return res; 125 } 126 /// Test if internal buffer is empty 127 /// Returns: 128 /// true if internal buffer is empty (nothing to get()) 129 final bool empty() pure const @safe { 130 return buffer.empty; 131 } 132 final void flush() { 133 E[][] product; 134 foreach(ref p; pipe) { 135 product.each!(e => p.putNoCopy(e)); 136 p.flush(); 137 product.length = 0; 138 while( !p.empty ) product ~= p.get(); 139 } 140 product.each!(b => buffer.putNoCopy(b)); 141 } 142 } 143 144 /** 145 * Processor for gzipped/compressed content. 146 * Also support InputRange interface. 147 */ 148 public class Decompressor(E) : DataPipeIface!E { 149 private { 150 Buffer!ubyte __buff; 151 UnCompress __zlib; 152 } 153 this() { 154 __buff = Buffer!ubyte(); 155 __zlib = new UnCompress(); 156 } 157 final override void putNoCopy(E[] data) { 158 if ( __zlib is null ) { 159 __zlib = new UnCompress(); 160 } 161 __buff.putNoCopy(__zlib.uncompress(data)); 162 } 163 final override E[] get() pure { 164 assert(__buff.length); 165 auto r = __buff.__repr.__buffer[0]; 166 __buff.popFrontN(r.length); 167 return cast(E[])r; 168 } 169 final override void flush() { 170 if ( __zlib is null ) { 171 return; 172 } 173 __buff.put(__zlib.flush()); 174 } 175 final override @property bool empty() const pure @safe { 176 debug(requests) tracef("empty=%b", __buff.empty); 177 return __buff.empty; 178 } 179 final @property auto ref front() pure const @safe { 180 debug(requests) tracef("front: buff length=%d", __buff.length); 181 return __buff.front; 182 } 183 final @property auto popFront() pure @safe { 184 debug(requests) tracef("popFront: buff length=%d", __buff.length); 185 return __buff.popFront; 186 } 187 final @property void popFrontN(size_t n) pure @safe { 188 __buff.popFrontN(n); 189 } 190 } 191 192 /** 193 * Unchunk chunked http responce body. 194 */ 195 public class DecodeChunked : DataPipeIface!ubyte { 196 // length := 0 197 // read chunk-size, chunk-extension (if any) and CRLF 198 // while (chunk-size > 0) { 199 // read chunk-data and CRLF 200 // append chunk-data to entity-body 201 // length := length + chunk-size 202 // read chunk-size and CRLF 203 // } 204 // read entity-header 205 // while (entity-header not empty) { 206 // append entity-header to existing header fields 207 // read entity-header 208 // } 209 // Content-Length := length 210 // Remove "chunked" from Transfer-Encoding 211 // 212 213 // Chunked-Body = *chunk 214 // last-chunk 215 // trailer 216 // CRLF 217 // 218 // chunk = chunk-size [ chunk-extension ] CRLF 219 // chunk-data CRLF 220 // chunk-size = 1*HEX 221 // last-chunk = 1*("0") [ chunk-extension ] CRLF 222 // 223 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 224 // chunk-ext-name = token 225 // chunk-ext-val = token | quoted-string 226 // chunk-data = chunk-size(OCTET) 227 // trailer = *(entity-header CRLF) 228 229 alias eType = ubyte; 230 immutable eType[] CRLF = ['\r', '\n']; 231 private { 232 enum States {huntingSize, huntingSeparator, receiving, trailer}; 233 char state = States.huntingSize; 234 size_t chunk_size, to_receive; 235 Buffer!ubyte buff; 236 ubyte[] linebuff; 237 } 238 final void putNoCopy(eType[] data) { 239 while ( data.length ) { 240 if ( state == States.trailer ) { 241 to_receive = to_receive - min(to_receive, data.length); 242 return; 243 } 244 if ( state == States.huntingSize ) { 245 import std.ascii; 246 ubyte[10] digits; 247 int i; 248 for(i=0;i<data.length;i++) { 249 ubyte v = data[i]; 250 digits[i] = v; 251 if ( v == '\n' ) { 252 i+=1; 253 break; 254 } 255 } 256 linebuff ~= digits[0..i]; 257 if ( linebuff.length >= 80 ) { 258 throw new DecodingException("Can't find chunk size in the body"); 259 } 260 data = data[i..$]; 261 if (!linebuff.canFind(CRLF)) { 262 continue; 263 } 264 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 265 state = States.receiving; 266 to_receive = chunk_size; 267 if ( chunk_size == 0 ) { 268 to_receive = 2-min(2, data.length); // trailing \r\n 269 state = States.trailer; 270 return; 271 } 272 continue; 273 } 274 if ( state == States.receiving ) { 275 if (to_receive > 0 ) { 276 auto can_store = min(to_receive, data.length); 277 buff.putNoCopy(data[0..can_store]); 278 data = data[can_store..$]; 279 to_receive -= can_store; 280 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 281 if ( to_receive == 0 ) { 282 //tracef("switch to huntig separator"); 283 state = States.huntingSeparator; 284 continue; 285 } 286 continue; 287 } 288 assert(false); 289 } 290 if ( state == States.huntingSeparator ) { 291 if ( data[0] == '\n' || data[0]=='\r') { 292 data = data[1..$]; 293 continue; 294 } 295 state = States.huntingSize; 296 linebuff.length = 0; 297 continue; 298 } 299 } 300 } 301 final eType[] get() { 302 auto r = buff.__repr.__buffer[0]; 303 buff.popFrontN(r.length); 304 return r; 305 } 306 final void flush() { 307 } 308 final bool empty() { 309 debug(requests) tracef("empty=%b", buff.empty); 310 return buff.empty; 311 } 312 final bool done() { 313 return state==States.trailer && to_receive==0; 314 } 315 } 316 317 unittest { 318 info("Testing DataPipe"); 319 globalLogLevel(LogLevel.info); 320 alias eType = char; 321 eType[] gzipped = [ 322 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 323 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 324 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 325 0x08, 0x00, 0x00, 0x00 326 ]; // "abc\ndef\n" 327 auto d = new Decompressor!eType(); 328 d.putNoCopy(gzipped[0..2].dup); 329 d.putNoCopy(gzipped[2..10].dup); 330 d.putNoCopy(gzipped[10..$].dup); 331 d.flush(); 332 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 333 334 auto e = new Decompressor!eType(); 335 e.putNoCopy(gzipped[0..10].dup); 336 e.putNoCopy(gzipped[10..$].dup); 337 e.flush(); 338 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 339 // writeln(gzipped.decompress.filter!(a => a!='b').array); 340 auto dp = new DataPipe!eType; 341 dp.insert(new Decompressor!eType()); 342 dp.putNoCopy(gzipped[0..2].dup); 343 dp.putNoCopy(gzipped[2..$].dup); 344 dp.flush(); 345 assert(equal(dp.get(), "abc\ndef\n")); 346 // empty datapipe shoul just pass input to output 347 auto dpu = new DataPipe!ubyte; 348 dpu.putNoCopy("abcd".dup.representation); 349 dpu.putNoCopy("efgh".dup.representation); 350 dpu.flush(); 351 assert(equal(dpu.get(), "abcdefgh")); 352 info("Test unchunker properties"); 353 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation; 354 ubyte[][] result; 355 auto uc = new DecodeChunked(); 356 uc.putNoCopy(twoChunks); 357 while(!uc.empty) { 358 result ~= uc.get(); 359 } 360 assert(equal(result[0], ['1', '2'])); 361 assert(equal(result[1], ['3', '4'])); 362 info("unchunker correctness - ok"); 363 result[0][0] = '5'; 364 assert(twoChunks[3] == '5'); 365 info("unchunker zero copy - ok"); 366 info("Testing DataPipe - done"); 367 } 368 /** 369 * Buffer used to collect and process data from network. It remainds Appender, but support 370 * also Range interface. 371 * $(P To place data in buffer use put() method.) 372 * $(P To retrieve data from buffer you can use several methods:) 373 * $(UL 374 * $(LI Range methods: front, back, index []) 375 * $(LI data method: return collected data (like Appender.data)) 376 * ) 377 */ 378 enum CACHESIZE = 1024; 379 380 static long reprAlloc; 381 static long reprCacheHit; 382 static long reprCacheRequests; 383 384 385 public struct Buffer(T) { 386 // static Repr[CACHESIZE] cache; 387 // static uint cacheIndex; 388 389 private { 390 Repr cachedOrNew() { 391 return new Repr; 392 // reprCacheRequests++; 393 // if ( false && cacheIndex>0 ) { 394 // reprCacheHit++; 395 // cacheIndex -= 1; 396 // return cache[cacheIndex]; 397 // } else { 398 // return new Repr; 399 // } 400 } 401 class Repr { 402 size_t __length; 403 Unqual!T[][] __buffer; 404 this() { 405 reprAlloc++; 406 __length = 0; 407 } 408 this(Repr other) { 409 reprAlloc++; 410 if ( other is null ) 411 return; 412 __length = other.__length; 413 __buffer = other.__buffer.dup; 414 } 415 } 416 Repr __repr; 417 } 418 419 alias toString = data!string; 420 421 this(this) { 422 if ( !__repr ) { 423 return; 424 } 425 __repr = new Repr(__repr); 426 } 427 this(U)(U[] data) { 428 put(data); 429 } 430 ~this() { 431 __repr = null; 432 } 433 /*************** 434 * store data. Data copied 435 */ 436 auto put(U)(U[] data) { 437 if ( data.length == 0 ) { 438 return; 439 } 440 if ( !__repr ) { 441 __repr = cachedOrNew(); 442 } 443 static if (!is(U == T)) { 444 auto d = cast(T[])(data); 445 __repr.__length += d.length; 446 __repr.__buffer ~= d.dup; 447 } else { 448 __repr.__length += data.length; 449 __repr.__buffer ~= data.dup; 450 } 451 return; 452 } 453 auto putNoCopy(U)(U[] data) { 454 if ( data.length == 0 ) { 455 return; 456 } 457 if ( !__repr ) { 458 __repr = cachedOrNew(); 459 } 460 static if (!is(U == T)) { 461 auto d = cast(T[])(data); 462 __repr.__length += d.length; 463 __repr.__buffer ~= d; 464 } else { 465 __repr.__length += data.length; 466 __repr.__buffer ~= data; 467 } 468 return; 469 } 470 @property auto opDollar() const pure @safe { 471 return __repr.__length; 472 } 473 @property size_t length() const pure @safe { 474 if ( !__repr ) { 475 return 0; 476 } 477 return __repr.__length; 478 } 479 @property auto empty() const pure @safe { 480 return length == 0; 481 } 482 @property auto ref front() const pure @safe { 483 assert(length); 484 return __repr.__buffer.front.front; 485 } 486 @property auto ref back() const pure @safe { 487 assert(length); 488 return __repr.__buffer.back.back; 489 } 490 @property void popFront() pure @safe { 491 assert(length); 492 with ( __repr ) { 493 __buffer.front.popFront; 494 if ( __buffer.front.length == 0 ) { 495 __buffer.popFront; 496 } 497 __length--; 498 } 499 } 500 @property void popFrontN(size_t n) pure @safe { 501 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 502 __repr.__length -= n; 503 while( n ) { 504 if ( n <= __repr.__buffer.front.length ) { 505 __repr.__buffer.front.popFrontN(n); 506 if ( __repr.__buffer.front.length == 0 ) { 507 __repr.__buffer.popFront; 508 } 509 return; 510 } 511 n -= __repr.__buffer.front.length; 512 __repr.__buffer.popFront; 513 } 514 } 515 @property void popBack() pure @safe { 516 assert(length); 517 __repr.__buffer.back.popBack; 518 if ( __repr.__buffer.back.length == 0 ) { 519 __repr.__buffer.popBack; 520 } 521 __repr.__length--; 522 } 523 @property void popBackN(size_t n) pure @safe { 524 assert(n <= length, "n: %d, length: %d".format(n, length)); 525 __repr.__length -= n; 526 while( n ) { 527 if ( n <= __repr.__buffer.back.length ) { 528 __repr.__buffer.back.popBackN(n); 529 if ( __repr.__buffer.back.length == 0 ) { 530 __repr.__buffer.popBack; 531 } 532 return; 533 } 534 n -= __repr.__buffer.back.length; 535 __repr.__buffer.popBack; 536 } 537 } 538 @property auto save() @safe { 539 auto n = Buffer!T(); 540 n.__repr = new Repr(__repr); 541 return n; 542 } 543 @property auto ref opIndex(size_t n) const pure @safe { 544 assert( __repr && n < __repr.__length ); 545 foreach(b; __repr.__buffer) { 546 if ( n < b.length ) { 547 return b[n]; 548 } 549 n -= b.length; 550 } 551 assert(false, "Impossible"); 552 } 553 Buffer!T opSlice(size_t m, size_t n) { 554 if ( empty || m == n ) { 555 return Buffer!T(); 556 } 557 assert( m <= n && n <= __repr.__length); 558 auto res = this.save(); 559 res.popBackN(res.__repr.__length-n); 560 res.popFrontN(m); 561 return res; 562 } 563 @property auto data(U=T[])() pure { 564 static if ( is(U==T[]) ) { 565 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 566 return __repr.__buffer.front; 567 } 568 } 569 Appender!(T[]) a; 570 if ( __repr && __repr.__buffer ) { 571 foreach(ref b; __repr.__buffer) { 572 a.put(b); 573 } 574 } 575 static if ( is(U==T[]) ) { 576 return a.data; 577 } else { 578 return cast(U)a.data; 579 } 580 } 581 string opCast(string)() { 582 return this.toString; 583 } 584 bool opEquals(U)(U x) { 585 return cast(U)this == x; 586 } 587 588 } 589 /// 590 public unittest { 591 592 static assert(isInputRange!(Buffer!ubyte)); 593 static assert(isForwardRange!(Buffer!ubyte)); 594 static assert(hasLength!(Buffer!ubyte)); 595 static assert(hasSlicing!(Buffer!ubyte)); 596 static assert(isBidirectionalRange!(Buffer!ubyte)); 597 static assert(isRandomAccessRange!(Buffer!ubyte)); 598 599 auto b = Buffer!ubyte(); 600 b.put("abc".representation.dup); 601 b.put("def".representation.dup); 602 assert(b.length == 6); 603 assert(b.toString == "abcdef"); 604 assert(b.front == 'a'); 605 assert(b.back == 'f'); 606 assert(equal(b[0..$], "abcdef")); 607 assert(equal(b[$-2..$], "ef")); 608 assert(b == "abcdef"); 609 b.popFront; 610 b.popBack; 611 assert(b.front == 'b'); 612 assert(b.back == 'e'); 613 assert(b.length == 4); 614 assert(retro(b).front == 'e'); 615 assert(countUntil(b, 'e') == 3); 616 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 617 assert(equal(b, "bcde")); 618 b.popFront; b.popFront; 619 assert(b.front == 'd'); 620 assert(b.front == b[0]); 621 assert(b.back == b[$-1]); 622 623 auto c = Buffer!ubyte(); 624 c.put("Header0: value0\n".representation.dup); 625 c.put("Header1: value1\n".representation.dup); 626 c.put("Header2: value2\n\nbody".representation.dup); 627 auto c_length = c.length; 628 auto eoh = countUntil(c, "\n\n"); 629 assert(eoh == 47); 630 foreach(header; c[0..eoh].splitter('\n') ) { 631 writeln(cast(string)header.data); 632 } 633 assert(equal(findSplit(c, "\n\n")[2], "body")); 634 assert(c.length == c_length); 635 } 636 637 public struct SSLOptions { 638 enum filetype { 639 pem, 640 asn1, 641 der = asn1, 642 } 643 private { 644 /** 645 * do we need to veryfy peer? 646 */ 647 bool _verifyPeer = true; 648 /** 649 * path to CA cert 650 */ 651 string _caCert; 652 /** 653 * path to key file (can also contain cert (for pem) 654 */ 655 string _keyFile; 656 /** 657 * path to cert file (can also contain key (for pem) 658 */ 659 string _certFile; 660 filetype _keyType = filetype.pem; 661 filetype _certType = filetype.pem; 662 } 663 ubyte haveFiles() pure nothrow @safe @nogc { 664 ubyte r = 0; 665 if ( _keyFile ) r|=1; 666 if ( _certFile ) r|=2; 667 return r; 668 } 669 // do we want to verify peer certificates? 670 bool getVerifyPeer() pure nothrow @nogc { 671 return _verifyPeer; 672 } 673 SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe { 674 _verifyPeer = v; 675 return this; 676 } 677 /// set key file name and type (default - pem) 678 auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 679 _keyFile = f; 680 _keyType = t; 681 return this; 682 } 683 auto getKeyFile() @safe pure nothrow @nogc { 684 return _keyFile; 685 } 686 auto getKeyType() @safe pure nothrow @nogc { 687 return _keyType; 688 } 689 /// set cert file name and type (default - pem) 690 auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 691 _certFile = f; 692 _certType = t; 693 return this; 694 } 695 auto setCaCert(string p) @safe pure nothrow @nogc { 696 _caCert = p; 697 return this; 698 } 699 auto getCaCert() @safe pure nothrow @nogc { 700 return _caCert; 701 } 702 auto getCertFile() @safe pure nothrow @nogc { 703 return _certFile; 704 } 705 auto getCertType() @safe pure nothrow @nogc { 706 return _certType; 707 } 708 /// set key file type 709 void setKeyType(string t) @safe pure nothrow { 710 _keyType = cast(filetype)sslKeyTypes[t]; 711 } 712 /// set cert file type 713 void setCertType(string t) @safe pure nothrow { 714 _certType = cast(filetype)sslKeyTypes[t]; 715 } 716 } 717 static immutable int[string] sslKeyTypes; 718 shared static this() { 719 sslKeyTypes = [ 720 "pem":SSLOptions.filetype.pem, 721 "asn1":SSLOptions.filetype.asn1, 722 "der":SSLOptions.filetype.der, 723 ]; 724 } 725 726 version(vibeD) { 727 } 728 else { 729 extern(C) { 730 int SSL_library_init(); 731 } 732 733 enum SSL_VERIFY_PEER = 0x01; 734 enum SSL_FILETYPE_PEM = 1; 735 enum SSL_FILETYPE_ASN1 = 2; 736 737 immutable int[SSLOptions.filetype] ft2ssl; 738 739 shared static this() { 740 ft2ssl = [ 741 SSLOptions.filetype.pem: SSL_FILETYPE_PEM, 742 SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1, 743 SSLOptions.filetype.der: SSL_FILETYPE_ASN1 744 ]; 745 } 746 747 public class OpenSslSocket : Socket { 748 //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 749 private SSL* ssl; 750 private SSL_CTX* ctx; 751 752 private void initSsl(SSLOptions opts) { 753 //ctx = SSL_CTX_new(SSLv3_client_method()); 754 ctx = openssl.SSL_CTX_new(openssl.TLS_method()); 755 assert(ctx !is null); 756 if ( opts.getVerifyPeer() ) { 757 openssl.SSL_CTX_set_default_verify_paths(ctx); 758 if ( opts.getCaCert() ) { 759 openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null); 760 } 761 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null); 762 } 763 immutable keyFile = opts.getKeyFile(); 764 immutable keyType = opts.getKeyType(); 765 immutable certFile = opts.getCertFile(); 766 immutable certType = opts.getCertType(); 767 final switch(opts.haveFiles()) { 768 case 0b11: // both files 769 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 770 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]); 771 break; 772 case 0b01: // key only 773 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 774 openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 775 break; 776 case 0b10: // cert only 777 openssl.SSL_CTX_use_PrivateKey_file(ctx, certFile.toStringz(), ft2ssl[certType]); 778 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]); 779 break; 780 case 0b00: 781 break; 782 } 783 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 784 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 785 ssl = openssl.SSL_new(ctx); 786 openssl.SSL_set_fd(ssl, cast(int)this.handle); 787 } 788 789 @trusted 790 override void connect(Address dest) { 791 super.connect(dest); 792 if(openssl.SSL_connect(ssl) == -1) { 793 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 794 } 795 } 796 auto connectSSL() { 797 if(openssl.SSL_connect(ssl) == -1) { 798 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 799 } 800 debug(requests) tracef("ssl socket connected"); 801 return this; 802 } 803 @trusted 804 override ptrdiff_t send(const(void)[] buf, SocketFlags flags) scope { 805 return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length); 806 } 807 override ptrdiff_t send(const(void)[] buf) scope { 808 return send(buf, SocketFlags.NONE); 809 } 810 @trusted 811 override ptrdiff_t receive(void[] buf, SocketFlags flags) scope { 812 return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length); 813 } 814 override ptrdiff_t receive(void[] buf) scope { 815 return receive(buf, SocketFlags.NONE); 816 } 817 this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) { 818 super(af, type); 819 initSsl(opts); 820 } 821 this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) { 822 super(sock, af); 823 initSsl(opts); 824 } 825 override void close() scope { 826 super.close(); 827 if ( ssl !is null ) { 828 openssl.SSL_free(ssl); 829 ssl = null; 830 } 831 if ( ctx !is null ) { 832 openssl.SSL_CTX_free(ctx); 833 ctx = null; 834 } 835 } 836 void SSL_set_tlsext_host_name(string host) { 837 838 } 839 } 840 841 public class SSLSocketStream: SocketStream { 842 private SSLOptions _sslOptions; 843 private Socket underlyingSocket; 844 private SSL* ssl; 845 private string host; 846 847 this(SSLOptions opts) { 848 _sslOptions = opts; 849 } 850 this(NetworkStream ostream, SSLOptions opts, string host = null) { 851 _sslOptions = opts; 852 this.host = host; 853 auto osock = ostream.so(); 854 underlyingSocket = osock; 855 osock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 856 auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions); 857 ssl = ss.ssl; 858 if ( host !is null ) { 859 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 860 } 861 ss.connectSSL(); 862 __isOpen = true; 863 __isConnected = true; 864 s = ss; 865 debug(requests) tracef("ssl stream created from another stream: %s", s); 866 } 867 override void close() { 868 ssl = null; 869 host = null; 870 super.close(); 871 if ( underlyingSocket ) { 872 underlyingSocket.close(); 873 } 874 } 875 override void open(AddressFamily fa) { 876 if ( s !is null ) { 877 s.close(); 878 } 879 auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions); 880 assert(ss !is null, "Can't create socket"); 881 ssl = ss.ssl; 882 if ( host !is null ) { 883 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 884 } 885 s = ss; 886 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 887 __isOpen = true; 888 } 889 override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) { 890 host = h; 891 return super.connect(h, p, timeout); 892 } 893 override SSLSocketStream accept() { 894 auto newso = s.accept(); 895 if ( s is null ) { 896 return null; 897 } 898 auto newstream = new SSLSocketStream(_sslOptions); 899 auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily); 900 newstream.s = sslSocket; 901 newstream.__isOpen = true; 902 newstream.__isConnected = true; 903 return newstream; 904 } 905 } 906 public class TCPSocketStream : SocketStream { 907 override void open(AddressFamily fa) { 908 if ( s !is null ) { 909 s.close(); 910 } 911 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 912 assert(s !is null, "Can't create socket"); 913 __isOpen = true; 914 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 915 } 916 override TCPSocketStream accept() { 917 auto newso = s.accept(); 918 if ( s is null ) { 919 return null; 920 } 921 auto newstream = new TCPSocketStream(); 922 newstream.s = newso; 923 newstream.__isOpen = true; 924 newstream.__isConnected = true; 925 newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 926 return newstream; 927 } 928 } 929 } 930 931 public interface NetworkStream { 932 @property bool isConnected() const; 933 @property bool isOpen() const; 934 935 void close() @trusted; 936 937 /// 938 /// timeout is the socket write timeout. 939 /// 940 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 941 942 ptrdiff_t send(const(void)[] buff); 943 ptrdiff_t receive(void[] buff); 944 945 NetworkStream accept(); 946 @property void reuseAddr(bool); 947 void bind(string); 948 void bind(Address); 949 void listen(int); 950 version(vibeD) { 951 TCPConnection so(); 952 } else { 953 Socket so(); 954 } 955 /// 956 /// Set timeout for receive calls. 0 means no timeout. 957 /// 958 @property void readTimeout(Duration timeout); 959 } 960 961 public abstract class SocketStream : NetworkStream { 962 private { 963 Duration timeout; 964 Socket s; 965 bool __isOpen; 966 bool __isConnected; 967 string _bind; 968 } 969 void open(AddressFamily fa) { 970 } 971 @property Socket so() @safe pure { 972 return s; 973 } 974 @property bool isOpen() @safe @nogc pure const { 975 return s && __isOpen; 976 } 977 @property bool isConnected() @safe @nogc pure const { 978 return s && __isOpen && __isConnected; 979 } 980 void close() @trusted { 981 debug(requests) tracef("Close socket"); 982 if ( isOpen ) { 983 s.close(); 984 __isOpen = false; 985 __isConnected = false; 986 } 987 s = null; 988 } 989 /*** 990 * bind() just remember address. We will cal bind() at the time of connect as 991 * we can have several connection trials. 992 ***/ 993 override void bind(string to) { 994 _bind = to; 995 } 996 /*** 997 * Make connection to remote site. Bind, handle connection error, try several addresses, etc 998 ***/ 999 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1000 debug(requests) tracef(format("Create connection to %s:%d", host, port)); 1001 Address[] addresses; 1002 __isConnected = false; 1003 try { 1004 addresses = getAddress(host, port); 1005 } catch (Exception e) { 1006 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 1007 } 1008 foreach(a; addresses) { 1009 debug(requests) tracef("Trying %s", a); 1010 try { 1011 open(a.addressFamily); 1012 if ( _bind !is null ) { 1013 auto ad = getAddress(_bind); 1014 debug(requests) tracef("bind to %s", ad[0]); 1015 s.bind(ad[0]); 1016 } 1017 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 1018 s.connect(a); 1019 debug(requests) tracef("Connected to %s", a); 1020 __isConnected = true; 1021 break; 1022 } catch (SocketException e) { 1023 debug(requests) warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 1024 s.close(); 1025 } 1026 } 1027 if ( !__isConnected ) { 1028 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 1029 } 1030 return this; 1031 } 1032 1033 ptrdiff_t send(const(void)[] buff) 1034 in {assert(isConnected);} 1035 do { 1036 auto rc = s.send(buff); 1037 if (rc < 0) { 1038 close(); 1039 throw new NetworkException("sending data: %s".format(to!string(strerror(errno)))); 1040 } 1041 return rc; 1042 } 1043 1044 ptrdiff_t receive(void[] buff) { 1045 while (true) { 1046 auto r = s.receive(buff); 1047 if (r < 0) { 1048 auto e = errno; 1049 version(Windows) { 1050 close(); 1051 if ( e == 0 ) { 1052 throw new TimeoutException("Timeout receiving data"); 1053 } 1054 throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno)))); 1055 } 1056 version(Posix) { 1057 if ( e == EINTR ) { 1058 continue; 1059 } 1060 close(); 1061 if ( e == EAGAIN ) { 1062 throw new TimeoutException("Timeout receiving data"); 1063 } 1064 throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno)))); 1065 } 1066 } 1067 else { 1068 buff.length = r; 1069 } 1070 return r; 1071 } 1072 assert(false); 1073 } 1074 1075 @property void readTimeout(Duration timeout) @safe { 1076 if ( __isConnected ) 1077 { 1078 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 1079 } 1080 } 1081 override SocketStream accept() { 1082 assert(false, "Implement before use"); 1083 } 1084 @property override void reuseAddr(bool yes){ 1085 if (yes) { 1086 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); 1087 } 1088 else { 1089 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0); 1090 } 1091 } 1092 override void bind(Address addr){ 1093 s.bind(addr); 1094 } 1095 override void listen(int n) { 1096 s.listen(n); 1097 }; 1098 } 1099 1100 version (vibeD) { 1101 import vibe.core.net, vibe.stream.tls; 1102 1103 public class TCPVibeStream : NetworkStream { 1104 private: 1105 TCPConnection _conn; 1106 Duration _readTimeout = Duration.max; 1107 bool _isOpen = true; 1108 string _bind; 1109 1110 public: 1111 @property bool isConnected() const { 1112 return _conn.connected; 1113 } 1114 @property override bool isOpen() const { 1115 return _conn && _isOpen; 1116 } 1117 void close() @trusted { 1118 _conn.close(); 1119 _isOpen = false; 1120 } 1121 override TCPConnection so() { 1122 return _conn; 1123 } 1124 override void bind(string to) { 1125 _bind = to; 1126 } 1127 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1128 // FIXME: timeout not supported in vibe.d 1129 try { 1130 _conn = connectTCP(host, port, _bind); 1131 } 1132 catch (Exception e) 1133 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1134 1135 return this; 1136 } 1137 1138 ptrdiff_t send(const(void)[] buff) { 1139 _conn.write(cast(const(ubyte)[])buff); 1140 return buff.length; 1141 } 1142 1143 ptrdiff_t receive(void[] buff) { 1144 if (!_conn.waitForData(_readTimeout)) { 1145 if (!_conn.connected || _conn.empty ) { 1146 return 0; 1147 } 1148 throw new TimeoutException("Timeout receiving data"); 1149 } 1150 1151 if(_conn.empty) { 1152 return 0; 1153 } 1154 1155 auto chunk = min(_conn.leastSize, buff.length); 1156 assert(chunk != 0); 1157 _conn.read(cast(ubyte[])buff[0 .. chunk]); 1158 return chunk; 1159 } 1160 1161 @property void readTimeout(Duration timeout) { 1162 if (timeout == 0.seconds) { 1163 _readTimeout = Duration.max; 1164 } 1165 else { 1166 _readTimeout = timeout; 1167 } 1168 } 1169 override TCPVibeStream accept() { 1170 assert(false, "Must be implemented"); 1171 } 1172 override @property void reuseAddr(bool){ 1173 assert(false, "Not Implemented"); 1174 } 1175 override void bind(Address){ 1176 assert(false, "Not Implemented"); 1177 } 1178 override void listen(int){ 1179 assert(false, "Not Implemented"); 1180 } 1181 } 1182 1183 public class SSLVibeStream : TCPVibeStream { 1184 private: 1185 TLSStream _sslStream; 1186 bool _isOpen = true; 1187 SSLOptions _sslOptions; 1188 TCPConnection underlyingConnection; 1189 1190 void connectSSL(string host) { 1191 auto sslctx = createTLSContext(TLSContextKind.client); 1192 if ( _sslOptions.getVerifyPeer() ) { 1193 if ( _sslOptions.getCaCert() == null ) { 1194 throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate."); 1195 } 1196 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert()); 1197 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert; 1198 } else { 1199 sslctx.peerValidationMode = TLSPeerValidationMode.none; 1200 } 1201 immutable keyFile = _sslOptions.getKeyFile(); 1202 immutable certFile = _sslOptions.getCertFile(); 1203 final switch(_sslOptions.haveFiles()) { 1204 case 0b11: // both files 1205 sslctx.usePrivateKeyFile(keyFile); 1206 sslctx.useCertificateChainFile(certFile); 1207 break; 1208 case 0b01: // key only 1209 sslctx.usePrivateKeyFile(keyFile); 1210 sslctx.useCertificateChainFile(keyFile); 1211 break; 1212 case 0b10: // cert only 1213 sslctx.usePrivateKeyFile(certFile); 1214 sslctx.useCertificateChainFile(certFile); 1215 break; 1216 case 0b00: 1217 break; 1218 } 1219 _sslStream = createTLSStream(_conn, sslctx, host); 1220 } 1221 1222 public: 1223 this(SSLOptions opts) { 1224 _sslOptions = opts; 1225 } 1226 override TCPConnection so() { 1227 return _conn; 1228 } 1229 this(NetworkStream ostream, SSLOptions opts, string host = null) { 1230 _sslOptions = opts; 1231 auto oconn = ostream.so(); 1232 underlyingConnection = oconn; 1233 _conn = oconn; 1234 connectSSL(host); 1235 } 1236 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1237 try { 1238 _conn = connectTCP(host, port); 1239 connectSSL(host); 1240 } 1241 catch (ConnectError e) { 1242 throw e; 1243 } 1244 catch (Exception e) { 1245 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1246 } 1247 1248 return this; 1249 } 1250 1251 override ptrdiff_t send(const(void)[] buff) { 1252 _sslStream.write(cast(const(ubyte)[])buff); 1253 return buff.length; 1254 } 1255 1256 override ptrdiff_t receive(void[] buff) { 1257 if (!_sslStream.dataAvailableForRead) { 1258 if (!_conn.waitForData(_readTimeout)) { 1259 if (!_conn.connected) { 1260 return 0; 1261 } 1262 throw new TimeoutException("Timeout receiving data"); 1263 } 1264 } 1265 1266 if(_sslStream.empty) { 1267 return 0; 1268 } 1269 1270 auto chunk = min(_sslStream.leastSize, buff.length); 1271 assert(chunk != 0); 1272 _sslStream.read(cast(ubyte[])buff[0 .. chunk]); 1273 return chunk; 1274 } 1275 1276 override void close() @trusted { 1277 if ( _sslStream ) 1278 { 1279 _sslStream.finalize(); 1280 } 1281 _conn.close(); 1282 _isOpen = false; 1283 } 1284 @property override bool isOpen() const { 1285 return _conn && _isOpen; 1286 } 1287 override SSLVibeStream accept() { 1288 assert(false, "Must be implemented"); 1289 } 1290 override @property void reuseAddr(bool){ 1291 assert(false, "Not Implemented"); 1292 } 1293 override void bind(Address){ 1294 assert(false, "Not Implemented"); 1295 } 1296 override void listen(int){ 1297 assert(false, "Not Implemented"); 1298 } 1299 } 1300 } 1301 1302 version (vibeD) { 1303 public alias TCPStream = TCPVibeStream; 1304 public alias SSLStream = SSLVibeStream; 1305 } 1306 else { 1307 public alias TCPStream = TCPSocketStream; 1308 public alias SSLStream = SSLSocketStream; 1309 }