4 Copyright (c) 2020-2024 Paul Ross
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
13 The above copyright notice and this permission notice shall be included in all
14 copies or substantial portions of the Software.
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Module-level logger; it is named after this file's path (``__file__``).
logger = logging.getLogger(__file__)

# Log record layouts for ``logging.basicConfig(format=...)``, most detailed first.
# Timestamp, file#line, process ID, thread name, level and message.
LOG_FORMAT_VERBOSE = (
    '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d - (%(threadName)-10s) - %(levelname)-8s - %(message)s'
)
# As LOG_FORMAT_VERBOSE but without the thread name.
LOG_FORMAT_NO_THREAD = (
    '%(asctime)s - %(filename)24s#%(lineno)-4d - %(process)5d - %(levelname)-8s - %(message)s'
)
# Without process ID or thread name; note the narrower 12 character filename field.
LOG_FORMAT_NO_PROCESS = (
    '%(asctime)s - %(filename)12s#%(lineno)-4d - %(levelname)-8s - %(message)s'
)
52 """Represents the delay over a communication line."""
54 def __init__(self, latency_one_way_s: float, bandwidth_bps: float, realtime: bool =
False):
55 """Bandwidth 0.0 means infinite bandwidth."""
63 def transmit(self, data_bytes: bytes, direction: str) ->
None:
67 t_bandwidth = 8 * len(data_bytes) / self.
bandwidth_bpsbandwidth_bps
70 logger.debug(
'COMMS_: %s length %d delay %.3f (ms)', direction, len(data_bytes), t * 1000)
77 def __init__(self, seek_rate_byte_per_s: float, read_rate_byte_per_s: float, realtime: bool =
False):
84 def get(self, file_position: int, length: int) -> bytes:
88 logger.debug(
'SERVER: fpos %d length %d delay %.3f (ms)', file_position, length, t * 1000)
95 @dataclasses.dataclass
106 def __init__(self, comms: Communications, server: Server):
110 def run(self, seek_reads: typing.Tuple[typing.Tuple[int, int], ...], greedy_length: int) -> RunResult:
111 time_start = time.perf_counter()
112 svf = svfsc.cSVF(
'ID')
114 has_hits = has_misses = 0
116 for fpos_demand, length_demand
in seek_reads:
117 minimal_bytes += length_demand
118 blocks = [f
'({fpos:,d} : {length:,d} : {fpos + length:,d})' for fpos, length
in svf.blocks()]
119 logger.debug(
'CLIENT: blocks was: %s', blocks)
121 f
'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})')
122 time_svf_start = time.perf_counter()
123 has_data = svf.has_data(fpos_demand, length_demand)
124 time_svf += time.perf_counter() - time_svf_start
126 time_svf_start = time.perf_counter()
127 need = svf.need(fpos_demand, length_demand, greedy_length)
128 time_svf += time.perf_counter() - time_svf_start
129 logger.debug(f
'CLIENT: need {need}')
130 for fpos, length
in need:
131 logger.debug(f
'CLIENT: need fpos {fpos:16,d} length {length:6,d} ({fpos + length:16,d})')
133 client_server_message = f
'GET File position {fpos} length {length}'.encode(
'ascii')
134 self.
commscomms.transmit(client_server_message,
'Client->Server')
135 result = self.
serverserver.get(fpos, length)
136 self.
commscomms.transmit(result,
'Server->Client')
137 time_svf_start = time.perf_counter()
138 svf.write(fpos, result)
139 time_svf += time.perf_counter() - time_svf_start
141 f
'CLIENT: wrote fpos {fpos:16,d} length {len(result):6,d} ({fpos + len(result):16,d})')
142 if not svf.has_data(fpos_demand, length_demand):
144 f
'CLIENT: demands fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
146 blocks = [f
'({fpos:,d} : {length:,d} : {fpos + length:,d})' for fpos, length
in svf.blocks()]
147 logger.error(
'CLIENT: blocks now: %s', blocks)
152 f
'CLIENT: has fpos {fpos_demand:16,d} length {length_demand:6,d} ({fpos_demand + length_demand:16,d})'
156 time_svf_start = time.perf_counter()
157 svf.read(fpos_demand, length_demand)
158 time_svf += time.perf_counter() - time_svf_start
159 time_exec = time.perf_counter() - time_start
160 time_exec = self.
commscomms.time_total + self.
serverserver.total_time + time_svf
161 logger.info(
'has(): hits: %d misses: %d', has_hits, has_misses)
163 'Blocks: %d bytes: %d sizeof: %d overhead: %d', svf.num_blocks(), svf.num_bytes(), svf.size_of(),
164 svf.size_of() - svf.num_bytes()
167 f
'Comms laten: {self.comms.time_latency * 1000:10.3f} (ms)'
168 f
' ({self.comms.time_latency / self.comms.time_total:6.1%}) of Comms total.'
171 f
'Comms bwidt: {self.comms.time_bandwidth * 1000:10.3f} (ms)'
172 f
' ({self.comms.time_bandwidth / self.comms.time_total:6.1%}) of Comms total.'
174 percent_str =
'+' * int(0.5 + 50 * self.
commscomms.time_total / time_exec)
176 f
'Comms time : {self.comms.time_total * 1000:10.3f} (ms) ({self.comms.time_total / time_exec:6.1%})'
179 percent_str =
'+' * int(0.5 + 50 * self.
serverserver.total_time / time_exec)
181 f
'Server time: {self.server.total_time * 1000:10.3f} (ms) ({self.server.total_time / time_exec:6.1%})'
184 percent_str =
'+' * int(0.5 + 50 * time_svf / time_exec)
186 f
'SVF time : {time_svf * 1000:10.3f} (ms) ({time_svf / time_exec:6.1%})'
195 logger.info(f
'Total : {time_exec * 1000:10.3f} (ms) ({time_exec / time_exec:6.1%})')
196 logger.info(
'SVF contents: %s Execution time: %.3f (s) %.3f (Mb/s)',
197 svf.num_bytes(), time_exec, svf.num_bytes() / time_exec / 1024 ** 2
199 return RunResult(has_hits, has_misses, minimal_bytes, svf.num_bytes(), svf.size_of(),
200 self.
commscomms.time_total + self.
serverserver.total_time + time_svf
def run(
        events: typing.Tuple[typing.Tuple[int, int], ...],
        greedy_length: int,
        latency_s: float, bandwidth_bit_ps: float,
        seek_rate_byte_per_s: float, read_rate_byte_per_s: float, realtime: bool
) -> RunResult:
    """Run one simulation and return its result.

    Builds a ``Communications`` channel, a ``Server`` and a ``Client`` joining
    the two, then has the client work through ``events``.

    :param events: Sequence of ``(file_position, length)`` pairs; passed to
        ``Client.run()`` as its ``seek_reads`` argument.
    :param greedy_length: Greedy read length, passed through to ``Client.run()``.
    :param latency_s: One way latency of the communications channel in seconds.
    :param bandwidth_bit_ps: Channel bandwidth in bits per second.
        ``Communications`` documents 0.0 as meaning infinite bandwidth.
    :param seek_rate_byte_per_s: Server seek rate in bytes per second.
    :param read_rate_byte_per_s: Server read rate in bytes per second.
    :param realtime: Passed as ``realtime=`` to both ``Communications`` and ``Server``.
    :return: The ``RunResult`` produced by ``Client.run()``.
    """
    comms = Communications(latency_s, bandwidth_bit_ps, realtime=realtime)
    server = Server(seek_rate_byte_per_s, read_rate_byte_per_s, realtime=realtime)
    client = Client(comms, server)
    return client.run(events, greedy_length)
217 time_start = time.perf_counter()
219 parser = argparse.ArgumentParser(description=
'Simulate reading into a SVF.', prog=__file__)
220 parser.add_argument(
'-l',
'--log-level', dest=
'log_level', type=int, default=20, help=
'Log level.')
221 parser.add_argument(
'--latency', type=float, default=10,
222 help=
'Communications channel latency (NOTE: one way) in ms. [default: %(default)d]')
223 parser.add_argument(
'--bandwidth', type=float, default=50,
224 help=
'Communications channel bandwidth in million bits per second.'
225 ' Zero is infinite bandwidth. [default: %(default)d]')
226 parser.add_argument(
'--seek-rate', type=float, default=10000,
227 help=
'Server seek rate in million bytes per second. [default: %(default)d]')
228 parser.add_argument(
'--read-rate', type=float, default=50,
229 help=
'Server read rate in million bytes per second. [default: %(default)d]')
230 parser.add_argument(
'--greedy-length', type=int, default=-1,
232 'The greedy length to read fragments from the server.'
233 ' Zero means read every fragment.'
234 ' Default is to run through a range of greedy lengths and report the performance.'
235 ' [default: %(default)d]'
238 parser.add_argument(
'--realtime', action=
"store_true", default=
False,
239 help=
'Run in realtime (may be slow). [default: %(default)d]')
240 args = parser.parse_args()
242 logging.basicConfig(level=args.log_level, format=LOG_FORMAT_NO_PROCESS, stream=sys.stdout)
244 results_time: typing.Dict[str, typing.List[typing.Tuple[int, RunResult]]] = {}
245 print(
'Simulator setup:')
246 print(f
'Network latency (one way) {args.latency:.3f} (ms) bandwidth {args.bandwidth:.3f} (M bits/s)')
247 print(f
'Server seek rate {args.seek_rate:.3f} (M bytes/s) read rate {args.read_rate:.3f} (M bytes/s)')
250 t_start = time.perf_counter()
251 for name
in sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS:
252 if args.greedy_length == -1:
256 while greedy_length <= 2048 * 4 * 4 * 4:
257 logger.info(
'Running %s with %d file actions and greedy_length %d', name,
258 len(sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name]), greedy_length)
260 sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name], greedy_length,
261 args.latency / 1000, args.bandwidth * 1e6, args.seek_rate * 1e6, args.read_rate * 1e6, args.realtime
263 if name
not in results_time:
264 results_time[name] = []
265 results_time[name].append((greedy_length, result))
266 if greedy_length == 0:
268 elif greedy_length == 1:
273 logger.info(
'Running %s with %d file actions and greedy_length %d', name,
274 len(sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name]), args.greedy_length)
276 sim_examples.EXAMPLE_FILE_POSITIONS_LENGTHS[name], args.greedy_length,
277 args.latency / 1000, args.bandwidth * 1e6, args.seek_rate * 1e6, args.read_rate * 1e6, args.realtime
279 if name
not in results_time:
280 results_time[name] = []
281 results_time[name].append((args.greedy_length, result))
282 for key
in results_time:
285 f
'{"greedy_length":>14} {"Time(ms)":>10}'
286 f
' {"Hits":>8} {"Miss":>8} {"Hits%":>8}'
287 f
' {"Min. Bytes":>12} {"Act. Bytes":>12} {"Act. / Min.":>12}'
288 f
' {"sizeof":>10} {"Overhead":>8} {"sizeof / Act.":>14}'
290 for greedy_length, result
in results_time[key]:
292 f
'{greedy_length:14} {result.time_exec * 1000 :10.1f} {result.has_hits:8d} {result.has_misses:8d}'
293 f
' {result.has_hits / (result.has_hits + result.has_misses):8.3%}'
294 f
' {result.minimal_bytes:12d} {result.num_bytes:12d} {result.num_bytes / result.minimal_bytes:12.3%}'
295 f
' {result.sizeof:10d} {result.sizeof - result.num_bytes:+8d} {result.sizeof / result.num_bytes:14.3%}'
297 print(f
'Execution time: {time.perf_counter() - time_start:10.3f} (s)')
301 if __name__ ==
'__main__':
RunResult run(self, typing.Tuple[typing.Tuple[int, int],...] seek_reads, int greedy_length)
def __init__(self, Communications comms, Server server)
None transmit(self, bytes data_bytes, str direction)
def __init__(self, float latency_one_way_s, float bandwidth_bps, bool realtime=False)
def __init__(self, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime=False)
bytes get(self, int file_position, int length)
RunResult run(typing.Tuple[typing.Tuple[int, int],...] events, int greedy_length, float latency_s, float bandwidth_bit_ps, float seek_rate_byte_per_s, float read_rate_byte_per_s, bool realtime)