1 module requests.connmanager;
2 
3 import std.typecons;
4 import std.datetime;
5 import std.array;
6 import std.algorithm;
7 import std.exception;
8 
9 import std.experimental.logger;
10 
11 import requests.streams;
12 
13 ///
14 /// Keep opened connections for HTTP
15 /// it is cache over tuple(schema, host, port) -> connection
16 ///
17 /// Evict least used
18 ///
19 package class ConnManager {
20     package alias  CMKey = Tuple!(string, string, ushort);
21     package struct CMValue {
22         NetworkStream   stream;
23         SysTime         timestamp;
24     }
25     private {
26         ubyte            _limit;
27         CMValue[CMKey]   _cache;
28     }
29     this(ubyte limit = 10) {
30         _limit = limit;
31     }
32     ~this() {
33         enforce!Exception(_cache.length == 0, "You must clear connManager before it GC-ed");
34     }
35     ///
36     /// evict oldest connection
37     ///
38     private CMKey evict()
39     in { assert(_cache.length>0); }
40     do {
41         debug(requests) trace("looking something to evict");
42         return _cache.byKeyValue().array.sort!"a.value.timestamp < b.value.timestamp".front().key;
43     }
44     ///
45     /// put new stream in cache, evict old stream and return it
46     /// If nothing evicted return null
47     ///
48     NetworkStream put(string schema, string host, ushort port, NetworkStream stream)
49     in { assert(stream !is null);}
50     out{ assert(_cache.length>0);}
51     do {
52         NetworkStream e;
53         auto key = CMKey(schema, host, port);
54         auto value_ptr = key in _cache;
55 
56         if ( value_ptr is null ) {
57             CMValue v = {stream: stream, timestamp: Clock.currTime};
58             if ( _cache.length >= _limit ) {
59                 CMKey k = evict();
60                 e = _cache[k].stream;
61                 _cache.remove(k);
62             }
63             _cache[key] = v;
64             return e;
65         }
66         auto old_stream = (*value_ptr).stream;
67         if (  old_stream != stream ) {
68             debug(requests) trace("old stream != new stream");
69             e = old_stream;
70             (*value_ptr).stream = stream;
71         }
72         (*value_ptr).timestamp = Clock.currTime;
73         return e;
74     }
75 
76     NetworkStream get(string schema, string host, ushort port) {
77         if ( auto value_ptr = CMKey(schema, host, port) in _cache ) {
78             return (*value_ptr).stream;
79         }
80         return null;
81     }
82 
83     NetworkStream del(string schema, string host, ushort port) {
84         NetworkStream s;
85         CMKey key = CMKey(schema, host, port);
86         if ( auto value_ptr = key in _cache ) {
87             s = (*value_ptr).stream;
88         }
89         _cache.remove(key);
90         return s;
91     }
92 
93     void clear()
94     out { assert(_cache.length == 0); }
95     do  {
96         foreach(k,ref v; _cache) {
97             debug(requests) tracef("Clear ConnManager entry %s", k);
98             try {
99                 v.stream.close();
100             } catch (Exception e) {
101                 debug(requests) tracef("%s while clear connmanager", e.msg);
102             }
103             _cache.remove(k);
104         }
105     }
106 
107 }
108 
109 unittest {
110     globalLogLevel = LogLevel.info;
111     ConnManager cm = new ConnManager(2);
112     auto s0 = new TCPStream();
113     auto s1 = new TCPStream();
114     auto s2 = new TCPStream();
115 
116     auto e = cm.put("http", "s0", 1, s0);
117     assert(e is null);
118     assert(cm.get("http", "s0", 1) == s0);
119 
120     e = cm.put("http", "s1", 1, s1);
121     assert(e is null);
122     assert(cm.get("http", "s1", 1) == s1);
123 
124     e = cm.put("http", "s2", 1, s2);
125     assert(e !is null);
126     assert(cm.get("http", "s2", 1) == s2);
127     assert(e == s0); // oldest
128     e.close();
129 
130     // at this moment we have s1, s2
131     // let try to update s1
132     auto s3 = new TCPStream;
133     e = cm.put("http", "s1", 1, s3);
134     assert(e == s1);
135     e.close();
136     assert(cm.get("http", "s1", 1) == s3);
137 
138     cm.clear();
139     assert(cm.get("http", "s1", 1) is null);
140 }