1 module requests.connmanager;
2 
3 import std.typecons;
4 import std.datetime;
5 import std.array;
6 import std.algorithm;
7 import std.exception;
8 import std.format;
9 
10 import std.experimental.logger;
11 
12 import requests.streams;
13 
14 import cachetools.cache;
15 
16 /**
17  * Keep opened connections for HTTP.
18  * It is actually cache over tuple(schema, host, port) -> connection
19  * with limited number of items.
20  *
21  * Evict least used.
22 */
23 
24 package struct ConnManager {
25     package alias CMKey = Tuple!(string, string, ushort);
26     package class CMValue {
27         this(NetworkStream s) {
28             stream = s;
29         }
30         NetworkStream stream;
31         string toString() inout {
32             return "%s".format(stream);
33         }
34     }
35     private {
36         CMValue[][CMKey]    map;
37         int                 capacity;
38         int                 counter;
39     }
40     this(int c) {
41         capacity = c;
42     }
43     void clear()
44     {
45         foreach(v;map.byValue)
46         {
47             foreach(s; v) {
48                 s.stream.close();
49             }
50         }
51         map.clear;
52         counter = 0;
53     }
54     void put(string schema, string host, ushort port, NetworkStream stream) 
55     in
56     {
57         assert(stream !is null);
58         assert(stream.isConnected);
59     }
60     do
61     {
62         debug(requests) {
63             tracef("Place connection to pool: %s://%s:%s", schema, host, port);
64         }
65         CMKey key = CMKey(schema, host, port);
66         auto c = key in map;
67         if ( c ) {
68             (*c) ~= new CMValue(stream);
69         } else {
70             map[key] = [new CMValue(stream)];
71         }
72         counter++;
73         if ( counter > capacity ) {
74             auto k = map.byKey.front;
75             debug(requests) {
76                 tracef("remove random key %s", k);
77             }
78             auto streams = map[k];
79             auto s = streams[0].stream;
80             counter--;
81             map[k] = streams[1..$];
82             if (map[k].length == 0) {
83                 map.remove(k);
84             }
85             s.close();
86         }
87     }
88 
89     NetworkStream get(string schema, string host, ushort port)
90     do
91     {
92         CMKey key = CMKey(schema, host, port);
93         auto c = key in map;
94         if (c) {
95             // trim and return last
96             auto v = *c;
97             auto s = v[$-1].stream;
98             v = v[0..$-1];
99             counter--;
100             debug (requests) {
101                 tracef("Get connection from pool: %s://%s:%s - %s", schema, host, port, s);
102             }
103             if ( v.length == 0 ) {
104                 map.remove(key);
105             }
106             return s;
107         }
108         else {
109             return null;
110         }
111     }
112 
113     // void del(string schema, string host, ushort port, NetworkStream str) 
114     // {
115     //     CMKey key = CMKey(schema, host, port);
116     //     auto c = key in map;
117     //     if ( c ) {
118     //         immutable index = (*c).countUntil!(a => a.stream == str);
119     //         if ( index > -1 ) {
120     //             (*c).remove(index);
121     //         }
122     //     }
123     // }
124 }
125 
126 package struct ConnManager1 {
127     package alias  CMKey = Tuple!(string, string, ushort);
128     package class CMValue {
129         NetworkStream   stream;
130         bool            in_use;
131         string toString() inout {
132             return "%s:%s".format(in_use, stream);
133         }
134     }
135     private {
136         CacheLRU!(CMKey, CMValue[]) __cache;
137     }
138     this(int limit) {
139         __cache = new CacheLRU!(CMKey, CMValue[]);
140         __cache.size = limit;
141         __cache.enableCacheEvents();
142     }
143     ~this() {
144         clear();
145     }
146     @property auto length() {
147         return __cache.length;
148     }
149     ///
150     /// put new stream in cache, evict old stream and return it.
151     /// If nothing evicted return null. Returned(evicted) connection can be
152     /// closed.
153     ///
154     NetworkStream put(string schema, string host, ushort port, NetworkStream stream)
155     in { assert(stream !is null);}
156     out{ assert(__cache.length>0);}
157     do {
158         import std.stdio;
159         CMKey     key = CMKey(schema, host, port);
160         CMValue   new_value = new CMValue;
161         new_value.stream = stream;
162         new_value.in_use = true;
163 
164         CMValue[] old_values;
165         auto cs = __cache.get(key);
166         if ( !cs.isNull ) {
167             old_values = cs.get();
168         }
169         __cache.put(key, old_values ~ new_value);
170 
171         writefln("new: %s", old_values ~ new_value);
172 
173         auto cacheEvents = __cache.cacheEvents();
174         foreach(e; cacheEvents) {
175             // if ( e.event == EventType.Evicted ) {
176 
177             // } else if (e.event == EventType.Updated) {
178 
179             // }
180             //writeln(e);
181         }
182         return null;
183         // switch( cacheEvents.length )
184         // {
185         //     case 0:
186         //         return null;
187         //     case 1:
188         //         old_values = cacheEvents.front.val;
189         //         foreach(s; old_values) {
190         //             s.stream.close();
191         //         }
192         //         return null;
193         //     default:
194         //         assert(0);
195         // }
196     }
197     /**
198         Lookup connection.
199      */
200     NetworkStream get(string schema, string host, ushort port)
201     do
202     {
203         if ( __cache is null ) return null;
204         auto v = __cache.get(CMKey(schema, host, port));
205         if ( ! v.isNull() )
206         {
207             auto streams = v.get;
208             foreach(s; streams) {
209                 if ( !s.in_use ) {
210                     s.in_use = true;
211                     return s.stream;
212                 }
213             }
214         }
215         return null;
216     }
217 
218     /**
219         Remove connection from cache (without close).
220      */
221     void del(string schema, string host, ushort port, NetworkStream str) {
222         NetworkStream s;
223         CMKey key = CMKey(schema, host, port);
224         auto v = __cache.get(key);
225         if ( v.isNull ) {
226             return;
227         }
228         auto streams = v.get();
229         int index;
230         do {
231             if ( streams[index].stream is str) {
232                 break;
233             }
234             index++;
235         } while(index < streams.length);
236 
237         if ( index < streams.length ) {
238             streams.remove(index);
239         }
240 
241         if ( streams.length == 0 ) {
242             __cache.remove(key);
243         } else {
244             __cache.put(key, streams);
245         }
246     }
247 
248     /**
249         clear cache (and close connections)
250      */
251     void clear()
252     out { assert(__cache is null || __cache.length == 0); }
253     do  {
254         if ( __cache is null ) return;
255 
256         __cache.clear();
257         foreach(e; __cache.cacheEvents )
258         {
259             try
260             {
261                 foreach(v; e.val) {
262                     v.stream.close();
263                 }
264             }
265             catch(Exception e)
266             {
267                 debug(requests) tracef("%s while clear connmanager", e.msg);
268             }
269         }
270         __cache = null;
271     }
272 }
273 
274 // unittest {
275 //     globalLogLevel = LogLevel.info;
276 //     ConnManager cm = ConnManager(2);
277 //     auto s0 = new TCPStream();
278 //     auto s1 = new TCPStream();
279 //     auto s2 = new TCPStream();
280 
281 //     auto e = cm.put("http", "s0", 1, s0);
282 //     assert(e is null);
283 //     assert(cm.get("http", "s0", 1) == s0);
284 
285 //     e = cm.put("http", "s1", 1, s1);
286 //     assert(e is null);
287 //     assert(cm.get("http", "s1", 1) == s1);
288 
289 //     e = cm.put("http", "s2", 1, s2);
290 // //    assert(e !is null);
291 //     assert(cm.get("http", "s2", 1) == s2);
292 // //    assert(e == s0); // oldest
293 // //    e.close();
294 
295 //     // at this moment we have s1, s2
296 //     // let try to update s1
297 //     auto s3 = new TCPStream;
298 //     e = cm.put("http", "s1", 1, s3);
299 //     // assert(e == s1);
300 //     // e.close();
301 //     assert(cm.get("http", "s1", 1) == s3);
302 
303 //     cm.clear();
304 //     assert(cm.get("http", "s1", 1) is null);
305 // }