1 module requests.ftp; 2 3 private: 4 import std.ascii; 5 import std.algorithm; 6 import std.conv; 7 import std.datetime; 8 import std.format; 9 import std.socket; 10 import std.exception; 11 import std.string; 12 import std.range; 13 import std.experimental.logger; 14 import std.stdio; 15 import std.path; 16 17 import core.stdc.errno; 18 19 import requests.uri; 20 import requests.utils; 21 import requests.streams; 22 import requests.base; 23 24 public class FTPServerResponseError: Exception { 25 this(string msg, string file = __FILE__, size_t line = __LINE__) @safe pure { 26 super(msg, file, line); 27 } 28 } 29 30 public class FTPResponse : Response { 31 } 32 33 public struct FTPRequest { 34 private { 35 URI _uri; 36 Duration _timeout = 60.seconds; 37 uint _verbosity = 0; 38 size_t _bufferSize = 16*1024; // 16k 39 size_t _maxContentLength; 40 size_t _contentLength; 41 SocketStream _controlChannel; 42 string[] _responseHistory; 43 FTPResponse _response; 44 } 45 mixin(Getter_Setter!Duration("timeout")); 46 mixin(Getter_Setter!uint("verbosity")); 47 mixin(Getter_Setter!size_t("bufferSize")); 48 mixin(Getter_Setter!size_t("maxContentLength")); 49 50 @property final string[] responseHistory() @safe @nogc nothrow { 51 return _responseHistory; 52 } 53 this(string uri) { 54 _uri = URI(uri); 55 } 56 57 this(in URI uri) { 58 _uri = uri; 59 } 60 61 ~this() { 62 if ( _controlChannel ) { 63 _controlChannel.close(); 64 } 65 } 66 67 ushort sendCmdGetResponse(string cmd) { 68 tracef("cmd to server: %s", cmd.strip); 69 if ( _verbosity >=1 ) { 70 writefln("> %s", cmd.strip); 71 } 72 _controlChannel.send(cmd); 73 string response = serverResponse(); 74 _responseHistory ~= response; 75 return responseToCode(response); 76 } 77 78 ushort responseToCode(string response) pure const @safe { 79 return to!ushort(response[0..3]); 80 } 81 82 void handleChangeURI(in string uri) @safe { 83 // if control channel exists and new URL not match old, then close 84 URI newURI = URI(uri); 85 if ( _controlChannel && 86 (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) { 87 _controlChannel.close(); 88 _controlChannel = null; 89 } 90 _uri = newURI; 91 } 92 93 string serverResponse() { 94 string res, buffer; 95 immutable bufferLimit = 16*1024; 96 _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, _timeout); 97 scope(exit) { 98 _controlChannel.so.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 0.seconds); 99 } 100 auto b = new ubyte[_bufferSize]; 101 while ( buffer.length < bufferLimit ) { 102 trace("Wait on control channel"); 103 auto rc = _controlChannel.receive(b); 104 version(Posix) { 105 if ( rc < 0 && errno == EINTR ) { 106 continue; 107 } 108 } 109 tracef("Got %d bytes from control socket", rc); 110 if ( rc <= 0 ) { 111 error("Failed to read response from server"); 112 throw new FTPServerResponseError("Failed to read server responce over control channel: rc=%d, errno: %d".format(rc, errno())); 113 } 114 if ( _verbosity >= 1 ) { 115 (cast(string)b[0..rc]). 116 splitLines. 117 each!(l=>writefln("< %s", l)); 118 } 119 buffer ~= b[0..rc]; 120 if ( buffer.endsWith('\n') ){ 121 auto responseLines = buffer. 122 splitLines. 123 filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit); 124 if ( responseLines.count > 0 ) { 125 return responseLines.front; 126 } 127 } 128 } 129 throw new FTPServerResponseError("Failed to read server responce over control channel"); 130 assert(0); 131 } 132 133 auto post(R, A...)(string uri, R data, A args) { 134 enforce( uri || _uri.host, "FTP URL undefined"); 135 string response; 136 ushort code; 137 138 _response = new FTPResponse; 139 140 if ( uri ) { 141 handleChangeURI(uri); 142 } 143 144 _response.uri = _uri; 145 _response.finalURI = _uri; 146 147 if ( !_controlChannel ) { 148 _controlChannel = new TCPSocketStream(); 149 _controlChannel.connect(_uri.host, _uri.port, _timeout); 150 response = serverResponse(); 151 _responseHistory ~= response; 152 153 code = responseToCode(response); 154 tracef("Server initial response: %s", response); 155 if ( code/100 > 2 ) { 156 _response.code = code; 157 return _response; 158 } 159 // Log in 160 string user = _uri.username.length ? _uri.username : "anonymous"; 161 string pass = _uri.password.length ? _uri.password : "requests@"; 162 tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1)); 163 164 code = sendCmdGetResponse("USER " ~ user ~ "\r\n"); 165 if ( code/100 > 3 ) { 166 _response.code = code; 167 return _response; 168 } else if ( code/100 == 3) { 169 170 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n"); 171 if ( code/100 > 2 ) { 172 _response.code = code; 173 return _response; 174 } 175 } 176 177 } 178 179 code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n"); 180 if ( code/100 > 2 ) { 181 _response.code = code; 182 return _response; 183 } 184 185 code = sendCmdGetResponse("PASV\r\n"); 186 if ( code/100 > 2 ) { 187 _response.code = code; 188 return _response; 189 } 190 // something like "227 Entering Passive Mode (132,180,15,2,210,187)" expected 191 // in last response. 192 // Cut anything between ( and ) 193 auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1]; 194 string host; 195 ushort port; 196 try { 197 ubyte a1,a2,a3,a4,p1,p2; 198 formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2); 199 host = format("%d.%d.%d.%d", a1, a2, a3, a4); 200 port = (p1<<8) + p2; 201 } catch (FormatException e) { 202 error("Failed to parse ", v); 203 _response.code = 500; 204 return _response; 205 } 206 207 auto dataStream = new TCPSocketStream(); 208 scope (exit ) { 209 if ( dataStream !is null ) { 210 dataStream.close(); 211 } 212 } 213 214 dataStream.connect(host, port, _timeout); 215 216 code = sendCmdGetResponse("TYPE I\r\n"); 217 if ( code/100 > 2 ) { 218 _response.code = code; 219 return _response; 220 } 221 222 code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n"); 223 if ( code/100 > 1 ) { 224 _response.code = code; 225 return _response; 226 } 227 auto b = new ubyte[_bufferSize]; 228 for(size_t pos = 0; pos < data.length;) { 229 auto chunk = data.take(_bufferSize).array; 230 auto rc = dataStream.send(chunk); 231 if ( rc <= 0 ) { 232 trace("done"); 233 break; 234 } 235 tracef("sent %d bytes to data channel", rc); 236 pos += rc; 237 } 238 dataStream.close(); 239 dataStream = null; 240 response = serverResponse(); 241 code = responseToCode(response); 242 if ( code/100 == 2 ) { 243 tracef("Successfully uploaded %d bytes", _response._responseBody.length); 244 } 245 _response.code = code; 246 return _response; 247 } 248 249 auto get(string uri = null) { 250 enforce( uri || _uri.host, "FTP URL undefined"); 251 string response; 252 ushort code; 253 254 _response = new FTPResponse; 255 256 if ( uri ) { 257 handleChangeURI(uri); 258 } 259 260 _response.uri = _uri; 261 _response.finalURI = _uri; 262 263 if ( !_controlChannel ) { 264 _controlChannel = new TCPSocketStream(); 265 _controlChannel.connect(_uri.host, _uri.port, _timeout); 266 response = serverResponse(); 267 _responseHistory ~= response; 268 269 code = responseToCode(response); 270 tracef("Server initial response: %s", response); 271 if ( code/100 > 2 ) { 272 _response.code = code; 273 return _response; 274 } 275 // Log in 276 string user = _uri.username.length ? _uri.username : "anonymous"; 277 string pass = _uri.password.length ? _uri.password : "requests@"; 278 tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1)); 279 280 code = sendCmdGetResponse("USER " ~ user ~ "\r\n"); 281 if ( code/100 > 3 ) { 282 _response.code = code; 283 return _response; 284 } else if ( code/100 == 3) { 285 286 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n"); 287 if ( code/100 > 2 ) { 288 _response.code = code; 289 return _response; 290 } 291 } 292 293 } 294 295 code = sendCmdGetResponse("CWD " ~ dirName(_uri.path) ~ "\r\n"); 296 if ( code/100 > 2 ) { 297 _response.code = code; 298 return _response; 299 } 300 code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n"); 301 if ( code/100 == 2 ) { 302 // something like 303 // 213 229355520 304 auto s = _responseHistory[$-1].findSplitAfter(" "); 305 if ( s.length ) { 306 try { 307 _contentLength = to!size_t(s[1]); 308 } catch (ConvException) { 309 trace("Failed to convert string %s to file size".format(s[1])); 310 } 311 } 312 } 313 314 if ( _maxContentLength && _contentLength > _maxContentLength ) { 315 throw new RequestException("maxContentLength exceeded for ftp data"); 316 } 317 318 code = sendCmdGetResponse("PASV\r\n"); 319 if ( code/100 > 2 ) { 320 _response.code = code; 321 return _response; 322 } 323 // something like "227 Entering Passive Mode (132,180,15,2,210,187)" expected 324 // in last response. 325 // Cut anything between ( and ) 326 auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1]; 327 string host; 328 ushort port; 329 try { 330 ubyte a1,a2,a3,a4,p1,p2; 331 formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2); 332 host = format("%d.%d.%d.%d", a1, a2, a3, a4); 333 port = (p1<<8) + p2; 334 } catch (FormatException e) { 335 error("Failed to parse ", v); 336 _response.code = 500; 337 return _response; 338 } 339 340 auto dataStream = new TCPSocketStream(); 341 scope (exit ) { 342 if ( dataStream !is null ) { 343 dataStream.close(); 344 } 345 } 346 347 dataStream.connect(host, port, _timeout); 348 349 code = sendCmdGetResponse("TYPE I\r\n"); 350 if ( code/100 > 2 ) { 351 _response.code = code; 352 return _response; 353 } 354 355 code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n"); 356 if ( code/100 > 1 ) { 357 _response.code = code; 358 return _response; 359 } 360 auto b = new ubyte[_bufferSize]; 361 while ( true ) { 362 auto rc = dataStream.receive(b); 363 if ( rc <= 0 ) { 364 trace("done"); 365 break; 366 } 367 tracef("got %d bytes from data channel", rc); 368 _response._responseBody.put(b[0..rc]); 369 370 if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) { 371 throw new RequestException("maxContentLength exceeded for ftp data"); 372 } 373 } 374 dataStream.close(); 375 response = serverResponse(); 376 code = responseToCode(response); 377 if ( code/100 == 2 ) { 378 tracef("Successfully received %d bytes", _response._responseBody.length); 379 } 380 _response.code = code; 381 return _response; 382 } 383 } 384 385 package unittest { 386 globalLogLevel(LogLevel.info ); 387 info("testing ftp"); 388 auto rq = FTPRequest(); 389 info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT"); 390 auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation); 391 assert(rs.code == 226); 392 info("ftp get ", "ftp://speedtest.tele2.net/nonexistent", ", in same session."); 393 rs = rq.get("ftp://speedtest.tele2.net/nonexistent"); 394 assert(rs.code != 226); 395 info("ftp get ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session."); 396 rs = rq.get("ftp://speedtest.tele2.net/1KB.zip"); 397 assert(rs.code == 226); 398 assert(rs.responseBody.length == 1024); 399 info("ftp get ", "ftp://ftp.uni-bayreuth.de/README"); 400 rs = rq.get("ftp://ftp.uni-bayreuth.de/README"); 401 assert(rs.code == 226); 402 info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT"); 403 rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation); 404 assert(rs.code == 226); 405 info("ftp get ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT"); 406 rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT"); 407 assert(rs.code == 226); 408 assert(rs.finalURI.path == "/pub/FreeBSD/README.TXT"); 409 info("testing ftp - done."); 410 }