|  | { | 
|  | "metadata": { | 
|  | "language_info": { | 
|  | "codemirror_mode": { | 
|  | "name": "ipython", | 
|  | "version": 3 | 
|  | }, | 
|  | "file_extension": ".py", | 
|  | "mimetype": "text/x-python", | 
|  | "name": "python", | 
|  | "nbconvert_exporter": "python", | 
|  | "pygments_lexer": "ipython3", | 
|  | "version": "3.6.10" | 
|  | }, | 
|  | "orig_nbformat": 2, | 
|  | "kernelspec": { | 
|  | "name": "python3610jvsc74a57bd0eb5e09632d6ea1cbf3eb9da7e37b7cf581db5ed13074b21cc44e159dc62acdab", | 
|  | "display_name": "Python 3.6.10 64-bit ('dataloader': conda)" | 
|  | } | 
|  | }, | 
|  | "nbformat": 4, | 
|  | "nbformat_minor": 2, | 
|  | "cells": [ | 
|  | { | 
|  | "source": [ | 
|  | "## \\[RFC\\] How DataFrames (DF) and DataPipes (DP) work together" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 1, | 
|  | "metadata": {}, | 
|  | "outputs": [], | 
|  | "source": [ | 
|  | "from importlib import reload\n", | 
|  | "import torch\n", | 
|  | "reload(torch)\n", | 
|  | "from torch.utils.data import IterDataPipe" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 2, | 
|  | "metadata": {}, | 
|  | "outputs": [], | 
|  | "source": [ | 
|  | "# Example IterDataPipe\n", | 
|  | "class ExampleIterPipe(IterDataPipe):\n", | 
|  | "    def __init__(self, range = 20):\n", | 
|  | "        self.range = range\n", | 
|  | "    def __iter__(self):\n", | 
|  | "        for i in range(self.range):\n", | 
|  | "            yield i\n", | 
|  | "\n", | 
|  | "def get_dataframes_pipe(range = 10, dataframe_size = 7):\n", | 
|  | "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))._to_dataframes_pipe(columns = ['i','j'], dataframe_size = dataframe_size)\n", | 
|  | "\n", | 
|  | "def get_regular_pipe(range = 10):\n", | 
|  | "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))\n" | 
|  | ] | 
|  | }, | 
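|  | { | 
|  | "source": [ | 
|  | "To make the later examples easier to follow, here is a rough sketch of what `_to_dataframes_pipe` conceptually produces: the mapped rows are grouped into pandas DataFrames of at most `dataframe_size` rows with the given column names. The helper `rows_to_dataframes` below is hypothetical and only illustrates the idea; it is not part of the DataPipes API.\n", | 
|  | "\n", | 
|  | "```python\n", | 
|  | "import pandas as pd\n", | 
|  | "\n", | 
|  | "def rows_to_dataframes(rows, columns, dataframe_size):\n", | 
|  | "    # Group an iterable of row tuples into DataFrames of at most `dataframe_size` rows.\n", | 
|  | "    buffer = []\n", | 
|  | "    for row in rows:\n", | 
|  | "        buffer.append(row)\n", | 
|  | "        if len(buffer) == dataframe_size:\n", | 
|  | "            yield pd.DataFrame(buffer, columns = columns)\n", | 
|  | "            buffer = []\n", | 
|  | "    if buffer:\n", | 
|  | "        yield pd.DataFrame(buffer, columns = columns)\n", | 
|  | "```" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 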
|  | { | 
|  | "source": [ | 
|  | "Doesn't matter how DF composed internally, iterator over DF Pipe gives single rows to user. This is similar to regular DataPipe." | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 3, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "DataFrames Pipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\nRegular DataPipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "print('DataFrames Pipe')\n", | 
|  | "dp = get_dataframes_pipe()\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)\n", | 
|  | "\n", | 
|  | "print('Regular DataPipe')\n", | 
|  | "dp = get_regular_pipe()\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "source": [ | 
|  | "You can iterate over raw DF using `raw_iterator`" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 4, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "   i  j\n0  0  0\n1  1  1\n2  2  2\n3  3  0\n4  4  1\n5  5  2\n6  6  0\n   i  j\n0  7  1\n1  8  2\n2  9  0\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe()\n", | 
|  | "for i in dp.raw_iterator():\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "source": [ | 
|  | "Operations over DF Pipe is captured" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 5, | 
|  | "metadata": { | 
|  | "tags": [] | 
|  | }, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "var_3 = input_var_2.i * 100\nvar_4 = var_3 + input_var_2.j\nvar_5 = var_4 - 2.7\ninput_var_2[\"y\"] = var_5\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(dataframe_size = 3)\n", | 
|  | "dp['y'] = dp.i * 100 + dp.j - 2.7\n", | 
|  | "print(dp.ops_str())\n" | 
|  | ] | 
|  | }, | 
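|  | { | 
|  | "source": [ | 
|  | "The capture mechanism can be thought of as operator overloading that records an expression instead of computing it. The toy `Col` class below is only a sketch of that idea, not the real tracing code inside the DataFrames pipe.\n", | 
|  | "\n", | 
|  | "```python\n", | 
|  | "class Col:\n", | 
|  | "    # Toy expression recorder: arithmetic builds a description string instead of a value.\n", | 
|  | "    def __init__(self, expr):\n", | 
|  | "        self.expr = expr\n", | 
|  | "    def __str__(self):\n", | 
|  | "        return self.expr\n", | 
|  | "    def __add__(self, other):\n", | 
|  | "        return Col('({} + {})'.format(self.expr, other))\n", | 
|  | "    def __sub__(self, other):\n", | 
|  | "        return Col('({} - {})'.format(self.expr, other))\n", | 
|  | "    def __mul__(self, other):\n", | 
|  | "        return Col('({} * {})'.format(self.expr, other))\n", | 
|  | "\n", | 
|  | "print(Col('input_var_2.i') * 100 + Col('input_var_2.j') - 2.7)\n", | 
|  | "# (((input_var_2.i * 100) + input_var_2.j) - 2.7)\n", | 
|  | "```" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 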
|  | { | 
|  | "source": [ | 
|  | "Captured operations executed on `__next__` calls of constructed DataPipe" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 6, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "   i  j      y\n0  0  0   -2.7\n1  1  1   98.3\n2  2  2  199.3\n   i  j      y\n0  3  0  297.3\n1  4  1  398.3\n2  5  2  499.3\n   i  j      y\n0  6  0  597.3\n1  7  1  698.3\n2  8  2  799.3\n   i  j      y\n0  9  0  897.3\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(dataframe_size = 3)\n", | 
|  | "dp['y'] = dp.i * 100 + dp.j - 2.7\n", | 
|  | "for i in dp.raw_iterator():\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "source": [ | 
|  | "`shuffle` of DataFramePipe effects rows in individual manner" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 7, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "Raw DataFrames iterator\n   i  j\n2  8  2\n2  2  2\n2  5  2\n   i  j\n1  4  1\n1  1  1\n0  3  0\n   i  j\n1  7  1\n0  9  0\n0  6  0\n   i  j\n0  0  0\nRegular DataFrames iterator\n(1, 1)\n(5, 2)\n(8, 2)\n(9, 0)\n(7, 1)\n(6, 0)\n(3, 0)\n(4, 1)\n(0, 0)\n(2, 2)\nRegular iterator\n(5, 2)\n(6, 0)\n(0, 0)\n(9, 0)\n(3, 0)\n(1, 1)\n(2, 2)\n(8, 2)\n(4, 1)\n(7, 1)\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(dataframe_size = 3)\n", | 
|  | "dp = dp.shuffle()\n", | 
|  | "print('Raw DataFrames iterator')\n", | 
|  | "for i in dp.raw_iterator():\n", | 
|  | "    print(i)\n", | 
|  | "\n", | 
|  | "print('Regular DataFrames iterator')\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)\n", | 
|  | "\n", | 
|  | "\n", | 
|  | "# this is similar to shuffle of regular DataPipe\n", | 
|  | "dp = get_regular_pipe()\n", | 
|  | "dp = dp.shuffle()\n", | 
|  | "print('Regular iterator')\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | }, | 
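|  | { | 
|  | "source": [ | 
|  | "One way to picture row-level shuffling is to pool the incoming DataFrames and permute their rows, so rows move between chunks instead of whole DataFrames being reordered. The `shuffle_rows` helper below is a plain-pandas sketch of that behaviour, not the actual shuffle implementation.\n", | 
|  | "\n", | 
|  | "```python\n", | 
|  | "import pandas as pd\n", | 
|  | "\n", | 
|  | "def shuffle_rows(dataframes, seed = 0):\n", | 
|  | "    # Pool the chunks and permute at row granularity.\n", | 
|  | "    pooled = pd.concat(list(dataframes))\n", | 
|  | "    return pooled.sample(frac = 1, random_state = seed)\n", | 
|  | "```" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 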
|  | { | 
|  | "source": [ | 
|  | "You can continue mixing DF and DP operations" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 8, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "    i   j          y\n0 -17 -17  -197000.0\n1 -13 -16  3813000.0\n0 -11 -17  5803000.0\n    i   j          y\n2 -12 -15  4823000.0\n1 -10 -16  6813000.0\n1 -16 -16   813000.0\n    i   j          y\n0  -8 -17  8803000.0\n2  -9 -15  7823000.0\n0 -14 -17  2803000.0\n    i   j          y\n2 -15 -15  1823000.0\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(dataframe_size = 3)\n", | 
|  | "dp['y'] = dp.i * 100 + dp.j - 2.7\n", | 
|  | "dp = dp.shuffle()\n", | 
|  | "dp = dp - 17\n", | 
|  | "dp['y'] = dp.y * 10000\n", | 
|  | "for i in dp.raw_iterator():\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "source": [ | 
|  | "Batching combines everything into `list` it is possible to nest `list`s. List may have any number of DataFrames as soon as total number of rows equal to batch size." | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 9, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "Iterate over DataFrame batches\n[(6, 0),(0, 0)]\n[(4, 1),(1, 1)]\n[(2, 2),(9, 0)]\n[(3, 0),(5, 2)]\n[(7, 1),(8, 2)]\nIterate over regular batches\n[(1, 1),(4, 1)]\n[(2, 2),(3, 0)]\n[(6, 0),(7, 1)]\n[(8, 2),(0, 0)]\n[(5, 2),(9, 0)]\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(dataframe_size = 3)\n", | 
|  | "dp = dp.shuffle()\n", | 
|  | "dp = dp.batch(2)\n", | 
|  | "print(\"Iterate over DataFrame batches\")\n", | 
|  | "for i,v in enumerate(dp):\n", | 
|  | "    print(v)\n", | 
|  | "\n", | 
|  | "# this is similar to batching of regular DataPipe\n", | 
|  | "dp = get_regular_pipe()\n", | 
|  | "dp = dp.shuffle()\n", | 
|  | "dp = dp.batch(2)\n", | 
|  | "print(\"Iterate over regular batches\")\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "source": [ | 
|  | "Some details about internal storage of batched DataFrames and how they are iterated" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 10, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n", | 
|  | "As string:  [(0, 0),(3, 0)]\n", | 
|  | "Iterated regularly:\n", | 
|  | "-- batch start --\n", | 
|  | "(0, 0)\n", | 
|  | "(3, 0)\n", | 
|  | "-- batch end --\n", | 
|  | "Iterated in inner format (for developers):\n", | 
|  | "-- df batch start --\n", | 
|  | "   i  j\n", | 
|  | "0  0  0\n", | 
|  | "0  3  0\n", | 
|  | "-- df batch end --\n", | 
|  | "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n", | 
|  | "As string:  [(6, 0),(1, 1)]\n", | 
|  | "Iterated regularly:\n", | 
|  | "-- batch start --\n", | 
|  | "(6, 0)\n", | 
|  | "(1, 1)\n", | 
|  | "-- batch end --\n", | 
|  | "Iterated in inner format (for developers):\n", | 
|  | "-- df batch start --\n", | 
|  | "   i  j\n", | 
|  | "0  6  0\n", | 
|  | "1  1  1\n", | 
|  | "-- df batch end --\n", | 
|  | "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n", | 
|  | "As string:  [(9, 0),(4, 1)]\n", | 
|  | "Iterated regularly:\n", | 
|  | "-- batch start --\n", | 
|  | "(9, 0)\n", | 
|  | "(4, 1)\n", | 
|  | "-- batch end --\n", | 
|  | "Iterated in inner format (for developers):\n", | 
|  | "-- df batch start --\n", | 
|  | "   i  j\n", | 
|  | "0  9  0\n", | 
|  | "1  4  1\n", | 
|  | "-- df batch end --\n", | 
|  | "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n", | 
|  | "As string:  [(5, 2),(2, 2)]\n", | 
|  | "Iterated regularly:\n", | 
|  | "-- batch start --\n", | 
|  | "(5, 2)\n", | 
|  | "(2, 2)\n", | 
|  | "-- batch end --\n", | 
|  | "Iterated in inner format (for developers):\n", | 
|  | "-- df batch start --\n", | 
|  | "   i  j\n", | 
|  | "2  5  2\n", | 
|  | "2  2  2\n", | 
|  | "-- df batch end --\n", | 
|  | "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n", | 
|  | "As string:  [(8, 2),(7, 1)]\n", | 
|  | "Iterated regularly:\n", | 
|  | "-- batch start --\n", | 
|  | "(8, 2)\n", | 
|  | "(7, 1)\n", | 
|  | "-- batch end --\n", | 
|  | "Iterated in inner format (for developers):\n", | 
|  | "-- df batch start --\n", | 
|  | "   i  j\n", | 
|  | "2  8  2\n", | 
|  | "1  7  1\n", | 
|  | "-- df batch end --\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(dataframe_size = 3)\n", | 
|  | "dp = dp.shuffle()\n", | 
|  | "dp = dp.batch(2)\n", | 
|  | "for i in dp:\n", | 
|  | "    print(\"Type: \", type(i))\n", | 
|  | "    print(\"As string: \", i)\n", | 
|  | "    print(\"Iterated regularly:\")\n", | 
|  | "    print('-- batch start --')\n", | 
|  | "    for item in i:\n", | 
|  | "        print(item)\n", | 
|  | "    print('-- batch end --')\n", | 
|  | "    print(\"Iterated in inner format (for developers):\")\n", | 
|  | "    print('-- df batch start --')\n", | 
|  | "    for item in i.raw_iterator():\n", | 
|  | "        print(item)\n", | 
|  | "    print('-- df batch end --')   " | 
|  | ] | 
|  | }, | 
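|  | { | 
|  | "source": [ | 
|  | "As the inner-format output above shows, a single batch can draw its rows from more than one source DataFrame. The `rebatch` helper below is a rough, eager sketch of that re-chunking using plain pandas; the real pipe does this lazily and wraps the result in `DataChunkDF`.\n", | 
|  | "\n", | 
|  | "```python\n", | 
|  | "import pandas as pd\n", | 
|  | "\n", | 
|  | "def rebatch(dataframes, batch_size):\n", | 
|  | "    # Pool all rows, then slice them into DataFrames of `batch_size` rows each.\n", | 
|  | "    pooled = pd.concat(list(dataframes))\n", | 
|  | "    for start in range(0, len(pooled), batch_size):\n", | 
|  | "        yield pooled.iloc[start:start + batch_size]\n", | 
|  | "```" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 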
|  | { | 
|  | "source": [ | 
|  | "`concat` should work only of DF with same schema, this code should produce an error " | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 11, | 
|  | "metadata": {}, | 
|  | "outputs": [], | 
|  | "source": [ | 
|  | "# TODO!\n", | 
|  | "# dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)\n", | 
|  | "# dp = get_dataframes_pipe(range = 6, dataframe_size = 3)\n", | 
|  | "# dp['y'] = dp.i * 100 + dp.j - 2.7\n", | 
|  | "# dp = dp.concat(dp0)\n", | 
|  | "# for i,v in enumerate(dp.raw_iterator()):\n", | 
|  | "#     print(v)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "source": [ | 
|  | "`unbatch` of `list` with DataFrame works similarly to regular unbatch.\n", | 
|  | "Note: DataFrame sizes might change" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 12, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "error", | 
|  | "ename": "AttributeError", | 
|  | "evalue": "", | 
|  | "traceback": [ | 
|  | "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", | 
|  | "\u001b[0;31mAttributeError\u001b[0m                            Traceback (most recent call last)", | 
|  | "\u001b[0;32m<ipython-input-12-fa80e9c68655>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      5\u001b[0m \u001b[0;31m# Here is bug with unbatching which doesn't detect DF type.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m \u001b[0mdp\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'z'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0my\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0;36m100\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m      7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      8\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mraw_iterator\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | 
|  | "\u001b[0;32m~/dataset/pytorch/torch/utils/data/dataset.py\u001b[0m in \u001b[0;36m__getattr__\u001b[0;34m(self, attribute_name)\u001b[0m\n\u001b[1;32m    222\u001b[0m             \u001b[0;32mreturn\u001b[0m \u001b[0mfunction\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    223\u001b[0m         \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 224\u001b[0;31m             \u001b[0;32mraise\u001b[0m \u001b[0mAttributeError\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    225\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    226\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0m__reduce_ex__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | 
|  | "\u001b[0;31mAttributeError\u001b[0m: " | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(range = 18, dataframe_size = 3)\n", | 
|  | "dp['y'] = dp.i * 100 + dp.j - 2.7\n", | 
|  | "dp = dp.batch(5).batch(3).batch(1).unbatch(unbatch_level = 3)\n", | 
|  | "\n", | 
|  | "# Here is bug with unbatching which doesn't detect DF type.\n", | 
|  | "dp['z'] = dp.y - 100\n", | 
|  | "\n", | 
|  | "for i in dp.raw_iterator():\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "source": [ | 
|  | "`map` applied to individual rows, `nesting_level` argument used to penetrate batching" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 13, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "Iterate over DataFrame batches\n[(1111000, 1111000),(1112000, 1112000),(1113000, 1113000),(1114000, 1111000),(1115000, 1112000)]\n[(1116000, 1113000),(1117000, 1111000),(1118000, 1112000),(1119000, 1113000),(1120000, 1111000)]\nIterate over regular batches\n[(1111000, 0),(1112000, 1),(1113000, 2),(1114000, 0),(1115000, 1)]\n[(1116000, 2),(1117000, 0),(1118000, 1),(1119000, 2),(1120000, 0)]\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(range = 10, dataframe_size = 3)\n", | 
|  | "dp = dp.map(lambda x: x + 1111)\n", | 
|  | "dp = dp.batch(5).map(lambda x: x * 1000, nesting_level = 1)\n", | 
|  | "print(\"Iterate over DataFrame batches\")\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)\n", | 
|  | "\n", | 
|  | "# Similarly works on row level for classic DataPipe elements\n", | 
|  | "dp = get_regular_pipe(range = 10)\n", | 
|  | "dp = dp.map(lambda x: (x[0] + 1111, x[1]))\n", | 
|  | "dp = dp.batch(5).map(lambda x: (x[0] * 1000, x[1]), nesting_level = 1)\n", | 
|  | "print(\"Iterate over regular batches\")\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)\n", | 
|  | "\n" | 
|  | ] | 
|  | }, | 
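|  | { | 
|  | "source": [ | 
|  | "`nesting_level` can be read as: descend this many levels of nested lists before applying the function. The recursive `map_nested` helper below only illustrates that semantics; it is not the DataPipes implementation.\n", | 
|  | "\n", | 
|  | "```python\n", | 
|  | "def map_nested(fn, data, nesting_level):\n", | 
|  | "    # Apply fn to elements `nesting_level` lists deep (0 means apply to data itself).\n", | 
|  | "    if nesting_level == 0:\n", | 
|  | "        return fn(data)\n", | 
|  | "    return [map_nested(fn, item, nesting_level - 1) for item in data]\n", | 
|  | "\n", | 
|  | "print(map_nested(lambda x: x * 10, [[1, 2], [3]], nesting_level = 2))\n", | 
|  | "# [[10, 20], [30]]\n", | 
|  | "```" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 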
|  | { | 
|  | "source": [ | 
|  | "`filter` applied to individual rows, `nesting_level` argument used to penetrate batching" | 
|  | ], | 
|  | "cell_type": "markdown", | 
|  | "metadata": {} | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 14, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "output_type": "stream", | 
|  | "name": "stdout", | 
|  | "text": [ | 
|  | "Iterate over DataFrame batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\nIterate over regular batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "dp = get_dataframes_pipe(range = 30, dataframe_size = 3)\n", | 
|  | "dp = dp.filter(lambda x: x.i > 5)\n", | 
|  | "dp = dp.batch(5).filter(lambda x: x.i < 13, nesting_level = 1)\n", | 
|  | "print(\"Iterate over DataFrame batches\")\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)\n", | 
|  | "\n", | 
|  | "# Similarly works on row level for classic DataPipe elements\n", | 
|  | "dp = get_regular_pipe(range = 30)\n", | 
|  | "dp = dp.filter(lambda x: x[0] > 5)\n", | 
|  | "dp = dp.batch(5).filter(lambda x: x[0] < 13, nesting_level = 1)\n", | 
|  | "print(\"Iterate over regular batches\")\n", | 
|  | "for i in dp:\n", | 
|  | "    print(i)" | 
|  | ] | 
|  | } | 
|  | ] | 
|  | } |