module requests.ftp;

private:
import std.ascii;
import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.exception;
import std.string;
import std.range;
import std.experimental.logger;
import std.stdio;
import std.path;
import std.traits;

import requests.uri;
import requests.utils;
import requests.streams;
import requests.base;

/// Thrown when a reply cannot be read from the FTP control channel.
public class FTPServerResponseError: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Response object returned by FTPRequest.get and FTPRequest.post.
public class FTPResponse : Response {
}

/// Supplies a user name and password for the FTP USER/PASS login sequence.
public class FtpAuthentication: Auth {
    private {
        string   _username, _password;
    }
    /// Constructor.
    /// Params:
    /// username = username
    /// password = password
    ///
    this(string username, string password) {
        _username = username;
        _password = password;
    }
    /// Returns: the user name given to the constructor.
    override string userName() {
        return _username;
    }
    /// Returns: the password given to the constructor.
    override string password() {
        return _password;
    }
    /// FTP has no headers; always returns null.
    override string[string] authHeaders(string domain) {
        return null;
    }
}

/// Default size (in bytes) of the buffers used for control and data channel I/O.
enum defaultBufferSize = 8192;

/// FTP client request: keeps one control connection and performs passive-mode
/// uploads (post) and downloads (get) over it.
public struct FTPRequest {
    private {
        URI           _uri;
        Duration      _timeout = 60.seconds;
        uint          _verbosity = 0;
        size_t        _bufferSize = defaultBufferSize;
        // `5L` forces 64-bit arithmetic: with plain int literals the product
        // is computed in 32 bits and wraps around instead of yielding 5 GiB.
        long          _maxContentLength = 5L*1024*1024*1024;
        long          _contentLength = -1;
        long          _contentReceived;
        NetworkStream _controlChannel;
        string[]      _responseHistory;
        FTPResponse   _response;
        bool          _useStreaming;
        Auth          _authenticator;
    }
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!long("maxContentLength"));
    mixin(Getter_Setter!bool("useStreaming"));
    mixin(Getter!long("contentLength"));
    mixin(Getter!long("contentReceived"));
    mixin(Setter!Auth("authenticator"));

    /// Returns: every control-channel reply line recorded so far, oldest first.
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }

    /// Create a request for the given FTP URL (parsed with URI).
    this(string uri) {
        _uri = URI(uri);
    }

    /// Create a request for an already parsed URI.
    this(in URI uri) {
        _uri = uri;
    }

    /// Closes the control channel if one is open.
    ~this() {
        if ( _controlChannel ) {
            _controlChannel.close();
        }
    }

    /// Send one command line over the control channel, record the server's
    /// reply in the response history and return its numeric code.
    /// Params:
    /// cmd = complete command including the trailing "\r\n"
    /// Returns: three-digit reply code
    /// Throws: FTPServerResponseError if no reply can be read
    ushort sendCmdGetResponse(string cmd) {
        debug(requests) tracef("cmd to server: %s", cmd.strip);
        if ( _verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        _controlChannel.send(cmd);
        string response = serverResponse();
        _responseHistory ~= response;
        return responseToCode(response);
    }

    /// Convert the first three characters of a reply line to its numeric code.
    /// The caller must pass a line of at least three characters.
    ushort responseToCode(string response) pure const @safe {
        return to!ushort(response[0..3]);
    }

    /// Switch to a new URL. The control channel is closed when the new URL
    /// differs from the current one in host, port or user name, so that the
    /// next request reconnects and logs in again.
    void handleChangeURI(in string uri) @safe {
        // if control channel exists and new URL not match old, then close
        URI newURI = URI(uri);
        if ( _controlChannel &&
            (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
            _controlChannel.close();
            _controlChannel = null;
        }
        _uri = newURI;
    }

    /// Read from the control channel until a final reply line ("NNN text")
    /// has arrived and return that line.
    /// Throws: FTPServerResponseError on read failure, on connection close,
    ///         or when no final line shows up within 16 KiB of input.
    string serverResponse() {
        string buffer;
        immutable bufferLimit = 16*1024;
        _controlChannel.readTimeout = _timeout;
        scope(exit) {
            _controlChannel.readTimeout = 0.seconds;
        }
        auto b = new ubyte[_bufferSize];
        while ( buffer.length < bufferLimit ) {
            debug(requests) trace("Wait on control channel");
            ptrdiff_t rc;
            try {
                rc = _controlChannel.receive(b);
            }
            catch (Exception e) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
            }
            debug(requests) tracef("Got %d bytes from control socket", rc);
            // A negative count must be rejected too: it would otherwise be
            // used as the upper bound of the slice below.
            if ( rc <= 0 ) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
            }
            if ( _verbosity >= 1 ) {
                (cast(string)b[0..rc]).
                    splitLines.
                    each!(l=>writefln("< %s", l));
            }
            buffer ~= b[0..rc];
            if ( buffer.endsWith('\n') ){
                // The final line of a reply is "NNN<space>text"; continuation
                // lines of a multi-line reply do not match this shape.
                auto responseLines = buffer.
                    splitLines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( !responseLines.empty ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }

    /// Walk down `path`, entering each directory and creating it when needed.
    /// Params:
    /// path = path elements; path[0] is the directory we are already in,
    ///        path[1..$] are the elements still to enter
    /// Returns: reply code of the last command sent
    ushort tryCdOrCreatePath(string[] path) {
        /*
         * At start we stay at original path, we have to create next path element
         * For example:
         * path = ["", "a", "b"] - we stay in root (path[0]), we have to cd and return ok
         * or try to create "a" and cd to "a".
         */
        debug(requests) info("Trying to create path %s".format(path));
        enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
        auto next_dir = path[1];
        auto code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
        if ( code >= 300) {
            // try to create, then again CWD
            code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n");
            // same threshold as the CWD check above
            if ( code >= 300 ) {
                return code;
            }
            code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
        }
        if ( path.length == 2 ) {
            return code;
        }
        return tryCdOrCreatePath(path[1..$]);
    }

    /// Open the control channel and log in with the authenticator's
    /// credentials, or with those from the URL (anonymous by default).
    /// Returns: true when the session is ready for commands. On failure the
    /// failing reply code is stored in `_response.code`, the control channel
    /// is dropped so the next request starts from scratch, and false is returned.
    private bool connectAndLogin() {
        // Publish the channel only after a successful connect, so a failed
        // connect does not leave an unusable channel behind.
        auto channel = new TCPStream();
        channel.connect(_uri.host, _uri.port, _timeout);
        _controlChannel = channel;

        bool loginFailed(ushort failedCode) {
            _response.code = failedCode;
            _controlChannel.close();
            _controlChannel = null;
            return false;
        }

        auto greeting = serverResponse();
        _responseHistory ~= greeting;

        ushort code = responseToCode(greeting);
        debug(requests) tracef("Server initial response: %s", greeting);
        if ( code/100 > 2 ) {
            return loginFailed(code);
        }
        // Log in
        string user, pass;
        if ( _authenticator ) {
            user = _authenticator.userName();
            pass = _authenticator.password();
        }
        else{
            user = _uri.username.length ? _uri.username : "anonymous";
            pass = _uri.password.length ? _uri.password : "requests@";
        }
        debug(requests) {
            // an authenticator may hand out an empty password
            if ( pass.length ) {
                tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
            }
        }

        code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
        if ( code/100 > 3 ) {
            return loginFailed(code);
        }
        if ( code/100 == 3) {
            // server asks for a password
            code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
            if ( code/100 > 2 ) {
                return loginFailed(code);
            }
        }
        return true;
    }

    /// Ask the server for the current directory (PWD).
    /// Returns: the directory name, or null when the server did not report one.
    private string queryWorkingDir() {
        string pwd;
        if ( sendCmdGetResponse("PWD\r\n")/100 == 2 ) {
            // like '257 "/home/testuser"'
            auto a = _responseHistory[$-1].split();
            if ( a.length > 1 ) {
                pwd = a[1].chompPrefix(`"`).chomp(`"`);
            }
        }
        return pwd;
    }

    /// Extract the data-channel address from a PASV reply such as
    /// "227 Entering Passive Mode (132,180,15,2,210,187)".
    /// Returns: true and fills `host`/`port` on success, false otherwise.
    private bool parsePassiveReply(string reply, out string host, out ushort port) {
        // Cut anything between ( and )
        auto v = reply.findSplitBefore(")")[0].findSplitAfter("(")[1];
        try {
            ubyte a1,a2,a3,a4,p1,p2;
            formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
            host = format("%d.%d.%d.%d", a1, a2, a3, a4);
            port = cast(ushort)((p1<<8) + p2);
            return true;
        } catch (Exception e) {
            // Malformed input is not reported as FormatException only:
            // numeric conversion failures surface as ConvException.
            error("Failed to parse ", v);
            return false;
        }
    }

    /// Upload `content` to `uri` (STOR) in passive binary mode, creating the
    /// target directory when it does not exist.
    /// Params:
    /// uri = target FTP URL; when null the URL given to the constructor is used
    /// content = something castable to ubyte[], or a range of chunks
    /// Returns: FTPResponse whose `code` is the last relevant server reply code
    /// Throws: FTPServerResponseError when the control channel cannot be read
    auto post(R, A...)(string uri, R content, A args)
        if ( __traits(compiles, cast(ubyte[])content)
        || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front))))
        || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
        ) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !_controlChannel && !connectAndLogin() ) {
            return _response;
        }

        // remember where we are so the session can be returned there afterwards
        string pwd = queryWorkingDir();
        scope (exit) {
            if ( pwd && _controlChannel ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
        if ( code == 550 ) {
            // try to create directory and enter it
            code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
        }
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        string host;
        ushort port;
        if ( !parsePassiveReply(_responseHistory[$-1], host, port) ) {
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPStream();
        scope (exit ) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        static if ( __traits(compiles, cast(ubyte[])content) ) {
            auto data = cast(ubyte[])content;
            size_t pos = 0;
            while ( pos < data.length ) {
                // Send the next window starting at `pos`; the window has to
                // advance, otherwise the first chunk is sent again and again.
                immutable remaining = data.length - pos;
                immutable len = remaining < _bufferSize ? remaining : _bufferSize;
                auto rc = dataStream.send(data[pos..pos+len]);
                if ( rc <= 0 ) {
                    debug(requests) trace("done");
                    break;
                }
                debug(requests) tracef("sent %d bytes to data channel", rc);
                pos += rc;
            }
        } else {
            while (!content.empty) {
                auto chunk = content.front;
                debug(requests) trace("ftp posting %d of data chunk".format(chunk.length));
                auto rc = dataStream.send(chunk);
                if ( rc <= 0 ) {
                    debug(requests) trace("done");
                    break;
                }
                content.popFront;
            }
        }
        // closing the data channel tells the server the upload is complete
        dataStream.close();
        dataStream = null;
        response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully uploaded %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }

    /// Download the file named by `uri` (RETR) in passive binary mode.
    /// With `useStreaming` set, the first chunk is stored in the response and
    /// the rest is delivered through `receiveAsRange`.
    /// Params:
    /// uri = FTP URL; when null the URL given to the constructor is used
    /// Returns: FTPResponse with the reply code and the received body
    /// Throws: RequestException when `maxContentLength` (non-zero) is exceeded,
    ///         FTPServerResponseError when the control channel cannot be read
    auto get(string uri = null) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;
        _contentReceived = 0;
        // do not carry the size reported for a previous request over to this one
        _contentLength = -1;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !_controlChannel && !connectAndLogin() ) {
            return _response;
        }

        // remember where we are so the session can be returned there afterwards
        string pwd = queryWorkingDir();
        scope (exit) {
            if ( pwd && _controlChannel ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 == 2 ) {
            // something like
            // 213 229355520
            auto s = _responseHistory[$-1].findSplitAfter(" ");
            // s[0] is empty when the reply contains no space at all
            if ( s[0].length ) {
                try {
                    _contentLength = to!long(s[1].strip);
                } catch (ConvException) {
                    debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
                }
            }
        }

        if ( _maxContentLength && _contentLength > _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        string host;
        ushort port;
        if ( !parsePassiveReply(_responseHistory[$-1], host, port) ) {
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPStream();
        scope (exit ) {
            // in streaming mode the data channel is owned by the read delegate
            if ( dataStream !is null && !_response._receiveAsRange.activated ) {
                dataStream.close();
            }
        }

        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        while ( true ) {
            // a fresh buffer per chunk: putNoCopy keeps a reference to it
            auto b = new ubyte[_bufferSize];
            auto rc = dataStream.receive(b);
            if ( rc <= 0 ) {
                debug(requests) trace("done");
                break;
            }
            debug(requests) tracef("got %d bytes from data channel", rc);

            _contentReceived += rc;
            _response._responseBody.putNoCopy(b[0..rc]);

            // zero means "no limit"; a body of exactly the limit is allowed,
            // matching the SIZE based check above
            if ( _maxContentLength && _response._responseBody.length > _maxContentLength ) {
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
            if ( _useStreaming ) {
                debug(requests) trace("ftp uses streaming");
                _response.receiveAsRange.activated = true;
                _response.receiveAsRange.data = _response._responseBody.data;
                _response.receiveAsRange.read = delegate ubyte[] () {
                    Buffer!ubyte result;
                    // check if we received everything we need
                    if ( _maxContentLength && _contentReceived > _maxContentLength )
                    {
                        throw new RequestException("ContentLength > maxContentLength (%d>%d)".
                            format(_contentReceived, _maxContentLength));
                    }
                    // have to continue
                    auto b = new ubyte[_bufferSize];
                    ptrdiff_t read;
                    try {
                        read = dataStream.receive(b);
                    }
                    catch (Exception e) {
                        throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
                    }

                    if ( read > 0 ) {
                        _contentReceived += read;
                        result.putNoCopy(b[0..read]);
                        return result.data;
                    }
                    // read <= 0: end of data, finish the transfer and
                    // pick up the completion reply from the control channel
                    debug(requests) tracef("streaming_in: server closed connection");
                    dataStream.close();
                    code = responseToCode(serverResponse());
                    if ( code/100 == 2 ) {
                        debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
                    }
                    _response.code = code;
                    return result.data;
                };
                return _response;
            }
        }
        dataStream.close();
        // already closed: keep scope(exit) from closing it a second time
        dataStream = null;
        response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }
}

// Integration test: talks to public FTP servers and needs network access.
package unittest {
    globalLogLevel(LogLevel.info);
    info("testing ftp");
    auto rq = FTPRequest();
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
    assert(rs.code != 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
    assert(rs.code == 226);
    assert(rs.responseBody.length == 1024);
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    assert(rs.code == 226);
    info("ftp get  ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT with authenticator");
    rq.authenticator = new FtpAuthentication("anonymous", "requests@");
    rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    assert(rs.code == 226);
    assert(rs.finalURI.path == "/pub/FreeBSD/README.TXT");
    info("testing ftp - done.");
}