1 module requests.pool;
2 
3 import std.stdio;
4 import std.range;
5 import std.algorithm;
6 import std.concurrency;
7 import std.typecons;
8 import std.variant;
9 import std.random;
10 import std.string;
11 import std.format;
12 import core.thread;
13 import std.exception;
14 import std.experimental.logger;
15 
16 import requests.request;
17 import requests.http;
18 import requests.base;
19 import requests.uri;
20 import requests.streams;
21 
22 struct Job {
23     enum Method {
24         GET,
25         POST,
26     };
27 
28     package {
29         string              _url;
30         Method              _method = Method.GET;
31         immutable(ubyte)[]  _data;                  // data for post
32         immutable(ubyte)[]  _opaque;                // opaque data tie request and response
33         uint                _maxRedirects = 10;
34         Duration            _timeout = 30.seconds;
35         immutable(string)[] _headers_h;
36         immutable(string)[] _headers_v;
37     }
38 
39     auto method(Method m) {
40         _method = m;
41         return this;
42     }
43     auto method(string m) {
44         switch(m.toUpper()) {
45             case "POST":
46                 _method = Method.POST;
47                 break;
48             case "GET":
49                 _method = Method.GET;
50                 break;
51             default:
52                 throw new Exception("Unknown method %s, known methods are GET,POST".format(m));
53         }
54         return this;
55     }
56     auto data(immutable(ubyte)[] d) {
57         _data = d;
58         return this;
59     }
60     auto maxRedirects(uint n) {
61         _maxRedirects = n;
62         return this;
63     }
64     auto timeout(Duration t) {
65         _timeout = t;
66         return this;
67     }
68     auto opaque(immutable(ubyte)[] o) {
69         _opaque = o;
70         return this;
71     }
72     auto addHeaders(in string[string] headers) {
73         foreach(pair; headers.byKeyValue) {
74             _headers_h ~= idup(pair.key);
75             _headers_v ~= idup(pair.value);
76         }
77         return this;
78     }
79 }
80 
81 struct Result {
82     enum {
83         OK   = 1,
84         QUIT = 2,
85         EXCEPTION = 4
86     }
87     package {
88         uint                _flags;
89         ushort              _code;
90         immutable(ubyte)[]  _data;       // response body
91         immutable(ubyte)[]  _opaque;     // opaque data tie request and response
92     }
93     auto flags() pure @nogc {
94         return _flags;
95     }
96     auto code() pure @nogc {
97         return _code;
98     }
99     auto data() pure @nogc {
100         return _data;
101     }
102     auto opaque() pure @nogc {
103         return _opaque;
104     }
105 }
106 
107 struct Quit {
108 }
109 
110 struct Route {
111     string scheme;
112     string host;
113     ushort port;
114 
115     @disable this();
116 
117     this(string url) {
118         URI parsed = URI(url);
119         scheme = parsed.scheme;
120         host = parsed.host;
121         port = parsed.port;
122     }
123     bool opEquals(Route other) {
124         bool r =  this.scheme == other.scheme 
125             && this.host == other.host
126             && this.port == other.port;
127         return r;
128     }
129     bool opEquals(ref Route other) {
130         bool r = this.scheme == other.scheme 
131             && this.host == other.host
132             && this.port == other.port;
133         return r;
134     }
135 }
136 
137 
138 void worker() {
139     Request rq;
140     bool    run = true;
141 
142     Result process(ref Job j) {
143         debug(requests) tracef("Received job %s", j._url);
144 
145         rq.maxRedirects(j._maxRedirects);
146         rq.timeout(j._timeout);
147         rq.clearHeaders();
148         if ( j._headers_h.length ) {
149             auto headers = assocArray(zip(j._headers_h.dup, j._headers_v.dup));
150             rq.addHeaders(headers);
151         }
152         Response rs;
153         try {
154             final switch(j._method) {
155                 case Job.Method.GET:
156                     rs = rq.get(j._url);
157                     break;
158                 case Job.Method.POST:
159                     rs = rq.post(j._url, j._data);
160                     break;
161             }
162             return Result(Result.OK, rs.code, assumeUnique(rs.responseBody.data), j._opaque);
163         } catch (Exception e) {
164             return Result(Result.EXCEPTION, 500, e.msg.representation(), j._opaque);
165         }
166     }
167 
168     try {
169         while (run) {
170             receive(
171                 (Tid p, Quit q) {
172                     // cmd to quit
173                     debug(requests) tracef("got quit");
174                     run = false;
175                 },
176                 (Tid p, Job j) {
177                     // cmd to process
178                     debug(requests) tracef("got job");
179                     auto r = process(j);
180                     p.send(thisTid(), r);
181                 },
182             );
183         }
184     }
185     catch (OwnerTerminated e) {
186         debug(requests) tracef("parent terminated");
187     }
188     catch (Exception e) {
189         errorf("Exception ", e);
190     }
191     finally {
192         debug(requests) tracef("worker done");
193     }
194 }
195 
196 class Manager(R) {
197     R                   _range;     // input range
198     uint                _workers;   // max num of workers
199     Route[Tid]          _idle;      // idle workers with last route served
200     Route[Tid]          _busy;      // busy workers with currently serving route
201     Job[Tid]            _box;       // one-element queue
202     size_t              _sent;
203     size_t              _received;
204     Nullable!Result     _result;
205     uint                _rc;        // ref counter
206     bool                _exhausted;
207 
208     bool boxIsEmpty(Tid t) {
209         return _box[t] == Job.init;
210     }
211 
212     auto findWorker(Route route) {
213         foreach(t, ref r; _idle) {
214             if ( r == route ) {
215                 // use it
216                 return t;
217             }
218         }
219         foreach(t, ref r; _busy) {
220             if ( r == route && _box[t] == Job.init ) {
221                 // use it
222                 return t;
223             }
224         }
225         if ( _busy.length + _idle.length < _workers ) {
226             return Tid.init;
227         }
228         return _idle.keys[0];
229     }
230 }
231 
232 struct Pool(R) {
233 private:
234     Manager!R _m;
235 public:
236     string toString() {
237         return "Pool<>";
238     }
239 
240    ~this() {
241         _m._rc--;
242         debug(requests) tracef("on ~ rc=%d", _m._rc);
243         Tid me = thisTid();
244         if ( _m._rc == 0 ) {
245             foreach(ref t; _m._busy.keys ~ _m._idle.keys) {
246                 debug(requests) tracef("sending Quit message to workers");
247                 t.send(me, Quit());
248             }
249         }
250     }
251 
252     this(R input_range, uint w) {
253         _m = new Manager!R();
254         _m._rc = 1;
255         _m._range = input_range;
256         _m._workers = w;
257     }
258     this(this) {
259         assert( _m._rc > 0);
260         _m._rc++;
261         debug(requests) tracef("copy rc=%d", _m._rc);
262     }
263 
264     bool empty() {
265         debug(requests) tracef("input empty: %s, exhausted: %s, busy: %d", _m._range.empty(), _m._exhausted, _m._busy.length);
266         if ( _m._range.empty() && _m._sent == 0 ) {
267             // we didn't start processing and input already empty. Empty input range?
268             return true;
269         }
270         return _m._exhausted;
271     }
272     /**
273         popFront
274     */
275     void popFront()
276     in
277     {
278         assert(_m._busy.length > 0 || _m._range.empty || _m._result.isNull);
279         assert(_m._busy.length + _m._idle.length <= _m._workers);
280     }
281     body
282     {
283         auto owner = thisTid();
284         Nullable!Tid  idle;
285         bool result_ready = false;
286         debug(requests) tracef("busy: %d, idle: %d, workers: %d", _m._busy.length, _m._idle.length, _m._workers);
287         if ( _m._result.isNull ) {
288             // case when popFront called without front
289             front;
290         }
291         if ( _m._busy.length > 0 ) {
292             receive(
293                 (Tid t, Result r) {
294                     assert(t in _m._busy, "received response not from busy thread");
295                     _m._received++;
296                     _m._result = r;
297                     result_ready = true;
298                     if ( ! _m.boxIsEmpty(t) ) {
299                         Job j = _m._box[t];
300                         assert(Route(j._url) == _m._busy[t], "wrong route");
301                         debug(requests) tracef("send job %s from the box", j._url);
302                         // have job with the same route, worker is still busy
303                         _m._box[t] = Job.init;
304                         t.send(owner, j);
305                         _m._sent++;
306                     } else {
307                         // move this thread from busy to idle threads
308                         Route route = _m._busy[t];
309                         debug(requests) tracef("release busy thread %s", route);
310                         _m._busy.remove(t);
311                         _m._idle[t] = route;
312                         idle = t;
313                     }
314                 }
315             );
316         }
317         while( !_m._range.empty() && _m._busy.length < _m._workers) {
318             debug(requests) trace("push next job to pool");
319             Job j = _m._range.front();
320             _m._range.popFront();
321             Route route = Route(j._url);
322             /* 
323             find best route.
324             1. look up for idle worker that served same route.
325             2. if 1. failed - look up for busy worker who server same route and have empty box
326             3. if 1 and 2 failed - just use any idle worker ( we must have one anyay)
327             */
328             auto t = _m.findWorker(route);
329             if ( t in _m._busy ) {
330                 // just place in box
331                 assert(_m._box[t] == Job.init);
332                 debug(requests) tracef("found best for %s in busy %s", route, _m._busy[t]);
333                 _m._box[t] = j;
334                 continue;
335             } else
336             if ( t in _m._idle ) {
337                 debug(requests) tracef("found best for %s in idle %s", route, _m._idle[t]);
338                 fromIdleToBusy(t, route);
339             } else 
340             if ( !idle.isNull ) {
341                 debug(requests) tracef("use just released idle (prev job %s) for %s", _m._idle[t], route);
342                 t = idle;
343                 idle.nullify();
344                 fromIdleToBusy(t, route);
345             } else {
346                 debug(requests) tracef("create worker for %s", route);
347                 t = spawn(&worker);
348                 _m._box[t] = Job.init;
349                 _m._busy[t] = route;
350             }
351             t.send(owner, j);
352             _m._sent++;
353         }
354         debug(requests) tracef("input empty: %s, sent: %d, received: %d, busy: %d",
355                                 _m._range.empty, _m._sent, _m._received,_m._busy.length );
356         if ( !result_ready && _m._range.empty && _m._sent == _m._received && _m._busy.length==0) {
357             _m._exhausted = true;
358         }
359         else {
360             _m._exhausted = false;
361         }
362     }
363     /**
364         front
365     */
366     Result front()
367     out {
368         assert(_m._busy.length > 0 || _m._range.empty);
369     }
370     body {
371         if ( !_m._result.isNull ) {
372             return _m._result;
373         }
374         Tid w;
375         sendWhilePossible();
376         receive(
377             (Tid t, Result r) {
378                 debug(requests) trace("received first response");
379                 _m._result = r;
380                 // move this thread from busy to idle threads
381                 fromBusyToIdle(t);
382                 _m._received++;
383                 w = t;
384             },
385         );
386         if ( !_m._range.empty && _m._busy.length == 0) {
387             // when max number of workers = 1, then
388             // at this point we will have only one idle worker,
389             // and we need to have at least one busy worker
390             // so that we can always read in popFront
391             Job j = _m._range.front();
392             Route route = Route(j._url);
393             w.send(thisTid(), j);
394             _m._range.popFront();
395             fromIdleToBusy(w, route);
396             _m._sent++;
397         }
398         return _m._result;
399     }
400     /**
401         helpers
402     */
403     void fromBusyToIdle(Tid t) {
404         assert(t in _m._busy);
405         assert(t !in _m._idle);
406         _m._idle[t] = _m._busy[t];
407         _m._busy.remove(t);
408     }
409     void fromIdleToBusy(Tid t, Route r) {
410         assert(t !in _m._busy);
411         assert(t in _m._idle);
412         _m._busy[t] = r;
413         _m._box[t] = Job.init;
414         _m._idle.remove(t);
415     }
416     void sendWhilePossible() {
417         while( !_m._range.empty() && (_m._busy.length+_m._idle.length) < _m._workers) {
418             Tid t = spawn(&worker);
419             Job j = _m._range.front();
420             Route route = Route(j._url);
421 
422             auto owner = thisTid();
423             send(t, owner, j);
424             _m._range.popFront();
425             _m._busy[t] = route;
426             _m._box[t] = Job.init;
427             _m._sent++;
428         }
429     }
430 }
431 
432 Pool!R pool(R)(R r, uint w) {
433     enforce(w>0, "Number of workers must me > 0");
434     return Pool!R(r, w);
435 }
436 
437 version(None) unittest {
438 
439     version(vibeD) {
440         string httpbinurl = "http://httpbin.org";
441     } else {
442         info("Testing pool");
443         import httpbin;
444         auto server = httpbinApp();
445         server.start();
446         scope(exit) {
447             server.stop();
448         }
449         Thread.sleep(1.seconds);
450         globalLogLevel = LogLevel.trace;
451         string httpbinurl = "http://127.0.0.1:8081";
452         Job[] jobs = [
453             Job(httpbinurl ~ "/get").addHeaders([
454                                 "X-Header": "X-Value",
455                                 "Y-Header": "Y-Value"
456                             ]),
457             Job(httpbinurl ~ "/gzip"),
458             Job(httpbinurl ~ "/deflate"),
459             Job(httpbinurl ~ "/absolute-redirect/3")
460                     .maxRedirects(2),
461             Job(httpbinurl ~ "/range/1024"),
462             Job(httpbinurl ~ "/post")
463                     .method("POST")                     // change default GET to POST
464                     .data("test".representation())      // attach data for POST
465                     .opaque("id".representation),       // opaque data - you will receive the same in Result
466             Job(httpbinurl ~ "/delay/3")
467                     .timeout(1.seconds),                // set timeout to 1.seconds - this request will throw exception and fails
468             Job(httpbinurl ~ "/stream/1024"),
469         ];
470 
471         auto count = jobs.
472             pool(6).
473             filter!(r => r.code==200).
474             count();
475 
476         assert(count == jobs.length - 2, "pool test failed");
477         iota(20)
478             .map!(n => Job(httpbinurl ~ "/post")
479                             .data("%d".format(n).representation))
480             .pool(10)
481             .each!(r => assert(r.code==200));
482         info("Testing pool - done");
483     }
484 }