Skip to content

ファイルレジストリ

非構造化データを扱うために、DataRobotには、ファイルをアップロードして操作できるシンプルなファイルシステムインターフェイスが用意されています。 このファイルシステムインターフェイスは、ディレクトリ構造を持つ従来のファイルシステムを再現し、一般的なUnixファイルシステムの操作をサポートしています。 DataRobotのファイルシステムでは、カタログアイテムと呼ばれるコンテナを使用して、ファイルのパスがキーで、その内容が値であるキー/値ストレージアプローチを使用して1つ以上のファイルを格納します。 アップロードされたファイルは、SharePointサイトからアップロードされたファイルでベクターデータベースを作成するなど、DataRobotプラットフォームの他の領域またはワークフローにおいてバックグラウンドで活用できます。

DataRobotのdatarobot.fs.DataRobotFileSystemfsspec互換の実装)を使用すれば、他のfsspec対応ファイルシステムと同じコードパターンを活用したファイルベースのワークフローを迅速に構築できます。

ファイルシステムの用語

DataRobotのファイルシステムでは、以下の用語を使用しています。

  • カタログアイテム:1つ以上のファイルを格納するコンテナ。 カタログアイテムは、DataRobotプラットフォームにおけるデータアセットの一種です。 各カタログアイテムには、独自のID、権限、およびバージョン履歴があります。
  • カタログアイテムディレクトリ:カタログアイテムに対応するファイルシステムの最上位ディレクトリ。 ディレクトリ名はカタログアイテムのIDと一致します。 カタログアイテム内のすべてのファイルは、このディレクトリ内のパスとして存在します。
  • パス:ファイルシステム内のファイルまたはディレクトリの場所。 パスには、カタログアイテムのIDと、そのカタログアイテム内の内部パスが含まれます。 パスの形式はdr://<catalog_item_id>/path/to/fileまたは<catalog_item_id>/path/to/fileです。
  • 上書き戦略:ファイルがすでに存在するパスをアップロードまたは書き込みの対象とする場合の動作を制御する設定。 使用可能なオプションについては、FilesOverwriteStrategyを参照してください。
  • 署名付きURL:追加の認証なしで1つのファイルへの直接読み取りアクセスを許可する、一時的で期限付きのURL。 ファイルの共有や、外部ツールへの引き渡しに便利です。

リマインダー

DataRobotのファイルシステムを使用する際は、以下の点に注意してください。

  • ファイルシステムは、各カタログアイテムにIDにちなんだ名前の独自のディレクトリを割り当てることで、最上位のディレクトリ構造をシミュレートします。 カタログアイテム内のファイルは、そのディレクトリ内のパスとして表示されます。
  • 権限は、ファイルを含むカタログアイテムにアタッチされます。 カタログアイテム内のすべてのファイルは、ここで説明されているファイルシステムAPIの利用に関して、そのカタログアイテムから権限を継承します。 ファイルの取り込みに使用されるコネクターでサポートされている場合は、ファイルに外部アクセス制御リスト(ACL)の権限がアタッチされていることもあります。 詳細については、ACLハイドレーションのドキュメントを参照してください。
  • ファイルシステムは、コンテナ内にファイルを保存する際にキーと値のペアを使用するため、ディレクトリ構造はシミュレートされたものであり、その内容に応じて変化する場合があります。 これにより、次のような結果になります。
    • ファイルシステムは空のディレクトリをサポートしません。 ディレクトリ内のすべてのファイルが削除されると、そのディレクトリは自動的に削除されます。
    • ディレクトリXを作成するには、そのディレクトリ名を含むパス(例:X/file.txt)にファイルをアップロードします。
  • カタログアイテム自体は空にできますが、その中に空のディレクトリを含めることはできません。
  • ファイル操作によっては、ファイルシステムでファイルを作成/移動/コピーする際に、名前の競合が発生することがあります。 ファイルの競合は、操作実行時に指定された上書き戦略に従って処理されます。

ファイルシステムの設定

このガイドの各例は、前の例の内容を踏まえています。 以下の設定ではDataRobotクライアントを構成し、使用するDataRobotFileSystemを作成します。

Python 3.9以降を使用し、datarobotfsパッケージアドオンをインストールします。

pip install 'datarobot[fs]' 
import datarobot as dr
from datarobot.fs import DataRobotFileSystem

dr.Client(token="<YOUR_API_TOKEN>", endpoint="https://app.datarobot.com/api/v2")

