1 module requests.http; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.ascii; 7 import std.conv; 8 import std.datetime; 9 import std.exception; 10 import std.format; 11 import std.stdio; 12 import std.range; 13 import std.string; 14 import std.traits; 15 import std.typecons; 16 import std.experimental.logger; 17 import core.thread; 18 19 import requests.streams; 20 import requests.uri; 21 import requests.utils; 22 import requests.base; 23 import requests.connmanager; 24 import requests.rangeadapter; 25 26 static immutable ushort[] redirectCodes = [301, 302, 303, 307, 308]; 27 enum HTTP11 = 101; 28 enum HTTP10 = 100; 29 30 static immutable string[string] proxies; 31 static this() { 32 import std.process; 33 proxies["http"] = environment.get("http_proxy", environment.get("HTTP_PROXY")); 34 proxies["https"] = environment.get("https_proxy", environment.get("HTTPS_PROXY")); 35 proxies["all"] = environment.get("all_proxy", environment.get("ALL_PROXY")); 36 foreach(p; proxies.byKey()) { 37 if (proxies[p] is null) { 38 continue; 39 } 40 URI u = URI(proxies[p]); 41 } 42 } 43 44 public class MaxRedirectsException: Exception { 45 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 46 super(message, file, line, next); 47 } 48 } 49 50 /// 51 /// 52 /// 53 //public auto queryParams(T...)(T params) pure nothrow @safe 54 //{ 55 // static assert (T.length % 2 == 0, "wrong args count"); 56 // 57 // QueryParam[] output; 58 // output.reserve = T.length / 2; 59 // 60 // void queryParamsHelper(T...)(T params, ref QueryParam[] output) 61 // { 62 // static if (T.length > 0) 63 // { 64 // output ~= QueryParam(params[0].to!string, params[1].to!string); 65 // queryParamsHelper(params[2..$], output); 66 // } 67 // } 68 // 69 // queryParamsHelper(params, output); 70 // return output; 71 //} 72 73 /// 74 /// Response - result of request execution. 75 /// 76 /// Response.code - response HTTP code. 77 /// Response.status_line - received HTTP status line. 78 /// Response.responseHeaders - received headers. 79 /// Response.responseBody - container for received body 80 /// Response.history - for redirected responses contain all history 81 /// 82 public class HTTPResponse : Response { 83 private { 84 string _status_line; 85 86 HTTPResponse[] _history; // redirects history 87 88 mixin(Setter!string("status_line")); 89 90 int _version; 91 } 92 93 ~this() { 94 _responseHeaders = null; 95 _history.length = 0; 96 } 97 98 mixin(Getter("status_line")); 99 100 @property final string[string] responseHeaders() @safe @nogc nothrow { 101 return _responseHeaders; 102 } 103 @property final HTTPResponse[] history() @safe @nogc nothrow { 104 return _history; 105 } 106 107 @property auto getStats() const pure @safe { 108 alias statTuple = Tuple!(Duration, "connectTime", 109 Duration, "sendTime", 110 Duration, "recvTime"); 111 statTuple stat; 112 stat.connectTime = _connectedAt - _startedAt; 113 stat.sendTime = _requestSentAt - _connectedAt; 114 stat.recvTime = _finishedAt - _requestSentAt; 115 return stat; 116 } 117 private int parse_version(in string v) pure const nothrow @safe { 118 // try to parse HTTP/1.x to version 119 try if ( v.length > 5 ) { 120 return (v[5..$].split(".").map!"to!int(a)".array[0..2].reduce!((a,b) => a*100 + b)); 121 } catch (Exception e) { 122 } 123 return 0; 124 } 125 unittest { 126 auto r = new HTTPResponse(); 127 assert(r.parse_version("HTTP/1.1") == 101); 128 assert(r.parse_version("HTTP/1.0") == 100); 129 assert(r.parse_version("HTTP/0.9") == 9); 130 assert(r.parse_version("HTTP/xxx") == 0); 131 } 132 } 133 134 /// 135 /// Request. 136 /// Configurable parameters: 137 /// $(B method) - string, method to use (GET, POST, ...) 138 /// $(B headers) - string[string], add any additional headers you'd like to send. 139 /// $(B authenticator) - class Auth, class to send auth headers. 140 /// $(B keepAlive) - bool, set true for keepAlive requests. default true. 141 /// $(B maxRedirects) - uint, maximum number of redirects. default 10. 142 /// $(B maxHeadersLength) - size_t, maximum length of server response headers. default = 32KB. 143 /// $(B maxContentLength) - size_t, maximun content length. delault - 0 = unlimited. 144 /// $(B bufferSize) - size_t, send and receive buffer size. default = 16KB. 145 /// $(B verbosity) - uint, level of verbosity(0 - nothing, 1 - headers, 2 - headers and body progress). default = 0. 146 /// $(B proxy) - string, set proxy url if needed. default - null. 147 /// $(B cookie) - Tuple Cookie, Read/Write cookie You can get cookie setted by server, or set cookies before doing request. 148 /// $(B timeout) - Duration, Set timeout value for connect/receive/send. 149 /// 150 public struct HTTPRequest { 151 private { 152 string _method = "GET"; 153 URI _uri; 154 string[string] _headers; 155 string[] _filteredHeaders; 156 Auth _authenticator; 157 bool _keepAlive = true; 158 uint _maxRedirects = 10; 159 size_t _maxHeadersLength = 32 * 1024; // 32 KB 160 size_t _maxContentLength; // 0 - Unlimited 161 string _proxy; 162 uint _verbosity = 0; // 0 - no output, 1 - headers, 2 - headers+body info 163 Duration _timeout = 30.seconds; 164 size_t _bufferSize = 16*1024; // 16k 165 bool _useStreaming; // return iterator instead of completed request 166 167 HTTPResponse[] _history; // redirects history 168 DataPipe!ubyte _bodyDecoder; 169 DecodeChunked _unChunker; 170 long _contentLength; 171 long _contentReceived; 172 SSLOptions _sslOptions; 173 string _bind; 174 _UH _userHeaders; 175 176 RefCounted!ConnManager _cm; 177 RefCounted!Cookies _cookie; 178 string[URI] _permanent_redirects; // cache 301 redirects for GET requests 179 MultipartForm _multipartForm; 180 181 NetStreamFactory _socketFactory; 182 183 QueryParam[] _params; 184 string _contentType; 185 InputRangeAdapter _postData; 186 } 187 package HTTPResponse _response; 188 189 mixin(Getter_Setter!string ("method")); 190 mixin(Getter_Setter!bool ("keepAlive")); 191 mixin(Getter_Setter!size_t ("maxContentLength")); 192 mixin(Getter_Setter!size_t ("maxHeadersLength")); 193 mixin(Getter_Setter!size_t ("bufferSize")); 194 mixin(Getter_Setter!uint ("maxRedirects")); 195 mixin(Getter_Setter!uint ("verbosity")); 196 mixin(Getter ("proxy")); 197 mixin(Getter_Setter!Duration ("timeout")); 198 mixin(Setter!Auth ("authenticator")); 199 mixin(Getter_Setter!bool ("useStreaming")); 200 mixin(Getter ("contentLength")); 201 mixin(Getter ("contentReceived")); 202 mixin(Getter_Setter!SSLOptions ("sslOptions")); 203 mixin(Getter_Setter!string ("bind")); 204 mixin(Setter!NetStreamFactory ("socketFactory")); 205 206 @property void sslSetVerifyPeer(bool v) pure @safe nothrow @nogc { 207 _sslOptions.setVerifyPeer(v); 208 } 209 @property void sslSetKeyFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc { 210 _sslOptions.setKeyFile(p, t); 211 } 212 @property void sslSetCertFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc { 213 _sslOptions.setCertFile(p, t); 214 } 215 @property void sslSetCaCert(string path) pure @safe nothrow @nogc { 216 _sslOptions.setCaCert(path); 217 } 218 //@property final void cookie(Cookie[] s) pure @safe @nogc nothrow { 219 // _cookie = s; 220 //} 221 @property final void proxy(string v) { 222 if ( v != _proxy ) { 223 _cm.clear(); 224 } 225 _proxy = v; 226 } 227 //@property final Cookie[] cookie() pure @safe @nogc nothrow { 228 // return _cookie; 229 //} 230 231 this(string uri) { 232 _uri = URI(uri); 233 _cm = ConnManager(10); 234 } 235 ~this() { 236 _headers = null; 237 _authenticator = null; 238 _history = null; 239 _bodyDecoder = null; 240 _unChunker = null; 241 //if ( _cm ) { 242 // _cm.clear(); 243 //} 244 } 245 string toString() const { 246 return "HTTPRequest(%s, %s)".format(_method, _uri.uri()); 247 } 248 string format(string fmt) const { 249 import std.array; 250 import std.stdio; 251 auto a = appender!string(); 252 auto f = FormatSpec!char(fmt); 253 while (f.writeUpToNextSpec(a)) { 254 switch(f.spec) { 255 case 'h': 256 // Remote hostname. 257 a.put(_uri.host); 258 break; 259 case 'm': 260 // method. 261 a.put(_method); 262 break; 263 case 'p': 264 // Remote port. 265 a.put("%d".format(_uri.port)); 266 break; 267 case 'P': 268 // Path 269 a.put(_uri.path); 270 break; 271 case 'q': 272 // query parameters supplied with url. 273 a.put(_uri.query); 274 break; 275 case 'U': 276 a.put(_uri.uri()); 277 break; 278 default: 279 throw new FormatException("Unknown Request format spec " ~ f.spec); 280 } 281 } 282 return a.data(); 283 } 284 string select_proxy(string scheme) { 285 if ( _proxy is null && proxies.length == 0 ) { 286 debug(requests) tracef("proxy=null"); 287 return null; 288 } 289 if ( _proxy ) { 290 debug(requests) tracef("proxy=%s", _proxy); 291 return _proxy; 292 } 293 auto p = scheme in proxies; 294 if ( p !is null && *p != "") { 295 debug(requests) tracef("proxy=%s", *p); 296 return *p; 297 } 298 p = "all" in proxies; 299 if ( p !is null && *p != "") { 300 debug(requests) tracef("proxy=%s", *p); 301 return *p; 302 } 303 debug(requests) tracef("proxy=null"); 304 return null; 305 } 306 void clearHeaders() { 307 _headers = null; 308 } 309 @property void uri(in URI newURI) { 310 //handleURLChange(_uri, newURI); 311 _uri = newURI; 312 } 313 /// Add headers to request 314 /// Params: 315 /// headers = headers to send. 316 void addHeaders(in string[string] headers) { 317 foreach(pair; headers.byKeyValue) { 318 string _h = pair.key; 319 switch(toLower(_h)) { 320 case "host": 321 _userHeaders.Host = true; 322 break; 323 case "user-agent": 324 _userHeaders.UserAgent = true; 325 break; 326 case "content-length": 327 _userHeaders.ContentLength = true; 328 break; 329 case "content-type": 330 _userHeaders.ContentType = true; 331 break; 332 case "connection": 333 _userHeaders.Connection = true; 334 break; 335 case "cookie": 336 _userHeaders.Cookie = true; 337 break; 338 default: 339 break; 340 } 341 _headers[pair.key] = pair.value; 342 } 343 } 344 private void safeSetHeader(ref string[string] headers, bool userAdded, string h, string v) pure @safe { 345 if ( !userAdded ) { 346 headers[h] = v; 347 } 348 } 349 /// Remove headers from request 350 /// Params: 351 /// headers = headers to remove. 352 void removeHeaders(in string[] headers) pure { 353 _filteredHeaders ~= headers; 354 } 355 /// 356 /// compose headers to send 357 /// 358 private string[string] requestHeaders() { 359 360 string[string] generatedHeaders; 361 362 if ( _authenticator ) { 363 _authenticator. 364 authHeaders(_uri.host). 365 byKeyValue. 366 each!(pair => generatedHeaders[pair.key] = pair.value); 367 } 368 369 _headers.byKey.each!(h => generatedHeaders[h] = _headers[h]); 370 371 safeSetHeader(generatedHeaders, _userHeaders.AcceptEncoding, "Accept-Encoding", "gzip,deflate"); 372 safeSetHeader(generatedHeaders, _userHeaders.UserAgent, "User-Agent", "dlang-requests"); 373 safeSetHeader(generatedHeaders, _userHeaders.Connection, "Connection", _keepAlive?"Keep-Alive":"Close"); 374 375 if ( !_userHeaders.Host ) 376 { 377 generatedHeaders["Host"] = _uri.host; 378 if ( _uri.scheme !in standard_ports || _uri.port != standard_ports[_uri.scheme] ) { 379 generatedHeaders["Host"] ~= ":%d".format(_uri.port); 380 } 381 } 382 383 if ( _cookie._array.length && !_userHeaders.Cookie ) { 384 auto cs = _cookie._array. 385 filter!(c => _uri.path.pathMatches(c.path) && _uri.host.domainMatches(c.domain)). 386 map!(c => "%s=%s".format(c.attr, c.value)). 387 joiner(";"); 388 if ( ! cs.empty ) 389 { 390 generatedHeaders["Cookie"] = to!string(cs); 391 } 392 } 393 394 _filteredHeaders.each!(h => generatedHeaders.remove(h)); 395 396 return generatedHeaders; 397 } 398 /// 399 /// Build request string. 400 /// Handle proxy and query parameters. 401 /// 402 private @property string requestString(QueryParam[] params = null) { 403 auto query = _uri.query.dup; 404 if ( params ) { 405 query ~= "&" ~ params2query(params); 406 if ( query[0] != '?' ) { 407 query = "?" ~ query; 408 } 409 } 410 string actual_proxy = select_proxy(_uri.scheme); 411 if ( actual_proxy && _uri.scheme != "https" ) { 412 return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.uri(No.params), query); 413 } 414 return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.path, query); 415 } 416 /// 417 /// encode parameters and build query part of the url 418 /// 419 private static string params2query(in QueryParam[] params) pure @safe { 420 return params. 421 map!(a => "%s=%s".format(a.key.urlEncoded, a.value.urlEncoded)). 422 join("&"); 423 } 424 // 425 package unittest { 426 assert(params2query(queryParams("a","b", "c", " d "))=="a=b&c=%20d%20"); 427 } 428 /// 429 /// Analyze received headers, take appropriate actions: 430 /// check content length, attach unchunk and uncompress 431 /// 432 private void analyzeHeaders(in string[string] headers) { 433 434 _contentLength = -1; 435 _unChunker = null; 436 auto contentLength = "content-length" in headers; 437 if ( contentLength ) { 438 try { 439 _contentLength = to!long(*contentLength); 440 if ( _maxContentLength && _contentLength > _maxContentLength) { 441 throw new RequestException("ContentLength > maxContentLength (%d>%d)". 442 format(_contentLength, _maxContentLength)); 443 } 444 } catch (ConvException e) { 445 throw new RequestException("Can't convert Content-Length from %s".format(*contentLength)); 446 } 447 } 448 auto transferEncoding = "transfer-encoding" in headers; 449 if ( transferEncoding ) { 450 debug(requests) tracef("transferEncoding: %s", *transferEncoding); 451 if ( (*transferEncoding).toLower == "chunked") { 452 _unChunker = new DecodeChunked(); 453 _bodyDecoder.insert(_unChunker); 454 } 455 } 456 auto contentEncoding = "content-encoding" in headers; 457 if ( contentEncoding ) switch (*contentEncoding) { 458 default: 459 throw new RequestException("Unknown content-encoding " ~ *contentEncoding); 460 case "gzip": 461 case "deflate": 462 _bodyDecoder.insert(new Decompressor!ubyte); 463 } 464 465 } 466 /// 467 /// Called when we know that all headers already received in buffer. 468 /// This routine does not interpret headers content (see analyzeHeaders). 469 /// 1. Split headers on lines 470 /// 2. store status line, store response code 471 /// 3. unfold headers if needed 472 /// 4. store headers 473 /// 474 private void parseResponseHeaders(in ubyte[] input) { 475 string lastHeader; 476 auto buffer = cast(string)input; 477 478 foreach(line; buffer.split("\n").map!(l => l.stripRight)) { 479 if ( ! _response.status_line.length ) { 480 debug (requests) tracef("statusLine: %s", line); 481 _response.status_line = line; 482 if ( _verbosity >= 1 ) { 483 writefln("< %s", line); 484 } 485 auto parsed = line.split(" "); 486 if ( parsed.length >= 2 ) { 487 _response.code = parsed[1].to!ushort; 488 _response._version = _response.parse_version(parsed[0]); 489 } 490 continue; 491 } 492 if ( line[0] == ' ' || line[0] == '\t' ) { 493 // unfolding https://tools.ietf.org/html/rfc822#section-3.1 494 if ( auto stored = lastHeader in _response._responseHeaders) { 495 *stored ~= line; 496 } 497 continue; 498 } 499 auto parsed = line.findSplit(":"); 500 auto header = parsed[0].toLower; 501 auto value = parsed[2].strip; 502 503 if ( _verbosity >= 1 ) { 504 writefln("< %s: %s", header, value); 505 } 506 507 lastHeader = header; 508 debug (requests) tracef("Header %s = %s", header, value); 509 510 if ( header != "set-cookie" ) { 511 auto stored = _response.responseHeaders.get(header, null); 512 if ( stored ) { 513 value = stored ~ "," ~ value; 514 } 515 _response._responseHeaders[header] = value; 516 continue; 517 } 518 _cookie._array ~= processCookie(value); 519 } 520 } 521 522 /// 523 /// Process Set-Cookie header from server response 524 /// 525 private Cookie[] processCookie(string value ) pure { 526 // cookie processing 527 // 528 // as we can't join several set-cookie lines in single line 529 // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com 530 // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com, cs=ip764-RgKqc-HvSkxRxdQQAKW8LA; path=/; domain=.example.com; HttpOnly 531 // 532 Cookie[] res; 533 string[string] kv; 534 auto fields = value.split(";").map!strip; 535 while(!fields.empty) { 536 auto s = fields.front.findSplit("="); 537 fields.popFront; 538 if ( s[1] != "=" ) { 539 continue; 540 } 541 auto k = s[0]; 542 auto v = s[2]; 543 switch(k.toLower()) { 544 case "domain": 545 k = "domain"; 546 break; 547 case "path": 548 k = "path"; 549 break; 550 case "expires": 551 continue; 552 default: 553 break; 554 } 555 kv[k] = v; 556 } 557 if ( "domain" !in kv ) { 558 kv["domain"] = _uri.host; 559 } 560 if ( "path" !in kv ) { 561 kv["path"] = _uri.path; 562 } 563 auto domain = kv["domain"]; kv.remove("domain"); 564 auto path = kv["path"]; kv.remove("path"); 565 foreach(pair; kv.byKeyValue) { 566 auto _attr = pair.key; 567 auto _value = pair.value; 568 auto cookie = Cookie(path, domain, _attr, _value); 569 res ~= cookie; 570 } 571 return res; 572 } 573 /// 574 /// Do we received \r\n\r\n? 575 /// 576 private bool headersHaveBeenReceived(in ubyte[] data, ref Buffer!ubyte buffer, out string separator) const @safe { 577 foreach(s; ["\r\n\r\n", "\n\n"]) { 578 if ( data.canFind(s) || buffer.canFind(s) ) { 579 separator = s; 580 return true; 581 } 582 } 583 return false; 584 } 585 586 private bool willFollowRedirect() { 587 if ( !canFind(redirectCodes, _response.code) ) { 588 return false; 589 } 590 if ( !_maxRedirects ) { 591 return false; 592 } 593 if ( "location" !in _response.responseHeaders ) { 594 return false; 595 } 596 return true; 597 } 598 private URI uriFromLocation(const ref URI uri, in string location) { 599 URI newURI = uri; 600 try { 601 newURI = URI(location); 602 } catch (UriException e) { 603 debug(requests) trace("Can't parse Location:, try relative uri"); 604 newURI.path = location; 605 newURI.uri = newURI.recalc_uri; 606 } 607 return newURI; 608 } 609 /// 610 /// if we have new uri, then we need to check if we have to reopen existent connection 611 /// 612 private void checkURL(string url, string file=__FILE__, size_t line=__LINE__) { 613 if (url is null && _uri.uri == "" ) { 614 throw new RequestException("No url configured", file, line); 615 } 616 617 if ( url !is null ) { 618 URI newURI = URI(url); 619 //handleURLChange(_uri, newURI); 620 _uri = newURI; 621 } 622 } 623 /// 624 /// Setup connection. Handle proxy and https case 625 /// 626 /// Place new connection in ConnManager cache 627 /// 628 private NetworkStream setupConnection() 629 do { 630 631 debug(requests) tracef("Set up new connection"); 632 NetworkStream stream; 633 634 // on exit 635 // place created connection to conn. manager 636 // close connection purged from manager (if any) 637 // 638 scope(exit) { 639 if ( stream ) 640 { 641 if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, stream) ) 642 { 643 debug(requests) tracef("closing purged connection %s", purged_connection); 644 purged_connection.close(); 645 } 646 } 647 } 648 649 if ( _socketFactory ) 650 { 651 debug(requests) tracef("use socketFactory"); 652 stream = _socketFactory(_uri.scheme, _uri.host, _uri.port); 653 } 654 655 if ( stream ) // socket factory created connection 656 { 657 return stream; 658 } 659 660 URI uri; // this URI will be used temporarry if we need proxy 661 string actual_proxy = select_proxy(_uri.scheme); 662 final switch (_uri.scheme) { 663 case"http": 664 if ( actual_proxy ) { 665 uri.uri_parse(actual_proxy); 666 uri.idn_encode(); 667 } else { 668 // use original uri 669 uri = _uri; 670 } 671 stream = new TCPStream(); 672 stream.bind(_bind); 673 stream.connect(uri.host, uri.port, _timeout); 674 break ; 675 case"https": 676 if ( actual_proxy ) { 677 uri.uri_parse(actual_proxy); 678 uri.idn_encode(); 679 stream = new TCPStream(); 680 stream.bind(_bind); 681 stream.connect(uri.host, uri.port, _timeout); 682 if ( verbosity>=1 ) { 683 writeln("> CONNECT %s:%d HTTP/1.1".format(_uri.host, _uri.port)); 684 } 685 stream.send("CONNECT %s:%d HTTP/1.1\r\n\r\n".format(_uri.host, _uri.port)); 686 while ( stream.isConnected ) { 687 ubyte[1024] b; 688 auto read = stream.receive(b); 689 if ( verbosity>=1) { 690 writefln("< %s", cast(string)b[0..read]); 691 } 692 debug(requests) tracef("read: %d", read); 693 if ( b[0..read].canFind("\r\n\r\n") || b[0..read].canFind("\n\n") ) { 694 debug(requests) tracef("proxy connection ready"); 695 // convert connection to ssl 696 stream = new SSLStream(stream, _sslOptions, _uri.host); 697 break ; 698 } else { 699 debug(requests) tracef("still wait for proxy connection"); 700 } 701 } 702 } else { 703 uri = _uri; 704 stream = new SSLStream(_sslOptions); 705 stream.bind(_bind); 706 stream.connect(uri.host, uri.port, _timeout); 707 debug(requests) tracef("ssl connection to origin server ready"); 708 } 709 break ; 710 } 711 712 return stream; 713 } 714 /// 715 /// Request sent, now receive response. 716 /// Find headers, split on headers and body, continue to receive body 717 /// 718 private void receiveResponse(NetworkStream _stream) { 719 720 try { 721 _stream.readTimeout = timeout; 722 } catch (Exception e) { 723 debug(requests) tracef("Failed to set read timeout for stream: %s", e.msg); 724 return; 725 } 726 // Commented this out as at exit we can have alreade closed socket 727 // scope(exit) { 728 // if ( _stream && _stream.isOpen ) { 729 // _stream.readTimeout = 0.seconds; 730 // } 731 // } 732 733 _bodyDecoder = new DataPipe!ubyte(); 734 scope(exit) { 735 if ( !_useStreaming ) { 736 _bodyDecoder = null; 737 _unChunker = null; 738 } 739 } 740 741 auto buffer = Buffer!ubyte(); 742 Buffer!ubyte partialBody; 743 ptrdiff_t read; 744 string separator; 745 746 while(true) { 747 748 auto b = new ubyte[_bufferSize]; 749 read = _stream.receive(b); 750 751 debug(requests) tracef("read: %d", read); 752 if ( read == 0 ) { 753 break; 754 } 755 auto data = b[0..read]; 756 buffer.putNoCopy(data); 757 if ( verbosity>=3 ) { 758 writeln(data.dump.join("\n")); 759 } 760 761 if ( buffer.length > maxHeadersLength ) { 762 throw new RequestException("Headers length > maxHeadersLength (%d > %d)".format(buffer.length, maxHeadersLength)); 763 } 764 if ( headersHaveBeenReceived(data, buffer, separator) ) { 765 auto s = buffer.data.findSplit(separator); 766 auto ResponseHeaders = s[0]; 767 partialBody = Buffer!ubyte(s[2]); 768 _contentReceived += partialBody.length; 769 parseResponseHeaders(ResponseHeaders); 770 break; 771 } 772 } 773 774 analyzeHeaders(_response._responseHeaders); 775 776 _bodyDecoder.putNoCopy(partialBody.data); 777 778 auto v = _bodyDecoder.get(); 779 _response._responseBody.putNoCopy(v); 780 781 if ( _verbosity >= 2 ) writefln("< %d bytes of body received", partialBody.length); 782 783 if ( _method == "HEAD" ) { 784 // HEAD response have ContentLength, but have no body 785 return; 786 } 787 788 while( true ) { 789 if ( _contentLength >= 0 && _contentReceived >= _contentLength ) { 790 debug(requests) trace("Body received."); 791 break; 792 } 793 if ( _unChunker && _unChunker.done ) { 794 break; 795 } 796 797 if ( _useStreaming && _response._responseBody.length && !redirectCodes.canFind(_response.code) ) { 798 debug(requests) trace("streaming requested"); 799 // save _stream in closure 800 auto __stream = _stream; 801 auto __bodyDecoder = _bodyDecoder; 802 auto __unChunker = _unChunker; 803 auto __contentReceived = _contentReceived; 804 auto __contentLength = _contentLength; 805 auto __bufferSize = _bufferSize; 806 auto __response = _response; 807 808 // set up response 809 _response._contentLength = _contentLength; 810 _response.receiveAsRange.activated = true; 811 _response.receiveAsRange.data = _response._responseBody.data; 812 _response.receiveAsRange.read = delegate ubyte[] () { 813 814 while(true) { 815 // check if we received everything we need 816 if ( ( __unChunker && __unChunker.done ) 817 || !__stream.isConnected() 818 || (__contentLength > 0 && __contentReceived >= __contentLength) ) 819 { 820 debug(requests) trace("streaming_in receive completed"); 821 __bodyDecoder.flush(); 822 return __bodyDecoder.get(); 823 } 824 // have to continue 825 auto b = new ubyte[__bufferSize]; 826 try { 827 read = __stream.receive(b); 828 } 829 catch (Exception e) { 830 throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e); 831 } 832 debug(requests) tracef("streaming_in received %d bytes", read); 833 834 if ( read == 0 ) { 835 debug(requests) tracef("streaming_in: server closed connection"); 836 __bodyDecoder.flush(); 837 return __bodyDecoder.get(); 838 } 839 840 if ( verbosity>=3 ) { 841 writeln(b[0..read].dump.join("\n")); 842 } 843 __response._contentReceived += read; 844 __contentReceived += read; 845 __bodyDecoder.putNoCopy(b[0..read]); 846 auto res = __bodyDecoder.getNoCopy(); 847 if ( res.length == 0 ) { 848 // there were nothing to produce (beginning of the chunk or no decompressed data) 849 continue; 850 } 851 if (res.length == 1) { 852 return res[0]; 853 } 854 // 855 // I'd like to "return _bodyDecoder.getNoCopy().join;" but it is slower 856 // 857 auto total = res.map!(b=>b.length).sum; 858 // create buffer for joined bytes 859 ubyte[] joined = new ubyte[total]; 860 size_t p; 861 // memcopy 862 foreach(ref _; res) { 863 joined[p .. p + _.length] = _; 864 p += _.length; 865 } 866 return joined; 867 } 868 assert(0); 869 }; 870 // we prepared for streaming 871 return; 872 } 873 874 auto b = new ubyte[_bufferSize]; 875 read = _stream.receive(b); 876 877 if ( read == 0 ) { 878 debug(requests) trace("read done"); 879 break; 880 } 881 if ( _verbosity >= 2 ) { 882 writefln("< %d bytes of body received", read); 883 } 884 885 if ( verbosity>=3 ) { 886 writeln(b[0..read].dump.join("\n")); 887 } 888 889 debug(requests) tracef("read: %d", read); 890 _contentReceived += read; 891 if ( _maxContentLength && _contentReceived > _maxContentLength ) { 892 throw new RequestException("ContentLength > maxContentLength (%d>%d)". 893 format(_contentLength, _maxContentLength)); 894 } 895 896 _bodyDecoder.putNoCopy(b[0..read]); // send buffer to all decoders 897 898 _bodyDecoder.getNoCopy. // fetch result and place to body 899 each!(b => _response._responseBody.putNoCopy(b)); 900 901 debug(requests) tracef("receivedTotal: %d, contentLength: %d, bodyLength: %d", _contentReceived, _contentLength, _response._responseBody.length); 902 903 } 904 _bodyDecoder.flush(); 905 _response._responseBody.putNoCopy(_bodyDecoder.get()); 906 } 907 /// 908 /// Check that we received anything. 909 /// Server can close previous connection (keepalive or not) 910 /// 911 private bool serverPrematurelyClosedConnection() pure @safe { 912 immutable server_closed_connection = _response._responseHeaders.length == 0 && _response._status_line.length == 0; 913 // debug(requests) tracef("server closed connection = %s (headers.length=%s, status_line.length=%s)", 914 // server_closed_connection, _response._responseHeaders.length, _response._status_line.length); 915 return server_closed_connection; 916 } 917 private bool isIdempotent(in string method) pure @safe nothrow { 918 return ["GET", "HEAD"].canFind(method); 919 } 920 /// 921 /// If we do not want keepalive request, 922 /// or server signalled to close connection, 923 /// then close it 924 /// 925 void close_connection_if_not_keepalive(NetworkStream _stream) { 926 auto connection = "connection" in _response._responseHeaders; 927 if ( !_keepAlive ) { 928 _stream.close(); 929 } else switch(_response._version) { 930 case HTTP11: 931 // HTTP/1.1 defines the "close" connection option for the sender to signal that the connection 932 // will be closed after completion of the response. For example, 933 // Connection: close 934 // in either the request or the response header fields indicates that the connection 935 // SHOULD NOT be considered `persistent' (section 8.1) after the current request/response is complete. 936 // HTTP/1.1 applications that do not support persistent connections MUST include the "close" connection 937 // option in every message. 938 if ( connection && (*connection).toLower.split(",").canFind("close") ) { 939 _stream.close(); 940 } 941 break; 942 default: 943 // for anything else close connection if there is no keep-alive in Connection 944 if ( connection && !(*connection).toLower.split(",").canFind("keep-alive") ) { 945 _stream.close(); 946 } 947 break; 948 } 949 } 950 /// 951 /// Send multipart for request. 952 /// You would like to use this method for sending large portions of mixed data or uploading files to forms. 953 /// Content of the posted form consist of sources. Each source have at least name and value (can be string-like object or opened file, see more docs for MultipartForm struct) 954 /// Params: 955 /// url = url 956 /// sources = array of sources. 957 HTTPResponse exec(string method="POST")(string url, MultipartForm sources) { 958 import std.uuid; 959 import std.file; 960 961 checkURL(url); 962 //if ( _cm is null ) { 963 // _cm = new ConnManager(); 964 //} 965 966 NetworkStream _stream; 967 _method = method; 968 _response = new HTTPResponse; 969 _response.uri = _uri; 970 _response.finalURI = _uri; 971 bool restartedRequest = false; 972 973 connect: 974 _contentReceived = 0; 975 _response._startedAt = Clock.currTime; 976 977 assert(_stream is null); 978 979 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 980 981 if ( _stream is null ) { 982 debug(requests) trace("create new connection"); 983 _stream = setupConnection(); 984 } else { 985 debug(requests) trace("reuse old connection"); 986 } 987 988 assert(_stream !is null); 989 990 if ( !_stream.isConnected ) { 991 debug(requests) trace("disconnected stream on enter"); 992 if ( !restartedRequest ) { 993 debug(requests) trace("disconnected stream on enter: retry"); 994 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 995 996 _cm.del(_uri.scheme, _uri.host, _uri.port); 997 _stream.close(); 998 _stream = null; 999 1000 restartedRequest = true; 1001 goto connect; 1002 } 1003 debug(requests) trace("disconnected stream on enter: return response"); 1004 //_stream = null; 1005 return _response; 1006 } 1007 _response._connectedAt = Clock.currTime; 1008 1009 Appender!string req; 1010 req.put(requestString()); 1011 1012 string boundary = randomUUID().toString; 1013 string[] partHeaders; 1014 size_t contentLength; 1015 1016 foreach(ref part; sources._sources) { 1017 string h = "--" ~ boundary ~ "\r\n"; 1018 string disposition = `form-data; name="%s"`.format(part.name); 1019 string optionals = part. 1020 parameters.byKeyValue(). 1021 filter!(p => p.key!="Content-Type"). 1022 map! (p => "%s=%s".format(p.key, p.value)). 1023 join("; "); 1024 1025 h ~= `Content-Disposition: ` ~ [disposition, optionals].join("; ") ~ "\r\n"; 1026 1027 auto contentType = "Content-Type" in part.parameters; 1028 if ( contentType ) { 1029 h ~= "Content-Type: " ~ *contentType ~ "\r\n"; 1030 } 1031 1032 h ~= "\r\n"; 1033 partHeaders ~= h; 1034 contentLength += h.length + part.input.getSize() + "\r\n".length; 1035 } 1036 contentLength += "--".length + boundary.length + "--\r\n".length; 1037 1038 auto h = requestHeaders(); 1039 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "multipart/form-data; boundary=" ~ boundary); 1040 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(contentLength)); 1041 1042 h.byKeyValue. 1043 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1044 each!(h => req.put(h)); 1045 req.put("\r\n"); 1046 1047 debug(requests) trace(req.data); 1048 if ( _verbosity >= 1 ) req.data.splitLines.each!(a => writeln("> " ~ a)); 1049 1050 try { 1051 _stream.send(req.data()); 1052 foreach(ref source; sources._sources) { 1053 debug(requests) tracef("sending part headers <%s>", partHeaders.front); 1054 _stream.send(partHeaders.front); 1055 partHeaders.popFront; 1056 while (true) { 1057 auto chunk = source.input.read(); 1058 if ( chunk.length <= 0 ) { 1059 break; 1060 } 1061 _stream.send(chunk); 1062 } 1063 _stream.send("\r\n"); 1064 } 1065 _stream.send("--" ~ boundary ~ "--\r\n"); 1066 _response._requestSentAt = Clock.currTime; 1067 receiveResponse(_stream); 1068 _response._finishedAt = Clock.currTime; 1069 } 1070 catch (NetworkException e) { 1071 errorf("Error sending request: ", e.msg); 1072 _stream.close(); 1073 return _response; 1074 } 1075 1076 if ( serverPrematurelyClosedConnection() 1077 && !restartedRequest 1078 && isIdempotent(_method) 1079 ) { 1080 /// 1081 /// We didn't receive any data (keepalive connectioin closed?) 1082 /// and we can restart this request. 1083 /// Go ahead. 1084 /// 1085 debug(requests) tracef("Server closed keepalive connection"); 1086 1087 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1088 1089 _cm.del(_uri.scheme, _uri.host, _uri.port); 1090 _stream.close(); 1091 _stream = null; 1092 1093 restartedRequest = true; 1094 goto connect; 1095 } 1096 1097 if ( _useStreaming ) { 1098 if ( _response._receiveAsRange.activated ) { 1099 debug(requests) trace("streaming_in activated"); 1100 return _response; 1101 } else { 1102 // this can happen if whole response body received together with headers 1103 _response._receiveAsRange.data = _response.responseBody.data; 1104 } 1105 } 1106 1107 close_connection_if_not_keepalive(_stream); 1108 1109 if ( _verbosity >= 1 ) { 1110 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1111 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1112 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1113 } 1114 1115 if ( willFollowRedirect ) { 1116 if ( _history.length >= _maxRedirects ) { 1117 _stream = null; 1118 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1119 } 1120 // "location" in response already checked in canFollowRedirect 1121 immutable new_location = *("location" in _response.responseHeaders); 1122 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1123 1124 // save current response for history 1125 _history ~= _response; 1126 1127 // prepare new response (for redirected request) 1128 _response = new HTTPResponse; 1129 _response.uri = current_uri; 1130 _response.finalURI = next_uri; 1131 _stream = null; 1132 1133 // set new uri 1134 this._uri = next_uri; 1135 debug(requests) tracef("Redirected to %s", next_uri); 1136 if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 1137 // 307 and 308 do not change method 1138 return this.get(); 1139 } 1140 if ( restartedRequest ) { 1141 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1142 restartedRequest = false; 1143 } 1144 goto connect; 1145 } 1146 1147 _response._history = _history; 1148 return _response; 1149 } 1150 1151 // we use this if we send from ubyte[][] and user provided Content-Length 1152 private void sendFlattenContent(T)(NetworkStream _stream, T content) { 1153 while ( !content.empty ) { 1154 auto chunk = content.front; 1155 _stream.send(chunk); 1156 content.popFront; 1157 } 1158 debug(requests) tracef("sent"); 1159 } 1160 // we use this if we send from ubyte[][] as chunked content 1161 private void sendChunkedContent(T)(NetworkStream _stream, T content) { 1162 while ( !content.empty ) { 1163 auto chunk = content.front; 1164 auto chunkHeader = "%x\r\n".format(chunk.length); 1165 debug(requests) tracef("sending %s%s", chunkHeader, chunk); 1166 _stream.send(chunkHeader); 1167 _stream.send(chunk); 1168 _stream.send("\r\n"); 1169 content.popFront; 1170 } 1171 debug(requests) tracef("sent"); 1172 _stream.send("0\r\n\r\n"); 1173 } 1174 /// 1175 /// POST/PUT/... data from some string(with Content-Length), or from range of strings/bytes (use Transfer-Encoding: chunked). 1176 /// When rank 1 (flat array) used as content it must have length. In that case "content" will be sent directly to network, and Content-Length headers will be added. 1177 /// If you are goung to send some range and do not know length at the moment when you start to send request, then you can send chunks of chars or ubyte. 1178 /// Try not to send too short chunks as this will put additional load on client and server. Chunks of length 2048 or 4096 are ok. 1179 /// 1180 /// Parameters: 1181 /// url = url 1182 /// content = string or input range 1183 /// contentType = content type 1184 /// Returns: 1185 /// Response 1186 /// Examples: 1187 /// --------------------------------------------------------------------------------------------------------- 1188 /// rs = rq.exec!"POST"("http://httpbin.org/post", "привiт, свiт!", "application/octet-stream"); 1189 /// 1190 /// auto s = lineSplitter("one,\ntwo,\nthree."); 1191 /// rs = rq.exec!"POST"("http://httpbin.org/post", s, "application/octet-stream"); 1192 /// 1193 /// auto s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; 1194 /// rs = rq.exec!"POST"("http://httpbin.org/post", s.representation.chunks(10), "application/octet-stream"); 1195 /// 1196 /// auto f = File("tests/test.txt", "rb"); 1197 /// rs = rq.exec!"POST"("http://httpbin.org/post", f.byChunk(3), "application/octet-stream"); 1198 /// -------------------------------------------------------------------------------------------------------- 1199 deprecated("Use Request() instead of HTTPRequest(); will be removed 2019-07") 1200 HTTPResponse exec(string method="POST", R)(string url, R content, string contentType="application/octet-stream") 1201 if ( (rank!R == 1) 1202 || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front)))) 1203 || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte))) 1204 ) 1205 do { 1206 debug(requests) tracef("started url=%s, this._uri=%s", url, _uri); 1207 1208 checkURL(url); 1209 //if ( _cm is null ) { 1210 // _cm = new ConnManager(); 1211 //} 1212 1213 NetworkStream _stream; 1214 _method = method; 1215 _response = new HTTPResponse; 1216 _history.length = 0; 1217 _response.uri = _uri; 1218 _response.finalURI = _uri; 1219 bool restartedRequest = false; 1220 bool send_flat; 1221 1222 connect: 1223 _contentReceived = 0; 1224 _response._startedAt = Clock.currTime; 1225 1226 assert(_stream is null); 1227 1228 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1229 1230 if ( _stream is null ) { 1231 debug(requests) trace("create new connection"); 1232 _stream = setupConnection(); 1233 } else { 1234 debug(requests) trace("reuse old connection"); 1235 } 1236 1237 assert(_stream !is null); 1238 1239 if ( !_stream.isConnected ) { 1240 debug(requests) trace("disconnected stream on enter"); 1241 if ( !restartedRequest ) { 1242 debug(requests) trace("disconnected stream on enter: retry"); 1243 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1244 1245 _cm.del(_uri.scheme, _uri.host, _uri.port); 1246 _stream.close(); 1247 _stream = null; 1248 1249 restartedRequest = true; 1250 goto connect; 1251 } 1252 debug(requests) trace("disconnected stream on enter: return response"); 1253 //_stream = null; 1254 return _response; 1255 } 1256 _response._connectedAt = Clock.currTime; 1257 1258 Appender!string req; 1259 req.put(requestString()); 1260 1261 auto h = requestHeaders; 1262 if ( contentType ) { 1263 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", contentType); 1264 } 1265 static if ( rank!R == 1 ) { 1266 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(content.length)); 1267 } else { 1268 if ( _userHeaders.ContentLength ) { 1269 debug(requests) tracef("User provided content-length for chunked content"); 1270 send_flat = true; 1271 } else { 1272 h["Transfer-Encoding"] = "chunked"; 1273 send_flat = false; 1274 } 1275 } 1276 h.byKeyValue. 1277 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1278 each!(h => req.put(h)); 1279 req.put("\r\n"); 1280 1281 debug(requests) trace(req.data); 1282 if ( _verbosity >= 1 ) { 1283 req.data.splitLines.each!(a => writeln("> " ~ a)); 1284 } 1285 1286 try { 1287 // send headers 1288 _stream.send(req.data()); 1289 // send body 1290 static if ( rank!R == 1) { 1291 _stream.send(content); 1292 } else { 1293 if ( send_flat ) { 1294 sendFlattenContent(_stream, content); 1295 } else { 1296 sendChunkedContent(_stream, content); 1297 } 1298 } 1299 _response._requestSentAt = Clock.currTime; 1300 debug(requests) trace("starting receive response"); 1301 receiveResponse(_stream); 1302 debug(requests) trace("finished receive response"); 1303 _response._finishedAt = Clock.currTime; 1304 } catch (NetworkException e) { 1305 _stream.close(); 1306 throw new RequestException("Network error during data exchange"); 1307 } 1308 1309 if ( serverPrematurelyClosedConnection() 1310 && !restartedRequest 1311 && isIdempotent(_method) 1312 ) { 1313 /// 1314 /// We didn't receive any data (keepalive connectioin closed?) 1315 /// and we can restart this request. 1316 /// Go ahead. 1317 /// 1318 debug(requests) tracef("Server closed keepalive connection"); 1319 1320 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1321 1322 _cm.del(_uri.scheme, _uri.host, _uri.port); 1323 _stream.close(); 1324 _stream = null; 1325 1326 restartedRequest = true; 1327 goto connect; 1328 } 1329 1330 if ( _useStreaming ) { 1331 if ( _response._receiveAsRange.activated ) { 1332 debug(requests) trace("streaming_in activated"); 1333 return _response; 1334 } else { 1335 // this can happen if whole response body received together with headers 1336 _response._receiveAsRange.data = _response.responseBody.data; 1337 } 1338 } 1339 1340 close_connection_if_not_keepalive(_stream); 1341 1342 if ( _verbosity >= 1 ) { 1343 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1344 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1345 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1346 } 1347 1348 1349 if ( willFollowRedirect ) { 1350 if ( _history.length >= _maxRedirects ) { 1351 _stream = null; 1352 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1353 } 1354 // "location" in response already checked in canFollowRedirect 1355 immutable new_location = *("location" in _response.responseHeaders); 1356 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1357 1358 // save current response for history 1359 _history ~= _response; 1360 1361 // prepare new response (for redirected request) 1362 _response = new HTTPResponse; 1363 _response.uri = current_uri; 1364 _response.finalURI = next_uri; 1365 1366 _stream = null; 1367 1368 // set new uri 1369 this._uri = next_uri; 1370 debug(requests) tracef("Redirected to %s", next_uri); 1371 if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 1372 // 307 and 308 do not change method 1373 return this.get(); 1374 } 1375 if ( restartedRequest ) { 1376 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1377 restartedRequest = false; 1378 } 1379 goto connect; 1380 } 1381 1382 _response._history = _history; 1383 return _response; 1384 } 1385 /// 1386 /// Send request with parameters. 1387 /// If used for POST or PUT requests then application/x-www-form-urlencoded used. 1388 /// Request parameters will be encoded into request string or placed in request body for POST/PUT 1389 /// requests. 1390 /// Parameters: 1391 /// url = url 1392 /// params = request parameters 1393 /// Returns: 1394 /// Response 1395 /// Examples: 1396 /// --------------------------------------------------------------------------------- 1397 /// rs = Request().exec!"GET"("http://httpbin.org/get", ["c":"d", "a":"b"]); 1398 /// --------------------------------------------------------------------------------- 1399 /// 1400 deprecated("Use Request() instead of HTTPRequest; will be removed 2019-07") 1401 HTTPResponse exec(string method="GET")(string url = null, QueryParam[] params = null) 1402 do { 1403 debug(requests) tracef("started url=%s, this._uri=%s", url, _uri); 1404 1405 checkURL(url); 1406 //if ( _cm is null ) { 1407 // _cm = new ConnManager(); 1408 //} 1409 1410 NetworkStream _stream; 1411 _method = method; 1412 _response = new HTTPResponse; 1413 _history.length = 0; 1414 _response.uri = _uri; 1415 _response.finalURI = _uri; 1416 bool restartedRequest = false; // True if this is restarted keepAlive request 1417 1418 connect: 1419 if ( _method == "GET" && _uri in _permanent_redirects ) { 1420 debug(requests) trace("use parmanent redirects cache"); 1421 _uri = uriFromLocation(_uri, _permanent_redirects[_uri]); 1422 _response._finalURI = _uri; 1423 } 1424 _contentReceived = 0; 1425 _response._startedAt = Clock.currTime; 1426 1427 assert(_stream is null); 1428 1429 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1430 1431 if ( _stream is null ) { 1432 debug(requests) trace("create new connection"); 1433 _stream = setupConnection(); 1434 } else { 1435 debug(requests) trace("reuse old connection"); 1436 } 1437 1438 assert(_stream !is null); 1439 1440 if ( !_stream.isConnected ) { 1441 debug(requests) trace("disconnected stream on enter"); 1442 if ( !restartedRequest ) { 1443 debug(requests) trace("disconnected stream on enter: retry"); 1444 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1445 1446 _cm.del(_uri.scheme, _uri.host, _uri.port); 1447 _stream.close(); 1448 _stream = null; 1449 1450 restartedRequest = true; 1451 goto connect; 1452 } 1453 debug(requests) trace("disconnected stream on enter: return response"); 1454 //_stream = null; 1455 return _response; 1456 } 1457 _response._connectedAt = Clock.currTime; 1458 1459 auto h = requestHeaders(); 1460 1461 Appender!string req; 1462 1463 string encoded; 1464 1465 switch (_method) { 1466 case "POST","PUT": 1467 encoded = params2query(params); 1468 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "application/x-www-form-urlencoded"); 1469 if ( encoded.length > 0) { 1470 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(encoded.length)); 1471 } 1472 req.put(requestString()); 1473 break; 1474 default: 1475 req.put(requestString(params)); 1476 } 1477 1478 h.byKeyValue. 1479 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1480 each!(h => req.put(h)); 1481 req.put("\r\n"); 1482 if ( encoded ) { 1483 req.put(encoded); 1484 } 1485 1486 debug(requests) trace(req.data); 1487 if ( _verbosity >= 1 ) { 1488 req.data.splitLines.each!(a => writeln("> " ~ a)); 1489 } 1490 // 1491 // Now send request and receive response 1492 // 1493 try { 1494 _stream.send(req.data()); 1495 _response._requestSentAt = Clock.currTime; 1496 debug(requests) trace("starting receive response"); 1497 receiveResponse(_stream); 1498 debug(requests) trace("done receive response"); 1499 _response._finishedAt = Clock.currTime; 1500 } 1501 catch (NetworkException e) { 1502 // On SEND this can means: 1503 // we started to send request to the server, but it closed connection because of keepalive timeout. 1504 // We have to restart request if possible. 1505 1506 // On RECEIVE - if we received something - then this exception is real and unexpected error. 1507 // If we didn't receive anything - we can restart request again as it can be 1508 debug(requests) tracef("Exception on receive response: %s", e.msg); 1509 if ( _response._responseHeaders.length != 0 ) 1510 { 1511 _stream.close(); 1512 throw new RequestException("Unexpected network error"); 1513 } 1514 } 1515 1516 if ( serverPrematurelyClosedConnection() 1517 && !restartedRequest 1518 && isIdempotent(_method) 1519 ) { 1520 /// 1521 /// We didn't receive any data (keepalive connectioin closed?) 1522 /// and we can restart this request. 1523 /// Go ahead. 1524 /// 1525 debug(requests) tracef("Server closed keepalive connection"); 1526 1527 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1528 1529 _cm.del(_uri.scheme, _uri.host, _uri.port); 1530 _stream.close(); 1531 _stream = null; 1532 1533 restartedRequest = true; 1534 goto connect; 1535 } 1536 1537 if ( _useStreaming ) { 1538 if ( _response._receiveAsRange.activated ) { 1539 debug(requests) trace("streaming_in activated"); 1540 return _response; 1541 } else { 1542 // this can happen if whole response body received together with headers 1543 _response._receiveAsRange.data = _response.responseBody.data; 1544 } 1545 } 1546 1547 close_connection_if_not_keepalive(_stream); 1548 1549 if ( _verbosity >= 1 ) { 1550 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1551 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1552 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1553 } 1554 1555 if ( willFollowRedirect ) { 1556 debug(requests) trace("going to follow redirect"); 1557 if ( _history.length >= _maxRedirects ) { 1558 _stream = null; 1559 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1560 } 1561 // "location" in response already checked in canFollowRedirect 1562 immutable new_location = *("location" in _response.responseHeaders); 1563 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1564 1565 if ( _method == "GET" && _response.code == 301 ) { 1566 _permanent_redirects[_uri] = new_location; 1567 } 1568 1569 // save current response for history 1570 _history ~= _response; 1571 1572 // prepare new response (for redirected request) 1573 _response = new HTTPResponse; 1574 _response.uri = current_uri; 1575 _response.finalURI = next_uri; 1576 _stream = null; 1577 1578 // set new uri 1579 _uri = next_uri; 1580 debug(requests) tracef("Redirected to %s", next_uri); 1581 if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 1582 // 307 and 308 do not change method 1583 return this.get(); 1584 } 1585 if ( restartedRequest ) { 1586 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1587 restartedRequest = false; 1588 } 1589 goto connect; 1590 } 1591 1592 _response._history = _history; 1593 return _response; 1594 } 1595 1596 /// WRAPPERS 1597 /// 1598 /// send file(s) using POST and multipart form. 1599 /// This wrapper will be deprecated, use post with MultipartForm - it is more general and clear. 1600 /// Parameters: 1601 /// url = url 1602 /// files = array of PostFile structures 1603 /// Returns: 1604 /// Response 1605 /// Each PostFile structure contain path to file, and optional field name and content type. 1606 /// If no field name provided, then basename of the file will be used. 1607 /// application/octet-stream is default when no content type provided. 1608 /// Example: 1609 /// --------------------------------------------------------------- 1610 /// PostFile[] files = [ 1611 /// {fileName:"tests/abc.txt", fieldName:"abc", contentType:"application/octet-stream"}, 1612 /// {fileName:"tests/test.txt"} 1613 /// ]; 1614 /// rs = rq.exec!"POST"("http://httpbin.org/post", files); 1615 /// --------------------------------------------------------------- 1616 /// 1617 deprecated("Use Request() instead of HTTPRequest(); will be removed 2019-07") 1618 HTTPResponse exec(string method="POST")(string url, PostFile[] files) if (method=="POST") { 1619 MultipartForm multipart; 1620 File[] toClose; 1621 foreach(ref f; files) { 1622 File file = File(f.fileName, "rb"); 1623 toClose ~= file; 1624 string fileName = f.fileName ? f.fileName : f.fieldName; 1625 string contentType = f.contentType ? f.contentType : "application/octetstream"; 1626 multipart.add(f.fieldName, new FormDataFile(file), ["filename":fileName, "Content-Type": contentType]); 1627 } 1628 auto res = exec!"POST"(url, multipart); 1629 toClose.each!"a.close"; 1630 return res; 1631 } 1632 /// 1633 /// exec request with parameters when you can use dictionary (when you have no duplicates in parameter names) 1634 /// Consider switch to exec(url, QueryParams) as it more generic and clear. 1635 /// Parameters: 1636 /// url = url 1637 /// params = dictionary with field names as keys and field values as values. 1638 /// Returns: 1639 /// Response 1640 deprecated("Use Request() instead of HTTPRequest(); will be removed 2019-07") 1641 HTTPResponse exec(string method="GET")(string url, string[string] params) { 1642 return exec!method(url, params.byKeyValue.map!(p => QueryParam(p.key, p.value)).array); 1643 } 1644 /// 1645 /// GET request. Simple wrapper over exec!"GET" 1646 /// Params: 1647 /// args = request parameters. see exec docs. 1648 /// 1649 deprecated("Use Request() instead of HTTPRequest; will be removed 2019-07") 1650 HTTPResponse get(A...)(A args) { 1651 return exec!"GET"(args); 1652 } 1653 /// 1654 /// POST request. Simple wrapper over exec!"POST" 1655 /// Params: 1656 /// uri = endpoint uri 1657 /// args = request parameters. see exec docs. 1658 /// 1659 deprecated("Use Request() instead of HTTPRequest; will be removed 2019-07") 1660 HTTPResponse post(A...)(string uri, A args) { 1661 return exec!"POST"(uri, args); 1662 } 1663 // XXX interceptors 1664 import requests.request; 1665 1666 // we use this if we send from ubyte[][] and user provided Content-Length 1667 private void sendFlattenContent(NetworkStream _stream) { 1668 while ( !_postData.empty ) { 1669 auto chunk = _postData.front; 1670 _stream.send(chunk); 1671 _postData.popFront; 1672 } 1673 debug(requests) tracef("sent"); 1674 } 1675 // we use this if we send from ubyte[][] as chunked content 1676 private void sendChunkedContent(NetworkStream _stream) { 1677 while ( !_postData.empty ) { 1678 auto chunk = _postData.front; 1679 auto chunkHeader = "%x\r\n".format(chunk.length); 1680 debug(requests) tracef("sending %s%s", chunkHeader, cast(string)chunk); 1681 _stream.send(chunkHeader); 1682 _stream.send(chunk); 1683 _stream.send("\r\n"); 1684 debug(requests) tracef("chunk sent"); 1685 _postData.popFront; 1686 } 1687 debug(requests) tracef("sent"); 1688 _stream.send("0\r\n\r\n"); 1689 } 1690 1691 HTTPResponse exec_from_range(InputRangeAdapter postData) 1692 do { 1693 1694 _postData = postData; 1695 1696 debug(requests) tracef("exec from range"); 1697 1698 NetworkStream _stream; 1699 _response = new HTTPResponse; 1700 _history.length = 0; 1701 _response.uri = _uri; 1702 _response.finalURI = _uri; 1703 bool restartedRequest = false; 1704 bool send_flat; 1705 1706 connect: 1707 _contentReceived = 0; 1708 _response._startedAt = Clock.currTime; 1709 1710 assert(_stream is null); 1711 1712 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1713 1714 if ( _stream is null ) { 1715 debug(requests) trace("create new connection"); 1716 _stream = setupConnection(); 1717 } else { 1718 debug(requests) trace("reuse old connection"); 1719 } 1720 1721 assert(_stream !is null); 1722 1723 if ( !_stream.isConnected ) { 1724 debug(requests) trace("disconnected stream on enter"); 1725 if ( !restartedRequest ) { 1726 debug(requests) trace("disconnected stream on enter: retry"); 1727 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1728 1729 _cm.del(_uri.scheme, _uri.host, _uri.port); 1730 _stream.close(); 1731 _stream = null; 1732 1733 restartedRequest = true; 1734 goto connect; 1735 } 1736 debug(requests) trace("disconnected stream on enter: return response"); 1737 //_stream = null; 1738 return _response; 1739 } 1740 _response._connectedAt = Clock.currTime; 1741 1742 Appender!string req; 1743 req.put(requestString()); 1744 1745 auto h = requestHeaders; 1746 if ( _contentType ) { 1747 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", _contentType); 1748 } 1749 1750 if ( _postData.length >= 0 ) 1751 { 1752 // we know t 1753 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(_postData.length)); 1754 } 1755 1756 if ( _userHeaders.ContentLength || "Content-Length" in h ) 1757 { 1758 debug(requests) tracef("User provided content-length for chunked content"); 1759 send_flat = true; 1760 } 1761 else 1762 { 1763 h["Transfer-Encoding"] = "chunked"; 1764 send_flat = false; 1765 } 1766 h.byKeyValue. 1767 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1768 each!(h => req.put(h)); 1769 req.put("\r\n"); 1770 1771 debug(requests) tracef("send <%s>", req.data); 1772 if ( _verbosity >= 1 ) { 1773 req.data.splitLines.each!(a => writeln("> " ~ a)); 1774 } 1775 1776 try { 1777 // send headers 1778 _stream.send(req.data()); 1779 // send body 1780 //static if ( rank!R == 1) { 1781 // _stream.send(content); 1782 //} else { 1783 if ( send_flat ) { 1784 sendFlattenContent(_stream); 1785 } else { 1786 sendChunkedContent(_stream); 1787 } 1788 //} 1789 _response._requestSentAt = Clock.currTime; 1790 debug(requests) trace("starting receive response"); 1791 receiveResponse(_stream); 1792 debug(requests) trace("finished receive response"); 1793 _response._finishedAt = Clock.currTime; 1794 } 1795 catch (NetworkException e) 1796 { 1797 _stream.close(); 1798 throw new RequestException("Network error during data exchange"); 1799 } 1800 if ( serverPrematurelyClosedConnection() 1801 && !restartedRequest 1802 && isIdempotent(_method) 1803 ) { 1804 /// 1805 /// We didn't receive any data (keepalive connectioin closed?) 1806 /// and we can restart this request. 1807 /// Go ahead. 1808 /// 1809 debug(requests) tracef("Server closed keepalive connection"); 1810 1811 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1812 1813 _cm.del(_uri.scheme, _uri.host, _uri.port); 1814 _stream.close(); 1815 _stream = null; 1816 1817 restartedRequest = true; 1818 goto connect; 1819 } 1820 1821 if ( _useStreaming ) { 1822 if ( _response._receiveAsRange.activated ) { 1823 debug(requests) trace("streaming_in activated"); 1824 return _response; 1825 } else { 1826 // this can happen if whole response body received together with headers 1827 _response._receiveAsRange.data = _response.responseBody.data; 1828 } 1829 } 1830 1831 close_connection_if_not_keepalive(_stream); 1832 1833 if ( _verbosity >= 1 ) { 1834 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1835 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1836 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1837 } 1838 1839 1840 if ( willFollowRedirect ) { 1841 if ( _history.length >= _maxRedirects ) { 1842 _stream = null; 1843 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1844 } 1845 // "location" in response already checked in canFollowRedirect 1846 immutable new_location = *("location" in _response.responseHeaders); 1847 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1848 1849 immutable get_or_head = _method == "GET" || _method == "HEAD"; 1850 immutable code = _response.code; 1851 1852 // save current response for history 1853 _history ~= _response; 1854 1855 if ( code == 301 ) 1856 { 1857 // permanent redirect and change method 1858 _permanent_redirects[_uri] = new_location; 1859 if ( !get_or_head ) 1860 { 1861 _method = "GET"; 1862 } 1863 } 1864 if ( (code == 302 || code == 303) && !get_or_head) 1865 { 1866 // only change method 1867 _method = "GET"; 1868 } 1869 if ( code == 307 ) 1870 { 1871 // no change method, no permanent 1872 } 1873 if ( code == 308 ) 1874 { 1875 // permanent redirection and do not change method 1876 _permanent_redirects[_uri] = new_location; 1877 } 1878 1879 // prepare new response (for redirected request) 1880 _response = new HTTPResponse; 1881 _response.uri = current_uri; 1882 _response.finalURI = next_uri; 1883 1884 _stream = null; 1885 1886 // set new uri 1887 this._uri = next_uri; 1888 debug(requests) tracef("Redirected to %s", next_uri); 1889 if ( restartedRequest ) { 1890 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1891 restartedRequest = false; 1892 } 1893 if ( _method == "GET") 1894 { 1895 return exec_from_parameters(); 1896 } 1897 goto connect; 1898 } 1899 1900 _response._history = _history; 1901 return _response; 1902 } 1903 1904 HTTPResponse exec_from_multipart_form(MultipartForm form) { 1905 import std.uuid; 1906 import std.file; 1907 1908 _multipartForm = form; 1909 1910 debug(requests) tracef("exec from multipart form"); 1911 1912 NetworkStream _stream; 1913 _response = new HTTPResponse; 1914 _response.uri = _uri; 1915 _response.finalURI = _uri; 1916 bool restartedRequest = false; 1917 1918 connect: 1919 _contentReceived = 0; 1920 _response._startedAt = Clock.currTime; 1921 1922 assert(_stream is null); 1923 1924 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1925 1926 if ( _stream is null ) { 1927 debug(requests) trace("create new connection"); 1928 _stream = setupConnection(); 1929 } else { 1930 debug(requests) trace("reuse old connection"); 1931 } 1932 1933 assert(_stream !is null); 1934 1935 if ( !_stream.isConnected ) { 1936 debug(requests) trace("disconnected stream on enter"); 1937 if ( !restartedRequest ) { 1938 debug(requests) trace("disconnected stream on enter: retry"); 1939 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1940 1941 _cm.del(_uri.scheme, _uri.host, _uri.port); 1942 _stream.close(); 1943 _stream = null; 1944 1945 restartedRequest = true; 1946 goto connect; 1947 } 1948 debug(requests) trace("disconnected stream on enter: return response"); 1949 //_stream = null; 1950 return _response; 1951 } 1952 _response._connectedAt = Clock.currTime; 1953 1954 Appender!string req; 1955 req.put(requestString()); 1956 1957 string boundary = randomUUID().toString; 1958 string[] partHeaders; 1959 size_t contentLength; 1960 1961 foreach(ref part; _multipartForm._sources) { 1962 string h = "--" ~ boundary ~ "\r\n"; 1963 string disposition = `form-data; name="%s"`.format(part.name); 1964 string optionals = part. 1965 parameters.byKeyValue(). 1966 filter!(p => p.key!="Content-Type"). 1967 map! (p => "%s=%s".format(p.key, p.value)). 1968 join("; "); 1969 1970 h ~= `Content-Disposition: ` ~ [disposition, optionals].join("; ") ~ "\r\n"; 1971 1972 auto contentType = "Content-Type" in part.parameters; 1973 if ( contentType ) { 1974 h ~= "Content-Type: " ~ *contentType ~ "\r\n"; 1975 } 1976 1977 h ~= "\r\n"; 1978 partHeaders ~= h; 1979 contentLength += h.length + part.input.getSize() + "\r\n".length; 1980 } 1981 contentLength += "--".length + boundary.length + "--\r\n".length; 1982 1983 auto h = requestHeaders(); 1984 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "multipart/form-data; boundary=" ~ boundary); 1985 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(contentLength)); 1986 1987 h.byKeyValue. 1988 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1989 each!(h => req.put(h)); 1990 req.put("\r\n"); 1991 1992 debug(requests) trace(req.data); 1993 if ( _verbosity >= 1 ) req.data.splitLines.each!(a => writeln("> " ~ a)); 1994 1995 try { 1996 _stream.send(req.data()); 1997 foreach(ref source; _multipartForm._sources) { 1998 debug(requests) tracef("sending part headers <%s>", partHeaders.front); 1999 _stream.send(partHeaders.front); 2000 partHeaders.popFront; 2001 while (true) { 2002 auto chunk = source.input.read(); 2003 if ( chunk.length <= 0 ) { 2004 break; 2005 } 2006 _stream.send(chunk); 2007 } 2008 _stream.send("\r\n"); 2009 } 2010 _stream.send("--" ~ boundary ~ "--\r\n"); 2011 _response._requestSentAt = Clock.currTime; 2012 receiveResponse(_stream); 2013 _response._finishedAt = Clock.currTime; 2014 } 2015 catch (NetworkException e) { 2016 errorf("Error sending request: ", e.msg); 2017 _stream.close(); 2018 return _response; 2019 } 2020 2021 if ( serverPrematurelyClosedConnection() 2022 && !restartedRequest 2023 && isIdempotent(_method) 2024 ) { 2025 /// 2026 /// We didn't receive any data (keepalive connectioin closed?) 2027 /// and we can restart this request. 2028 /// Go ahead. 2029 /// 2030 debug(requests) tracef("Server closed keepalive connection"); 2031 2032 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 2033 2034 _cm.del(_uri.scheme, _uri.host, _uri.port); 2035 _stream.close(); 2036 _stream = null; 2037 2038 restartedRequest = true; 2039 goto connect; 2040 } 2041 2042 if ( _useStreaming ) { 2043 if ( _response._receiveAsRange.activated ) { 2044 debug(requests) trace("streaming_in activated"); 2045 return _response; 2046 } else { 2047 // this can happen if whole response body received together with headers 2048 _response._receiveAsRange.data = _response.responseBody.data; 2049 } 2050 } 2051 2052 close_connection_if_not_keepalive(_stream); 2053 2054 if ( _verbosity >= 1 ) { 2055 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 2056 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 2057 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 2058 } 2059 2060 if ( willFollowRedirect ) { 2061 if ( _history.length >= _maxRedirects ) { 2062 _stream = null; 2063 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 2064 } 2065 // "location" in response already checked in canFollowRedirect 2066 immutable new_location = *("location" in _response.responseHeaders); 2067 immutable current_uri = _uri; 2068 immutable next_uri = uriFromLocation(_uri, new_location); 2069 2070 immutable get_or_head = _method == "GET" || _method == "HEAD"; 2071 immutable code = _response.code; 2072 2073 // save current response for history 2074 _history ~= _response; 2075 2076 if ( code == 301 ) 2077 { 2078 // permanent redirect and change method 2079 _permanent_redirects[_uri] = new_location; 2080 if ( !get_or_head ) 2081 { 2082 _method = "GET"; 2083 } 2084 } 2085 if ( (code == 302 || code == 303) && !get_or_head) 2086 { 2087 // only change method 2088 _method = "GET"; 2089 } 2090 if ( code == 307 ) 2091 { 2092 // no change method, no permanent 2093 } 2094 if ( code == 308 ) 2095 { 2096 // permanent redirection and do not change method 2097 _permanent_redirects[_uri] = new_location; 2098 } 2099 2100 // prepare new response (for redirected request) 2101 _response = new HTTPResponse; 2102 _response.uri = current_uri; 2103 _response.finalURI = next_uri; 2104 _stream = null; 2105 2106 // set new uri 2107 this._uri = next_uri; 2108 debug(requests) tracef("Redirected to %s", next_uri); 2109 if ( restartedRequest ) { 2110 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 2111 restartedRequest = false; 2112 } 2113 if ( _method == "GET") 2114 { 2115 return exec_from_parameters(); 2116 } 2117 goto connect; 2118 } 2119 2120 _response._history = _history; 2121 return _response; 2122 } 2123 2124 HTTPResponse exec_from_parameters() { 2125 2126 debug(requests) tracef("exec from parameters request"); 2127 2128 assert(_uri != URI.init); 2129 NetworkStream _stream; 2130 _response = new HTTPResponse; 2131 _history.length = 0; 2132 _response.uri = _uri; 2133 _response.finalURI = _uri; 2134 bool restartedRequest = false; // True if this is restarted keepAlive request 2135 2136 connect: 2137 if ( _method == "GET" && _uri in _permanent_redirects ) { 2138 debug(requests) trace("use parmanent redirects cache"); 2139 _uri = uriFromLocation(_uri, _permanent_redirects[_uri]); 2140 _response._finalURI = _uri; 2141 } 2142 _contentReceived = 0; 2143 _response._startedAt = Clock.currTime; 2144 2145 assert(_stream is null); 2146 2147 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 2148 2149 if ( _stream is null ) { 2150 debug(requests) trace("create new connection"); 2151 _stream = setupConnection(); 2152 } else { 2153 debug(requests) trace("reuse old connection"); 2154 } 2155 2156 assert(_stream !is null); 2157 2158 if ( !_stream.isConnected ) { 2159 debug(requests) trace("disconnected stream on enter"); 2160 if ( !restartedRequest ) { 2161 debug(requests) trace("disconnected stream on enter: retry"); 2162 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 2163 2164 _cm.del(_uri.scheme, _uri.host, _uri.port); 2165 _stream.close(); 2166 _stream = null; 2167 2168 restartedRequest = true; 2169 goto connect; 2170 } 2171 debug(requests) trace("disconnected stream on enter: return response"); 2172 //_stream = null; 2173 return _response; 2174 } 2175 _response._connectedAt = Clock.currTime; 2176 2177 auto h = requestHeaders(); 2178 2179 Appender!string req; 2180 2181 string encoded; 2182 2183 switch (_method) { 2184 case "POST","PUT": 2185 encoded = params2query(_params); 2186 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "application/x-www-form-urlencoded"); 2187 if ( encoded.length > 0) { 2188 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(encoded.length)); 2189 } 2190 req.put(requestString()); 2191 break; 2192 default: 2193 req.put(requestString(_params)); 2194 } 2195 2196 h.byKeyValue. 2197 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 2198 each!(h => req.put(h)); 2199 req.put("\r\n"); 2200 if ( encoded ) { 2201 req.put(encoded); 2202 } 2203 2204 debug(requests) trace(req.data); 2205 if ( _verbosity >= 1 ) { 2206 req.data.splitLines.each!(a => writeln("> " ~ a)); 2207 } 2208 // 2209 // Now send request and receive response 2210 // 2211 try { 2212 _stream.send(req.data()); 2213 _response._requestSentAt = Clock.currTime; 2214 debug(requests) trace("starting receive response"); 2215 receiveResponse(_stream); 2216 debug(requests) tracef("done receive response"); 2217 _response._finishedAt = Clock.currTime; 2218 } 2219 catch (NetworkException e) { 2220 // On SEND this can means: 2221 // we started to send request to the server, but it closed connection because of keepalive timeout. 2222 // We have to restart request if possible. 2223 2224 // On RECEIVE - if we received something - then this exception is real and unexpected error. 2225 // If we didn't receive anything - we can restart request again as it can be 2226 debug(requests) tracef("Exception on receive response: %s", e.msg); 2227 if ( _response._responseHeaders.length != 0 ) 2228 { 2229 _stream.close(); 2230 throw new RequestException("Unexpected network error"); 2231 } 2232 } 2233 2234 if ( serverPrematurelyClosedConnection() 2235 && !restartedRequest 2236 && isIdempotent(_method) 2237 ) { 2238 /// 2239 /// We didn't receive any data (keepalive connectioin closed?) 2240 /// and we can restart this request. 2241 /// Go ahead. 2242 /// 2243 debug(requests) tracef("Server closed keepalive connection"); 2244 2245 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 2246 2247 _cm.del(_uri.scheme, _uri.host, _uri.port); 2248 _stream.close(); 2249 _stream = null; 2250 2251 restartedRequest = true; 2252 goto connect; 2253 } 2254 2255 if ( _useStreaming ) { 2256 if ( _response._receiveAsRange.activated ) { 2257 debug(requests) trace("streaming_in activated"); 2258 return _response; 2259 } else { 2260 // this can happen if whole response body received together with headers 2261 _response._receiveAsRange.data = _response.responseBody.data; 2262 } 2263 } 2264 2265 close_connection_if_not_keepalive(_stream); 2266 2267 if ( _verbosity >= 1 ) { 2268 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 2269 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 2270 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 2271 } 2272 2273 if ( willFollowRedirect ) { 2274 debug(requests) trace("going to follow redirect"); 2275 if ( _history.length >= _maxRedirects ) { 2276 _stream = null; 2277 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 2278 } 2279 // "location" in response already checked in canFollowRedirect 2280 immutable new_location = *("location" in _response.responseHeaders); 2281 immutable current_uri = _uri; 2282 immutable next_uri = uriFromLocation(_uri, new_location); 2283 2284 immutable get_or_head = _method == "GET" || _method == "HEAD"; 2285 immutable code = _response.code; 2286 2287 // save current response for history 2288 _history ~= _response; 2289 2290 if ( code == 301 ) 2291 { 2292 // permanent redirect and change method 2293 _permanent_redirects[_uri] = new_location; 2294 if ( !get_or_head ) 2295 { 2296 _method = "GET"; 2297 } 2298 } 2299 if ( (code == 302 || code == 303) && !get_or_head) 2300 { 2301 // only change method 2302 _method = "GET"; 2303 } 2304 if ( code == 307 ) 2305 { 2306 // no change method, no permanent 2307 } 2308 if ( code == 308 ) 2309 { 2310 // permanent redirection and do not change method 2311 _permanent_redirects[_uri] = new_location; 2312 } 2313 2314 // prepare new response (for redirected request) 2315 _response = new HTTPResponse; 2316 _response.uri = current_uri; 2317 _response.finalURI = next_uri; 2318 _stream = null; 2319 2320 // set new uri 2321 _uri = next_uri; 2322 debug(requests) tracef("Redirected to %s", next_uri); 2323 //if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 2324 // // 307 and 308 do not change method 2325 // return exec_from_parameters(r); 2326 //} 2327 if ( restartedRequest ) { 2328 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 2329 restartedRequest = false; 2330 } 2331 goto connect; 2332 } 2333 2334 _response._history = _history; 2335 return _response; 2336 } 2337 HTTPResponse execute(Request r) 2338 { 2339 _method = r.method; 2340 _uri = r.uri; 2341 _useStreaming = r.useStreaming; 2342 _permanent_redirects = r.permanent_redirects; 2343 _maxRedirects = r.maxRedirects; 2344 _authenticator = r.authenticator; 2345 _maxHeadersLength = r.maxHeadersLength; 2346 _maxContentLength = r.maxContentLength; 2347 _verbosity = r.verbosity; 2348 _keepAlive = r.keepAlive; 2349 _bufferSize = r.bufferSize; 2350 _proxy = r.proxy; 2351 _timeout = r.timeout; 2352 _contentType = r.contentType; 2353 _socketFactory = r.socketFactory; 2354 _sslOptions = r.sslOptions; 2355 _bind = r.bind; 2356 _headers = r.headers; 2357 _userHeaders = r.userHeaders; 2358 2359 _params = r.params; 2360 2361 // this assignments increments refCounts, so we can't use const Request 2362 // but Request is anyway struct and called by-value 2363 _cm = r.cm; 2364 _cookie = r.cookie; 2365 2366 debug(requests) trace("serving %s".format(r)); 2367 if ( !r.postData.empty) 2368 { 2369 return exec_from_range(r.postData); 2370 } 2371 if ( r.hasMultipartForm ) 2372 { 2373 return exec_from_multipart_form(r.multipartForm); 2374 } 2375 auto rs = exec_from_parameters(); 2376 return rs; 2377 } 2378 } 2379 2380 version(vibeD) { 2381 import std.json; 2382 package string httpTestServer() { 2383 return "http://httpbin.org/"; 2384 } 2385 package string fromJsonArrayToStr(JSONValue v) { 2386 return v.str; 2387 } 2388 } 2389 else { 2390 import std.json; 2391 package string httpTestServer() { 2392 return "http://127.0.0.1:8081/"; 2393 } 2394 package string fromJsonArrayToStr(JSONValue v) { 2395 return cast(string)(v.array.map!"cast(ubyte)a.integer".array); 2396 } 2397 }