module requests.ftp;

private:
import std.ascii;
import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.socket;
import std.exception;
import std.string;
import std.range;
import std.experimental.logger;
import std.stdio;
import std.path;

import core.stdc.errno;

import requests.uri;
import requests.utils;
import requests.streams;
import requests.base;

/// Thrown when a reply cannot be read from the FTP control channel.
public class FTPServerResponseError: Exception {
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure {
        super(msg, file, line);
    }
}

/// Response object returned by FTPRequest.get and FTPRequest.post.
/// Adds nothing on top of the generic Response.
public class FTPResponse : Response {
}

/// Default size (in bytes) of the buffers used for socket reads and writes.
enum defaultBufferSize = 8192;

/**
 * Minimal FTP client: passive-mode binary download (get) and upload (post).
 *
 * The control connection is kept between calls and is reused as long as
 * host, port and username of the requested URI do not change.
 */
public struct FTPRequest {
    private {
        URI           _uri;
        Duration      _timeout = 60.seconds;
        uint          _verbosity = 0;
        size_t        _bufferSize = defaultBufferSize;
        // 5L forces 64-bit arithmetic; with plain int literals the product overflows.
        long          _maxContentLength = 5L*1024*1024*1024;
        long          _contentLength = -1;
        long          _contentReceived;
        SocketStream  _controlChannel;
        string[]      _responseHistory;
        FTPResponse   _response;
        bool          _useStreaming;
    }
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!long("maxContentLength"));
    mixin(Getter_Setter!bool("useStreaming"));
    mixin(Getter!long("contentLength"));
    mixin(Getter!long("contentReceived"));

    /// All status lines received from the server so far, oldest first.
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }

    /// Create a request for the given FTP URL.
    this(string uri) {
        _uri = URI(uri);
    }

    /// ditto
    this(in URI uri) {
        _uri = uri;
    }

    ~this() {
        if ( _controlChannel ) {
            _controlChannel.close();
        }
    }

    /**
     * Send one command over the control channel and wait for the reply.
     *
     * Params:
     *   cmd = complete command line, including the trailing "\r\n"
     * Returns: numeric reply code of the server's status line.
     * Throws: FTPServerResponseError if no reply could be read.
     */
    ushort sendCmdGetResponse(string cmd) {
        tracef("cmd to server: %s", cmd.strip);
        if ( _verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        _controlChannel.send(cmd);
        string response = serverResponse();
        _responseHistory ~= response;
        return responseToCode(response);
    }

    /// Convert the first three characters of a status line to a numeric code.
    ushort responseToCode(string response) pure const @safe {
        return to!ushort(response[0..3]);
    }

    /**
     * Switch this request to a new URI.
     *
     * If a control channel exists and the new URI differs in host, port or
     * username, the channel is closed so the next request reconnects.
     */
    void handleChangeURI(in string uri) @safe {
        // if control channel exists and new URL not match old, then close
        URI newURI = URI(uri);
        if ( _controlChannel &&
            (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
            _controlChannel.close();
            _controlChannel = null;
        }
        _uri = newURI;
    }

    /**
     * Read from the control channel until a final status line arrives.
     *
     * A final status line is one that starts with three digits followed by
     * a space. At most about 16 KiB are accumulated while looking for it.
     * The socket receive timeout is set to the request timeout for the
     * duration of the call and reset to zero afterwards.
     *
     * Returns: the first status line found (without line terminator).
     * Throws: FTPServerResponseError on read failure or if no status line
     *         is found within the size limit.
     */
    string serverResponse() {
        string buffer;
        immutable bufferLimit = 16*1024;
        _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, _timeout);
        scope(exit) {
            _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 0.seconds);
        }
        auto b = new ubyte[_bufferSize];
        while ( buffer.length < bufferLimit ) {
            trace("Wait on control channel");
            auto rc = _controlChannel.receive(b);
            version(Posix) {
                // interrupted by a signal: just retry the read
                if ( rc < 0 && errno == EINTR ) {
                    continue;
                }
            }
            tracef("Got %d bytes from control socket", rc);
            if ( rc <= 0 ) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel: rc=%d, errno: %d".format(rc, errno()));
            }
            if ( _verbosity >= 1 ) {
                (cast(string)b[0..rc]).
                    splitLines.
                    each!(l=>writefln("< %s", l));
            }
            buffer ~= b[0..rc];
            // only look for a status line once a complete line has been received
            if ( buffer.endsWith('\n') ){
                auto responseLines = buffer.
                    splitLines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( responseLines.count > 0 ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }

    /**
     * Make sure a logged-in control channel exists.
     *
     * Does nothing if a channel is already open. Otherwise connects to the
     * host and port of the current URI, reads the greeting and logs in with
     * the URI credentials ("anonymous" / "requests@" when absent).
     * If connecting throws or the server rejects the session, the channel is
     * closed and cleared so a later request starts from scratch.
     *
     * Returns: 0 on success, otherwise the reply code that rejected the session.
     */
    private ushort openControlChannel() {
        if ( _controlChannel ) {
            return 0;
        }
        bool ready = false;
        _controlChannel = new TCPSocketStream();
        scope(exit) {
            // never keep a half-initialised channel around
            if ( !ready && _controlChannel ) {
                _controlChannel.close();
                _controlChannel = null;
            }
        }
        _controlChannel.connect(_uri.host, _uri.port, _timeout);
        string response = serverResponse();
        _responseHistory ~= response;

        ushort code = responseToCode(response);
        tracef("Server initial response: %s", response);
        if ( code/100 > 2 ) {
            return code;
        }
        // Log in
        string user = _uri.username.length ? _uri.username : "anonymous";
        string pass = _uri.password.length ? _uri.password : "requests@";
        tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

        code = sendCmdGetResponse("USER " ~ user ~ "\r\n");
        if ( code/100 > 3 ) {
            return code;
        } else if ( code/100 == 3) {
            // server asks for a password
            code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n");
            if ( code/100 > 2 ) {
                return code;
            }
        }
        ready = true;
        return 0;
    }

    /**
     * Extract the data-connection address from a PASV reply.
     *
     * Expects something like
     * "227 Entering Passive Mode (132,180,15,2,210,187)".
     *
     * Params:
     *   reply = status line answering PASV
     *   host  = receives the dotted IPv4 address
     *   port  = receives the port number
     * Returns: true on success, false if the reply could not be parsed.
     */
    private static bool parsePassiveReply(string reply, out string host, out ushort port) {
        // Cut anything between ( and )
        auto v = reply.findSplitBefore(")")[0].findSplitAfter("(")[1];
        try {
            ubyte a1,a2,a3,a4,p1,p2;
            formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
            host = format("%d.%d.%d.%d", a1, a2, a3, a4);
            port = cast(ushort)((p1<<8) + p2);
            return true;
        } catch (FormatException e) {
            error("Failed to parse ", v);
        } catch (ConvException e) {
            // non-numeric or out-of-range field
            error("Failed to parse ", v);
        }
        return false;
    }

    /**
     * Upload data to the file named by the URI (STOR, binary, passive mode).
     *
     * Params:
     *   uri  = target URL; if null the URI given at construction is used
     *   data = payload; must have a length and be an input range or array
     *   args = currently unused
     * Returns: FTPResponse whose code is the last relevant server reply code
     *          (500 if the PASV reply could not be parsed).
     * Throws: FTPServerResponseError if a reply cannot be read.
     */
    auto post(R, A...)(string uri, R data, A args) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        code = openControlChannel();
        if ( code != 0 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        string host;
        ushort port;
        if ( !parsePassiveReply(_responseHistory[$-1], host, port) ) {
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPSocketStream();
        scope (exit ) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        size_t sent = 0;
        while ( sent < data.length ) {
            // always continue from the current offset, so large payloads and
            // partial sends do not repeat already transmitted bytes
            static if ( is(R : const(ubyte)[]) || is(R : const(char)[]) ) {
                // byte-sized elements: slice directly, no copy and no decoding
                auto chunk = data[sent .. min(sent + _bufferSize, data.length)];
            } else {
                auto chunk = data.drop(sent).take(_bufferSize).array;
            }
            auto rc = dataStream.send(chunk);
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("sent %d bytes to data channel", rc);
            sent += rc;
        }
        // closing the data connection tells the server the upload is complete
        dataStream.close();
        dataStream = null;
        response = serverResponse();
        _responseHistory ~= response;
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            tracef("Successfully uploaded %d bytes", sent);
        }
        _response.code = code;
        return _response;
    }

    /**
     * Download the file named by the URI (RETR, binary, passive mode).
     *
     * When useStreaming is set, the call returns after the first chunk and
     * the rest is delivered through the response's receiveAsRange; the final
     * reply code is then stored once the server closes the data connection.
     *
     * Params:
     *   uri = source URL; if null the URI given at construction is used
     * Returns: FTPResponse with the body (or first chunk) and reply code
     *          (500 if the PASV reply could not be parsed).
     * Throws: RequestException when maxContentLength (if non-zero) is
     *         exceeded or a streaming read fails; FTPServerResponseError if
     *         a reply cannot be read.
     */
    auto get(string uri = null) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;
        // counters describe the current transfer only
        _contentLength = -1;
        _contentReceived = 0;

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        code = openControlChannel();
        if ( code != 0 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("TYPE I\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 == 2 ) {
            // something like
            // 213 229355520
            auto s = _responseHistory[$-1].findSplitAfter(" ");
            try {
                _contentLength = to!long(s[1].strip);
            } catch (ConvException) {
                trace("Failed to convert string %s to file size".format(s[1]));
            }
        }

        if ( _maxContentLength && _contentLength > _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }

        code = sendCmdGetResponse("PASV\r\n");
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        string host;
        ushort port;
        if ( !parsePassiveReply(_responseHistory[$-1], host, port) ) {
            _response.code = 500;
            return _response;
        }

        auto dataStream = new TCPSocketStream();
        scope (exit ) {
            // in streaming mode the data connection must outlive this call
            if ( dataStream !is null && !_response._receiveAsRange.activated ) {
                dataStream.close();
            }
        }

        dataStream.connect(host, port, _timeout);

        code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n");
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        while ( true ) {
            auto b = new ubyte[_bufferSize];
            auto rc = dataStream.receive(b);
            if ( rc <= 0 ) {
                trace("done");
                break;
            }
            tracef("got %d bytes from data channel", rc);

            _contentReceived += rc;
            _response._responseBody.putNoCopy(b[0..rc]);

            if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
            if ( _useStreaming ) {
                _response.receiveAsRange.activated = true;
                _response.receiveAsRange.data = _response._responseBody;
                _response.receiveAsRange.read = delegate Buffer!ubyte () {
                    Buffer!ubyte result;
                    while(true) {
                        // check if we received everything we need
                        // (a zero limit means "unlimited", as in the checks above)
                        if ( _maxContentLength && _contentReceived >= _maxContentLength )
                        {
                            throw new RequestException("ContentLength > maxContentLength (%d>%d)".
                                format(_contentLength, _maxContentLength));
                        }
                        // have to continue
                        auto b = new ubyte[_bufferSize];
                        auto read = dataStream.receive(b);

                        if ( read > 0 ) {
                            _contentReceived += read;
                            result.putNoCopy(b[0..read]);
                            return result;
                        }
                        if ( read < 0 ) {
                            version(Posix) {
                                if ( errno == EINTR ) {
                                    continue;
                                }
                            }
                            throw new RequestException("streaming_in error reading from socket");
                        }
                        if ( read == 0 ) {
                            debug tracef("streaming_in: server closed connection");
                            dataStream.close();
                            code = responseToCode(serverResponse());
                            if ( code/100 == 2 ) {
                                tracef("Successfully received %d bytes", _response._responseBody.length);
                            }
                            _response.code = code;
                            break;
                        }
                    }
                    return result;
                };
                return _response;
            }
        }
        dataStream.close();
        response = serverResponse();
        _responseHistory ~= response;
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            tracef("Successfully received %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }
}

// NOTE: this test talks to public FTP servers and therefore needs network access.
package unittest {
    globalLogLevel(LogLevel.info );
    info("testing ftp");
    auto rq = FTPRequest();
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
    assert(rs.code != 226);
    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
    rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
    assert(rs.code == 226);
    assert(rs.responseBody.length == 1024);
    info("ftp get  ", "ftp://ftp.uni-bayreuth.de/README");
    rs = rq.get("ftp://ftp.uni-bayreuth.de/README");
    assert(rs.code == 226);
    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
    rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
    assert(rs.code == 226);
    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
    assert(rs.code == 226);
    assert(rs.finalURI.path == "/pub/FreeBSD/README.TXT");
    info("testing ftp - done.");
}