module requests.streams;

private:
import std.algorithm;
import std.array;
import std.conv;
import std.experimental.logger;
import std.exception;
import std.format;
import std.range;
import std.range.primitives;
import std.string;
import std.stdio;
import std.traits;
import std.zlib;
import std.datetime;
import std.socket;
import core.stdc.errno;

/// Shorthand for a data pipe element working on raw bytes.
alias InDataHandler = DataPipeIface!ubyte;

/// Raised by the stream `connect` implementations in this module when a host
/// cannot be resolved or none of its addresses accepts the connection.
public class ConnectError: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Raised when body decoding (de-chunking, decompression) fails.
class DecodingException: Exception {
    this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Raised when a receive call does not get data within the configured timeout.
public class TimeoutException: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Raised on send/receive failures that are not timeouts.
public class NetworkException: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/**
 * A DataPipeIface accepts portions of data, processes them, and hands the
 * processed data back to the caller.
 */
public interface DataPipeIface(E) {
    /// Is there any processed data ready for reading?
    bool empty();
    /// Put next data portion for processing
    //void put(E[]);
    void putNoCopy(E[]);
    /// Get any ready data
    E[] get();
    /// Signal on end of incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors: each one accepts some data, processes it, and passes the result
 * to the next element in line. This class is used to combine different Transfer- and Content- encodings.
 * For example: unchunk Transfer-Encoding "chunked", and uncompress Content-Encoding "gzip".
64 */ 65 public class DataPipe(E) : DataPipeIface!E { 66 67 DataPipeIface!(E)[] pipe; 68 Buffer!E buffer; 69 /// Append data processor to pipeline 70 /// Params: 71 /// p = processor 72 void insert(DataPipeIface!E p) { 73 pipe ~= p; 74 } 75 E[][] process(DataPipeIface!E p, E[][] data) { 76 E[][] result; 77 data.each!(e => p.putNoCopy(e)); 78 while(!p.empty()) result ~= p.get(); 79 return result; 80 } 81 /// Process next data portion. Data passed over pipeline and store result in buffer. 82 /// Params: 83 /// data = input data buffer. 84 /// NoCopy means we do not copy data to buffer, we keep reference 85 void putNoCopy(E[] data) { 86 if ( pipe.empty ) { 87 buffer.putNoCopy(data); 88 return; 89 } 90 try { 91 auto t = process(pipe.front, [data]); 92 foreach(ref p; pipe[1..$]) { 93 t = process(p, t); 94 } 95 t.each!(b => buffer.putNoCopy(b)); 96 } 97 catch (Exception e) { 98 throw new DecodingException(e.msg); 99 } 100 } 101 /// Get what was collected in internal buffer and clear it. 102 /// Returns: 103 /// data collected. 104 E[] get() { 105 if ( buffer.empty ) { 106 return E[].init; 107 } 108 auto res = buffer.data; 109 buffer = Buffer!E.init; 110 return res; 111 } 112 /// 113 /// get without datamove. but user receive [][] 114 /// 115 E[][] getNoCopy() { 116 if ( buffer.empty ) { 117 return E[][].init; 118 } 119 E[][] res = buffer.__repr.__buffer; 120 buffer = Buffer!E.init; 121 return res; 122 } 123 /// Test if internal buffer is empty 124 /// Returns: 125 /// true if internal buffer is empty (nothing to get()) 126 bool empty() pure const @safe { 127 return buffer.empty; 128 } 129 void flush() { 130 E[][] product; 131 foreach(ref p; pipe) { 132 product.each!(e => p.putNoCopy(e)); 133 p.flush(); 134 product.length = 0; 135 while( !p.empty ) product ~= p.get(); 136 } 137 product.each!(b => buffer.putNoCopy(b)); 138 } 139 } 140 141 /** 142 * Processor for gzipped/compressed content. 143 * Also support InputRange interface. 
144 */ 145 public class Decompressor(E) : DataPipeIface!E { 146 private { 147 Buffer!ubyte __buff; 148 UnCompress __zlib; 149 } 150 this() { 151 __buff = Buffer!ubyte(); 152 __zlib = new UnCompress(); 153 } 154 override void putNoCopy(E[] data) { 155 if ( __zlib is null ) { 156 __zlib = new UnCompress(); 157 } 158 __buff.putNoCopy(__zlib.uncompress(data)); 159 } 160 override E[] get() pure { 161 assert(__buff.length); 162 auto r = __buff.__repr.__buffer[0]; 163 __buff.popFrontN(r.length); 164 return cast(E[])r; 165 } 166 override void flush() { 167 if ( __zlib is null ) { 168 return; 169 } 170 __buff.put(__zlib.flush()); 171 } 172 override @property bool empty() const pure @safe { 173 debug(requests) tracef("empty=%b", __buff.empty); 174 return __buff.empty; 175 } 176 @property auto ref front() pure const @safe { 177 debug(requests) tracef("front: buff length=%d", __buff.length); 178 return __buff.front; 179 } 180 @property auto popFront() pure @safe { 181 debug(requests) tracef("popFront: buff length=%d", __buff.length); 182 return __buff.popFront; 183 } 184 @property void popFrontN(size_t n) pure @safe { 185 __buff.popFrontN(n); 186 } 187 } 188 189 /** 190 * Unchunk chunked http responce body. 
191 */ 192 public class DecodeChunked : DataPipeIface!ubyte { 193 // length := 0 194 // read chunk-size, chunk-extension (if any) and CRLF 195 // while (chunk-size > 0) { 196 // read chunk-data and CRLF 197 // append chunk-data to entity-body 198 // length := length + chunk-size 199 // read chunk-size and CRLF 200 // } 201 // read entity-header 202 // while (entity-header not empty) { 203 // append entity-header to existing header fields 204 // read entity-header 205 // } 206 // Content-Length := length 207 // Remove "chunked" from Transfer-Encoding 208 // 209 210 // Chunked-Body = *chunk 211 // last-chunk 212 // trailer 213 // CRLF 214 // 215 // chunk = chunk-size [ chunk-extension ] CRLF 216 // chunk-data CRLF 217 // chunk-size = 1*HEX 218 // last-chunk = 1*("0") [ chunk-extension ] CRLF 219 // 220 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 221 // chunk-ext-name = token 222 // chunk-ext-val = token | quoted-string 223 // chunk-data = chunk-size(OCTET) 224 // trailer = *(entity-header CRLF) 225 226 alias eType = ubyte; 227 immutable eType[] CRLF = ['\r', '\n']; 228 private { 229 enum States {huntingSize, huntingSeparator, receiving, trailer}; 230 char state = States.huntingSize; 231 size_t chunk_size, to_receive; 232 Buffer!ubyte buff; 233 ubyte[] linebuff; 234 } 235 void putNoCopy(eType[] data) { 236 while ( data.length ) { 237 if ( state == States.trailer ) { 238 to_receive = to_receive - min(to_receive, data.length); 239 return; 240 } 241 if ( state == States.huntingSize ) { 242 import std.ascii; 243 ubyte[10] digits; 244 int i; 245 for(i=0;i<data.length;i++) { 246 ubyte v = data[i]; 247 digits[i] = v; 248 if ( v == '\n' ) { 249 i+=1; 250 break; 251 } 252 } 253 linebuff ~= digits[0..i]; 254 if ( linebuff.length >= 80 ) { 255 throw new DecodingException("Can't find chunk size in the body"); 256 } 257 data = data[i..$]; 258 if (!linebuff.canFind(CRLF)) { 259 continue; 260 } 261 chunk_size = 
linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 262 state = States.receiving; 263 to_receive = chunk_size; 264 if ( chunk_size == 0 ) { 265 to_receive = 2-min(2, data.length); // trailing \r\n 266 state = States.trailer; 267 return; 268 } 269 continue; 270 } 271 if ( state == States.receiving ) { 272 if (to_receive > 0 ) { 273 auto can_store = min(to_receive, data.length); 274 buff.putNoCopy(data[0..can_store]); 275 data = data[can_store..$]; 276 to_receive -= can_store; 277 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 278 if ( to_receive == 0 ) { 279 //tracef("switch to huntig separator"); 280 state = States.huntingSeparator; 281 continue; 282 } 283 continue; 284 } 285 assert(false); 286 } 287 if ( state == States.huntingSeparator ) { 288 if ( data[0] == '\n' || data[0]=='\r') { 289 data = data[1..$]; 290 continue; 291 } 292 state = States.huntingSize; 293 linebuff.length = 0; 294 continue; 295 } 296 } 297 } 298 eType[] get() { 299 auto r = buff.__repr.__buffer[0]; 300 buff.popFrontN(r.length); 301 return r; 302 } 303 void flush() { 304 } 305 bool empty() { 306 debug(requests) tracef("empty=%b", buff.empty); 307 return buff.empty; 308 } 309 bool done() { 310 return state==States.trailer && to_receive==0; 311 } 312 } 313 314 unittest { 315 info("Testing DataPipe"); 316 globalLogLevel(LogLevel.info); 317 alias eType = char; 318 eType[] gzipped = [ 319 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 320 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 321 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 322 0x08, 0x00, 0x00, 0x00 323 ]; // "abc\ndef\n" 324 auto d = new Decompressor!eType(); 325 d.putNoCopy(gzipped[0..2].dup); 326 d.putNoCopy(gzipped[2..10].dup); 327 d.putNoCopy(gzipped[10..$].dup); 328 d.flush(); 329 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 330 331 auto e = new Decompressor!eType(); 332 e.putNoCopy(gzipped[0..10].dup); 333 e.putNoCopy(gzipped[10..$].dup); 334 e.flush(); 335 
assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 336 // writeln(gzipped.decompress.filter!(a => a!='b').array); 337 auto dp = new DataPipe!eType; 338 dp.insert(new Decompressor!eType()); 339 dp.putNoCopy(gzipped[0..2].dup); 340 dp.putNoCopy(gzipped[2..$].dup); 341 dp.flush(); 342 assert(equal(dp.get(), "abc\ndef\n")); 343 // empty datapipe shoul just pass input to output 344 auto dpu = new DataPipe!ubyte; 345 dpu.putNoCopy("abcd".dup.representation); 346 dpu.putNoCopy("efgh".dup.representation); 347 dpu.flush(); 348 assert(equal(dpu.get(), "abcdefgh")); 349 info("Test unchunker properties"); 350 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation; 351 ubyte[][] result; 352 auto uc = new DecodeChunked(); 353 uc.putNoCopy(twoChunks); 354 while(!uc.empty) { 355 result ~= uc.get(); 356 } 357 assert(equal(result[0], ['1', '2'])); 358 assert(equal(result[1], ['3', '4'])); 359 info("unchunker correctness - ok"); 360 result[0][0] = '5'; 361 assert(twoChunks[3] == '5'); 362 info("unchunker zero copy - ok"); 363 info("Testing DataPipe - done"); 364 } 365 /** 366 * Buffer used to collect and process data from network. It remainds Appender, but support 367 * also Range interface. 368 * $(P To place data in buffer use put() method.) 
369 * $(P To retrieve data from buffer you can use several methods:) 370 * $(UL 371 * $(LI Range methods: front, back, index []) 372 * $(LI data method: return collected data (like Appender.data)) 373 * ) 374 */ 375 static this() { 376 } 377 static ~this() { 378 } 379 enum CACHESIZE = 1024; 380 381 static long reprAlloc; 382 static long reprCacheHit; 383 static long reprCacheRequests; 384 385 386 public struct Buffer(T) { 387 // static Repr[CACHESIZE] cache; 388 // static uint cacheIndex; 389 390 private { 391 Repr cachedOrNew() { 392 return new Repr; 393 // reprCacheRequests++; 394 // if ( false && cacheIndex>0 ) { 395 // reprCacheHit++; 396 // cacheIndex -= 1; 397 // return cache[cacheIndex]; 398 // } else { 399 // return new Repr; 400 // } 401 } 402 class Repr { 403 size_t __length; 404 Unqual!T[][] __buffer; 405 this() { 406 reprAlloc++; 407 __length = 0; 408 } 409 this(Repr other) { 410 reprAlloc++; 411 if ( other is null ) 412 return; 413 __length = other.__length; 414 __buffer = other.__buffer.dup; 415 } 416 } 417 Repr __repr; 418 } 419 420 alias toString = data!string; 421 422 this(this) { 423 if ( !__repr ) { 424 return; 425 } 426 __repr = new Repr(__repr); 427 } 428 this(U)(U[] data) { 429 put(data); 430 } 431 ~this() { 432 __repr = null; 433 } 434 /*************** 435 * store data. 
Data copied 436 */ 437 auto put(U)(U[] data) { 438 if ( data.length == 0 ) { 439 return; 440 } 441 if ( !__repr ) { 442 __repr = cachedOrNew(); 443 } 444 static if (!is(U == T)) { 445 auto d = cast(T[])(data); 446 __repr.__length += d.length; 447 __repr.__buffer ~= d.dup; 448 } else { 449 __repr.__length += data.length; 450 __repr.__buffer ~= data.dup; 451 } 452 return; 453 } 454 auto putNoCopy(U)(U[] data) { 455 if ( data.length == 0 ) { 456 return; 457 } 458 if ( !__repr ) { 459 __repr = cachedOrNew(); 460 } 461 static if (!is(U == T)) { 462 auto d = cast(T[])(data); 463 __repr.__length += d.length; 464 __repr.__buffer ~= d; 465 } else { 466 __repr.__length += data.length; 467 __repr.__buffer ~= data; 468 } 469 return; 470 } 471 @property auto opDollar() const pure @safe { 472 return __repr.__length; 473 } 474 @property auto length() const pure @safe { 475 if ( !__repr ) { 476 return 0; 477 } 478 return __repr.__length; 479 } 480 @property auto empty() const pure @safe { 481 return length == 0; 482 } 483 @property auto ref front() const pure @safe { 484 assert(length); 485 return __repr.__buffer.front.front; 486 } 487 @property auto ref back() const pure @safe { 488 assert(length); 489 return __repr.__buffer.back.back; 490 } 491 @property void popFront() pure @safe { 492 assert(length); 493 with ( __repr ) { 494 __buffer.front.popFront; 495 if ( __buffer.front.length == 0 ) { 496 __buffer.popFront; 497 } 498 __length--; 499 } 500 } 501 @property void popFrontN(size_t n) pure @safe { 502 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 503 __repr.__length -= n; 504 while( n ) { 505 if ( n <= __repr.__buffer.front.length ) { 506 __repr.__buffer.front.popFrontN(n); 507 if ( __repr.__buffer.front.length == 0 ) { 508 __repr.__buffer.popFront; 509 } 510 return; 511 } 512 n -= __repr.__buffer.front.length; 513 __repr.__buffer.popFront; 514 } 515 } 516 @property void popBack() pure @safe { 517 assert(length); 518 __repr.__buffer.back.popBack; 519 if ( 
__repr.__buffer.back.length == 0 ) { 520 __repr.__buffer.popBack; 521 } 522 __repr.__length--; 523 } 524 @property void popBackN(size_t n) pure @safe { 525 assert(n <= length, "n: %d, length: %d".format(n, length)); 526 __repr.__length -= n; 527 while( n ) { 528 if ( n <= __repr.__buffer.back.length ) { 529 __repr.__buffer.back.popBackN(n); 530 if ( __repr.__buffer.back.length == 0 ) { 531 __repr.__buffer.popBack; 532 } 533 return; 534 } 535 n -= __repr.__buffer.back.length; 536 __repr.__buffer.popBack; 537 } 538 } 539 @property auto save() @safe { 540 auto n = Buffer!T(); 541 n.__repr = new Repr(__repr); 542 return n; 543 } 544 @property auto ref opIndex(size_t n) const pure @safe { 545 assert( __repr && n < __repr.__length ); 546 foreach(b; __repr.__buffer) { 547 if ( n < b.length ) { 548 return b[n]; 549 } 550 n -= b.length; 551 } 552 assert(false, "Impossible"); 553 } 554 Buffer!T opSlice(size_t m, size_t n) { 555 if ( empty || m == n ) { 556 return Buffer!T(); 557 } 558 assert( m <= n && n <= __repr.__length); 559 auto res = this.save(); 560 res.popBackN(res.__repr.__length-n); 561 res.popFrontN(m); 562 return res; 563 } 564 @property auto data(U=T[])() pure { 565 static if ( is(U==T[]) ) { 566 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 567 return __repr.__buffer.front; 568 } 569 } 570 Appender!(T[]) a; 571 if ( __repr && __repr.__buffer ) { 572 foreach(ref b; __repr.__buffer) { 573 a.put(b); 574 } 575 } 576 static if ( is(U==T[]) ) { 577 return a.data; 578 } else { 579 return cast(U)a.data; 580 } 581 } 582 string opCast(string)() { 583 return this.toString; 584 } 585 bool opEquals(U)(U x) { 586 return cast(U)this == x; 587 } 588 589 } 590 /// 591 public unittest { 592 593 static assert(isInputRange!(Buffer!ubyte)); 594 static assert(isForwardRange!(Buffer!ubyte)); 595 static assert(hasLength!(Buffer!ubyte)); 596 static assert(hasSlicing!(Buffer!ubyte)); 597 static assert(isBidirectionalRange!(Buffer!ubyte)); 598 static 
assert(isRandomAccessRange!(Buffer!ubyte)); 599 600 auto b = Buffer!ubyte(); 601 b.put("abc".representation.dup); 602 b.put("def".representation.dup); 603 assert(b.length == 6); 604 assert(b.toString == "abcdef"); 605 assert(b.front == 'a'); 606 assert(b.back == 'f'); 607 assert(equal(b[0..$], "abcdef")); 608 assert(equal(b[$-2..$], "ef")); 609 assert(b == "abcdef"); 610 b.popFront; 611 b.popBack; 612 assert(b.front == 'b'); 613 assert(b.back == 'e'); 614 assert(b.length == 4); 615 assert(retro(b).front == 'e'); 616 assert(countUntil(b, 'e') == 3); 617 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 618 assert(equal(b, "bcde")); 619 b.popFront; b.popFront; 620 assert(b.front == 'd'); 621 assert(b.front == b[0]); 622 assert(b.back == b[$-1]); 623 624 auto c = Buffer!ubyte(); 625 c.put("Header0: value0\n".representation.dup); 626 c.put("Header1: value1\n".representation.dup); 627 c.put("Header2: value2\n\nbody".representation.dup); 628 auto c_length = c.length; 629 auto eoh = countUntil(c, "\n\n"); 630 assert(eoh == 47); 631 foreach(header; c[0..eoh].splitter('\n') ) { 632 writeln(cast(string)header.data); 633 } 634 assert(equal(findSplit(c, "\n\n")[2], "body")); 635 assert(c.length == c_length); 636 } 637 638 public struct SSLOptions { 639 enum filetype { 640 pem, 641 asn1, 642 der = asn1, 643 } 644 private { 645 /** 646 * do we need to veryfy peer? 647 */ 648 bool _verifyPeer = false; 649 /** 650 * path to CA cert 651 */ 652 string _caCert; 653 /** 654 * path to key file (can also contain cert (for pem) 655 */ 656 string _keyFile; 657 /** 658 * path to cert file (can also contain key (for pem) 659 */ 660 string _certFile; 661 filetype _keyType = filetype.pem; 662 filetype _certType = filetype.pem; 663 } 664 ubyte haveFiles() pure nothrow @safe @nogc { 665 ubyte r = 0; 666 if ( _keyFile ) r|=1; 667 if ( _certFile ) r|=2; 668 return r; 669 } 670 // do we want to verify peer certificates? 
671 bool getVerifyPeer() pure nothrow @nogc { 672 return _verifyPeer; 673 } 674 SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe { 675 _verifyPeer = v; 676 return this; 677 } 678 /// set key file name and type (default - pem) 679 auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 680 _keyFile = f; 681 _keyType = t; 682 return this; 683 } 684 auto getKeyFile() @safe pure nothrow @nogc { 685 return _keyFile; 686 } 687 auto getKeyType() @safe pure nothrow @nogc { 688 return _keyType; 689 } 690 /// set cert file name and type (default - pem) 691 auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 692 _certFile = f; 693 _certType = t; 694 return this; 695 } 696 auto setCaCert(string p) @safe pure nothrow @nogc { 697 _caCert = p; 698 return this; 699 } 700 auto getCaCert() @safe pure nothrow @nogc { 701 return _caCert; 702 } 703 auto getCertFile() @safe pure nothrow @nogc { 704 return _certFile; 705 } 706 auto getCertType() @safe pure nothrow @nogc { 707 return _certType; 708 } 709 /// set key file type 710 void setKeyType(string t) @safe pure nothrow { 711 _keyType = cast(filetype)sslKeyTypes[t]; 712 } 713 /// set cert file type 714 void setCertType(string t) @safe pure nothrow { 715 _certType = cast(filetype)sslKeyTypes[t]; 716 } 717 } 718 static immutable int[string] sslKeyTypes; 719 shared static this() { 720 sslKeyTypes = [ 721 "pem":SSLOptions.filetype.pem, 722 "asn1":SSLOptions.filetype.asn1, 723 "der":SSLOptions.filetype.der, 724 ]; 725 } 726 727 version(vibeD) { 728 } 729 else { 730 extern(C) { 731 int SSL_library_init(); 732 void OpenSSL_add_all_ciphers(); 733 void OpenSSL_add_all_digests(); 734 void SSL_load_error_strings(); 735 736 struct SSL {} 737 struct SSL_CTX {} 738 struct SSL_METHOD {} 739 740 SSL_CTX* SSL_CTX_new(const SSL_METHOD* method); 741 SSL* SSL_new(SSL_CTX*); 742 int SSL_set_fd(SSL*, int); 743 int SSL_connect(SSL*); 744 int SSL_write(SSL*, const void*, int); 745 int 
SSL_read(SSL*, void*, int); 746 int SSL_shutdown(SSL*) @trusted @nogc nothrow; 747 void SSL_free(SSL*); 748 void SSL_CTX_free(SSL_CTX*); 749 750 long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg); 751 752 long SSL_CTX_set_mode(SSL_CTX *ctx, long mode); 753 int SSL_CTX_set_default_verify_paths(SSL_CTX *ctx); 754 int SSL_CTX_load_verify_locations(SSL_CTX *ctx, const char *CAfile, const char *CApath); 755 void SSL_CTX_set_verify(SSL_CTX *ctx, int mode, void *); 756 long SSL_set_mode(SSL *ssl, long mode); 757 int SSL_CTX_use_PrivateKey_file(SSL_CTX *ctx, const char *file, int type); 758 int SSL_CTX_use_certificate_file(SSL_CTX *ctx, const char *file, int type); 759 760 long SSL_CTX_get_mode(SSL_CTX *ctx); 761 long SSL_get_mode(SSL *ssl); 762 763 long ERR_get_error(); 764 char* ERR_reason_error_string(ulong e); 765 766 SSL_METHOD* SSLv3_client_method(); 767 SSL_METHOD* TLSv1_2_client_method(); 768 SSL_METHOD* TLSv1_client_method(); 769 } 770 771 enum SSL_VERIFY_PEER = 0x01; 772 enum SSL_FILETYPE_PEM = 1; 773 enum SSL_FILETYPE_ASN1 = 2; 774 775 immutable int[SSLOptions.filetype] ft2ssl; 776 777 shared static this() { 778 SSL_library_init(); 779 OpenSSL_add_all_ciphers(); 780 OpenSSL_add_all_digests(); 781 SSL_load_error_strings(); 782 ft2ssl = [ 783 SSLOptions.filetype.pem: SSL_FILETYPE_PEM, 784 SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1, 785 SSLOptions.filetype.der: SSL_FILETYPE_ASN1 786 ]; 787 } 788 789 public class OpenSslSocket : Socket { 790 //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 791 private SSL* ssl; 792 private SSL_CTX* ctx; 793 private void initSsl(SSLOptions opts) { 794 //ctx = SSL_CTX_new(SSLv3_client_method()); 795 ctx = SSL_CTX_new(TLSv1_client_method()); 796 assert(ctx !is null); 797 if ( opts.getVerifyPeer() ) { 798 SSL_CTX_set_default_verify_paths(ctx); 799 if ( opts.getCaCert() ) { 800 SSL_CTX_load_verify_locations(ctx, opts.getCaCert().toStringz(), null); 801 } 802 SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null); 803 } 804 immutable 
keyFile = opts.getKeyFile(); 805 immutable keyType = opts.getKeyType(); 806 immutable certFile = opts.getCertFile(); 807 immutable certType = opts.getCertType(); 808 final switch(opts.haveFiles()) { 809 case 0b11: // both files 810 SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 811 SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]); 812 break; 813 case 0b01: // key only 814 SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 815 SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 816 break; 817 case 0b10: // cert only 818 SSL_CTX_use_PrivateKey_file(ctx, certFile.toStringz(), ft2ssl[certType]); 819 SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]); 820 break; 821 case 0b00: 822 break; 823 } 824 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 825 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 826 ssl = SSL_new(ctx); 827 SSL_set_fd(ssl, this.handle); 828 } 829 830 @trusted 831 override void connect(Address dest) { 832 super.connect(dest); 833 if(SSL_connect(ssl) == -1) { 834 throw new Exception("ssl connect failed: %s".format(to!string(ERR_reason_error_string(ERR_get_error())))); 835 } 836 } 837 838 @trusted 839 override ptrdiff_t send(const(void)[] buf, SocketFlags flags) { 840 return SSL_write(ssl, buf.ptr, cast(uint) buf.length); 841 } 842 override ptrdiff_t send(const(void)[] buf) { 843 return send(buf, SocketFlags.NONE); 844 } 845 @trusted 846 override ptrdiff_t receive(void[] buf, SocketFlags flags) { 847 return SSL_read(ssl, buf.ptr, cast(int)buf.length); 848 } 849 override ptrdiff_t receive(void[] buf) { 850 return receive(buf, SocketFlags.NONE); 851 } 852 this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) { 853 super(af, type); 854 initSsl(opts); 855 } 856 857 this(socket_t sock, AddressFamily af) { 858 super(sock, af); 859 initSsl(SSLOptions()); 860 } 861 override void close() { 862 
//SSL_shutdown(ssl); 863 super.close(); 864 } 865 ~this() { 866 SSL_free(ssl); 867 SSL_CTX_free(ctx); 868 } 869 } 870 871 public class SSLSocketStream: SocketStream { 872 SSLOptions _sslOptions; 873 874 this(SSLOptions opts) { 875 _sslOptions = opts; 876 } 877 878 override void open(AddressFamily fa) { 879 if ( s !is null ) { 880 s.close(); 881 } 882 s = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions); 883 assert(s !is null, "Can't create socket"); 884 __isOpen = true; 885 } 886 override SSLSocketStream accept() { 887 auto newso = s.accept(); 888 if ( s is null ) { 889 return null; 890 } 891 auto newstream = new SSLSocketStream(_sslOptions); 892 auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily); 893 newstream.s = sslSocket; 894 newstream.__isOpen = true; 895 newstream.__isConnected = true; 896 return newstream; 897 } 898 } 899 } 900 901 public interface NetworkStream { 902 @property bool isConnected() const; 903 @property bool isOpen() const; 904 905 void close() @trusted; 906 907 /// 908 /// timeout is the socket write timeout. 909 /// 910 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 911 912 ptrdiff_t send(const(void)[] buff); 913 ptrdiff_t receive(void[] buff); 914 915 NetworkStream accept(); 916 @property void reuseAddr(bool); 917 void bind(Address); 918 void listen(int); 919 920 /// 921 /// Set timeout for receive calls. 0 means no timeout. 
922 /// 923 @property void readTimeout(Duration timeout); 924 } 925 926 public abstract class SocketStream : NetworkStream { 927 private { 928 Duration timeout; 929 Socket s; 930 bool __isOpen; 931 bool __isConnected; 932 } 933 void open(AddressFamily fa) { 934 } 935 @property ref Socket so() @safe pure { 936 return s; 937 } 938 @property bool isOpen() @safe @nogc pure const { 939 return s && __isOpen; 940 } 941 @property bool isConnected() @safe @nogc pure const { 942 return s && __isOpen && __isConnected; 943 } 944 void close() @trusted { 945 debug(requests) tracef("Close socket"); 946 if ( isOpen ) { 947 s.close(); 948 __isOpen = false; 949 __isConnected = false; 950 } 951 s = null; 952 } 953 954 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 955 debug(requests) tracef(format("Create connection to %s:%d", host, port)); 956 Address[] addresses; 957 __isConnected = false; 958 try { 959 addresses = getAddress(host, port); 960 } catch (Exception e) { 961 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 962 } 963 foreach(a; addresses) { 964 debug(requests) tracef("Trying %s", a); 965 try { 966 open(a.addressFamily); 967 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 968 s.connect(a); 969 debug(requests) tracef("Connected to %s", a); 970 __isConnected = true; 971 break; 972 } catch (SocketException e) { 973 warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 974 s.close(); 975 } 976 } 977 if ( !__isConnected ) { 978 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 979 } 980 return this; 981 } 982 983 ptrdiff_t send(const(void)[] buff) @safe 984 in {assert(isConnected);} 985 body { 986 auto rc = s.send(buff); 987 if (rc < 0) { 988 close(); 989 throw new NetworkException("sending data"); 990 } 991 return rc; 992 } 993 994 ptrdiff_t receive(void[] buff) @safe { 995 while (true) { 996 auto r = s.receive(buff); 997 if (r < 0) { 998 
version(Windows) { 999 close(); 1000 if ( errno == 0 ) { 1001 throw new TimeoutException("Timeout receiving data"); 1002 } 1003 throw new NetworkException("receiving data"); 1004 } 1005 version(Posix) { 1006 if ( errno == EINTR ) { 1007 continue; 1008 } 1009 close(); 1010 if ( errno == EAGAIN ) { 1011 throw new TimeoutException("Timeout receiving data"); 1012 } 1013 throw new NetworkException("receiving data"); 1014 } 1015 } 1016 else { 1017 buff.length = r; 1018 } 1019 return r; 1020 } 1021 assert(false); 1022 } 1023 1024 @property void readTimeout(Duration timeout) @safe { 1025 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 1026 } 1027 override SocketStream accept() { 1028 assert(false, "Implement before use"); 1029 } 1030 @property override void reuseAddr(bool yes){ 1031 if (yes) { 1032 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); 1033 } 1034 else { 1035 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0); 1036 } 1037 } 1038 override void bind(Address addr){ 1039 s.bind(addr); 1040 } 1041 override void listen(int n) { 1042 s.listen(n); 1043 }; 1044 } 1045 1046 public class TCPSocketStream : SocketStream { 1047 override void open(AddressFamily fa) { 1048 if ( s !is null ) { 1049 s.close(); 1050 } 1051 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 1052 assert(s !is null, "Can't create socket"); 1053 __isOpen = true; 1054 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 1055 } 1056 override TCPSocketStream accept() { 1057 auto newso = s.accept(); 1058 if ( s is null ) { 1059 return null; 1060 } 1061 auto newstream = new TCPSocketStream(); 1062 newstream.s = newso; 1063 newstream.__isOpen = true; 1064 newstream.__isConnected = true; 1065 newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 1066 return newstream; 1067 } 1068 } 1069 1070 version (vibeD) { 1071 import vibe.core.net, vibe.stream.tls; 1072 1073 public class TCPVibeStream : NetworkStream { 1074 
private: 1075 TCPConnection _conn; 1076 Duration _readTimeout = Duration.max; 1077 bool _isOpen = true; 1078 1079 public: 1080 @property bool isConnected() const { 1081 return _conn.connected; 1082 } 1083 @property override bool isOpen() const { 1084 return _conn && _isOpen; 1085 } 1086 void close() @trusted { 1087 _conn.close(); 1088 _isOpen = false; 1089 } 1090 1091 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1092 // FIXME: timeout not supported in vibe.d 1093 try { 1094 _conn = connectTCP(host, port); 1095 } 1096 catch (Exception e) 1097 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1098 1099 return this; 1100 } 1101 1102 ptrdiff_t send(const(void)[] buff) { 1103 _conn.write(cast(const(ubyte)[])buff); 1104 return buff.length; 1105 } 1106 1107 ptrdiff_t receive(void[] buff) { 1108 if (!_conn.waitForData(_readTimeout)) { 1109 if (!_conn.connected) { 1110 return 0; 1111 } 1112 throw new TimeoutException("Timeout receiving data"); 1113 } 1114 1115 if(_conn.empty) { 1116 return 0; 1117 } 1118 1119 auto chunk = min(_conn.leastSize, buff.length); 1120 assert(chunk != 0); 1121 _conn.read(cast(ubyte[])buff[0 .. 
chunk]); 1122 return chunk; 1123 } 1124 1125 @property void readTimeout(Duration timeout) { 1126 if (timeout == 0.seconds) { 1127 _readTimeout = Duration.max; 1128 } 1129 else { 1130 _readTimeout = timeout; 1131 } 1132 } 1133 override TCPVibeStream accept() { 1134 assert(false, "Must be implemented"); 1135 } 1136 override @property void reuseAddr(bool){ 1137 assert(false, "Not Implemented"); 1138 } 1139 override void bind(Address){ 1140 assert(false, "Not Implemented"); 1141 } 1142 override void listen(int){ 1143 assert(false, "Not Implemented"); 1144 } 1145 } 1146 1147 public class SSLVibeStream : TCPVibeStream { 1148 private: 1149 Stream _sslStream; 1150 bool _isOpen = true; 1151 SSLOptions _sslOptions; 1152 1153 public: 1154 this(SSLOptions opts) { 1155 _sslOptions = opts; 1156 } 1157 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1158 try { 1159 _conn = connectTCP(host, port); 1160 auto sslctx = createTLSContext(TLSContextKind.client); 1161 if ( _sslOptions.getVerifyPeer() ) { 1162 if ( _sslOptions.getCaCert() == null ) { 1163 throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate."); 1164 } 1165 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert()); 1166 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert; 1167 } else { 1168 sslctx.peerValidationMode = TLSPeerValidationMode.none; 1169 } 1170 immutable keyFile = _sslOptions.getKeyFile(); 1171 immutable certFile = _sslOptions.getCertFile(); 1172 final switch(_sslOptions.haveFiles()) { 1173 case 0b11: // both files 1174 sslctx.usePrivateKeyFile(keyFile); 1175 sslctx.useCertificateChainFile(certFile); 1176 break; 1177 case 0b01: // key only 1178 sslctx.usePrivateKeyFile(keyFile); 1179 sslctx.useCertificateChainFile(keyFile); 1180 break; 1181 case 0b10: // cert only 1182 sslctx.usePrivateKeyFile(certFile); 1183 sslctx.useCertificateChainFile(certFile); 1184 break; 1185 case 0b00: 1186 break; 1187 } 1188 _sslStream = 
createTLSStream(_conn, sslctx, host); 1189 } 1190 catch (ConnectError e) { 1191 throw e; 1192 } 1193 catch (Exception e) { 1194 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1195 } 1196 1197 return this; 1198 } 1199 1200 override ptrdiff_t send(const(void)[] buff) { 1201 _sslStream.write(cast(const(ubyte)[])buff); 1202 return buff.length; 1203 } 1204 1205 override ptrdiff_t receive(void[] buff) { 1206 if (!_sslStream.dataAvailableForRead) { 1207 if (!_conn.waitForData(_readTimeout)) { 1208 if (!_conn.connected) { 1209 return 0; 1210 } 1211 throw new TimeoutException("Timeout receiving data"); 1212 } 1213 } 1214 1215 if(_sslStream.empty) { 1216 return 0; 1217 } 1218 1219 auto chunk = min(_sslStream.leastSize, buff.length); 1220 assert(chunk != 0); 1221 _sslStream.read(cast(ubyte[])buff[0 .. chunk]); 1222 return chunk; 1223 } 1224 1225 override void close() @trusted { 1226 _sslStream.finalize(); 1227 _conn.close(); 1228 _isOpen = false; 1229 } 1230 @property override bool isOpen() const { 1231 return _conn && _isOpen; 1232 } 1233 override SSLVibeStream accept() { 1234 assert(false, "Must be implemented"); 1235 } 1236 override @property void reuseAddr(bool){ 1237 assert(false, "Not Implemented"); 1238 } 1239 override void bind(Address){ 1240 assert(false, "Not Implemented"); 1241 } 1242 override void listen(int){ 1243 assert(false, "Not Implemented"); 1244 } 1245 } 1246 } 1247 1248 version (vibeD) { 1249 public alias TCPStream = TCPVibeStream; 1250 public alias SSLStream = SSLVibeStream; 1251 } 1252 else { 1253 public alias TCPStream = TCPSocketStream; 1254 public alias SSLStream = SSLSocketStream; 1255 }