1 module requests.pool;
2 
3 import std.stdio;
4 import std.range;
5 import std.algorithm;
6 import std.concurrency;
7 import std.typecons;
8 import std.variant;
9 import std.random;
10 import std.string;
11 import std.format;
12 import core.thread;
13 import std.exception;
14 import std.experimental.logger;
15 
16 import requests.request;
17 import requests.http;
18 import requests.base;
19 import requests.uri;
20 import requests.streams;
21 
/**
    Description of a single HTTP request to be executed by a pool worker.

    Every setter stores its argument in the job and returns a copy of the
    job, so calls can be chained:
    `Job(url).method("POST").data(body).opaque(id)`.
*/
struct Job {
    /// HTTP methods a job can use.
    enum Method {
        GET,
        POST,
    }
    string              _url;                   // target url
    Method              _method = Method.GET;   // GET unless changed
    immutable(ubyte)[]  _data;                  // data for post
    immutable(ubyte)[]  _opaque;                // opaque data tie request and response
    uint                _maxRedirects = 10;
    Duration            _timeout = 30.seconds;
    immutable(string)[] _headers_h;             // header names
    immutable(string)[] _headers_v;             // header values, parallel to _headers_h

    /// Select the request method by enum value.
    auto method(Method m) {
        this._method = m;
        return this;
    }
    /**
        Select the request method by name (case-insensitive).

        Throws: Exception when the name is neither "GET" nor "POST".
    */
    auto method(string m) {
        immutable wanted = m.toUpper();
        if ( wanted == "POST" ) {
            this._method = Method.POST;
        } else if ( wanted == "GET" ) {
            this._method = Method.GET;
        } else {
            throw new Exception("Unknown method %s, known methods are GET,POST".format(m));
        }
        return this;
    }
    /// Attach the body that is passed to the POST request.
    auto data(immutable(ubyte)[] d) {
        this._data = d;
        return this;
    }
    /// Set the redirect limit handed to the request.
    auto maxRedirects(uint n) {
        this._maxRedirects = n;
        return this;
    }
    /// Set the timeout handed to the request.
    auto timeout(Duration t) {
        this._timeout = t;
        return this;
    }
    /// Attach opaque bytes that are copied unchanged into the matching Result.
    auto opaque(immutable(ubyte)[] o) {
        this._opaque = o;
        return this;
    }
    /// Append every name/value pair of `headers` to the job's header lists.
    auto addHeaders(in string[string] headers) {
        foreach(kv; headers.byKeyValue) {
            // names and values live in two parallel arrays
            _headers_h ~= idup(kv.key);
            _headers_v ~= idup(kv.value);
        }
        return this;
    }
}
77 
/**
    Outcome of one Job, sent by a worker back to the pool owner.
*/
struct Result {
    /// Values stored in `flags`.
    enum {
        OK        = 1 << 0,     // request completed, `code` and `data` come from the response
        QUIT      = 1 << 1,
        EXCEPTION = 1 << 2      // request threw, `data` holds the exception message
    }
    uint                flags;
    ushort              code;
    immutable(ubyte)[]  data;       // response body
    immutable(ubyte)[]  opaque;     // opaque data tie request and response
}
89 
/// Empty marker message: a worker that receives it leaves its receive loop.
struct Quit {}
92 
/**
    Scheme/host/port triple extracted from a url.

    The pool uses it to decide whether two jobs can be served by the same
    worker.
*/
struct Route {
    string scheme;
    string host;
    ushort port;

    // a Route only makes sense when built from a url
    @disable this();

    /**
        Parse `url` and keep its scheme, host and port.

        Params:
            url = url of the job this route is computed for
    */
    this(string url) {
        URI parsed = URI(url);
        scheme = parsed.scheme;
        host = parsed.host;
        port = parsed.port;
    }

