module requests.ftp;

private:
import std.ascii;
import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.exception;
import std.string;
import std.range;
import std.experimental.logger;
import std.stdio;
import std.path;
import std.traits;

import requests.uri;
import requests.utils;
import requests.streams;
import requests.base;

/// Thrown when a reply cannot be read from the FTP control channel.
public class FTPServerResponseError: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Response object returned by FTPRequest.get and FTPRequest.post.
public class FTPResponse : Response {
}

enum defaultBufferSize = 8192;

/**
 * FTP client request (passive mode, binary transfers).
 *
 * The control channel is kept open between calls; handleChangeURI closes it
 * when the host, port or username of the next URI differs from the current one.
 */
public struct FTPRequest {
    private {
        URI           _uri;
        Duration      _timeout = 60.seconds;
        uint          _verbosity = 0;
        size_t        _bufferSize = defaultBufferSize;
        // 5 GiB. The `L` suffix forces 64-bit arithmetic: with plain int
        // literals the product wraps around to 1 GiB.
        long          _maxContentLength = 5L*1024*1024*1024;
        long          _contentLength = -1;
        long          _contentReceived;
        NetworkStream _controlChannel;
        string[]      _responseHistory;
        FTPResponse   _response;
        bool          _useStreaming;
    }
    // Accessor mixins come from requests.utils (presumably they generate
    // property getters/setters for the matching `_name` fields - confirm there).
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!long("maxContentLength"));
    mixin(Getter_Setter!bool("useStreaming"));
    mixin(Getter!long("contentLength"));
    mixin(Getter!long("contentReceived"));

    /// Server replies collected so far (one selected reply line per entry).
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }

    /// Create a request for the given URI string.
    this(string uri) {
        _uri = URI(uri);
    }

    /// Create a request for an already parsed URI.
    this(in URI uri) {
        _uri = uri;
    }

    /// Closes the control channel if one is open.
    ~this() {
        if ( _controlChannel ) {
            _controlChannel.close();
        }
    }

    /**
     * Send one command over the control channel and wait for the reply.
     *
     * Params:
     *   cmd = complete command line, including the trailing "\r\n"
     * Returns: numeric reply code; the reply line is appended to the history.
     * Throws: FTPServerResponseError if the reply cannot be read.
     */
    ushort sendCmdGetResponse(string cmd) {
        debug(requests) tracef("cmd to server: %s", cmd.strip);
        if ( _verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        _controlChannel.send(cmd);
        string response = serverResponse();
        _responseHistory ~= response;
        return responseToCode(response);
    }

    /// Convert the first three characters of a reply line to the numeric code.
    ushort responseToCode(string response) pure const @safe {
        return to!ushort(response[0..3]);
    }

    /**
     * Switch the request to a new URI.
     *
     * If a control channel exists and the new URI differs in host, port or
     * username, the channel is closed so the next call reconnects.
     */
    void handleChangeURI(in string uri) @safe {
        URI newURI = URI(uri);
        if ( _controlChannel &&
            (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
            _controlChannel.close();
            _controlChannel = null;
        }
        _uri = newURI;
    }

    /**
     * Read from the control channel until a final reply line is available.
     *
     * A final reply line is "DDD text": three digits followed by a space.
     * Reading stops once the accumulated data ends with a newline and
     * contains such a line; the first matching line is returned.
     *
     * Throws: FTPServerResponseError on read failure, on a closed connection,
     *         or when 16 KiB were accumulated without a final reply line.
     */
    string serverResponse() {
        string res, buffer;
        immutable bufferLimit = 16*1024;
        _controlChannel.readTimeout = _timeout;
        scope(exit) {
            _controlChannel.readTimeout = 0.seconds;
        }
        auto b = new ubyte[_bufferSize];
        while ( buffer.length < bufferLimit ) {
            debug(requests) trace("Wait on control channel");
            ptrdiff_t rc;
            try {
                rc = _controlChannel.receive(b);
            }
            catch (Exception e) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
            }
            debug(requests) tracef("Got %d bytes from control socket", rc);
            // rc == 0: nothing was read; rc < 0 must be rejected as well,
            // otherwise the slice b[0..rc] below would be out of range.
            if ( rc <= 0 ) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
            }
            if ( _verbosity >= 1 ) {
                (cast(string)b[0..rc]).
                    splitLines.
                    each!(l=>writefln("< %s", l));
            }
            buffer ~= b[0..rc];
            if ( buffer.endsWith('\n') ){
                auto responseLines = buffer.
                    splitLines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( !responseLines.empty ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }

    /**
     * Walk down `path`, creating directories that do not exist.
     *
     * At start we stay at the original path (path[0]) and have to enter
     * path[1]. For example, with path = ["", "a", "b"] we stay in root, then
     * CWD to "a" (creating it with MKD when CWD fails), then recurse for "b".
     *
     * Returns: the reply code of the last command sent.
     */
    ushort tryCdOrCreatePath(string[] path) {
        debug(requests) info("Trying to create path %s".format(path));
        enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
        auto next_dir = path[1];
        auto code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
        if ( code >= 300) {
            // try to create, then again CWD
            code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n");
            if ( code > 300 ) {
                return code;
            }
            code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n");
        }
        if ( path.length == 2 ) {
            return code;
        }
        return tryCdOrCreatePath(path[1..$]);
    }

    /**
     * Upload `content` to `uri` using STOR over a passive-mode data connection.
     *
     * Params:
     *   uri     = target URL; when null the URI given at construction is used
     *   content = either something castable to ubyte[], or a range of chunks
     *             (range of ranges of characters or ubytes)
     *   args    = not used by this implementation
     * Returns: the FTPResponse; `code` holds the last relevant reply code
     *          (500 when the PASV reply could not be parsed).
     * Throws: FTPServerResponseError if a control-channel reply cannot be read.
     */
    auto post(R, A...)(string uri, R content, A args)
        if ( __traits(compiles, cast(ubyte[])content)
        || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front))))
        || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
        ) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !_controlChannel ) {
            _controlChannel = new TCPStream();
            _controlChannel.connect(_uri.host, _uri.port, _timeout);
            response = serverResponse();
            _responseHistory ~= response;

            code = responseToCode(response);
            debug(requests) tracef("Server initial response: %s", response);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
            // Log in
            string user = _uri.username.length ? _uri.username : "anonymous";
            string pass = _uri.password.length ? _uri.password : "requests@";
            debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

            code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
            if ( code/100 > 3 ) {
                _response.code = code;
                return _response;
            } else if ( code/100 == 3) {
                // 3xx: server asks for the password
                code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
                if ( code/100 > 2 ) {
                    _response.code = code;
                    return _response;
                }
            }

        }
        code = sendCmdGetResponse("PWD\r\n");
        string pwd;
        if ( code/100 == 2 ) {
            // like '257 "/home/testuser"'
            auto a = _responseHistory[$-1].split();
            if ( a.length > 1 ) {
                pwd = a[1].chompPrefix(`"`).chomp(`"`);
            }
        }
        // Return to the initial directory so a reused session starts from
        // the same place on the next request.
        scope (exit) {
            if ( pwd && _controlChannel ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
        if ( code == 550 ) {
            // try to create directory and enter it
            code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
        }
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        // something like "227 Entering Passive Mode (132,180,15,2,210,187)" expected
        // in last response.
        // Cut anything between ( and )
        auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        string host;
        ushort port;
        try {
            ubyte a1,a2,a3,a4,p1,p2;
            formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
            host = format("%d.%d.%d.%d", a1, a2, a3, a4);
            port = (p1<<8) + p2;
        } catch (FormatException e) {
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPStream();
        scope (exit ) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        static if ( __traits(compiles, cast(ubyte[])content) ) {
            auto data = cast(ubyte[])content;
            for(size_t pos = 0; pos < data.length;) {
                // Send the next window starting at `pos`; `rc` may be less
                // than the window size, so advance by what was actually sent.
                immutable stop = min(pos + _bufferSize, data.length);
                auto rc = dataStream.send(data[pos..stop]);
                if ( rc <= 0 ) {
                    debug(requests) trace("done");
                    break;
                }
                debug(requests) tracef("sent %d bytes to data channel", rc);
                pos += rc;
            }
        } else {
            while (!content.empty) {
                auto chunk = content.front;
                debug(requests) trace("ftp posting %d of data chunk".format(chunk.length));
                auto rc = dataStream.send(chunk);
                if ( rc <= 0 ) {
                    debug(requests) trace("done");
                    break;
                }
                content.popFront;
            }
        }
        // Closing the data connection tells the server the upload is complete;
        // null it so the scope(exit) above does not close it twice.
        dataStream.close();
        dataStream = null;
        response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully uploaded %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }

    /**
     * Download the file named by `uri` using RETR over a passive-mode data
     * connection.
     *
     * With useStreaming enabled the call returns after the first received
     * chunk; the remaining data is pulled through the response's
     * receiveAsRange, whose `read` delegate also collects the final reply code.
     *
     * Params:
     *   uri = source URL; when null the URI given at construction is used
     * Returns: the FTPResponse; `code` holds the last relevant reply code
     *          (500 when the PASV reply could not be parsed).
     * Throws: RequestException when maxContentLength is exceeded or a
     *         streaming read fails; FTPServerResponseError if a
     *         control-channel reply cannot be read.
     */
    auto get(string uri = null) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;
        _contentReceived = 0;
        // Forget the size reported for a previous file in the same session;
        // otherwise a failed SIZE would leave a stale value for the limit check.
        _contentLength = -1;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !_controlChannel ) {
            _controlChannel = new TCPStream();
            _controlChannel.connect(_uri.host, _uri.port, _timeout);
            response = serverResponse();
            _responseHistory ~= response;

            code = responseToCode(response);
            debug(requests) tracef("Server initial response: %s", response);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
            // Log in
            string user = _uri.username.length ? _uri.username : "anonymous";
            string pass = _uri.password.length ? _uri.password : "requests@";
            debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

            code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
            if ( code/100 > 3 ) {
                _response.code = code;
                return _response;
            } else if ( code/100 == 3) {
                // 3xx: server asks for the password
                code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
                if ( code/100 > 2 ) {
                    _response.code = code;
                    return _response;
                }
            }

        }

        code = sendCmdGetResponse("PWD\r\n");
        string pwd;
        if ( code/100 == 2 ) {
            // like '257 "/home/testuser"'
            auto a = _responseHistory[$-1].split();
            if ( a.length > 1 ) {
                pwd = a[1].chompPrefix(`"`).chomp(`"`);
            }
        }
        // Return to the initial directory so a reused session starts from
        // the same place on the next request.
        scope (exit) {
            if ( pwd && _controlChannel ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n");
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 == 2 ) {
            // something like
            // 213 229355520
            auto s = _responseHistory[$-1].findSplitAfter(" ");
            if ( s.length ) {
                try {
                    _contentLength = to!long(s[1]);
                } catch (ConvException) {
                    debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
                }
            }
        }

        if ( _maxContentLength && _contentLength > _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        // something like "227 Entering Passive Mode (132,180,15,2,210,187)" expected
        // in last response.
        // Cut anything between ( and )
        auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        string host;
        ushort port;
        try {
            ubyte a1,a2,a3,a4,p1,p2;
            formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
            host = format("%d.%d.%d.%d", a1, a2, a3, a4);
            port = (p1<<8) + p2;
        } catch (FormatException e) {
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPStream();
        // When streaming was activated the data connection must stay open:
        // the read delegate below keeps using it and closes it itself.
        scope (exit ) {
            if ( dataStream !is null && !_response._receiveAsRange.activated ) {
                dataStream.close();
            }
        }

        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        while ( true ) {
            auto b = new ubyte[_bufferSize];
            auto rc = dataStream.receive(b);
            if ( rc <= 0 ) {
                debug(requests) trace("done");
                break;
            }
            debug(requests) tracef("got %d bytes from data channel", rc);

            _contentReceived += rc;
            _response._responseBody.putNoCopy(b[0..rc]);

            if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
            if ( _useStreaming ) {
                debug(requests) trace("ftp uses streaming");
                _response.receiveAsRange.activated = true;
                _response.receiveAsRange.data.length = 0;
                _response.receiveAsRange.data = _response._responseBody.data;
                _response.receiveAsRange.read = delegate ubyte[] () {
                    Buffer!ubyte result;
                    while(true) {
                        // check if we received everything we need
                        // (a limit of 0 means "no limit", as in the checks above)
                        if ( _maxContentLength && _contentReceived >= _maxContentLength )
                        {
                            throw new RequestException("ContentLength > maxContentLength (%d>%d)".
                                format(_contentReceived, _maxContentLength));
                        }
                        // have to continue
                        auto b = new ubyte[_bufferSize];
                        ptrdiff_t read;
                        try {
                            read = dataStream.receive(b);
                        }
                        catch (Exception e) {
                            throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
                        }

                        if ( read > 0 ) {
                            _contentReceived += read;
                            result.putNoCopy(b[0..read]);
                            return result.data;
                        }
                        if ( read == 0 ) {
                            // End of data: pick up the final reply from the control channel.
                            debug(requests) tracef("streaming_in: server closed connection");
                            dataStream.close();
                            code = responseToCode(serverResponse());
                            if ( code/100 == 2 ) {
                                debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
                            }
                            _response.code = code;
                            break;
                        }
                    }
                    return result.data;
                };
                return _response;
            }
        }
        dataStream.close();
        response = serverResponse();
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }
}

// Integration test: talks to public FTP servers, so it needs network access.
package unittest {
    globalLogLevel(LogLevel.info);
    info("testing ftp");
    auto rq = FTPRequest();
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
    assert(rs.code != 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
    assert(rs.code == 226);
    assert(rs.responseBody.length == 1024);
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    assert(rs.code == 226);
    assert(rs.finalURI.path == "/pub/FreeBSD/README.TXT");
    info("testing ftp - done.");
}