Sparse Virtual File System  0.4.0
A Sparse Virtual File System.
simulator.py
Go to the documentation of this file.
1 """
2 MIT License
3 
4 Copyright (c) 2020-2024 Paul Ross
5 
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12 
13 The above copyright notice and this permission notice shall be included in all
14 copies or substantial portions of the Software.
15 
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 SOFTWARE.
23 """
24 import argparse
25 import dataclasses
26 import logging
27 import pprint
28 import sys
29 import time
30 import typing
31 
32 import svfsc
33 
34 import src.cpy.sim_examples as sim_examples
35 
36 logger = logging.getLogger(__file__)
37 
38 LOG_FORMAT_VERBOSE = (
39  '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d - (%(threadName)-10s) - %(levelname)-8s - %(message)s'
40 )
41 
42 LOG_FORMAT_NO_THREAD = (
43  '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d - %(levelname)-8s - %(message)s'
44 )
45 
46 LOG_FORMAT_NO_PROCESS = (
47  '%(asctime)s - %(filename)12s#%(lineno)-4d - %(levelname)-8s - %(message)s'
48 )
49 
50 
52  """Represents the delay over a communication line."""
53 
54  def __init__(self, latency_one_way_s: float, bandwidth_bps: float, realtime: bool = False):
55  """Bandwidth 0.0 means infinite bandwidth."""
56  self.latency_one_way_slatency_one_way_s = latency_one_way_s
57  self.bandwidth_bpsbandwidth_bps = bandwidth_bps
58  self.realtimerealtime = realtime
59  self.time_latencytime_latency = 0.0
60  self.time_bandwidthtime_bandwidth = 0.0
61  self.time_totaltime_total = 0.0
62 
63  def transmit(self, data_bytes: bytes, direction: str) -> None:
64  t = self.latency_one_way_slatency_one_way_s
65  self.time_latencytime_latency += self.latency_one_way_slatency_one_way_s
66  if self.bandwidth_bpsbandwidth_bps:
67  t_bandwidth = 8 * len(data_bytes) / self.bandwidth_bpsbandwidth_bps
68  t += t_bandwidth
69  self.time_bandwidthtime_bandwidth += t_bandwidth
70  logger.debug('COMMS_: %s length %d delay %.3f (ms)', direction, len(data_bytes), t * 1000)
71  self.time_totaltime_total += t
72  if self.realtimerealtime:
73  time.sleep(t)
74 
75 
76 class Server:
77  def __init__(self, seek_rate_byte_per_s: float, read_rate_byte_per_s: float, realtime: bool = False):
78  self.seek_rate_byte_per_sseek_rate_byte_per_s = seek_rate_byte_per_s
79  self.read_rate_byte_per_sread_rate_byte_per_s = read_rate_byte_per_s
80  self.realtimerealtime = realtime
81  self.file_positionfile_position = 0
82  self.total_timetotal_time = 0.0
83 
84  def get(self, file_position: int, length: int) -> bytes:
85  t = abs(file_position - self.file_positionfile_position) / self.seek_rate_byte_per_sseek_rate_byte_per_s
86  t += length / self.read_rate_byte_per_sread_rate_byte_per_s
87  self.total_timetotal_time += t
88  logger.debug('SERVER: fpos %d length %d delay %.3f (ms)', file_position, length, t * 1000)
89  if self.realtimerealtime:
90  time.sleep(t)
91  self.file_positionfile_position = file_position
92  return b' ' * length
93 
94 
95 @dataclasses.dataclass
96 class RunResult:
97  has_hits: int
98  has_misses: int
99  minimal_bytes: int
100  num_bytes: int
101  sizeof: int
102  time_exec: float
103 
104 
105 class Client:
106  def __init__(self, comms: Communications, server: Server):
107  self.commscomms = comms
108  self.serverserver = server
109 
110  def run(self, seek_reads: typing.Tuple[typing.Tuple[int, int], ...], greedy_length: int) -> RunResult:
111  time_start = time.perf_counter()
112  svf = svfsc.cSVF('ID')
113  time_svf = 0.0
114  has_hits = has_misses = 0
115  minimal_bytes = 0
116  for fpos_demand, length_demand in seek_reads:
117  minimal_bytes += length_demand
118  blocks = [f'({fpos:,d} : {length:,d} : {fpos + length:,d})' for fpos, length in svf.blocks()]
119  logger.debug('CLIENT: blocks was: %s', blocks)
120  logger.debug(
121  f'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})')
122  time_svf_start = time.perf_counter()
123  has_data = svf.has_data(fpos_demand, length_demand)
124  time_svf += time.perf_counter() - time_svf_start
125  if not has_data:
126  time_svf_start = time.perf_counter()
127  need = svf.need(fpos_demand, length_demand, greedy_length)
128  time_svf += time.perf_counter() - time_svf_start
129  logger.debug(f'CLIENT: need {need}')
130  for fpos, length in need:
131  logger.debug(f'CLIENT: need fpos {fpos:16,d} length {length:6,d} ({fpos + length:16,d})')
132  # Crude simulation of a GET request.
133  client_server_message = f'GET File position {fpos} length {length}'.encode('ascii')
134  self.commscomms.transmit(client_server_message, 'Client->Server')
135  result = self.serverserver.get(fpos, length)
136  self.commscomms.transmit(result, 'Server->Client')
137  time_svf_start = time.perf_counter()
138  svf.write(fpos, result)
139  time_svf += time.perf_counter() - time_svf_start
140  logger.debug(
141  f'CLIENT: wrote fpos {fpos:16,d} length {len(result):6,d} ({fpos + len(result):16,d})')
142  if not svf.has_data(fpos_demand, length_demand):
143  logger.error(
144  f'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
145  )
146  blocks = [f'({fpos:,d} : {length:,d} : {fpos + length:,d})' for fpos, length in svf.blocks()]
147  logger.error('CLIENT: blocks now: %s', blocks)
148  assert 0
149  has_misses += 1
150  else:
151  logger.debug(
152  f'CLIENT: has fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
153  )
154  has_hits += 1
155 
156  time_svf_start = time.perf_counter()
157  svf.read(fpos_demand, length_demand)
158  time_svf += time.perf_counter() - time_svf_start
159  time_exec = time.perf_counter() - time_start
160  time_exec = self.commscomms.time_total + self.serverserver.total_time + time_svf
161  logger.info('has(): hits: %d misses: %d', has_hits, has_misses)
162  logger.info(
163  'Blocks: %d bytes: %d sizeof: %d overhead: %d', svf.num_blocks(), svf.num_bytes(), svf.size_of(),
164  svf.size_of() - svf.num_bytes()
165  )
166  logger.info(
167  f'Comms laten: {self.comms.time_latency * 1000:10.3f} (ms)'
168  f' ({self.comms.time_latency / self.comms.time_total:6.1%}) of Comms total.'
169  )
170  logger.info(
171  f'Comms bwidt: {self.comms.time_bandwidth * 1000:10.3f} (ms)'
172  f' ({self.comms.time_bandwidth / self.comms.time_total:6.1%}) of Comms total.'
173  )
174  percent_str = '+' * int(0.5 + 50 * self.commscomms.time_total / time_exec)
175  logger.info(
176  f'Comms time : {self.comms.time_total * 1000:10.3f} (ms) ({self.comms.time_total / time_exec:6.1%})'
177  f' {percent_str}'
178  )
179  percent_str = '+' * int(0.5 + 50 * self.serverserver.total_time / time_exec)
180  logger.info(
181  f'Server time: {self.server.total_time * 1000:10.3f} (ms) ({self.server.total_time / time_exec:6.1%})'
182  f' {percent_str}'
183  )
184  percent_str = '+' * int(0.5 + 50 * time_svf / time_exec)
185  logger.info(
186  f'SVF time : {time_svf * 1000:10.3f} (ms) ({time_svf / time_exec:6.1%})'
187  f' {percent_str}'
188  )
189  # time_residual = time_exec - self.comms.total_time - self.server.total_time - time_svf
190  # percent_str = '+' * int(0.5 + 50 * time_residual / time_exec)
191  # logger.info(
192  # f'Residual : {time_residual * 1000:10.3f} (ms) ({time_residual / time_exec:6.1%})'
193  # f' {percent_str}'
194  # )
195  logger.info(f'Total : {time_exec * 1000:10.3f} (ms) ({time_exec / time_exec:6.1%})')
196  logger.info('SVF contents: %s Execution time: %.3f (s) %.3f (Mb/s)',
197  svf.num_bytes(), time_exec, svf.num_bytes() / time_exec / 1024 ** 2
198  )
199  return RunResult(has_hits, has_misses, minimal_bytes, svf.num_bytes(), svf.size_of(),
200  self.commscomms.time_total + self.serverserver.total_time + time_svf
201  )
202 
203 
204 def run(
205  events: typing.Tuple[typing.Tuple[int, int], ...],
206  greedy_length: int,
207  latency_s: float, bandwidth_bit_ps: float,
208  seek_rate_byte_per_s: float, read_rate_byte_per_s: float, realtime: bool
209 ) -> RunResult:
210  comms = Communications(latency_s, bandwidth_bit_ps, realtime=realtime)
211  server = Server(seek_rate_byte_per_s, read_rate_byte_per_s, realtime=realtime)
212  client = Client(comms, server)
213  return client.run(events, greedy_length)
214 
215 
216 def main():
217  time_start = time.perf_counter()
218  result = 0
219  parser = argparse.ArgumentParser(description='Simulate reading into a SVF.', prog=__file__)
220  parser.add_argument('-l', '--log-level', dest='log_level', type=int, default=20, help='Log level.')
221  parser.add_argument('--latency', type=float, default=10,
222  help='Communications channel latency (NOTE: one way) in ms. [default: %(default)d]')
223  parser.add_argument('--bandwidth', type=float, default=50,
224  help='Communications channel bandwidth in million bits per second.'
225  ' Zero is infinite bandwidth. [default: %(default)d]')
226  parser.add_argument('--seek-rate', type=float, default=10000,
227  help='Server seek rate in million bytes per second. [default: %(default)d]')
228  parser.add_argument('--read-rate', type=float, default=50,
229  help='Server read rate in million bytes per second. [default: %(default)d]')
230  parser.add_argument('--greedy-length', type=int, default=-1,
231  help=(
232  'The greedy length to read fragments from the server.'
233  ' Zero means read every fragment.'
234  ' Default is to run through a range of greedy lengths and report the performance.'
235  ' [default: %(default)d]'
236  )
237  )
238  parser.add_argument('--realtime', action="store_true", default=False,
239  help='Run in realtime (may be slow). [default: %(default)d]')
240  args = parser.parse_args()
241  # print('Args:', args)
242  logging.basicConfig(level=args.log_level, format=LOG_FORMAT_NO_PROCESS, stream=sys.stdout)
243 
244  results_time: typing.Dict[str, typing.List[typing.Tuple[int, RunResult]]] = {}
245  print('Simulator setup:')
246  print(f'Network latency (one way) {args.latency:.3f} (ms) bandwidth {args.bandwidth:.3f} (M bits/s)')
247  print(f'Server seek rate {args.seek_rate:.3f} (M bytes/s) read rate {args.read_rate:.3f} (M bytes/s)')
248  # for name in ('EXAMPLE_FILE_POSITIONS_LENGTHS_TIFF_CMU_1',):
249  # for name in ('EXAMPLE_FILE_POSITIONS_LENGTHS_SYNTHETIC',):
250  t_start = time.perf_counter()
251  for name in sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS:
252  if args.greedy_length == -1: # Default greedy-length, use a range
253  greedy_length = 0
254  # for greedy_length in (1024,):
255  # for greedy_length in range(0, 1024 + 32, 32):
256  while greedy_length <= 2048 * 4 * 4 * 4:
257  logger.info('Running %s with %d file actions and greedy_length %d', name,
258  len(sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name]), greedy_length)
259  result = run(
260  sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name], greedy_length,
261  args.latency / 1000, args.bandwidth * 1e6, args.seek_rate * 1e6, args.read_rate * 1e6, args.realtime
262  )
263  if name not in results_time:
264  results_time[name] = []
265  results_time[name].append((greedy_length, result))
266  if greedy_length == 0:
267  greedy_length = 1
268  elif greedy_length == 1:
269  greedy_length = 16
270  else:
271  greedy_length *= 2
272  else:
273  logger.info('Running %s with %d file actions and greedy_length %d', name,
274  len(sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name]), args.greedy_length)
275  result = run(
276  sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name], args.greedy_length,
277  args.latency / 1000, args.bandwidth * 1e6, args.seek_rate * 1e6, args.read_rate * 1e6, args.realtime
278  )
279  if name not in results_time:
280  results_time[name] = []
281  results_time[name].append((args.greedy_length, result))
282  for key in results_time:
283  print(f'{key}:')
284  print(
285  f'{"greedy_length":>14} {"Time(ms)":>10}'
286  f' {"Hits":>8} {"Miss":>8} {"Hits%":>8}'
287  f' {"Min. Bytes":>12} {"Act. Bytes":>12} {"Act. / Min.":>12}'
288  f' {"sizeof":>10} {"Overhead":>8} {"sizeof / Act.":>14}'
289  )
290  for greedy_length, result in results_time[key]:
291  print(
292  f'{greedy_length:14} {result.time_exec * 1000 :10.1f} {result.has_hits:8d} {result.has_misses:8d}'
293  f' {result.has_hits / (result.has_hits + result.has_misses):8.3%}'
294  f' {result.minimal_bytes:12d} {result.num_bytes:12d} {result.num_bytes / result.minimal_bytes:12.3%}'
295  f' {result.sizeof:10d} {result.sizeof - result.num_bytes:+8d} {result.sizeof / result.num_bytes:14.3%}'
296  )
297  print(f'Execution time: {time.perf_counter() - time_start:10.3f} (s)')
298  return 0
299 
300 
301 if __name__ == '__main__':
302  exit(main())
RunResult run(self, typing.Tuple[typing.Tuple[int, int],...] seek_reads, int greedy_length)
Definition: simulator.py:110
def __init__(self, Communications comms, Server server)
Definition: simulator.py:106
None transmit(self, bytes data_bytes, str direction)
Definition: simulator.py:63
def __init__(self, float latency_one_way_s, float bandwidth_bps, bool realtime=False)
Definition: simulator.py:54
def __init__(self, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime=False)
Definition: simulator.py:77
bytes get(self, int file_position, int length)
Definition: simulator.py:84
RunResult run(typing.Tuple[typing.Tuple[int, int],...] events, int greedy_length, float latency_s, float bandwidth_bit_ps, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime)
Definition: simulator.py:209