Elasticsearch .NET client NEST 5.x: a usage summary
阿新 • Published: 2020-09-21
Demo source code: https://github.com/huhangfei/NestDemos
This article summarizes how to use NEST 5.x.
Assembly references
NEST.dll Elasticsearch.Net.dll Newtonsoft.Json.dll
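Debugging
- When debugging locally, you can use Fiddler to capture the HTTP requests sent to the ES service.
- You can also capture the request and response in code:
var settings = new ConnectionSettings(pool);
// Enable this setting when creating the client.
// It is best turned off in production because it consumes extra resources.
settings.DisableDirectStreaming(true);
var client = new ElasticClient(settings);
var result = client.Search(....);
// The bodies are JSON, so decode them as UTF-8 rather than the system default encoding.
var requestStr = System.Text.Encoding.UTF8.GetString(result.ApiCall.RequestBodyInBytes);
var responseStr = System.Text.Encoding.UTF8.GetString(result.ApiCall.ResponseBodyInBytes);
_log.Debug(requestStr + " " + responseStr);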
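As a complement to the byte-level capture above, a response can also be checked through its IsValid flag and its DebugInformation text. The following is a minimal sketch; the node address http://localhost:9200 and the class name DebugExample are assumptions for illustration only.

```csharp
using System;
using Nest;

public static class DebugExample
{
    public static void Run()
    {
        // Assumed local node address; replace with your own cluster.
        var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
            .DisableDirectStreaming(); // keep request/response bodies available

        var client = new ElasticClient(settings);

        // A match-all search across all indices and types, used only as a probe.
        var result = client.Search<object>(s => s.AllIndices().AllTypes().MatchAll());

        if (!result.IsValid)
        {
            // DebugInformation summarizes the call: URL, status code, and,
            // with direct streaming disabled, the request and response bodies.
            Console.WriteLine(result.DebugInformation);
        }
    }
}
```

This only helps when ThrowExceptions is left off; with it on, a failed call throws instead of returning an invalid response.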
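Storage structure:
In Elasticsearch, a document belongs to a type, and types live inside an index.
Compared with a traditional relational database:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
DB workflow: create a database -> create tables (mainly defining the properties of each column) -> write data
ES workflow: create an index -> map a type onto the index (likewise defining the properties of each field) -> write data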
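Client syntax
Fluent lambda expression syntax (the powerful query DSL)
s => s.Query(q => q
    .Term(p => p.Name, "elasticsearch")
)
Object initializer syntax
var searchRequest = new SearchRequest<VendorPriceInfo>
{
    Query = new TermQuery
    {
        Field = "name",
        Value = "elasticsearch"
    }
};
Connection
// The three options below are alternatives; use one of them.

// Option 1: a single node
var node = new Uri("……");
var settings = new ConnectionSettings(node);

// Option 2: multiple URIs behind a connection pool
var uris = new Uri[]
{
    new Uri("……"),
    new Uri("……")
};
var pool = new StaticConnectionPool(uris);

// Option 3: multiple nodes behind a connection pool
var nodes = new Node[]
{
    new Node(new Uri("……")),
    new Node(new Uri("……"))
};
var pool = new StaticConnectionPool(nodes);

// With a pool (option 2 or 3), pass the pool to the settings
var settings = new ConnectionSettings(pool);
var client = new ElasticClient(settings);
Note: by default NEST lower-cases the first letter of field names. To keep field names identical to the model, configure the client as follows when creating it. (This setting is strongly recommended, to avoid field-name mismatches.)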
var settings = new ConnectionSettings(node).DefaultFieldNameInferrer((name) => name);
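To make the difference concrete, here is a small sketch that needs no cluster. LowerFirst is a hypothetical helper that only approximates the default behaviour described above (lower-casing the first letter); it is not NEST's own implementation. The identity function is the inferrer passed to DefaultFieldNameInferrer.

```csharp
using System;

public static class FieldNameDemo
{
    // Hypothetical approximation of the default rule: lower-case only
    // the first character of the property name.
    public static string LowerFirst(string name) =>
        string.IsNullOrEmpty(name)
            ? name
            : char.ToLowerInvariant(name[0]) + name.Substring(1);

    public static void Main()
    {
        // Default-style inference: "VendorName" becomes "vendorName".
        Console.WriteLine(LowerFirst("VendorName"));

        // Identity inferrer, as in DefaultFieldNameInferrer(name => name):
        // the field keeps the exact model property name.
        Func<string, string> keepAsIs = name => name;
        Console.WriteLine(keepAsIs("VendorName"));
    }
}
```

Whichever rule is chosen, it has to be the same for indexing and for searching, otherwise queries target fields that do not exist in the mapping.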
Connection Settings
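var settings = new ConnectionSettings(pool);

// Authentication (not enabled)
//settings.BasicAuthentication("username", "password");

// Certificates
//settings.ClientCertificate("");
//settings.ClientCertificates(new X509CertificateCollection());
//settings.ServerCertificateValidationCallback();

// Sniff the cluster on first use; requires a connection pool that supports sniffing
//settings.SniffOnStartup(false);

// Maximum number of concurrent connections
//settings.ConnectionLimit(80);

// How long a node stays marked as dead.
// new TimeSpan(10000) means 10000 ticks (1 ms); the values below assume milliseconds were intended.
//settings.DeadTimeout(TimeSpan.FromMilliseconds(10000));
//settings.MaxDeadTimeout(TimeSpan.FromMilliseconds(10000));

// Maximum number of retries
//settings.MaximumRetries(5);

// Overall retry timeout; defaults to RequestTimeout
//settings.MaxRetryTimeout(TimeSpan.FromMilliseconds(50000));

// Disable automatic proxy detection
//settings.DisableAutomaticProxyDetection(true);

// Ping: a node is pinged before its first use and when it is revived after being marked dead.
// Passing false keeps ping enabled.
settings.DisablePing(false);

// Ping timeout
//settings.PingTimeout(TimeSpan.FromMilliseconds(10000));

// Node selection
//settings.NodePredicate(node =>
//{
//    return true;
//});

// Default index for operations
//settings.DefaultIndex("");

// Field naming rule: same name as the model property
//settings.DefaultFieldNameInferrer(name => name);

// Derive the type name from the CLR type
//settings.DefaultTypeNameInferrer(name => name.Name);

// Request timeout
//settings.RequestTimeout(TimeSpan.FromMilliseconds(10000));

// Debug information
settings.DisableDirectStreaming(true);

// Debug information
//settings.EnableDebugMode((apiCallDetails) =>
//{
//    // Called when a request completes; receives apiCallDetails
//});

// Throw exceptions. The default is false, in which case error details are in each operation's response.
settings.ThrowExceptions(true);

//settings.OnRequestCompleted(apiCallDetails =>
//{
//    // Called when a request completes; receives apiCallDetails
//});
//settings.OnRequestDataCreated(requestData =>
//{
//    // Called when the request data has been created; receives the request data
//});

return new ElasticClient(settings);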
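Different connection pool types
// "Supports ping" means the pool can detect the state of nodes.
// "Supports sniffing" means the pool can discover new nodes.

// For a known cluster: requests go to healthy nodes at random. Supports ping, not sniffing.
IConnectionPool pool = new StaticConnectionPool(nodes); // recommended

// Single node (the constructor takes a Uri, hence nodes[0].Uri)
//IConnectionPool pool = new SingleNodeConnectionPool(nodes[0].Uri);

// Sniffs the cluster dynamically and picks nodes at random. Supports sniffing and ping.
//IConnectionPool pool = new SniffingConnectionPool(nodes);

// Picks one available node and sticks to it for requests. Supports ping, not sniffing.
//IConnectionPool pool = new StickyConnectionPool(nodes);

// Picks one available node and sticks to it for requests. Supports ping and sniffing.
//IConnectionPool pool = new StickySniffingConnectionPool(nodes);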
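For a pool that supports sniffing, the sniffing behaviour itself is configured on the connection settings. The sketch below shows one possible setup; the two seed addresses, the five-minute lifespan, and the class name are assumptions for illustration.

```csharp
using System;
using Elasticsearch.Net;
using Nest;

public static class SniffingExample
{
    public static ElasticClient Create()
    {
        // Assumed seed nodes; the pool discovers the rest of the cluster from them.
        var uris = new[]
        {
            new Uri("http://localhost:9200"),
            new Uri("http://localhost:9201")
        };
        var pool = new SniffingConnectionPool(uris);

        var settings = new ConnectionSettings(pool)
            .SniffOnStartup()                         // sniff before the first request
            .SniffOnConnectionFault()                 // re-sniff when a call to a node fails
            .SniffLifeSpan(TimeSpan.FromMinutes(5));  // re-sniff once the node list is this old

        return new ElasticClient(settings);
    }
}
```

Sniffing replaces the seed list with the addresses the cluster publishes, so it is only appropriate when those addresses are reachable from the client.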
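Selecting the target index and type for an operation
Specifying the index
// Specify the index when executing an operation
client.Search<VendorPriceInfo>(s => s.Index("test-index"));
client.Index(data, o => o.Index("test-index"));
....
Specifying the type
The default type name is the class name of the indexed data (automatically converted to lower case; the rule can be customized).
If the attribute sets a name, as in [ElasticsearchType(Name = "datatype")],
that name is used instead.
// Specify the type explicitly
client.Index(data, o => o.Type(new TypeName() { Name = "datatype", Type = typeof(VendorPriceInfo) }));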
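Instead of repeating the index and type on every call, they can be registered once per CLR type on the connection settings. This is a sketch using InferMappingFor; the node address, the index name "vendor-price-index", and the trimmed-down VendorPriceInfo class are assumptions for illustration.

```csharp
using System;
using Nest;

// Trimmed-down stand-in for the article's model (assumed shape).
public class VendorPriceInfo
{
    public long priceID { get; set; }
    public string vendorName { get; set; }
}

public static class InferenceExample
{
    public static ElasticClient Create()
    {
        var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
            .DefaultIndex("test-index")              // fallback for types without a mapping below
            .InferMappingFor<VendorPriceInfo>(m => m
                .IndexName("vendor-price-index")     // index used for this CLR type
                .TypeName("datatype"));              // type name used for this CLR type

        return new ElasticClient(settings);
    }
}
```

With this in place, client.Index(data) and client.Search<VendorPriceInfo>(...) target the registered index and type unless a call overrides them.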
Data model attributes
Attributes define how data is stored in ES: its type, name, whether it is indexed, how it is analyzed, its format, and so on.
They take effect when the mapping is applied after the index is first created.
Important attributes:
- [ElasticsearchType(Name = "document type", IdProperty = "name of the document's unique key field")]
- [Number(NumberType.Long, Name = "Id")]
  Numeric type plus name.
- [Keyword(Name = "Name", Index = true)]
  A string that is not analyzed. Name = field name, Index = whether the field is indexed.
- [Text(Name = "Dic", Index = true, Analyzer = "ik_max_word")]
  A string that is analyzed. Name = field name, Index = whether the field is indexed, Analyzer = the analyzer to use.
/// <summary>
/// 5.x attributes
/// </summary>
[ElasticsearchType(Name = "TestModel5", IdProperty = "Id")]
public class TestModel5
{
    [Number(NumberType.Long, Name = "Id")]
    public long Id { get; set; }

    /// <summary>
    /// keyword: not analyzed
    /// </summary>
    [Keyword(Name = "Name", Index = true)]
    public string Name { get; set; }

    /// <summary>
    /// text: analyzed; to use the IK analyzer add Analyzer = "ik_max_word"
    /// </summary>
    [Text(Name = "Dic", Index = true)]
    public string Dic { get; set; }

    [Number(NumberType.Integer, Name = "State")]
    public int State { get; set; }

    [Boolean(Name = "Deleted")]
    public bool Deleted { get; set; }

    [Date(Name = "AddTime")]
    public DateTime AddTime { get; set; }

    [Number(NumberType.Float, Name = "PassingRate")]
    public float PassingRate { get; set; }

    [Number(NumberType.Double, Name = "Dvalue")]
    public double Dvalue { get; set; }
}
Index operations
Create
client.CreateIndex("test2");

// Basic settings
IIndexState indexState = new IndexState()
{
    Settings = new IndexSettings()
    {
        NumberOfReplicas = 1, // number of replicas
        NumberOfShards = 5    // number of shards
    }
};
// Create the index without mapping it yet
client.CreateIndex("test2", p => p.InitializeUsing(indexState));
// Create the index and map it
client.CreateIndex("test-index3", p => p
    .InitializeUsing(indexState)
    .Mappings(m => m.Map<VendorPriceInfo>(mp => mp.AutoMap())));
Note: index names must be lowercase.
Check existence
client.IndexExists("test2");
Delete
client.DeleteIndex("test2");
Creating an index, mapping it, setting aliases, and alias operations
/// <summary>
/// Create the index
/// </summary>
private void CreateIndex(string indexName)
{
    if (!_client.IndexExists(indexName).Exists)
    {
        IndexState indexState = new IndexState
        {
            Settings = new IndexSettings
            {
                NumberOfReplicas = _replicas, // number of replicas
                NumberOfShards = _shards      // number of shards
            }
        };
        // Create the index with its settings, mapping, and a management alias
        _client.CreateIndex(indexName, p => p
            .InitializeUsing(indexState)
            .Mappings(m => m.Map<EsDataModel>(mps => mps.AutoMap()))
            .Aliases(a => a.Alias(_indexAliase + "_manager"))
        );
        // Mapping as a separate step
        //_client.Map<EsDataModel>(m => m.Index(indexName).AutoMap());

        #region Alias operations
        Action addAlias = () =>
        {
            _client.Alias(a => a.Add(d => d.Index(indexName).Alias(_indexAliase)));
        };
        // Does the alias already exist?
        if (!_client.AliasExists(s => s.Name(_indexAliase)).Exists)
        {
            addAlias();
            return;
        }
        var result = _client.GetAlias(a => a.Name(_indexAliase));
        // All indices under this alias
        if (result.Indices == null)
        {
            addAlias();
            return;
        }
        var indices = result.Indices.Select(index => index.Key).Select(dummy => (IndexName)dummy).ToArray();
        if (indices.Length <= 0)
        {
            addAlias();
            return;
        }
        // Remove the alias from the old indices and add it to the new index in one request.
        // One remove action is issued per old index, because a single remove action
        // targets a single index.
        _client.Alias(a =>
        {
            foreach (var index in indices)
            {
                a.Remove(r => r.Alias(_indexAliase).Index(index.Name));
            }
            return a.Add(d => d.Index(indexName).Alias(_indexAliase));
        });
        #endregion
    }
}
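Mapping
If no mapping was applied when the index was created, it can be applied separately afterwards. Fields whose type is already fixed cannot be changed, but new fields can be added.
// Map automatically from the object type
var result = _client.Map<TestModel5>(m => m.AutoMap());
// Map manually
var result1 = _client.Map<TestModel5>(m => m
    .Properties(p => p
        .Keyword(s => s.Name(n => n.Name).Index(true)))); // Keyword type
Adding mapped fields
// Add new fields
var result = _client.Map<TestModel5>(m => m
    .Index(indexName)
    .Properties(p => p
        .Keyword(s => s
            .Name("NewField")
            .Index(true))
        .Text(s => s
            .Name("NewFieldText")
            .Index(false))
    )
);
Note: fields that already exist cannot be remapped; only newly added fields are mapped successfully. It is therefore best to apply the mapping right after creating the index, before indexing any data.
Note: within one index, if several types share a field name, indexing may run into problems (the first mapped type wins).
Note: unless there is a special need, and unless the fields overlap heavily, store only one type of data per index.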
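Data
Adding documents (single and bulk)
// Write one document, specifying the index
_client.Index(data, s => s.Index(indexName));
// Specify both index and type
_client.Index(data, s => s.Index(indexName).Type("TestModel5"));

// Write many documents, specifying the index
_client.IndexMany(datas, indexName);
// Specify both index and type
_client.IndexMany(datas, indexName, "TestModel5");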
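A bulk write can succeed as a request while individual documents fail, so it is worth inspecting the per-item results. Below is a minimal sketch using the Bulk API directly; the helper name IndexAll and its parameters are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using Nest;

public static class BulkExample
{
    // Hypothetical helper: bulk-indexes the documents and reports per-item failures.
    public static bool IndexAll<T>(IElasticClient client, string indexName, IEnumerable<T> docs)
        where T : class
    {
        var response = client.Bulk(b => b
            .Index(indexName)   // default index for every operation in this request
            .IndexMany(docs));  // one index operation per document

        if (response.Errors)
        {
            // Only the items that failed, with the reason the server gave.
            foreach (var item in response.ItemsWithErrors)
            {
                Console.WriteLine(item.Id + ": " + item.Error?.Reason);
            }
        }
        return response.IsValid;
    }
}
```

For large data sets, splitting the input into batches of a few thousand documents per call keeps request sizes manageable; the right batch size depends on document size and cluster capacity.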
Deleting data
DocumentPath<TestModel5> deletePath = new DocumentPath<TestModel5>(7);
_client.Delete(deletePath, s => s.Index(indexName));
_client.Delete(deletePath, s => s.Index(indexName).Type(typeof(TestModel5)));
_client.Delete(deletePath, s => s.Index(indexName).Type("TestModel5"));

IDeleteRequest request = new DeleteRequest(indexName, typeof(TestModel5), 7);
_client.Delete(request);

// Delete by query: available in 1.x, required a plugin in 2.x, and is back in 5.x
_client.DeleteByQuery<TestModel5>(s => s
    .Index(indexName)
    .Type("TestModel5")
    .Query(q => q.Term(tm => tm.Field(fd => fd.State).Value(1))));
Updating data
Updating all fields
DocumentPath<VendorPriceInfo> docPath = new DocumentPath<VendorPriceInfo>(2);
var response = client.Update(docPath, (p) => p.Doc(new VendorPriceInfo() { vendorName = "test2update..." }));
// or
IUpdateRequest<VendorPriceInfo, VendorPriceInfo> request = new UpdateRequest<VendorPriceInfo, VendorPriceInfo>(docPath)
{
    Doc = new VendorPriceInfo()
    {
        priceID = 888,
        vendorName = "test4update........"
    }
};
var response = client.Update<VendorPriceInfo, VendorPriceInfo>(request);
Updating some fields (with a partial-document class)
IUpdateRequest<VendorPriceInfo, VendorPriceInfoP> request = new UpdateRequest<VendorPriceInfo, VendorPriceInfoP>(docPath)
{
    Doc = new VendorPriceInfoP()
    {
        priceID = 888,
        vendorName = "test4update........"
    }
};
var response = client.Update(request);
Updating some fields (with an anonymous object)
IUpdateRequest<VendorPriceInfo, object> request = new UpdateRequest<VendorPriceInfo, object>(docPath)
{
    Doc = new
    {
        priceID = 888,
        vendorName = " test4update........"
    }
};
var response = client.Update(request);
// or
client.Update<VendorPriceInfo, object>(docPath, upt => upt.Doc(new { vendorName = "ptptptptp" }));
Note: an update targets a document by its unique id.
Updating with version-based (optimistic) locking
// Look up the version number
var result = _client.Search<TestModel5>(s => s
    .Index(indexName)
    .Query(q => q.Term(tm => tm.Field(fd => fd.State).Value(1)))
    .Size(1)
    .Version() // include the version number in the results
);
foreach (var s in result.Hits)
{
    Console.WriteLine(s.Id + " - " + s.Version);
}

var path = new DocumentPath<TestModel5>(1);
// Pass the version with the update. The update succeeds only if the
// version on the server equals the version passed in.
var response = _client.Update(path, (p) => p
    .Index(indexName)
    .Type(typeof(TestModel5))
    .Version(2) // succeeds only while the version in ES is 2
    .Doc(new TestModel5() { Name = "測測測" + DateTime.Now })
);
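Rather than hard-coding the version, it can be read from the document just before the update. The following sketch reads the current version with Get and treats HTTP 409 as a version conflict; the helper name TryRename and the trimmed-down TestModel5 are assumptions, and it presumes ThrowExceptions is off so that a conflict is reported in the response instead of thrown.

```csharp
using Nest;

// Trimmed-down stand-in for the article's model (assumed shape).
[ElasticsearchType(Name = "TestModel5", IdProperty = "Id")]
public class TestModel5
{
    public long Id { get; set; }
    public string Name { get; set; }
}

public static class VersionedUpdateExample
{
    // Hypothetical helper: returns false if the document is missing or was
    // modified by someone else between the read and the write.
    public static bool TryRename(IElasticClient client, string indexName, long id, string newName)
    {
        // Read the document together with its current version.
        var current = client.Get<TestModel5>(id, g => g.Index(indexName));
        if (!current.Found) return false;

        // Partial update that only applies if the version is unchanged.
        var response = client.Update<TestModel5, object>(id, u => u
            .Index(indexName)
            .Version(current.Version)
            .Doc(new { Name = newName }));

        // 409 Conflict means the server-side version no longer matches.
        if (response.ApiCall.HttpStatusCode == 409) return false;
        return response.IsValid;
    }
}
```

A caller would typically retry a failed call a bounded number of times, re-reading the document on each attempt.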
Search
Basic search
var result = _client.Search<TestModel5>(
    s => s
        .Explain() // returns more detail about the query
        .FielddataFields(fs => fs // analyze the specified fields
            .Field(p => p.Name)
            .Field(p => p.Dic)
        )
        .From(0)  // number of documents to skip
        .Size(50) // number of documents to return
        .Query(q =>
            // Term is mainly for exact matches on values such as numbers, dates,
            // booleans, or not_analyzed strings (text that has not been analyzed)
            q.Term(p => p.State, 100)
            && q.Term(p => p.Name.Suffix("temp"), "姓名") // query on a custom sub-field
            && q.Bool( // bool query
                b => b // must / should / must_not
                    .Must(mt => mt // every clause must match, like AND
                        .TermRange(p => p.Field(f => f.State).GreaterThan("0").LessThan("1"))) // range query
                    .Should(sd => sd // at least one clause must match, like OR
                            .Term(p => p.State, 32915),
                        sd => sd.Terms(t => t.Field(fd => fd.State).Terms(new[] { 10, 20, 30 })), // multiple values
                        //||
                        //sd.Term(p => p.priceID, 1001)
                        //||
                        //sd.Term(p => p.priceID, 1005)
                        sd => sd.TermRange(tr => tr.GreaterThan("10").LessThan("12").Field(f => f.State)),
                        // dates passed in must specify a time zone
                        sd => sd.DateRange(tr => tr.GreaterThan(DateTime.Now.AddDays(-1)).LessThan(DateTime.Now).Field(f => f.CreateTime))
                    )
                    .MustNot(mn => mn // no clause may match, like NOT
                            .Term(p => p.State, 1001),
                        mn => mn.Bool(
                            bb => bb.Must(mt => mt
                                .Match(mc => mc.Field(fd => fd.Name).Query("至尊"))
                            ))
                    )
            )
        ) // query conditions
        .Sort(st => st.Ascending(asc => asc.Id)) // sorting
        // Return only specific fields
        // Note: in 2.x this is sc.Include
        .Source(sc => sc.Includes(ic => ic
            .Fields(
                fd => fd.Name,
                fd => fd.Id,
                fd => fd.CreateTime)))
);
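Paging and deep paging
Paging is controlled with from + size, but because of how search works internally, deep paging becomes increasingly expensive. ES therefore limits from + size <= 10000 by default.
To page deeper, the only option is to use the last result of the previous page as the starting condition for the next page.
var response = _client.Search<TestModel5>(s => s
    .Query(q => q.Term(t => t.Field(fd => fd.State).Value(1)))
    .Size(1000)
    .Sort(st => st.Descending(ds => ds.Id))
    .SearchAfter(new object[]
    {
        10, // the Id sort value of the last hit of the previous page
        // values for further sort fields can follow
    }));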
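The paging approach above can be sketched as a loop that feeds the sort values of the last hit of each page into the next request. The helper name ReadAll, the page size, and the trimmed-down TestModel5 are assumptions for illustration; sorting on a unique field such as Id keeps the page boundaries unambiguous.

```csharp
using System;
using System.Linq;
using Nest;

// Trimmed-down stand-in for the article's model (assumed shape).
public class TestModel5
{
    public long Id { get; set; }
    public int State { get; set; }
}

public static class SearchAfterExample
{
    // Hypothetical helper: walks all matching documents page by page.
    public static void ReadAll(IElasticClient client, string indexName, Action<TestModel5> handle)
    {
        object[] searchAfter = null; // null on the first page

        while (true)
        {
            var after = searchAfter;
            var page = client.Search<TestModel5>(s =>
            {
                s = s.Index(indexName)
                     .Query(q => q.Term(t => t.Field(fd => fd.State).Value(1)))
                     .Size(1000)
                     .Sort(st => st.Descending(ds => ds.Id));
                // Only later pages carry the sort values of the previous page's last hit.
                return after == null ? s : s.SearchAfter(after);
            });

            if (!page.IsValid || !page.Hits.Any()) break;

            foreach (var doc in page.Documents) handle(doc);

            // Sort values of the last hit become the cursor for the next page.
            searchAfter = page.Hits.Last().Sorts.ToArray();
        }
    }
}
```

Unlike scroll, this keeps no state on the server, so results can shift if documents are added or removed while paging.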
Scan and scroll (for non-real-time retrieval of large amounts of data)
5.x supports concurrent (sliced) scrolling
Action<int> sc1 = (id) =>
{
    string scrollid = "";
    // 5.x adds the Slice setting and removes SearchType.Scan
    var result = _client.Search<TestModel5>(s => s
        .Index(indexName)
        .Query(q => q.MatchAll())
        .Size(15)
        .Sort(st => st.Descending(ds => ds.Id))
        .Scroll("1m")
        // Slice ids start at 0: 0, 1, 2, ...
        // Max is the total number of slices
        // e.g. Max = 3 gives id = 0, id = 1, id = 2
        .Slice(sl => sl.Id(id).Max(3))
    );
    // Get the scroll id
    scrollid = result.ScrollId;
    foreach (var info in result.Documents)
    {
        Console.WriteLine(info.Id + " - " + " - batch count " + result.Documents.Count + " - thread " + Thread.CurrentThread.ManagedThreadId);
    }
    while (true)
    {
        // Fetch the next batch; each batch holds up to Size documents
        var result1 = _client.Scroll<TestModel5>("1m", scrollid);
        if (result1.Documents == null || !result1.Documents.Any())
            break;
        foreach (var info in result1.Documents)
        {
            Console.WriteLine(info.Id + " - " + " - batch count " + result1.Documents.Count + " - thread " + Thread.CurrentThread.ManagedThreadId);
        }
        // Get the new scroll id
        scrollid = result1.ScrollId;
    }
};
var t1 = Task.Factory.StartNew(() => { sc1(0); });
var t2 = Task.Factory.StartNew(() => { sc1(1); });
var t3 = Task.Factory.StartNew(() => { sc1(2); });
t1.Wait();
t2.Wait();
t3.Wait();
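A scroll context holds resources on the cluster until its timeout expires, so it can be worth releasing it explicitly once reading is done. This is a minimal single-threaded sketch; the helper name ScrollAll, the batch size, and the trimmed-down TestModel5 are assumptions for illustration.

```csharp
using System;
using System.Linq;
using Nest;

// Trimmed-down stand-in for the article's model (assumed shape).
public class TestModel5
{
    public long Id { get; set; }
}

public static class ScrollCleanupExample
{
    // Hypothetical helper: reads every document, then clears the scroll context.
    public static void ScrollAll(IElasticClient client, string indexName, Action<TestModel5> handle)
    {
        var response = client.Search<TestModel5>(s => s
            .Index(indexName)
            .Query(q => q.MatchAll())
            .Size(500)
            .Scroll("1m"));

        var scrollId = response.ScrollId;
        try
        {
            while (response.IsValid && response.Documents.Any())
            {
                foreach (var doc in response.Documents) handle(doc);

                // Ask for the next batch and remember the latest scroll id.
                response = client.Scroll<TestModel5>("1m", scrollId);
                scrollId = response.ScrollId ?? scrollId;
            }
        }
        finally
        {
            // Release the server-side context instead of waiting for the timeout.
            if (!string.IsNullOrEmpty(scrollId))
                client.ClearScroll(c => c.ScrollId(scrollId));
        }
    }
}
```

With sliced scrolling, each slice has its own scroll id, so the same cleanup would apply once per slice.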
Combining multiple query and sort conditions
bool useStateDesc = true;

// must conditions
var mustQuerys = new List<Func<QueryContainerDescriptor<TestModel5>, QueryContainer>>();
// Deleted
mustQuerys.Add(mt => mt.Term(tm => tm.Field(fd => fd.Deleted).Value(false)));
// CreateTime
mustQuerys.Add(mt => mt.DateRange(tm => tm
    .Field(fd => fd.CreateTime)
    .GreaterThanOrEquals(DateTime.Now.AddDays(-1))
    .LessThanOrEquals(DateTime.Now)));

// should conditions
var shouldQuerys = new List<Func<QueryContainerDescriptor<TestModel5>, QueryContainer>>();
// State
shouldQuerys.Add(mt => mt.Term(tm => tm.Field(fd => fd.State).Value(1)));
shouldQuerys.Add(mt => mt.Term(tm => tm.Field(fd => fd.State).Value(2)));

// Sorting
Func<SortDescriptor<TestModel5>, IPromise<IList<ISort>>> sortDesc = sd =>
{
    // Sort by score first
    sd.Descending(SortSpecialField.Score);
    // Then by State or by Id
    if (useStateDesc)
        sd.Descending(d => d.State);
    else
        sd.Descending(d => d.Id);
    return sd;
};

var result2 = _client.Search<TestModel5>(s => s
    .Index(indexName)
    .Query(q => q.Bool(b => b.Must(mustQuerys).Should(shouldQuerys)))
    .Size(100)
    .From(0)
    .Sort(sortDesc)
);
Score control
// Compute the score with function_score
var result1 = _client.Search<TestModel5>(s => s
    .Query(q => q.FunctionScore(f => f
        // Query section
        .Query(qq =>
            qq.Term(t => t.Field(fd => fd.State).Value(1))
            || qq.Term(t => t.Field(fd => fd.State).Value(2))
        )
        .Boost(1.0) // how much function_score affects the score
        .BoostMode(FunctionBoostMode.Replace) // boost mode; Replace substitutes the function score for the query score
        .ScoreMode(FunctionScoreMode.Sum)     // score mode; Sum adds up the function results
        // Function section
        .Functions(fun => fun
            .Weight(w => w.Weight(3).Filter(ft => ft
                .Term(t => t.Field(fd => fd.State).Value(1)))) // State = 1 matches: +3
            .Weight(w => w.Weight(2).Filter(ft => ft
                .Term(t => t.Field(fd => fd.State).Value(2)))) // State = 2 matches: +2
        )
    ))
    .Size(3000)
    .Sort(st => st.Descending(SortSpecialField.Score))
);
// In the results: State = 1 scores 3, State = 2 scores 2, and a document matching both filters scores 5
Aggregations
Aggregations: basics
var result = _client.Search<TestModel5>(s => s
    .Index(indexName)
    .From(0)
    .Size(15)
    .Aggregations(ag => ag
        .ValueCount("Count", vc => vc.Field(fd => fd.Id))           // total count
        .Sum("vendorPrice_Sum", su => su.Field(fd => fd.Id))        // sum
        .Max("vendorPrice_Max", m => m.Field(fd => fd.Id))          // maximum
        .Min("vendorPrice_Min", m => m.Field(fd => fd.Id))          // minimum
        .Average("vendorPrice_Avg", avg => avg.Field(fd => fd.Id))  // average
        .Terms("vendorID_group", t => t.Field(fd => fd.Id).Size(100)) // grouping
    )
);
Aggregations: grouping
var result = _client.Search<TestModel5>(s => s
    .Index(indexName)
    .Size(0)
    .Aggregations(ag => ag
        .Terms("Group_group", // group by Group
            t => t.Field(fd => fd.Group)
                .Size(100)
                .Aggregations(agg => agg
                    .Terms("Group_state_group", // then group by State
                        tt => tt.Field(fd => fd.State)
                            .Size(50)
                            .Aggregations(aggg => aggg
                                .Average("g_g_Avg", av => av.Field(fd => fd.Dvalue)) // Price avg
                                .Max("g_g_Max", m => m.Field(fd => fd.Dvalue))       // Price max
                                .Min("g_g_Min", m => m.Field(fd => fd.Dvalue))       // Price min
                                .ValueCount("g_g_Count", m => m.Field(fd => fd.Id))  // total number of records
                            )
                    )
                    .Cardinality("g_count", dy => dy.Field(fd => fd.State)) // number of groups
                    .ValueCount("g_Count", c => c.Field(fd => fd.Id))
                )
        )
        .Cardinality("vendorID_group_count", dy => dy.Field(fd => fd.Group)) // number of groups
        .ValueCount("Count", c => c.Field(fd => fd.Id)) // total number of records
    ) // grouping
);
Complex grouped aggregations and parsing the results
var mustQuerys = new List<Func<QueryContainerDescriptor<TestModel5>, QueryContainer>>();
mustQuerys.Add(t => t.Term(f => f.Deleted, false));

var result = _client.Search<TestModel5>(s => s
    .Index(indexName)
    .Query(q => q
        .Bool(b => b.Must(mustQuerys))
    )
    .Size(0)
    .Aggregations(ag => ag
        .Terms("Group_Group", tm => tm
            .OrderDescending("Dvalue_avg") // order buckets by the average, descending
            .Field(fd => fd.Group)
            .Size(100)
            .Aggregations(agg => agg
                // take the top document of each group, sorted by Dvalue
                .TopHits("top_test_hits", th => th.Sort(srt => srt.Field(fd => fd.Dvalue).Descending()).Size(1))
                .Max("Dvalue_Max", m => m.Field(fd => fd.Dvalue))
                .Min("Dvalue_Min", m => m.Field(fd => fd.Dvalue))
                .Average("Dvalue_avg", avg => avg.Field(fd => fd.Dvalue)) // average
            )
        )
    )
);

// The keys used when reading results must match the aggregation names defined above.
var groupBuckets = (BucketAggregate)result.Aggregations["Group_Group"];
foreach (var bucket1 in groupBuckets.Items)
{
    // Untyped buckets are keyed by object
    var bucket = (KeyedBucket<object>)bucket1;
    var maxValue = ((ValueAggregate)bucket.Aggregations["Dvalue_Max"]).Value;
    var minValue = ((ValueAggregate)bucket.Aggregations["Dvalue_Min"]).Value;
    var sources = ((TopHitsAggregate)bucket.Aggregations["top_test_hits"]).Documents<TestModel5>().ToList();
    var data = sources.FirstOrDefault();
}
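The same results can also be read through the typed helpers on the response's Aggs property, which avoids the manual casts. The sketch below assumes the aggregation names "Group_Group", "Dvalue_Max", "Dvalue_Min", and "top_test_hits" from the request above; the helper name Print and the trimmed-down TestModel5 are assumptions for illustration.

```csharp
using System;
using System.Linq;
using Nest;

// Trimmed-down stand-in for the article's model (assumed shape).
public class TestModel5
{
    public long Id { get; set; }
    public string Group { get; set; }
    public double Dvalue { get; set; }
}

public static class AggsHelperExample
{
    // Hypothetical helper: prints one line per group bucket.
    public static void Print(ISearchResponse<TestModel5> result)
    {
        // Typed access to the terms aggregation by name.
        var groups = result.Aggs.Terms("Group_Group");
        if (groups == null) return;

        foreach (var bucket in groups.Buckets)
        {
            // Sub-aggregations are available on each bucket by name.
            double? max = bucket.Max("Dvalue_Max")?.Value;
            double? min = bucket.Min("Dvalue_Min")?.Value;
            var top = bucket.TopHits("top_test_hits")?
                .Documents<TestModel5>()
                .FirstOrDefault();

            Console.WriteLine(bucket.Key + ": count=" + bucket.DocCount
                + ", max=" + max + ", min=" + min
                + ", topId=" + (top == null ? "-" : top.Id.ToString()));
        }
    }
}
```

The helpers return null when no aggregation with the given name exists, which is why the sketch guards each lookup.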