| from fontTools.misc.roundTools import noRound, otRound |
| from fontTools.misc.intTools import bit_count |
| from fontTools.ttLib.tables import otTables as ot |
| from fontTools.varLib.models import supportScalar |
| from fontTools.varLib.builder import ( |
| buildVarRegionList, |
| buildVarStore, |
| buildVarRegion, |
| buildVarData, |
| ) |
| from functools import partial |
| from collections import defaultdict |
| from heapq import heappush, heappop |
| |
| |
# Alias the sentinel VarIdx meaning "no variation data", and expose it
# on the compiled VarStore class for convenience.
NO_VARIATION_INDEX = ot.NO_VARIATION_INDEX
ot.VarStore.NO_VARIATION_INDEX = NO_VARIATION_INDEX
| |
| |
| def _getLocationKey(loc): |
| return tuple(sorted(loc.items(), key=lambda kv: kv[0])) |
| |
| |
class OnlineVarStoreBuilder(object):
    """Incrementally builds an ItemVariationStore (``ot.VarStore``).

    Deltas are streamed in via :meth:`storeMasters` / :meth:`storeDeltas`;
    identical delta-sets within one VarData are deduplicated through a
    per-VarData cache. Call :meth:`finish` once everything is stored to
    obtain the built store.
    """

    def __init__(self, axisTags):
        self._axisTags = axisTags
        self._regionMap = {}  # location key -> index into _regionList.Region
        self._regionList = buildVarRegionList([], axisTags)
        self._store = buildVarStore(self._regionList, [])
        self._data = None  # VarData currently being filled
        self._model = None
        self._supports = None
        self._varDataIndices = {}  # region-index tuple -> VarData index
        self._varDataCaches = {}  # region-index tuple -> {delta tuple: varIdx}
        self._cache = {}  # Cache of the current VarData

    def setModel(self, model):
        """Set the variation model; adopts its supports as well."""
        self.setSupports(model.supports)
        self._model = model

    def setSupports(self, supports):
        """Set the master supports (regions); invalidates any set model."""
        self._model = None
        self._supports = list(supports)
        if not self._supports[0]:
            del self._supports[0]  # Drop base master support
        self._cache = {}
        self._data = None

    def finish(self, optimize=True):
        """Finalize counts/encodings and return the built ot.VarStore."""
        self._regionList.RegionCount = len(self._regionList.Region)
        self._store.VarDataCount = len(self._store.VarData)
        for data in self._store.VarData:
            data.ItemCount = len(data.Item)
            data.calculateNumShorts(optimize=optimize)
        return self._store

    def _add_VarData(self):
        # Resolve the current supports to (possibly newly created) region
        # indices in the shared VarRegionList.
        regionMap = self._regionMap
        regionList = self._regionList

        regions = self._supports
        regionIndices = []
        for region in regions:
            key = _getLocationKey(region)
            idx = regionMap.get(key)
            if idx is None:
                varRegion = buildVarRegion(region, self._axisTags)
                idx = regionMap[key] = len(regionList.Region)
                regionList.Region.append(varRegion)
            regionIndices.append(idx)

        # Check if we have one already...
        key = tuple(regionIndices)
        varDataIdx = self._varDataIndices.get(key)
        if varDataIdx is not None:
            self._outer = varDataIdx
            self._data = self._store.VarData[varDataIdx]
            self._cache = self._varDataCaches[key]
            if len(self._data.Item) == 0xFFFF:
                # This is full. Need new one.
                varDataIdx = None

        if varDataIdx is None:
            self._data = buildVarData(regionIndices, [], optimize=False)
            self._outer = len(self._store.VarData)
            self._store.VarData.append(self._data)
            self._varDataIndices[key] = self._outer
            if key not in self._varDataCaches:
                self._varDataCaches[key] = {}
            self._cache = self._varDataCaches[key]

    def storeMasters(self, master_values, *, round=round):
        """Compute deltas from master values and store them.

        Returns a (base_value, varIdx) pair.
        """
        deltas = self._model.getDeltas(master_values, round=round)
        base = deltas.pop(0)
        return base, self.storeDeltas(deltas, round=noRound)

    def storeDeltas(self, deltas, *, round=round):
        """Store one delta-set; return its ``(outer << 16) + inner`` VarIdx.

        A leading base-master delta is dropped if present. Identical
        delta-sets within the same VarData share a VarIdx.
        """
        deltas = [round(d) for d in deltas]
        if len(deltas) == len(self._supports) + 1:
            deltas = tuple(deltas[1:])  # Drop the base-master delta
        else:
            assert len(deltas) == len(self._supports)
            deltas = tuple(deltas)

        varIdx = self._cache.get(deltas)
        if varIdx is not None:
            return varIdx

        if not self._data:
            self._add_VarData()
        inner = len(self._data.Item)
        if inner == 0xFFFF:
            # Full array. Start new one.
            self._add_VarData()
            # Deltas were already rounded above; don't round a second time
            # (re-rounding would alter float deltas on this overflow path).
            return self.storeDeltas(deltas, round=noRound)
        self._data.addItem(deltas, round=noRound)

        varIdx = (self._outer << 16) + inner
        self._cache[deltas] = varIdx
        return varIdx
