Copyright (c) 2020-2025 Paul Ross

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# Module-level logger, named after the module rather than the file path.
# Logger names form a dot-separated hierarchy, so a path such as
# ``/a/b/sim.py`` would be treated as a child ``py`` of a logger ``/a/b/sim``.
logger = logging.getLogger(__name__)

# Format with timestamp, source file (right-aligned, 24 wide) and line number,
# process ID, thread name, level and message.
LOG_FORMAT_VERBOSE = (
    '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d - (%(threadName)-10s) - %(levelname)-8s - %(message)s'
)

# As LOG_FORMAT_VERBOSE but without the thread name.
LOG_FORMAT_NO_THREAD = (
    '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d - %(levelname)-8s - %(message)s'
)

# Without process ID or thread name, and with a narrower (12 wide) file name
# column.
LOG_FORMAT_NO_PROCESS = (
    '%(asctime)s - %(filename)12s#%(lineno)-4d - %(levelname)-8s - %(message)s'
)
52 """Represents the delay over a communication line."""
54 def __init__(self, latency_one_way_s: float, bandwidth_bps: float, realtime: bool =
False):
55 """Bandwidth 0.0 means infinite bandwidth."""
63 def transmit(self, data_bytes: bytes, direction: str) ->
None:
68 t_bandwidth = 8 * len(data_bytes) / self.
bandwidth_bpsbandwidth_bps
71 logger.debug(
'COMMS_: %s length %d delay %.3f (ms)', direction, len(data_bytes), t * 1000)
73 'COMMS_: %s length %d delay %.3f + %.3f = %.3f (ms)',
74 direction, len(data_bytes), self.
latency_one_way_slatency_one_way_s * 1000, t_bandwidth * 1000, t * 1000
82 def __init__(self, seek_rate_byte_per_s: float, read_rate_byte_per_s: float, realtime: bool =
False):
89 def get(self, file_position: int, length: int) -> bytes:
97 logger.debug(
'SERVER: fpos %d length %d delay %.3f (ms)', file_position, length, t * 1000)
98 logger.info(
'SERVER: fpos %d length %d delay %.3f (ms)', file_position, length, t * 1000)
105 @dataclasses.dataclass
116 def __init__(self, comms: Communications, server: Server):
120 def run(self, seek_reads: typing.Tuple[typing.Tuple[int, int], ...], greedy_length: int) -> RunResult:
121 time_start = time.perf_counter()
122 svf = svfsc.cSVF(
'ID')
124 has_hits = has_misses = 0
126 for fpos_demand, length_demand
in seek_reads:
127 minimal_bytes += length_demand
128 blocks = [f
'({fpos:,d} : {length:,d} : {fpos + length:,d})' for fpos, length
in svf.blocks()]
129 logger.debug(
'CLIENT: blocks was: %s', blocks)
131 f
'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})')
132 time_svf_start = time.perf_counter()
133 has_data = svf.has_data(fpos_demand, length_demand)
134 time_svf += time.perf_counter() - time_svf_start
136 time_svf_start = time.perf_counter()
137 need = svf.need(fpos_demand, length_demand, greedy_length)
138 time_svf += time.perf_counter() - time_svf_start
139 logger.debug(f
'CLIENT: need {need}')
140 for fpos, length
in need:
141 logger.debug(f
'CLIENT: need fpos {fpos:16,d} length {length:6,d} ({fpos + length:16,d})')
142 logger.info(f
'CLIENT: -> need fpos {fpos:16,d} length {length:6,d}')
144 client_server_message = f
'GET File position {fpos} length {length}'.encode(
'ascii')
145 self.
commscomms.transmit(client_server_message,
'Client->Server')
146 result = self.
serverserver.get(fpos, length)
147 self.
commscomms.transmit(result,
'Server->Client')
148 time_svf_start = time.perf_counter()
149 svf.write(fpos, result)
150 time_svf += time.perf_counter() - time_svf_start
152 f
'CLIENT: wrote fpos {fpos:16,d} length {len(result):6,d} ({fpos + len(result):16,d})')
153 logger.info(
'CLIENT: <- wrote fpos %d length %d delay %.3f (ms)', fpos, length, time_svf * 1000)
154 if not svf.has_data(fpos_demand, length_demand):
156 f
'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
158 blocks = [f
'({fpos:,d} : {length:,d} : {fpos + length:,d})' for fpos, length
in svf.blocks()]
159 logger.error(
'CLIENT: blocks now: %s', blocks)
164 f
'CLIENT: has fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
168 time_svf_start = time.perf_counter()
169 svf.read(fpos_demand, length_demand)
170 time_svf += time.perf_counter() - time_svf_start
171 time_exec = time.perf_counter() - time_start
172 time_exec = self.
commscomms.time_total + self.
serverserver.total_time + time_svf
173 logger.info(
'has(): hits: %d misses: %d', has_hits, has_misses)
175 'Blocks: %d bytes: %d sizeof: %d overhead: %d', svf.num_blocks(), svf.num_bytes(), svf.size_of(),
176 svf.size_of() - svf.num_bytes()
178 if self.
commscomms.time_total:
180 f
'Comms laten: {self.comms.time_latency * 1000:10.3f} (ms)'
181 f
' ({self.comms.time_latency / self.comms.time_total:6.1%}) of Comms total.'
184 f
'Comms bwidt: {self.comms.time_bandwidth * 1000:10.3f} (ms)'
185 f
' ({self.comms.time_bandwidth / self.comms.time_total:6.1%}) of Comms total.'
189 f
'Comms laten: {self.comms.time_latency * 1000:10.3f} (ms)'
190 f
' ({"N/A":6}) of Comms total.'
193 f
'Comms bwidt: {self.comms.time_bandwidth * 1000:10.3f} (ms)'
194 f
' ({"N/A":6}) of Comms total.'
196 percent_str =
'+' * int(0.5 + 50 * self.
commscomms.time_total / time_exec)
198 f
'Comms time : {self.comms.time_total * 1000:10.3f} (ms) ({self.comms.time_total / time_exec:6.1%})'
201 percent_str =
'+' * int(0.5 + 50 * self.
serverserver.total_time / time_exec)
203 f
'Server time: {self.server.total_time * 1000:10.3f} (ms) ({self.server.total_time / time_exec:6.1%})'
206 percent_str =
'+' * int(0.5 + 50 * time_svf / time_exec)
208 f
'SVF time : {time_svf * 1000:10.3f} (ms) ({time_svf / time_exec:6.1%})'
217 logger.info(f
'Total : {time_exec * 1000:10.3f} (ms) ({time_exec / time_exec:6.1%})')
218 logger.info(
'SVF contents: %s Execution time: %.3f (s) %.3f (Mb/s)',
219 svf.num_bytes(), time_exec, svf.num_bytes() / time_exec / 1024 ** 2
221 return RunResult(has_hits, has_misses, minimal_bytes, svf.num_bytes(), svf.size_of(),
222 self.
commscomms.time_total + self.
serverserver.total_time + time_svf
def run(
        events: typing.Tuple[typing.Tuple[int, int], ...],
        greedy_length: int,
        latency_s: float,
        bandwidth_bit_ps: float,
        seek_rate_byte_per_s: float,
        read_rate_byte_per_s: float,
        realtime: bool,
) -> RunResult:
    """Assemble a simulated channel, server and client, then replay ``events``.

    :param events: The demands handed to ``Client.run()``, which unpacks each
        entry as a ``(file position, length)`` pair.
    :param greedy_length: Greedy read length, handed to ``Client.run()``.
    :param latency_s: One-way latency of the communications channel.
    :param bandwidth_bit_ps: Bandwidth of the communications channel.
    :param seek_rate_byte_per_s: Seek rate of the simulated server.
    :param read_rate_byte_per_s: Read rate of the simulated server.
    :param realtime: Forwarded unchanged to both ``Communications`` and
        ``Server``.
    :return: The ``RunResult`` produced by ``Client.run()``.
    """
    # The channel is built before the server, matching the original order.
    channel = Communications(latency_s, bandwidth_bit_ps, realtime=realtime)
    file_server = Server(seek_rate_byte_per_s, read_rate_byte_per_s, realtime=realtime)
    return Client(channel, file_server).run(events, greedy_length)
239 time_start = time.perf_counter()
241 parser = argparse.ArgumentParser(description=
'Simulate reading into a SVF.', prog=__file__)
242 parser.add_argument(
'-l',
'--log-level', dest=
'log_level', type=int, default=20,
243 help=
'Log level. [default: %(default)d]'
245 parser.add_argument(
'--latency', type=float, default=10,
246 help=
'Communications channel latency (NOTE: one way) in ms. [default: %(default)d]')
247 parser.add_argument(
'--bandwidth', type=float, default=50,
248 help=
'Communications channel bandwidth in million bits per second.'
249 ' Zero is infinite bandwidth. [default: %(default)d]')
250 parser.add_argument(
'--seek-rate', type=float, default=10000,
251 help=
'Server seek rate in million bytes per second. [default: %(default)d]')
252 parser.add_argument(
'--read-rate', type=float, default=50,
253 help=
'Server read rate in million bytes per second. [default: %(default)d]')
254 parser.add_argument(
'--get-cost', type=float, default=0.0005,
255 help=
'The cost of GET in currency per 1000 GET requests. [default: %(default)d]')
256 parser.add_argument(
'--egress-cost', type=float, default=0.1,
257 help=
'The cost of egress data in currency per GB. [default: %(default)d]')
258 parser.add_argument(
'--greedy-length', type=int, default=-1,
260 'The greedy length to read fragments from the server.'
261 ' Zero means read every fragment.'
262 ' Default is to run through a range of greedy lengths and report the performance.'
263 ' [default: %(default)d]'
266 parser.add_argument(
'--realtime', action=
"store_true", default=
False,
267 help=
'Run in realtime (may be slow). [default: %(default)d]')
268 args = parser.parse_args()
270 logging.basicConfig(level=args.log_level, format=LOG_FORMAT_NO_PROCESS, stream=sys.stdout)
272 results_time: typing.Dict[str, typing.List[typing.Tuple[int, RunResult]]] = {}
273 print(
'Simulator setup:')
274 print(f
'Network latency (one way) {args.latency:.3f} (ms) bandwidth {args.bandwidth:.3f} (M bits/s)')
275 print(f
'Server seek rate {args.seek_rate:.3f} (M bytes/s) read rate {args.read_rate:.3f} (M bytes/s)')
276 print(f
'Cost GET {args.get_cost:.6f} (per 1000 GET requests) egress {args.egress_cost:.3f} (per GB)')
279 t_start = time.perf_counter()
280 for name
in sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS:
281 if args.greedy_length == -1:
285 while greedy_length <= 1 << 22:
286 logger.info(
'Running %s with %d file actions and greedy_length %d', name,
287 len(sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name]), greedy_length)
289 sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name], greedy_length,
290 args.latency / 1000, args.bandwidth * 1e6, args.seek_rate * 1e6, args.read_rate * 1e6, args.realtime
292 if name
not in results_time:
293 results_time[name] = []
294 results_time[name].append((greedy_length, result))
303 logger.info(
'Running %s with %d file actions and greedy_length %d', name,
304 len(sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name]), args.greedy_length)
306 sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name], args.greedy_length,
307 args.latency / 1000, args.bandwidth * 1e6, args.seek_rate * 1e6, args.read_rate * 1e6, args.realtime
309 if name
not in results_time:
310 results_time[name] = []
311 results_time[name].append((args.greedy_length, result))
312 for key
in results_time:
314 file_size = sim_examples.EXAMPLE_FILE_POSITIONS_SIZES[key]
315 whole_file_get_cost = 1 * args.get_cost / 1000
316 whole_file_egress_cost = file_size * args.egress_cost / (1 << 30)
317 whole_file_total_cost = whole_file_get_cost + whole_file_egress_cost
320 f
'{"greedy_length":>14} {"Time(ms)":>10}'
321 f
' {"Hits":>8} {"Miss":>8} {"Hits%":>8}'
322 f
' {"Min. Bytes":>12} {"Act. Bytes":>12} {"Act. / Min.":>12}'
323 f
' {"sizeof":>10} {"Overhead":>8} {"sizeof / Act.":>14}'
324 f
' {"GET Cost":>12} {"Egress Cost":>12} {"Total Cost":>12}'
325 f
' {"Whole File Cost":>15}'
327 for greedy_length, result
in results_time[key]:
328 get_cost = result.cache_misses * args.get_cost / 1000
329 egress_cost = result.num_bytes * args.egress_cost / (1 << 30)
331 f
'{greedy_length:14} {result.time_exec * 1000 :10.1f} {result.cache_hits:8d} {result.cache_misses:8d}'
332 f
' {result.cache_hits / (result.cache_hits + result.cache_misses):8.3%}'
333 f
' {result.minimal_bytes:12d} {result.num_bytes:12d} {result.num_bytes / result.minimal_bytes:12.3%}'
334 f
' {result.sizeof:10d} {result.sizeof - result.num_bytes:+8d} {result.sizeof / result.num_bytes:14.3%}'
335 f
' {get_cost:12.6f} {egress_cost:12.6f} {get_cost + egress_cost:12.6f} {whole_file_total_cost:15.6f}'
338 f
'Cost of downloading complete file'
339 f
' [{file_size / (1 << 30):.6f} GB]:'
340 f
' GET: {whole_file_get_cost:.6f}'
341 f
' Egress: {whole_file_egress_cost:.6f}'
342 f
' Total: {whole_file_total_cost:.6f}'
344 print(f
'Execution time: {time.perf_counter() - time_start:10.3f} (s)')
348 if __name__ ==
'__main__':
RunResult run(self, typing.Tuple[typing.Tuple[int, int],...] seek_reads, int greedy_length)
def __init__(self, Communications comms, Server server)
None transmit(self, bytes data_bytes, str direction)
def __init__(self, float latency_one_way_s, float bandwidth_bps, bool realtime=False)
def __init__(self, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime=False)
bytes get(self, int file_position, int length)
RunResult run(typing.Tuple[typing.Tuple[int, int],...] events, int greedy_length, float latency_s, float bandwidth_bit_ps, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime)