Sparse Virtual File System  0.4.1
A Sparse Virtual File System.
simulator.py
Go to the documentation of this file.
1 """
2 MIT License
3 
4 Copyright (c) 2020-2025 Paul Ross
5 
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12 
13 The above copyright notice and this permission notice shall be included in all
14 copies or substantial portions of the Software.
15 
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 SOFTWARE.
23 """
24 import argparse
25 import dataclasses
26 import logging
27 import pprint
28 import sys
29 import time
30 import typing
31 
32 import svfsc
33 
34 import src.cpy.sim_examples as sim_examples
35 
# Module-wide logger, named after this file's path.
logger = logging.getLogger(__file__)

# Alternative logging.basicConfig() record formats, from most to least detailed.
# Verbose: includes both the process id and the thread name.
LOG_FORMAT_VERBOSE = (
    '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d'
    ' - (%(threadName)-10s) - %(levelname)-8s - %(message)s'
)

# As verbose but without the thread name.
LOG_FORMAT_NO_THREAD = (
    '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d'
    ' - %(levelname)-8s - %(message)s'
)

# Neither process id nor thread name; this is the format main() configures.
LOG_FORMAT_NO_PROCESS = (
    '%(asctime)s - %(filename)12s#%(lineno)-4d'
    ' - %(levelname)-8s - %(message)s'
)
49 
50 
class Communications:
    """Represents the delay over a communication line.

    Each call to :meth:`transmit` is charged the one-way latency plus, when a
    bandwidth is set, the serialisation time ``8 * len(data_bytes) / bandwidth_bps``.
    The simulated times are accumulated in seconds and, if ``realtime`` is true,
    actually slept.
    """

    def __init__(self, latency_one_way_s: float, bandwidth_bps: float, realtime: bool = False):
        """Bandwidth 0.0 means infinite bandwidth.

        :param latency_one_way_s: Latency charged on every transmission, in seconds.
        :param bandwidth_bps: Bandwidth in bits per second; 0.0 means infinite.
        :param realtime: If true, :meth:`transmit` sleeps for the simulated delay.
        """
        self.latency_one_way_s = latency_one_way_s
        self.bandwidth_bps = bandwidth_bps
        self.realtime = realtime
        # Accumulated simulated time in seconds, split by cause, plus the overall total.
        self.time_latency = 0.0
        self.time_bandwidth = 0.0
        self.time_total = 0.0

    def transmit(self, data_bytes: bytes, direction: str) -> None:
        """Simulate sending ``data_bytes`` over the line.

        :param data_bytes: The payload; only its length is used.
        :param direction: A label used only in the log messages.
        """
        t = self.latency_one_way_s
        self.time_latency += self.latency_one_way_s
        # A zero bandwidth means "infinite", i.e. no serialisation delay.
        t_bandwidth = 0.0
        if self.bandwidth_bps:
            t_bandwidth = 8 * len(data_bytes) / self.bandwidth_bps
            t += t_bandwidth
        self.time_bandwidth += t_bandwidth
        logger.debug('COMMS_: %s length %d delay %.3f (ms)', direction, len(data_bytes), t * 1000)
        logger.info(
            'COMMS_: %s length %d delay %.3f + %.3f = %.3f (ms)',
            direction, len(data_bytes), self.latency_one_way_s * 1000, t_bandwidth * 1000, t * 1000
        )
        self.time_total += t
        if self.realtime:
            time.sleep(t)
79 
80 
class Server:
    """Simulated file server that charges time for seeking and reading.

    Delays are accumulated in ``total_time`` (seconds) and, if ``realtime`` is
    true, actually slept. The data returned is always ASCII spaces.
    """

    def __init__(self, seek_rate_byte_per_s: float, read_rate_byte_per_s: float, realtime: bool = False):
        """A rate of zero means that operation costs no time.

        :param seek_rate_byte_per_s: Bytes of file distance moved per second when seeking.
        :param read_rate_byte_per_s: Bytes read per second.
        :param realtime: If true, :meth:`get` sleeps for the simulated delay.
        """
        self.seek_rate_byte_per_s = seek_rate_byte_per_s
        self.read_rate_byte_per_s = read_rate_byte_per_s
        self.realtime = realtime
        # Current position of the simulated file pointer.
        self.file_position = 0
        self.total_time = 0.0

    def get(self, file_position: int, length: int) -> bytes:
        """Return ``length`` bytes from ``file_position``, charging seek and read time.

        Seek time is the distance from the current file position divided by the
        seek rate; read time is ``length`` divided by the read rate.
        """
        t = 0.0
        if self.seek_rate_byte_per_s:
            t += abs(file_position - self.file_position) / self.seek_rate_byte_per_s
        if self.read_rate_byte_per_s:
            t += length / self.read_rate_byte_per_s
        self.total_time += t
        logger.debug('SERVER: fpos %d length %d delay %.3f (ms)', file_position, length, t * 1000)
        logger.info('SERVER: fpos %d length %d delay %.3f (ms)', file_position, length, t * 1000)
        if self.realtime:
            time.sleep(t)
        # Reading leaves the file pointer just past the data read, so an immediately
        # following contiguous read incurs no seek.
        self.file_position = file_position + length
        return b' ' * length
103 
104 
@dataclasses.dataclass()
class RunResult:
    """Statistics returned by one ``Client.run()`` simulation."""
    # Number of demands already fully held by the SVF.
    cache_hits: int
    # Number of demands that had to be fetched from the server.
    cache_misses: int
    # Sum of all demanded lengths.
    minimal_bytes: int
    # Bytes held by the SVF at the end of the run (svf.num_bytes()).
    num_bytes: int
    # Memory footprint of the SVF at the end of the run (svf.size_of()).
    sizeof: int
    # Simulated execution time in seconds.
    time_exec: float
113 
114 
class Client:
    """Replays a sequence of reads through an SVF, fetching missing data from a simulated server.

    Requests and responses pass through ``comms`` and data is produced by ``server``;
    both accumulate simulated time that is reported at the end of :meth:`run`.
    """

    def __init__(self, comms: Communications, server: Server):
        self.comms = comms
        self.server = server

    def run(self, seek_reads: typing.Tuple[typing.Tuple[int, int], ...], greedy_length: int) -> RunResult:
        """Satisfy each ``(file_position, length)`` demand in ``seek_reads`` from a fresh ``svfsc.cSVF``.

        :param seek_reads: The demanded reads, in order.
        :param greedy_length: Passed as the third argument of ``cSVF.need()``.
        :return: Hit/miss counts, byte counts and ``time_exec``. ``time_exec`` is the simulated
            time in seconds: communications time + server time + measured SVF call time. The
            comms/server totals are never reset here, so they include any time already
            accumulated on those objects.
        :raises AssertionError: If the SVF still lacks a demanded range after every fragment
            reported by ``need()`` has been written.
        """
        svf = svfsc.cSVF('ID')
        time_svf = 0.0
        has_hits = has_misses = 0
        minimal_bytes = 0
        for fpos_demand, length_demand in seek_reads:
            minimal_bytes += length_demand
            # Formatting every block is O(blocks); only do it when it will be logged.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug('CLIENT: blocks was: %s', self._format_blocks(svf))
            logger.debug(
                f'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})')
            time_svf_start = time.perf_counter()
            has_data = svf.has_data(fpos_demand, length_demand)
            time_svf += time.perf_counter() - time_svf_start
            if has_data:
                logger.debug(
                    f'CLIENT: has fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
                )
                has_hits += 1
            else:
                time_svf += self._fetch(svf, fpos_demand, length_demand, greedy_length)
                has_misses += 1
            time_svf_start = time.perf_counter()
            svf.read(fpos_demand, length_demand)
            time_svf += time.perf_counter() - time_svf_start
        # Simulated, not wall-clock, time: comms and server delays are modelled, SVF time is measured.
        time_exec = self.comms.time_total + self.server.total_time + time_svf
        self._log_summary(svf, has_hits, has_misses, time_svf, time_exec)
        return RunResult(has_hits, has_misses, minimal_bytes, svf.num_bytes(), svf.size_of(), time_exec)

    def _fetch(self, svf, fpos_demand: int, length_demand: int, greedy_length: int) -> float:
        """GET every fragment ``svf.need()`` reports for the demand and write it into ``svf``.

        :return: Seconds spent inside the SVF ``need()`` and ``write()`` calls.
        :raises AssertionError: If the demand is still not satisfied afterwards.
        """
        time_start = time.perf_counter()
        need = svf.need(fpos_demand, length_demand, greedy_length)
        time_svf = time.perf_counter() - time_start
        logger.debug(f'CLIENT: need {need}')
        for fpos, length in need:
            logger.debug(f'CLIENT: need fpos {fpos:16,d} length {length:6,d} ({fpos + length:16,d})')
            logger.info(f'CLIENT: -> need fpos {fpos:16,d} length {length:6,d}')
            # Crude simulation of a GET request.
            client_server_message = f'GET File position {fpos} length {length}'.encode('ascii')
            self.comms.transmit(client_server_message, 'Client->Server')
            result = self.server.get(fpos, length)
            self.comms.transmit(result, 'Server->Client')
            time_start = time.perf_counter()
            svf.write(fpos, result)
            time_write = time.perf_counter() - time_start
            time_svf += time_write
            logger.debug(
                f'CLIENT: wrote fpos {fpos:16,d} length {len(result):6,d} ({fpos + len(result):16,d})')
            # The delay reported is the duration of this write alone.
            logger.info('CLIENT: <- wrote fpos %d length %d delay %.3f (ms)', fpos, length, time_write * 1000)
        if not svf.has_data(fpos_demand, length_demand):
            logger.error(
                f'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
            )
            logger.error('CLIENT: blocks now: %s', self._format_blocks(svf))
            # An explicit raise, unlike a bare assert, is not stripped by ``python -O``.
            raise AssertionError(
                f'SVF does not hold fpos {fpos_demand} length {length_demand} after writing all needed fragments'
            )
        return time_svf

    def _log_summary(self, svf, has_hits: int, has_misses: int, time_svf: float, time_exec: float) -> None:
        """Log hit/miss counts, SVF sizes and how ``time_exec`` splits between comms, server and SVF.

        Ratios against a zero total are shown as N/A instead of raising ZeroDivisionError
        (e.g. when no reads were demanded).
        """
        comms = self.comms
        logger.info('has(): hits: %d misses: %d', has_hits, has_misses)
        logger.info(
            'Blocks: %d bytes: %d sizeof: %d overhead: %d', svf.num_blocks(), svf.num_bytes(), svf.size_of(),
            svf.size_of() - svf.num_bytes()
        )
        logger.info(
            f'Comms laten: {comms.time_latency * 1000:10.3f} (ms)'
            f' ({self._share(comms.time_latency, comms.time_total)}) of Comms total.'
        )
        logger.info(
            f'Comms bwidt: {comms.time_bandwidth * 1000:10.3f} (ms)'
            f' ({self._share(comms.time_bandwidth, comms.time_total)}) of Comms total.'
        )
        for label, part in (
                ('Comms time ', comms.time_total),
                ('Server time', self.server.total_time),
                ('SVF time ', time_svf),
        ):
            logger.info(
                f'{label}: {part * 1000:10.3f} (ms) ({self._share(part, time_exec)})'
                f' {self._bar(part, time_exec)}'
            )
        logger.info(f'Total : {time_exec * 1000:10.3f} (ms) ({self._share(time_exec, time_exec)})')
        # NOTE: the rate is bytes / 1024**2 per second, i.e. MiB/s.
        rate = svf.num_bytes() / time_exec / 1024 ** 2 if time_exec else 0.0
        logger.info('SVF contents: %s Execution time: %.3f (s) %.3f (Mb/s)',
                    svf.num_bytes(), time_exec, rate
                    )

    @staticmethod
    def _format_blocks(svf) -> typing.List[str]:
        """Return the SVF's blocks as ``'(start : length : end)'`` strings for logging."""
        return [f'({fpos:,d} : {length:,d} : {fpos + length:,d})' for fpos, length in svf.blocks()]

    @staticmethod
    def _share(part: float, whole: float) -> str:
        """Return ``part / whole`` as a 6-wide percentage, or ``'N/A'`` padded to 6 if ``whole`` is zero."""
        return f'{part / whole:6.1%}' if whole else f'{"N/A":6}'

    @staticmethod
    def _bar(part: float, whole: float) -> str:
        """Return one ``'+'`` per 2% (rounded) of ``part / whole``, or ``''`` if ``whole`` is zero."""
        return '+' * int(0.5 + 50 * part / whole) if whole else ''
224 
225 
def run(
        events: typing.Tuple[typing.Tuple[int, int], ...],
        greedy_length: int,
        latency_s: float, bandwidth_bit_ps: float,
        seek_rate_byte_per_s: float, read_rate_byte_per_s: float, realtime: bool
) -> RunResult:
    """Build a fresh communications line, server and client, then replay ``events``.

    :param events: ``(file_position, length)`` reads to replay, in order.
    :param greedy_length: Passed through to :meth:`Client.run`.
    :param latency_s: One-way communications latency in seconds.
    :param bandwidth_bit_ps: Communications bandwidth in bits per second (0 means infinite).
    :param seek_rate_byte_per_s: Server seek rate in bytes per second.
    :param read_rate_byte_per_s: Server read rate in bytes per second.
    :param realtime: If true, the simulated delays are actually slept.
    :return: The :class:`RunResult` produced by the client.
    """
    client = Client(
        Communications(latency_one_way_s=latency_s, bandwidth_bps=bandwidth_bit_ps, realtime=realtime),
        Server(
            seek_rate_byte_per_s=seek_rate_byte_per_s,
            read_rate_byte_per_s=read_rate_byte_per_s,
            realtime=realtime,
        ),
    )
    return client.run(events, greedy_length)
236 
237 
def _format_ratio(numerator: float, denominator: float, width: int) -> str:
    """Return ``numerator / denominator`` as a ``width``-wide percentage, or right-aligned 'N/A' if the denominator is zero."""
    if denominator:
        return f'{numerator / denominator:{width}.3%}'
    return f'{"N/A":>{width}}'