| |
| |
def VarData_addItem(self, deltas, *, round=round):
    """Append one row of rounded deltas to this VarData.

    Accepts either VarRegionCount deltas, or VarRegionCount + 1 deltas,
    in which case the first (base-master) delta is dropped.
    """
    rounded = [round(value) for value in deltas]

    expected = self.VarRegionCount
    got = len(rounded)
    if got == expected + 1:
        row = rounded[1:]
    else:
        assert expected == got, (expected, got)
        row = list(rounded)
    self.Item.append(row)
    self.ItemCount = len(self.Item)
| |
| |
| ot.VarData.addItem = VarData_addItem |
| |
| |
def VarRegion_get_support(self, fvar_axes):
    """Return this region as a support dict {axisTag: (start, peak, end)}.

    Axes whose peak coordinate is 0 do not restrict the region and are
    omitted from the result.
    """
    support = {}
    for i, regionAxis in enumerate(self.VarRegionAxis):
        if regionAxis.PeakCoord == 0:
            continue
        tag = fvar_axes[i].axisTag
        support[tag] = (regionAxis.StartCoord, regionAxis.PeakCoord, regionAxis.EndCoord)
    return support
| |
| |
| ot.VarRegion.get_support = VarRegion_get_support |
| |
| |
def VarStore___bool__(self):
    """A VarStore is truthy iff it holds any VarData."""
    has_data = bool(self.VarData)
    return has_data
| |
| |
| ot.VarStore.__bool__ = VarStore___bool__ |
| |
| |
class VarStoreInstancer(object):
    """Evaluates deltas from an ItemVariationStore at a given location.

    Args:
        varstore: an ot.VarStore (Format 1), or None for an empty store.
        fvar_axes: sequence of fvar axis records (their axisTag is used).
        location: dict of axis tag -> normalized coordinate. Defaults to
            the empty location.
    """

    def __init__(self, varstore, fvar_axes, location=None):
        # `location=None` sentinel instead of a shared mutable `{}`
        # default; behavior is unchanged (the dict was always copied).
        self.fvar_axes = fvar_axes
        assert varstore is None or varstore.Format == 1
        self._varData = varstore.VarData if varstore else []
        self._regions = varstore.VarRegionList.Region if varstore else []
        self.setLocation(location if location is not None else {})

    def setLocation(self, location):
        """Change the instancing location; clears cached scalars."""
        self.location = dict(location)
        self._clearCaches()

    def _clearCaches(self):
        # Per-region scalar cache, keyed by region index.
        self._scalars = {}

    def _getScalar(self, regionIdx):
        # Compute (and cache) the scalar of one region at self.location.
        scalar = self._scalars.get(regionIdx)
        if scalar is None:
            support = self._regions[regionIdx].get_support(self.fvar_axes)
            scalar = supportScalar(self.location, support)
            self._scalars[regionIdx] = scalar
        return scalar

    @staticmethod
    def interpolateFromDeltasAndScalars(deltas, scalars):
        """Return the scalar-weighted sum of deltas."""
        delta = 0.0
        for d, s in zip(deltas, scalars):
            if not s:
                continue  # Zero scalar contributes nothing; skip the multiply.
            delta += d * s
        return delta

    def __getitem__(self, varidx):
        """Return the interpolated delta for a ``(major<<16)+minor`` VarIdx."""
        major, minor = varidx >> 16, varidx & 0xFFFF
        if varidx == NO_VARIATION_INDEX:
            return 0.0
        varData = self._varData
        scalars = [self._getScalar(ri) for ri in varData[major].VarRegionIndex]
        deltas = varData[major].Item[minor]
        return self.interpolateFromDeltasAndScalars(deltas, scalars)

    def interpolateFromDeltas(self, varDataIndex, deltas):
        """Interpolate explicit deltas using varDataIndex's regions."""
        varData = self._varData
        scalars = [self._getScalar(ri) for ri in varData[varDataIndex].VarRegionIndex]
        return self.interpolateFromDeltasAndScalars(deltas, scalars)