    /**
        Two routes are equal when scheme, host and port all match.

        A single `auto ref const` overload accepts both lvalues and rvalues
        and, being `const`, can also be used on const/immutable routes.
    */
    bool opEquals()(auto ref const Route other) const {
        return scheme == other.scheme
            && host == other.host
            && port == other.port;
    }
}
119 
120 
/**
    Entry point of a pool worker thread.

    The worker owns one Request object and loops on `receive`:
    $(UL
        $(LI `(Tid, Job)`  - execute the job and send `(thisTid, Result)` to the Tid from the message;)
        $(LI `(Tid, Quit)` - leave the loop.)
    )
    The loop also ends when the owner thread terminates (OwnerTerminated)
    or when any other Exception escapes `receive`; the latter is logged.
*/
void worker() {
    Request rq;
    bool    run = true;

    /**
        Execute a single job on the worker's Request object.

        Returns: a Result flagged OK carrying the response code and body, or,
        when the request throws, a Result flagged EXCEPTION with code 500 and
        the exception message as data. The job's opaque data is copied into
        the result in both cases.
    */
    Result process(ref Job j) {
        debug(requests) tracef("Received job %s", j._url);

        rq.maxRedirects(j._maxRedirects);
        rq.timeout(j._timeout);
        // the Request object is reused between jobs: drop headers of the previous job
        rq.clearHeaders();
        if ( j._headers_h.length ) {
            // rebuild name => value map from the two parallel arrays
            auto headers = assocArray(zip(j._headers_h.dup, j._headers_v.dup));
            rq.addHeaders(headers);
        }
        Response rs;
        try {
            final switch(j._method) {
                case Job.Method.GET:
                    rs = rq.get(j._url);
                    break;
                case Job.Method.POST:
                    rs = rq.post(j._url, j._data);
                    break;
            }
            return Result(Result.OK, rs.code, assumeUnique(rs.responseBody.data), j._opaque);
        } catch (Exception e) {
            return Result(Result.EXCEPTION, 500, e.msg.representation(), j._opaque);
        }
    }

    try {
        while (run) {
            receive(
                (Tid p, Quit q) {
                    // cmd to quit
                    debug(requests) tracef("got quit");
                    run = false;
                },
                (Tid p, Job j) {
                    // cmd to process
                    debug(requests) tracef("got job");
                    auto r = process(j);
                    p.send(thisTid(), r);
                },
            );
        }
    }
    catch (OwnerTerminated e) {
        debug(requests) tracef("parent terminated");
    }
    catch (Exception e) {
        // the format string needs a specifier, otherwise the exception is never printed
        errorf("Exception %s", e);
    }
    finally {
        debug(requests) tracef("worker done");
    }
}
178 
/**
    State shared by all copies of one Pool: the input range, the worker
    tables and the counters.
*/
class Manager(R) {
    R                   _range;     // input range
    uint                _workers;   // max num of workers
    Route[Tid]          _idle;      // idle workers with last route served
    Route[Tid]          _busy;      // busy workers with currently serving route
    Job[Tid]            _box;       // one-element queue
    size_t              _sent;
    size_t              _received;
    Nullable!Result     _result;
    uint                _rc;        // ref counter
    bool                _exhausted;

    /// True when the one-element queue of worker `t` holds no job (equals Job.init).
    bool boxIsEmpty(Tid t) {
        return _box[t] == Job.init;
    }

    /**
        Choose a worker for a job going to `route`.

        Preference order:
        1. an idle worker whose last served route equals `route`;
        2. a busy worker currently serving `route` whose box is empty;
        3. `Tid.init` when fewer than `_workers` workers are registered;
        4. otherwise the first key of the idle table.
    */
    auto findWorker(Route route) {
        foreach(tid, ref served; _idle) {
            if ( served == route ) {
                return tid;
            }
        }
        foreach(tid, ref serving; _busy) {
            if ( serving == route && boxIsEmpty(tid) ) {
                return tid;
            }
        }
        immutable registered = _busy.length + _idle.length;
        return registered < _workers ? Tid.init : _idle.keys[0];
    }
}
214 
/**
    Input range of Results produced by running the Jobs of `R` on worker
    threads.

    All copies of a Pool share one Manager; a reference counter kept in the
    Manager tracks the copies, and when the last copy is destroyed every
    known worker is sent a Quit message.
*/
struct Pool(R) {
private:
    Manager!R _m;
public:
    string toString() {
        return "Pool<>";
    }

    /// Drop one reference; the last reference tells every worker to quit.
   ~this() {
        if ( _m is null ) {
            // default-initialized Pool that never went through the constructor
            return;
        }
        _m._rc--;
        debug(requests) tracef("on ~ rc=%d", _m._rc);
        Tid me = thisTid();
        if ( _m._rc == 0 ) {
            foreach(ref t; _m._busy.keys ~ _m._idle.keys) {
                debug(requests) tracef("sending Quit message to workers");
                t.send(me, Quit());
            }
        }
    }

    /**
        Params:
            input_range = range of Jobs to execute
            w           = max number of worker threads
    */
    this(R input_range, uint w) {
        _m = new Manager!R();
        _m._rc = 1;
        _m._range = input_range;
        _m._workers = w;
    }
    /// Copies share the Manager, so a copy only bumps the reference counter.
    this(this) {
        if ( _m is null ) {
            // copy of a default-initialized Pool: nothing to count
            return;
        }
        assert( _m._rc > 0);
        _m._rc++;
        debug(requests) tracef("copy rc=%d", _m._rc);
    }

    /// True when the input was empty from the start or popFront marked the pool exhausted.
    bool empty() {
        debug(requests) tracef("input empty: %s, exhausted: %s, busy: %d", _m._range.empty(), _m._exhausted, _m._busy.length);
        if ( _m._range.empty() && _m._sent == 0 ) {
            // we didn't start processing and input already empty. Empty input range?
            return true;
        }
        return _m._exhausted;
    }
    /**
        popFront

        When some worker is busy, waits for the next Result and stores it as
        the new front; the worker that delivered it either gets the job
        waiting in its box or is moved to the idle table. Then jobs are taken
        from the input while there are fewer busy workers than `_workers`.
        The pool is marked exhausted when no result arrived, the input is
        empty and nothing is in flight.
    */
    void popFront()
    in
    {
        assert(_m._busy.length > 0 || _m._range.empty);
        assert(_m._busy.length + _m._idle.length <= _m._workers);
    }
    body
    {
        auto owner = thisTid();
        Nullable!Tid  idle;     // worker released by the receive below, if any
        bool result_ready = false;
        debug(requests) tracef("busy: %d, idle: %d, workers: %d", _m._busy.length, _m._idle.length, _m._workers);
        
        if ( _m._busy.length > 0 ) {
            receive(
                (Tid t, Result r) {
                    assert(t in _m._busy, "received response not from busy thread");
                    _m._received++;
                    _m._result = r;
                    result_ready = true;
                    if ( ! _m.boxIsEmpty(t) ) {
                        Job j = _m._box[t];
                        assert(Route(j._url) == _m._busy[t]);
                        debug(requests) tracef("send job %s from the box", j._url);
                        // have job with the same route, worker is still busy
                        _m._box[t] = Job.init;
                        t.send(owner, j);
                        _m._sent++;
                    } else {
                        // move this thread from busy to idle threads
                        Route route = _m._busy[t];
                        debug(requests) tracef("release busy thread %s", route);
                        _m._busy.remove(t);
                        _m._idle[t] = route;
                        idle = t;
                    }
                }
            );
        }
        while( !_m._range.empty() && _m._busy.length < _m._workers) {
            debug(requests) trace("push next job to pool");
            Job j = _m._range.front();
            _m._range.popFront();
            Route route = Route(j._url);
            /* 
            find best route.
            1. look up for idle worker that served same route.
            2. if 1. failed - look up for busy worker who served same route and has empty box
            3. if 1 and 2 failed - just use any idle worker ( we must have one anyway)
            */
            auto t = _m.findWorker(route);
            if ( t in _m._busy ) {
                // just place in box, it is sent when the worker delivers its current result
                assert(_m._box[t] == Job.init);
                debug(requests) tracef("found best for %s in busy %s", route, _m._busy[t]);
                _m._box[t] = j;
                continue;
            } else
            if ( t in _m._idle ) {
                debug(requests) tracef("found best for %s in idle %s", route, _m._idle[t]);
                fromIdleToBusy(t, route);
            } else 
            if ( !idle.isNull && (idle.get in _m._idle) ) {
                // the released worker may already have been reused by an
                // earlier iteration, so use it only while it is still idle
                t = idle.get;
                idle.nullify();
                // t must be assigned before the trace: only now it is a key of _idle
                debug(requests) tracef("use just released idle (prev job %s) for %s", _m._idle[t], route);
                fromIdleToBusy(t, route);
            } else {
                debug(requests) tracef("create worker for %s", route);
                t = spawn(&worker);
                _m._box[t] = Job.init;
                _m._busy[t] = route;
            }
            t.send(owner, j);
            _m._sent++;
        }
        debug(requests) tracef("input empty: %s, sent: %d, received: %d, busy: %d",
                                _m._range.empty, _m._sent, _m._received,_m._busy.length );
        if ( !result_ready && _m._range.empty && _m._sent == _m._received && _m._busy.length==0) {
            _m._exhausted = true;
        }
        else {
            _m._exhausted = false;
        }
    }
    /**
        front

        Returns the stored result when there is one. Otherwise (first call)
        spawns workers for the first jobs, waits for the first Result, stores
        and returns it.
    */
    Result front()
    out {
        assert(_m._busy.length > 0 || _m._range.empty);
    }
    body {
        if ( !_m._result.isNull ) {
            return _m._result.get;
        }
        Tid w;
        sendWhilePossible();
        receive(
            (Tid t, Result r) {
                debug(requests) trace("received first response");
                _m._result = r;
                // move this thread from busy to idle threads
                fromBusyToIdle(t);
                _m._received++;
                w = t;
            },
        );
        if ( !_m._range.empty && _m._busy.length == 0) {
            // when max number of workers = 1, then
            // at this point we will have only one idle worker,
            // and we need to have at least one busy worker
            // so that we can always read in popFront
            Job j = _m._range.front();
            Route route = Route(j._url);
            w.send(thisTid(), j);
            _m._range.popFront();
            fromIdleToBusy(w, route);
            _m._sent++;
        }
        return _m._result.get;
    }
    /**
        helpers
    */
    /// Move worker `t` from the busy table to the idle table, keeping its route.
    void fromBusyToIdle(Tid t) {
        assert(t in _m._busy);
        assert(t !in _m._idle);
        _m._idle[t] = _m._busy[t];
        _m._busy.remove(t);
    }
    /// Move worker `t` from the idle table to the busy table with route `r` and an empty box.
    void fromIdleToBusy(Tid t, Route r) {
        assert(t !in _m._busy);
        assert(t in _m._idle);
        _m._busy[t] = r;
        _m._box[t] = Job.init;
        _m._idle.remove(t);
    }
    /// Spawn a new worker per input job until the input is empty or `_workers` workers exist.
    void sendWhilePossible() {
        while( !_m._range.empty() && (_m._busy.length+_m._idle.length) < _m._workers) {
            Tid t = spawn(&worker);
            Job j = _m._range.front();
            Route route = Route(j._url);

            auto owner = thisTid();
            send(t, owner, j);
            _m._range.popFront();
            _m._busy[t] = route;
            _m._box[t] = Job.init;
            _m._sent++;
        }
    }
}
411 
/**
    Wrap a range of Jobs into a Pool that executes them on at most `w` workers.

    Params:
        r = input range of Jobs
        w = max number of worker threads, must be greater than zero

    Throws: Exception (via enforce) when `w` is 0.
*/
Pool!R pool(R)(R r, uint w) {
    enforce(w>0, "Number of workers must be > 0");
    return Pool!R(r, w);
}
416 
// Runs a mixed batch of jobs through the pool against a local httpbin server.
unittest {

    version(vibeD) {
        string httpbinurl = "http://httpbin.org";
    } else {
        info("Testing pool");
        import httpbin;
        auto server = httpbinApp();
        server.start();
        scope(exit) server.stop();

        string httpbinurl = "http://127.0.0.1:8081";
        Job[] jobs = [
            Job(httpbinurl ~ "/get")
                    .addHeaders(["X-Header": "X-Value", "Y-Header": "Y-Value"]),
            Job(httpbinurl ~ "/gzip"),
            Job(httpbinurl ~ "/deflate"),
            Job(httpbinurl ~ "/absolute-redirect/3")
                    .maxRedirects(2),
            Job(httpbinurl ~ "/range/1024"),
            Job(httpbinurl ~ "/post")
                    .method("POST")                     // switch from the default GET to POST
                    .data("test".representation)        // body for the POST
                    .opaque("id".representation),       // opaque data - comes back unchanged in the Result
            Job(httpbinurl ~ "/delay/3")
                    .timeout(1.seconds),                // 1 second timeout - expected to end as a non-200 result
            Job(httpbinurl ~ "/stream/1024"),
        ];

        // the assertion below expects exactly two of the jobs above not to return 200
        auto succeeded = jobs
            .pool(5)
            .filter!(res => res.code == 200)
            .count();

        assert(succeeded == jobs.length - 2, "pool test failed");

        // NOTE(review): these jobs attach a body but keep the default GET method - confirm this is intended
        iota(20)
            .map!(n => Job(httpbinurl ~ "/post")
                            .data("%d".format(n).representation))
            .pool(10)
            .each!(r => assert(r.code==200));
        info("Testing pool - done");
    }
}