fs = DataRobotFileSystem() 

新しいカタログアイテムの作成

カタログアイテムとは、DataRobotのファイルシステム内でファイルを格納するコンテナのことです。 参照するすべてのパスはカタログアイテムをルートとするため、ワークフローを開始するにはカタログアイテムが1つ必要になります。 ファイルカタログアイテムを作成するには、新しい空のカタログアイテムを作成する方法と、既存のカタログアイテムのクローンを作成する方法の2つがあります。 どちらの方法でも新しいカタログアイテムのIDが返され、その後のすべての操作において、dr://{catalog_id}/...という形式でパスを構築する際にこのIDを再利用します。

まったく新しい空のカタログアイテムを作成するには、create_catalog_item_dirを使用します。

# Create a brand-new, empty catalog item
catalog_id = fs.create_catalog_item_dir() 

既存のカタログアイテムのコピーを作成するには、clone_catalog_item_dirを使用します。 特定のファイルをクローンから除外するには、files_to_omitを渡します。 files_to_omit内のパスは、ソースのカタログアイテムのルートからの相対パスとなります。

# Clone an existing catalog item, copying every file into a new one
source_catalog_id = "<EXISTING_CATALOG_ITEM_ID>"
clone_id = fs.clone_catalog_item_dir(source_catalog_id)

# Or clone but omit specific files from the source
partial_clone_id = fs.clone_catalog_item_dir(
    source_catalog_id,
    files_to_omit=["data/scores.csv", "notes/draft.txt"],
) 

ファイルの追加

ローカルマシン、公開URL、またはデータソースからファイルをアップロードして、DataRobotのファイルシステムに追加します。 あるいは、ファイルパスに直接コンテンツを書き込んで、新しいファイルを作成します。

新しいファイルに直接コンテンツを書き込む

ファイルパスに直接コンテンツを書き込んで、その場所に新しいファイルを作成します。 テキストまたはバッファリングされたバイナリ書き込みの場合は書き込みモードでopenを使用し、生のバイトを一度だけ書き込むにはpipe_fileまたはpipeを使用します。

# Write a text file in place
with fs.open(f"dr://{catalog_id}/notes/readme.txt", mode="w") as f:
    f.write("This catalog item contains demo files for the file system guide.")

# Write raw bytes in a single call.
fs.pipe_file(f"dr://{catalog_id}/data/sample.csv", b"name,score\nCharlie,72\n") 

デフォルトでは、openFilesOverwriteStrategy.REPLACEを使用するため、すでにファイルが存在するパスに書き込むと、既存のファイルが上書きされます。 あるいは、別のoverwrite_strategyを指定して、この動作を変更します。 たとえば、FilesOverwriteStrategy.RENAMEを使用すると、代わりに(2)という接尾辞が付いた複製ファイルが作成されます。

from datarobot.enums import FilesOverwriteStrategy

# Write to the existing path notes/readme.txt. 
# The new content will be placed in a new file /notes/readme (2).txt
with fs.open(
    f"dr://{catalog_id}/notes/readme.txt",
    mode="w",
    overwrite_strategy=FilesOverwriteStrategy.RENAME
) as f:
    f.write("This content is written to a new file because RENAME was specified.") 

ローカルファイルをアップロード

ローカルマシンからカタログアイテムにファイルをコピーするには、put_fileを使用してください。ファイルやディレクトリが複数ある場合は、putを使用します。

以下の例では、最初に小さなローカルファイルをいくつか作成してから、2つの異なる方法でそれらをアップロードします。

import tempfile
import fsspec

# Use the local fsspec implementation to stage demo files in a temp directory.
local_fs = fsspec.filesystem("local")
local_dir = tempfile.mkdtemp()

local_fs.makedirs(f"{local_dir}/notes", exist_ok=True)
with local_fs.open(f"{local_dir}/scores.csv", "w") as f:
    f.write("name,score\nAlice,95\nBob,87\n")
with local_fs.open(f"{local_dir}/notes/agenda.txt", "w") as f:
    f.write("Q3 planning agenda")
with local_fs.open(f"{local_dir}/notes/actions.txt", "w") as f:
    f.write("1. Review roadmap\n2. Confirm budget\n")

# Upload a single file
fs.put_file(f"{local_dir}/scores.csv", f"dr://{catalog_id}/data/scores.csv")