def main():
    """Simulate every example in ``sim_examples`` and print a performance/cost table for each.

    With the default ``--greedy-length -1`` each example is run for every power-of-two
    greedy length from 1 to ``1 << 22``; otherwise only the given greedy length is run.

    :return: 0, the process exit status.
    """
    time_start = time.perf_counter()
    parser = argparse.ArgumentParser(description='Simulate reading into a SVF.', prog=__file__)
    # Float and bool defaults are shown with %(default)s: %(default)d would render
    # 0.0005, 0.1 and False all as 0.
    parser.add_argument('-l', '--log-level', dest='log_level', type=int, default=20,
                        help='Log level. [default: %(default)d]'
                        )
    parser.add_argument('--latency', type=float, default=10,
                        help='Communications channel latency (NOTE: one way) in ms. [default: %(default)s]')
    parser.add_argument('--bandwidth', type=float, default=50,
                        help='Communications channel bandwidth in million bits per second.'
                             ' Zero is infinite bandwidth. [default: %(default)s]')
    parser.add_argument('--seek-rate', type=float, default=10000,
                        help='Server seek rate in million bytes per second. [default: %(default)s]')
    parser.add_argument('--read-rate', type=float, default=50,
                        help='Server read rate in million bytes per second. [default: %(default)s]')
    parser.add_argument('--get-cost', type=float, default=0.0005,
                        help='The cost of GET in currency per 1000 GET requests. [default: %(default)s]')
    parser.add_argument('--egress-cost', type=float, default=0.1,
                        help='The cost of egress data in currency per GB. [default: %(default)s]')
    parser.add_argument('--greedy-length', type=int, default=-1,
                        help=(
                            'The greedy length to read fragments from the server.'
                            ' Zero means read every fragment.'
                            ' Default is to run through a range of greedy lengths and report the performance.'
                            ' [default: %(default)d]'
                        )
                        )
    parser.add_argument('--realtime', action="store_true", default=False,
                        help='Run in realtime (may be slow). [default: %(default)s]')
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level, format=LOG_FORMAT_NO_PROCESS, stream=sys.stdout)

    print('Simulator setup:')
    print(f'Network latency (one way) {args.latency:.3f} (ms) bandwidth {args.bandwidth:.3f} (M bits/s)')
    print(f'Server seek rate {args.seek_rate:.3f} (M bytes/s) read rate {args.read_rate:.3f} (M bytes/s)')
    print(f'Cost GET {args.get_cost:.6f} (per 1000 GET requests) egress {args.egress_cost:.3f} (per GB)')

    if args.greedy_length == -1:
        # Default: sweep the greedy length over 1, 2, 4, ..., 1 << 22 inclusive.
        greedy_lengths = [1 << shift for shift in range(23)]
    else:
        greedy_lengths = [args.greedy_length]

    results_time: typing.Dict[str, typing.List[typing.Tuple[int, RunResult]]] = {}
    for name in sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS:
        events = sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name]
        for greedy_length in greedy_lengths:
            logger.info('Running %s with %d file actions and greedy_length %d', name,
                        len(events), greedy_length)
            result = run(
                events, greedy_length,
                args.latency / 1000, args.bandwidth * 1e6, args.seek_rate * 1e6, args.read_rate * 1e6, args.realtime
            )
            results_time.setdefault(name, []).append((greedy_length, result))

    for key, results in results_time.items():
        # Cost of downloading the complete file with a single GET.
        file_size = sim_examples.EXAMPLE_FILE_POSITIONS_SIZES[key]
        whole_file_get_cost = 1 * args.get_cost / 1000
        whole_file_egress_cost = file_size * args.egress_cost / (1 << 30)
        whole_file_total_cost = whole_file_get_cost + whole_file_egress_cost
        print(f'{key}:')
        print(
            f'{"greedy_length":>14} {"Time(ms)":>10}'
            f' {"Hits":>8} {"Miss":>8} {"Hits%":>8}'
            f' {"Min. Bytes":>12} {"Act. Bytes":>12} {"Act. / Min.":>12}'
            f' {"sizeof":>10} {"Overhead":>8} {"sizeof / Act.":>14}'
            f' {"GET Cost":>12} {"Egress Cost":>12} {"Total Cost":>12}'
            f' {"Whole File Cost":>15}'
        )
        for greedy_length, result in results:
            # NOTE(review): cache_misses counts demands that missed, but Client.run() issues one
            # GET per fragment returned by need(), so this may undercount GET requests.
            get_cost = result.cache_misses * args.get_cost / 1000
            egress_cost = result.num_bytes * args.egress_cost / (1 << 30)
            print(
                f'{greedy_length:14} {result.time_exec * 1000 :10.1f} {result.cache_hits:8d} {result.cache_misses:8d}'
                f' {_format_ratio(result.cache_hits, result.cache_hits + result.cache_misses, 8)}'
                f' {result.minimal_bytes:12d} {result.num_bytes:12d}'
                f' {_format_ratio(result.num_bytes, result.minimal_bytes, 12)}'
                f' {result.sizeof:10d} {result.sizeof - result.num_bytes:+8d}'
                f' {_format_ratio(result.sizeof, result.num_bytes, 14)}'
                f' {get_cost:12.6f} {egress_cost:12.6f} {get_cost + egress_cost:12.6f} {whole_file_total_cost:15.6f}'
            )
        print(
            f'Cost of downloading complete file'
            f' [{file_size / (1 << 30):.6f} GB]:'
            f' GET: {whole_file_get_cost:.6f}'
            f' Egress: {whole_file_egress_cost:.6f}'
            f' Total: {whole_file_total_cost:.6f}'
        )
    print(f'Execution time: {time.perf_counter() - time_start:10.3f} (s)')
    return 0
346 
347 
if __name__ == '__main__':
    # Use sys.exit(): the builtin exit() is added by the site module for interactive
    # use and is missing when Python runs with -S.
    sys.exit(main())
RunResult run(self, typing.Tuple[typing.Tuple[int, int],...] seek_reads, int greedy_length)
Definition: simulator.py:120
def __init__(self, Communications comms, Server server)
Definition: simulator.py:116
None transmit(self, bytes data_bytes, str direction)
Definition: simulator.py:63
def __init__(self, float latency_one_way_s, float bandwidth_bps, bool realtime=False)
Definition: simulator.py:54
def __init__(self, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime=False)
Definition: simulator.py:82
bytes get(self, int file_position, int length)
Definition: simulator.py:89
RunResult run(typing.Tuple[typing.Tuple[int, int],...] events, int greedy_length, float latency_s, float bandwidth_bit_ps, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime)
Definition: simulator.py:231