1 module requests.connmanager;
2 
3 import std.typecons;
4 import std.datetime;
5 import std.array;
6 import std.algorithm;
7 import std.exception;
8 
9 import std.experimental.logger;
10 
11 import requests.streams;
12 
/**
 * Keeps opened connections for HTTP.
 *
 * It is a cache over tuple(schema, host, port) -> connection that holds a
 * limited number of items.
 *
 * When a new key has to be stored and the cache is full, the entry with the
 * oldest timestamp is evicted. The timestamp is set/refreshed by `put` only;
 * `get` does not touch it, so eviction order follows the last `put` per key.
 *
 * NOTE(review): the struct is copyable and its destructor closes every cached
 * stream; associative arrays are reference types, so copies made after the
 * first insert share the same entries - confirm callers never copy it.
 */
package struct ConnManager {
    /// Cache key: (schema, host, port).
    package alias  CMKey = Tuple!(string, string, ushort);
    /// Cache entry: the stream and the time it was last stored by `put`.
    package struct CMValue {
        NetworkStream   stream;
        SysTime         timestamp;
    }
    private {
        int              _limit = 10;   // max number of cached entries
        CMValue[CMKey]   _cache;
    }
    /// Create a manager that keeps at most `limit` connections.
    this(int limit) {
        _limit = limit;
    }
    /// Closes and drops every cached connection (see `clear`).
    ~this() {
        clear();
    }
    /// Number of connections currently cached.
    @property auto length() {
        return _cache.length;
    }
    ///
    /// Find the key of the entry with the oldest timestamp.
    /// The entry itself is NOT removed here; the caller does that.
    /// Entries with equal timestamps are picked in unspecified order.
    ///
    private CMKey evict()
    in { assert(_cache.length>0); }
    do {
        debug(requests) trace("looking something to evict");
        // single linear pass instead of copying and sorting the whole cache
        return _cache.byKeyValue().minElement!(a => a.value.timestamp).key;
    }
    ///
    /// Put a stream in the cache under (schema, host, port).
    ///
    /// Returns: the stream that was pushed out of the cache, or null if
    /// nothing was pushed out. A stream is pushed out either because the
    /// cache was full and the oldest entry had to be evicted (new key), or
    /// because a different stream was already stored under the same key.
    /// The returned stream is not closed here; the caller can close it.
    ///
    NetworkStream put(string schema, string host, ushort port, NetworkStream stream)
    in { assert(stream !is null);}
    out{ assert(_cache.length>0);}
    do {
        auto key = CMKey(schema, host, port);

        if ( auto value_ptr = key in _cache ) {
            // Known key: refresh the timestamp, swap the stream if it is
            // a different object (identity, not opEquals).
            NetworkStream replaced;
            if ( value_ptr.stream !is stream ) {
                debug(requests) trace("old stream != new stream");
                replaced = value_ptr.stream;
                value_ptr.stream = stream;
            }
            value_ptr.timestamp = Clock.currTime;
            return replaced;
        }

        // New key: make room first if the cache is full. The emptiness
        // guard keeps evict() from being called on an empty cache when
        // the limit is zero.
        NetworkStream evicted;
        if ( _cache.length > 0 && _cache.length >= _limit ) {
            CMKey victim = evict();
            evicted = _cache[victim].stream;
            _cache.remove(victim);
        }
        _cache[key] = CMValue(stream, Clock.currTime);
        return evicted;
    }
    /**
        Lookup connection.

        Returns: cached stream for (schema, host, port) or null if there is
        none. The entry timestamp is not updated.
     */
    NetworkStream get(string schema, string host, ushort port) {
        if ( auto value_ptr = CMKey(schema, host, port) in _cache ) {
            return value_ptr.stream;
        }
        return null;
    }

    /**
        Remove connection from cache (without close).

        Returns: the removed stream, or null if the key was not cached.
     */
    NetworkStream del(string schema, string host, ushort port) {
        auto key = CMKey(schema, host, port);
        if ( auto value_ptr = key in _cache ) {
            // copy the reference out before the entry disappears
            NetworkStream s = value_ptr.stream;
            _cache.remove(key);
            return s;
        }
        return null;
    }

    /**
        clear cache (and close connections)

        Exceptions thrown by close() are swallowed (traced in debug builds)
        so that one failing stream does not prevent closing the others.
     */
    void clear()
    out { assert(_cache.length == 0); }
    do  {
        // First pass only closes: the associative array must not be
        // modified while it is being iterated.
        foreach(k, ref v; _cache) {
            debug(requests) tracef("Clear ConnManager entry %s", k);
            try {
                v.stream.close();
            } catch (Exception e) {
                debug(requests) tracef("%s while clear connmanager", e.msg);
            }
        }
        // Then drop all entries in place (built-in AA clear, no allocation).
        _cache.clear();
    }

}
122 
// Exercises ConnManager: insert, lookup, eviction of the oldest entry,
// in-place replacement, explicit removal and clear.
unittest {
    globalLogLevel = LogLevel.info;
    ConnManager cm = ConnManager(2);
    auto s0 = new TCPStream();
    auto s1 = new TCPStream();
    auto s2 = new TCPStream();

    // empty cache: nothing to find, nothing to delete
    assert(cm.length == 0);
    assert(cm.get("http", "s0", 1) is null);
    assert(cm.del("http", "s0", 1) is null);

    auto e = cm.put("http", "s0", 1, s0);
    assert(e is null);
    assert(cm.get("http", "s0", 1) == s0);
    assert(cm.length == 1);

    e = cm.put("http", "s1", 1, s1);
    assert(e is null);
    assert(cm.get("http", "s1", 1) == s1);
    assert(cm.length == 2);

    // cache is full (limit 2): third key evicts the oldest entry
    e = cm.put("http", "s2", 1, s2);
    assert(e !is null);
    assert(cm.get("http", "s2", 1) == s2);
    assert(e == s0); // oldest
    assert(cm.get("http", "s0", 1) is null);
    assert(cm.length == 2);
    e.close();

    // at this moment we have s1, s2
    // let try to update s1
    auto s3 = new TCPStream;
    e = cm.put("http", "s1", 1, s3);
    assert(e == s1);
    e.close();
    assert(cm.get("http", "s1", 1) == s3);
    assert(cm.length == 2);

    // storing the very same stream again pushes nothing out
    e = cm.put("http", "s1", 1, s3);
    assert(e is null);
    assert(cm.get("http", "s1", 1) == s3);
    assert(cm.length == 2);

    // del removes the entry and hands the stream back without closing it
    e = cm.del("http", "s2", 1);
    assert(e == s2);
    assert(cm.get("http", "s2", 1) is null);
    assert(cm.length == 1);
    e.close();

    cm.clear();
    assert(cm.length == 0);
    assert(cm.get("http", "s1", 1) is null);
}