module requests.ftp;

private:
import std.ascii;
import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.exception;
import std.string;
import std.range;
import std.experimental.logger;
import std.stdio;
import std.path;
import std.traits;

import requests.uri;
import requests.utils;
import requests.streams;
import requests.base;

/// Thrown when a reply cannot be read from the FTP control channel.
public class FTPServerResponseError: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Response object returned by FTPRequest.get and FTPRequest.post.
public class FTPResponse : Response {
}

/// Authenticator that supplies a fixed username/password pair for FTP login.
public class FtpAuthentication: Auth {
    private {
        string   _username, _password;
    }
    /// Constructor.
    /// Params:
    /// username = username
    /// password = password
    ///
    this(string username, string password) {
        _username = username;
        _password = password;
    }
    /// Returns: the username passed to the constructor.
    override string userName() {
        return _username;
    }
    /// Returns: the password passed to the constructor.
    override string password() {
        return _password;
    }
    /// FTP login uses USER/PASS commands, so there are no headers to supply.
    override string[string] authHeaders(string domain) {
        return null;
    }
}

/// Default size (bytes) of the buffers used for control and data channel I/O.
enum defaultBufferSize = 8192;

/// FTP client request: keeps a control channel open between calls and
/// performs passive-mode downloads (get) and uploads (post).
public struct FTPRequest {
    private {
        URI           _uri;
        Duration      _timeout = 60.seconds;
        uint          _verbosity = 0;
        size_t        _bufferSize = defaultBufferSize;
        // The `L` suffix matters: `5*1024*1024*1024` would be evaluated in
        // 32-bit int arithmetic and silently wrap to 1 GiB.
        long          _maxContentLength = 5L*1024*1024*1024;
        long          _contentLength = -1;
        long          _contentReceived;
        NetworkStream _controlChannel;
        string[]      _responseHistory;
        FTPResponse   _response;
        bool          _useStreaming;
        Auth          _authenticator;
        string        _method;
        string        _proxy;
        string        _bind;
    }
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!long("maxContentLength"));
    mixin(Getter_Setter!bool("useStreaming"));
    mixin(Getter!long("contentLength"));
    mixin(Getter!long("contentReceived"));
    mixin(Setter!Auth("authenticator"));
    mixin(Getter_Setter!string("proxy"));
    mixin(Getter_Setter!string("bind"));

    /// Returns: replies recorded so far from the control channel
    /// (the server greeting and the reply to every command sent).
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }

    /// Create a request for the given FTP URL string.
    this(string uri) {
        _uri = URI(uri);
    }

    /// Create a request for the given parsed URI.
    this(in URI uri) {
        _uri = uri;
    }

    /// Closes the control channel if it is still open.
    ~this() {
        if ( _controlChannel ) {
            _controlChannel.close();
        }
    }

    /// Returns: short textual description (method and URI) of this request.
    string toString() const {
        return "FTPRequest(%s, %s)".format(_method, _uri.uri());
    }

    /// Render request properties according to a printf-like format string.
    ///
    /// Supported specs: %h host, %m method, %p port, %P path, %q query, %U full URI.
    /// Params:
    ///   fmt = format string
    /// Returns: formatted string
    /// Throws: FormatException on an unknown format spec.
    string format(string fmt) const {
        import std.array;
        import std.stdio;
        auto a = appender!string();
        auto f = FormatSpec!char(fmt);
        while (f.writeUpToNextSpec(a)) {
            switch(f.spec) {
                case 'h':
                    // Remote hostname.
                    a.put(_uri.host);
                    break;
                case 'm':
                    // method.
                    a.put(_method);
                    break;
                case 'p':
                    // Remote port.
                    a.put("%d".format(_uri.port));
                    break;
                case 'P':
                    // Path
                    a.put(_uri.path);
                    break;
                case 'q':
                    // query parameters supplied with url.
                    a.put(_uri.query);
                    break;
                case 'U':
                    // Complete URI.
                    a.put(_uri.uri());
                    break;
                default:
                    throw new FormatException("Unknown Request format spec " ~ f.spec);
            }
        }
        return a.data();
    }

    /// Send a command over the control channel and wait for the reply.
    ///
    /// The reply line is appended to the response history.
    /// Params:
    ///   cmd = complete command, including the trailing "\r\n"
    /// Returns: numeric reply code
    ushort sendCmdGetResponse(string cmd) {
        debug(requests) tracef("cmd to server: %s", cmd.strip);
        if ( _verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        _controlChannel.send(cmd);
        string response = serverResponse();
        _responseHistory ~= response;
        return responseToCode(response);
    }

    /// Convert the first three characters of a reply line to a numeric code.
    ushort responseToCode(string response) pure const @safe {
        return to!ushort(response[0..3]);
    }

    /// Switch this request to a new URI.
    ///
    /// The existing control channel is closed when host, port or username
    /// differ from the current URI; otherwise it is kept for reuse.
    void handleChangeURI(in string uri) @safe {
        // if control channel exists and new URL not match old, then close
        URI newURI = URI(uri);
        if ( _controlChannel &&
            (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
            _controlChannel.close();
            _controlChannel = null;
        }
        _uri = newURI;
    }

    /// Read from the control channel until a final reply line is seen.
    ///
    /// A final reply line is one that starts with three digits followed by a
    /// space. Reading stops with an error once 16 KiB have been accumulated
    /// without finding such a line.
    /// Returns: the first final reply line found (without line terminator)
    /// Throws: FTPServerResponseError when reading fails, the peer closes the
    ///         connection, or no reply line is found within the size limit.
    string serverResponse() {
        string buffer;
        immutable bufferLimit = 16*1024;
        _controlChannel.readTimeout = _timeout;
        scope(exit) {
            _controlChannel.readTimeout = 0.seconds;
        }
        auto b = new ubyte[_bufferSize];
        while ( buffer.length < bufferLimit ) {
            debug(requests) trace("Wait on control channel");
            ptrdiff_t rc;
            try {
                rc = _controlChannel.receive(b);
            }
            catch (Exception e) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
            }
            debug(requests) tracef("Got %d bytes from control socket", rc);
            // rc == 0 means the peer closed the connection; a negative value
            // must not reach the slice below, so treat both as failure.
            if ( rc <= 0 ) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
            }
            if ( _verbosity >= 1 ) {
                (cast(string)b[0..rc]).
                    splitLines.
                    each!(l=>writefln("< %s", l));
            }
            buffer ~= b[0..rc];
            // Only look for the reply once a complete line has arrived.
            if ( buffer.endsWith('\n') ){
                auto responseLines = buffer.
                    splitLines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( !responseLines.empty ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }

    /// Enter the directory chain described by `path`, creating missing
    /// directories on the way.
    ///
    /// Params:
    ///   path = path elements; path[0] is the directory we are currently in,
    ///          path[1..$] are the directories to enter (and create if needed)
    /// Returns: reply code of the last command sent
    ushort tryCdOrCreatePath(string[] path) {
        /*
         * At start we stay at original path, we have to create next path element
         * For example:
         * path = ["", "a", "b"] - we stay in root (path[0]), we have to cd and return ok
         * or try to create "a" and cd to "a".
         */
        debug(requests) info("Trying to create path %s".format(path));
        enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
        auto next_dir = path[1];
        auto code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
        if ( code >= 300) {
            // try to create, then again CWD
            code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n");
            if ( code >= 300 ) {
                return code;
            }
            code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
        }
        if ( path.length == 2 ) {
            return code;
        }
        return tryCdOrCreatePath(path[1..$]);
    }

    /// Upload content to the given FTP URL (STOR in passive, binary mode).
    ///
    /// Missing directories in the target path are created. After the
    /// transfer the working directory reported by PWD is restored.
    /// Params:
    ///   uri = target URL; when null the URI given at construction is used
    ///   content = data to upload: anything castable to ubyte[] or a range of chunks
    ///   args = unused by this implementation
    /// Returns: FTPResponse whose `code` holds the last relevant server reply code
    auto post(R, A...)(string uri, R content, A args)
        if ( __traits(compiles, cast(ubyte[])content)
        || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front))))
        || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
        ) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;
        _response._startedAt = Clock.currTime;
        _method = "POST";

        scope(exit) {
            _response._finishedAt = Clock.currTime;
        }

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !_controlChannel ) {
            _controlChannel = new TCPStream();
            // Honour the configured local bind address, same as get() does.
            _controlChannel.bind(_bind);
            _controlChannel.connect(_uri.host, _uri.port, _timeout);
            _response._connectedAt = Clock.currTime;
            response = serverResponse();
            _responseHistory ~= response;

            code = responseToCode(response);
            debug(requests) tracef("Server initial response: %s", response);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
            // Log in
            string user, pass;
            if ( _authenticator ) {
                user = _authenticator.userName();
                pass = _authenticator.password();
            }
            else{
                user = _uri.username.length ? _uri.username : "anonymous";
                pass = _uri.password.length ? _uri.password : "requests@";
            }
            // Show only the first character of the password; guard against an empty one.
            debug(requests) tracef("Use %s:%s%s as username:password", user,
                    pass.length ? pass[0..1] : "",
                    replicate("-", pass.length ? pass.length-1 : 0));

            code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
            if ( code/100 > 3 ) {
                _response.code = code;
                return _response;
            } else if ( code/100 == 3) {
                // 3xx: server wants a password
                code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
                if ( code/100 > 2 ) {
                    _response.code = code;
                    return _response;
                }
            }

        }
        else {
            // Reusing an already established control connection.
            _response._connectedAt = Clock.currTime;
        }
        // Remember the current directory so it can be restored on exit.
        code = sendCmdGetResponse("PWD\r\n");
        string pwd;
        if ( code/100 == 2 ) {
            // like '257 "/home/testuser"'
            auto a = _responseHistory[$-1].split();
            if ( a.length > 1 ) {
                pwd = a[1].chompPrefix(`"`).chomp(`"`);
            }
        }
        scope (exit) {
            if ( pwd && _controlChannel ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
        if ( code == 550 ) {
            // try to create directory and enter it
            code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
        }
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
        // in last response.
        // Cut anything between ( and )
        auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        string host;
        ushort port;
        try {
            ubyte a1,a2,a3,a4,p1,p2;
            formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
            host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
            port = (p1<<8) + p2;
        } catch (Exception e) {
            // Malformed reply: missing fields, non-numeric or out-of-range values.
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPStream();
        scope (exit ) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        dataStream.bind(_bind);
        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        static if ( __traits(compiles, cast(ubyte[])content) ) {
            auto data = cast(ubyte[])content;
            for(size_t pos = 0; pos < data.length;) {
                // Send the next window starting at `pos`; the stream may
                // accept fewer bytes than offered, so advance by `rc` only.
                auto chunk = data[pos .. min(pos + _bufferSize, data.length)];
                auto rc = dataStream.send(chunk);
                if ( rc <= 0 ) {
                    debug(requests) trace("done");
                    break;
                }
                debug(requests) tracef("sent %d bytes to data channel", rc);
                pos += rc;
            }
        } else {
            while (!content.empty) {
                auto chunk = content.front;
                debug(requests) trace("ftp posting %d of data chunk".format(chunk.length));
                auto rc = dataStream.send(chunk);
                if ( rc <= 0 ) {
                    debug(requests) trace("done");
                    break;
                }
                content.popFront;
            }
        }
        // Closing the data connection signals end of file to the server.
        dataStream.close();
        dataStream = null;
        response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully uploaded %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }

    /// Download a file from the given FTP URL (RETR in passive, binary mode).
    ///
    /// With `useStreaming` enabled the call returns after the first chunk
    /// has been received and the rest is delivered through
    /// `receiveAsRange`; otherwise the whole body is collected first.
    /// Params:
    ///   uri = source URL; when null the URI given at construction is used
    /// Returns: FTPResponse whose `code` holds the last relevant server reply code
    /// Throws: RequestException when the content exceeds `maxContentLength`
    ///         (a limit of 0 disables the check).
    auto get(string uri = null) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;
        _contentReceived = 0;
        // Forget the size of any previous download so a failed SIZE
        // command cannot leave a stale value behind.
        _contentLength = -1;
        _method = "GET";

        _response._startedAt = Clock.currTime;
        scope(exit) {
            _response._finishedAt = Clock.currTime;
        }

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !_controlChannel ) {
            _controlChannel = new TCPStream();
            _controlChannel.bind(_bind);
            _controlChannel.connect(_uri.host, _uri.port, _timeout);
            _response._connectedAt = Clock.currTime;
            response = serverResponse();
            _responseHistory ~= response;

            code = responseToCode(response);
            debug(requests) tracef("Server initial response: %s", response);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
            // Log in
            string user, pass;
            if ( _authenticator ) {
                user = _authenticator.userName();
                pass = _authenticator.password();
            }
            else{
                user = _uri.username.length ? _uri.username : "anonymous";
                pass = _uri.password.length ? _uri.password : "requests@";
            }
            // Show only the first character of the password; guard against an empty one.
            debug(requests) tracef("Use %s:%s%s as username:password", user,
                    pass.length ? pass[0..1] : "",
                    replicate("-", pass.length ? pass.length-1 : 0));

            code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
            if ( code/100 > 3 ) {
                _response.code = code;
                return _response;
            } else if ( code/100 == 3) {
                // 3xx: server wants a password
                code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
                if ( code/100 > 2 ) {
                    _response.code = code;
                    return _response;
                }
            }
        }
        else {
            // Reusing an already established control connection.
            _response._connectedAt = Clock.currTime;
        }

        // Remember the current directory so it can be restored afterwards.
        code = sendCmdGetResponse("PWD\r\n");
        string pwd;
        if ( code/100 == 2 ) {
            // like '257 "/home/testuser"'
            auto a = _responseHistory[$-1].split();
            if ( a.length > 1 ) {
                pwd = a[1].chompPrefix(`"`).chomp(`"`);
            }
        }
        scope (exit) {
            // When streaming has been activated the control channel is still
            // waiting for the transfer-complete reply; the streaming delegate
            // restores the directory once the transfer ends.
            if ( pwd && _controlChannel && !_response._receiveAsRange.activated ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 == 2 ) {
            // something like
            // 213 229355520
            auto s = _responseHistory[$-1].findSplitAfter(" ");
            try {
                _contentLength = to!long(s[1].strip);
            } catch (ConvException) {
                debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
            }
        }

        if ( _maxContentLength && _contentLength > _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
        // in last response.
        // Cut anything between ( and )
        auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        string host;
        ushort port;
        try {
            ubyte a1,a2,a3,a4,p1,p2;
            formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
            host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
            port = (p1<<8) + p2;
        } catch (Exception e) {
            // Malformed reply: missing fields, non-numeric or out-of-range values.
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPStream();
        scope (exit ) {
            // Keep the data connection open when the streaming delegate owns it.
            if ( dataStream !is null && !_response._receiveAsRange.activated ) {
                dataStream.close();
            }
        }
        dataStream.bind(_bind);
        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        while ( true ) {
            // A fresh buffer per read: putNoCopy keeps a reference to it.
            auto b = new ubyte[_bufferSize];
            auto rc = dataStream.receive(b);
            if ( rc <= 0 ) {
                debug(requests) trace("done");
                break;
            }
            debug(requests) tracef("got %d bytes from data channel", rc);

            _contentReceived += rc;
            _response._responseBody.putNoCopy(b[0..rc]);

            if ( _maxContentLength && _response._responseBody.length > _maxContentLength ) {
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
            if ( _useStreaming ) {
                debug(requests) trace("ftp uses streaming");
                _response.receiveAsRange.activated = true;
                _response.receiveAsRange.data.length = 0;
                _response.receiveAsRange.data = _response._responseBody.data;
                // Called by the consumer each time it needs the next chunk;
                // returns an empty array once the transfer has finished.
                _response.receiveAsRange.read = delegate ubyte[] () {
                    Buffer!ubyte result;
                    // A limit of 0 means "unlimited", as in the checks above.
                    if ( _maxContentLength && _contentReceived > _maxContentLength )
                    {
                        throw new RequestException("ContentLength > maxContentLength (%d>%d)".format(_contentReceived, _maxContentLength));
                    }
                    auto chunk = new ubyte[_bufferSize];
                    ptrdiff_t read;
                    try {
                        read = dataStream.receive(chunk);
                    }
                    catch (Exception e) {
                        throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
                    }

                    if ( read > 0 ) {
                        _contentReceived += read;
                        result.putNoCopy(chunk[0..read]);
                        return result.data;
                    }
                    // read <= 0: the server closed the data connection (or the
                    // read failed without throwing) - finish the transfer.
                    debug(requests) tracef("streaming_in: server closed connection");
                    dataStream.close();
                    auto finalCode = responseToCode(serverResponse());
                    if ( finalCode/100 == 2 ) {
                        debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
                    }
                    _response.code = finalCode;
                    // Restore the original working directory, if it is known.
                    if ( pwd.length ) {
                        sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
                    }
                    return result.data;
                };
                return _response;
            }
        }
        dataStream.close();
        response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }
}

// Integration test: talks to public FTP servers. Set UNRELIABLENETWORK=true
// in the environment to turn the result assertions into no-ops.
package unittest {
    import std.process;

    globalLogLevel(LogLevel.info);
    bool unreliable_network = environment.get("UNRELIABLENETWORK", "false") == "true";

    info("testing ftp");
    auto rq = FTPRequest();
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
    assert(unreliable_network || rs.code == 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
    assert(unreliable_network || rs.code != 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
    assert(unreliable_network || rs.code == 226);
    assert(unreliable_network || rs.responseBody.length == 1024);
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
    assert(unreliable_network || rs.code == 226);
    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    try {
        rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    }
    catch (ConnectError e)
    {
        // best effort: connection problems are tolerated here
    }
    assert(unreliable_network || rs.code == 226);
    info("ftp get  ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT with authenticator");
    rq.authenticator = new FtpAuthentication("anonymous", "requests@");
    try {
        rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    }
    catch (ConnectError e)
    {
        // best effort: connection problems are tolerated here
    }
    assert(unreliable_network || rs.code == 226);
    assert(unreliable_network || rs.finalURI.path == "/pub/FreeBSD/README.TXT");
    assert(unreliable_network || rq.format("%m|%h|%p|%P|%q|%U") == "GET|ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    assert(unreliable_network || rs.format("%h|%p|%P|%q|%U") == "ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    info("testing ftp - done.");
}