1 module requests.http; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.ascii; 7 import std.conv; 8 import std.datetime; 9 import std.exception; 10 import std.format; 11 import std.stdio; 12 import std.range; 13 import std..string; 14 import std.traits; 15 import std.typecons; 16 import std.experimental.logger; 17 import core.thread; 18 19 import requests.streams; 20 import requests.uri; 21 import requests.utils; 22 import requests.base; 23 import requests.connmanager; 24 import requests.rangeadapter; 25 26 static immutable ushort[] redirectCodes = [301, 302, 303, 307, 308]; 27 enum HTTP11 = 101; 28 enum HTTP10 = 100; 29 30 static immutable string[string] proxies; 31 static this() { 32 import std.process; 33 proxies["http"] = environment.get("http_proxy", environment.get("HTTP_PROXY")); 34 proxies["https"] = environment.get("https_proxy", environment.get("HTTPS_PROXY")); 35 proxies["all"] = environment.get("all_proxy", environment.get("ALL_PROXY")); 36 foreach(p; proxies.byKey()) { 37 if (proxies[p] is null) { 38 continue; 39 } 40 URI u = URI(proxies[p]); 41 } 42 } 43 44 public class MaxRedirectsException: Exception { 45 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 46 super(message, file, line, next); 47 } 48 } 49 50 /// 51 /// 52 /// 53 //public auto queryParams(T...)(T params) pure nothrow @safe 54 //{ 55 // static assert (T.length % 2 == 0, "wrong args count"); 56 // 57 // QueryParam[] output; 58 // output.reserve = T.length / 2; 59 // 60 // void queryParamsHelper(T...)(T params, ref QueryParam[] output) 61 // { 62 // static if (T.length > 0) 63 // { 64 // output ~= QueryParam(params[0].to!string, params[1].to!string); 65 // queryParamsHelper(params[2..$], output); 66 // } 67 // } 68 // 69 // queryParamsHelper(params, output); 70 // return output; 71 //} 72 73 /// 74 /// Response - result of request execution. 75 /// 76 /// Response.code - response HTTP code. 77 /// Response.status_line - received HTTP status line. 78 /// Response.responseHeaders - received headers. 79 /// Response.responseBody - container for received body 80 /// Response.history - for redirected responses contain all history 81 /// 82 public class HTTPResponse : Response { 83 private { 84 string _status_line; 85 86 HTTPResponse[] _history; // redirects history 87 88 mixin(Setter!string("status_line")); 89 90 int _version; 91 } 92 93 ~this() { 94 _responseHeaders = null; 95 _history.length = 0; 96 } 97 98 mixin(Getter("status_line")); 99 100 @property final string[string] responseHeaders() @safe @nogc nothrow { 101 return _responseHeaders; 102 } 103 @property final HTTPResponse[] history() @safe @nogc nothrow { 104 return _history; 105 } 106 107 @property auto getStats() const pure @safe { 108 alias statTuple = Tuple!(Duration, "connectTime", 109 Duration, "sendTime", 110 Duration, "recvTime"); 111 statTuple stat; 112 stat.connectTime = _connectedAt - _startedAt; 113 stat.sendTime = _requestSentAt - _connectedAt; 114 stat.recvTime = _finishedAt - _requestSentAt; 115 return stat; 116 } 117 private int parse_version(in string v) pure const nothrow @safe { 118 // try to parse HTTP/1.x to version 119 try if ( v.length > 5 ) { 120 return (v[5..$].split(".").map!"to!int(a)".array[0..2].reduce!((a,b) => a*100 + b)); 121 } catch (Exception e) { 122 } 123 return 0; 124 } 125 unittest { 126 auto r = new HTTPResponse(); 127 assert(r.parse_version("HTTP/1.1") == 101); 128 assert(r.parse_version("HTTP/1.0") == 100); 129 assert(r.parse_version("HTTP/0.9") == 9); 130 assert(r.parse_version("HTTP/xxx") == 0); 131 } 132 } 133 134 /// 135 /// Request. 136 /// Configurable parameters: 137 /// $(B method) - string, method to use (GET, POST, ...) 138 /// $(B headers) - string[string], add any additional headers you'd like to send. 139 /// $(B authenticator) - class Auth, class to send auth headers. 140 /// $(B keepAlive) - bool, set true for keepAlive requests. default true. 141 /// $(B maxRedirects) - uint, maximum number of redirects. default 10. 142 /// $(B maxHeadersLength) - size_t, maximum length of server response headers. default = 32KB. 143 /// $(B maxContentLength) - size_t, maximun content length. delault - 0 = unlimited. 144 /// $(B bufferSize) - size_t, send and receive buffer size. default = 16KB. 145 /// $(B verbosity) - uint, level of verbosity(0 - nothing, 1 - headers, 2 - headers and body progress). default = 0. 146 /// $(B proxy) - string, set proxy url if needed. default - null. 147 /// $(B cookie) - Tuple Cookie, Read/Write cookie You can get cookie setted by server, or set cookies before doing request. 148 /// $(B timeout) - Duration, Set timeout value for connect/receive/send. 149 /// 150 public struct HTTPRequest { 151 private { 152 string _method = "GET"; 153 URI _uri; 154 string[string] _headers; 155 string[] _filteredHeaders; 156 Auth _authenticator; 157 bool _keepAlive = true; 158 uint _maxRedirects = 10; 159 size_t _maxHeadersLength = 32 * 1024; // 32 KB 160 size_t _maxContentLength; // 0 - Unlimited 161 string _proxy; 162 uint _verbosity = 0; // 0 - no output, 1 - headers, 2 - headers+body info 163 Duration _timeout = 30.seconds; 164 size_t _bufferSize = 16*1024; // 16k 165 bool _useStreaming; // return iterator instead of completed request 166 167 HTTPResponse[] _history; // redirects history 168 DataPipe!ubyte _bodyDecoder; 169 DecodeChunked _unChunker; 170 long _contentLength; 171 long _contentReceived; 172 SSLOptions _sslOptions; 173 string _bind; 174 _UH _userHeaders; 175 176 RefCounted!ConnManager _cm; 177 RefCounted!Cookies _cookie; 178 string[URI] _permanent_redirects; // cache 301 redirects for GET requests 179 MultipartForm _multipartForm; 180 181 NetStreamFactory _socketFactory; 182 183 QueryParam[] _params; 184 string _contentType; 185 InputRangeAdapter _postData; 186 } 187 package HTTPResponse _response; 188 189 mixin(Getter_Setter!string ("method")); 190 mixin(Getter_Setter!bool ("keepAlive")); 191 mixin(Getter_Setter!size_t ("maxContentLength")); 192 mixin(Getter_Setter!size_t ("maxHeadersLength")); 193 mixin(Getter_Setter!size_t ("bufferSize")); 194 mixin(Getter_Setter!uint ("maxRedirects")); 195 mixin(Getter_Setter!uint ("verbosity")); 196 mixin(Getter ("proxy")); 197 mixin(Getter_Setter!Duration ("timeout")); 198 mixin(Setter!Auth ("authenticator")); 199 mixin(Getter_Setter!bool ("useStreaming")); 200 mixin(Getter ("contentLength")); 201 mixin(Getter ("contentReceived")); 202 mixin(Getter_Setter!SSLOptions ("sslOptions")); 203 mixin(Getter_Setter!string ("bind")); 204 mixin(Setter!NetStreamFactory ("socketFactory")); 205 206 @property void sslSetVerifyPeer(bool v) pure @safe nothrow @nogc { 207 _sslOptions.setVerifyPeer(v); 208 } 209 @property void sslSetKeyFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc { 210 _sslOptions.setKeyFile(p, t); 211 } 212 @property void sslSetCertFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc { 213 _sslOptions.setCertFile(p, t); 214 } 215 @property void sslSetCaCert(string path) pure @safe nothrow @nogc { 216 _sslOptions.setCaCert(path); 217 } 218 //@property final void cookie(Cookie[] s) pure @safe @nogc nothrow { 219 // _cookie = s; 220 //} 221 @property final void proxy(string v) { 222 if ( v != _proxy ) { 223 _cm.clear(); 224 } 225 _proxy = v; 226 } 227 //@property final Cookie[] cookie() pure @safe @nogc nothrow { 228 // return _cookie; 229 //} 230 231 this(string uri) { 232 _uri = URI(uri); 233 _cm = ConnManager(10); 234 } 235 ~this() { 236 _headers = null; 237 _authenticator = null; 238 _history = null; 239 _bodyDecoder = null; 240 _unChunker = null; 241 //if ( _cm ) { 242 // _cm.clear(); 243 //} 244 } 245 string toString() const { 246 return "HTTPRequest(%s, %s)".format(_method, _uri.uri()); 247 } 248 string format(string fmt) const { 249 import std.array; 250 import std.stdio; 251 auto a = appender!string(); 252 auto f = FormatSpec!char(fmt); 253 while (f.writeUpToNextSpec(a)) { 254 switch(f.spec) { 255 case 'h': 256 // Remote hostname. 257 a.put(_uri.host); 258 break; 259 case 'm': 260 // method. 261 a.put(_method); 262 break; 263 case 'p': 264 // Remote port. 265 a.put("%d".format(_uri.port)); 266 break; 267 case 'P': 268 // Path 269 a.put(_uri.path); 270 break; 271 case 'q': 272 // query parameters supplied with url. 273 a.put(_uri.query); 274 break; 275 case 'U': 276 a.put(_uri.uri()); 277 break; 278 default: 279 throw new FormatException("Unknown Request format spec " ~ f.spec); 280 } 281 } 282 return a.data(); 283 } 284 string select_proxy(string scheme) { 285 if ( _proxy is null && proxies.length == 0 ) { 286 debug(requests) tracef("proxy=null"); 287 return null; 288 } 289 if ( _proxy ) { 290 debug(requests) tracef("proxy=%s", _proxy); 291 return _proxy; 292 } 293 auto p = scheme in proxies; 294 if ( p !is null && *p != "") { 295 debug(requests) tracef("proxy=%s", *p); 296 return *p; 297 } 298 p = "all" in proxies; 299 if ( p !is null && *p != "") { 300 debug(requests) tracef("proxy=%s", *p); 301 return *p; 302 } 303 debug(requests) tracef("proxy=null"); 304 return null; 305 } 306 void clearHeaders() { 307 _headers = null; 308 } 309 @property void uri(in URI newURI) { 310 //handleURLChange(_uri, newURI); 311 _uri = newURI; 312 } 313 /// Add headers to request 314 /// Params: 315 /// headers = headers to send. 316 void addHeaders(in string[string] headers) { 317 foreach(pair; headers.byKeyValue) { 318 string _h = pair.key; 319 switch(toLower(_h)) { 320 case "host": 321 _userHeaders.Host = true; 322 break; 323 case "user-agent": 324 _userHeaders.UserAgent = true; 325 break; 326 case "content-length": 327 _userHeaders.ContentLength = true; 328 break; 329 case "content-type": 330 _userHeaders.ContentType = true; 331 break; 332 case "connection": 333 _userHeaders.Connection = true; 334 break; 335 case "cookie": 336 _userHeaders.Cookie = true; 337 break; 338 default: 339 break; 340 } 341 _headers[pair.key] = pair.value; 342 } 343 } 344 private void safeSetHeader(ref string[string] headers, bool userAdded, string h, string v) pure @safe { 345 if ( !userAdded ) { 346 headers[h] = v; 347 } 348 } 349 /// Remove headers from request 350 /// Params: 351 /// headers = headers to remove. 352 void removeHeaders(in string[] headers) pure { 353 _filteredHeaders ~= headers; 354 } 355 /// 356 /// compose headers to send 357 /// 358 private string[string] requestHeaders() { 359 360 string[string] generatedHeaders; 361 362 if ( _authenticator ) { 363 _authenticator. 364 authHeaders(_uri.host). 365 byKeyValue. 366 each!(pair => generatedHeaders[pair.key] = pair.value); 367 } 368 369 _headers.byKey.each!(h => generatedHeaders[h] = _headers[h]); 370 371 safeSetHeader(generatedHeaders, _userHeaders.AcceptEncoding, "Accept-Encoding", "gzip,deflate"); 372 safeSetHeader(generatedHeaders, _userHeaders.UserAgent, "User-Agent", "dlang-requests"); 373 safeSetHeader(generatedHeaders, _userHeaders.Connection, "Connection", _keepAlive?"Keep-Alive":"Close"); 374 375 if ( !_userHeaders.Host ) 376 { 377 generatedHeaders["Host"] = _uri.host; 378 if ( _uri.scheme !in standard_ports || _uri.port != standard_ports[_uri.scheme] ) { 379 generatedHeaders["Host"] ~= ":%d".format(_uri.port); 380 } 381 } 382 383 if ( _cookie._array.length && !_userHeaders.Cookie ) { 384 auto cs = _cookie._array. 385 filter!(c => _uri.path.pathMatches(c.path) && _uri.host.domainMatches(c.domain)). 386 map!(c => "%s=%s".format(c.attr, c.value)). 387 joiner(";"); 388 if ( ! cs.empty ) 389 { 390 generatedHeaders["Cookie"] = to!string(cs); 391 } 392 } 393 394 _filteredHeaders.each!(h => generatedHeaders.remove(h)); 395 396 return generatedHeaders; 397 } 398 /// 399 /// Build request string. 400 /// Handle proxy and query parameters. 401 /// 402 private @property string requestString(QueryParam[] params = null) { 403 auto query = _uri.query.dup; 404 if ( params ) { 405 query ~= "&" ~ params2query(params); 406 if ( query[0] != '?' ) { 407 query = "?" ~ query; 408 } 409 } 410 string actual_proxy = select_proxy(_uri.scheme); 411 if ( actual_proxy && _uri.scheme != "https" ) { 412 return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.uri(No.params), query); 413 } 414 return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.path, query); 415 } 416 /// 417 /// encode parameters and build query part of the url 418 /// 419 private static string params2query(in QueryParam[] params) pure @safe { 420 return params. 421 map!(a => "%s=%s".format(a.key.urlEncoded, a.value.urlEncoded)). 422 join("&"); 423 } 424 // 425 package unittest { 426 assert(params2query(queryParams("a","b", "c", " d "))=="a=b&c=%20d%20"); 427 } 428 /// 429 /// Analyze received headers, take appropriate actions: 430 /// check content length, attach unchunk and uncompress 431 /// 432 private void analyzeHeaders(in string[string] headers) { 433 434 _contentLength = -1; 435 _unChunker = null; 436 auto contentLength = "content-length" in headers; 437 if ( contentLength ) { 438 try { 439 _contentLength = to!long(*contentLength); 440 if ( _maxContentLength && _contentLength > _maxContentLength) { 441 throw new RequestException("ContentLength > maxContentLength (%d>%d)". 442 format(_contentLength, _maxContentLength)); 443 } 444 } catch (ConvException e) { 445 throw new RequestException("Can't convert Content-Length from %s".format(*contentLength)); 446 } 447 } 448 auto transferEncoding = "transfer-encoding" in headers; 449 if ( transferEncoding ) { 450 debug(requests) tracef("transferEncoding: %s", *transferEncoding); 451 if ( (*transferEncoding).toLower == "chunked") { 452 _unChunker = new DecodeChunked(); 453 _bodyDecoder.insert(_unChunker); 454 } 455 } 456 auto contentEncoding = "content-encoding" in headers; 457 if ( contentEncoding ) switch (*contentEncoding) { 458 default: 459 throw new RequestException("Unknown content-encoding " ~ *contentEncoding); 460 case "gzip": 461 case "deflate": 462 _bodyDecoder.insert(new Decompressor!ubyte); 463 } 464 465 } 466 /// 467 /// Called when we know that all headers already received in buffer. 468 /// This routine does not interpret headers content (see analyzeHeaders). 469 /// 1. Split headers on lines 470 /// 2. store status line, store response code 471 /// 3. unfold headers if needed 472 /// 4. store headers 473 /// 474 private void parseResponseHeaders(in ubyte[] input) { 475 string lastHeader; 476 auto buffer = cast(string)input; 477 478 foreach(line; buffer.split("\n").map!(l => l.stripRight)) { 479 if ( ! _response.status_line.length ) { 480 debug (requests) tracef("statusLine: %s", line); 481 _response.status_line = line; 482 if ( _verbosity >= 1 ) { 483 writefln("< %s", line); 484 } 485 auto parsed = line.split(" "); 486 if ( parsed.length >= 2 ) { 487 _response.code = parsed[1].to!ushort; 488 _response._version = _response.parse_version(parsed[0]); 489 } 490 continue; 491 } 492 if ( line[0] == ' ' || line[0] == '\t' ) { 493 // unfolding https://tools.ietf.org/html/rfc822#section-3.1 494 if ( auto stored = lastHeader in _response._responseHeaders) { 495 *stored ~= line; 496 } 497 continue; 498 } 499 auto parsed = line.findSplit(":"); 500 auto header = parsed[0].toLower; 501 auto value = parsed[2].strip; 502 503 if ( _verbosity >= 1 ) { 504 writefln("< %s: %s", header, value); 505 } 506 507 lastHeader = header; 508 debug (requests) tracef("Header %s = %s", header, value); 509 510 if ( header != "set-cookie" ) { 511 auto stored = _response.responseHeaders.get(header, null); 512 if ( stored ) { 513 value = stored ~ "," ~ value; 514 } 515 _response._responseHeaders[header] = value; 516 continue; 517 } 518 _cookie._array ~= processCookie(value); 519 } 520 } 521 522 /// 523 /// Process Set-Cookie header from server response 524 /// 525 private Cookie[] processCookie(string value ) pure { 526 // cookie processing 527 // 528 // as we can't join several set-cookie lines in single line 529 // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com 530 // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com, cs=ip764-RgKqc-HvSkxRxdQQAKW8LA; path=/; domain=.example.com; HttpOnly 531 // 532 Cookie[] res; 533 string[string] kv; 534 auto fields = value.split(";").map!strip; 535 while(!fields.empty) { 536 auto s = fields.front.findSplit("="); 537 fields.popFront; 538 if ( s[1] != "=" ) { 539 continue; 540 } 541 auto k = s[0]; 542 auto v = s[2]; 543 switch(k.toLower()) { 544 case "domain": 545 k = "domain"; 546 break; 547 case "path": 548 k = "path"; 549 break; 550 case "expires": 551 continue; 552 default: 553 break; 554 } 555 kv[k] = v; 556 } 557 if ( "domain" !in kv ) { 558 kv["domain"] = _uri.host; 559 } 560 if ( "path" !in kv ) { 561 kv["path"] = _uri.path; 562 } 563 auto domain = kv["domain"]; kv.remove("domain"); 564 auto path = kv["path"]; kv.remove("path"); 565 foreach(pair; kv.byKeyValue) { 566 auto _attr = pair.key; 567 auto _value = pair.value; 568 auto cookie = Cookie(path, domain, _attr, _value); 569 res ~= cookie; 570 } 571 return res; 572 } 573 /// 574 /// Do we received \r\n\r\n? 575 /// 576 private bool headersHaveBeenReceived(in ubyte[] data, ref Buffer!ubyte buffer, out string separator) const @safe { 577 foreach(s; ["\r\n\r\n", "\n\n"]) { 578 if ( data.canFind(s) || buffer.canFind(s) ) { 579 separator = s; 580 return true; 581 } 582 } 583 return false; 584 } 585 586 private bool willFollowRedirect() { 587 if ( !canFind(redirectCodes, _response.code) ) { 588 return false; 589 } 590 if ( !_maxRedirects ) { 591 return false; 592 } 593 if ( "location" !in _response.responseHeaders ) { 594 return false; 595 } 596 return true; 597 } 598 private URI uriFromLocation(const ref URI uri, in string location) { 599 URI newURI = uri; 600 try { 601 newURI = URI(location); 602 } catch (UriException e) { 603 debug(requests) trace("Can't parse Location:, try relative uri"); 604 newURI.path = location; 605 newURI.uri = newURI.recalc_uri; 606 } 607 return newURI; 608 } 609 /// 610 /// if we have new uri, then we need to check if we have to reopen existent connection 611 /// 612 private void checkURL(string url, string file=__FILE__, size_t line=__LINE__) { 613 if (url is null && _uri.uri == "" ) { 614 throw new RequestException("No url configured", file, line); 615 } 616 617 if ( url !is null ) { 618 URI newURI = URI(url); 619 //handleURLChange(_uri, newURI); 620 _uri = newURI; 621 } 622 } 623 /// 624 /// Setup connection. Handle proxy and https case 625 /// 626 /// Place new connection in ConnManager cache 627 /// 628 private NetworkStream setupConnection() 629 do { 630 631 debug(requests) tracef("Set up new connection"); 632 NetworkStream stream; 633 634 // on exit 635 // place created connection to conn. manager 636 // close connection purged from manager (if any) 637 // 638 scope(exit) { 639 if ( stream ) 640 { 641 if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, stream) ) 642 { 643 debug(requests) tracef("closing purged connection %s", purged_connection); 644 purged_connection.close(); 645 } 646 } 647 } 648 649 if ( _socketFactory ) 650 { 651 debug(requests) tracef("use socketFactory"); 652 stream = _socketFactory(_uri.scheme, _uri.host, _uri.port); 653 } 654 655 if ( stream ) // socket factory created connection 656 { 657 return stream; 658 } 659 660 URI uri; // this URI will be used temporarry if we need proxy 661 string actual_proxy = select_proxy(_uri.scheme); 662 final switch (_uri.scheme) { 663 case"http": 664 if ( actual_proxy ) { 665 uri.uri_parse(actual_proxy); 666 uri.idn_encode(); 667 } else { 668 // use original uri 669 uri = _uri; 670 } 671 stream = new TCPStream(); 672 stream.bind(_bind); 673 stream.connect(uri.host, uri.port, _timeout); 674 break ; 675 case"https": 676 if ( actual_proxy ) { 677 uri.uri_parse(actual_proxy); 678 uri.idn_encode(); 679 stream = new TCPStream(); 680 stream.bind(_bind); 681 stream.connect(uri.host, uri.port, _timeout); 682 if ( verbosity>=1 ) { 683 writeln("> CONNECT %s:%d HTTP/1.1".format(_uri.host, _uri.port)); 684 } 685 stream.send("CONNECT %s:%d HTTP/1.1\r\n\r\n".format(_uri.host, _uri.port)); 686 while ( stream.isConnected ) { 687 ubyte[1024] b; 688 auto read = stream.receive(b); 689 if ( verbosity>=1) { 690 writefln("< %s", cast(string)b[0..read]); 691 } 692 debug(requests) tracef("read: %d", read); 693 if ( b[0..read].canFind("\r\n\r\n") || b[0..read].canFind("\n\n") ) { 694 debug(requests) tracef("proxy connection ready"); 695 // convert connection to ssl 696 stream = new SSLStream(stream, _sslOptions, _uri.host); 697 break ; 698 } else { 699 debug(requests) tracef("still wait for proxy connection"); 700 } 701 } 702 } else { 703 uri = _uri; 704 stream = new SSLStream(_sslOptions); 705 stream.bind(_bind); 706 stream.connect(uri.host, uri.port, _timeout); 707 debug(requests) tracef("ssl connection to origin server ready"); 708 } 709 break ; 710 } 711 712 return stream; 713 } 714 /// 715 /// Request sent, now receive response. 716 /// Find headers, split on headers and body, continue to receive body 717 /// 718 private void receiveResponse(NetworkStream _stream) { 719 720 try { 721 _stream.readTimeout = timeout; 722 } catch (Exception e) { 723 debug(requests) tracef("Failed to set read timeout for stream: %s", e.msg); 724 return; 725 } 726 // Commented this out as at exit we can have alreade closed socket 727 // scope(exit) { 728 // if ( _stream && _stream.isOpen ) { 729 // _stream.readTimeout = 0.seconds; 730 // } 731 // } 732 733 _bodyDecoder = new DataPipe!ubyte(); 734 scope(exit) { 735 if ( !_useStreaming ) { 736 _bodyDecoder = null; 737 _unChunker = null; 738 } 739 } 740 741 auto buffer = Buffer!ubyte(); 742 Buffer!ubyte partialBody; 743 ptrdiff_t read; 744 string separator; 745 746 while(true) { 747 748 auto b = new ubyte[_bufferSize]; 749 read = _stream.receive(b); 750 751 debug(requests) tracef("read: %d", read); 752 if ( read == 0 ) { 753 break; 754 } 755 auto data = b[0..read]; 756 buffer.putNoCopy(data); 757 if ( verbosity>=3 ) { 758 writeln(data.dump.join("\n")); 759 } 760 761 if ( buffer.length > maxHeadersLength ) { 762 throw new RequestException("Headers length > maxHeadersLength (%d > %d)".format(buffer.length, maxHeadersLength)); 763 } 764 if ( headersHaveBeenReceived(data, buffer, separator) ) { 765 auto s = buffer.data.findSplit(separator); 766 auto ResponseHeaders = s[0]; 767 partialBody = Buffer!ubyte(s[2]); 768 _contentReceived += partialBody.length; 769 parseResponseHeaders(ResponseHeaders); 770 break; 771 } 772 } 773 774 analyzeHeaders(_response._responseHeaders); 775 776 _bodyDecoder.putNoCopy(partialBody.data); 777 778 auto v = _bodyDecoder.get(); 779 _response._responseBody.putNoCopy(v); 780 781 if ( _verbosity >= 2 ) writefln("< %d bytes of body received", partialBody.length); 782 783 if ( _method == "HEAD" ) { 784 // HEAD response have ContentLength, but have no body 785 return; 786 } 787 788 while( true ) { 789 if ( _contentLength >= 0 && _contentReceived >= _contentLength ) { 790 debug(requests) trace("Body received."); 791 break; 792 } 793 if ( _unChunker && _unChunker.done ) { 794 break; 795 } 796 797 if ( _useStreaming && _response._responseBody.length && !redirectCodes.canFind(_response.code) ) { 798 debug(requests) trace("streaming requested"); 799 // save _stream in closure 800 auto __stream = _stream; 801 auto __bodyDecoder = _bodyDecoder; 802 auto __unChunker = _unChunker; 803 auto __contentReceived = _contentReceived; 804 auto __contentLength = _contentLength; 805 auto __bufferSize = _bufferSize; 806 807 // set up response 808 _response._contentLength = _contentLength; 809 _response.receiveAsRange.activated = true; 810 _response.receiveAsRange.data = _response._responseBody.data; 811 _response.receiveAsRange.read = delegate ubyte[] () { 812 813 while(true) { 814 // check if we received everything we need 815 if ( ( __unChunker && __unChunker.done ) 816 || !__stream.isConnected() 817 || (__contentLength > 0 && __contentReceived >= __contentLength) ) 818 { 819 debug(requests) trace("streaming_in receive completed"); 820 __bodyDecoder.flush(); 821 return __bodyDecoder.get(); 822 } 823 // have to continue 824 auto b = new ubyte[__bufferSize]; 825 try { 826 read = __stream.receive(b); 827 } 828 catch (Exception e) { 829 throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e); 830 } 831 debug(requests) tracef("streaming_in received %d bytes", read); 832 833 if ( read == 0 ) { 834 debug(requests) tracef("streaming_in: server closed connection"); 835 __bodyDecoder.flush(); 836 return __bodyDecoder.get(); 837 } 838 839 if ( verbosity>=3 ) { 840 writeln(b[0..read].dump.join("\n")); 841 } 842 _response._contentReceived += read; 843 __contentReceived += read; 844 __bodyDecoder.putNoCopy(b[0..read]); 845 auto res = __bodyDecoder.getNoCopy(); 846 if ( res.length == 0 ) { 847 // there were nothing to produce (beginning of the chunk or no decompressed data) 848 continue; 849 } 850 if (res.length == 1) { 851 return res[0]; 852 } 853 // 854 // I'd like to "return _bodyDecoder.getNoCopy().join;" but it is slower 855 // 856 auto total = res.map!(b=>b.length).sum; 857 // create buffer for joined bytes 858 ubyte[] joined = new ubyte[total]; 859 size_t p; 860 // memcopy 861 foreach(ref _; res) { 862 joined[p .. p + _.length] = _; 863 p += _.length; 864 } 865 return joined; 866 } 867 assert(0); 868 }; 869 // we prepared for streaming 870 return; 871 } 872 873 auto b = new ubyte[_bufferSize]; 874 read = _stream.receive(b); 875 876 if ( read == 0 ) { 877 debug(requests) trace("read done"); 878 break; 879 } 880 if ( _verbosity >= 2 ) { 881 writefln("< %d bytes of body received", read); 882 } 883 884 if ( verbosity>=3 ) { 885 writeln(b[0..read].dump.join("\n")); 886 } 887 888 debug(requests) tracef("read: %d", read); 889 _contentReceived += read; 890 if ( _maxContentLength && _contentReceived > _maxContentLength ) { 891 throw new RequestException("ContentLength > maxContentLength (%d>%d)". 892 format(_contentLength, _maxContentLength)); 893 } 894 895 _bodyDecoder.putNoCopy(b[0..read]); // send buffer to all decoders 896 897 _bodyDecoder.getNoCopy. // fetch result and place to body 898 each!(b => _response._responseBody.putNoCopy(b)); 899 900 debug(requests) tracef("receivedTotal: %d, contentLength: %d, bodyLength: %d", _contentReceived, _contentLength, _response._responseBody.length); 901 902 } 903 _bodyDecoder.flush(); 904 _response._responseBody.putNoCopy(_bodyDecoder.get()); 905 } 906 /// 907 /// Check that we received anything. 908 /// Server can close previous connection (keepalive or not) 909 /// 910 private bool serverPrematurelyClosedConnection() pure @safe { 911 immutable server_closed_connection = _response._responseHeaders.length == 0 && _response._status_line.length == 0; 912 // debug(requests) tracef("server closed connection = %s (headers.length=%s, status_line.length=%s)", 913 // server_closed_connection, _response._responseHeaders.length, _response._status_line.length); 914 return server_closed_connection; 915 } 916 private bool isIdempotent(in string method) pure @safe nothrow { 917 return ["GET", "HEAD"].canFind(method); 918 } 919 /// 920 /// If we do not want keepalive request, 921 /// or server signalled to close connection, 922 /// then close it 923 /// 924 void close_connection_if_not_keepalive(NetworkStream _stream) { 925 auto connection = "connection" in _response._responseHeaders; 926 if ( !_keepAlive ) { 927 _stream.close(); 928 } else switch(_response._version) { 929 case HTTP11: 930 // HTTP/1.1 defines the "close" connection option for the sender to signal that the connection 931 // will be closed after completion of the response. For example, 932 // Connection: close 933 // in either the request or the response header fields indicates that the connection 934 // SHOULD NOT be considered `persistent' (section 8.1) after the current request/response is complete. 935 // HTTP/1.1 applications that do not support persistent connections MUST include the "close" connection 936 // option in every message. 937 if ( connection && (*connection).toLower.split(",").canFind("close") ) { 938 _stream.close(); 939 } 940 break; 941 default: 942 // for anything else close connection if there is no keep-alive in Connection 943 if ( connection && !(*connection).toLower.split(",").canFind("keep-alive") ) { 944 _stream.close(); 945 } 946 break; 947 } 948 } 949 /// 950 /// Send multipart for request. 951 /// You would like to use this method for sending large portions of mixed data or uploading files to forms. 952 /// Content of the posted form consist of sources. Each source have at least name and value (can be string-like object or opened file, see more docs for MultipartForm struct) 953 /// Params: 954 /// url = url 955 /// sources = array of sources. 956 HTTPResponse exec(string method="POST")(string url, MultipartForm sources) { 957 import std.uuid; 958 import std.file; 959 960 checkURL(url); 961 //if ( _cm is null ) { 962 // _cm = new ConnManager(); 963 //} 964 965 NetworkStream _stream; 966 _method = method; 967 _response = new HTTPResponse; 968 _response.uri = _uri; 969 _response.finalURI = _uri; 970 bool restartedRequest = false; 971 972 connect: 973 _contentReceived = 0; 974 _response._startedAt = Clock.currTime; 975 976 assert(_stream is null); 977 978 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 979 980 if ( _stream is null ) { 981 debug(requests) trace("create new connection"); 982 _stream = setupConnection(); 983 } else { 984 debug(requests) trace("reuse old connection"); 985 } 986 987 assert(_stream !is null); 988 989 if ( !_stream.isConnected ) { 990 debug(requests) trace("disconnected stream on enter"); 991 if ( !restartedRequest ) { 992 debug(requests) trace("disconnected stream on enter: retry"); 993 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 994 995 _cm.del(_uri.scheme, _uri.host, _uri.port); 996 _stream.close(); 997 _stream = null; 998 999 restartedRequest = true; 1000 goto connect; 1001 } 1002 debug(requests) trace("disconnected stream on enter: return response"); 1003 //_stream = null; 1004 return _response; 1005 } 1006 _response._connectedAt = Clock.currTime; 1007 1008 Appender!string req; 1009 req.put(requestString()); 1010 1011 string boundary = randomUUID().toString; 1012 string[] partHeaders; 1013 size_t contentLength; 1014 1015 foreach(ref part; sources._sources) { 1016 string h = "--" ~ boundary ~ "\r\n"; 1017 string disposition = `form-data; name="%s"`.format(part.name); 1018 string optionals = part. 1019 parameters.byKeyValue(). 1020 filter!(p => p.key!="Content-Type"). 1021 map! (p => "%s=%s".format(p.key, p.value)). 1022 join("; "); 1023 1024 h ~= `Content-Disposition: ` ~ [disposition, optionals].join("; ") ~ "\r\n"; 1025 1026 auto contentType = "Content-Type" in part.parameters; 1027 if ( contentType ) { 1028 h ~= "Content-Type: " ~ *contentType ~ "\r\n"; 1029 } 1030 1031 h ~= "\r\n"; 1032 partHeaders ~= h; 1033 contentLength += h.length + part.input.getSize() + "\r\n".length; 1034 } 1035 contentLength += "--".length + boundary.length + "--\r\n".length; 1036 1037 auto h = requestHeaders(); 1038 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "multipart/form-data; boundary=" ~ boundary); 1039 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(contentLength)); 1040 1041 h.byKeyValue. 1042 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1043 each!(h => req.put(h)); 1044 req.put("\r\n"); 1045 1046 debug(requests) trace(req.data); 1047 if ( _verbosity >= 1 ) req.data.splitLines.each!(a => writeln("> " ~ a)); 1048 1049 try { 1050 _stream.send(req.data()); 1051 foreach(ref source; sources._sources) { 1052 debug(requests) tracef("sending part headers <%s>", partHeaders.front); 1053 _stream.send(partHeaders.front); 1054 partHeaders.popFront; 1055 while (true) { 1056 auto chunk = source.input.read(); 1057 if ( chunk.length <= 0 ) { 1058 break; 1059 } 1060 _stream.send(chunk); 1061 } 1062 _stream.send("\r\n"); 1063 } 1064 _stream.send("--" ~ boundary ~ "--\r\n"); 1065 _response._requestSentAt = Clock.currTime; 1066 receiveResponse(_stream); 1067 _response._finishedAt = Clock.currTime; 1068 } 1069 catch (NetworkException e) { 1070 errorf("Error sending request: ", e.msg); 1071 _stream.close(); 1072 return _response; 1073 } 1074 1075 if ( serverPrematurelyClosedConnection() 1076 && !restartedRequest 1077 && isIdempotent(_method) 1078 ) { 1079 /// 1080 /// We didn't receive any data (keepalive connectioin closed?) 1081 /// and we can restart this request. 1082 /// Go ahead. 1083 /// 1084 debug(requests) tracef("Server closed keepalive connection"); 1085 1086 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1087 1088 _cm.del(_uri.scheme, _uri.host, _uri.port); 1089 _stream.close(); 1090 _stream = null; 1091 1092 restartedRequest = true; 1093 goto connect; 1094 } 1095 1096 if ( _useStreaming ) { 1097 if ( _response._receiveAsRange.activated ) { 1098 debug(requests) trace("streaming_in activated"); 1099 return _response; 1100 } else { 1101 // this can happen if whole response body received together with headers 1102 _response._receiveAsRange.data = _response.responseBody.data; 1103 } 1104 } 1105 1106 close_connection_if_not_keepalive(_stream); 1107 1108 if ( _verbosity >= 1 ) { 1109 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1110 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1111 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1112 } 1113 1114 if ( willFollowRedirect ) { 1115 if ( _history.length >= _maxRedirects ) { 1116 _stream = null; 1117 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1118 } 1119 // "location" in response already checked in canFollowRedirect 1120 immutable new_location = *("location" in _response.responseHeaders); 1121 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1122 1123 // save current response for history 1124 _history ~= _response; 1125 1126 // prepare new response (for redirected request) 1127 _response = new HTTPResponse; 1128 _response.uri = current_uri; 1129 _response.finalURI = next_uri; 1130 _stream = null; 1131 1132 // set new uri 1133 this._uri = next_uri; 1134 debug(requests) tracef("Redirected to %s", next_uri); 1135 if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 1136 // 307 and 308 do not change method 1137 return this.get(); 1138 } 1139 if ( restartedRequest ) { 1140 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1141 restartedRequest = false; 1142 } 1143 goto connect; 1144 } 1145 1146 _response._history = _history; 1147 return _response; 1148 } 1149 1150 // we use this if we send from ubyte[][] and user provided Content-Length 1151 private void sendFlattenContent(T)(NetworkStream _stream, T content) { 1152 while ( !content.empty ) { 1153 auto chunk = content.front; 1154 _stream.send(chunk); 1155 content.popFront; 1156 } 1157 debug(requests) tracef("sent"); 1158 } 1159 // we use this if we send from ubyte[][] as chunked content 1160 private void sendChunkedContent(T)(NetworkStream _stream, T content) { 1161 while ( !content.empty ) { 1162 auto chunk = content.front; 1163 auto chunkHeader = "%x\r\n".format(chunk.length); 1164 debug(requests) tracef("sending %s%s", chunkHeader, chunk); 1165 _stream.send(chunkHeader); 1166 _stream.send(chunk); 1167 _stream.send("\r\n"); 1168 content.popFront; 1169 } 1170 debug(requests) tracef("sent"); 1171 _stream.send("0\r\n\r\n"); 1172 } 1173 /// 1174 /// POST/PUT/... data from some string(with Content-Length), or from range of strings/bytes (use Transfer-Encoding: chunked). 1175 /// When rank 1 (flat array) used as content it must have length. In that case "content" will be sent directly to network, and Content-Length headers will be added. 1176 /// If you are goung to send some range and do not know length at the moment when you start to send request, then you can send chunks of chars or ubyte. 1177 /// Try not to send too short chunks as this will put additional load on client and server. Chunks of length 2048 or 4096 are ok. 1178 /// 1179 /// Parameters: 1180 /// url = url 1181 /// content = string or input range 1182 /// contentType = content type 1183 /// Returns: 1184 /// Response 1185 /// Examples: 1186 /// --------------------------------------------------------------------------------------------------------- 1187 /// rs = rq.exec!"POST"("http://httpbin.org/post", "привiт, свiт!", "application/octet-stream"); 1188 /// 1189 /// auto s = lineSplitter("one,\ntwo,\nthree."); 1190 /// rs = rq.exec!"POST"("http://httpbin.org/post", s, "application/octet-stream"); 1191 /// 1192 /// auto s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; 1193 /// rs = rq.exec!"POST"("http://httpbin.org/post", s.representation.chunks(10), "application/octet-stream"); 1194 /// 1195 /// auto f = File("tests/test.txt", "rb"); 1196 /// rs = rq.exec!"POST"("http://httpbin.org/post", f.byChunk(3), "application/octet-stream"); 1197 /// -------------------------------------------------------------------------------------------------------- 1198 deprecated("Use Request() instead of HTTPRequest(); will be removed 2019-07") 1199 HTTPResponse exec(string method="POST", R)(string url, R content, string contentType="application/octet-stream") 1200 if ( (rank!R == 1) 1201 || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front)))) 1202 || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte))) 1203 ) 1204 do { 1205 debug(requests) tracef("started url=%s, this._uri=%s", url, _uri); 1206 1207 checkURL(url); 1208 //if ( _cm is null ) { 1209 // _cm = new ConnManager(); 1210 //} 1211 1212 NetworkStream _stream; 1213 _method = method; 1214 _response = new HTTPResponse; 1215 _history.length = 0; 1216 _response.uri = _uri; 1217 _response.finalURI = _uri; 1218 bool restartedRequest = false; 1219 bool send_flat; 1220 1221 connect: 1222 _contentReceived = 0; 1223 _response._startedAt = Clock.currTime; 1224 1225 assert(_stream is null); 1226 1227 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1228 1229 if ( _stream is null ) { 1230 debug(requests) trace("create new connection"); 1231 _stream = setupConnection(); 1232 } else { 1233 debug(requests) trace("reuse old connection"); 1234 } 1235 1236 assert(_stream !is null); 1237 1238 if ( !_stream.isConnected ) { 1239 debug(requests) trace("disconnected stream on enter"); 1240 if ( !restartedRequest ) { 1241 debug(requests) trace("disconnected stream on enter: retry"); 1242 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1243 1244 _cm.del(_uri.scheme, _uri.host, _uri.port); 1245 _stream.close(); 1246 _stream = null; 1247 1248 restartedRequest = true; 1249 goto connect; 1250 } 1251 debug(requests) trace("disconnected stream on enter: return response"); 1252 //_stream = null; 1253 return _response; 1254 } 1255 _response._connectedAt = Clock.currTime; 1256 1257 Appender!string req; 1258 req.put(requestString()); 1259 1260 auto h = requestHeaders; 1261 if ( contentType ) { 1262 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", contentType); 1263 } 1264 static if ( rank!R == 1 ) { 1265 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(content.length)); 1266 } else { 1267 if ( _userHeaders.ContentLength ) { 1268 debug(requests) tracef("User provided content-length for chunked content"); 1269 send_flat = true; 1270 } else { 1271 h["Transfer-Encoding"] = "chunked"; 1272 send_flat = false; 1273 } 1274 } 1275 h.byKeyValue. 1276 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1277 each!(h => req.put(h)); 1278 req.put("\r\n"); 1279 1280 debug(requests) trace(req.data); 1281 if ( _verbosity >= 1 ) { 1282 req.data.splitLines.each!(a => writeln("> " ~ a)); 1283 } 1284 1285 try { 1286 // send headers 1287 _stream.send(req.data()); 1288 // send body 1289 static if ( rank!R == 1) { 1290 _stream.send(content); 1291 } else { 1292 if ( send_flat ) { 1293 sendFlattenContent(_stream, content); 1294 } else { 1295 sendChunkedContent(_stream, content); 1296 } 1297 } 1298 _response._requestSentAt = Clock.currTime; 1299 debug(requests) trace("starting receive response"); 1300 receiveResponse(_stream); 1301 debug(requests) trace("finished receive response"); 1302 _response._finishedAt = Clock.currTime; 1303 } catch (NetworkException e) { 1304 _stream.close(); 1305 throw new RequestException("Network error during data exchange"); 1306 } 1307 1308 if ( serverPrematurelyClosedConnection() 1309 && !restartedRequest 1310 && isIdempotent(_method) 1311 ) { 1312 /// 1313 /// We didn't receive any data (keepalive connectioin closed?) 1314 /// and we can restart this request. 1315 /// Go ahead. 1316 /// 1317 debug(requests) tracef("Server closed keepalive connection"); 1318 1319 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1320 1321 _cm.del(_uri.scheme, _uri.host, _uri.port); 1322 _stream.close(); 1323 _stream = null; 1324 1325 restartedRequest = true; 1326 goto connect; 1327 } 1328 1329 if ( _useStreaming ) { 1330 if ( _response._receiveAsRange.activated ) { 1331 debug(requests) trace("streaming_in activated"); 1332 return _response; 1333 } else { 1334 // this can happen if whole response body received together with headers 1335 _response._receiveAsRange.data = _response.responseBody.data; 1336 } 1337 } 1338 1339 close_connection_if_not_keepalive(_stream); 1340 1341 if ( _verbosity >= 1 ) { 1342 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1343 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1344 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1345 } 1346 1347 1348 if ( willFollowRedirect ) { 1349 if ( _history.length >= _maxRedirects ) { 1350 _stream = null; 1351 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1352 } 1353 // "location" in response already checked in canFollowRedirect 1354 immutable new_location = *("location" in _response.responseHeaders); 1355 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1356 1357 // save current response for history 1358 _history ~= _response; 1359 1360 // prepare new response (for redirected request) 1361 _response = new HTTPResponse; 1362 _response.uri = current_uri; 1363 _response.finalURI = next_uri; 1364 1365 _stream = null; 1366 1367 // set new uri 1368 this._uri = next_uri; 1369 debug(requests) tracef("Redirected to %s", next_uri); 1370 if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 1371 // 307 and 308 do not change method 1372 return this.get(); 1373 } 1374 if ( restartedRequest ) { 1375 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1376 restartedRequest = false; 1377 } 1378 goto connect; 1379 } 1380 1381 _response._history = _history; 1382 return _response; 1383 } 1384 /// 1385 /// Send request with parameters. 1386 /// If used for POST or PUT requests then application/x-www-form-urlencoded used. 1387 /// Request parameters will be encoded into request string or placed in request body for POST/PUT 1388 /// requests. 1389 /// Parameters: 1390 /// url = url 1391 /// params = request parameters 1392 /// Returns: 1393 /// Response 1394 /// Examples: 1395 /// --------------------------------------------------------------------------------- 1396 /// rs = Request().exec!"GET"("http://httpbin.org/get", ["c":"d", "a":"b"]); 1397 /// --------------------------------------------------------------------------------- 1398 /// 1399 deprecated("Use Request() instead of HTTPRequest; will be removed 2019-07") 1400 HTTPResponse exec(string method="GET")(string url = null, QueryParam[] params = null) 1401 do { 1402 debug(requests) tracef("started url=%s, this._uri=%s", url, _uri); 1403 1404 checkURL(url); 1405 //if ( _cm is null ) { 1406 // _cm = new ConnManager(); 1407 //} 1408 1409 NetworkStream _stream; 1410 _method = method; 1411 _response = new HTTPResponse; 1412 _history.length = 0; 1413 _response.uri = _uri; 1414 _response.finalURI = _uri; 1415 bool restartedRequest = false; // True if this is restarted keepAlive request 1416 1417 connect: 1418 if ( _method == "GET" && _uri in _permanent_redirects ) { 1419 debug(requests) trace("use parmanent redirects cache"); 1420 _uri = uriFromLocation(_uri, _permanent_redirects[_uri]); 1421 _response._finalURI = _uri; 1422 } 1423 _contentReceived = 0; 1424 _response._startedAt = Clock.currTime; 1425 1426 assert(_stream is null); 1427 1428 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1429 1430 if ( _stream is null ) { 1431 debug(requests) trace("create new connection"); 1432 _stream = setupConnection(); 1433 } else { 1434 debug(requests) trace("reuse old connection"); 1435 } 1436 1437 assert(_stream !is null); 1438 1439 if ( !_stream.isConnected ) { 1440 debug(requests) trace("disconnected stream on enter"); 1441 if ( !restartedRequest ) { 1442 debug(requests) trace("disconnected stream on enter: retry"); 1443 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1444 1445 _cm.del(_uri.scheme, _uri.host, _uri.port); 1446 _stream.close(); 1447 _stream = null; 1448 1449 restartedRequest = true; 1450 goto connect; 1451 } 1452 debug(requests) trace("disconnected stream on enter: return response"); 1453 //_stream = null; 1454 return _response; 1455 } 1456 _response._connectedAt = Clock.currTime; 1457 1458 auto h = requestHeaders(); 1459 1460 Appender!string req; 1461 1462 string encoded; 1463 1464 switch (_method) { 1465 case "POST","PUT": 1466 encoded = params2query(params); 1467 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "application/x-www-form-urlencoded"); 1468 if ( encoded.length > 0) { 1469 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(encoded.length)); 1470 } 1471 req.put(requestString()); 1472 break; 1473 default: 1474 req.put(requestString(params)); 1475 } 1476 1477 h.byKeyValue. 1478 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1479 each!(h => req.put(h)); 1480 req.put("\r\n"); 1481 if ( encoded ) { 1482 req.put(encoded); 1483 } 1484 1485 debug(requests) trace(req.data); 1486 if ( _verbosity >= 1 ) { 1487 req.data.splitLines.each!(a => writeln("> " ~ a)); 1488 } 1489 // 1490 // Now send request and receive response 1491 // 1492 try { 1493 _stream.send(req.data()); 1494 _response._requestSentAt = Clock.currTime; 1495 debug(requests) trace("starting receive response"); 1496 receiveResponse(_stream); 1497 debug(requests) trace("done receive response"); 1498 _response._finishedAt = Clock.currTime; 1499 } 1500 catch (NetworkException e) { 1501 // On SEND this can means: 1502 // we started to send request to the server, but it closed connection because of keepalive timeout. 1503 // We have to restart request if possible. 1504 1505 // On RECEIVE - if we received something - then this exception is real and unexpected error. 1506 // If we didn't receive anything - we can restart request again as it can be 1507 debug(requests) tracef("Exception on receive response: %s", e.msg); 1508 if ( _response._responseHeaders.length != 0 ) 1509 { 1510 _stream.close(); 1511 throw new RequestException("Unexpected network error"); 1512 } 1513 } 1514 1515 if ( serverPrematurelyClosedConnection() 1516 && !restartedRequest 1517 && isIdempotent(_method) 1518 ) { 1519 /// 1520 /// We didn't receive any data (keepalive connectioin closed?) 1521 /// and we can restart this request. 1522 /// Go ahead. 1523 /// 1524 debug(requests) tracef("Server closed keepalive connection"); 1525 1526 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1527 1528 _cm.del(_uri.scheme, _uri.host, _uri.port); 1529 _stream.close(); 1530 _stream = null; 1531 1532 restartedRequest = true; 1533 goto connect; 1534 } 1535 1536 if ( _useStreaming ) { 1537 if ( _response._receiveAsRange.activated ) { 1538 debug(requests) trace("streaming_in activated"); 1539 return _response; 1540 } else { 1541 // this can happen if whole response body received together with headers 1542 _response._receiveAsRange.data = _response.responseBody.data; 1543 } 1544 } 1545 1546 close_connection_if_not_keepalive(_stream); 1547 1548 if ( _verbosity >= 1 ) { 1549 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1550 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1551 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1552 } 1553 1554 if ( willFollowRedirect ) { 1555 debug(requests) trace("going to follow redirect"); 1556 if ( _history.length >= _maxRedirects ) { 1557 _stream = null; 1558 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1559 } 1560 // "location" in response already checked in canFollowRedirect 1561 immutable new_location = *("location" in _response.responseHeaders); 1562 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1563 1564 if ( _method == "GET" && _response.code == 301 ) { 1565 _permanent_redirects[_uri] = new_location; 1566 } 1567 1568 // save current response for history 1569 _history ~= _response; 1570 1571 // prepare new response (for redirected request) 1572 _response = new HTTPResponse; 1573 _response.uri = current_uri; 1574 _response.finalURI = next_uri; 1575 _stream = null; 1576 1577 // set new uri 1578 _uri = next_uri; 1579 debug(requests) tracef("Redirected to %s", next_uri); 1580 if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 1581 // 307 and 308 do not change method 1582 return this.get(); 1583 } 1584 if ( restartedRequest ) { 1585 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1586 restartedRequest = false; 1587 } 1588 goto connect; 1589 } 1590 1591 _response._history = _history; 1592 return _response; 1593 } 1594 1595 /// WRAPPERS 1596 /// 1597 /// send file(s) using POST and multipart form. 1598 /// This wrapper will be deprecated, use post with MultipartForm - it is more general and clear. 1599 /// Parameters: 1600 /// url = url 1601 /// files = array of PostFile structures 1602 /// Returns: 1603 /// Response 1604 /// Each PostFile structure contain path to file, and optional field name and content type. 1605 /// If no field name provided, then basename of the file will be used. 1606 /// application/octet-stream is default when no content type provided. 1607 /// Example: 1608 /// --------------------------------------------------------------- 1609 /// PostFile[] files = [ 1610 /// {fileName:"tests/abc.txt", fieldName:"abc", contentType:"application/octet-stream"}, 1611 /// {fileName:"tests/test.txt"} 1612 /// ]; 1613 /// rs = rq.exec!"POST"("http://httpbin.org/post", files); 1614 /// --------------------------------------------------------------- 1615 /// 1616 deprecated("Use Request() instead of HTTPRequest(); will be removed 2019-07") 1617 HTTPResponse exec(string method="POST")(string url, PostFile[] files) if (method=="POST") { 1618 MultipartForm multipart; 1619 File[] toClose; 1620 foreach(ref f; files) { 1621 File file = File(f.fileName, "rb"); 1622 toClose ~= file; 1623 string fileName = f.fileName ? f.fileName : f.fieldName; 1624 string contentType = f.contentType ? f.contentType : "application/octetstream"; 1625 multipart.add(f.fieldName, new FormDataFile(file), ["filename":fileName, "Content-Type": contentType]); 1626 } 1627 auto res = exec!"POST"(url, multipart); 1628 toClose.each!"a.close"; 1629 return res; 1630 } 1631 /// 1632 /// exec request with parameters when you can use dictionary (when you have no duplicates in parameter names) 1633 /// Consider switch to exec(url, QueryParams) as it more generic and clear. 1634 /// Parameters: 1635 /// url = url 1636 /// params = dictionary with field names as keys and field values as values. 1637 /// Returns: 1638 /// Response 1639 deprecated("Use Request() instead of HTTPRequest(); will be removed 2019-07") 1640 HTTPResponse exec(string method="GET")(string url, string[string] params) { 1641 return exec!method(url, params.byKeyValue.map!(p => QueryParam(p.key, p.value)).array); 1642 } 1643 /// 1644 /// GET request. Simple wrapper over exec!"GET" 1645 /// Params: 1646 /// args = request parameters. see exec docs. 1647 /// 1648 deprecated("Use Request() instead of HTTPRequest; will be removed 2019-07") 1649 HTTPResponse get(A...)(A args) { 1650 return exec!"GET"(args); 1651 } 1652 /// 1653 /// POST request. Simple wrapper over exec!"POST" 1654 /// Params: 1655 /// uri = endpoint uri 1656 /// args = request parameters. see exec docs. 1657 /// 1658 deprecated("Use Request() instead of HTTPRequest; will be removed 2019-07") 1659 HTTPResponse post(A...)(string uri, A args) { 1660 return exec!"POST"(uri, args); 1661 } 1662 // XXX interceptors 1663 import requests.request; 1664 1665 // we use this if we send from ubyte[][] and user provided Content-Length 1666 private void sendFlattenContent(NetworkStream _stream) { 1667 while ( !_postData.empty ) { 1668 auto chunk = _postData.front; 1669 _stream.send(chunk); 1670 _postData.popFront; 1671 } 1672 debug(requests) tracef("sent"); 1673 } 1674 // we use this if we send from ubyte[][] as chunked content 1675 private void sendChunkedContent(NetworkStream _stream) { 1676 while ( !_postData.empty ) { 1677 auto chunk = _postData.front; 1678 auto chunkHeader = "%x\r\n".format(chunk.length); 1679 debug(requests) tracef("sending %s%s", chunkHeader, cast(string)chunk); 1680 _stream.send(chunkHeader); 1681 _stream.send(chunk); 1682 _stream.send("\r\n"); 1683 debug(requests) tracef("chunk sent"); 1684 _postData.popFront; 1685 } 1686 debug(requests) tracef("sent"); 1687 _stream.send("0\r\n\r\n"); 1688 } 1689 1690 HTTPResponse exec_from_range(InputRangeAdapter postData) 1691 do { 1692 1693 _postData = postData; 1694 1695 debug(requests) tracef("exec from range"); 1696 1697 NetworkStream _stream; 1698 _response = new HTTPResponse; 1699 _history.length = 0; 1700 _response.uri = _uri; 1701 _response.finalURI = _uri; 1702 bool restartedRequest = false; 1703 bool send_flat; 1704 1705 connect: 1706 _contentReceived = 0; 1707 _response._startedAt = Clock.currTime; 1708 1709 assert(_stream is null); 1710 1711 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1712 1713 if ( _stream is null ) { 1714 debug(requests) trace("create new connection"); 1715 _stream = setupConnection(); 1716 } else { 1717 debug(requests) trace("reuse old connection"); 1718 } 1719 1720 assert(_stream !is null); 1721 1722 if ( !_stream.isConnected ) { 1723 debug(requests) trace("disconnected stream on enter"); 1724 if ( !restartedRequest ) { 1725 debug(requests) trace("disconnected stream on enter: retry"); 1726 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1727 1728 _cm.del(_uri.scheme, _uri.host, _uri.port); 1729 _stream.close(); 1730 _stream = null; 1731 1732 restartedRequest = true; 1733 goto connect; 1734 } 1735 debug(requests) trace("disconnected stream on enter: return response"); 1736 //_stream = null; 1737 return _response; 1738 } 1739 _response._connectedAt = Clock.currTime; 1740 1741 Appender!string req; 1742 req.put(requestString()); 1743 1744 auto h = requestHeaders; 1745 if ( _contentType ) { 1746 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", _contentType); 1747 } 1748 1749 if ( _postData.length >= 0 ) 1750 { 1751 // we know t 1752 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(_postData.length)); 1753 } 1754 1755 if ( _userHeaders.ContentLength || "Content-Length" in h ) 1756 { 1757 debug(requests) tracef("User provided content-length for chunked content"); 1758 send_flat = true; 1759 } 1760 else 1761 { 1762 h["Transfer-Encoding"] = "chunked"; 1763 send_flat = false; 1764 } 1765 h.byKeyValue. 1766 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1767 each!(h => req.put(h)); 1768 req.put("\r\n"); 1769 1770 debug(requests) tracef("send <%s>", req.data); 1771 if ( _verbosity >= 1 ) { 1772 req.data.splitLines.each!(a => writeln("> " ~ a)); 1773 } 1774 1775 try { 1776 // send headers 1777 _stream.send(req.data()); 1778 // send body 1779 //static if ( rank!R == 1) { 1780 // _stream.send(content); 1781 //} else { 1782 if ( send_flat ) { 1783 sendFlattenContent(_stream); 1784 } else { 1785 sendChunkedContent(_stream); 1786 } 1787 //} 1788 _response._requestSentAt = Clock.currTime; 1789 debug(requests) trace("starting receive response"); 1790 receiveResponse(_stream); 1791 debug(requests) trace("finished receive response"); 1792 _response._finishedAt = Clock.currTime; 1793 } 1794 catch (NetworkException e) 1795 { 1796 _stream.close(); 1797 throw new RequestException("Network error during data exchange"); 1798 } 1799 if ( serverPrematurelyClosedConnection() 1800 && !restartedRequest 1801 && isIdempotent(_method) 1802 ) { 1803 /// 1804 /// We didn't receive any data (keepalive connectioin closed?) 1805 /// and we can restart this request. 1806 /// Go ahead. 1807 /// 1808 debug(requests) tracef("Server closed keepalive connection"); 1809 1810 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1811 1812 _cm.del(_uri.scheme, _uri.host, _uri.port); 1813 _stream.close(); 1814 _stream = null; 1815 1816 restartedRequest = true; 1817 goto connect; 1818 } 1819 1820 if ( _useStreaming ) { 1821 if ( _response._receiveAsRange.activated ) { 1822 debug(requests) trace("streaming_in activated"); 1823 return _response; 1824 } else { 1825 // this can happen if whole response body received together with headers 1826 _response._receiveAsRange.data = _response.responseBody.data; 1827 } 1828 } 1829 1830 close_connection_if_not_keepalive(_stream); 1831 1832 if ( _verbosity >= 1 ) { 1833 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1834 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1835 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1836 } 1837 1838 1839 if ( willFollowRedirect ) { 1840 if ( _history.length >= _maxRedirects ) { 1841 _stream = null; 1842 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1843 } 1844 // "location" in response already checked in canFollowRedirect 1845 immutable new_location = *("location" in _response.responseHeaders); 1846 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1847 1848 immutable get_or_head = _method == "GET" || _method == "HEAD"; 1849 immutable code = _response.code; 1850 1851 // save current response for history 1852 _history ~= _response; 1853 1854 if ( code == 301 ) 1855 { 1856 // permanent redirect and change method 1857 _permanent_redirects[_uri] = new_location; 1858 if ( !get_or_head ) 1859 { 1860 _method = "GET"; 1861 } 1862 } 1863 if ( (code == 302 || code == 303) && !get_or_head) 1864 { 1865 // only change method 1866 _method = "GET"; 1867 } 1868 if ( code == 307 ) 1869 { 1870 // no change method, no permanent 1871 } 1872 if ( code == 308 ) 1873 { 1874 // permanent redirection and do not change method 1875 _permanent_redirects[_uri] = new_location; 1876 } 1877 1878 // prepare new response (for redirected request) 1879 _response = new HTTPResponse; 1880 _response.uri = current_uri; 1881 _response.finalURI = next_uri; 1882 1883 _stream = null; 1884 1885 // set new uri 1886 this._uri = next_uri; 1887 debug(requests) tracef("Redirected to %s", next_uri); 1888 if ( restartedRequest ) { 1889 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1890 restartedRequest = false; 1891 } 1892 if ( _method == "GET") 1893 { 1894 return exec_from_parameters(); 1895 } 1896 goto connect; 1897 } 1898 1899 _response._history = _history; 1900 return _response; 1901 } 1902 1903 HTTPResponse exec_from_multipart_form(MultipartForm form) { 1904 import std.uuid; 1905 import std.file; 1906 1907 _multipartForm = form; 1908 1909 debug(requests) tracef("exec from multipart form"); 1910 1911 NetworkStream _stream; 1912 _response = new HTTPResponse; 1913 _response.uri = _uri; 1914 _response.finalURI = _uri; 1915 bool restartedRequest = false; 1916 1917 connect: 1918 _contentReceived = 0; 1919 _response._startedAt = Clock.currTime; 1920 1921 assert(_stream is null); 1922 1923 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1924 1925 if ( _stream is null ) { 1926 debug(requests) trace("create new connection"); 1927 _stream = setupConnection(); 1928 } else { 1929 debug(requests) trace("reuse old connection"); 1930 } 1931 1932 assert(_stream !is null); 1933 1934 if ( !_stream.isConnected ) { 1935 debug(requests) trace("disconnected stream on enter"); 1936 if ( !restartedRequest ) { 1937 debug(requests) trace("disconnected stream on enter: retry"); 1938 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1939 1940 _cm.del(_uri.scheme, _uri.host, _uri.port); 1941 _stream.close(); 1942 _stream = null; 1943 1944 restartedRequest = true; 1945 goto connect; 1946 } 1947 debug(requests) trace("disconnected stream on enter: return response"); 1948 //_stream = null; 1949 return _response; 1950 } 1951 _response._connectedAt = Clock.currTime; 1952 1953 Appender!string req; 1954 req.put(requestString()); 1955 1956 string boundary = randomUUID().toString; 1957 string[] partHeaders; 1958 size_t contentLength; 1959 1960 foreach(ref part; _multipartForm._sources) { 1961 string h = "--" ~ boundary ~ "\r\n"; 1962 string disposition = `form-data; name="%s"`.format(part.name); 1963 string optionals = part. 1964 parameters.byKeyValue(). 1965 filter!(p => p.key!="Content-Type"). 1966 map! (p => "%s=%s".format(p.key, p.value)). 1967 join("; "); 1968 1969 h ~= `Content-Disposition: ` ~ [disposition, optionals].join("; ") ~ "\r\n"; 1970 1971 auto contentType = "Content-Type" in part.parameters; 1972 if ( contentType ) { 1973 h ~= "Content-Type: " ~ *contentType ~ "\r\n"; 1974 } 1975 1976 h ~= "\r\n"; 1977 partHeaders ~= h; 1978 contentLength += h.length + part.input.getSize() + "\r\n".length; 1979 } 1980 contentLength += "--".length + boundary.length + "--\r\n".length; 1981 1982 auto h = requestHeaders(); 1983 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "multipart/form-data; boundary=" ~ boundary); 1984 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(contentLength)); 1985 1986 h.byKeyValue. 1987 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1988 each!(h => req.put(h)); 1989 req.put("\r\n"); 1990 1991 debug(requests) trace(req.data); 1992 if ( _verbosity >= 1 ) req.data.splitLines.each!(a => writeln("> " ~ a)); 1993 1994 try { 1995 _stream.send(req.data()); 1996 foreach(ref source; _multipartForm._sources) { 1997 debug(requests) tracef("sending part headers <%s>", partHeaders.front); 1998 _stream.send(partHeaders.front); 1999 partHeaders.popFront; 2000 while (true) { 2001 auto chunk = source.input.read(); 2002 if ( chunk.length <= 0 ) { 2003 break; 2004 } 2005 _stream.send(chunk); 2006 } 2007 _stream.send("\r\n"); 2008 } 2009 _stream.send("--" ~ boundary ~ "--\r\n"); 2010 _response._requestSentAt = Clock.currTime; 2011 receiveResponse(_stream); 2012 _response._finishedAt = Clock.currTime; 2013 } 2014 catch (NetworkException e) { 2015 errorf("Error sending request: ", e.msg); 2016 _stream.close(); 2017 return _response; 2018 } 2019 2020 if ( serverPrematurelyClosedConnection() 2021 && !restartedRequest 2022 && isIdempotent(_method) 2023 ) { 2024 /// 2025 /// We didn't receive any data (keepalive connectioin closed?) 2026 /// and we can restart this request. 2027 /// Go ahead. 2028 /// 2029 debug(requests) tracef("Server closed keepalive connection"); 2030 2031 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 2032 2033 _cm.del(_uri.scheme, _uri.host, _uri.port); 2034 _stream.close(); 2035 _stream = null; 2036 2037 restartedRequest = true; 2038 goto connect; 2039 } 2040 2041 if ( _useStreaming ) { 2042 if ( _response._receiveAsRange.activated ) { 2043 debug(requests) trace("streaming_in activated"); 2044 return _response; 2045 } else { 2046 // this can happen if whole response body received together with headers 2047 _response._receiveAsRange.data = _response.responseBody.data; 2048 } 2049 } 2050 2051 close_connection_if_not_keepalive(_stream); 2052 2053 if ( _verbosity >= 1 ) { 2054 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 2055 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 2056 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 2057 } 2058 2059 if ( willFollowRedirect ) { 2060 if ( _history.length >= _maxRedirects ) { 2061 _stream = null; 2062 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 2063 } 2064 // "location" in response already checked in canFollowRedirect 2065 immutable new_location = *("location" in _response.responseHeaders); 2066 immutable current_uri = _uri; 2067 immutable next_uri = uriFromLocation(_uri, new_location); 2068 2069 immutable get_or_head = _method == "GET" || _method == "HEAD"; 2070 immutable code = _response.code; 2071 2072 // save current response for history 2073 _history ~= _response; 2074 2075 if ( code == 301 ) 2076 { 2077 // permanent redirect and change method 2078 _permanent_redirects[_uri] = new_location; 2079 if ( !get_or_head ) 2080 { 2081 _method = "GET"; 2082 } 2083 } 2084 if ( (code == 302 || code == 303) && !get_or_head) 2085 { 2086 // only change method 2087 _method = "GET"; 2088 } 2089 if ( code == 307 ) 2090 { 2091 // no change method, no permanent 2092 } 2093 if ( code == 308 ) 2094 { 2095 // permanent redirection and do not change method 2096 _permanent_redirects[_uri] = new_location; 2097 } 2098 2099 // prepare new response (for redirected request) 2100 _response = new HTTPResponse; 2101 _response.uri = current_uri; 2102 _response.finalURI = next_uri; 2103 _stream = null; 2104 2105 // set new uri 2106 this._uri = next_uri; 2107 debug(requests) tracef("Redirected to %s", next_uri); 2108 if ( restartedRequest ) { 2109 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 2110 restartedRequest = false; 2111 } 2112 if ( _method == "GET") 2113 { 2114 return exec_from_parameters(); 2115 } 2116 goto connect; 2117 } 2118 2119 _response._history = _history; 2120 return _response; 2121 } 2122 2123 HTTPResponse exec_from_parameters() { 2124 2125 debug(requests) tracef("exec from parameters request"); 2126 2127 assert(_uri != URI.init); 2128 NetworkStream _stream; 2129 _response = new HTTPResponse; 2130 _history.length = 0; 2131 _response.uri = _uri; 2132 _response.finalURI = _uri; 2133 bool restartedRequest = false; // True if this is restarted keepAlive request 2134 2135 connect: 2136 if ( _method == "GET" && _uri in _permanent_redirects ) { 2137 debug(requests) trace("use parmanent redirects cache"); 2138 _uri = uriFromLocation(_uri, _permanent_redirects[_uri]); 2139 _response._finalURI = _uri; 2140 } 2141 _contentReceived = 0; 2142 _response._startedAt = Clock.currTime; 2143 2144 assert(_stream is null); 2145 2146 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 2147 2148 if ( _stream is null ) { 2149 debug(requests) trace("create new connection"); 2150 _stream = setupConnection(); 2151 } else { 2152 debug(requests) trace("reuse old connection"); 2153 } 2154 2155 assert(_stream !is null); 2156 2157 if ( !_stream.isConnected ) { 2158 debug(requests) trace("disconnected stream on enter"); 2159 if ( !restartedRequest ) { 2160 debug(requests) trace("disconnected stream on enter: retry"); 2161 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 2162 2163 _cm.del(_uri.scheme, _uri.host, _uri.port); 2164 _stream.close(); 2165 _stream = null; 2166 2167 restartedRequest = true; 2168 goto connect; 2169 } 2170 debug(requests) trace("disconnected stream on enter: return response"); 2171 //_stream = null; 2172 return _response; 2173 } 2174 _response._connectedAt = Clock.currTime; 2175 2176 auto h = requestHeaders(); 2177 2178 Appender!string req; 2179 2180 string encoded; 2181 2182 switch (_method) { 2183 case "POST","PUT": 2184 encoded = params2query(_params); 2185 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "application/x-www-form-urlencoded"); 2186 if ( encoded.length > 0) { 2187 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(encoded.length)); 2188 } 2189 req.put(requestString()); 2190 break; 2191 default: 2192 req.put(requestString(_params)); 2193 } 2194 2195 h.byKeyValue. 2196 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 2197 each!(h => req.put(h)); 2198 req.put("\r\n"); 2199 if ( encoded ) { 2200 req.put(encoded); 2201 } 2202 2203 debug(requests) trace(req.data); 2204 if ( _verbosity >= 1 ) { 2205 req.data.splitLines.each!(a => writeln("> " ~ a)); 2206 } 2207 // 2208 // Now send request and receive response 2209 // 2210 try { 2211 _stream.send(req.data()); 2212 _response._requestSentAt = Clock.currTime; 2213 debug(requests) trace("starting receive response"); 2214 receiveResponse(_stream); 2215 debug(requests) tracef("done receive response"); 2216 _response._finishedAt = Clock.currTime; 2217 } 2218 catch (NetworkException e) { 2219 // On SEND this can means: 2220 // we started to send request to the server, but it closed connection because of keepalive timeout. 2221 // We have to restart request if possible. 2222 2223 // On RECEIVE - if we received something - then this exception is real and unexpected error. 2224 // If we didn't receive anything - we can restart request again as it can be 2225 debug(requests) tracef("Exception on receive response: %s", e.msg); 2226 if ( _response._responseHeaders.length != 0 ) 2227 { 2228 _stream.close(); 2229 throw new RequestException("Unexpected network error"); 2230 } 2231 } 2232 2233 if ( serverPrematurelyClosedConnection() 2234 && !restartedRequest 2235 && isIdempotent(_method) 2236 ) { 2237 /// 2238 /// We didn't receive any data (keepalive connectioin closed?) 2239 /// and we can restart this request. 2240 /// Go ahead. 2241 /// 2242 debug(requests) tracef("Server closed keepalive connection"); 2243 2244 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 2245 2246 _cm.del(_uri.scheme, _uri.host, _uri.port); 2247 _stream.close(); 2248 _stream = null; 2249 2250 restartedRequest = true; 2251 goto connect; 2252 } 2253 2254 if ( _useStreaming ) { 2255 if ( _response._receiveAsRange.activated ) { 2256 debug(requests) trace("streaming_in activated"); 2257 return _response; 2258 } else { 2259 // this can happen if whole response body received together with headers 2260 _response._receiveAsRange.data = _response.responseBody.data; 2261 } 2262 } 2263 2264 close_connection_if_not_keepalive(_stream); 2265 2266 if ( _verbosity >= 1 ) { 2267 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 2268 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 2269 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 2270 } 2271 2272 if ( willFollowRedirect ) { 2273 debug(requests) trace("going to follow redirect"); 2274 if ( _history.length >= _maxRedirects ) { 2275 _stream = null; 2276 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 2277 } 2278 // "location" in response already checked in canFollowRedirect 2279 immutable new_location = *("location" in _response.responseHeaders); 2280 immutable current_uri = _uri; 2281 immutable next_uri = uriFromLocation(_uri, new_location); 2282 2283 immutable get_or_head = _method == "GET" || _method == "HEAD"; 2284 immutable code = _response.code; 2285 2286 // save current response for history 2287 _history ~= _response; 2288 2289 if ( code == 301 ) 2290 { 2291 // permanent redirect and change method 2292 _permanent_redirects[_uri] = new_location; 2293 if ( !get_or_head ) 2294 { 2295 _method = "GET"; 2296 } 2297 } 2298 if ( (code == 302 || code == 303) && !get_or_head) 2299 { 2300 // only change method 2301 _method = "GET"; 2302 } 2303 if ( code == 307 ) 2304 { 2305 // no change method, no permanent 2306 } 2307 if ( code == 308 ) 2308 { 2309 // permanent redirection and do not change method 2310 _permanent_redirects[_uri] = new_location; 2311 } 2312 2313 // prepare new response (for redirected request) 2314 _response = new HTTPResponse; 2315 _response.uri = current_uri; 2316 _response.finalURI = next_uri; 2317 _stream = null; 2318 2319 // set new uri 2320 _uri = next_uri; 2321 debug(requests) tracef("Redirected to %s", next_uri); 2322 //if ( _method != "GET" && _response.code != 307 && _response.code != 308 ) { 2323 // // 307 and 308 do not change method 2324 // return exec_from_parameters(r); 2325 //} 2326 if ( restartedRequest ) { 2327 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 2328 restartedRequest = false; 2329 } 2330 goto connect; 2331 } 2332 2333 _response._history = _history; 2334 return _response; 2335 } 2336 HTTPResponse execute(Request r) 2337 { 2338 _method = r.method; 2339 _uri = r.uri; 2340 _useStreaming = r.useStreaming; 2341 _permanent_redirects = r.permanent_redirects; 2342 _maxRedirects = r.maxRedirects; 2343 _authenticator = r.authenticator; 2344 _maxHeadersLength = r.maxHeadersLength; 2345 _maxContentLength = r.maxContentLength; 2346 _verbosity = r.verbosity; 2347 _keepAlive = r.keepAlive; 2348 _bufferSize = r.bufferSize; 2349 _proxy = r.proxy; 2350 _timeout = r.timeout; 2351 _contentType = r.contentType; 2352 _socketFactory = r.socketFactory; 2353 _sslOptions = r.sslOptions; 2354 _bind = r.bind; 2355 _headers = r.headers; 2356 _userHeaders = r.userHeaders; 2357 2358 _params = r.params; 2359 2360 // this assignments increments refCounts, so we can't use const Request 2361 // but Request is anyway struct and called by-value 2362 _cm = r.cm; 2363 _cookie = r.cookie; 2364 2365 debug(requests) trace("serving %s".format(r)); 2366 if ( !r.postData.empty) 2367 { 2368 return exec_from_range(r.postData); 2369 } 2370 if ( r.hasMultipartForm ) 2371 { 2372 return exec_from_multipart_form(r.multipartForm); 2373 } 2374 auto rs = exec_from_parameters(); 2375 return rs; 2376 } 2377 } 2378 2379 version(vibeD) { 2380 import std.json; 2381 package string httpTestServer() { 2382 return "http://httpbin.org/"; 2383 } 2384 package string fromJsonArrayToStr(JSONValue v) { 2385 return v.str; 2386 } 2387 } 2388 else { 2389 import std.json; 2390 package string httpTestServer() { 2391 return "http://127.0.0.1:8081/"; 2392 } 2393 package string fromJsonArrayToStr(JSONValue v) { 2394 return cast(string)(v.array.map!"cast(ubyte)a.integer".array); 2395 } 2396 }