Use secured storage in public AML workspace

Data is often critical in enterprise machine learning.

To meet your needs, Azure Machine Learning (AML) provides the way to set-up an AML workspace behind VNet.
However, it will need a lot of time-consuming tasks for you and your team to configure all related infrastructure and components.

In this post, I’ll show you how to configure secured storage in public AML workspace, without exposing any ports or endpoints for the protected data.

Provision secured storage in AML

First of all, let’s create a virtual network (VNet) to protect Azure storage account. (Later I’ll configure storage account behind this VNet.)

Next, create a public AML workspace as usual.
Make sure to create a workspace in the same region with previous VNet.

When you have created an AML workspace, you will see the default storage account in this resource group. (See below.)
Go to this storage account and click “Networking” for configuring private endpoints and virtual networks.

In firewalls and virtual networks settings (in storage account), click “Selected networks” to configure the protected storage.
This will disable public access for this storage account.

In this settings, add the previous virtual network (VNet) and subnet to enable access from this VNet.

To enable your client to access this storage in client (such as, Azure Portal, Azure Storage Explorer, etc), enable “Add your client IP address”. (See below.)
This setting is optional.

In “Exceptions” settings, make sure to enable “Allow Azure services on the trusted services list to access this storage account”. (See below.)

Make sure to click “Save” button to confirm these all settings.

[Optional] Create a Private Endpoint

In order to enable a private endpoint connection for this storage, configure the additional setting as follows.

To create a private endpoint in this virtual network (VNet), click “private endpoint connections” tab in “Networking” setting in storage account.
In this pane, click the following button and proceed the wizards.

In this private endpoint’s setting (wizards), set the following properties.

  • Region : the same region with previous VNet
  • Target sub resource : blob
  • Virtual network / Subnet : the previous VNet and subnet
  • Integrate with private DNS zone : Yes

Secure training with protected data

To run Python script and train without exposing ports, such as, ssh and RDP, you should create a virtual machine (VM) connected to this VNet, and make sure to remove (disassociate) VM public IP address on this VM.
(See here for configuring your own VM for AML.)

There are several options to configure network for connecting this VM from your local client.
Here I’ll show you several deployment architectures as follows.

Note : When you have configured virtual network in AML’s default storage, you can create a compute instance with virtual network enabled in AML studio UI. (See below.)
However, Jupyter notebook functionalities in AML compute instance can only be accessed with *.instances.azureml.ms address.

When you use a compute cluster in your script, you should also configure VNet with provisioning_configuration() as follows.

from azureml.core.compute import AmlCompute, ComputeTarget

compute_config = AmlCompute.provisioning_configuration(
  vm_size='Standard_NC6',
  min_nodes=0,
  max_nodes=1,
  vnet_resourcegroup_name="AML-Test-rg",
  vnet_name="vnet01",
  subnet_name="default")
compute_target = ComputeTarget.create(ws, "cluster01", compute_config)

Now you can run Python script with AML Python SDK in this secured virtual machine (secured VM) as usual, working with your protected data.

Secured container registry (ACR)

You can also use secured container registry (ACR) together with public AML workspace.
Both storage and container registry should be in the same virtual network and subnet.

When you run image building with this protected container registry (with a private endpoint), make sure to run the following command.
(In Azure Machine Learning, it cannot use a private endpoint to directly build Docker images, and the compute cluster should be used to build the images instead.)

ws.update(image_build_compute = 'cluster01')

Things to Know About Serverless SQL Pool in Azure Synapse Analytics

Some folks often ask me “Is there any appropriate services for ad-hoc and serverless query against unstructured data (flat files) by scaling, such like, Amazon Athena or Google Big Query ?”
Unfortunately, the proven Azure Databricks doesn’t have any corresponding alternative ad-hoc pool (“cluster pool” in Databricks is similar, however computing resources for cluster pool should also be provisioned beforehand), and Azure Data Lake Analytics seems not to be focused in current Azure improvements.
These people will be interested in Serverless SQL pool (formerly, SQL on-demand) in Azure Synapse Analytics.

Using Serverless SQL pool (“Built-in” pool) in Azure Synapse Analytics, you can soon invoke query against CSV, TSV, Parquet, and JSON without the need for preparing and running dedicated computing resources. The system automatically adjusts resources based on your requirements, freeing you up from managing your infrastructure and picking the right size for your solution.
When you run the workload of occasional request’s processing (mostly sitting idle), such as, log analytics or occasional business reports, it will help you save your money.

Getting Started

Before using Azure Synapse Analytics, create an Azure storage account resource and upload your files into blob.
In this tutorial, I used flight-weather parquet dataset in Azure Databricks hands-on.

Now, create Azure Synapse Analytics resource (workspace) in Azure Portal and launch Synapse Studio.

First, click “Develop” menu in left navigation and create a new script file.

As you notice, the default attached computing pool is pre-built pool called “Built-in” (formerly, “SQL on-demand”), because we don’t have any provisioned pools. (See below.) This pool is for Serverless SQL and then you don’t need to change this pool.
In this stage, the attached database might be “master” database and this is also default database in Azure Synapse Analytics workspace.

By default, Serverless SQL pool is trying to access your blob (incl. Data Lake storage) using your Azure Active Directory identity.
However, you should know that you might experience slower performance with Azure AD Pass-through.

In this tutorial, we run severless query using SAS token without AAD pass-through. (Later I’ll show you how to connect remotely with AAD pass-through.)

First, you should generate SAS token in your storage account. (Click “Shared access signature” menu and create a new SAS token in your storage account.)

For security reasons, you cannot create a new database-scoped credential in default master database.
Thus, run the following script to create a new database in your Synapse workspace.

CREATE DATABASE mydb01

In your script editor, change the database setting (see below) into your new database.

Now let’s create a new credential named “sqlondemand”  (in which, SAS token is used as follows) as follows for accessing your blob in database.