# Upload a directory recursively. Trailing slashes mark both paths as directories.
fs.put(f"{local_dir}/notes/", f"dr://{catalog_id}/notes/", recursive=True) 

URLからファイルをアップロードする

DataRobotサーバーがアクセス可能な任意のURLからファイルを直接取り込むには、put_from_urlを使用します。 ファイルはサーバー側でストリーミングされるため、事前にローカルにダウンロードする必要はありません。

# Ingest from url and create file dr://<catalog-id>/external/iris.csv
fs.put_from_url(
    path=f"dr://{catalog_id}/external/",
    url="https://s3.amazonaws.com/datarobot_public_datasets/iris.csv",
) 

デフォルトでは、put_from_urlはアップロードが完了するまでブロックされます。 アップロードを開始してすぐに戻るには、wait_for_completion=Falseを渡すか、upload_timeoutを使用してブロック時の待機時間を制御します。

データソースからファイルをアップロードする

コネクター対応システム(S3、SharePoint、Googleドライブ、Confluenceなど)からファイルを取り込むには、put_from_data_sourceを使用します。 これには、構造化されていないDataStoreに対して設定されたDataSourceと、それにアクセスできるCredentialが必要です。

以下の例では、S3バケットのDataSourceを設定し、ドキュメントのフォルダーをカタログアイテムにコピーします。 同じパターンを他のソースシステムにも適用できます。 SharePointおよびGoogleドライブのバリエーションについては、put_from_data_sourceを参照してください。

credential = dr.Credential.create_s3(
    name="S3 Credential",
    aws_access_key_id="<AWS_ACCESS_KEY_ID>",
    aws_secret_access_key="<AWS_SECRET_ACCESS_KEY>",
)
s3_connector = next(c for c in dr.Connector.list() if c.connector_type == "s3")

s3_data_store = dr.DataStore.create(
    data_store_type=dr.enums.DataStoreTypes.DR_CONNECTOR_V1,
    canonical_name="My S3 Bucket",
    fields=[
        {"id": "fs.defaultFS", "name": "Bucket Name", "value": "my-bucket-name"},
        {"id": "fs.rootDirectory", "name": "Prefix", "value": "/"},
        {"id": "fs.s3.awsRegion", "name": "S3 Bucket Region", "value": "us-east-1"},
    ],
    connector_id=s3_connector.id,
)
s3_data_source = dr.DataSource.create(
    data_source_type=dr.enums.DataStoreTypes.DR_CONNECTOR_V1,
    canonical_name="S3 Documents",
    params=dr.DataSourceParameters(
        data_store_id=s3_data_store.id,
        path="documents/",
    ),
)

fs.put_from_data_source(
    path=f"dr://{catalog_id}/s3_documents/",
    data_source_id=s3_data_source.id,
    credential_id=credential.credential_id,
) 

デフォルトでは、put_from_data_sourceはアップロードが完了するまでブロックされます。 アップロードを開始してすぐに戻るには、wait_for_completion=Falseを渡すか、upload_timeoutを使用してブロック時の待機時間を制御します。

ファイルの参照と検索

このファイルシステムは、標準のfsspec探索方法をサポートしています。

  • ls:ディレクトリの内容の簡易リスト。
  • find:すべてのファイル(必要に応じてディレクトリも含む)を再帰的に検索します。
  • walk:ディレクトリツリーを一度に1レベルずつ生成するジェネレーター(Pythonのos.walkに似ています)。
  • glob:パターンに一致するファイルやディレクトリを検索します。
  • tree:ディレクトリツリーの構造を可視化します。
# List the immediate contents of the catalog item. Set detail=False for just the paths.
fs.ls(f"dr://{catalog_id}/", detail=False)

# Use detail=True (the default) to also retrieve size, type, and format.
for item in fs.ls(f"dr://{catalog_id}/", detail=True):
    print(f"{item['name']:50s} type={item['type']:10s} size={item['size']}")

# Recursively list every file. Pass withdirs=True to include directories.
all_files = fs.find(f"dr://{catalog_id}/")

# Walk the directory tree one level at a time, similar to os.walk().
for dirpath, dirnames, filenames in fs.walk(f"dr://{catalog_id}/"):
    print((dirpath, dirnames, filenames))

# Glob lets you match by pattern. Supports *, **, ?, and [abc] character classes.
csv_files = fs.glob(f"dr://{catalog_id}/**/*.csv")

