WIP rawconvert has converted a (non-protected) fluxengine a2r to woz

This commit is contained in:
Jeff Epler 2022-04-02 09:21:53 -05:00
parent a4060ed7c3
commit 9b625ec118
No known key found for this signature in database
GPG key ID: D5BF15AB975AB4DE
4 changed files with 112 additions and 12 deletions

View file

@ -5,7 +5,7 @@
from passport import eddimage, wozardry, a2rimage
from passport.loggers import DefaultLogger, DebugLogger
from passport import Crack, Verify, Convert
from passport import Crack, Verify, Convert, RawConvert
from passport.strings import __date__, STRINGS
import argparse
import os.path
@ -61,6 +61,15 @@ class CommandConvert(BaseCommand):
BaseCommand.setup(self, subparser,
description="Convert a disk image to .woz format")
class CommandRawConvert(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "rawconvert")
self.processor = RawConvert
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Convert a disk image to .woz format with minimal processing")
class CommandCrack(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "crack")
@ -71,7 +80,7 @@ class CommandCrack(BaseCommand):
description="Convert a disk image to .dsk format")
if __name__ == "__main__":
cmds = [CommandVerify(), CommandConvert(), CommandCrack()]
cmds = [CommandVerify(), CommandConvert(), CommandRawConvert(), CommandCrack()]
parser = argparse.ArgumentParser(prog=__progname__,
description="""A multi-purpose tool for working with copy-protected Apple II disk images.

View file

@ -717,6 +717,59 @@ class Crack(Verify):
else:
self.g.logger.PrintByID("passcrack0")
class RawConvert(BasePassportProcessor):
def run(self):
self.g.logger.PrintByID("header")
self.g.logger.PrintByID("reading", {"filename":self.g.filename})
self.tracks = {}
# main loop - loop through disk from track $22 down to track $00
for logical_track_num in range(0x22, -1, -1):
self.g.track = logical_track_num # for display purposes only
self.g.logger.debug("Seeking to track %s" % hex(self.g.track))
for fractional_track in (0, .25, .5, .75):
physical_track_num = logical_track_num + fractional_track
track = self.g.disk_image.seek(physical_track_num)
if track and track.bits:
print("Track bits before", len(track.bits))
track.fix()
print("Track bits after", len(track.bits))
global track_
track_ = track
self.g.logger.debug("Writing to track %s + %.2f for %d bits" % (hex(self.g.track), fractional_track, len(track.bits)))
self.output_tracks[physical_track_num] = wozardry.Track(track.bits, len(track.bits))
return True
def postprocess(self):
source_base, source_ext = os.path.splitext(self.g.filename)
output_filename = source_base + '.woz'
self.g.logger.PrintByID("writing", {"filename":output_filename})
woz_image = wozardry.WozDiskImage()
json_string = self.g.disk_image.to_json()
woz_image.from_json(json_string)
j = json.loads(json_string)
root = [x for x in j.keys()].pop()
woz_image.info["creator"] = STRINGS["header"].strip()[:32]
woz_image.info["synchronized"] = j[root]["info"]["synchronized"]
woz_image.info["cleaned"] = True #self.g.found_and_cleaned_weakbits
woz_image.info["write_protected"] = self.g.protection_enforces_write_protected or j[root]["info"]["write_protected"]
woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())
for q in range(1 + (0x23 * 4)):
physical_track_num = q / 4
if physical_track_num in self.output_tracks:
woz_image.add_track(physical_track_num, self.output_tracks[physical_track_num])
try:
wozardry.WozDiskImage(io.BytesIO(bytes(woz_image)))
except Exception as e:
raise Exception from e
with open(output_filename, 'wb') as f:
f.write(bytes(woz_image))
class Convert(BasePassportProcessor):
def preprocess(self):
self.burn = 2

View file

@ -9,7 +9,15 @@ class A2RImage:
def __init__(self, iostream):
self.tracks = collections.OrderedDict()
self.a2r_image = a2rchery.A2RReader(stream=iostream)
self.speed = 32
self._speed = 32
@property
def speed(self):
if self._speed is None:
fluxxen = flux_record["data"][1:]
speeds = [(len([1 for i in fluxxen[:8192] if i%t==0]), t) for t in range(0x1e,0x23)]
self._speed = speeds[-1][1]
return self._speed
def to_json(self):
return self.a2r_image.to_json()
@ -19,14 +27,14 @@ class A2RImage:
bits = bitarray.bitarray()
if not flux_record or flux_record["capture_type"] != a2rchery.kCaptureTiming:
return bits
print(flux_record['data_length'], flux_record['tick_count'])
tick_count = flux_record['tick_count']
fluxxen = flux_record["data"][1:]
if not self.speed:
speeds = [(len([1 for i in fluxxen[:8192] if i%t==0]), t) for t in range(0x1e,0x23)]
speeds.sort()
self.speed = speeds[-1][1]
speed = self.speed
flux_total = flux_start = -speed//2
rev_total = 0
for flux_value in fluxxen:
rev_total += flux_value
flux_total += flux_value
if flux_value == 0xFF:
continue
@ -34,6 +42,9 @@ class A2RImage:
bits.extend("0" * (flux_total // speed))
bits.extend("1")
flux_total = flux_start
# if rev_total > tick_count:
# print(f"bailing out at {rev_total}")
# break
return bits
def seek(self, track_num):
@ -50,8 +61,13 @@ class A2RImage:
# which is smarter but takes longer)
bits = bitarray.bitarray()
if location in self.a2r_image.flux:
bits = self.to_bits(self.a2r_image.flux[location][0])
self.tracks[location] = Track(bits, len(bits))
global flux_
flux_record = self.a2r_image.flux[location][0]
bits = self.to_bits(flux_record)
est_bit_len = round(flux_record['tick_count'] / self.speed)
else:
est_bit_len = None
self.tracks[location] = Track(bits, len(bits), est_bit_len)
return self.tracks[location]
def reseek(self, track_num):

View file

@ -144,13 +144,35 @@ def raise_if(cond, e, s=""):
if cond: raise e(s)
class Track:
def __init__(self, bits, bit_count):
def __init__(self, bits, bit_count, est_bit_len=None):
self.bits = bits
while len(self.bits) > bit_count:
self.bits.pop()
self.bit_count = bit_count
self.bit_index = 0
self.revolutions = 0
self.fixed = False
self.est_bit_len = est_bit_len
def fix(self, max_match_dist=8000, match_range=1000):
if self.fixed:
return
if not self.est_bit_len:
return
ref_range = self.bits[:match_range]
def goodness(i):
return sum(a == b for a, b in zip(ref_range, self.bits[i:i+match_range]))
if (wrap_point := self.bits.find(ref_range, self.est_bit_len - max_match_dist)) == -1:
wrap_point = max(range(self.est_bit_len - max_match_dist, self.est_bit_len + max_match_dist),
key=goodness)
del self.bits[wrap_point:]
while self.bit_index > wrap_point:
self.bit_index -= wrap_point
self.revolutions += 1
self.fixed = True
self.est_bit_len = wrap_point
def bit(self):
b = self.bits[self.bit_index] and 1 or 0
@ -522,8 +544,8 @@ class WozDiskImage:
compatible_hardware_raw = to_uint16(compatible_hardware_bitfield)
required_ram_raw = to_uint16(self.info["required_ram"])
if self.tracks:
largest_bit_count = max([track.bit_count for track in self.tracks])
largest_block_count = (((largest_bit_count+7)//8)+511)//512
largest_bit_len = max([track.bit_count for track in self.tracks])
largest_block_count = (((largest_bit_len+7)//8)+511)//512
else:
largest_block_count = 0
largest_track_raw = to_uint16(largest_block_count)