| |
| |
| # |
| # Optimizations |
| # |
| # retainFirstMap - If true, major 0 mappings are retained. Deltas for unused indices are zeroed |
| # advIdxes - Set of major 0 indices for advance deltas to be listed first. Other major 0 indices follow. |
| |
| |
def VarStore_subset_varidxes(
    self, varIdxes, optimize=True, retainFirstMap=False, advIdxes=frozenset()
):
    """Drop VarData items not referenced by ``varIdxes``.

    Args:
        varIdxes: iterable of ``(major << 16) + minor`` indices to keep.
        optimize: recalculate each VarData's NumShorts optimally.
        retainFirstMap: if True, VarData#0 keeps its minor indices as-is;
            unused rows are zeroed rather than removed.
        advIdxes: VarData#0 minor indices (advance deltas) to list first.
            (Default changed from a shared mutable ``set()`` to an
            immutable ``frozenset()``; it is only read, so behavior is
            unchanged.)

    Returns:
        Mapping from old VarIdxes to new VarIdxes.
    """
    # Sort out used varIdxes by major/minor.
    used = {}
    for varIdx in varIdxes:
        if varIdx == NO_VARIATION_INDEX:
            continue
        major = varIdx >> 16
        minor = varIdx & 0xFFFF
        d = used.get(major)
        if d is None:
            d = used[major] = set()
        d.add(minor)
    del varIdxes

    #
    # Subset VarData
    #

    varData = self.VarData
    newVarData = []
    varDataMap = {NO_VARIATION_INDEX: NO_VARIATION_INDEX}
    for major, data in enumerate(varData):
        usedMinors = used.get(major)
        if usedMinors is None:
            continue  # Whole VarData unused; drop it.
        newMajor = len(newVarData)
        newVarData.append(data)

        items = data.Item
        newItems = []
        if major == 0 and retainFirstMap:
            # Keep every minor in place; zero out the unused rows.
            for minor in range(len(items)):
                newItems.append(
                    items[minor] if minor in usedMinors else [0] * len(items[minor])
                )
                varDataMap[minor] = minor
        else:
            if major == 0:
                # Advance deltas first, then the rest, each sorted.
                minors = sorted(advIdxes) + sorted(usedMinors - advIdxes)
            else:
                minors = sorted(usedMinors)
            for minor in minors:
                newMinor = len(newItems)
                newItems.append(items[minor])
                varDataMap[(major << 16) + minor] = (newMajor << 16) + newMinor

        data.Item = newItems
        data.ItemCount = len(data.Item)

        data.calculateNumShorts(optimize=optimize)

    self.VarData = newVarData
    self.VarDataCount = len(self.VarData)

    self.prune_regions()

    return varDataMap
