/**
 * Thread pool for executing HTTP requests in parallel: takes an input range
 * of Job descriptions, distributes them over worker threads and presents the
 * Results as an input range.
 */
module requests.pool;

import std.stdio;
import std.range;
import std.algorithm;
import std.concurrency;
import std.typecons;
import std.variant;
import std.random;
import std.string;
import std.format;
import core.thread;
import std.exception;
import std.experimental.logger;

import requests.request;
import requests.http;
import requests.base;
import requests.uri;
import requests.streams;

struct Job {
    enum Method {
        GET,
        POST,
    }
    string              _url;
    Method              _method = Method.GET;
    immutable(ubyte)[]  _data;                  // data for POST
    immutable(ubyte)[]  _opaque;                // opaque data tying request and response together
    uint                _maxRedirects = 10;
    Duration            _timeout = 30.seconds;
    immutable(string)[] _headers_h;
    immutable(string)[] _headers_v;

    auto method(Method m) {
        _method = m;
        return this;
    }
    auto method(string m) {
        switch(m.toUpper()) {
            case "POST":
                _method = Method.POST;
                break;
            case "GET":
                _method = Method.GET;
                break;
            default:
                throw new Exception("Unknown method %s, known methods are GET, POST".format(m));
        }
        return this;
    }
    auto data(immutable(ubyte)[] d) {
        _data = d;
        return this;
    }
    auto maxRedirects(uint n) {
        _maxRedirects = n;
        return this;
    }
    auto timeout(Duration t) {
        _timeout = t;
        return this;
    }
    auto opaque(immutable(ubyte)[] o) {
        _opaque = o;
        return this;
    }
    auto addHeaders(in string[string] headers) {
        foreach(pair; headers.byKeyValue) {
            _headers_h ~= idup(pair.key);
            _headers_v ~= idup(pair.value);
        }
        return this;
    }
}

struct Result {
    enum {
        OK        = 1,
        QUIT      = 2,
        EXCEPTION = 4
    }
    uint               flags;
    ushort             code;
    immutable(ubyte)[] data;    // response body
    immutable(ubyte)[] opaque;  // opaque data tying request and response together
}

struct Quit {
}

struct Route {
    string scheme;
    string host;
    ushort port;

    @disable this();

    this(string url) {
        URI parsed = URI(url);
        scheme = parsed.scheme;
        host = parsed.host;
        port = parsed.port;
    }
    bool opEquals(Route other) {
        bool r = this.scheme == other.scheme
              && this.host   == other.host
              && this.port   == other.port;
        return r;
    }
    bool opEquals(ref Route other) {
        bool r = this.scheme == other.scheme
              && this.host   == other.host
              && this.port   == other.port;
        return r;
    }
}


void worker() {
    Request rq;
    bool    run = true;

    Result process(ref Job j) {
        debug(requests) tracef("Received job %s", j._url);

        rq.maxRedirects(j._maxRedirects);
        rq.timeout(j._timeout);
        rq.clearHeaders();
        if ( j._headers_h.length ) {
            auto headers = assocArray(zip(j._headers_h.dup, j._headers_v.dup));
            rq.addHeaders(headers);
        }
        Response rs;
        try {
            final switch(j._method) {
                case Job.Method.GET:
                    rs = rq.get(j._url);
                    break;
                case Job.Method.POST:
                    rs = rq.post(j._url, j._data);
                    break;
            }
            return Result(Result.OK, rs.code, assumeUnique(rs.responseBody.data), j._opaque);
        } catch (Exception e) {
            return Result(Result.EXCEPTION, 500, e.msg.representation(), j._opaque);
        }
    }

    try {
        while (run) {
            receive(
                (Tid p, Quit q) {
                    // command to quit
                    debug(requests) tracef("got quit");
                    run = false;
                },
                (Tid p, Job j) {
                    // command to process a job
                    debug(requests) tracef("got job");
                    auto r = process(j);
                    p.send(thisTid(), r);
                },
            );
        }
    }
    catch (OwnerTerminated e) {
        debug(requests) tracef("parent terminated");
    }
    catch (Exception e) {
        errorf("Exception: %s", e);
    }
    finally {
        debug(requests) tracef("worker done");
    }
}