# Visualize the catalog item layout. The recursion_limit controls how deep to walk.
print(fs.tree(f"dr://{catalog_id}/", recursion_limit=3)) 

ヒント

/で終わるパターンは、ディレクトリにのみ一致します。 たとえば、dr://{catalog_id}/*/は、カタログアイテムの最上位のサブディレクトリを返します。

ファイルの操作

DataRobotのファイルシステムでは、ファイルをコピー、移動、および削除するメソッドがサポートされています。 この3つのメソッドすべてで、単一のパス、パスのリスト、およびグロブパターンを使用でき、ディレクトリに対して再帰的な操作を行うことができます。

ファイルのコピー

ファイルやディレクトリを新しいパスに複製するにはcopyを使用します。 ディレクトリとそのすべての内容をコピーするにはrecursive=Trueを渡し、複数のファイルを一度にコピーするにはグロブパターンを使用します。 コピー先での名前の競合の処理方法を指定するには、overwrite_strategyを渡します。 ユーザーがコピー元およびコピー先のカタログアイテムに対して権限を持っている場合、カタログアイテム間のコピーも可能です。

from datarobot.enums import FilesOverwriteStrategy

# Copy a single file.
fs.copy(
    f"dr://{catalog_id}/data/scores.csv",
    f"dr://{catalog_id}/backups/scores_backup.csv",
)

# Copy a directory recursively, skipping any files that already exist in the destination.
# Both paths end with / to mark them as directories.
fs.copy(
    f"dr://{catalog_id}/notes/",
    f"dr://{catalog_id}/archive/notes_snapshot/",
    recursive=True,
    overwrite_strategy=FilesOverwriteStrategy.SKIP
)

# Use a glob pattern to copy all .txt files into a single folder.
fs.copy(
    f"dr://{catalog_id}/**/*.txt",
    f"dr://{catalog_id}/all_text_files/",
    recursive=True,
) 

ファイルの移動とファイル名の変更

ファイルを新しいパスに移動したり、ファイル名を変更したりするには、mvを使用します。 ユーザーが移動元および移動先のカタログアイテムに対して権限を持っている場合、カタログアイテム間のファイル移動も可能です。

# Rename a file by moving it to a new path within the same catalog item.
fs.mv(f"dr://{catalog_id}/backups/scores_backup.csv", f"dr://{catalog_id}/backups/scores_v1.csv")

# Move a file into a different directory (note the trailing slash on the target).
fs.mv(f"dr://{catalog_id}/backups/scores_v1.csv", f"dr://{catalog_id}/archive/") 

ファイルの削除

ファイルやディレクトリを削除するにはrmを使用します。 ディレクトリとそのすべての内容を削除するにはrecursive=Trueを渡します。 複数のファイルを一度に削除するにはグロブパターンを使用します。

# Delete a single file.
fs.rm(f"dr://{catalog_id}/archive/scores_v1.csv")

# Delete a directory recursively.
fs.rm(f"dr://{catalog_id}/all_text_files/", recursive=True)

# Delete every csv file under archive.
fs.rm(f"dr://{catalog_id}/archive/**/*.csv", recursive=True) 

ディレクトリやカタログアイテムの削除

DataRobotのファイルシステムでは空のディレクトリをサポートしていないため、ディレクトリ内のすべてのファイルを削除すると、そのディレクトリも自動的に削除されます。 ただし、カタログアイテムについては異なります。 カタログアイテム内のすべてのファイルを削除しても、そのカタログアイテム自体は削除されません。

カタログアイテム自体を削除するには、カタログアイテムのルートでfs.rmを呼び出します(例:fs.rm(f"dr://{catalog_id}/"))。 これにより、カタログアイテムがソフト削除されます。 ソフト削除されたカタログアイテムは非表示になりますが、元に戻したい場合はFiles.un_delete()を使用して復元できます。

ファイルの読み取り

ファイルを読み取るには、以下を使用します。

  • ストリーミングおよび標準ファイルと同様のアクセスにはopenを使用します。
  • 1回限りの読み取りにはcatまたはcat_fileを使用します。
  • 一時的な署名付きURLの生成にはsignを使用します。
  • ファイルをローカルにダウンロードするにはgetを使用します。

openを使用したファイルのストリーミング

openは、標準のPythonファイルオブジェクトのように動作するDataRobotFileを返します。 これは、大容量ファイルを読み取るための最も柔軟な方法であり、行単位での反復処理、特定の位置へのシーク、または固定サイズのチャンク単位での読み取りをサポートしています。