| |
| |
| ot.VarStore.subset_varidxes = VarStore_subset_varidxes |
| |
| |
def VarStore_prune_regions(self):
    """Remove unused VarRegions."""
    # Collect the region indices actually referenced by any VarData.
    usedRegions = set()
    for data in self.VarData:
        usedRegions.update(data.VarRegionIndex)

    # Rebuild the VarRegionList with only the used regions, remembering
    # where each old index ended up.
    regionList = self.VarRegionList
    oldRegions = regionList.Region
    regionMap = {}
    newRegions = []
    for oldIndex in sorted(usedRegions):
        regionMap[oldIndex] = len(newRegions)
        newRegions.append(oldRegions[oldIndex])
    regionList.Region = newRegions
    regionList.RegionCount = len(newRegions)

    # Remap each VarData's region indices to the new numbering.
    for data in self.VarData:
        data.VarRegionIndex = [regionMap[oldIndex] for oldIndex in data.VarRegionIndex]
| |
| |
| ot.VarStore.prune_regions = VarStore_prune_regions |
| |
| |
def _visit(self, func):
    """Recurse down from self, if type of an object is ot.Device,
    call func() on it. Works on otData-style classes."""

    # Exact type check (not isinstance): only genuine Device tables count.
    if type(self) == ot.Device:
        func(self)
        return

    if isinstance(self, list):
        for element in self:
            _visit(element, func)
        return

    # otData-style table: walk its converter-declared attributes.
    # (Tables with postRead hold post-processed data; skip those.)
    if hasattr(self, "getConverters") and not hasattr(self, "postRead"):
        for converter in self.getConverters():
            value = getattr(self, converter.name, None)
            if value is not None:
                _visit(value, func)
        return

    if isinstance(self, ot.ValueRecord):
        for value in self.__dict__.values():
            _visit(value, func)
| |
| |
| def _Device_recordVarIdx(self, s): |
| """Add VarIdx in this Device table (if any) to the set s.""" |
| if self.DeltaFormat == 0x8000: |
| s.add((self.StartSize << 16) + self.EndSize) |
| |
| |
def Object_collect_device_varidxes(self, varidxes):
    """Collect into `varidxes` the VarIdx of every Device table under self."""

    def record(device):
        _Device_recordVarIdx(device, s=varidxes)

    _visit(self, record)
| |
| |
# Expose VarIdx collection on the top-level tables that hold Device records.
ot.GDEF.collect_device_varidxes = Object_collect_device_varidxes
ot.GPOS.collect_device_varidxes = Object_collect_device_varidxes
| |
| |
| def _Device_mapVarIdx(self, mapping, done): |
| """Map VarIdx in this Device table (if any) through mapping.""" |
| if id(self) in done: |
| return |
| done.add(id(self)) |
| if self.DeltaFormat == 0x8000: |
| varIdx = mapping[(self.StartSize << 16) + self.EndSize] |
| self.StartSize = varIdx >> 16 |
| self.EndSize = varIdx & 0xFFFF |
| |
| |
def Object_remap_device_varidxes(self, varidxes_map):
    """Rewrite every Device-table VarIdx under self through varidxes_map."""
    seen = set()

    def remap(device):
        _Device_mapVarIdx(device, mapping=varidxes_map, done=seen)

    _visit(self, remap)
