Nice! I asked ChatGPT to build a Python version of that script and it did. Of course it crashed on the first run, but after a couple of back-and-forths with it, I now have a working Python version.
Since I invested so little personally, I don't mind sharing it here. Note: I have only done rudimentary testing, so I'm not sure how it will handle various edge cases -- YMMV. Feel free to use/update/share.
****************************************
#! /usr/bin/env python3
from __future__ import annotations
import gzip
import io
import math
import re
import struct
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, BinaryIO, Sequence
import numpy as np
import pandas as pd
MAGIC_JMP = bytes([
0xFF, 0xFF, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00,
0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00,
0x02, 0x00,
])
GZIP_SECTION_START = bytes([0xEF, 0xBE, 0xFE, 0xCA])
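# JMP stores dates, times, and datetimes as seconds elapsed since 1904-01-01
# (the classic Macintosh epoch).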
JMP_STARTDATE = datetime(1904, 1, 1)
ROWSTATE_MARKERS = [
'•', '+', 'X', '□', '◊', '△', 'Y', 'Z', '◯', '▭', '▯', '*', '⚫', '▬', '▮', '■',
'⧫', '▽', '◁', '▷', '▲', '▼', '◀', '▶', '∧', '∨', '<', '>', '∣', '─', '/', '\\',
]
ROWSTATE_COLORS = [
'#000000', '#555555', '#787878', '#C0C0C0', '#FFFFFF', '#A00922', '#C91629', '#F03246',
'#FF5C76', '#FF98A6', '#904700', '#BC5B03', '#E57406', '#FF9138', '#FFB17D', '#706F00',
'#AFA502', '#DAD109', '#F0E521', '#FFF977', '#516A00', '#729400', '#90BF04', '#A2DC06',
'#C1FF3D', '#00670C', '#11981B', '#21BC2D', '#23E72E', '#6AFF6B', '#007254', '#019970',
'#04C791', '#06E3AA', '#0FFFBC', '#006D71', '#01989C', '#0CBCBC', '#06E2E3', '#67FFF7',
'#00638F', '#0380B4', '#05A1D2', '#08C5F7', '#75E4FF', '#034AB0', '#0557D6', '#2867FD',
'#4E9CFF', '#7E89FF', '#6A02A7', '#8C05CF', '#AB08FC', '#C170FF', '#D29AFF', '#930195',
'#B803B9', '#DC06E1', '#FA50FF', '#FD96FF', '#9D0170', '#C5048D', '#E906A4', '#FF49BC',
'#FF8CCD',
]
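# The type-code tables below come from the Julia reader this was ported from
# (reverse-engineered from sample files); they map the format bytes in a
# column header to date/datetime/time/duration/plain-numeric decoding.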
DATE_CODES = {
(0x67, 0x65), (0x6F, 0x65), (0x72, 0x65), (0x72, 0x6F), (0x72, 0x7F),
(0x72, 0x80), (0x7F, 0x72), (0x88, 0x65), (0x88, 0x7A),
}
DATE_CODES_SAME = {0x65, 0x66, 0x67, 0x6E, 0x6F, 0x70, 0x71, 0x72, 0x75, 0x76, 0x7A, 0x7F, 0x88, 0x8B}
DATETIME_DT5 = {0x69, 0x6A, 0x73, 0x74, 0x77, 0x78, 0x7E, 0x81}
DATETIME_DT4 = {0x69, 0x6A, 0x6C, 0x6D, 0x73, 0x74, 0x77, 0x78, 0x79, 0x7B, 0x7C, 0x7D, 0x7E, 0x80, 0x81, 0x82, 0x86, 0x87, 0x89, 0x8A}
DATETIME_CODES = {(0x77, 0x80), (0x77, 0x7F), (0x89, 0x65)}
DURATION_CODES = {(0x84, 0x79)}
DURATION_SAME = {0x0C, 0x6B, 0x6C, 0x6D, 0x83, 0x84, 0x85}
NUMERIC_SAME = {0x00, 0x03, 0x42, 0x43, 0x44, 0x59, 0x60, 0x63}
@dataclass
class Column:
names: list[str]
widths: list[int]
offsets: list[int]
@dataclass
class Info:
version: str
buildstring: str
savetime: datetime | None
nrows: int
ncols: int
column: Column
@dataclass
class RowState:
marker: str
color: tuple[int, int, int]
class JMPReaderError(Exception):
pass
class _MarkedReader:
def __init__(self, fh: BinaryIO):
self.fh = fh
self._mark: int | None = None
def tell(self) -> int:
return self.fh.tell()
def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
return self.fh.seek(offset, whence)
def read(self, n: int = -1) -> bytes:
return self.fh.read(n)
def skip(self, n: int) -> None:
self.fh.seek(n, io.SEEK_CUR)
def peek1(self) -> int:
pos = self.tell()
b = self.read(1)
self.seek(pos)
if not b:
raise EOFError("Unexpected EOF")
return b[0]
def mark(self) -> None:
self._mark = self.tell()
def reset(self) -> None:
if self._mark is None:
raise JMPReaderError("No mark() has been set")
self.seek(self._mark)
def seek_end(self) -> int:
return self.seek(0, io.SEEK_END)
# -----------------------------
# low-level helpers
# -----------------------------
def _read_exact(r: _MarkedReader, n: int) -> bytes:
data = r.read(n)
if len(data) != n:
raise EOFError(f"Expected {n} bytes, got {len(data)}")
return data
def _read_fmt(r: _MarkedReader, fmt: str):
size = struct.calcsize(fmt)
return struct.unpack(fmt, _read_exact(r, size))[0]
def _read_string(r: _MarkedReader, width: int) -> str:
if width == 1:
n = _read_fmt(r, '<b')
elif width == 2:
n = _read_fmt(r, '<h')
elif width == 4:
n = _read_fmt(r, '<i')
else:
raise ValueError(f"Invalid string width {width}")
if n < 0:
raise JMPReaderError(f"Negative string length {n}")
return _read_exact(r, n).decode('utf-8', errors='replace')
def _read_array(r: _MarkedReader, dtype: np.dtype, count: int) -> np.ndarray:
return np.frombuffer(_read_exact(r, np.dtype(dtype).itemsize * count), dtype=dtype, count=count).copy()
def _read_until(r: _MarkedReader, needle: bytes) -> bytes:
out = bytearray()
n = len(needle)
while True:
b = r.read(1)
if not b:
raise EOFError(f"Could not find byte sequence {needle!r}")
out += b
if out[-n:] == needle:
return bytes(out)
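# For example, _to_datetime_scalar(86400.0) -> datetime(1904, 1, 2, 0, 0);
# None and NaN pass through as None.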
def _to_datetime_scalar(value: float | None) -> datetime | None:
if value is None or (isinstance(value, float) and math.isnan(value)):
return None
return JMP_STARTDATE + timedelta(seconds=float(value))
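# Integer-typed columns mark missing values with a sentinel (type minimum + 1);
# float columns use NaN. Either one is mapped to None in an object array.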
def _normalize_sentinel(values: np.ndarray) -> np.ndarray:
if values.dtype == np.float64:
if np.isnan(values).any():
return np.array([None if np.isnan(x) else float(x) for x in values], dtype=object)
return values.astype(object)
info = np.iinfo(values.dtype)
sentinel = info.min + 1
if np.any(values == sentinel):
return np.array([None if int(x) == sentinel else int(x) for x in values], dtype=object)
return values.astype(object)
def _rstrip_null_bytes(data: bytes) -> bytes:
return data.rstrip(b'\x00')
def _to_fixed_strings(buffer: bytes, nrows: int, width: int) -> list[str]:
out: list[str] = []
for i in range(nrows):
chunk = buffer[i * width:(i + 1) * width]
out.append(_rstrip_null_bytes(chunk).decode('utf-8', errors='replace'))
return out
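# For example, _to_variable_strings(b'abXYZ', [2, 3]) -> ['ab', 'XYZ'].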
def _to_variable_strings(buffer: bytes, widths: Sequence[int]) -> list[str]:
out: list[str] = []
pos = 0
buflen = len(buffer)
for width in widths:
width = int(width)
if width < 0:
raise JMPReaderError(f'Negative string width {width}')
end = pos + width
if end > buflen:
# Clamp instead of reading past the payload. This avoids contamination
# from adjacent bytes when one width table entry is slightly wrong.
end = buflen
text = buffer[pos:end].decode('utf-8', errors='replace')
out.append(text)
pos += width
return out
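# Resolve select/drop rules to 1-based column indices, e.g.
# _find_names(['Batch', 'Yield', 'Notes'], ['Yield', 're:^B']) -> [2, 1].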
def _find_names(names: Sequence[str], rules: Sequence[Any]) -> list[int]:
out: list[int] = []
for rule in rules:
if isinstance(rule, int):
out.append(rule)
elif isinstance(rule, range):
out.extend(list(rule))
elif isinstance(rule, slice):
start = 1 if rule.start is None else rule.start
stop = len(names) if rule.stop is None else rule.stop
step = 1 if rule.step is None else rule.step
out.extend(list(range(start, stop + (1 if step > 0 else -1), step)))
elif isinstance(rule, str):
if rule.startswith('re:'):
pat = re.compile(rule[3:])
out.extend(i + 1 for i, name in enumerate(names) if pat.search(name))
else:
try:
out.append(names.index(rule) + 1)
except ValueError:
pass
elif isinstance(rule, re.Pattern):
out.extend(i + 1 for i, name in enumerate(names) if rule.search(name))
return out
def _filter_columns(names: Sequence[str], select: Sequence[Any] | None, drop: Sequence[Any] | None) -> list[int]:
cols = set(range(1, len(names) + 1))
if select is not None:
cols &= set(_find_names(names, select))
if drop is not None:
cols -= set(_find_names(names, drop))
return sorted(cols)
def _bitcat(a: int, b: int) -> int:
return ((a & 0xFF) << 8) | (b & 0xFF)
def _hex_to_rgb(s: str) -> tuple[int, int, int]:
s = s.lstrip('#')
return (int(s[0:2], 16), int(s[2:4], 16), int(s[4:6], 16))
def _decode_numeric_block(a: bytes, width: int, nrows: int, kind: str) -> np.ndarray:
dtype = {1: np.int8, 2: np.int16, 4: np.int32}.get(width, np.float64)
data = np.frombuffer(a[-width * nrows:], dtype=dtype, count=nrows).copy()
data = _normalize_sentinel(data)
if kind == 'plain':
return data
if kind == 'date':
return np.array([None if x is None else _to_datetime_scalar(float(x)).date() for x in data], dtype=object)
if kind == 'datetime':
return np.array([None if x is None else _to_datetime_scalar(float(x)) for x in data], dtype=object)
if kind == 'time':
def conv(x: Any):
if x is None:
return None
dt = _to_datetime_scalar(float(x))
return dt.time().replace(microsecond=0)
return np.array([conv(x) for x in data], dtype=object)
if kind == 'duration':
return np.array([None if x is None else timedelta(seconds=float(x)) for x in data], dtype=object)
raise ValueError(kind)
# -----------------------------
# metadata parsing
# -----------------------------
def check_magic(fh: BinaryIO) -> bool:
fh.seek(0)
return fh.read(len(MAGIC_JMP)) == MAGIC_JMP
def seek_to_column_data_offsets(r: _MarkedReader, ncols: int) -> tuple[int, int]:
r.seek(0)
r.skip(2)
while True:
_read_until(r, b'\xff\xff')
while True:
try:
b = r.peek1()
except EOFError as exc:
raise JMPReaderError('Could not find column offset data') from exc
if b != 0xFF:
break
r.skip(1)
r.skip(10)
n_visible = _read_fmt(r, '<I')
n_hidden = _read_fmt(r, '<I')
r.skip(8)
if n_visible + n_hidden == ncols:
return int(n_visible), int(n_hidden)
r.skip(-18)
def _validate_column_info_candidate(r: _MarkedReader, candidate_pos: int, ncols: int) -> tuple[list[str], list[int]] | None:
cur = r.tell()
try:
r.seek(candidate_pos)
ncols2 = _read_fmt(r, '<i')
if ncols2 != ncols:
return None
offsets = _read_array(r, np.int64, ncols).astype(np.int64).tolist()
filesize = r.seek_end()
if not offsets:
return None
if any(off < 0 or off >= filesize for off in offsets):
return None
if offsets != sorted(offsets):
return None
names: list[str] = []
for off in offsets:
r.seek(off)
n = _read_fmt(r, '<h')
if n < 0 or n > 1024 or off + 2 + n > filesize:
return None
raw = _read_exact(r, n)
s = raw.decode('utf-8', errors='replace')
if not s:
return None
names.append(s)
return names, offsets
except Exception:
return None
finally:
r.seek(cur)
def column_info(r: _MarkedReader, ncols: int) -> tuple[list[str], list[int]]:
start = r.tell()
while True:
two = _read_exact(r, 2)
if two in {b'\xfc\xff', b'\xfd\xff', b'\xfe\xff', b'\xff\xff'}:
n = _read_fmt(r, '<q')
if n < 0:
raise JMPReaderError(f'Negative skip length while parsing column info: {n}')
r.skip(n)
else:
r.skip(-2)
break
direct_pos = r.tell()
out = _validate_column_info_candidate(r, direct_pos, ncols)
if out is not None:
return out
# The Julia reader notes this area is hacky. Some JMP files appear to include
# extra marker bytes before the actual Int32 ncols field. Fall back to a local
# scan for a plausible column-info block.
scan_origin = max(0, start - 16)
scan_limit = min(r.seek_end(), direct_pos + 8192)
r.seek(scan_origin)
blob = _read_exact(r, scan_limit - scan_origin)
needle = struct.pack('<i', ncols)
    rel = blob.find(needle)
    while rel != -1:
        candidate_pos = scan_origin + rel
        out = _validate_column_info_candidate(r, candidate_pos, ncols)
        if out is not None:
            return out
        rel = blob.find(needle, rel + 1)
r.seek(direct_pos)
ncols2 = _read_fmt(r, '<i')
raise JMPReaderError(
f'Number of columns read from two locations do not match: {ncols} vs {ncols2}. '
'Also failed to recover by scanning for a plausible column-info block.'
)
def metadata(fh: BinaryIO) -> Info:
r = _MarkedReader(fh)
r.seek(0)
_read_until(r, bytes([0x07, 0x00, 0x08, 0x00, 0x00, 0x00]))
r.seek(r.tell() - 38)
nrows = int(_read_fmt(r, '<q'))
ncols = int(_read_fmt(r, '<i'))
_ = _read_array(r, np.int16, 5)
_charset = _read_string(r, 4).rstrip('\x00')
_ = _read_array(r, np.uint16, 3)
savetime = _to_datetime_scalar(float(_read_fmt(r, '<d')))
_ = _read_fmt(r, '<H')
buildstring = _read_string(r, 4)
m = re.search(r'Version (?P<version>.*)$', buildstring)
if not m:
raise JMPReaderError('Could not determine JMP version')
version = m.group('version')
n_visible, n_hidden = seek_to_column_data_offsets(r, ncols)
_ = _read_array(r, np.uint32, n_visible)
_ = _read_array(r, np.uint32, n_hidden)
colwidths = _read_array(r, np.uint16, ncols).astype(np.uint16).tolist()
_ = _read_array(r, np.uint32, 7)
colnames, coloffsets = column_info(r, ncols)
return Info(version, buildstring, savetime, nrows, ncols, Column(colnames, colwidths, coloffsets))
# -----------------------------
# column parsing
# -----------------------------
def _read_compressed_payload(r: _MarkedReader) -> bytes:
_read_until(r, GZIP_SECTION_START)
gziplen = _read_fmt(r, '<Q')
gunziplen = _read_fmt(r, '<Q')
raw = _read_exact(r, gziplen)
try:
return gzip.decompress(raw)
except Exception as exc:
raise JMPReaderError(f'Failed to gunzip compressed column ({gunziplen} bytes expected)') from exc
def _read_uncompressed_column_bytes(r: _MarkedReader, col_offset: int, next_col_offset: int | None) -> bytes:
end = next_col_offset
if end is None:
cur = r.tell()
end = r.seek_end()
r.seek(cur)
r.seek(col_offset)
return _read_exact(r, end - r.tell())
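# dt1..dt6 are the six type bytes that follow the column name in a column
# header. The dispatch below mirrors the ported Julia logic: dt1 appears to
# select the storage class (numeric, character, row state, compressed),
# dt4/dt5 the display format, and dt6 (or dt5) the element width in bytes.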
def column_data(fh: BinaryIO, info: Info, i: int) -> Any:
if not 1 <= i <= info.ncols:
raise IndexError(f'requested column {i} is out of bounds')
r = _MarkedReader(fh)
col_offset = info.column.offsets[i - 1]
next_col_offset = info.column.offsets[i] if i < info.ncols else None
r.seek(col_offset)
_name = _read_string(r, 2)
dt1, dt2, dt3, dt4, dt5, dt6 = struct.unpack('<6B', _read_exact(r, 6))
r.mark()
if dt1 in {0x09, 0x0A}:
a = _read_compressed_payload(r)
else:
a = _read_uncompressed_column_bytes(r, col_offset, next_col_offset)
r.reset()
if dt1 in {0x01, 0x0A}:
if (dt4 == dt5 and dt4 in NUMERIC_SAME) or dt5 in {0x5E, 0x63}:
return _decode_numeric_block(a, dt6, info.nrows, 'plain')
if dt4 == dt5 and dt4 in {0x5F, 0x54, 0x55, 0x56, 0x51, 0x52, 0x53}:
return _decode_numeric_block(a, dt6, info.nrows, 'plain')
if (dt4 == dt5 and dt4 in DATE_CODES_SAME) or (dt4, dt5) in DATE_CODES:
return _decode_numeric_block(a, dt6, info.nrows, 'date')
if ((dt5 in DATETIME_DT5 and dt4 in DATETIME_DT4) or
(dt4 == dt5 and dt4 in {0x79, 0x7D}) or
((dt4, dt5) in DATETIME_CODES)):
return _decode_numeric_block(a, dt6, info.nrows, 'datetime')
if dt4 == dt5 and dt4 == 0x82:
return _decode_numeric_block(a, dt6, info.nrows, 'time')
if (dt4 == dt5 and dt4 in DURATION_SAME) or ((dt4, dt5) in DURATION_CODES):
return _decode_numeric_block(a, dt6, info.nrows, 'duration')
return _decode_numeric_block(a, dt6, info.nrows, 'plain')
if dt1 in {0xFF, 0xFE, 0xFC}:
return _decode_numeric_block(a, dt5, info.nrows, 'plain')
if dt1 == 0x09 and dt2 == 0x03:
width = dt5
out: list[RowState] = []
for row in range(info.nrows):
offset = width * row
markeridx = _bitcat(a[offset + 6], a[offset + 5])
if markeridx <= 0x001F:
marker = ROWSTATE_MARKERS[markeridx]
else:
marker = chr(markeridx)
if a[offset + 3] == 0xFF:
rgb = (a[offset + 2], a[offset + 1], a[offset + 0])
else:
rgb = _hex_to_rgb(ROWSTATE_COLORS[a[offset + 0]])
out.append(RowState(marker=marker, color=rgb))
return out
if dt1 == 0x03 and dt2 == 0x03:
return np.frombuffer(a[-dt5 * info.nrows:], dtype=np.int64, count=info.nrows).copy()
if dt1 in {0x02, 0x09} and dt2 in {0x01, 0x02}:
# fixed-width character
if ((dt3, dt4) == (0x00, 0x00) and dt5 > 0) or (0x01 <= dt3 <= 0x07 and dt4 == 0x00):
width = dt5
data = a[-info.nrows * width:]
return _to_fixed_strings(data, info.nrows, width)
# variable-width character
if (dt3, dt4, dt5) == (0x00, 0x00, 0x00):
if dt1 == 0x09:
total_len = struct.unpack('<q', a[:8])[0]
if total_len == len(a):
bio = io.BytesIO(a)
_reclen = struct.unpack('<q', bio.read(8))[0]
_foo = bio.read(9)
_foo2 = struct.unpack('<q', bio.read(8))[0]
width_bytes = struct.unpack('<b', bio.read(1))[0]
idx_dtype = {1: np.uint8, 2: np.uint16, 4: np.uint32}[width_bytes]
idx = np.frombuffer(bio.read(info.nrows * np.dtype(idx_dtype).itemsize), dtype=idx_dtype, count=info.nrows)
npools = int(idx.max()) if len(idx) else 0
width_bytes = struct.unpack('<b', bio.read(1))[0]
if width_bytes == 1:
len_fmt = '<B'
elif width_bytes == 2:
len_fmt = '<H'
elif width_bytes == 4:
len_fmt = '<I'
else:
raise JMPReaderError(f'Unsupported pooled string pool width: {width_bytes}')
pool: list[str] = []
for _ in range(npools):
n = struct.unpack(len_fmt, bio.read(struct.calcsize(len_fmt)))[0]
text = bio.read(n).decode('utf-8', errors='replace')
pool.append(text)
return ["" if int(j) == 0 else pool[int(j) - 1] for j in idx]
widthbytes = a[8]
if widthbytes == 1:
widths = np.frombuffer(a[13:13 + info.nrows], dtype=np.int8, count=info.nrows).astype(int)
data = a[13 + info.nrows:]
elif widthbytes == 2:
widths = np.frombuffer(a[13:13 + 2 * info.nrows], dtype=np.int16, count=info.nrows).astype(int)
data = a[13 + 2 * info.nrows:]
else:
raise JMPReaderError(f'Unknown widthbytes={widthbytes} in compressed char column {i}')
return _to_variable_strings(data, widths)
else:
r.reset()
r.skip(6)
n1 = _read_fmt(r, '<q')
r.skip(n1)
r.skip(2)
n2 = _read_fmt(r, '<I')
r.skip(n2 + 8)
widthbytes = _read_fmt(r, '<B')
_maxwidth = _read_fmt(r, '<I')
width_dtype = {1: np.int8, 2: np.int16, 4: np.int32}.get(widthbytes)
if width_dtype is None:
raise JMPReaderError(f'Unknown widthbytes={widthbytes} in uncompressed char column {i}')
widths = _read_array(r, width_dtype, info.nrows).astype(int)
if next_col_offset is None:
cur = r.tell()
colend = r.seek_end()
r.seek(cur)
else:
colend = next_col_offset
data_len = int(widths.sum())
data_offset = colend - data_len
r.seek(data_offset)
data = _read_exact(r, data_len)
return _to_variable_strings(data, widths)
return np.array([math.nan] * info.nrows)
# -----------------------------
# public API
# -----------------------------
def readjmp(path: str | Path, select: Sequence[Any] | None = None, drop: Sequence[Any] | None = None) -> pd.DataFrame:
"""
Read a JMP .jmp table into a pandas DataFrame.
Parameters
----------
path:
Path to the .jmp file.
    select / drop:
        Optional column filters. Elements may be 1-based integers, ranges,
        slices, exact strings, compiled regex patterns, or strings of the
        form "re:<pattern>".
"""
path = Path(path)
if not path.is_file():
raise FileNotFoundError(path)
with path.open('rb') as fh:
if not check_magic(fh):
raise JMPReaderError(f'{path} does not appear to be a valid .jmp file')
info = metadata(fh)
colinds = _filter_columns(info.column.names, select, drop)
    data: dict[str, Any] = {}
    with path.open('rb') as fh:
        for i in colinds:
            data[info.column.names[i - 1]] = column_data(fh, info, i)
df = pd.DataFrame(data)
df = df[[info.column.names[i - 1] for i in colinds]]
return df
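# Minimal CLI wrapper: dump a .jmp table as TSV (default), JSON, or a stdout preview.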
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Read a JMP .jmp file into TSV/JSON using a pure-Python reader.')
parser.add_argument('jmp_file', help='Path to the JMP .jmp file')
parser.add_argument('-o', '--output', help='Write TSV to this file. Defaults to stdout preview.')
parser.add_argument('--json', action='store_true', help='Write JSON instead of TSV/plain-text output')
parser.add_argument('--select', nargs='*', default=None, help='Columns to include (names or 1-based indices)')
parser.add_argument('--drop', nargs='*', default=None, help='Columns to exclude (names or 1-based indices)')
args = parser.parse_args()
def coerce_rules(values: list[str] | None) -> list[Any] | None:
if values is None:
return None
out: list[Any] = []
for v in values:
try:
out.append(int(v))
except ValueError:
out.append(v)
return out
df = readjmp(args.jmp_file, select=coerce_rules(args.select), drop=coerce_rules(args.drop))
if args.output:
if args.json:
            Path(args.output).write_text(df.to_json(orient='records', default_handler=str, force_ascii=False, indent=2), encoding='utf-8')
else:
df.to_csv(args.output, sep='\t', index=False)
else:
if args.json:
print(df.to_json(orient='records', default_handler=str, force_ascii=False, indent=2))
else:
with pd.option_context('display.max_columns', None, 'display.width', 200):
print(df.to_string(index=False))
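****************************************
A quick note on usage, in case it helps. From the command line (the file names below are just placeholders):

python3 readjmp.py mydata.jmp                     # preview to stdout
python3 readjmp.py mydata.jmp -o mydata.tsv       # write TSV
python3 readjmp.py mydata.jmp --json -o out.json  # write JSON
python3 readjmp.py mydata.jmp --select 1 2 "re:^Batch" --drop Notes

Or from Python, assuming you saved the script as readjmp.py:

import readjmp
df = readjmp.readjmp('mydata.jmp', select=['re:^Batch'])
print(df.head())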