-- Set master key
IF NOT EXISTS (SELECT * FROM sys.symmetric_keys) BEGIN
  declare @pasword nvarchar(400) = CAST(newid() as VARCHAR(400));
  EXEC('CREATE MASTER KEY ENCRYPTION BY PASSWORD = ''' + @pasword + '''')
END

-- Create credential for blob
IF EXISTS
   (SELECT * FROM sys.database_scoped_credentials
   WHERE name = 'sqlondemand')
   DROP DATABASE SCOPED CREDENTIAL [sqlondemand]
GO
CREATE DATABASE SCOPED CREDENTIAL [sqlondemand]
WITH IDENTITY='SHARED ACCESS SIGNATURE',  
SECRET = 'sv=2019-10-10&ss=bfqt&srt...'  --fill your storage SAS here
GO

Create data source for the your storage account.
As you see below, here I’m setting “sqlondemand” as CREDENTIAL, which is generated in the previous script.

CREATE EXTERNAL DATA SOURCE DemoStorage WITH (
  LOCATION = 'https://demostore01.blob.core.windows.net',
  CREDENTIAL = sqlondemand
);
GO

Now you can run serverless query as follows. You can run query using T-SQL (not pyspark or Spark SQL) in Serverless SQL pool.

Here we run query for all parquet files on container01/flight_weather_parquet in the registered data source (DemoStorage) and simply fetch top 10 results.
When you retrieve all data (large data) in Notebook with Spark pool (or Databricks), it will quickly respond, since the data is loaded sequentially with pagination UI. However, in T-SQL platform, it will take a long time if you have retrieved so large data, since it will load all data at once. (Note that my sample data includes approximately 2,000,000 rows. See the record count by COUNT_BIG(*).) Then please filter for the required data as follows. (If the number of columns is also large, please filter for only required columns.)

SELECT TOP 10 *
FROM
  OPENROWSET(
    BULK 'container01/flight_weather_parquet/*.parquet',
    DATA_SOURCE = 'DemoStorage',
    FORMAT='PARQUET'
  ) AS flight_weather
GO

As you know, Synapse Analytics uses a local cache to improve performance and this behavior is the same for Serverless SQL pool. Once cache warming is enabled, the performance will be faster until the cache is invalidated. (Please see the performance by running same query repeatedly.)

In order to reuse the same query, you can also use a view object as follows.
Note that a materialized view is not supported in Serverless SQL pool, because it has no local storage and only metadata objects are stored in Serverless SQL pool. (But you can use CETAS instead. I’ll show you about CETAS later.)

CREATE VIEW FlightBasic
AS SELECT YEAR, MONTH, UNIQUE_CARRIER, ORIGIN, DEST
FROM
  OPENROWSET(
    BULK 'container01/flight_weather_parquet/*.parquet',
    DATA_SOURCE = 'DemoStorage',
    FORMAT='PARQUET'
  ) AS flight_weather
GO
SELECT TOP 10 * FROM FlightBasic
GO

Credentials for Data Source

In above tutorial, we’ve run a query with a SAS token credential.
By default, Serverless SQL pool is trying to access the file using your Azure Active Directory identity. As I mentioned above, Azure AD Pass-through will give you slower performance than SAS token credential. But, sometimes it’s useful, since it’s simple and gives you flexible access control.
Here I show you how to connect with Azure AD pass-through.

In order to connect the storage account with AAD credential, you should assign ‘Storage Blob Data Contributor’ role to yourself in storage account’s resource. (You can add a role assignment by clicking “Access control (IAM)” menu on the blade of your storage account resource. See below.)

Now it’s ready.
You might be logging-in Azure Portal (and Synapse Studio) with your own credential. Thus, there’s no need to create a scoped credential. (Also you don’t need to create a new database.)
You can use the default “master” database and run the following script now.

SELECT TOP 10 *
FROM OPENROWSET(
  BULK 'https://demostore01.blob.core.windows.net/container01/flight_weather_parquet/*.parquet',
  FORMAT='PARQUET'
) AS flight_weather

As you saw above, this AAD pass-through will suit for your brief picking in UI (Synapse Studio or other management tools), since there’s no need to preparing any database objects.

Connect Programmatically

Serverless SQL pool runs on familiar T-SQL and SQL protocol.
Thus you can run serverless query with the same manners for SQL Server or Azure SQL Database.

For connecting from remote, both SQL authentication and Azure AD authentication are supported in Synapse Analytics. (There are also two administrative accounts, server admin and active directory admin.)
When you use AAD authentication for remote connection, the credential for accessing files will also be consistently passed through.

Let’s see a brief example. In this tutorial, we use SQL authentication.

First, please copy Serverless SQL endpoint (formerly, “SQL on-demand endpoint”) for your Synapse Analytics workspace in the resource blade on Azure Portal.

Next create a login object with username and password for SQL authentication. (Run the following script in master database.)

CREATE LOGIN demouser01 WITH PASSWORD = 'P@ssw0rd0001';

Change database setting to your own database (mydb01) in script, and run the following script to create a user in your database (mydb01).

CREATE USER demouser01 FROM LOGIN demouser01;

In order to allow this user to use a previous “sqlondemand” credential, run the following script and grant permissions.

GRANT CONTROL ON DATABASE SCOPED CREDENTIAL::sqlondemand TO demouser01

Now you can build your programming code to connect and run serverless query in Azure Synapse Analytics !

For instance, the following will invoke the serverless query using JDBC in Scala. (Try the following code in Azure Databricks.)

val jdbcHostname = "myws-ondemand.sql.azuresynapse.net"
val jdbcPort = 1433
val jdbcDatabase = "mydb01"
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

import java.util.Properties
val connectionProperties = new Properties()

val jdbcUsername = "demouser01"
val jdbcPassword = "P@ssw0rd0001"
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
connectionProperties.setProperty("Driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

val pushdown_query = "(SELECT TOP 10 * FROM OPENROWSET(BULK 'container01/flight_weather_parquet/*.parquet', DATA_SOURCE = 'DemoStorage', FORMAT='PARQUET') AS flight_weather) top10_flight_list"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

The following will invoke the serverless query using PowerShell. (Sorry, but it uses classical disconnected-styled data access with DataSet object in .NET.)

$connStr = "Data Source=myws-ondemand.sql.azuresynapse.net;database=mydb01;User ID=demouser01;Password=P@ssw0rd0001"
$conn = New-Object System.Data.SqlClient.SqlConnection $connStr
$conn.Open()

$cmd = New-Object System.Data.SqlClient.SqlCommand
$cmd.Connection = $conn
$cmd.CommandText = "SELECT TOP 10 * FROM OPENROWSET(BULK 'container01/flight_weather_parquet/*.parquet', DATA_SOURCE = 'DemoStorage', FORMAT='PARQUET') AS flight_weather"

$adp = New-Object System.Data.SqlClient.SqlDataAdapter $cmd
$data = New-Object System.Data.DataSet
$adp.Fill($data)

# show result
$data.Tables

$conn.Close()

You can invoke serverless query from various applications, such as, Excel, Power BI, so on and so forth.
Same like other SQL based database, you can also manage database using Azure Data Studio or SQL Server Management Studio.

Elasticity

Serverless SQL pool has a distributed data processing system and the query for blob is elastically executed in backend Synapse computing resources.

Data (in data lake) is organized into cells.
User query is then divided into query fragments (called query tasks) to hash-distribute for data processing.

Computing nodes are also automatically scaled in the backend.
The distributed query processor (DQP) component in Serverless SQL pool may instruct the need for more compute power to adjust to peaks during the workload. If it’s granted, DQP will then re-distribute tasks leveraging the new compute container. (The in-flight tasks in the previous topology continue running after re-balancing.)

Here I don’t describe details about this mechanism, but see “Democratizing the Data Lake with On-Demand Capabilities in Azure Synapse Analytics” in Microsoft Ignite 2019.

(From : “Democratizing the Data Lake with On-Demand Capabilities in Azure Synapse Analytics”, Microsoft Ignite 2019)

Supported File Formats and Concerns

Currently, CSV (including TSV), Apache Parquet, and JSON (semi-structured) format are supported in Serverless SQL pool.

Serverless SQL pool also allows you to query data in Azure Cosmos DB with Azure Synapse Link. In this post, we only focus on data source for unstructured data (flat files) in blob.

For performance perspective, it will be recommended to use Apache Parquet format.
Parquet is a columnar compression format, and it will then speed up performance for extraction. Unnecessary columns will also be skipped in querying parquet.
Furthermore, the Latin1_General_100_BIN2_UTF8 collation will speed up more, because the query will skip the row groups in parquet based on the predicate in WHERE clause.

There exist another reason for using Apache Parquet in Serverless SQL pool.
As you saw in above tutorials, the schema structure for underlying files can be auto-detected (inferred) in Serverless SQL pool, such like spark.read() in Apache Spark. However, currently, this schema inference works only for parquet format.
For instance, when you use CSV, you should specify all columns in schema description by WITH clause in OPENROWSET. (See below.)
In my sample data (see above), there are approximately 60 columns. Imagine that I should explicitly describe each column names and types without schema inference. This will be so cumbersome !

SELECT *
FROM OPENROWSET (
  BULK 'https://demostore01.blob.core.windows.net/container01/csv',
  FORMAT = 'CSV',
  FIELDTERMINATOR =',',
  ROWTERMINATOR = '\n'
)
WITH (
  [country_code] VARCHAR (5) COLLATE Latin1_General_BIN2,
  [country_name] VARCHAR (100) COLLATE Latin1_General_BIN2,
  [year] smallint,
  [population] bigint
) AS [r]
WHERE
  country_name = 'Luxembourg' AND year = 2017
GO

However, it might be better for performance to specify schema explicitly. (Schema inference should be avoided in production, when using Spark API.)

Now you can query data on delta lake with SQL serverless on Azure Synapse Analytics. (Generally available)
Currently there are also certain limitations for reading delta files in Serverless SQL, and see here for details. (The delta lake support in Serverless SQL is still a work-in-progress.)
Another important note is that delta lake is not currently supported in Serverless SQL pool. (Though delta lake is supported in dedicated SQL pool on Synapse Analytics.)
Assume that your team explore (experiment) data in Azure Databricks and provide presentations using Serverless SQL pool in Azure Synapse Analytics. (Databricks says that over 75% users are now using delta lake in Databricks.) In such a case, you cannot handle directly the delta lake format in Serverless SQL pool. (See a feedback . I hope this will be supported in the future.)

The delta lake use parquet format at the bottom. (The delta lake consists of files with parquet, transaction logs, and indexes/states.)
See “Exercise 9 : Delta Lake” in Azure Databricks tutorial.

Supported T-SQL and Concerns

You can use more advanced query, such as, group-by, order-by, querying nested columns, so on and so forth in Serverless SQL pool. You can also export query results to Azure Storage Blob or Azure Data Lake Storage Gen2 using CETAS (CREATE EXTERNAL TABLE AS SELECT) statements, and a part of DDL statements is also supported in Serverless SQL pool.

However, you should remember that not all T-SQL operations are supported in Serverless SQL pool, due to architectural reasons.
For instance, T-SQL in Synapse Analytics dedicated SQL pool supports PREDICT() function (see here), with which you can infer values with a trained ONNX model by azure machine learning. But, this might not be used in Serverless SQL pool, because Serverless SQL pool doesn’t have local storage and model binary (which must be stored in a table) cannot be reachable.
Currently all DML is not supported in Serverless SQL pool, too.

For details about supported T-SQL, see official document “Serverless SQL pool in Azure Synapse Analytics“.

Private Endpoint for Serverless SQL pool (Networking)

Sometimes you might need to connect to Serverless SQL endpoint using a private endpoint for security reasons.
You can also configure a private endpoint for Azure Synapse Serverless SQL pool using “Private endpoint connections” menu in workspace blade on Azure Portal. (See below.)

Once you have configured a private endpoint for your own virtual network (VNet), you can connect this endpoint from connected networks, including on-premise, through the gateway in VNet.

When you create Synapse workspace with managed virtual network enabled, a private endpoint for Serverless SQL pool is automatically generated in this managed network. This endpoint is used only inside this managed network in Synapse workspace. (See “Manage” menu in Synapse Studio as follows.)

Performance Optimization

Unfortunately, Serverless SQL doesn’t support sys.dm_pdw_request_steps (which can be used in Synapse dedicated SQL) for performance optimization.

See here for performance optimization in Synapse dedicated SQL. Serverless SQL supports dynamic management views (DMV), such as, sys.dm_exec_connections, sys.dm_exec_sessions, or sys.dm_exec_requests.

However, there exist several tips and best practices for performance optimization in Synapse Serverless SQL, such as, file size, partitioning, data types, etc.
Here I’ll show you several things to check as follows.

Do not return large data

First, you shouldn’t return a large number of results in Serverless SQL query.
When you want to provide a large number of data for users, apply pagination using OFFSET and FETCH in T-SQL functions. Also, consider to use small size of data types, if possible – such as, varchar rather than nvarchar, smallint rather than int.

Use parquet (or delta lake) as possible

As I have mentioned above, it’s recommended to use parquet format (or delta lake) for the performant query, because of native optimization for such as, columnar compression, column skips, etc.
Parquet is also compatible with partitions, and then works better with filepath() and filename() functions in Synapse Serverless SQL as follows. (These functions are also used in other formats.)

SELECT
  tpepPickupDateTime,
  passengerCount
FROM  
  OPENROWSET(
    BULK 'puYear=*/puMonth=*/*.snappy.parquet',
    DATA_SOURCE = 'YellowTaxi',
    FORMAT='PARQUET'
  ) nyc
WHERE
  nyc.filepath(1) = 2017
  AND nyc.filepath(2) IN (1, 2, 3)
  AND tpepPickupDateTime BETWEEN CAST('1/1/2017' AS datetime) AND CAST('3/31/2017' AS datetime)

You can also explicitly filter the partitions as follows in Synapse Serverless SQL.

SELECT 
  payment_type,  
  fare
FROM OPENROWSET(
  BULK (
    'csv/taxi/yellow_tripdata_2017-01.csv',
    'csv/taxi/yellow_tripdata_2017-1*.csv'
  ),
  ...

);

Serverless SQL has additional performance optimization for querying parquet created in Synapse Analytics Spark pool, because Serverless SQL automatically synchronizes metadata from Synapse Analytics Spark pool. When a table is partitioned, it then targets only the necessary files in a WHERE clause of query without explicit specifying the partitions.

With delta lake format, reading partitions directly (manually) is not necessary any time and you can use a WHERE clause of query for data skipping. (The partition elimination will be done automatically.)
The delta lake might be the best choice for performant query in the future, but currently there are also certain limitations, as I have mentioned above.

Statistics

Like Synapse dedicated SQL pool (see here), statistics are automatically created, when the first query targets the table. The distributed query processor (DQP) generates the appropriate query plans based on cost.
For instance, when you filter data with both column A and column B, DQP determines which column should be used to filter at first, based on the distribution of column data.
Therefore you can optimize performance by manually creating (or updating) statistics, such as, in case when you want to warm up for the first query, or in case when the data is largely updated.

Materialize with CETAS

When you want to materialize the frequently used part of query (such as, including JOIN clause), you can also use CETAS (Create External Table ... As Select ...) statement as follows. By using CETAS, it will export query results to a parquet file in a data lake, and you can speed up in the next query.

CREATE EXTERNAL TABLE FactSale_CETAS
WITH (
  LOCATION = 'FactSale_CETAS/',
  DATA_SOURCE = Storage,
  FILE_FORMAT  = Parquet_file
)  
AS
    SELECT
    Dsr.SalesReasonName
    , COUNT_BIG(distinct Fis.SalesOrderNumber) SalesOrderNumber_COUNT
    , AVG(CAST(SalesAmount AS DECIMAL(38,4))) SalesAmount_AVG
    , AVG(CAST(OrderQuantity AS DECIMAL(38,4))) OrderQuantity_AVG
  FROM ViewFactSale AS FIS 
  INNER JOIN  ViewFactSaleReason AS Fisr 
  ON Fisr.SalesOrderNumber = Fis.SalesOrderNumber
  AND Fisr.SalesOrderLineNumber = Fis.SalesOrderLineNumber
  INNER JOIN ViewDimSales AS Dsr 
  ON Fisr.SalesReasonKey = Dsr.SalesReasonKey
  GROUP BY Fis.SalesTerritoryKey, Fis.OrderDateKey, Dsr.SalesReasonName

When you want to update (refresh) results in CETAS table, you should recreate CETAS table, such as, in Azure Synapse Pipeline.

Please see the best practice guide to get the best performance in Serverless SQL.

Programming for Apache Kafka (Quickstart using Cloud Managed Service)

Here I show you step-by-step tutorials for Apache Kafka with Azure HDInsight.
Azure HDInsight is based on famous Hortonworks (see here) and the 1st party managed Hadoop offering in Azure. (i.e, You can take Azure support service for asking about HDInsight service.)

Apache Kafka is open-source and you can take a benefit for a large number of ecosystems (tools, libraries, etc) like a variety of Kafka connectors. Apache Kafka can also be installed on-premise or on cloud-hosted virtual machines, then you cannot be locked into a specific platform. (If you need, you can run on anywhere like AWS, on-premise, etc.)

In this post, I show you trivial examples for popular components, such as producer, consumer, and streams for your beginning. But once the stream is ingested into Apache Kafka, the real-time streaming data can also be analyzed by Databricks and the aggregated results can be connected into massive transaction database (Cosmos DB if you’re using Azure) or analysis database (SQL DW in Azure). (See here for how to collaborate Spark structured streaming with Kafka.)
You can take a lot of advanced approaches for speed-layer processing using Apache Kafka. (This post only shows the very beginning tutorials for programming.)

Ordinarily you could configure VNet and run applications (such as producers, consumers, or streams) in your secure network (see below. please refer here for VNet settings), but in this post we run all applications in primary headnode (built-in ssh server) for simplifying our example. (As you can see here, Kafka broker’s port is non-public.)

Create Managed Kafka Cluster (HDInsight)

First you must create Azure storage account (blob storage) in Azure Portal. All Kafka meta files such as logs, histories, and settings are stored in this blob storage.

Next you create HDInsight resource in Azure Portal.

In the creation wizard, please set the following properties.

  • Select “Kafka” for Hadoop’s “Cluster Type”. (In this post, we use Kafka version 1.1.0 with HDInsight version 3.6.)
  • Select previous storage (Azure storage account) for metadata storage.
  • Set appropriate node number and machine size in your cluster (Trade-off between capacity and pricing).
    Note that VM pricing on HDInsight is reasonable and almost equivalent with the regular VM pricing (just a little bit higher, approximately x1.1 – x1.2).

Create Topic

Before running applications, you must create Kafka topic which is used for storing messages.

First you must copy Zookeeper server hosts (FQDN) of your Hadoop cluster.
Lanuch Ambari UI (https://{your cluster name}.azurehdinsight.net/) with your web browser and click “Zookeeper” and “Config” tab. You can view and copy Zookeeper’s hosts and port as the following screenshot.

Next login to head node ({your cluster name}-ssh.azurehdinsight.net) using SSH. You can also get the host’s address with “SSH + Cluster login” menu on HDInsight resource blade in Azure Portal. (See below.)

Set your zookeeper hosts into $KAFKAZKHOSTS environment variable by running the following command. (You must change the following zookeeper’s host list into your previously copied host names. Here we assume we have 3 hosts for zookeeper in the following sample.)

export KAFKAZKHOSTS=zk1-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:2181,zk2-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net,zk3-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:2181

Now you create topic named “test” with the following command.

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
  --replication-factor 3 \
  --partitions 8 \
  --topic test \
  --zookeeper $KAFKAZKHOSTS \
  --config retention.ms=10000

Here we create with 8 partitions and 3 replication for topic “test”. (See below.) And message’s retention is 10 seconds (10,000 ms).

Run Producer Application

Now we create and run a Kafka producer, which sends messages into your generated topic.

First you must copy the broker hosts (FQDN) of Kafka cluster.
Click “Kafka” in Ambari UI and view broker hosts and port in “Kafka Broker” section.

Now here we create a producer with Python !
Install kafka-python and jupyter with the following command on the head node. (As I described earlier, here we run our producer on head node for only test purpose.)

sudo apt install python3-pip
pip3 install kafka-python
pip3 install jupyter

After installation is completed, run jupyter notebook with the following command.

jupyter notebook

Your notebook might be running on localhost:8888.
Then you should connect this port using SSH tunnel (port forwarding) with your terminal client. The steps how to set SSH tunnel depends on your using terminal client’s software and see the document in your terminal client. For instance, if you’re using PuTTY client in Windows, you can set SSH tunnels by the following screenshot. (If you are in Mac OS, you can use built-in open SSH client.)

After you’ve connected with SSH tunnel (port forwarding) settings, please show notebook (go to http://localhost:8888/?token=...) with your web browser and run the following script.
Note that you should change host list for bootstrap_servers to your previously copied broker servers.

from kafka import KafkaProducer
import time;
import json

id = 0
for _ in range(9):
  message = "{etype} {etime}".format(
    etype='Close' if id%3==2 else 'Open',
    etime=time.time() * 1000)
  id = id + 1

  producer = KafkaProducer(
    bootstrap_servers=['wn0-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092','wn1-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092','wn2-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092'])
  producer.send('test', message.encode('ascii'))
  time.sleep(0.2)

This script sends 9 messages with event’s type and timestamp (such as “Open 1554443425483.7773“) into your topic. (Each message will be marked as “deleted” after 10 seconds by the previous “retention.ms=10000” configuration in your topic.)
Now let’s consume these messages in the next section !

Run Consumer Application

In order to consume messages in topic, run the following script in your notebook.
This application permanently consumes the arriving messages by the following “for” loop.

from kafka import KafkaConsumer
consumer = KafkaConsumer(
  bootstrap_servers=['wn0-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092','wn1-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092','wn2-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092'],
  auto_offset_reset='earliest')
consumer.subscribe(['test'])
for msg in consumer:
  print(msg)

Now let’s see the output results when you send messages with the previous producer !
You will find each 9 messages are arriving in your consumer by sending messages with your producer.

Run Kafka Stream (Time Window Sample)

Finally we window input messages with Kafka stream.
Unfortunately we can’t implement Kafka stream with Python, then here we implement with java. (Here we also run our application on head node for test purpose.)

Before starting, install maven with the following command.

sudo apt install maven

Create maven project with the following command and move to project’s directory.

# create project
mvn archetype:generate \
  -DgroupId=com.example \
  -DartifactId=Streaming-Test \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
# move to project directory
cd Streaming-Test

Then you can find the file “pom.xml” (project configuration) and directory structures (which includes template source code) in your current directory.

First you must open pom.xml and write as follows.

Here we’re setting dependencies for Kafka stream’s libraries and using maven-shade-plugin in order to contain all dependencies in generated jar file. (Or you must copy any non-standard jars in your classpath. Otherwise you will get java.lang.ClassNotFoundException.)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>Streaming-Test</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Streaming-Test</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>1.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <configuration>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.example.App</mainClass>
            </transformer>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>  
</project>

You can find the template source code in ./src/main/java/com/example/App.java.
Open this source file (App.java) and write the following code. (Note that the host list of BOOTSTRAP_SERVERS_CONFIG must be changed for your broker’s hosts.)

In this code, we’re doing :

  • Reading string inputs from “test” topic
  • Grouping by event’s type (“Open” or “Close”) and event’s time in the message body for each 1 second (1000 ms) time window (Setting record’s count in each window)
  • Writing results as serialized JSON object into “test-count” topic

The message conversion is written by Kafka Streams Domain Specific Language (DSL) (such as map(), groupByKey(), windowedBy(), etc). See the official document “Apache Kafka – Streams DSL” for details.

package com.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// For Custom TimestampExtractor
import org.apache.kafka.streams.processor.TimestampExtractor;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// For Custom Serializer / Deserializer
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class App {

  /**********
    Helpers
  **********/

  static public class WindowedEventData {
    public long windowStart;
    public String eventType;
  }

  static public class EventCountData {
    public String eventType;
    public long eventCount;
  }
  
  // Custom Timestamp Extractor for Windowing
  static public class EventTimestampExtractor implements TimestampExtractor
  {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
      String[] eventArr = ((String) record.value()).toLowerCase().split("\\W+");
      return Long.parseLong(eventArr[1]);
    }
  }

  // Serializer (POJO -> byte[])
  static public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JsonPOJOSerializer() {
    }
    
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
      try {
        return objectMapper.writeValueAsBytes(data);
      } catch (Exception e) {
        throw new SerializationException("Error JSON serialize", e);
      }
    }

    @Override
    public void close() {
    }
  }

  // Deserializer (byte[] -> POJO)
  static public class JsonPOJODeserializer<T> implements Deserializer<T> {
    private ObjectMapper objectMapper = new ObjectMapper();

    private Class<T> tClass;

    public JsonPOJODeserializer() {
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
      tClass = (Class<T>) props.get("JsonPOJOClass");
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
      try {
        return objectMapper.readValue(bytes, tClass);
      } catch (Exception e) {
        throw new SerializationException("Error JSON deserialize", e);
      }
    }

    @Override
    public void close() {
    }
  }

  /**********
    Main
  **********/

  public static void main(final String[] args) throws Exception {
    // property settings for kafka stream
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "wn0-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092,wn1-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092,wn2-kafkat.3fddup3yjcxeljyp35tee4ajwf.bx.internal.cloudapp.net:9092");
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class);

    // generate custom serde for serializing POJO to the stream
    Map<String, Object> serdeProps = new HashMap<>();
    
    final Serializer<WindowedEventData> windowedEventSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", WindowedEventData.class);
    windowedEventSerializer.configure(serdeProps, false);
    final Deserializer<WindowedEventData> windowedEventDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", WindowedEventData.class);
    windowedEventDeserializer.configure(serdeProps, false);
    final Serde<WindowedEventData> windowedEventSerde = Serdes.serdeFrom(windowedEventSerializer, windowedEventDeserializer);

    final Serializer<EventCountData> eventCountSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", EventCountData.class);
    eventCountSerializer.configure(serdeProps, false);
    final Deserializer<EventCountData> eventCountDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", EventCountData.class);
    eventCountDeserializer.configure(serdeProps, false);
    final Serde<EventCountData> eventCountSerde = Serdes.serdeFrom(eventCountSerializer, eventCountDeserializer);

    // input value is string such as "{event_type} {event_time}"
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> eventObjs = builder.stream("test", Consumed.with(Serdes.String(), Serdes.String()));
    KStream<WindowedEventData, EventCountData> windowedObjs = eventObjs
      // change key to event_type
      .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(String key, String eventStr) {
          String[] eventArr = eventStr.toLowerCase().split("\\W+");
          return new KeyValue<>(eventArr[0], eventStr);
        }
      })
      // group by event_type
      .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
      // windowing (separated by event_type)
      .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(1)))
      // count elements on each window
      .count()
      // change KTable to KStream
      .toStream()
      // change key and value to WindowedEventData and EventCountData respectively
      .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedEventData, EventCountData>>() {
        @Override
        public KeyValue<WindowedEventData, EventCountData> apply(Windowed<String> key, Long value) {
          WindowedEventData wEventData = new WindowedEventData();
          wEventData.windowStart = key.window().start();
          wEventData.eventType = key.key();
          EventCountData eCountData = new EventCountData();
          eCountData.eventType = key.key();
          eCountData.eventCount = value;          
          return new KeyValue<>(wEventData, eCountData);
        }
      });
    // stream output to "test-count" topic
    windowedObjs.to("test-count", Produced.with(windowedEventSerde, eventCountSerde));

    // stream processing start !
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
  }

}

Run the following command to build your application.
If succeeded, you could find ./target/Streaming-Test-1.0-SNAPSHOT.jar.

mvn clean package

Before running this application, create new topic “test-count” (in which the windowed messages are stored) as follows. (Here we also set 10 seconds for message retention.)

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
  --replication-factor 3 \
  --partitions 8 \
  --topic test-count \
  --zookeeper $KAFKAZKHOSTS \
  --config retention.ms=10000

Now run your generated Kafka stream application with the following command !
This application waits messages in “test” topic and converts incoming messages. The converted messages are sent into “test-count” topic.

java -jar ./target/Streaming-Test-1.0-SNAPSHOT.jar

Please run your previous producer (send messages) and consume “test-count” topic. (Make sure to change consumer’s source code to subscribe “test-count” topic.)
You can find the following windowed results (grouping results by each 1000 ms) in consumer’s output.

 

MongoDB でドキュメント DB の魅力を 30 分で孊ぶ (蚘事玹介)

今回、@IT さんに蚘事を茉させおもらったので玹介したい。

@IT 特集 : MongoDBで理解する「ドキュメント・デヌタベヌス」の䞖界 (1)
http://www.atmarkit.co.jp/ait/articles/1211/09/news056.html

@IT 特集 : MongoDBで理解する「ドキュメント・デヌタベヌス」の䞖界 (2)http://www.atmarkit.co.jp/ait/articles/1211/30/news040.html

プログラマヌのために、ちゃんず実際のコヌドなどを䜿っお本質から理解 (=実感) しおもらう䞻旚。前半は MongoDB を䟋に曞いお、埌半では、逆に MongoDB 特有なずころがあるので、そこをわかるように曞こうず思う。

なかなか文曞にしお䌝えるのっおむずかしいよね。著者の本音で曞くず、芁は CAP 定理みたいな考え方の本質、぀たり、「KVS ずは、実珟したいある究極のゎヌルのために、今ある䟿利さの䞀郚を削っおでも実珟した実装系」であり、ドキュメント デヌタベヌスはそれをより珟実的、か぀理想的な圢で俯瞰したものずいう点を䌝えたかった。でも、説明や前眮きが長かったり、数孊じみた話ずか難しかったりするず、なかなか䞀般向けの文曞ずしお読んでもらえない。そこで、いろいろ悩んだ末、こんな感じの䟋瀺にしたずいう感じ。

MongoDB を䟋にドキュメント デヌタベヌスを語っお終わるず、今埌は、ドキュメント デヌタベヌスは党郚そうだっお誀解するので、埌半は、埗意の RavenDB をひっぱり出しお (あえお Couch ではなくおゎメン)、「実は、MongoDB は、こんなずころが特別だった」ずいう芳点で、逆の偎面から芋おいこうず思う。(䞡方読めば、RDB な人たちも、「なんか、ドキュメント デヌタベヌスが芋えおきた !」ずなるようにしたい。)

個人的には RavenDB のほうが奜きなんだけど、やっぱ、䞖界でもっずも䜿われおいる MongoDB のほうが読む気がするよね。(RavenDB、US では頻繁に Workshop や BootCamp なんかもおこなわれおたす . . . 人口が倚いっお、うらやたしい)

MongoLab で、Cloud な MongoDB æŽ»ç”š

最近アナりンスがあったが (ここ に日本語で玹介しおくれおいる)、 Azure store で提䟛されおいるサヌビスが、Azure Portal から䜿えるようになった。Azure 䞊で Multi tenant で提䟛されおいる著名なサヌビス (䟋えば、MySQL のクラりド版の ClearDB や、SMTP メヌルの SendGrid など) ずか、 デヌタを提䟛する各皮サヌビスが Azure ポヌタルから䜿えるわけだ。

ここで嬉しいのは、Amazon (AWS) だけでなく、Azure からも MongoLab が扱える点だ。(MongoLab は、MongoDB 版の “Database as a Service” ず思っおもらえば良い。デヌタベヌス サヌバヌの監芖や起動・停止、バックアップなど、SLA を自分で実装するのではなく、契玄ベヌスで利甚する。)
Azure Virtual Machine (Azure VM) で、Linux や Windows で MongoDB を立おおも良いが、 Azure ポヌタルから MongoLab のデヌタベヌスを立お、「MongoDB のサヌビス」ずしお Azure 䞊の C# などからアクセスできるようになる。぀たり、すべお PaaS のプラットフォヌムを䜿っお簡易に利甚できる。(この堎合、もちろん、䜿甚する MongoDB は Azure 䞊の Region で hosting される。)

残念なのが、珟段階 (2012/11/03) の Azure Preview では、アカりント Profile が United States の堎合しかアドオンを利甚できないようで、぀たり、日本で契玄しおいる堎合は、ただ䞊図の画面を拝むこずはできない。(この制玄はいずれなくなるらしいが、少なくずも今は無理だ。)
2012/12/25 远蚘 : ようやく、日本語の Azure Preview Portal からも䜿えるようになった。
そこで、ClearDB なども同じだが、本家の MongoLab のサむト (https://mongolab.com/home) から Azure の MongoLab が䜿えるので、 今回は、その手順で、簡単に䜿い方を玹介したい。(MongoDB そのものに関する现かな手法に぀いおは、専甚のサむトを参照しおほしい。)

Sign-up ず Database の䜜成

たずは、MongoLab のサむト (https://mongolab.com/home) に行っおサむンアップをおこなう。ちなみに、0.5 GB たでの Shared Plan (他のデヌタベヌスず、仮想マシンを共有) なら無償で䜿える。

そしお、デヌタベヌスを䜜成するが、ここで、䞋図の通り、Provider ずしお Azure を遞択しよう。

デヌタベヌス䜜成の際に、Database User の䜜成ずパスワヌドを蚭定するが、このあず説明するように、パスワヌドは URI ずしおも䜿甚されるので、できるだけ @ (アットマヌク) など URI の予玄語は䜿わないほうが良い。

なお、残念ながら、珟時点の MongoLab では、Windows Azure 甚の Dedicated VM Plan はないらしく、 専甚の VM で䜜っお VNET (Virtual Network) で構成するなどの䜿い方は詊せない。(たあ、Amazon 版の Dedicated VM Plan も、ただ Beta なんだけどね)

デヌタベヌスが䜜成されたら、デヌタベヌスが䜿甚しおいる xxxxxxxx.mongolab.com のサヌバヌの名前解決をしおみるず良いだろう。ちゃんず、 zzzzzzz.cloudapp.net ずいう Azure のドメむンで動いおいるこずがわかる。 たた、デヌタベヌスの管理画面 (䞋図) を芋お、䜿甚しおいる MongoDB のバヌゞョンもチェックしおおこう。䞋図の通り、 右䞋に mongod プロセスのバヌゞョンが衚瀺されおいるが、 珟圚は、バヌゞョン 2.0.7 であるこずがわかる。(このため、C# から Linq ずかも問題なく䜿える。)

このあず芋おいくように、MongoLab は、MongoDB のコマンドラむン ナヌティリティ (mongo) を䜿っお管理できるが、 この際、できる限り、同じバヌゞョンの MongoDB を入れおおいたほうが良い。そのためにも、䜿っおいる mongod のバヌゞョンはちゃんず把握しおおこう。

Command Line からの管理

では、実際に、コン゜ヌル (mongo) から管理をおこなっおみよう。 䞊図の画面の通り、MongoDB の接続先のアドレスが衚瀺されおいるので、MongoDB をむンストヌルしお、コマンドラむン ナヌティリティ (mongo) でこのアドレスに接続する。(䞋蚘の dbuser ず dbpassword は、デヌタベヌス䜜成時に远加したナヌザヌ情報だ。たた、server name, database port の郚分もテナントによっお異なるので泚意しおほしい。)

mongo <server name>.mongolab.com:<database port>/<database name>
  -u <dbuser> -p <dbpassword>

い぀ものようにプロンプトが衚瀺されるので、 あずは、普通の MongoDB の䜿い方ず同じだ。 䟋えば、䞋蚘では、珟圚䜿甚しおいるデヌタベヌスの名前を取埗しおいる。

> db.getName();
testdb

䞋蚘では、 Orders コレクションの Name プロパティに Index 䜜成をおこなっおいる。

> db.Orders.ensureIndex({Name:1});

コマンドラむンを䜿う堎合、1 ぀泚意点がある。Azure の癖を知っおいる人には説明の必芁はないず思うが、 Azure は、䞀定時間 Idle 状態の接続は匷制切断される。このため、 コマンドラむン ナヌティリティも長時間ログむンしたたたにせず、面倒かもしれないが、仕事が終わったら、ために exit しよう。(Socket ゚ラヌなど、切断されおいたら、再床、mongo で入りなおす。)

プログラミング蚀語 (Driver) からの接続

プログラミング蚀語からも、い぀ものように利甚できる。(なので、特に説明の必芁はないが、念のため曞いおおこう)
䟋えば、C# から接続する堎合、い぀ものように、10gen の Official C# driver を NuGet から取埗する。

あずはプログラミングをおこなうだけだ。Driver から接続する際の接続文字列も、䞊蚘の管理画面に衚瀺されおいる。以䞋のフォヌマットの接続文字列ずなる。

mongodb://<dbuser>:<dbpassword>@<server name>.mongolab.com:<server port>/<database name>

以䞋のコヌドを蚘述するず、登録や怜玢が問題なくできるこずがわかる。䜕床も繰り返すが、䞀般的な MongoDB の䜿い方ず䜕ら倉わらない。(䞋蚘は、ASP.NET MVC のサンプル コヌドだ。)

using MongoDB.Driver;
using MongoDB.Bson;
using MongoDB.Driver.Linq;

public class Order
{
  public ObjectId _id { get; set; }
  public string Name { get; set; }
  public int Price { get; set; }
  public string Category { get; set; }
}

public ActionResult Test1()
{
  MongoServer server = MongoServer.Create(@"mongodb://<dbuser>:
    <dbpassword>@<server name>.mongolab.com:<server port>
    /<database name>");
  MongoDatabase db = server["<database name>"];
  MongoCollection col = db.GetCollection("Orders");

  // save
  Order obj1 = new Order()
  {
    Name = "test1",
    Price = 100,
    Category = "material"
  };
  col.Insert(obj1);
  Order obj2 = new Order()
  {
    Name = "test2",
    Price = 200,
    Category = "material"
  };
  col.Insert(obj2);
  Order obj3 = new Order()
  {
    Name = "test3",
    Price = 150,
    Category = "food"
  };
  col.Insert(obj3);

  // find (Linq)
  Order sel = (from c in col.AsQueryable()
        where c.Name == "test2"
        select c).FirstOrDefault();

  ViewBag.Message = string.Format("_id:{0}, Price:{1}",
    sel._id, sel.Price);

  return View();
}

あずは、完成したアプリケヌションを Azure に発行すれば、 同じ Region で動䜜する MongoLab を䜿ったクラりド アプリケヌションの出来䞊がりだ。(パフォヌマンスなどを考慮し、できるだけ、MongoLab のデヌタベヌスず同じ Region に発行しおおこう。)

Bulk の凊理などを実行しおもらうずわかるが、たあそれほど違和感ない速床で返っおくる。(ただし、Shared なので、いろいろ状況に応じ倉わっおくるずは思うが。。。)

(远蚘) Dedicated 登堎

「Mongolab : Announcing New MongoDB Instances on Microsoft Azure」にあるように、MongoLab の Dedicate 版が提䟛された。

以前はスケヌラブルな構成がむずかしかったが、この Dedicated 版により、埅望の Replica Set、さらに「Mongolab : Plans & Features」によるず Sharding Cluster も利甚できるようなので、是非 お詊しあれ。(ただ詊しおいない)

RavenDB の Replication, Scale Out (Sharding)

前回 玹介した RavenDB に぀いお、続きを蚘茉しようず思う。

今回は、RavenDB を䜿った Failover ず Scaling に぀いお蚘茉しおおく。RavenDB における Scaling では、Scale Up ではなく、埌述する Shared data set による Scale Out の手法が採甚されおいる。

たずは、本題に入る前に、いく぀か準備をしおおこう。

準備 (RavenDB の HTTP ホスト)

今回は、耇数の RavenDB サヌビスを起動しお実隓するので、RavenDB を HTTP ホストで起動する。(実行ファむルを展開しお、起動する。)
たず、RavenDB の Build を ダりンロヌド しお、ダりンロヌドした zip を展開する。 ぀ぎに、RavenDB のむンストヌルず起動をおこなう。RavenDB をむンストヌル (実行) するには、むンストヌル フォルダに移動しお、以䞋のコマンドを実行する。(/uninstall で簡単にアンむンストヌルできる。)

.\Server\Raven.Server.exe /install

䞊蚘のコマンドを実行するず、䞋図の通り、Windows のサヌビスが登録されお起動する。(次回から、OS の起動の際に、自動で起動する。)

なお、今回、デヌタベヌスに察しおどのような芁求が枡されたか確認するため、 デバッグ モヌドで実行しおみる。 デバッグ モヌドで実行するには、䞊蚘のコマンドではなく、 䞋蚘のコマンドを実行する。
デバッグ モヌドの堎合、䞊蚘のような Windows サヌビスではなく、実行したコン゜ヌル䞊で HTTP のプロセスが実行され、どのような芁求を凊理したかコン゜ヌル䞊に衚瀺されるようになる。

cd .\Server
Raven.Server.exe –debug

なお、既定では、ポヌト 8080 で起動する。(起動しおいるかどうかは、ブラりザヌで、http://localhost:8080/raven/studio.html にアクセスしおみるず良い。) このポヌト番号を倉曎するには、 Server\Raven.Server.exe.config を開いお、䞋蚘の通り Raven/Port を倉曎すれば良い。(耇数の RavenDB を同じマシンで起動するには、このポヌトを倉曎しお起動すれば良い。)

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="Raven/Port" value="8081"/>
    <add key="Raven/DataDir" value="~\Data"/>
    <add key="Raven/AnonymousAccess" value="Get"/>
  </appSettings>
  . . .
</configuration>

RavenDB を管理するには、䞋蚘の URL に接続しお、RavenDB Management Studio ずいうブラりザヌ むンタヌフェむスを利甚するず䟿利だ。(プログラムでも管理できるけど、このほうが超䟿利。)

http://<install server>:<port>/raven/studio.html

䟋によっお、Data フォルダヌ (.\Server\Data) を消すずデヌタは初期化され、ポヌタブルな運甚ができるので、いろいろ䜜っおみおは、デヌタを消しお詊すこずができる。

以降のサンプル コヌドでは RavenDB Client (.NET の API) を䜿っおアクセスをおこなうが、HTTP ホストの堎合、クラむアント偎は、API を䜿わず、HTTP をそのたた呌び出しお、RESTful な方法でデヌタ アクセスができるため、jquery などを䜿っお実装しおも良い。

RavenDB ぞの Plug-in (Bundles)

RavenDB では、プラグむン (Plug-in) 可胜な远加の機胜を bundle ず呌んでおり、 埌述する Replication でも、この bundle を䜿甚する。 bundle の远加は非垞に簡単で、RavenDB を実行するディレクトリの䞋に Plugins フォルダヌを䜜成し、ここに必芁な dll を配眮するだけだ。(このため、 RavenDB を起動する堎所によっお Plugins フォルダヌの堎所が倉わるので泚意。䞀般には、RavenDB の実行モゞュヌルが入っおいる .\Server の䞋に䜜成しおおけば良い。)

むンタヌネット䞊から bundle をダりンロヌドしお Plugins フォルダヌに配眮するためのスクリプトが甚意されおいお、 䟋えば、今回䜿甚する Replication bundle をプラグむンするには、PowerShell を管理者暩限で起動し、䞋蚘の通り実行する。

# enable script execution
Set-ExecutionPolicy Unrestricted

# download Raven.Bundles.Replication.dll
#   to Plunins folder
cd .\Server
..\Raven-GetBundles.ps1 Replication

なお、䞊蚘の RavenDB の zip を展開するず、むンストヌル フォルダヌの Bundles フォルダヌに、既にいく぀かの bundle の dll が入っおいるので、 Plugins サブ フォルダヌを䜜成し、ここに手動でコピヌしおも良い。(ここには、Replication bundle も入っおいる。)

Replication

では、早速、Replication から説明しよう。
以降では、サヌバヌを 8080、8081 の各ポヌトで 2 台起動しおいるず仮定する。

たずは、䜿甚するすべおのサヌバヌで、䞊述した Replication bundle がプラグむン (Plug-in) されおいるこずを確認する。

぀ぎに、Replicatoin の構成をおこなう。Raven DB では、構成情報もドキュメント (Json ドキュメント) ずしお登録するようになっおいお、こうした管理甚のドキュメントは System Document (Sys Doc) ず呌ばれおいる。今回は、Replication 甚のドキュメントを登録する。

ブラりザヌを起動し、8080 のサヌバヌの RavenDB Management Studio (URL は䞊述) を䜿甚しお、 Documents タブを遞択し、[Create a Document] をクリックしお、 ドキュメントを䜜成する。
Key を「Raven/Replication/Destinations」ずしお、䞋蚘の Json ドキュメントを远加する。

{
  "Destinations": [
    {
      "Url": "http://localhost:8081/"
    }
  ]
}

今回は、8080 のサヌバヌが master ずなり、8080 の曎新を 8081 に Replication する。(そのため、8081 のサヌバヌに、䞊蚘の構成は必芁ない。) 合蚈 2 台構成なので 1 台分の slave しか远加しおいないが、 3 台構成以䞊の堎合は、䞊蚘の Json 配列に耇数のマシンを远加すれば良い。

構成を倉曎したら、RavenDB を再起動する。 今回は、デバッグ実行しおいるので、「q」で抜けおから再床起動すれば良い。もしサヌビスずしおむンストヌル (Windows サヌビスずしお起動) しおいる堎合は、 管理者暩限で以䞋のコマンドを実行すれば再起動できる。

Raven.Server.exe /restart

以䞊で、Replication の蚭定は完了だ。

では、実際にデヌタを曎新しお確認しおみる。(8080 ず 8081 のサヌバヌをデバッグ モヌドで起動しおおこう。)

今回は、前回 ず違っお HTTP ホストの RavenDB を䜿うので、 クラむアント偎は RavenDB Client のみで充分だ。(NuGet からむンストヌルできる。なお、前述の通り、jquery などを䜿っおアクセスしおも良い。)
RavenDB Client を䜿っお、䞋蚘の通りデヌタを登録しおみる。(なお、前回のように、接続先の情報を .config に蚘述しおおいおも良い。)

using Raven.Client;
using Raven.Client.Document;

static void Main(string[] args)
{
  using (var ds = new DocumentStore
  {
    Url = "http://localhost:8080/"
  })
  {
    ds.Initialize();

    using (IDocumentSession session
        = ds.OpenSession())
    {
      Order o1 = new Order
      {
        Name = "test1",
        Price = 100,
        Category = "material"
      };
      session.Store(o1);
      session.SaveChanges();
      //session.Dispose();
    }

    Console.WriteLine("Done !");
    Console.ReadLine();
    //ds.Dispose();
  }
}

public class Order
{
  public string Name { get; set; }
  public int Price { get; set; }
  public string Category { get; set; }
}

8080 ず 8081 のコン゜ヌルを芋ながら、サヌバヌ䞊でどんな芁求が凊理されおいるか確認すれば䞀目瞭然だ。8080 のサヌバヌに POST/PUT の芁求がおこなわれるず同時に、8081 のサヌバヌにも POST 芁求が入る。その結果、Management Studio で芋るず、双方に、同じデヌタが登録されおいるのが確認できる。

参考たでに、8081 のサヌバヌ䞊 (slave 侊) のコン゜ヌル結果を出力するず䞋蚘の通りになる。

c:\Demo\RavenDB-8081\Server> Raven.Server.exe --debug
Raven is ready to process requests. Build 960, Version 1.0.0 / bce65ae
Server started in 3,298 ms
Data directory: c:\Demo\RavenDB-8081\Server\Data
HostName: <any> Port: 8081, Storage: Esent
Server Url: http://machine01:8081/
Available commands: cls, reset, gc, q

Request # 1: GET  - 614 ms - <default> - 200 - /replication/lastEtag?from=http%3A%2F%2Fmachine01%3A8080%2F&currentEtag=00000000-0000-0300-0000-000000000002
Request # 2: POST - 278 ms - <default> - 200 - /replication/replicateDocs?from=http%3A%2F%2Fmachine01%3A8080%2F
Request # 3: GET  - 266 ms - <default> - 200 - /replication/lastEtag?from=http%3A%2F%2Fmachine01%3A8080%2F&currentEtag=00000000-0000-0300-0000-000000000003
Request # 4: GET  -   5 ms - <default> - 200 - /replication/lastEtag?from=http%3A%2F%2Fmachine01%3A8080%2F&currentEtag=00000000-0000-0300-0000-000000000003
Request # 5: GET  -   3 ms - <default> - 200 - /replication/lastEtag?from=http%3A%2F%2Fmachine01%3A8080%2F&currentEtag=00000000-0000-0300-0000-000000000003

今回はテストのため 2 台ずしおいるが、単䞀のサヌバヌ䞊で曎新がおこなわれるず、ファヌムのすべおのサヌバヌに曎新のバッチが送信される。(この凊理は、background で䞊列に凊理される。)

たた、今回は、8080 のサヌバヌを master ずしお、このサヌバヌに発生した曎新凊理ず同期する 8081 のサヌバヌ (slave) を構成したが、 master – master の構成も可胜だ。 この際、もし、サヌバヌ間の曎新が Conflict した堎合は、䞋蚘のドキュメントの通り凊理すれば良い。

[RavenDB] Dealing with replication conflicts
http://ravendb.net/docs/server/bundles/replicationconflicts

たた、Failover の仕組みも提䟛しおいる。
䟋えば、クラむアントを䞋蚘の通り䜜成し、最初の ReadLine() の箇所で 8080 のサヌバヌを shutdown しおみる。するず、以降の凊理で䟋倖は発生せず、デヌタは、ちゃんず 8081 から取埗される。(ただし、䞋蚘の Initialize() によっお Replication Server の情報を読み蟌むので、Initialize の際に 8080 のサヌバヌが起動しおいなければならない。)

using (var ds = new DocumentStore
{
  Url = "http://localhost:8080/"
})
{
  ds.Initialize();

  Console.ReadLine(); // Wait and shutdown 8080 !!

  using (IDocumentSession session
      = ds.OpenSession())
  {
    Order item = session.Load<Order>("orders/1");
    Console.WriteLine("Price is ${0}.", item.Price);
  }

}

たた、以䞋の通り蚘述するず、デヌタ取埗の際、8080 ず 8081 のサヌバヌに亀互に GET 芁求が送信される。(この手法は、read striping ず呌ばれおいる。)

using (var ds = new DocumentStore
{
  Url = "http://localhost:8080/",
  Conventions =
  {
    FailoverBehavior = FailoverBehavior.ReadFromAllServers
  }
})
{
  . . .

なお、master-master の堎合は、䞊蚘で FailoverBehavior.AllowReadsFromSecondariesAndWritesToSecondaries を指定するず良い。

たた、途䞭たで Replication を䜿わず実行し、 途䞭から Replication を構成した堎合など、 サヌバヌ間でデヌタの盞違 (矛盟) が生じるように思われるかもしれないが、 ちゃんず、最初の同期凊理でデヌタを同䞀に揃えおくれる。(初回に、slave に察し、同期するデヌタの回数分、POST 芁求が送信される。)

ちなみに、RavenDB のドキュメント を芋るず、 PutIndex、DeleteIndex は Replication でサポヌトされおいないようなので泚意しおほしい。 なお、圓然だが、Query をおこなうず、ちゃんず dynamic index は䜜成される。(ただし、その Index は Replication されない。察象のサヌバヌぞ Query をおこなう床に、そのサヌバヌごずに Index が䜜成される。)

Sharding (Scale out using shared data set)

さお、いよいよ、RavenDB の Scale out の話に入りたい。

MongoDB 同様、RavenDB にも Sharding が提䟛されおいる。(Sharding ずは、方針に沿っお、デヌタを耇数のサヌバヌに分散するこず。) 最新の RavenDB では、Sharding 環境で Indexing や Linq Query もサポヌトされおいる。
以降で、ちょっず詳しく芋おみよう。

たず、単玔に、デヌタを任意に (おたかせで) 分散させる Blind Sharding を芋おみよう。 これも、超簡単 ! 䞋蚘の通り、DocumentStore の代わりに、ShardedDocumentStore ずいうオブゞェクトを䜿えば完了だ。(bundle も䞍芁。)
䞋蚘で、o1、o2 は、それぞれ別々のサヌバヌに振り分けられる。
぀たり、Sharding は、サヌバヌ偎で実行されおいるのではなく、すべおクラむアント偎でおこなわれる。

using Raven.Client.Shard;

var stores = new Dictionary<string, IDocumentStore>
{
  {
    "server1",
    new DocumentStore {Url = "http://localhost:8080"}
  },
  {
    "server2",
    new DocumentStore {Url = "http://localhost:8081"}
  }
};

var shrd = new ShardStrategy(stores);

using (var ds =
  new ShardedDocumentStore(shrd))
{
  ds.Initialize();

  using (IDocumentSession session
      = ds.OpenSession())
  {
    Order o1 = new Order
    {
      Name = "test1",
      Price = 100,
      Category = "material"
    };
    session.Store(o1);
    session.SaveChanges();

    Order o2 = new Order
    {
      Name = "test2",
      Price = 200,
      Category = "material"
    };
    session.Store(o2);
    session.SaveChanges();
  }
}

぀ぎに、方針 (ポリシヌ) に沿っお Sharding をおこなう Smart Sharding をプログラミングする。
たず、簡単な䟋ずしお、Order の Category ごずに、デヌタを別々のサヌバヌにわけお配眮するサンプル コヌドを䞋蚘に蚘茉する。
䞋蚘の堎合、o1、o3 は 8080 のサヌバヌに配眮され、o2 のみ 8081 のサヌバヌに配眮される。

var stores = new Dictionary<string, IDocumentStore>
{
  {
    "material",
    new DocumentStore {Url = "http://localhost:8080"}
  },
  {
    "food",
    new DocumentStore {Url = "http://localhost:8081"}
  }
};
var shrd = new ShardStrategy(stores)
  .ShardingOn<Order>(o => o.Category);
using (var ds =
  new ShardedDocumentStore(shrd))
{
  ds.Initialize();

  using (IDocumentSession session
      = ds.OpenSession())
  {
    Order o1 = new Order
    {
      Name = "ball pointpen",
      Price = 100,
      Category = "material"
    };
    session.Store(o1);
    session.SaveChanges();

    Order o2 = new Order
    {
      Name = "ice cream",
      Price = 150,
      Category = "food"
    };
    session.Store(o2);
    session.SaveChanges();

    Order o3 = new Order
    {
      Name = "notebook",
      Price = 80,
      Category = "material"
    };
    session.Store(o3);
    session.SaveChanges();
  }
}

さらに、おもしろい実隓をしおみよう。
䟋えば、Order ず Product を䞋蚘の通り定矩し、Order.Product に Product の Id を蚭定する。(前回 説明したように、RavenDB では、ドキュメントに、垞に、Id が付䞎される。)

public class Order
{
  public string Id { get; set; }
  public string Product { get; set; }
  public int Count { get; set; }
}

public class Product
{
  public string Id { get; set; }
  public string Name { get; set; }
  public int Price { get; set; }
  public string Category { get; set; }
}

そしお、䞋蚘の通り実行しおみる。 するず、p1、p3、o1 は 8080 のサヌバヌに保存され、p2、o2 は 8081 のサヌバヌに保存される。

var stores = new Dictionary<string, IDocumentStore>
{
  {
    "material",
    new DocumentStore {Url = "http://localhost:8080"}
  },
  {
    "food",
    new DocumentStore {Url = "http://localhost:8081"}
  }
};
var shrd = new ShardStrategy(stores)
  .ShardingOn<Product>(p => p.Category)
  .ShardingOn<Order>(o => o.Product);
using (var ds =
  new ShardedDocumentStore(shrd))
{
  ds.Initialize();

  using (IDocumentSession session
      = ds.OpenSession())
  {
    // Create Product
    Product p1 = new Product
    {
      Name = "ball pointpen",
      Price = 100,
      Category = "material"
    };
    session.Store(p1);
    Product p2 = new Product
    {
      Name = "ice cream",
      Price = 150,
      Category = "food"
    };
    session.Store(p2);
    Product p3 = new Product
    {
      Name = "notebook",
      Price = 80,
      Category = "material"
    };
    session.Store(p3);
    session.SaveChanges();

    // Create Order
    Order o1 = new Order
    {
      Product = p3.Id,
      Count = 3
    };
    session.Store(o1);
    Order o2 = new Order
    {
      Product = p2.Id,
      Count = 2
    };
    session.Store(o2);
    session.SaveChanges();
  }
}

さお、勘の良いプログラマヌなら そろそろ気づいたず思うが、Smart Sharding では、読み取り (怜玢) ず連携するこずで効果を発揮する。実際に、その効果を芋おみよう。

䟋えば、Load をおこなっおみる。
たず、予備知識ずしお、Sharding を䜿甚した堎合、Id の既定倀は、前回 説明した <class name>/<sequence number> ではなく、<server name>/<class name>/<sequence number> ずなるので泚意しおほしい。
このため、䟋えば、p3 を取埗する堎合は、䞋蚘のプログラム コヌドになる。

Product item = session.Load<Product>("material/products/3");

さお、この際、サヌバヌにどのような凊理が枡されたか、サヌバヌ䞊のコン゜ヌル りィンドり (䞊述した debug 実行のコン゜ヌル) で確認しおみおほしい。実は、この GET 芁求は 8080 のサヌバヌにしか飛ばない。Id から、このオブゞェクトが 8080 のサヌバヌにあるこずがわかっおいるためだ。

では、䞋蚘はどうだろう ? この Load でも、Order オブゞェクトず、関連する Product オブゞェクトは 8080 のサヌバヌにしかないため、8081 ぞの問い合わせ (GET) はおこなわれない。䞋蚘の Include メ゜ッドによっお、8080 のサヌバヌぞの 1 回の HTTP GET のみで結果を取埗する。

// Prefetch Product object using Include
Order order = session.Include<Order>(o => o.Product)
  .Load("material/orders/1");
// This dosen't ask to server !
Product product = session.Load<Product>(order.Product);
Console.WriteLine("Price is ${0}. Count is {1}.",
  product.Price, order.Count);

Query でも同様だ。䞋蚘のサンプル コヌドの堎合、怜玢や Index 䜜成は 8080 のサヌバヌでしかおこなわれず、䜙蚈なラりンドトリップは発生しない。

var q = from c in session.Query<Product>()
    where c.Category == "material"
    select c;
foreach (var item in q)
{
  Console.WriteLine("{0} : {1}",
    item.Name,
    item.Price);
}

and / or など耇雑な query をおこなった堎合も同様だ。今回は 2 台のサヌバヌだけで確認しおいるが、Linq Query の構文を解析し、m 台あるうちの n 台から結果を取埗すれば良いず刀断されるず、 RavenDB Clinet は、その n 台のみに怜玢をおこない、取埗したデヌタを結合しお返しおくる。(or を䜿った堎合、など。)

たさに、Smart ! (クラむアント ラむブラリヌが、賢いっおこずだね)

䞀方、䞋蚘の Query は、8080、8081 の双方のサヌバヌで実行されお、答えを返しおくる。

var q = from c in session.Query<Product>()
    where c.Category == "notebook"
    select c;
foreach (var item in q)
{
  Console.WriteLine("{0} : {1}",
    item.Name,
    item.Price);
}

たた、プログラマヌなら、orderby を䜿った堎合の動きが気になるよね。サヌバヌ A ずサヌバヌ B で Index を䜿っお Sort 結果を取埗し、 最埌に、これらを結合するず仮定するず、その結果、順番がばらばらになるような気がする。しかし、実際に、このような状態を䜜っお orderby を実行しおみるず、 orderby の結果もちゃんず正しい答えが返っおくる。
きっず、ここは、RavenDB Client が頑匵っちゃっおいるのかもしれない。(もしそうだずするず、この点は、デヌタ数が倚い堎合に芁泚意ずいうこずだ。)

あず、Sharding Strategy を運甚途䞭で倉曎するず、 その倉曎内容によっおは、圓然、怜玢結果はおかしくなっおしたうので泚意しおほしい。(䟋えば、䞊蚘で、”material” ず “food” のサヌバヌを入れ替えるず、Query の際に誀った結果が返っおくるこずになる。) もちろん、それたで登録されおいたデヌタず矛盟しない倉曎であれば問題ない。開始時点で、ちゃんず以降の運甚も怜蚎に入れお䜿ったほうが良さそうだ。

あず、Sharding ず Replication の䜵甚も可胜だ。

いろいろ解説しはじめるずきりがないが、これだけの事が、かなり安䟡に実珟できる点は魅力的だ。(「費甚」ずいう意味ではなく、構成の理解やセットアップなど、トヌタルの意味で「安䟡」ず曞いおいる。)
他の RDB などでも、いたどき Replication や Distribution の仕組みくらいは持っおいるが、この手のデヌタベヌスの良い点は、速床はもちろんだが、ずにかくポヌタブルで、「わかりやすい」ずいう点だろう。動きがわかりやすいず、そのシステム固有の 蚳のわからない動きに悩たされるこずも少ないし、刀断も早い。

RavenDB の特城ず䜿い方 (プログラミング)

最近 はたっおいる RavenDB に぀いお曞いおおこうず思う。

RavenDB は、軜量なドキュメント デヌタベヌス (NoSQL) で、MongoDB などを䜿っおいる人は、䌌た抂念のものず思っおもらっお良い。(もちろん、现かな点は違うけど。) 構造䞊、「速い」ずいうのはもちろんだが、その特城ずしお、.NET ずの芪和性が良く、.NET アプリケヌションに埋め蟌めるずいう点がある。(ASP.NET MVC の開発者にずっおは、超うれしい。)
たた、実際に䜿っおみるずわかるが、そうした簡単な衚珟では足りないくらい、さたざたなメリットず特城があるので、今日は、その蟺りを、䌝えられる限り曞いおおこうず思う。(海倖の䞀郚のマニア達の間では、流行っおるみたいだ。。。)

むンストヌルずデヌタベヌスの準備

たず、構成をちゃんず理解しおもらうために、むンストヌル方法から曞いおおこうず思う。

RavenDB には、.NET のアプリケヌション (ASP.NET 含む) に埋め蟌んで䜿う方法ず、 HTTP にホスト (IIS に配眮, もしくは サヌバヌのバむナリを起動) しお䜿甚する方法がある。(HTTP ホストに぀いおは、「RavenDB の Replication, Scale Out」に蚘茉した。なお、Multiple Database の構成など、HTTP にホストしないず䜿えないものもあるので芁泚意。)
今回は、アプリケヌションに埋め蟌んで䜿甚するが、この堎合は、NuGet からむンストヌルできる。

install-package RavenDB.Embedded

Attempting to resolve dependency 'RavenDB.Database (= 1.0.888)'.
Attempting to resolve dependency 'Newtonsoft.Json (= 4.0.8)'.
Attempting to resolve dependency 'NLog (= 2.0.0.2000)'.
Attempting to resolve dependency 'RavenDB.Client (= 1.0.888)'.
Successfully installed 'Newtonsoft.Json 4.0.8'.
Successfully installed 'NLog 2.0.0.2000'.
Successfully installed 'RavenDB.Database 1.0.888'.
Successfully installed 'RavenDB.Client 1.0.888'.
Successfully installed 'RavenDB.Embedded 1.0.888'.
Successfully added 'Newtonsoft.Json 4.0.8' to ConsoleApplication5.
Successfully added 'NLog 2.0.0.2000' to ConsoleApplication5.
Successfully added 'RavenDB.Database 1.0.888' to ConsoleApplication5.
Successfully added 'RavenDB.Client 1.0.888' to ConsoleApplication5.
Successfully added 'RavenDB.Embedded 1.0.888' to ConsoleApplication5.

この段階では、ただデヌタベヌス ファむル等は生成されず、必芁な dll の配眮ず参照蚭定が远加されるのみだ。

なお、先日リリヌスされた ASP.NET MVC 4 RC 版ず䞀緒に䜿甚する堎合は、ただ RavenDB で䜿甚しおいるラむブラリヌのバヌゞョンず競合するため (Microsoft.AspNet.WebApi パッケヌゞで䜿甚しおいるラむブラリヌのバヌゞョンず競合するため)、䞋蚘の通り、PreRelease 版の RavenDB を入れおおく。(2012 幎 06 月珟圚)

get-package -l -filter RavenDB.Embedded -pre

Id                             Version
--                             -------
RavenDB.Embedded               1.2.2010-Unstable

install-package RavenDB.Embedded -version 1.2.2010-Unstable -pre

ここでは説明しないが、もちろん、Import、Export、Backup など、デヌタベヌス管理における䞀般的なタスクも実行できる。(HTTP ホストの堎合、RavenDB Management Studio を䜿っお簡単に実行できる。)

基本的な䜿い方

「軜量」(light-weight) ず蚘茉したが、では、どんな感じで light なのか芋おみよう。

RavenDB を䜿甚するには、RavenDB Client (Client 甚のラむブラリヌ) を䜿甚するが、 IIS にホストしおいる堎合は、REST (Web API, HTTP API) ずしお䜿甚するこずもできる。(.NET プログラマヌの方は、ちょうど、WCF Data Services のような䜿い方だず思っお良い。)

デヌタベヌス (デヌタ、むンデクス等) はファむルずしお䜜成されるが、これらの必芁なファむルは実行時に䜜成される。(䞀床䜜成されたら、以降は、䜜成されたデヌタベヌス ファむルを䜿甚する。)
デヌタベヌス甚のディレクトリのみを準備しおおき、このディレクトリを指定しお RavenDB を初期化するこずで、必芁なデヌタベヌス ファむルが䜜成される。特にそれ以䞊の特別な準備は䞍芁で、Entity Framework、Hibernate などの OR Mapper (ORM) 同様、簡単なコヌドでデヌタベヌスの Provisioning が完了する。

たた、デヌタ構造やスキヌマ定矩も䞍芁で、.NET むンスタンスを保存するず、内郚で Json 圢匏にシリアラむズされるため、シリアラむズ可胜なオブゞェクトであれば䜕でも保存できる。

䟋えば、䞋蚘は、RavenDB Client を䜿っお、Order クラスのむンスタンスを保存する簡単なサンプル コヌドだ。(今回は、アプリケヌションの実行ディレクトリの䞋に「Database」ずいう名前のサブ ディレクトリを䜜成し、ここを䜿甚する。)

using Raven.Client;
using Raven.Client.Embedded;

static void Main(string[] args)
{
  using (IDocumentStore instance =
    new EmbeddableDocumentStore
    {
      DataDirectory=@"~\Database"
    })
  {
    // all db files are created, here !
    instance.Initialize();

    using (IDocumentSession session
      = instance.OpenSession())
    {
      Order o1 = new Order
      {
        Name = "test1",
        Price = 100,
        Category = "material"
      };
      session.Store(o1);
      session.SaveChanges();
      //session.Dispose();
    }

    Console.WriteLine("Done !");
    Console.ReadLine();
    //instance.Dispose();
  }
}

public class Order
{
  public string Name { get; set; }
  public int Price { get; set; }
  public string Category { get; set; }
}

䞊蚘の Initialize() メ゜ッドの実行によっお、デヌタベヌス関連の䞀連のファむルが䜜成される。(これらのファむルを消せば、デヌタベヌスは、たた初期の状態で再䜜成される。いたっおシンプルだ。)

接続の際は、䞋蚘のように、構成ファむル (.config) に接続情報を蚘述しおも良い。

<?xml version="1.0"?>
<configuration>
  <connectionStrings>
    <add name="RavenDB" connectionString="DataDir = ~\Database" />
  </connectionStrings>
</configuration>

䞊蚘の接続文字列 (RavenDB) を䜿っおデヌタベヌスを初期化する際は、䞋蚘の通り蚘述する。

static void Main(string[] args)
{
  using (IDocumentStore instance =
    new EmbeddableDocumentStore
    {
      ConnectionStringName = "RavenDB"
    })
  {
    . . .

なお、RavenDb を HTTP にホスト (配眮) しおいる堎合は、䞋蚘のように DocumentStore を䜿っお接続する。(この堎合も、同様に、接続先の情報を .config に蚘述しおも良い。)

using Raven.Client.Document;

var ds = new DocumentStore
  { Url = "http://testsite/Raven" };

Key-Value

RavenDB は NoSQL であり、Key-Value を採甚しおいる。デヌタは Json フォヌマットで保存されるが、内郚で、自動的に、Id (identifier) が Key ずしお付䞎される。(これが、取埗の際の Key ずしお䜿甚される。)
぀たり、デヌタは、Id (identifier) ずいう文字列 (Key) ず Json ドキュメント (Value) の Key-Value ずしお栌玍されおいる。
䟋えば、䞋蚘は、Id が「orders/1」のむンスタンスを取埗しおいる。(圓然だが、速い。)

Order res;
using (IDocumentSession session = instance.OpenSession())
{
  res = session.Load<Order>("orders/1");
}

Id は、プログラマヌが指定しない堎合、自動的に <class name>/<sequence number> ずなる。<class name> には、クラス名の小文字の名前を耇数圢にした名前が蚭定される。䟋えば、「Order」クラスなら「orders」ずなる。぀たり、䞊述したアむテムの新芏登録のコヌドの堎合、Id (Key) は「orders/1」ずなる。
たた、Id は、プログラマヌが明瀺的に指定するこずも可胜だ。

session.Store(o1);    // id is "orders/1"
session.Store(o1, "id1"); // id is "id1"

たた、シリアラむズ察象の .NET クラスに Id ずいう名前の文字列型のメンバヌがある堎合、これが自動的に Id ずしお Key に割り圓おられる。 䟋えば、䞋蚘のコヌドの堎合、Id が同䞀のため、o2 によっお、o1 が倉曎 (update) される。(぀たり、登録されるデヌタは 1 件のみ。)

static void Main(string[] args)
{
  ...

  using (IDocumentSession session
    = ds.OpenSession())
  {
    Order o1 = new Order
    {
      Id = "id1",
      Name = "test1",
      Price = 100,
      Category = "material"
    };
    session.Store(o1);
    session.SaveChanges();
  }

  using (IDocumentSession session
    = ds.OpenSession())
  {
    Order o2 = new Order
    {
      Id = "id1",
      Name = "test1",
      Price = 200,
      Category = "food"
    };
    session.Store(o2);
    session.SaveChanges();
  }
  ...
}

public class Order
{
  public string Id { get; set; }
  public string Name { get; set; }
  public int Price { get; set; }
  public string Category { get; set; }
}

しかし、䞋蚘のコヌドでは、Id プロパティが指定されおいないため、o1、o2 の 2 件のデヌタが䜜成 (Create) される。Id には、「orders/1」、「orders/2」が付䞎される。

static void Main(string[] args)
{
  ...

  using (IDocumentSession session
    = ds.OpenSession())
  {
    Order o1 = new Order
    {
      Name = "test1",
      Price = 100,
      Category = "material"
    };
    session.Store(o1);
    session.SaveChanges();
  }

  using (IDocumentSession session
    = ds.OpenSession())
  {
    Order o2 = new Order
    {
      Name = "test1",
      Price = 200,
      Category = "food"
    };
    session.Store(o2);
    session.SaveChanges();
  }
  ...
}

public class Order
{
  public string Name { get; set; }
  public int Price { get; set; }
  public string Category { get; set; }
}

なお、クラスの Id プロパティ (メンバヌ) を空にしお登録 (Store) するず、登録時に、自動的に割り圓おられた Id がむンスタンス (むンスタンスの Id メンバヌ) に蚭定される。

この Id だが、RavenDB が IIS にホストされおいる堎合は Uri の断片そのものなのでわかりやすいが、 Web アプリケヌションに埋め蟌む堎合には、Query String で䜿甚する際に邪魔になるこずがある。䟋えば、orders/1 ずいう Id のアむテムを GET する堎合、䞋蚘の URL ぱラヌずなっおしたうだろう。

GET /webapplication/Order/?id=orders/1

この堎合、MSDN マガゞン に曞かれおいるように、 IdentityPartsSeparator プロパティを䜿っお、Id で䜿甚する Separator を倉曎できる。

using (IDocumentStore instance =
  new EmbeddableDocumentStore
  {
    ConnectionStringName = "RavenDB"
  })
{
  instance.Conventions.IdentityPartsSeparator = "-";
  instance.Initialize();
  ...

Query ず Index

この手のデヌタベヌスで、い぀も困るのが怜玢だ。
Key-Value の堎合、構造䞊、䜕かず融通が効かないこずが倚いが、RavenDB では、高床な Index 管理をサポヌトするこずで、こうした pain を回避しおいる。Index ず蚀っおも、RDB の Index ずは考え方が異っおいるので、以䞋に蚘茉する。(ドキュメント デヌタベヌスなので、党文怜玢甚の Index の抂念だ。)

たず、RavenDB では、Linq の Query を䜿っお以䞋のように曞ける。

var test = from c in session.Query<Order>()
      where c.Name == "test1"
      select c;
foreach (var item in test)
{
  Console.WriteLine("{0} : {1}", item.Name, item.Price);
}

䞊蚘では Key (Id) が䜿えないため、登録されおいるデヌタの Name を 1 ぀ 1 ぀調べお答えを返しおいるように思えるが、 この手のデヌタベヌスでは、「デヌタ党件を調べる」ずいうこずはしない。(厳密には、LuceneQuery ずいう Index ファむルをそのたた怜玢するず、党件怜査を実行できおしたうが。。。)
では、どのように動いおいるのだろうか ?

実は、䞊蚘のような怜玢をおこなうず、内郚で、動的に Index が䜜成されお、その Index が䜿甚される。(これは、dynamic index ず呌ばれおいる。ちなみに、前述の Id で怜玢した堎合であっおも、䞊蚘のような Linq Query を䜿うず、それに応じた Index が必ず䜜成される。) たた、䜜成された Index は、しばらく残り、同じ Index を䜿甚する別の怜玢がおこなわれるず、その Index が再利甚される。最終的に、䜕床も同じ Index を䜿甚するず、RavenDB によっお、dynamics index は氞続化される。(以降、ずっず残る。)
぀たり、アプリケヌション偎で同じ䜿い方をしおいるず、そのアプリケヌションに最適化された Index が自動的に生成され、氞続化されお、䜿甚されるようになる。

こうした仕組みのため、dynamic index を䜿う堎合は、初回の怜玢のみ遅くなるので泚意が必芁。たた、こうした仕組みのため、EUC による動的怜玢など、郜床、怜玢文 (SQL) を動的生成するようなアプリケヌションにも向いおいない。

なお、䜜成された Index は、䞋蚘のコマンドで取埗できるので、芳察しおみるずわかる。(dynamic index が permanent に昇栌されたかどうかも、この名前で確認できる。) Index の明瀺的な削陀も可胜だ。

string[] indexes = instance.DatabaseCommands.GetIndexNames(
  0,
  int.MaxValue);
foreach (var indexname in indexes)
{
  Console.WriteLine("Index : {0}", indexname);
}

さお、ここたでの説明だず、RDB の Index を想像する人も倚いず思うが、実は党然違う。
以䞋に、この Index の正䜓をもう少し现かく芋おみよう。

Index は、䞊蚘 (dynamic index) のように動的に䜜成するこずもできるが、プログラマヌ自身が Index を䜜成し、これを䜿甚できる。(この Index を static index ず呌ぶ。) このため、以降では、この方法で Index を䜜成しお芋おみよう。

䞊蚘ず同じ Name を䜿った怜玢を、static index を䜿っお曞くず、以䞋の通りになる。

using Raven.Client.Indexes;

static void Main(string[] args)
{
  ...

  // all db files are created, here !
  instance.Initialize();

  // create index !
  instance.DatabaseCommands.PutIndex(
    "Orders/ByName",
    new IndexDefinitionBuilder<Order>
    {
      Map = (orders => from order in orders
                select new { order.Name })
    });

  . . .

  using (IDocumentSession session = instance.OpenSession())
  {
    // using static index !
    var test = from c in
            session.Query<Order>("Orders/ByName")
          where c.Name == "test1"
          select c;
    foreach (var item in test)
    {
      Console.WriteLine("{0} : {1}", item.Name, item.Price);
    }
  }
  . . .

}

ちなみに、䞊蚘で、正しい怜玢結果にならない堎合は、数秒埅機しおから怜玢 (Query) しおみおほしい。理由は埌述する。

この Index のメカニズムを簡単に解説する。RavenDB の Index は、実は、内郚では、党文怜玢 (Full Text Search) ゚ンゞンの Lucene.Net が䜿甚されおいる。䞊蚘の Map 関数により、Lucene.NET に登録する Document のフィヌルドが蚭定される。(このフィヌルドを䜿っお、怜玢可胜になる。) そしお、怜玢の際には、Lucene.Net に登録されおいる Index を䜿っお Document を怜玢する。
既定では、Map 関数 (䞊蚘) で抜出された文字列型のフィヌルドをそのたた Token ずしお登録するが、いわゆる党文怜玢゚ンゞンのメリットを掻甚しお、Token 解析を別のものに倉曎するこずも可胜だ。 䟋えば、䞋蚘では、Name を空癜 (whitespace) で Token 分割しお Lucene.Net に登録し、 この分割された Token を䜿っお怜玢 (Query) 可胜にしおいる。この堎合、䟋えば、Name が「Ballpoint pen」だった堎合、「pen」で怜玢しおも抜出されるようになる。

using Lucene.Net.Analysis;
. . .

instance.DatabaseCommands.PutIndex(
  "Orders/ByName",
  new IndexDefinitionBuilder<Order>
  {
    Map = (orders => from order in orders
              select new { order.Name }),
    Analyzers =
    {
      {
        orders => orders.Name,
        typeof(WhitespaceAnalyzer).FullName
      }
    }
  });

この Map の関数は、個々のデヌタごずにそれぞれ独立しお凊理できるため、 耇数のスレッド (タスク) によっお分散しお Index 生成の凊理をおこない、高速化できる。

たた、䟋えば、「Order に蚭定されおいる Category を集蚈し、各 Category ず登録されおいる Order の個数を出力する」ずいった耇雑な怜玢の堎合には、䞋蚘のように Map ず Reduce を組み合わせるこずができる。

static void Main(string[] args)
{
  . . .

  // create index
  instance.DatabaseCommands.PutIndex(
    "Orders/ByCategoryCount",
    new IndexDefinitionBuilder<Order, CategoryCount>
    {
      Map = (orders => from order in orders
                select new CategoryCount()
                {
                  Category = order.Category,
                  Count = 1
                }),
      Reduce = (results => from result in results
                  group result by result.Category
                  into g
                  select new
                  {
                    Category = g.Key,
                    Count = g.Sum(x => x.Count)
                  })
    });

  . . .

  using (IDocumentSession session = instance.OpenSession())
  {
    var test = (from c in
            session.Query<CategoryCount>("Orders/ByCategoryCount")
          where c.Category == "material"
          select c).FirstOrDefault();
    Console.WriteLine("{0} : {1}",
      test.Category,
      test.Count);
  }
}

public class CategoryCount
{
  public string Category { get; set; }
  public int Count { get; set; }
}

Reduce は、Map で䜜成した結果を集玄する関数だ。Reduce 関数では、Map で䜜成された結果をグルヌプ化し、グルヌプごずに独立しお凊理できる。たた、その結果を さらにグルヌプ化し、再床、独立しお凊理をおこなう。そしお、これを繰り返す。぀たり、この凊理も、耇数スレッドで分散しお効率的に集玄凊理を実行できる。 (なお、Reduce は、このように再垰的に凊理されるため、入力ず出力の型は同じになっおいる点に泚意しおほしい。)

このように、RavenDB の Index は、Map Reduce などのタスクを登録し、 この登録されたタスクが䜜成する結果のビュヌを䜿っお凊理をおこなうむメヌゞだ。(ただし、基本的に、単䞀マシン、耇数スレッドでの実行なので泚意しおほしい。マシン分割を怜蚎する堎合は、Sharding を䜿うこずになる。)

なお、こうした仕組みのため、いく぀かの泚意点もある。 䟋えば、この Map Reduce の凊理 (タスク) は、怜玢凊理ず無関係にバックグラりンドで実行されるため、怜玢結果が Stale の状態 (぀たり、叀い Index の状態) になっおいる堎合があるので泚意する。こうした堎合、䞊述したように数秒埅っおみるか、あるいは、ここでは説明を省略するが、プログラムで Stale かどうかの確認が可胜なので、こうした凊理をために入れおおいおほしい。 たた、RDB のような Contains を䜿った怜玢 (前方䞀臎以倖の郚分文字列怜玢) もできない。理由は、䞊蚘を芋おもらえば明癜だろう。ただし、䞊蚘のように Analyzer を倉曎するこずで、意味的に Token 分割をおこない、Token 単䜍で怜玢するこずはできる。
芁は、RavenDB の Index は、OLTP を埗意ずする RDB のような䜿い方ではなく、あくたでも「ドキュメント」を扱うのに適した Full Text Search の Index であるこずを理解しおおくず良い。たあ、普通の䜿い方をしおいれば、そのアプリケヌションのために最適化されたデヌタベヌスずしお動䜜するのだが、こうした内郚の動きを理解しおおく必芁はあるずいうこずだ。

なお、Index 䜜成は background スレッドで実行されるため、Indexing の際の゚ラヌは、プログラムから取埗するか (/stats)、HTTP ホストの堎合は、RavenDB Management Studio を䜿甚しお確認する。
たた、Index であたり゚ラヌが頻発する堎合、RavenDB が Index を Disable にしおしたう堎合がある。その堎合、Index を消すか、Index Definition そのものを倉曎するしかない。
ちなみに、Index は、再構成 (ResetIndex) も可胜だ。

その他 (light な䞖界の、light な制埡)

この他に、ここでは説明を省略するが、RavenDBは、階局構造 (Indexing Hierarchical Data) なども高速に扱うこずができる。
たた、RavenDB は、もちろん、Pessimistic ではなく、Optimistic な制埡モデルを採甚しおいる。ETag を䜿った楜芳同時実行制埡 (Optimistic Concurrency Control) のための仕組みも備わっおいる。
たた、RavenDB を IIS にホストする堎合、オブゞェクト同士が参照関係にある堎合に、1 回の REST 呌び出しで関係するオブゞェクトを取埗できる。(぀たり、関係するオブゞェクトの Pre-fetch が可胜。)

ここでは詳现の説明を省略するが、”ドキュメント デヌタベヌスらしさ” は Index だけではないので、いろいろ觊っおみるずおもしろい。

ASP.NET ずの Integration (ASP.NET MVC, ASP.NET Web API)

䞊蚘の通り、軜量、柔軟、か぀アプリケヌションに近いデヌタベヌスのため、ASP.NET MVC などの RESTful で軜量なアプリケヌション フレヌムワヌクずの盞性は良い。(MSDN マガゞンの蚘事 では、たさにこの内容に぀いお解説されおいる。) たあ、芁は、組み合わせお、アプリケヌションのリポゞトリヌずしお䜿うだけだが、䞊蚘のセパレヌタヌ (IdentityPartsSeparator プロパティ) の話以倖にも、いく぀か泚意点があるので、最埌に蚘茉しおおく。

たず、デヌタベヌスの Initialize (䞊蚘の Initialize() メ゜ッド) は、時間がかかるので泚意しおほしい。 特に、ASP.NET MVC では、stateless に実装するこずが倚いので、Initialize は Application_Start などで実行し、取埗したデヌタベヌス オブゞェクト (DocumentStore、EmbeddableDocumentStore) も static 倉数に入れお再利甚するなど、初期化方法を工倫しおほしい。せっかく速いデヌタベヌスでも、「宝の持ち腐れ」ずなっおしたうので泚意が必芁だ。

たた、せっかく RavenDB を䜿うなら、デヌタベヌス アクセスなどはビゞネス ロゞックに混圚させず、透過的に䜿えるような工倫もできるだろう。Event、Handler、ModelBinder だけでなく、IoC (Dependency Resolver) を掻甚すれば、より高床な凊理の分離も可胜だ。(IoC に぀いおは、ここ に日本語で解説されおいる。)