1 module requests.http; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.ascii; 7 import std.conv; 8 import std.datetime; 9 import std.exception; 10 import std.format; 11 import std.stdio; 12 import std.range; 13 import std..string; 14 import std.traits; 15 import std.typecons; 16 import std.experimental.logger; 17 import core.thread; 18 19 import requests.streams; 20 import requests.uri; 21 import requests.utils; 22 import requests.base; 23 import requests.connmanager; 24 import requests.rangeadapter; 25 26 static immutable ushort[] redirectCodes = [301, 302, 303, 307, 308]; 27 28 enum HTTP11 = 101; 29 enum HTTP10 = 100; 30 31 static immutable string[string] proxies; 32 shared static this() { 33 import std.process; 34 proxies["http"] = environment.get("http_proxy", environment.get("HTTP_PROXY")); 35 proxies["https"] = environment.get("https_proxy", environment.get("HTTPS_PROXY")); 36 proxies["all"] = environment.get("all_proxy", environment.get("ALL_PROXY")); 37 foreach(p; proxies.byKey()) { 38 if (proxies[p] is null) { 39 continue; 40 } 41 URI u = URI(proxies[p]); 42 } 43 } 44 45 public class MaxRedirectsException: Exception { 46 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 47 super(message, file, line, next); 48 } 49 } 50 51 /// 52 /// 53 /// 54 //public auto queryParams(T...)(T params) pure nothrow @safe 55 //{ 56 // static assert (T.length % 2 == 0, "wrong args count"); 57 // 58 // QueryParam[] output; 59 // output.reserve = T.length / 2; 60 // 61 // void queryParamsHelper(T...)(T params, ref QueryParam[] output) 62 // { 63 // static if (T.length > 0) 64 // { 65 // output ~= QueryParam(params[0].to!string, params[1].to!string); 66 // queryParamsHelper(params[2..$], output); 67 // } 68 // } 69 // 70 // queryParamsHelper(params, output); 71 // return output; 72 //} 73 74 /// 75 /// Response - result of request execution. 76 /// 77 /// Response.code - response HTTP code. 78 /// Response.status_line - received HTTP status line. 79 /// Response.responseHeaders - received headers. 80 /// Response.responseBody - container for received body 81 /// Response.history - for redirected responses contain all history 82 /// 83 public class HTTPResponse : Response { 84 private { 85 string _status_line; 86 87 HTTPResponse[] _history; // redirects history 88 89 mixin(Setter!string("status_line")); 90 91 int _version; 92 } 93 94 ~this() { 95 _responseHeaders = null; 96 _history.length = 0; 97 } 98 99 mixin(Getter("status_line")); 100 101 @property final string[string] responseHeaders() @safe @nogc nothrow { 102 return _responseHeaders; 103 } 104 @property final HTTPResponse[] history() @safe @nogc nothrow { 105 return _history; 106 } 107 108 private int parse_version(in string v) pure const nothrow @safe { 109 // try to parse HTTP/1.x to version 110 try if ( v.length > 5 ) { 111 return (v[5..$].split(".").map!"to!int(a)".array[0..2].reduce!((a,b) => a*100 + b)); 112 } catch (Exception e) { 113 } 114 return 0; 115 } 116 unittest { 117 auto r = new HTTPResponse(); 118 assert(r.parse_version("HTTP/1.1") == 101); 119 assert(r.parse_version("HTTP/1.0") == 100); 120 assert(r.parse_version("HTTP/0.9") == 9); 121 assert(r.parse_version("HTTP/xxx") == 0); 122 } 123 } 124 125 /// 126 /// Request. 127 /// Configurable parameters: 128 /// $(B method) - string, method to use (GET, POST, ...) 129 /// $(B headers) - string[string], add any additional headers you'd like to send. 130 /// $(B authenticator) - class Auth, class to send auth headers. 131 /// $(B keepAlive) - bool, set true for keepAlive requests. default true. 132 /// $(B maxRedirects) - uint, maximum number of redirects. default 10. 133 /// $(B maxHeadersLength) - size_t, maximum length of server response headers. default = 32KB. 134 /// $(B maxContentLength) - size_t, maximun content length. delault - 0 = unlimited. 135 /// $(B bufferSize) - size_t, send and receive buffer size. default = 16KB. 136 /// $(B verbosity) - uint, level of verbosity(0 - nothing, 1 - headers, 2 - headers and body progress). default = 0. 137 /// $(B proxy) - string, set proxy url if needed. default - null. 138 /// $(B cookie) - Tuple Cookie, Read/Write cookie You can get cookie setted by server, or set cookies before doing request. 139 /// $(B timeout) - Duration, Set timeout value for connect/receive/send. 140 /// 141 public struct HTTPRequest { 142 private { 143 string _method = "GET"; 144 URI _uri; 145 string[string] _headers; 146 string[] _filteredHeaders; 147 Auth _authenticator; 148 bool _keepAlive = true; 149 uint _maxRedirects = 10; 150 size_t _maxHeadersLength = 32 * 1024; // 32 KB 151 size_t _maxContentLength; // 0 - Unlimited 152 string _proxy; 153 uint _verbosity = 0; // 0 - no output, 1 - headers, 2 - headers+body info 154 Duration _timeout = 30.seconds; 155 size_t _bufferSize = 16*1024; // 16k 156 bool _useStreaming; // return iterator instead of completed request 157 158 HTTPResponse[] _history; // redirects history 159 DataPipe!ubyte _bodyDecoder; 160 DecodeChunked _unChunker; 161 long _contentLength; 162 long _contentReceived; 163 SSLOptions _sslOptions; 164 string _bind; 165 _UH _userHeaders; 166 167 RefCounted!ConnManager _cm; 168 RefCounted!Cookies _cookie; 169 string[URI] _permanent_redirects; // cache 301 redirects for GET requests 170 MultipartForm _multipartForm; 171 172 NetStreamFactory _socketFactory; 173 174 QueryParam[] _params; 175 string _contentType; 176 InputRangeAdapter _postData; 177 } 178 package HTTPResponse _response; 179 180 mixin(Getter_Setter!string ("method")); 181 mixin(Getter_Setter!bool ("keepAlive")); 182 mixin(Getter_Setter!size_t ("maxContentLength")); 183 mixin(Getter_Setter!size_t ("maxHeadersLength")); 184 mixin(Getter_Setter!size_t ("bufferSize")); 185 mixin(Getter_Setter!uint ("maxRedirects")); 186 mixin(Getter_Setter!uint ("verbosity")); 187 mixin(Getter ("proxy")); 188 mixin(Getter_Setter!Duration ("timeout")); 189 mixin(Setter!Auth ("authenticator")); 190 mixin(Getter_Setter!bool ("useStreaming")); 191 mixin(Getter ("contentLength")); 192 mixin(Getter ("contentReceived")); 193 mixin(Getter_Setter!SSLOptions ("sslOptions")); 194 mixin(Getter_Setter!string ("bind")); 195 mixin(Setter!NetStreamFactory ("socketFactory")); 196 197 @property void sslSetVerifyPeer(bool v) pure @safe nothrow @nogc { 198 _sslOptions.setVerifyPeer(v); 199 } 200 @property void sslSetKeyFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc { 201 _sslOptions.setKeyFile(p, t); 202 } 203 @property void sslSetCertFile(string p, SSLOptions.filetype t = SSLOptions.filetype.pem) pure @safe nothrow @nogc { 204 _sslOptions.setCertFile(p, t); 205 } 206 @property void sslSetCaCert(string path) pure @safe nothrow @nogc { 207 _sslOptions.setCaCert(path); 208 } 209 //@property final void cookie(Cookie[] s) pure @safe @nogc nothrow { 210 // _cookie = s; 211 //} 212 @property final void proxy(string v) { 213 if ( v != _proxy ) { 214 _cm.clear(); 215 } 216 _proxy = v; 217 } 218 //@property final Cookie[] cookie() pure @safe @nogc nothrow { 219 // return _cookie; 220 //} 221 222 this(string uri) { 223 _uri = URI(uri); 224 _cm = ConnManager(10); 225 } 226 ~this() { 227 _headers = null; 228 _authenticator = null; 229 _history = null; 230 _bodyDecoder = null; 231 _unChunker = null; 232 //if ( _cm ) { 233 // _cm.clear(); 234 //} 235 } 236 string toString() const { 237 return "HTTPRequest(%s, %s)".format(_method, _uri.uri()); 238 } 239 string format(string fmt) const { 240 import std.array; 241 import std.stdio; 242 auto a = appender!string(); 243 auto f = FormatSpec!char(fmt); 244 while (f.writeUpToNextSpec(a)) { 245 switch(f.spec) { 246 case 'h': 247 // Remote hostname. 248 a.put(_uri.host); 249 break; 250 case 'm': 251 // method. 252 a.put(_method); 253 break; 254 case 'p': 255 // Remote port. 256 a.put("%d".format(_uri.port)); 257 break; 258 case 'P': 259 // Path 260 a.put(_uri.path); 261 break; 262 case 'q': 263 // query parameters supplied with url. 264 a.put(_uri.query); 265 break; 266 case 'U': 267 a.put(_uri.uri()); 268 break; 269 default: 270 throw new FormatException("Unknown Request format spec " ~ f.spec); 271 } 272 } 273 return a.data(); 274 } 275 string select_proxy(string scheme) { 276 if ( _proxy is null && proxies.length == 0 ) { 277 debug(requests) tracef("proxy=null"); 278 return null; 279 } 280 if ( _proxy ) { 281 debug(requests) tracef("proxy=%s", _proxy); 282 return _proxy; 283 } 284 auto p = scheme in proxies; 285 if ( p !is null && *p != "") { 286 debug(requests) tracef("proxy=%s", *p); 287 return *p; 288 } 289 p = "all" in proxies; 290 if ( p !is null && *p != "") { 291 debug(requests) tracef("proxy=%s", *p); 292 return *p; 293 } 294 debug(requests) tracef("proxy=null"); 295 return null; 296 } 297 void clearHeaders() { 298 _headers = null; 299 } 300 @property void uri(in URI newURI) { 301 //handleURLChange(_uri, newURI); 302 _uri = newURI; 303 } 304 /// Add headers to request 305 /// Params: 306 /// headers = headers to send. 307 void addHeaders(in string[string] headers) { 308 foreach(pair; headers.byKeyValue) { 309 string _h = pair.key; 310 switch(toLower(_h)) { 311 case "host": 312 _userHeaders.Host = true; 313 break; 314 case "user-agent": 315 _userHeaders.UserAgent = true; 316 break; 317 case "content-length": 318 _userHeaders.ContentLength = true; 319 break; 320 case "content-type": 321 _userHeaders.ContentType = true; 322 break; 323 case "connection": 324 _userHeaders.Connection = true; 325 break; 326 case "cookie": 327 _userHeaders.Cookie = true; 328 break; 329 default: 330 break; 331 } 332 _headers[pair.key] = pair.value; 333 } 334 } 335 private void safeSetHeader(ref string[string] headers, bool userAdded, string h, string v) pure @safe { 336 if ( !userAdded ) { 337 headers[h] = v; 338 } 339 } 340 /// Remove headers from request 341 /// Params: 342 /// headers = headers to remove. 343 void removeHeaders(in string[] headers) pure { 344 _filteredHeaders ~= headers; 345 } 346 /// 347 /// compose headers to send 348 /// 349 private string[string] requestHeaders() { 350 351 string[string] generatedHeaders; 352 353 if ( _authenticator ) { 354 _authenticator. 355 authHeaders(_uri.host). 356 byKeyValue. 357 each!(pair => generatedHeaders[pair.key] = pair.value); 358 } 359 360 _headers.byKey.each!(h => generatedHeaders[h] = _headers[h]); 361 362 safeSetHeader(generatedHeaders, _userHeaders.AcceptEncoding, "Accept-Encoding", "gzip,deflate"); 363 safeSetHeader(generatedHeaders, _userHeaders.UserAgent, "User-Agent", "dlang-requests"); 364 safeSetHeader(generatedHeaders, _userHeaders.Connection, "Connection", _keepAlive?"Keep-Alive":"Close"); 365 366 if ( !_userHeaders.Host ) 367 { 368 generatedHeaders["Host"] = _uri.host; 369 if ( _uri.scheme !in standard_ports || _uri.port != standard_ports[_uri.scheme] ) { 370 generatedHeaders["Host"] ~= ":%d".format(_uri.port); 371 } 372 } 373 if ( _cookie._map.length && !_userHeaders.Cookie ) { 374 string[] cookie_strings; 375 foreach(pair; _cookie._map.byPair) { 376 string cookie_name = pair.key; 377 auto cookie = pair.value; 378 if ( _uri.path.pathMatches(cookie.path) && _uri.host.domainMatches(cookie.domain) ) { 379 cookie_strings ~= "%s=%s".format(cookie_name, cookie.value); 380 } 381 } 382 if ( cookie_strings.length > 0 ) { 383 generatedHeaders["Cookie"] = cookie_strings.join("; "); 384 } 385 } 386 387 _filteredHeaders.each!(h => generatedHeaders.remove(h)); 388 389 return generatedHeaders; 390 } 391 /// 392 /// Build request string. 393 /// Handle proxy and query parameters. 394 /// 395 private @property string requestString(QueryParam[] params = null) { 396 auto query = _uri.query.dup; 397 if ( params ) { 398 query ~= "&" ~ params2query(params); 399 if ( query[0] != '?' ) { 400 query = "?" ~ query; 401 } 402 } 403 string actual_proxy = select_proxy(_uri.scheme); 404 if ( actual_proxy && _uri.scheme != "https" ) { 405 return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.uri(No.params), query); 406 } 407 return "%s %s%s HTTP/1.1\r\n".format(_method, _uri.path, query); 408 } 409 /// 410 /// encode parameters and build query part of the url 411 /// 412 private static string params2query(in QueryParam[] params) pure @safe { 413 return params. 414 map!(a => "%s=%s".format(a.key.urlEncoded, a.value.urlEncoded)). 415 join("&"); 416 } 417 // 418 package unittest { 419 assert(params2query(queryParams("a","b", "c", " d "))=="a=b&c=%20d%20"); 420 } 421 /// 422 /// Analyze received headers, take appropriate actions: 423 /// check content length, attach unchunk and uncompress 424 /// 425 private void analyzeHeaders(in string[string] headers) { 426 427 _contentLength = -1; 428 _unChunker = null; 429 auto contentLength = "content-length" in headers; 430 if ( contentLength ) { 431 try { 432 string l = *contentLength; 433 _contentLength = parse!long(l); 434 // TODO: maybe add a strict mode that checks if l was parsed completely 435 if ( _maxContentLength && _contentLength > _maxContentLength) { 436 throw new RequestException("ContentLength > maxContentLength (%d>%d)". 437 format(_contentLength, _maxContentLength)); 438 } 439 } catch (ConvException e) { 440 throw new RequestException("Can't convert Content-Length from %s".format(*contentLength)); 441 } 442 } 443 auto transferEncoding = "transfer-encoding" in headers; 444 if ( transferEncoding ) { 445 debug(requests) tracef("transferEncoding: %s", *transferEncoding); 446 if ( (*transferEncoding).toLower == "chunked") { 447 _unChunker = new DecodeChunked(); 448 _bodyDecoder.insert(_unChunker); 449 } 450 } 451 auto contentEncoding = "content-encoding" in headers; 452 if ( contentEncoding ) switch (*contentEncoding) { 453 default: 454 throw new RequestException("Unknown content-encoding " ~ *contentEncoding); 455 case "gzip": 456 case "deflate": 457 _bodyDecoder.insert(new Decompressor!ubyte); 458 } 459 460 } 461 /// 462 /// Called when we know that all headers already received in buffer. 463 /// This routine does not interpret headers content (see analyzeHeaders). 464 /// 1. Split headers on lines 465 /// 2. store status line, store response code 466 /// 3. unfold headers if needed 467 /// 4. store headers 468 /// 469 private void parseResponseHeaders(in ubyte[] input, string lineSep) { 470 string lastHeader; 471 auto buffer = cast(string)input; 472 473 foreach(line; buffer.split(lineSep).map!(l => l.stripRight)) { 474 if ( ! _response.status_line.length ) { 475 debug (requests) tracef("statusLine: %s", line); 476 _response.status_line = line; 477 if ( _verbosity >= 1 ) { 478 writefln("< %s", line); 479 } 480 auto parsed = line.split(" "); 481 if ( parsed.length >= 2 ) { 482 _response.code = parsed[1].to!ushort; 483 _response._version = _response.parse_version(parsed[0]); 484 } 485 continue; 486 } 487 if ( line[0] == ' ' || line[0] == '\t' ) { 488 // unfolding https://tools.ietf.org/html/rfc822#section-3.1 489 if ( auto stored = lastHeader in _response._responseHeaders) { 490 *stored ~= line; 491 } 492 continue; 493 } 494 auto parsed = line.findSplit(":"); 495 auto header = parsed[0].toLower; 496 auto value = parsed[2].strip; 497 498 if ( _verbosity >= 1 ) { 499 writefln("< %s: %s", header, value); 500 } 501 502 lastHeader = header; 503 debug (requests) tracef("Header %s = %s", header, value); 504 505 if ( header != "set-cookie" ) { 506 auto stored = _response.responseHeaders.get(header, null); 507 if ( stored ) { 508 value = stored ~ "," ~ value; 509 } 510 _response._responseHeaders[header] = value; 511 continue; 512 } 513 auto cookie = processCookie(value); 514 debug(requests) tracef("store cookie[%s]=%s", cookie.name, cookie.data); 515 _cookie._map[cookie.name] = cookie.data; 516 } 517 } 518 519 /// 520 /// Process Set-Cookie header from server response 521 /// 522 private auto processCookie(string value ) pure { 523 // cookie processing 524 // 525 // Separate processing as we can't join several set-cookie lines in single line (as other headers) 526 // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com 527 // < Set-Cookie: idx=7f2800f63c112a65ef5082957bcca24b; expires=Mon, 29-May-2017 00:31:25 GMT; path=/; domain=example.com, cs=ip764-RgKqc-HvSkxRxdQQAKW8LA; path=/; domain=.example.com; HttpOnly 528 // 529 string name; 530 Cookie cookie; 531 auto fields = value.split(";").map!strip; 532 auto c = fields.front.findSplit("="); 533 name = c[0]; 534 cookie.value = c[2]; 535 fields.popFront; 536 while(!fields.empty) { 537 auto s = fields.front.findSplit("="); 538 fields.popFront; 539 if ( s[1] != "=" ) { 540 continue; 541 } 542 auto k = s[0]; 543 auto v = s[2]; 544 switch(k.toLower()) { 545 case "domain": 546 cookie.domain = v; 547 break; 548 case "path": 549 cookie.path = v; 550 break; 551 case "expires": 552 continue; 553 case "max-age": 554 continue; 555 default: 556 continue; 557 } 558 } 559 if ( cookie.domain == "" ) { 560 cookie.domain = _uri.host; 561 } 562 if ( cookie.path == "" ) { 563 cookie.path = _uri.path; 564 } 565 return Tuple!(string, "name", Cookie, "data")(name, cookie); 566 } 567 568 private bool willFollowRedirect() { 569 if ( !canFind(redirectCodes, _response.code) ) { 570 return false; 571 } 572 if ( !_maxRedirects ) { 573 return false; 574 } 575 if ( "location" !in _response.responseHeaders ) { 576 return false; 577 } 578 return true; 579 } 580 private URI uriFromLocation(const ref URI uri, in string location) { 581 URI newURI = uri; 582 try { 583 newURI = URI(location); 584 } catch (UriException e) { 585 debug(requests) trace("Can't parse Location:, try relative uri"); 586 newURI.path = location; 587 newURI.uri = newURI.recalc_uri; 588 } 589 return newURI; 590 } 591 /// 592 /// if we have new uri, then we need to check if we have to reopen existent connection 593 /// 594 private void checkURL(string url, string file=__FILE__, size_t line=__LINE__) { 595 if (url is null && _uri.uri == "" ) { 596 throw new RequestException("No url configured", file, line); 597 } 598 599 if ( url !is null ) { 600 URI newURI = URI(url); 601 //handleURLChange(_uri, newURI); 602 _uri = newURI; 603 } 604 } 605 /// 606 /// Setup connection. Handle proxy and https case 607 /// 608 /// Place new connection in ConnManager cache 609 /// 610 private NetworkStream setupConnection() 611 do { 612 613 debug(requests) tracef("Set up new connection"); 614 NetworkStream stream; 615 616 if ( _socketFactory ) 617 { 618 debug(requests) tracef("use socketFactory"); 619 stream = _socketFactory(_uri.scheme, _uri.host, _uri.port); 620 } 621 622 if ( stream ) // socket factory created connection 623 { 624 return stream; 625 } 626 627 URI uri; // this URI will be used temporarry if we need proxy 628 string actual_proxy = select_proxy(_uri.scheme); 629 final switch (_uri.scheme) { 630 case"http": 631 if ( actual_proxy ) { 632 uri.uri_parse(actual_proxy); 633 uri.idn_encode(); 634 } else { 635 // use original uri 636 uri = _uri; 637 } 638 stream = new TCPStream(); 639 stream.bind(_bind); 640 stream.connect(uri.host, uri.port, _timeout); 641 break ; 642 case"https": 643 if ( actual_proxy ) { 644 uri.uri_parse(actual_proxy); 645 uri.idn_encode(); 646 stream = new TCPStream(); 647 stream.bind(_bind); 648 stream.connect(uri.host, uri.port, _timeout); 649 if ( verbosity>=1 ) { 650 writeln("> CONNECT %s:%d HTTP/1.1".format(_uri.host, _uri.port)); 651 } 652 stream.send("CONNECT %s:%d HTTP/1.1\r\n\r\n".format(_uri.host, _uri.port)); 653 while ( stream.isConnected ) { 654 ubyte[1024] b; 655 auto read = stream.receive(b); 656 if ( verbosity>=1) { 657 writefln("< %s", cast(string)b[0..read]); 658 } 659 debug(requests) tracef("read: %d", read); 660 if ( b[0..read].canFind("\r\n\r\n") || b[0..read].canFind("\n\n") ) { 661 debug(requests) tracef("proxy connection ready"); 662 // convert connection to ssl 663 stream = new SSLStream(stream, _sslOptions, _uri.host); 664 break ; 665 } else { 666 debug(requests) tracef("still wait for proxy connection"); 667 } 668 } 669 } else { 670 uri = _uri; 671 stream = new SSLStream(_sslOptions); 672 stream.bind(_bind); 673 stream.connect(uri.host, uri.port, _timeout); 674 debug(requests) tracef("ssl connection to origin server ready"); 675 } 676 break ; 677 } 678 679 return stream; 680 } 681 /// 682 /// Request sent, now receive response. 683 /// Find headers, split on headers and body, continue to receive body 684 /// 685 private void receiveResponse(NetworkStream _stream) { 686 687 try { 688 _stream.readTimeout = timeout; 689 } catch (Exception e) { 690 debug(requests) tracef("Failed to set read timeout for stream: %s", e.msg); 691 return; 692 } 693 // Commented this out as at exit we can have alreade closed socket 694 // scope(exit) { 695 // if ( _stream && _stream.isOpen ) { 696 // _stream.readTimeout = 0.seconds; 697 // } 698 // } 699 700 _bodyDecoder = new DataPipe!ubyte(); 701 scope(exit) { 702 if ( !_useStreaming ) { 703 _bodyDecoder = null; 704 _unChunker = null; 705 } 706 } 707 708 auto buffer = Buffer!ubyte(); 709 Buffer!ubyte partialBody; 710 ptrdiff_t read; 711 string lineSep = null, headersEnd = null; 712 bool headersHaveBeenReceived; 713 714 while( !headersHaveBeenReceived ) { 715 716 auto b = new ubyte[_bufferSize]; 717 read = _stream.receive(b); 718 719 debug(requests) tracef("read: %d", read); 720 if ( read == 0 ) { 721 break; 722 } 723 auto data = b[0..read]; 724 buffer.putNoCopy(data); 725 if ( verbosity>=3 ) { 726 writeln(data.dump.join("\n")); 727 } 728 729 if ( buffer.length > maxHeadersLength ) { 730 throw new RequestException("Headers length > maxHeadersLength (%d > %d)".format(buffer.length, maxHeadersLength)); 731 } 732 733 // Proper HTTP uses "\r\n" as a line separator, but broken servers sometimes use "\n". 734 // Servers that use "\r\n" might have "\n" inside a header. 735 // For any half-sane server, the first '\n' should be at the end of the status line, so this can be used to detect the line separator. 736 // In any case, all the interesting points in the header for now are at '\n' characters, so scan the newly read data for them. 737 foreach (idx; buffer.length-read..buffer.length) 738 { 739 if ( buffer[idx] == '\n' ) 740 { 741 if ( lineSep is null ) 742 { 743 // First '\n'. Detect line/header endings. 744 // HTTP header sections end with a double line separator 745 lineSep = "\n"; 746 headersEnd = "\n\n"; 747 if ( idx > 0 && buffer[idx-1] == '\r' ) 748 { 749 lineSep = "\r\n"; 750 headersEnd = "\r\n\r\n"; 751 } 752 } 753 else 754 { 755 // Potential header ending. 756 if ( buffer.data[0..idx+1].endsWith(headersEnd) ) 757 { 758 auto ResponseHeaders = buffer.data[0..idx+1-headersEnd.length]; 759 partialBody = buffer[idx+1..$]; 760 _contentReceived += partialBody.length; 761 parseResponseHeaders(ResponseHeaders, lineSep); 762 headersHaveBeenReceived = true; 763 break; 764 } 765 } 766 } 767 } 768 } 769 770 analyzeHeaders(_response._responseHeaders); 771 772 _bodyDecoder.putNoCopy(partialBody.data); 773 774 auto v = _bodyDecoder.get(); 775 _response._responseBody.putNoCopy(v); 776 777 // https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4 778 if ( (_method == "HEAD") || responseMustNotIncludeBody(_response.code) || (_contentLength < 0 && _unChunker is null) ) 779 { 780 debug(requests) tracef("response without body"); 781 return; 782 } 783 784 _response._contentLength = _contentLength; 785 _response._contentReceived = _contentReceived; 786 787 if ( _verbosity >= 2 ) writefln("< %d bytes of body received", partialBody.length); 788 789 while( true ) { 790 if ( _contentLength >= 0 && _contentReceived >= _contentLength ) { 791 debug(requests) trace("Body received."); 792 break; 793 } 794 if ( _unChunker && _unChunker.done ) { 795 break; 796 } 797 798 if ( _useStreaming && _response._responseBody.length && !redirectCodes.canFind(_response.code) ) { 799 debug(requests) trace("streaming requested"); 800 // save _stream in closure 801 auto __stream = _stream; 802 auto __bodyDecoder = _bodyDecoder; 803 auto __unChunker = _unChunker; 804 auto __contentReceived = _contentReceived; 805 auto __contentLength = _contentLength; 806 auto __bufferSize = _bufferSize; 807 auto __response = _response; 808 auto __verbosity = _verbosity; 809 auto __uri = _uri; 810 auto __cm = _cm; 811 812 // set up response 813 _response._contentLength = _contentLength; 814 _response.receiveAsRange.activated = true; 815 _response.receiveAsRange.data = _response._responseBody.data; 816 _response.receiveAsRange.read = delegate ubyte[] () { 817 818 while(true) { 819 // check if we received everything we need 820 if ( ( __unChunker && __unChunker.done ) 821 || !__stream.isConnected() 822 || (__contentLength > 0 && __contentReceived >= __contentLength) ) 823 { 824 debug(requests) trace("streaming_in receive completed"); 825 __bodyDecoder.flush(); 826 if (_stream && _stream.isConnected) { 827 // return to pool 828 __cm.put(__uri.scheme, __uri.host, __uri.port, __stream); 829 } else { 830 _stream.close(); 831 } 832 return __bodyDecoder.get(); 833 } 834 // have to continue 835 auto b = new ubyte[__bufferSize]; 836 try { 837 read = __stream.receive(b); 838 } 839 catch (Exception e) { 840 throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e); 841 } 842 debug(requests) tracef("streaming_in received %d bytes", read); 843 844 if ( read == 0 ) { 845 debug(requests) tracef("streaming_in: server closed connection"); 846 _stream.close(); 847 __bodyDecoder.flush(); 848 return __bodyDecoder.get(); 849 } 850 851 if ( __verbosity>=3 ) { 852 writeln(b[0..read].dump.join("\n")); 853 } 854 __response._contentReceived += read; 855 __contentReceived += read; 856 __bodyDecoder.putNoCopy(b[0..read]); 857 auto res = __bodyDecoder.getNoCopy(); 858 if ( res.length == 0 ) { 859 // there were nothing to produce (beginning of the chunk or no decompressed data) 860 continue; 861 } 862 if (res.length == 1) { 863 return res[0]; 864 } 865 // 866 // I'd like to "return _bodyDecoder.getNoCopy().join;" but it is slower 867 // 868 auto total = res.map!(b=>b.length).sum; 869 // create buffer for joined bytes 870 ubyte[] joined = new ubyte[total]; 871 size_t p; 872 // memcopy 873 foreach(ref _; res) { 874 joined[p .. p + _.length] = _; 875 p += _.length; 876 } 877 return joined; 878 } 879 assert(0); 880 }; 881 // we prepared for streaming 882 return; 883 } 884 885 auto b = new ubyte[_bufferSize]; 886 read = _stream.receive(b); 887 888 if ( read == 0 ) { 889 debug(requests) trace("read done"); 890 break; 891 } 892 if ( _verbosity >= 2 ) { 893 writefln("< %d bytes of body received", read); 894 } 895 896 if ( verbosity>=3 ) { 897 writeln(b[0..read].dump.join("\n")); 898 } 899 900 debug(requests) tracef("read: %d", read); 901 _contentReceived += read; 902 if ( _maxContentLength && _contentReceived > _maxContentLength ) { 903 throw new RequestException("ContentLength > maxContentLength (%d>%d)". 904 format(_contentLength, _maxContentLength)); 905 } 906 907 _bodyDecoder.putNoCopy(b[0..read]); // send buffer to all decoders 908 909 _bodyDecoder.getNoCopy. // fetch result and place to body 910 each!(b => _response._responseBody.putNoCopy(b)); 911 912 debug(requests) tracef("receivedTotal: %d, contentLength: %d, bodyLength: %d", _contentReceived, _contentLength, _response._responseBody.length); 913 914 } 915 _bodyDecoder.flush(); 916 _response._responseBody.putNoCopy(_bodyDecoder.get()); 917 _response._contentReceived = _contentReceived; 918 } 919 /// 920 /// Check that we received anything. 921 /// Server can close previous connection (keepalive or not) 922 /// 923 private bool serverPrematurelyClosedConnection() pure @safe { 924 immutable server_closed_connection = _response._responseHeaders.length == 0 && _response._status_line.length == 0; 925 // debug(requests) tracef("server closed connection = %s (headers.length=%s, status_line.length=%s)", 926 // server_closed_connection, _response._responseHeaders.length, _response._status_line.length); 927 return server_closed_connection; 928 } 929 private bool isIdempotent(in string method) pure @safe nothrow { 930 return ["GET", "HEAD"].canFind(method); 931 } 932 /// 933 /// If we do not want keepalive request, 934 /// or server signalled to close connection, 935 /// then close it 936 /// 937 void close_connection_if_not_keepalive(NetworkStream _stream) { 938 auto connection = "connection" in _response._responseHeaders; 939 if ( !_keepAlive ) { 940 _stream.close(); 941 } else switch(_response._version) { 942 case HTTP11: 943 // HTTP/1.1 defines the "close" connection option for the sender to signal that the connection 944 // will be closed after completion of the response. For example, 945 // Connection: close 946 // in either the request or the response header fields indicates that the connection 947 // SHOULD NOT be considered `persistent' (section 8.1) after the current request/response is complete. 948 // HTTP/1.1 applications that do not support persistent connections MUST include the "close" connection 949 // option in every message. 950 if ( connection && (*connection).toLower.split(",").canFind("close") ) { 951 _stream.close(); 952 } 953 break; 954 default: 955 // for anything else close connection if there is no keep-alive in Connection 956 if ( connection && !(*connection).toLower.split(",").canFind("keep-alive") ) { 957 _stream.close(); 958 } 959 break; 960 } 961 } 962 963 private void sendFlattenContent(T)(NetworkStream _stream, T content) { 964 while ( !content.empty ) { 965 auto chunk = content.front; 966 _stream.send(chunk); 967 content.popFront; 968 } 969 debug(requests) tracef("sent"); 970 } 971 // we use this if we send from ubyte[][] as chunked content 972 private void sendChunkedContent(T)(NetworkStream _stream, T content) { 973 while ( !content.empty ) { 974 auto chunk = content.front; 975 auto chunkHeader = "%x\r\n".format(chunk.length); 976 debug(requests) tracef("sending %s%s", chunkHeader, chunk); 977 _stream.send(chunkHeader); 978 _stream.send(chunk); 979 _stream.send("\r\n"); 980 content.popFront; 981 } 982 debug(requests) tracef("sent"); 983 _stream.send("0\r\n\r\n"); 984 } 985 /// 986 /// POST/PUT/... data from some string(with Content-Length), or from range of strings/bytes (use Transfer-Encoding: chunked). 987 /// When rank 1 (flat array) used as content it must have length. In that case "content" will be sent directly to network, and Content-Length headers will be added. 988 /// If you are goung to send some range and do not know length at the moment when you start to send request, then you can send chunks of chars or ubyte. 989 /// Try not to send too short chunks as this will put additional load on client and server. Chunks of length 2048 or 4096 are ok. 990 /// 991 /// Parameters: 992 /// url = url 993 /// content = string or input range 994 /// contentType = content type 995 /// Returns: 996 /// Response 997 /// Examples: 998 /// --------------------------------------------------------------------------------------------------------- 999 /// rs = rq.exec!"POST"("http://httpbin.org/post", "привiт, свiт!", "application/octet-stream"); 1000 /// 1001 /// auto s = lineSplitter("one,\ntwo,\nthree."); 1002 /// rs = rq.exec!"POST"("http://httpbin.org/post", s, "application/octet-stream"); 1003 /// 1004 /// auto s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; 1005 /// rs = rq.exec!"POST"("http://httpbin.org/post", s.representation.chunks(10), "application/octet-stream"); 1006 /// 1007 /// auto f = File("tests/test.txt", "rb"); 1008 /// rs = rq.exec!"POST"("http://httpbin.org/post", f.byChunk(3), "application/octet-stream"); 1009 /// -------------------------------------------------------------------------------------------------------- 1010 /// Send request with parameters. 1011 /// If used for POST or PUT requests then application/x-www-form-urlencoded used. 1012 /// Request parameters will be encoded into request string or placed in request body for POST/PUT 1013 /// requests. 1014 /// Parameters: 1015 /// url = url 1016 /// params = request parameters 1017 /// Returns: 1018 /// Response 1019 /// Examples: 1020 /// --------------------------------------------------------------------------------- 1021 /// rs = Request().exec!"GET"("http://httpbin.org/get", ["c":"d", "a":"b"]); 1022 /// --------------------------------------------------------------------------------- 1023 /// 1024 1025 /// WRAPPERS 1026 /// 1027 /// send file(s) using POST and multipart form. 1028 /// This wrapper will be deprecated, use post with MultipartForm - it is more general and clear. 1029 /// Parameters: 1030 /// url = url 1031 /// files = array of PostFile structures 1032 /// Returns: 1033 /// Response 1034 /// Each PostFile structure contain path to file, and optional field name and content type. 1035 /// If no field name provided, then basename of the file will be used. 1036 /// application/octet-stream is default when no content type provided. 1037 /// Example: 1038 /// --------------------------------------------------------------- 1039 /// PostFile[] files = [ 1040 /// {fileName:"tests/abc.txt", fieldName:"abc", contentType:"application/octet-stream"}, 1041 /// {fileName:"tests/test.txt"} 1042 /// ]; 1043 /// rs = rq.exec!"POST"("http://httpbin.org/post", files); 1044 /// --------------------------------------------------------------- 1045 /// 1046 1047 import requests.request; 1048 1049 // we use this if we send from ubyte[][] and user provided Content-Length 1050 private void sendFlattenContent(NetworkStream _stream) { 1051 while ( !_postData.empty ) { 1052 auto chunk = _postData.front; 1053 _stream.send(chunk); 1054 _postData.popFront; 1055 } 1056 debug(requests) tracef("sent"); 1057 } 1058 // we use this if we send from ubyte[][] as chunked content 1059 private void sendChunkedContent(NetworkStream _stream) { 1060 while ( !_postData.empty ) { 1061 auto chunk = _postData.front; 1062 auto chunkHeader = "%x\r\n".format(chunk.length); 1063 debug(requests) tracef("sending %s%s", chunkHeader, cast(string)chunk); 1064 _stream.send(chunkHeader); 1065 _stream.send(chunk); 1066 _stream.send("\r\n"); 1067 debug(requests) tracef("chunk sent"); 1068 _postData.popFront; 1069 } 1070 debug(requests) tracef("sent"); 1071 _stream.send("0\r\n\r\n"); 1072 } 1073 1074 HTTPResponse exec_from_range(InputRangeAdapter postData) 1075 do { 1076 1077 _postData = postData; 1078 1079 debug(requests) tracef("exec from range"); 1080 1081 NetworkStream _stream; 1082 _response = new HTTPResponse; 1083 _history.length = 0; 1084 _response.uri = _uri; 1085 _response.finalURI = _uri; 1086 bool restartedRequest = false; 1087 bool send_flat; 1088 1089 connect: 1090 _contentReceived = 0; 1091 _response._startedAt = Clock.currTime; 1092 1093 assert(_stream is null); 1094 1095 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1096 1097 if ( _stream is null ) { 1098 debug(requests) trace("create new connection"); 1099 _stream = setupConnection(); 1100 } else { 1101 debug(requests) trace("reuse old connection"); 1102 } 1103 1104 assert(_stream !is null); 1105 1106 if ( !_stream.isConnected ) { 1107 debug(requests) trace("disconnected stream on enter"); 1108 if ( !restartedRequest ) { 1109 debug(requests) trace("disconnected stream on enter: retry"); 1110 assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1111 1112 // _cm.del(_uri.scheme, _uri.host, _uri.port, _stream); 1113 _stream.close(); 1114 _stream = null; 1115 1116 restartedRequest = true; 1117 goto connect; 1118 } 1119 debug(requests) trace("disconnected stream on enter: return response"); 1120 //_stream = null; 1121 return _response; 1122 } 1123 _response._connectedAt = Clock.currTime; 1124 1125 Appender!string req; 1126 req.put(requestString()); 1127 1128 auto h = requestHeaders; 1129 if ( _contentType ) { 1130 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", _contentType); 1131 } 1132 1133 if ( _postData.length >= 0 ) 1134 { 1135 // we know t 1136 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(_postData.length)); 1137 } 1138 1139 if ( _userHeaders.ContentLength || "Content-Length" in h ) 1140 { 1141 debug(requests) tracef("User provided content-length for chunked content"); 1142 send_flat = true; 1143 } 1144 else 1145 { 1146 h["Transfer-Encoding"] = "chunked"; 1147 send_flat = false; 1148 } 1149 h.byKeyValue. 1150 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1151 each!(h => req.put(h)); 1152 req.put("\r\n"); 1153 1154 debug(requests) tracef("send <%s>", req.data); 1155 if ( _verbosity >= 1 ) { 1156 req.data.splitLines.each!(a => writeln("> " ~ a)); 1157 } 1158 1159 try { 1160 // send headers 1161 _stream.send(req.data()); 1162 // send body 1163 if ( send_flat ) { 1164 sendFlattenContent(_stream); 1165 } else { 1166 sendChunkedContent(_stream); 1167 } 1168 _response._requestSentAt = Clock.currTime; 1169 debug(requests) trace("starting receive response"); 1170 receiveResponse(_stream); 1171 debug(requests) trace("finished receive response"); 1172 _response._finishedAt = Clock.currTime; 1173 } 1174 catch (NetworkException e) 1175 { 1176 _stream.close(); 1177 throw new RequestException("Network error during data exchange"); 1178 } 1179 if ( serverPrematurelyClosedConnection() 1180 && !restartedRequest 1181 && isIdempotent(_method) 1182 ) { 1183 /// 1184 /// We didn't receive any data (keepalive connectioin closed?) 1185 /// and we can restart this request. 1186 /// Go ahead. 1187 /// 1188 debug(requests) tracef("Server closed keepalive connection"); 1189 1190 //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1191 1192 //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream); 1193 _stream.close(); 1194 _stream = null; 1195 1196 restartedRequest = true; 1197 goto connect; 1198 } 1199 1200 if ( _useStreaming ) { 1201 if ( _response._receiveAsRange.activated ) { 1202 debug(requests) trace("streaming_in activated"); 1203 return _response; 1204 } else { 1205 // this can happen if whole response body received together with headers 1206 _response._receiveAsRange.data = _response.responseBody.data; 1207 } 1208 } 1209 1210 close_connection_if_not_keepalive(_stream); 1211 1212 if ( _verbosity >= 1 ) { 1213 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1214 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1215 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1216 } 1217 1218 1219 if ( willFollowRedirect ) { 1220 if ( _history.length >= _maxRedirects ) { 1221 _stream = null; 1222 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1223 } 1224 // "location" in response already checked in canFollowRedirect 1225 immutable new_location = *("location" in _response.responseHeaders); 1226 immutable current_uri = _uri, next_uri = uriFromLocation(_uri, new_location); 1227 1228 immutable get_or_head = _method == "GET" || _method == "HEAD"; 1229 immutable code = _response.code; 1230 1231 // save current response for history 1232 _history ~= _response; 1233 1234 if ( code == 301 ) 1235 { 1236 // permanent redirect and change method 1237 _permanent_redirects[_uri] = new_location; 1238 if ( !get_or_head ) 1239 { 1240 _method = "GET"; 1241 } 1242 } 1243 if ( (code == 302 || code == 303) && !get_or_head) 1244 { 1245 // only change method 1246 _method = "GET"; 1247 } 1248 if ( code == 307 ) 1249 { 1250 // no change method, no permanent 1251 } 1252 if ( code == 308 ) 1253 { 1254 // permanent redirection and do not change method 1255 _permanent_redirects[_uri] = new_location; 1256 } 1257 1258 // prepare new response (for redirected request) 1259 _response = new HTTPResponse; 1260 _response.uri = current_uri; 1261 _response.finalURI = next_uri; 1262 1263 _stream = null; 1264 1265 // set new uri 1266 this._uri = next_uri; 1267 debug(requests) tracef("Redirected to %s", next_uri); 1268 if ( restartedRequest ) { 1269 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1270 restartedRequest = false; 1271 } 1272 if ( _method == "GET") 1273 { 1274 return exec_from_parameters(); 1275 } 1276 goto connect; 1277 } 1278 1279 _response._history = _history; 1280 if (_stream && _stream.isConnected) { 1281 // return to pool 1282 _cm.put(_uri.scheme, _uri.host, _uri.port, _stream); 1283 } 1284 return _response; 1285 } 1286 1287 /// 1288 /// Send multipart request. 1289 /// You would like to use this method for sending large portions of mixed data or uploading files to forms. 1290 /// Content of the posted form consist of sources. Each source have at least name and value (can be string-like object or opened file, see more docs for MultipartForm struct) 1291 /// Params: 1292 /// url = url 1293 /// sources = array of sources. 1294 // we use this if we send from ubyte[][] and user provided Content-Length 1295 HTTPResponse exec_from_multipart_form(MultipartForm form) { 1296 import std.uuid; 1297 import std.file; 1298 1299 _multipartForm = form; 1300 1301 debug(requests) tracef("exec from multipart form"); 1302 1303 NetworkStream _stream; 1304 _response = new HTTPResponse; 1305 _response.uri = _uri; 1306 _response.finalURI = _uri; 1307 bool restartedRequest = false; 1308 1309 connect: 1310 _contentReceived = 0; 1311 _response._startedAt = Clock.currTime; 1312 1313 assert(_stream is null); 1314 1315 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1316 1317 if ( _stream is null ) { 1318 debug(requests) trace("create new connection"); 1319 _stream = setupConnection(); 1320 } else { 1321 debug(requests) trace("reuse old connection"); 1322 } 1323 1324 assert(_stream !is null); 1325 1326 if ( !_stream.isConnected ) { 1327 debug(requests) trace("disconnected stream on enter"); 1328 if ( !restartedRequest ) { 1329 debug(requests) trace("disconnected stream on enter: retry"); 1330 //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1331 1332 //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream); 1333 _stream.close(); 1334 _stream = null; 1335 1336 restartedRequest = true; 1337 goto connect; 1338 } 1339 debug(requests) trace("disconnected stream on enter: return response"); 1340 //_stream = null; 1341 return _response; 1342 } 1343 _response._connectedAt = Clock.currTime; 1344 1345 Appender!string req; 1346 req.put(requestString()); 1347 1348 string boundary = randomUUID().toString; 1349 string[] partHeaders; 1350 size_t contentLength; 1351 1352 foreach(ref part; _multipartForm._sources) { 1353 string h = "--" ~ boundary ~ "\r\n"; 1354 string disposition = `form-data; name="%s"`.format(part.name); 1355 string optionals = part. 1356 parameters.byKeyValue(). 1357 filter!(p => p.key!="Content-Type"). 1358 map! (p => "%s=%s".format(p.key, p.value)). 1359 join("; "); 1360 1361 h ~= `Content-Disposition: ` ~ [disposition, optionals].join("; ") ~ "\r\n"; 1362 1363 auto contentType = "Content-Type" in part.parameters; 1364 if ( contentType ) { 1365 h ~= "Content-Type: " ~ *contentType ~ "\r\n"; 1366 } 1367 1368 h ~= "\r\n"; 1369 partHeaders ~= h; 1370 contentLength += h.length + part.input.getSize() + "\r\n".length; 1371 } 1372 contentLength += "--".length + boundary.length + "--\r\n".length; 1373 1374 auto h = requestHeaders(); 1375 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "multipart/form-data; boundary=" ~ boundary); 1376 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(contentLength)); 1377 1378 h.byKeyValue. 1379 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1380 each!(h => req.put(h)); 1381 req.put("\r\n"); 1382 1383 debug(requests) trace(req.data); 1384 if ( _verbosity >= 1 ) req.data.splitLines.each!(a => writeln("> " ~ a)); 1385 1386 try { 1387 _stream.send(req.data()); 1388 foreach(ref source; _multipartForm._sources) { 1389 debug(requests) tracef("sending part headers <%s>", partHeaders.front); 1390 _stream.send(partHeaders.front); 1391 partHeaders.popFront; 1392 while (true) { 1393 auto chunk = source.input.read(); 1394 if ( chunk.length <= 0 ) { 1395 break; 1396 } 1397 _stream.send(chunk); 1398 } 1399 _stream.send("\r\n"); 1400 } 1401 _stream.send("--" ~ boundary ~ "--\r\n"); 1402 _response._requestSentAt = Clock.currTime; 1403 receiveResponse(_stream); 1404 _response._finishedAt = Clock.currTime; 1405 } 1406 catch (NetworkException e) { 1407 errorf("Error sending request: ", e.msg); 1408 _stream.close(); 1409 return _response; 1410 } 1411 1412 if ( serverPrematurelyClosedConnection() 1413 && !restartedRequest 1414 && isIdempotent(_method) 1415 ) { 1416 /// 1417 /// We didn't receive any data (keepalive connectioin closed?) 1418 /// and we can restart this request. 1419 /// Go ahead. 1420 /// 1421 debug(requests) tracef("Server closed keepalive connection"); 1422 1423 //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1424 1425 //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream); 1426 _stream.close(); 1427 _stream = null; 1428 1429 restartedRequest = true; 1430 goto connect; 1431 } 1432 1433 if ( _useStreaming ) { 1434 if ( _response._receiveAsRange.activated ) { 1435 debug(requests) trace("streaming_in activated"); 1436 return _response; 1437 } else { 1438 // this can happen if whole response body received together with headers 1439 _response._receiveAsRange.data = _response.responseBody.data; 1440 } 1441 } 1442 1443 close_connection_if_not_keepalive(_stream); 1444 1445 if ( _verbosity >= 1 ) { 1446 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1447 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1448 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1449 } 1450 1451 if ( willFollowRedirect ) { 1452 if ( _history.length >= _maxRedirects ) { 1453 _stream = null; 1454 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1455 } 1456 // "location" in response already checked in canFollowRedirect 1457 immutable new_location = *("location" in _response.responseHeaders); 1458 immutable current_uri = _uri; 1459 immutable next_uri = uriFromLocation(_uri, new_location); 1460 1461 immutable get_or_head = _method == "GET" || _method == "HEAD"; 1462 immutable code = _response.code; 1463 1464 // save current response for history 1465 _history ~= _response; 1466 1467 if ( code == 301 ) 1468 { 1469 // permanent redirect and change method 1470 _permanent_redirects[_uri] = new_location; 1471 if ( !get_or_head ) 1472 { 1473 _method = "GET"; 1474 } 1475 } 1476 if ( (code == 302 || code == 303) && !get_or_head) 1477 { 1478 // only change method 1479 _method = "GET"; 1480 } 1481 if ( code == 307 ) 1482 { 1483 // no change method, no permanent 1484 } 1485 if ( code == 308 ) 1486 { 1487 // permanent redirection and do not change method 1488 _permanent_redirects[_uri] = new_location; 1489 } 1490 1491 // prepare new response (for redirected request) 1492 _response = new HTTPResponse; 1493 _response.uri = current_uri; 1494 _response.finalURI = next_uri; 1495 _stream = null; 1496 1497 // set new uri 1498 this._uri = next_uri; 1499 debug(requests) tracef("Redirected to %s", next_uri); 1500 if ( restartedRequest ) { 1501 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1502 restartedRequest = false; 1503 } 1504 if ( _method == "GET") 1505 { 1506 return exec_from_parameters(); 1507 } 1508 goto connect; 1509 } 1510 1511 if (_stream && _stream.isConnected) { 1512 // return to pool 1513 _cm.put(_uri.scheme, _uri.host, _uri.port, _stream); 1514 } 1515 _response._history = _history; 1516 return _response; 1517 } 1518 1519 /// 1520 HTTPResponse exec_from_parameters() { 1521 1522 debug(requests) tracef("exec from parameters request"); 1523 1524 assert(_uri != URI.init); 1525 NetworkStream _stream; 1526 _response = new HTTPResponse; 1527 _history.length = 0; 1528 _response.uri = _uri; 1529 _response.finalURI = _uri; 1530 bool restartedRequest = false; // True if this is restarted keepAlive request 1531 1532 connect: 1533 if ( _method == "GET" && _uri in _permanent_redirects ) { 1534 debug(requests) trace("use parmanent redirects cache"); 1535 _uri = uriFromLocation(_uri, _permanent_redirects[_uri]); 1536 _response._finalURI = _uri; 1537 } 1538 _contentReceived = 0; 1539 _response._startedAt = Clock.currTime; 1540 1541 assert(_stream is null); 1542 1543 _stream = _cm.get(_uri.scheme, _uri.host, _uri.port); 1544 1545 if ( _stream is null ) { 1546 debug(requests) trace("create new connection"); 1547 _stream = setupConnection(); 1548 } else { 1549 debug(requests) trace("reuse old connection"); 1550 } 1551 1552 assert(_stream !is null); 1553 1554 if ( !_stream.isConnected ) { 1555 debug(requests) trace("disconnected stream on enter"); 1556 if ( !restartedRequest ) { 1557 debug(requests) trace("disconnected stream on enter: retry"); 1558 // assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1559 1560 // _cm.del(_uri.scheme, _uri.host, _uri.port, _stream); 1561 _stream.close(); 1562 _stream = null; 1563 1564 restartedRequest = true; 1565 goto connect; 1566 } 1567 debug(requests) trace("disconnected stream on enter: return response"); 1568 //_stream = null; 1569 return _response; 1570 } 1571 _response._connectedAt = Clock.currTime; 1572 1573 auto h = requestHeaders(); 1574 1575 Appender!string req; 1576 1577 string encoded; 1578 1579 switch (_method) { 1580 case "POST","PUT","PATCH": 1581 encoded = params2query(_params); 1582 safeSetHeader(h, _userHeaders.ContentType, "Content-Type", "application/x-www-form-urlencoded"); 1583 if ( encoded.length > 0) { 1584 safeSetHeader(h, _userHeaders.ContentLength, "Content-Length", to!string(encoded.length)); 1585 } 1586 req.put(requestString()); 1587 break; 1588 default: 1589 req.put(requestString(_params)); 1590 } 1591 1592 h.byKeyValue. 1593 map!(kv => kv.key ~ ": " ~ kv.value ~ "\r\n"). 1594 each!(h => req.put(h)); 1595 req.put("\r\n"); 1596 if ( encoded ) { 1597 req.put(encoded); 1598 } 1599 1600 debug(requests) trace(req.data); 1601 if ( _verbosity >= 1 ) { 1602 req.data.splitLines.each!(a => writeln("> " ~ a)); 1603 } 1604 // 1605 // Now send request and receive response 1606 // 1607 try { 1608 _stream.send(req.data()); 1609 _response._requestSentAt = Clock.currTime; 1610 debug(requests) trace("starting receive response"); 1611 receiveResponse(_stream); 1612 debug(requests) tracef("done receive response"); 1613 _response._finishedAt = Clock.currTime; 1614 } 1615 catch (NetworkException e) { 1616 // On SEND this can means: 1617 // we started to send request to the server, but it closed connection because of keepalive timeout. 1618 // We have to restart request if possible. 1619 1620 // On RECEIVE - if we received something - then this exception is real and unexpected error. 1621 // If we didn't receive anything - we can restart request again as it can be 1622 debug(requests) tracef("Exception on receive response: %s", e.msg); 1623 if ( _response._responseHeaders.length != 0 ) 1624 { 1625 _stream.close(); 1626 throw new RequestException("Unexpected network error"); 1627 } 1628 } 1629 1630 if ( serverPrematurelyClosedConnection() 1631 && !restartedRequest 1632 && isIdempotent(_method) 1633 ) { 1634 /// 1635 /// We didn't receive any data (keepalive connectioin closed?) 1636 /// and we can restart this request. 1637 /// Go ahead. 1638 /// 1639 debug(requests) tracef("Server closed keepalive connection"); 1640 1641 //assert(_cm.get(_uri.scheme, _uri.host, _uri.port) == _stream); 1642 1643 //_cm.del(_uri.scheme, _uri.host, _uri.port, _stream); 1644 _stream.close(); 1645 _stream = null; 1646 1647 restartedRequest = true; 1648 goto connect; 1649 } 1650 1651 if ( _useStreaming ) { 1652 if ( _response._receiveAsRange.activated ) { 1653 debug(requests) trace("streaming_in activated"); 1654 return _response; 1655 } else { 1656 // this can happen if whole response body received together with headers 1657 _response._receiveAsRange.data = _response.responseBody.data; 1658 } 1659 } 1660 1661 close_connection_if_not_keepalive(_stream); 1662 1663 if ( _verbosity >= 1 ) { 1664 writeln(">> Connect time: ", _response._connectedAt - _response._startedAt); 1665 writeln(">> Request send time: ", _response._requestSentAt - _response._connectedAt); 1666 writeln(">> Response recv time: ", _response._finishedAt - _response._requestSentAt); 1667 } 1668 1669 if ( willFollowRedirect ) { 1670 debug(requests) trace("going to follow redirect"); 1671 if ( _history.length >= _maxRedirects ) { 1672 _stream = null; 1673 throw new MaxRedirectsException("%d redirects reached maxRedirects %d.".format(_history.length, _maxRedirects)); 1674 } 1675 // "location" in response already checked in canFollowRedirect 1676 immutable new_location = *("location" in _response.responseHeaders); 1677 immutable current_uri = _uri; 1678 immutable next_uri = uriFromLocation(_uri, new_location); 1679 1680 immutable get_or_head = _method == "GET" || _method == "HEAD"; 1681 immutable code = _response.code; 1682 1683 // save current response for history 1684 _history ~= _response; 1685 1686 if ( code == 301 ) 1687 { 1688 // permanent redirect and change method 1689 debug(requests) tracef("store [%s]=%s in permanent_redirects", _uri, new_location); 1690 _permanent_redirects[_uri] = new_location; 1691 if ( !get_or_head ) 1692 { 1693 _method = "GET"; 1694 } 1695 } 1696 if ( (code == 302 || code == 303) && !get_or_head) 1697 { 1698 // only change method 1699 _method = "GET"; 1700 } 1701 if ( code == 307 ) 1702 { 1703 // no change method, no permanent 1704 } 1705 if ( code == 308 ) 1706 { 1707 // permanent redirection and do not change method 1708 _permanent_redirects[_uri] = new_location; 1709 } 1710 1711 // prepare new response (for redirected request) 1712 _response = new HTTPResponse; 1713 _response.uri = current_uri; 1714 _response.finalURI = next_uri; 1715 if (_stream && _stream.isConnected) { 1716 // return to pool 1717 _cm.put(current_uri.scheme, current_uri.host, current_uri.port, _stream); 1718 } 1719 _stream = null; 1720 1721 // set new uri 1722 _uri = next_uri; 1723 debug(requests) tracef("Redirected to %s", next_uri); 1724 if ( restartedRequest ) { 1725 debug(requests) trace("Rare event: clearing 'restartedRequest' on redirect"); 1726 restartedRequest = false; 1727 } 1728 goto connect; 1729 } 1730 1731 _response._history = _history; 1732 if ( _stream && _stream.isConnected ) { 1733 // return to pool 1734 _cm.put(_uri.scheme, _uri.host, _uri.port, _stream); 1735 } 1736 return _response; 1737 } 1738 HTTPResponse execute(Request r) 1739 { 1740 _method = r.method; 1741 _uri = r.uri; 1742 _useStreaming = r.useStreaming; 1743 _permanent_redirects = r.permanent_redirects; 1744 _maxRedirects = r.maxRedirects; 1745 _authenticator = r.authenticator; 1746 _maxHeadersLength = r.maxHeadersLength; 1747 _maxContentLength = r.maxContentLength; 1748 _verbosity = r.verbosity; 1749 _keepAlive = r.keepAlive; 1750 _bufferSize = r.bufferSize; 1751 _proxy = r.proxy; 1752 _timeout = r.timeout; 1753 _contentType = r.contentType; 1754 _socketFactory = r.socketFactory; 1755 _sslOptions = r.sslOptions; 1756 _bind = r.bind; 1757 _headers = r.headers; 1758 _userHeaders = r.userHeaders; 1759 1760 _params = r.params; 1761 1762 // this assignments increments refCounts, so we can't use const Request 1763 // but Request is anyway struct and called by-value 1764 _cm = r.cm; 1765 _cookie = r.cookie; 1766 1767 debug(requests) trace("serving %s".format(r)); 1768 if ( !r.postData.empty) 1769 { 1770 return exec_from_range(r.postData); 1771 } 1772 if ( r.hasMultipartForm ) 1773 { 1774 return exec_from_multipart_form(r.multipartForm); 1775 } 1776 auto rs = exec_from_parameters(); 1777 return rs; 1778 } 1779 } 1780 1781 version(vibeD) { 1782 import std.json; 1783 package string httpTestServer() { 1784 return "http://httpbin.org/"; 1785 } 1786 package string fromJsonArrayToStr(JSONValue v) { 1787 return v.str; 1788 } 1789 } 1790 else { 1791 import std.json; 1792 package string httpTestServer() { 1793 return "http://127.0.0.1:8081/"; 1794 } 1795 package string fromJsonArrayToStr(JSONValue v) { 1796 return cast(string)(v.array.map!"cast(ubyte)a.integer".array); 1797 } 1798 }