class Manager(R) {
    R               _range;     // input range of Jobs
    uint            _workers;   // max number of workers
    Route[Tid]      _idle;      // idle workers with the last route each one served
    Route[Tid]      _busy;      // busy workers with the route each one is currently serving
    Job[Tid]        _box;       // one-element job queue per worker
    size_t          _sent;
    size_t          _received;
    Nullable!Result _result;
    uint            _rc;        // reference counter
    bool            _exhausted;

    bool boxIsEmpty(Tid t) {
        return _box[t] == Job.init;
    }

    auto findWorker(Route route) {
        foreach(t, ref r; _idle) {
            if ( r == route ) {
                // use it
                return t;
            }
        }
        foreach(t, ref r; _busy) {
            if ( r == route && _box[t] == Job.init ) {
                // use it
                return t;
            }
        }
        if ( _busy.length + _idle.length < _workers ) {
            return Tid.init;
        }
        return _idle.keys[0];
    }
}

struct Pool(R) {
  private:
    Manager!R _m;
  public:
    string toString() {
        return "Pool<>";
    }

    ~this() {
        _m._rc--;
        debug(requests) tracef("on ~ rc=%d", _m._rc);
        Tid me = thisTid();
        if ( _m._rc == 0 ) {
            foreach(ref t; _m._busy.keys ~ _m._idle.keys) {
                debug(requests) tracef("sending Quit message to workers");
                t.send(me, Quit());
            }
        }
    }

    this(R input_range, uint w) {
        _m = new Manager!R();
        _m._rc = 1;
        _m._range = input_range;
        _m._workers = w;
    }
    this(this) {
        assert(_m._rc > 0);
        _m._rc++;
        debug(requests) tracef("copy rc=%d", _m._rc);
    }

    bool empty() {
        debug(requests) tracef("input empty: %s, exhausted: %s, busy: %d", _m._range.empty(), _m._exhausted, _m._busy.length);
        if ( _m._range.empty() && _m._sent == 0 ) {
            // we haven't started processing and the input is already empty - the input range was empty from the start
            return true;
        }
        return _m._exhausted;
    }
    /**
        popFront - collect one finished Result (if any worker is busy) and
        dispatch as many pending jobs as possible.
    */
    void popFront()
    in
    {
        assert(_m._busy.length > 0 || _m._range.empty);
        assert(_m._busy.length + _m._idle.length <= _m._workers);
    }
    body
    {
        auto owner = thisTid();
        Nullable!Tid idle;
        bool result_ready = false;
        debug(requests) tracef("busy: %d, idle: %d, workers: %d", _m._busy.length, _m._idle.length, _m._workers);

        if ( _m._busy.length > 0 ) {
            receive(
                (Tid t, Result r) {
                    assert(t in _m._busy, "received response not from a busy thread");
                    _m._received++;
                    _m._result = r;
                    result_ready = true;
                    if ( ! _m.boxIsEmpty(t) ) {
                        Job j = _m._box[t];
                        assert(Route(j._url) == _m._busy[t]);
                        debug(requests) tracef("send job %s from the box", j._url);
                        // there is a queued job with the same route, the worker stays busy
                        _m._box[t] = Job.init;
                        t.send(owner, j);
                        _m._sent++;
                    } else {
                        // move this thread from the busy to the idle set
                        Route route = _m._busy[t];
                        debug(requests) tracef("release busy thread %s", route);
                        _m._busy.remove(t);
                        _m._idle[t] = route;
                        idle = t;
                    }
                }
            );
        }
        while( !_m._range.empty() && _m._busy.length < _m._workers) {
            debug(requests) trace("push next job to pool");
            Job j = _m._range.front();
            _m._range.popFront();
            Route route = Route(j._url);
            /*
                Find the best worker for this route:
                1. look for an idle worker that served the same route;
                2. if 1 failed, look for a busy worker that serves the same route and has an empty box;
                3. if 1 and 2 failed, just use any idle worker (we must have one anyway).
            */
            auto t = _m.findWorker(route);
            if ( t in _m._busy ) {
                // just place the job in the worker's box
                assert(_m._box[t] == Job.init);
                debug(requests) tracef("found best for %s in busy %s", route, _m._busy[t]);
                _m._box[t] = j;
                continue;
            } else
            if ( t in _m._idle ) {
                debug(requests) tracef("found best for %s in idle %s", route, _m._idle[t]);
                fromIdleToBusy(t, route);
            } else
            if ( !idle.isNull ) {
                t = idle;
                idle.nullify();
                debug(requests) tracef("use just released idle (prev job %s) for %s", _m._idle[t], route);
                fromIdleToBusy(t, route);
            } else {
                debug(requests) tracef("create worker for %s", route);
                t = spawn(&worker);
                _m._box[t] = Job.init;
                _m._busy[t] = route;
            }
            t.send(owner, j);
            _m._sent++;
        }
        debug(requests) tracef("input empty: %s, sent: %d, received: %d, busy: %d",
                _m._range.empty, _m._sent, _m._received, _m._busy.length);
        if ( !result_ready && _m._range.empty && _m._sent == _m._received && _m._busy.length == 0) {
            _m._exhausted = true;
        }
        else {
            _m._exhausted = false;
        }
    }
    /**
        front - return the current Result, starting workers on demand
        for the first call.
    */
    Result front()
    out {
        assert(_m._busy.length > 0 || _m._range.empty);
    }
    body {
        if ( !_m._result.isNull ) {
            return _m._result;
        }
        Tid w;
        sendWhilePossible();
        receive(
            (Tid t, Result r) {
                debug(requests) trace("received first response");
                _m._result = r;
                // move this thread from the busy to the idle set
                fromBusyToIdle(t);
                _m._received++;
                w = t;
            },
        );
        if ( !_m._range.empty && _m._busy.length == 0) {
            // when the max number of workers is 1, at this point we have
            // only one idle worker, and we need at least one busy worker
            // so that we always have something to receive in popFront
            Job j = _m._range.front();
            Route route = Route(j._url);
            w.send(thisTid(), j);
            _m._range.popFront();
            fromIdleToBusy(w, route);
            _m._sent++;
        }
        return _m._result;
    }
    /**
        helpers
    */
    void fromBusyToIdle(Tid t) {
        assert(t in _m._busy);
        assert(t !in _m._idle);
        _m._idle[t] = _m._busy[t];
        _m._busy.remove(t);
    }
    void fromIdleToBusy(Tid t, Route r) {
        assert(t !in _m._busy);
        assert(t in _m._idle);
        _m._busy[t] = r;
        _m._box[t] = Job.init;
        _m._idle.remove(t);
    }
    void sendWhilePossible() {
        while( !_m._range.empty() && (_m._busy.length + _m._idle.length) < _m._workers) {
            Tid t = spawn(&worker);
            Job j = _m._range.front();
            Route route = Route(j._url);

            auto owner = thisTid();
            send(t, owner, j);
            _m._range.popFront();
            _m._busy[t] = route;
            _m._box[t] = Job.init;
            _m._sent++;
        }
    }
}