# Iterate line by line. Never loads the full file into memory.
with fs.open(f"dr://{catalog_id}/data/scores.csv", mode="r") as f:
    for line in f:
        print(line.rstrip())

# Read in binary mode with seeking.
with fs.open(f"dr://{catalog_id}/data/scores.csv", mode="rb") as f:
    header = f.read(20)
    f.seek(0)
    full = f.read() 

catを使用したファイルの読み取り

catは、1回の呼び出しでファイルの内容を返します。 単一のパスを渡すとバイトが返され、パスのグロブ/リストを渡すと{path: bytes}辞書が返されます。

# Read a single file as bytes.
data = fs.cat(f"dr://{catalog_id}/data/scores.csv")

# Read every CSV file in the catalog item at once.
all_csvs = fs.cat(f"dr://{catalog_id}/**/*.csv", recursive=True)
for path, content in all_csvs.items():
    print(f"{path}: {len(content)} bytes") 

署名付きURLの生成

署名付きURLを使用すると、サードパーティのツール(ブラウザー、ダウンストリームサービス、ノートブックユーザー)に対し、DataRobot APIトークンを共有することなく、ファイルへの一時的な読み取りアクセス権を付与できます。 DataRobotのファイルシステムからファイルをダウンロードするには、署名付きURLを使用します。

import requests

url = fs.sign(f"dr://{catalog_id}/data/scores.csv", expiration=300)

# Download file locally using signed url
local_path = "scores.csv"
with requests.get(url, stream=True) as r:
    r.raise_for_status()
    with open(local_path, "wb") as f:
        for chunk in r.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk) 

ファイルをローカルにダウンロードする

ファイルをローカルマシンにダウンロードするには、単一のファイルの場合はget_fileを、複数のファイルやディレクトリ全体の場合はgetを使用します。

import tempfile

local_dir = tempfile.mkdtemp()

# Download a single file to a local path.
fs.get_file(f"dr://{catalog_id}/data/scores.csv", f"{local_dir}/scores.csv")

# Download a directory recursively. Trailing slashes mark both paths as directories.
fs.get(f"dr://{catalog_id}/notes/", f"{local_dir}/notes/", recursive=True)

# Use a glob pattern to download all .csv files into a single local directory.
fs.get(f"dr://{catalog_id}/**/*.csv", f"{local_dir}/all_csvs/", recursive=True) 

ファイルとディレクトリの検査

  • infoを使用すると、単一のファイルまたはディレクトリの詳細なメタデータを取得できます。
  • existsを使用すると、ファイルまたはディレクトリが存在するかどうかを確認できます。
  • isfileを使用すると、パスがファイルを指しているかどうかを判断できます。
  • isdirを使用すると、パスがディレクトリを指しているかどうかを判断できます。
  • duを使用すると、ファイルとディレクトリのディスク使用量を確認できます。
# Detailed metadata for a single file or directory.
file_info = fs.info(f"dr://{catalog_id}/data/scores.csv")

# Quick existence checks.
print("exists?", fs.exists(f"dr://{catalog_id}/data/scores.csv"))
print("isfile?", fs.isfile(f"dr://{catalog_id}/data/scores.csv"))
print("isdir?",  fs.isdir(f"dr://{catalog_id}/data/"))

# Total disk usage for the entire catalog item.
print(f"Total bytes: {fs.du(f'dr://{catalog_id}/', total=True):,}") 

get_mapperを使用した辞書のようなアクセス

get_mapperDataRobotFSMapのインスタンスを返します。これは、指定されたパスをルートとするMutableMappingです。 これは、fsspec形式のマッパーを受け付けるライブラリ(たとえば、Zarr、Xarray、およびその他の配列ストア)を使用する場合や、単にファイルの読み書きに辞書セマンティクスを使いたい場合に便利です。

mapper = fs.get_mapper(f"dr://{catalog_id}/data/")

# List files in mapping
print("Keys:", list(mapper))
# Read bytes from a file
print("scores.csv first 30 bytes:", mapper["scores.csv"][:30])

# Write to create a new file
mapper["generated.txt"] = b"This file was created via the mapper interface."

# Membership checks and size.
print("'scores.csv' in mapper?", "scores.csv" in mapper)
print("Total files:", len(mapper))

# Delete file
del mapper["generated.txt"]