| |
| |
# Expose VarIdx remapping on the top-level tables that hold Device records.
ot.GDEF.remap_device_varidxes = Object_remap_device_varidxes
ot.GPOS.remap_device_varidxes = Object_remap_device_varidxes
| |
| |
| class _Encoding(object): |
| def __init__(self, chars): |
| self.chars = chars |
| self.width = bit_count(chars) |
| self.columns = self._columns(chars) |
| self.overhead = self._characteristic_overhead(self.columns) |
| self.items = set() |
| |
| def append(self, row): |
| self.items.add(row) |
| |
| def extend(self, lst): |
| self.items.update(lst) |
| |
| def get_room(self): |
| """Maximum number of bytes that can be added to characteristic |
| while still being beneficial to merge it into another one.""" |
| count = len(self.items) |
| return max(0, (self.overhead - 1) // count - self.width) |
| |
| room = property(get_room) |
| |
| def get_gain(self): |
| """Maximum possible byte gain from merging this into another |
| characteristic.""" |
| count = len(self.items) |
| return max(0, self.overhead - count) |
| |
| gain = property(get_gain) |
| |
| def gain_sort_key(self): |
| return self.gain, self.chars |
| |
| def width_sort_key(self): |
| return self.width, self.chars |
| |
| @staticmethod |
| def _characteristic_overhead(columns): |
| """Returns overhead in bytes of encoding this characteristic |
| as a VarData.""" |
| c = 4 + 6 # 4 bytes for LOffset, 6 bytes for VarData header |
| c += bit_count(columns) * 2 |
| return c |
| |
| @staticmethod |
| def _columns(chars): |
| cols = 0 |
| i = 1 |
| while chars: |
| if chars & 0b1111: |
| cols |= i |
| chars >>= 4 |
| i <<= 1 |
| return cols |
| |
| def gain_from_merging(self, other_encoding): |
| combined_chars = other_encoding.chars | self.chars |
| combined_width = bit_count(combined_chars) |
| combined_columns = self.columns | other_encoding.columns |
| combined_overhead = _Encoding._characteristic_overhead(combined_columns) |
| combined_gain = ( |
| +self.overhead |
| + other_encoding.overhead |
| - combined_overhead |
| - (combined_width - self.width) * len(self.items) |
| - (combined_width - other_encoding.width) * len(other_encoding.items) |
| ) |
| return combined_gain |
| |
| |
| class _EncodingDict(dict): |
| def __missing__(self, chars): |
| r = self[chars] = _Encoding(chars) |
| return r |
| |
| def add_row(self, row): |
| chars = self._row_characteristics(row) |
| self[chars].append(row) |
| |
| @staticmethod |
| def _row_characteristics(row): |
| """Returns encoding characteristics for a row.""" |
| longWords = False |
| |
| chars = 0 |
| i = 1 |
| for v in row: |
| if v: |
| chars += i |
| if not (-128 <= v <= 127): |
| chars += i * 0b0010 |
| if not (-32768 <= v <= 32767): |
| longWords = True |
| break |
| i <<= 4 |
| |
| if longWords: |
| # Redo; only allow 2byte/4byte encoding |
| chars = 0 |
| i = 1 |
| for v in row: |
| if v: |
| chars += i * 0b0011 |
| if not (-32768 <= v <= 32767): |
| chars += i * 0b1100 |
| i <<= 4 |
| |
| return chars |
| |
| |
def VarStore_optimize(self, use_NO_VARIATION_INDEX=True, quantization=1):
    """Optimize storage. Returns mapping from old VarIdxes to new ones.

    Args:
        use_NO_VARIATION_INDEX: If True, rows whose deltas are all zero
            are dropped entirely and their old VarIdxes are mapped to
            NO_VARIATION_INDEX.
        quantization: If greater than 1, deltas are rounded to multiples
            of this value before encoding (lossy compression).
    """

    # Overview:
    #
    # For each VarData row, we first extend it with zeroes to have
    # one column per region in VarRegionList. We then group the
    # rows into _Encoding objects, by their "characteristic" bitmap.
    # The characteristic bitmap is a binary number representing how
    # many bytes each column of the data takes up to encode. Each
    # column is encoded in four bits. For example, if a column has
    # only values in the range -128..127, it would only have a single
    # bit set in the characteristic bitmap for that column. If it has
    # values in the range -32768..32767, it would have two bits set.
    # The number of ones in the characteristic bitmap is the "width"
    # of the encoding.
    #
    # Each encoding as such has a number of "active" (ie. non-zero)
    # columns. The overhead of encoding the characteristic bitmap
    # is 10 bytes, plus 2 bytes per active column.
    #
    # When an encoding is merged into another one, if the characteristic
    # of the old encoding is a subset of the new one, then the overhead
    # of the old encoding is completely eliminated. However, each row
    # now would require more bytes to encode, to the tune of one byte
    # per characteristic bit that is active in the new encoding but not
    # in the old one. The number of bits that can be added to an encoding
    # while still beneficial to merge it into another encoding is called
    # the "room" for that encoding.
    #
    # The "gain" of an encoding is the maximum number of bytes we can
    # save by merging it into another encoding. The "gain" of merging
    # two encodings is how many bytes we save by doing so.
    #
    # High-level algorithm:
    #
    # - Each encoding has a minimal way to encode it. However, because
    #   of the overhead of encoding the characteristic bitmap, it may
    #   be beneficial to merge two encodings together, if there is
    #   gain in doing so. As such, we need to search for the best
    #   such successive merges.
    #
    # Algorithm:
    #
    # - Put all encodings into a "todo" list.
    #
    # - Sort todo list by decreasing gain (for stability).
    #
    # - Make a priority-queue of the gain from combining each two
    #   encodings in the todo list. The priority queue is sorted by
    #   decreasing gain. Only positive gains are included.
    #
    # - While priority queue is not empty:
    #   - Pop the first item from the priority queue,
    #   - Merge the two encodings it represents,
    #   - Remove the two encodings from the todo list,
    #   - Insert positive gains from combining the new encoding with
    #     all existing todo list items into the priority queue,
    #   - If a todo list item with the same characteristic bitmap as
    #     the new encoding exists, remove it from the todo list and
    #     merge it into the new encoding.
    #   - Insert the new encoding into the todo list,
    #
    # - Encode all remaining items in the todo list.
    #
    # The output is then sorted for stability, in the following way:
    # - The VarRegionList of the input is kept intact.
    # - All encodings are sorted before the main algorithm, by
    #   gain_sort_key(), which is a tuple of the following items:
    #   * The gain of the encoding.
    #   * The characteristic bitmap of the encoding, with higher-numbered
    #     columns compared first.
    # - The VarData is sorted by width_sort_key(), which is a tuple
    #   of the following items:
    #   * The "width" of the encoding.
    #   * The characteristic bitmap of the encoding, with higher-numbered
    #     columns compared first.
    # - Within each VarData, the items are sorted as vectors of numbers.
    #
    # Finally, each VarData is optimized to remove the empty columns and
    # reorder columns as needed.

    # TODO
    # Check that no two VarRegions are the same; if they are, fold them.

    n = len(self.VarRegionList.Region)  # Number of columns
    zeroes = [0] * n

    front_mapping = {}  # Map from old VarIdxes to full row tuples

    encodings = _EncodingDict()

    # Collect all items into a set of full rows (with lots of zeroes.)
    for major, data in enumerate(self.VarData):
        regionIndices = data.VarRegionIndex

        for minor, item in enumerate(data.Item):
            row = list(zeroes)

            if quantization == 1:
                for regionIdx, v in zip(regionIndices, item):
                    row[regionIdx] += v
            else:
                for regionIdx, v in zip(regionIndices, item):
                    row[regionIdx] += (
                        round(v / quantization) * quantization
                    )  # TODO https://github.com/fonttools/fonttools/pull/3126#discussion_r1205439785

            row = tuple(row)

            if use_NO_VARIATION_INDEX and not any(row):
                # All-zero row: callers can use NO_VARIATION_INDEX instead.
                front_mapping[(major << 16) + minor] = None
                continue

            encodings.add_row(row)
            front_mapping[(major << 16) + minor] = row

    # Prepare for the main algorithm.
    todo = sorted(encodings.values(), key=_Encoding.gain_sort_key)
    del encodings

    # Repeatedly pick two best encodings to combine, and combine them.

    # Min-heap of (-gain, i, j): gain is negated so the largest gain pops first.
    heap = []
    for i, encoding in enumerate(todo):
        for j in range(i + 1, len(todo)):
            other_encoding = todo[j]
            combining_gain = encoding.gain_from_merging(other_encoding)
            if combining_gain > 0:
                heappush(heap, (-combining_gain, i, j))

    while heap:
        _, i, j = heappop(heap)
        if todo[i] is None or todo[j] is None:
            # Stale entry: one of these encodings was already merged away.
            continue

        encoding, other_encoding = todo[i], todo[j]
        todo[i], todo[j] = None, None

        # Combine the two encodings
        combined_chars = other_encoding.chars | encoding.chars
        combined_encoding = _Encoding(combined_chars)
        combined_encoding.extend(encoding.items)
        combined_encoding.extend(other_encoding.items)

        for k, enc in enumerate(todo):
            if enc is None:
                continue

            # In the unlikely event that the same encoding exists already,
            # combine it.
            if enc.chars == combined_chars:
                combined_encoding.extend(enc.items)
                todo[k] = None
                continue

            combining_gain = combined_encoding.gain_from_merging(enc)
            if combining_gain > 0:
                heappush(heap, (-combining_gain, k, len(todo)))

        todo.append(combined_encoding)

    encodings = [encoding for encoding in todo if encoding is not None]

    # Assemble final store.
    back_mapping = {}  # Mapping from full rows to new VarIdxes
    encodings.sort(key=_Encoding.width_sort_key)
    self.VarData = []
    for encoding in encodings:
        items = sorted(encoding.items)

        while items:
            major = len(self.VarData)
            data = ot.VarData()
            self.VarData.append(data)
            data.VarRegionIndex = range(n)  # Empty columns pruned by data.optimize() below
            data.VarRegionCount = len(data.VarRegionIndex)

            # Each major can only encode up to 0xFFFF entries.
            data.Item, items = items[:0xFFFF], items[0xFFFF:]

            for minor, item in enumerate(data.Item):
                back_mapping[item] = (major << 16) + minor

    # Compile final mapping.
    varidx_map = {NO_VARIATION_INDEX: NO_VARIATION_INDEX}
    for k, v in front_mapping.items():
        varidx_map[k] = back_mapping[v] if v is not None else NO_VARIATION_INDEX

    # Recalculate things and go home.
    self.VarRegionList.RegionCount = len(self.VarRegionList.Region)
    self.VarDataCount = len(self.VarData)
    for data in self.VarData:
        data.ItemCount = len(data.Item)
        data.optimize()

    # Remove unused regions.
    self.prune_regions()

    return varidx_map
| |
| |
| ot.VarStore.optimize = VarStore_optimize |
| |
| |
def main(args=None):
    """Optimize a font's GDEF variation store"""
    from argparse import ArgumentParser
    from fontTools import configLogger
    from fontTools.ttLib import TTFont
    from fontTools.ttLib.tables.otBase import OTTableWriter

    parser = ArgumentParser(prog="varLib.varStore", description=main.__doc__)
    parser.add_argument("--quantization", type=int, default=1)
    parser.add_argument("fontfile")
    parser.add_argument("outfile", nargs="?")
    options = parser.parse_args(args)

    # TODO: allow user to configure logging via command-line options
    configLogger(level="INFO")

    font = TTFont(options.fontfile)
    gdef = font["GDEF"]
    store = gdef.table.VarStore

    def compiled_size():
        # Compile the store stand-alone to measure its byte size.
        writer = OTTableWriter()
        store.compile(writer, font)
        return len(writer.getAllData())

    print("Before: %7d bytes" % compiled_size())

    varidx_map = store.optimize(quantization=options.quantization)

    print("After: %7d bytes" % compiled_size())

    if options.outfile is not None:
        # Propagate the remapped VarIdxes into all Device tables.
        gdef.table.remap_device_varidxes(varidx_map)
        if "GPOS" in font:
            font["GPOS"].table.remap_device_varidxes(varidx_map)

        font.save(options.outfile)
| |
| |
if __name__ == "__main__":
    import sys

    # With command-line arguments, run the GDEF-VarStore optimizer CLI;
    # otherwise run the module doctests.
    if len(sys.argv) > 1:
        sys.exit(main())
    import doctest

    sys.exit(doctest.testmod().failed)