Pool!R pool(R)(R r, uint w) {
    enforce(w > 0, "Number of workers must be > 0");
    return Pool!R(r, w);
}

unittest {

    version(vibeD) {
        string httpbinurl = "http://httpbin.org";
    } else {
        info("Testing pool");
        import httpbin;
        auto server = httpbinApp();
        server.start();
        scope(exit) {
            server.stop();
        }
        string httpbinurl = "http://127.0.0.1:8081";
        Job[] jobs = [
            Job(httpbinurl ~ "/get").addHeaders([
                "X-Header": "X-Value",
                "Y-Header": "Y-Value"
            ]),
            Job(httpbinurl ~ "/gzip"),
            Job(httpbinurl ~ "/deflate"),
            Job(httpbinurl ~ "/absolute-redirect/3")
                .maxRedirects(2),
            Job(httpbinurl ~ "/range/1024"),
            Job(httpbinurl ~ "/post")
                .method("POST")                     // change default GET to POST
                .data("test".representation())      // attach data for POST
                .opaque("id".representation),       // opaque data - you will receive the same value back in the Result
            Job(httpbinurl ~ "/delay/3")
                .timeout(1.seconds),                // set the timeout to 1 second - this request will time out and fail
            Job(httpbinurl ~ "/stream/1024"),
        ];

        auto count = jobs.
            pool(5).
            filter!(r => r.code == 200).
            count();

        assert(count == jobs.length - 2, "pool test failed");
        iota(20)
            .map!(n => Job(httpbinurl ~ "/post")
                        .data("%d".format(n).representation))
            .pool(10)
            .each!(r => assert(r.code == 200));
        info("Testing pool - done");
    }
}