1 module requests.ftp; 2 3 private: 4 import std.ascii; 5 import std.algorithm; 6 import std.conv; 7 import std.datetime; 8 import std.format; 9 import std.exception; 10 import std.string; 11 import std.range; 12 import std.experimental.logger; 13 import std.stdio; 14 import std.path; 15 16 import requests.uri; 17 import requests.utils; 18 import requests.streams; 19 import requests.base; 20 21 public class FTPServerResponseError: Exception { 22 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 23 super(message, file, line, next); 24 } 25 } 26 27 public class FTPResponse : Response { 28 } 29 30 enum defaultBufferSize = 8192; 31 32 public struct FTPRequest { 33 private { 34 URI _uri; 35 Duration _timeout = 60.seconds; 36 uint _verbosity = 0; 37 size_t _bufferSize = defaultBufferSize; 38 long _maxContentLength = 5*1024*1024*1024; 39 long _contentLength = -1; 40 long _contentReceived; 41 NetworkStream _controlChannel; 42 string[] _responseHistory; 43 FTPResponse _response; 44 bool _useStreaming; 45 } 46 mixin(Getter_Setter!Duration("timeout")); 47 mixin(Getter_Setter!uint("verbosity")); 48 mixin(Getter_Setter!size_t("bufferSize")); 49 mixin(Getter_Setter!long("maxContentLength")); 50 mixin(Getter_Setter!bool("useStreaming")); 51 mixin(Getter!long("contentLength")); 52 mixin(Getter!long("contentReceived")); 53 54 @property final string[] responseHistory() @safe @nogc nothrow { 55 return _responseHistory; 56 } 57 this(string uri) { 58 _uri = URI(uri); 59 } 60 61 this(in URI uri) { 62 _uri = uri; 63 } 64 65 ~this() { 66 if ( _controlChannel ) { 67 _controlChannel.close(); 68 } 69 } 70 71 ushort sendCmdGetResponse(string cmd) { 72 tracef("cmd to server: %s", cmd.strip); 73 if ( _verbosity >=1 ) { 74 writefln("> %s", cmd.strip); 75 } 76 _controlChannel.send(cmd); 77 string response = serverResponse(); 78 _responseHistory ~= response; 79 return responseToCode(response); 80 } 81 82 ushort responseToCode(string response) pure const @safe { 83 return to!ushort(response[0..3]); 84 } 85 86 void handleChangeURI(in string uri) @safe { 87 // if control channel exists and new URL not match old, then close 88 URI newURI = URI(uri); 89 if ( _controlChannel && 90 (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) { 91 _controlChannel.close(); 92 _controlChannel = null; 93 } 94 _uri = newURI; 95 } 96 97 string serverResponse() { 98 string res, buffer; 99 immutable bufferLimit = 16*1024; 100 _controlChannel.readTimeout = _timeout; 101 scope(exit) { 102 _controlChannel.readTimeout = 0.seconds; 103 } 104 auto b = new ubyte[_bufferSize]; 105 while ( buffer.length < bufferLimit ) { 106 trace("Wait on control channel"); 107 ptrdiff_t rc; 108 try { 109 rc = _controlChannel.receive(b); 110 } 111 catch (Exception e) { 112 error("Failed to read response from server"); 113 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e); 114 } 115 tracef("Got %d bytes from control socket", rc); 116 if ( rc == 0 ) { 117 error("Failed to read response from server"); 118 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__); 119 } 120 if ( _verbosity >= 1 ) { 121 (cast(string)b[0..rc]). 122 splitLines. 123 each!(l=>writefln("< %s", l)); 124 } 125 buffer ~= b[0..rc]; 126 if ( buffer.endsWith('\n') ){ 127 auto responseLines = buffer. 128 splitLines. 129 filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit); 130 if ( responseLines.count > 0 ) { 131 return responseLines.front; 132 } 133 } 134 } 135 throw new FTPServerResponseError("Failed to read server responce over control channel"); 136 assert(0); 137 } 138 139 auto post(R, A...)(string uri, R data, A args) { 140 enforce( uri || _uri.host, "FTP URL undefined"); 141 string response; 142 ushort code; 143 144 _response = new FTPResponse; 145 146 if ( uri ) { 147 handleChangeURI(uri); 148 } 149 150 _response.uri = _uri; 151 _response.finalURI = _uri; 152 153 if ( !_controlChannel ) { 154 _controlChannel = new TCPStream(); 155 _controlChannel.connect(_uri.host, _uri.port, _timeout); 156 response = serverResponse(); 157 _responseHistory ~= response; 158 159 code = responseToCode(response); 160 tracef("Server initial response: %s", response); 161 if ( code/100 > 2 ) { 162 _response.code = code; 163 return _response; 164 } 165 // Log in 166 string user = _uri.username.length ? _uri.username : "anonymous"; 167 string pass = _uri.password.length ? _uri.password : "requests@"; 168 tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1)); 169 170 code = sendCmdGetResponse("USER " ~ user ~ "\r\n"); 171 if ( code/100 > 3 ) { 172 _response.code = code; 173 return _response; 174 } else if ( code/100 == 3) { 175 176 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n"); 177 if ( code/100 > 2 ) { 178 _response.code = code; 179 return _response; 180 } 181 } 182 183 } 184 185 code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n"); 186 if ( code/100 > 2 ) { 187 _response.code = code; 188 return _response; 189 } 190 191 code = sendCmdGetResponse("PASV\r\n"); 192 if ( code/100 > 2 ) { 193 _response.code = code; 194 return _response; 195 } 196 // something like "227 Entering Passive Mode (132,180,15,2,210,187)" expected 197 // in last response. 198 // Cut anything between ( and ) 199 auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1]; 200 string host; 201 ushort port; 202 try { 203 ubyte a1,a2,a3,a4,p1,p2; 204 formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2); 205 host = format("%d.%d.%d.%d", a1, a2, a3, a4); 206 port = (p1<<8) + p2; 207 } catch (FormatException e) { 208 error("Failed to parse ", v); 209 _response.code = 500; 210 return _response; 211 } 212 213 auto dataStream = new TCPStream(); 214 scope (exit ) { 215 if ( dataStream !is null ) { 216 dataStream.close(); 217 } 218 } 219 220 dataStream.connect(host, port, _timeout); 221 222 code = sendCmdGetResponse("TYPE I\r\n"); 223 if ( code/100 > 2 ) { 224 _response.code = code; 225 return _response; 226 } 227 228 code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n"); 229 if ( code/100 > 1 ) { 230 _response.code = code; 231 return _response; 232 } 233 auto b = new ubyte[_bufferSize]; 234 for(size_t pos = 0; pos < data.length;) { 235 auto chunk = data.take(_bufferSize).array; 236 auto rc = dataStream.send(chunk); 237 if ( rc <= 0 ) { 238 trace("done"); 239 break; 240 } 241 tracef("sent %d bytes to data channel", rc); 242 pos += rc; 243 } 244 dataStream.close(); 245 dataStream = null; 246 response = serverResponse(); 247 code = responseToCode(response); 248 if ( code/100 == 2 ) { 249 tracef("Successfully uploaded %d bytes", _response._responseBody.length); 250 } 251 _response.code = code; 252 return _response; 253 } 254 255 auto get(string uri = null) { 256 enforce( uri || _uri.host, "FTP URL undefined"); 257 string response; 258 ushort code; 259 260 _response = new FTPResponse; 261 262 if ( uri ) { 263 handleChangeURI(uri); 264 } 265 266 _response.uri = _uri; 267 _response.finalURI = _uri; 268 269 if ( !_controlChannel ) { 270 _controlChannel = new TCPStream(); 271 _controlChannel.connect(_uri.host, _uri.port, _timeout); 272 response = serverResponse(); 273 _responseHistory ~= response; 274 275 code = responseToCode(response); 276 tracef("Server initial response: %s", response); 277 if ( code/100 > 2 ) { 278 _response.code = code; 279 return _response; 280 } 281 // Log in 282 string user = _uri.username.length ? _uri.username : "anonymous"; 283 string pass = _uri.password.length ? _uri.password : "requests@"; 284 tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1)); 285 286 code = sendCmdGetResponse("USER " ~ user ~ "\r\n"); 287 if ( code/100 > 3 ) { 288 _response.code = code; 289 return _response; 290 } else if ( code/100 == 3) { 291 292 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n"); 293 if ( code/100 > 2 ) { 294 _response.code = code; 295 return _response; 296 } 297 } 298 299 } 300 301 code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n"); 302 if ( code/100 > 2 ) { 303 _response.code = code; 304 return _response; 305 } 306 307 code = sendCmdGetResponse("TYPE I\r\n"); 308 if ( code/100 > 2 ) { 309 _response.code = code; 310 return _response; 311 } 312 313 code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n"); 314 if ( code/100 == 2 ) { 315 // something like 316 // 213 229355520 317 auto s = _responseHistory[$-1].findSplitAfter(" "); 318 if ( s.length ) { 319 try { 320 _contentLength = to!long(s[1]); 321 } catch (ConvException) { 322 trace("Failed to convert string %s to file size".format(s[1])); 323 } 324 } 325 } 326 327 if ( _maxContentLength && _contentLength > _maxContentLength ) { 328 throw new RequestException("maxContentLength exceeded for ftp data"); 329 } 330 331 code = sendCmdGetResponse("PASV\r\n"); 332 if ( code/100 > 2 ) { 333 _response.code = code; 334 return _response; 335 } 336 // something like "227 Entering Passive Mode (132,180,15,2,210,187)" expected 337 // in last response. 338 // Cut anything between ( and ) 339 auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1]; 340 string host; 341 ushort port; 342 try { 343 ubyte a1,a2,a3,a4,p1,p2; 344 formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2); 345 host = format("%d.%d.%d.%d", a1, a2, a3, a4); 346 port = (p1<<8) + p2; 347 } catch (FormatException e) { 348 error("Failed to parse ", v); 349 _response.code = 500; 350 return _response; 351 } 352 353 auto dataStream = new TCPStream(); 354 scope (exit ) { 355 if ( dataStream !is null && !_response._receiveAsRange.activated ) { 356 dataStream.close(); 357 } 358 } 359 360 dataStream.connect(host, port, _timeout); 361 362 code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n"); 363 if ( code/100 > 1 ) { 364 _response.code = code; 365 return _response; 366 } 367 while ( true ) { 368 auto b = new ubyte[_bufferSize]; 369 auto rc = dataStream.receive(b); 370 if ( rc <= 0 ) { 371 trace("done"); 372 break; 373 } 374 tracef("got %d bytes from data channel", rc); 375 376 _contentReceived += rc; 377 _response._responseBody.putNoCopy(b[0..rc]); 378 379 if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) { 380 throw new RequestException("maxContentLength exceeded for ftp data"); 381 } 382 if ( _useStreaming ) { 383 _response.receiveAsRange.activated = true; 384 _response.receiveAsRange.data = _response._responseBody.data; 385 _response.receiveAsRange.read = delegate ubyte[] () { 386 Buffer!ubyte result; 387 while(true) { 388 // check if we received everything we need 389 if ( _contentReceived >= _maxContentLength ) 390 { 391 throw new RequestException("ContentLength > maxContentLength (%d>%d)". 392 format(_contentLength, _maxContentLength)); 393 } 394 // have to continue 395 auto b = new ubyte[_bufferSize]; 396 ptrdiff_t read; 397 try { 398 read = dataStream.receive(b); 399 } 400 catch (Exception e) { 401 throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e); 402 } 403 404 if ( read > 0 ) { 405 _contentReceived += read; 406 result.putNoCopy(b[0..read]); 407 return result.data; 408 } 409 if ( read == 0 ) { 410 debug tracef("streaming_in: server closed connection"); 411 dataStream.close(); 412 code = responseToCode(serverResponse()); 413 if ( code/100 == 2 ) { 414 tracef("Successfully received %d bytes", _response._responseBody.length); 415 } 416 _response.code = code; 417 break; 418 } 419 } 420 return result.data; 421 }; 422 return _response; 423 } 424 } 425 dataStream.close(); 426 response = serverResponse(); 427 code = responseToCode(response); 428 if ( code/100 == 2 ) { 429 tracef("Successfully received %d bytes", _response._responseBody.length); 430 } 431 _response.code = code; 432 return _response; 433 } 434 } 435 436 package unittest { 437 globalLogLevel(LogLevel.info ); 438 info("testing ftp"); 439 auto rq = FTPRequest(); 440 info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT"); 441 auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation); 442 assert(rs.code == 226); 443 info("ftp get ", "ftp://speedtest.tele2.net/nonexistent", ", in same session."); 444 rs = rq.get("ftp://speedtest.tele2.net/nonexistent"); 445 assert(rs.code != 226); 446 info("ftp get ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session."); 447 rs = rq.get("ftp://speedtest.tele2.net/1KB.zip"); 448 assert(rs.code == 226); 449 assert(rs.responseBody.length == 1024); 450 info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT"); 451 rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation); 452 assert(rs.code == 226); 453 info("ftp get ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT"); 454 rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT"); 455 assert(rs.code == 226); 456 assert(rs.finalURI.path == "/pub/FreeBSD/README.TXT"); 457 info("testing ftp - done."); 458 }